栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce基础知识(个人总结)

MapReduce基础知识(个人总结)

    声明: 1. 本文为我的个人复习总结, 并非那种从零基础开始普及知识 内容详细全面, 言辞官方的文章
              2. 由于是个人总结, 所以用最精简的话语来写文章
              3. 若有错误不当之处, 请指出

MapReduce有三个进程:

  1. MrAppMaster: 负责整个程序的过程调度及状态协调
  2. MapTask
  3. ReduceTask

类型对应:

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
切片规则:
  • 切片时不考虑整体, 而是每个Block块都单独进行切片
  • 切片大小默认等于Block大小, 使得切片效果最佳 使小文件切片少一些
  • 一个切片会启动一个MapTask

几种不同的切片机制:

  1. FileInputFormat

    • 看剩余部分是否在1.1倍Block范围内,不超过则剩余部分就按一块来切; 所以切片最大为1.1倍Block

    • 计算切片大小的源码 computeSplitSize(Math.max(minSize,Math.min(maxSize,blockSize)))

      要是想调小切片大小, 就应该把maxSize值调小

      要是想调大切片大小, 就应该把minSize值调大

    则150M文件被切成128M+22M

  2. CombineTextInputFormat

    虚拟存储切片最大值设置: CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m

    切片过程: setMaxInputSplitSize的最大值记住maxSize

    1. 虚拟存储过程(虚拟存储文件块切分):

      • <=maxSize时, 先逻辑上划为一个块
      • >maxSize && <=2*maxSize时, 逻辑上均分成2个块 ((防止出现太小切片; 这块太小了, 切了更小后可以和别的组进行合并成大的)
      • >2*maxSize时, 逻辑上以maxSize进行切块
    2. 切片过程(对虚拟存储文件块 小文件合并,然后切片):

      >=maxSize时, 直接形成一个切片

Shuffle:

环形缓冲区:

​ 默认大小100M, 达到80%开始溢写;

​ 这个缓冲区本质上是一个数组, 叫环形是因为两边分别存 索引 和 实际数据, 排序时是排索引, 溢写时的排序按索引去查找实际数据(我推测索引应该近似于key值, 去找实际的K-V)

  • 发生在Map之后, Reduce之前 combiner就是发生在这里的

  • 期间有3个排序:

    1. 发生在Map端: 在环形缓冲区里getPatrtition( ) 后, 溢写时按key的索引进行快排
    2. 发生在Map端: 溢写的小文件内的同一分区内有序, 接下来溢写小文件间取10个组队进行归并排序
    3. 发生在Reduce端: Reduce读取了Map端的文件, 由于Map端输出的文件大小>内存大小, 想要用小内存去排序 众多内部分区间内已有序的文件, 进行了借助磁盘的全局的归并排序(类似于1G内存对100G数据进行排序)
    4. 发生在Reduce端: 同一个分区中按key分组后, 就可以执行reduce方法了

    这些排序是默认发生的, 它是为了reduce方法的同key在一组的分组, 排序有利于分组的进行

优化:

  1. getPatrtition( ) 里把key加固定区间范围的随机数打散, 避免数据倾斜
  2. 增大环形缓冲区大小 或 提高溢写文件的阈值百分比(减小了溢写文件的个数, 后面归并方便些)
  3. 在不影响业务逻辑的前提下, 使用combiner预聚合, 减少Map到Reduce的数据传输
  4. 增大一次进行归并文件的个数
  5. 采用压缩, 减少磁盘IO和数据网络传输
  6. 提高Reduce端拉取Map端的文件个数

MapTask工作机制:

  1. Read阶段: 通过用户编写的RecordReader, 从输入InputSplit中解析出一个个key-value
  2. Map阶段: 该节点主要是将解析出的key/value交给用户编写map( )函数处理, 并产生一系列新的key-value
  3. Collect收集阶段: 调用OutputCollector.collect( )输出结果。它会将生成的key/value分区(调用Partitioner), 写入环形缓冲区
  4. Shuffle的Spill阶段
  5. Shuffle的Combine阶段

ReduceTask工作机制:

  1. Copy阶段: ReduceTask拷贝MapTask计算出的数据; 并针对某一片数据, 如果其大小超过一定阈值, 则写到磁盘上, 否则直接放到内存中

  2. Sort阶段: 对所有数据全局进行一次归并排序

  3. Reduce阶段: reduce( )函数将计算结果输出

数据倾斜:

数据倾斜是热点数据造成的

数据倾斜有计算倾斜(reduce端数据分布不均)和存储倾斜(Hbase的Region存储数据分布不均)

MR解决计算数据倾斜的方案:

  1. 在不影响业务的前提下, 提前在map进行预聚合combine, 减少传输的数据量

    map数量较多, 这点预聚合计算任务对map而言并不重

  2. 处理热点key

    • key为NULL时

      • 属于异常数据就提前过滤掉
      • 不属于异常数据就给他赋随机值(或固定前缀再拼接随机值(能识别其为异常值即可))
    • key不为NULL时

      拼接随机值, 进行两次MR。第一次MR带着随机值聚合一部分, 即局部聚合; 第二次MR去掉随机值进行最终聚合 即全局聚合

      所谓的随机值,并不是UUID完全随机,因为那样第一个MR相当于没干任何聚合的活,第二个MR拆掉后缀随机值后照样数据倾斜;
      应该是某一个固定区间内的随机值(如随机值%100), 当1亿个同key的数据%100 [0,99]分区进行聚合,第二个MR去掉后缀随机值后只需要聚合的是这100个同key的数据, 任务量就很小了

  3. 实现自定义分区

    自定义散列函数, 将key均匀分配到不同Reducer

  4. Join操作时使用MapJoin, 提前加载数据到内存,再用Mapper去执行join逻辑; 没有Reducer了就减少了数据倾斜发生的概率

    Mapper很多, 且key并不是按hash( )取模 决定放在哪一台机器的, 所以一般就不会数据倾斜
    Reducer很少, 且key按hash( )取模 决定放在哪一台机器, 容易产生数据倾斜

Map不是越多越好, 因为其本身也要占用资源, 启动也慢

Reduce不是越多越好:

  • 因为其本身也要占用资源, 启动也慢;

  • 而且有多少个Reduce就会有多少个文件, 增大出现小文件的概率

**Map端的Partition到各个Reducer: **

默认的getPatition( ) 是hash%reduce数量, 分区号从0开始数

如果继承Partition方法 自定义分区不合理, 可能出现以下状况

  1. Reduce数量>getPatition( )数量, 则会多产生几个空的输出文件part-r-000xx
  2. 1
  3. Reduce数量=1, 则不管Map端输出多少个分区文件, 都会交给这个ReduceTask去执行

自定义Combiner: 继承Reducer, 重写reduce方法

自定义排序: 实现WritableComparable接口重写compareTo方法

排序分类:

三种join:

Map端和Reduce端的setup( )方法, 是初始化方法, 可以获取到Context从而获取到文件名

  1. ReduceJoin

    setup( )加载提取文件名,

    map( )根据不同文件名进行提取不同字段, 封装的Bean里设置flag标签

    reduce( )里进行join

  2. MapJoin (适用于有一张是小表)

    // 加载缓存数据

    job.addCacheFile(new URI(“file:///e:/input/inputcache/pd.txt”));

    job.setNumReduceTasks(0);

    setup( ), 加载缓存文件, 缓存数据到集合

    map( )端进行join, 连接flag标签

    没有reduce( )端

  3. SemiJoin(半连接, 是前两种的结合, 适用于有一张是小表)

    // 加载缓存数据

    job.addCacheFile(new URI(“file:///e:/input/inputcache/pd.txt”));

    setup( ), 加载缓存文件, 缓存数据到集合

    map( )端进行过滤, 如果大表的连接字段不在小表连接字段的集合中, 就提前过滤掉;

    ​ 封装的Bean里设置flag标签

    reduce( )里进行join, 连接flag标签

ReduceJoin的缺点:

  • join这种重操作导致Reduce压力大; 而Map端只是读取封装Bean, 且数量多, 资源利用率低
  • 且Reduce端容易产生数据倾斜
坑:
  1. Hadoop为了更省内存, 对集合迭代器进行了优化;

​ reduce方法的迭代器values, for循环或者迭代器 遍历得到的value指向的那个对象内存一直被重用,

​ 所有value用的同一块内存进行了覆盖, 导致了ReduceJoin得到的集合元素数据不断在覆盖前面的内容

​ 所以需要自己BeanUtils.copyProperties进行拷贝后, 再添加到List集合里

  1. Mapper中第一个参数必须是LongWritable或NullWritable, 不可以是IntWritable, 否则报类型转换异常。因为LongWritable是文件行数, 默认它是大数据场景不能为IntWritable
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/460664.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号