栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

四十八、shuffle机制

四十八、shuffle机制

shuffle定义:

        在mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle;

        shuffle: 洗牌、发牌(核心机制:数据分区、排序、缓存);
        

        具体来说:就是将 maptask 输出的处理结果数据,分发给 reducetask ,并在分发的过程中,对数据按 key 进行了分区和排序。

partition分区:        

        (1)如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

        (2)如果1

        (3)如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;

        (4)分区号必须从零开始,逐一累加。

例如:假设自定义分区数为5,则

(1)job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件

(2)job.setNumReduceTasks(2);会报错

(3)job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件

流程详解


        (1)maptask收集我们的map()方法输出的kv对,放到内存缓冲区中

        (2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

        (3)多个溢出文件会被合并成大的溢出文件

        (4)在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序

        (5)reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据

        (6)reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)

        (7)合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)

         注:Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

        缓冲区的大小可以通过参数调整,参数:io.sort.mb  默认100M

Combiner合并

        (1)combiner是MR程序中Mapper和Reducer之外的一种组件

        (2)combiner组件的父类就是Reducer

        (3)combiner和reducer的区别在于运行的位置:

                Combiner是在每一个maptask所在的节点运行

                Reducer是接收全局所有Mapper的输出结果;

        (4)combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量

        (5)combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来

        (6)自定义Combiner实现步骤

                        (a)自定义一个Combiner继承Reducer,重写Reduce方法:

public class WordcountCombiner extends Reducer{

	@Override
	protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {

        // 1 汇总操作
		int count = 0;
		for(IntWritable v :values){
			count += v.get();
		}

        // 2 写出
		context.write(key, new IntWritable(count));
	}
}

                        (b)在job驱动类中设置:

job.setCombinerClass(WordcountCombiner.class);

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/581411.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号