Shuffle机制
目录
Shuffle机制
一、Partition分区
1.什么是分区
2.默认的Partition分区
3.自定义Partition步骤
二、WritableComparable排序
1.默认排序
2.排序概述
3.排序分类
三、Combiner合并
MapReduce总流程:
input >> map(map+sort) >>reduce(copy+sort+reduce) >> output map的sort+reduce的+copy+sort 组成shuffle
一、Partition分区
1.什么是分区
要求将统计结果按照条件输出到不同文件中,这个过程就叫做分区。
2.默认的Partition分区
默认分区是根据key的hashCode%reduceTask数量得到所有的分区号,用户无法控制key存储到哪个分区,默认reduceTask的数量为1,也可以在driver端进行设置。
3.自定义Partition步骤
1)自定义继承Partitioner,重写getPartition()方法
2)在Job驱动中,设置自定义Partitioner
3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
注意:a.如果ReduceTask的数量>getPartition的结果数,则会产生多个空的输出文件part-r-000xx
b.如果如果ReduceTask的数量 c.如果ReduceTask数量=1,则不管MapTask端输出多少个分区文件,最终结果都会交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000 d.分区号必须从零开始,逐一累加 实例: 在Hadoop序列化_lslslslslss的博客-CSDN博客中序列化的例子上进行改动 (1)增加一个分区类 (2)在驱动函数中增加自定义数据分区设置和ReduceTask设置 MapTask和ReduceTask都会对数据按照key进行排序,这是Hadoop的默认行为,任何程序中的数据都会被排序,不管逻辑上是否需要。 默认排序按照字典顺序排序,且实现该排序的方法是快速排序。 MapTask:它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区的使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有的文件进行归并排序。 ReduceTask:它从每个MapTask上远程复制相应的数据文件,如果文件大小超过一定的阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序生成一个更大的文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上,当所有数据复制完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。 (1)部分排序 MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序。 (2)全排序 最终输出结果只有一个文件,且文件内部有序。 (3)辅助排序 GroupingComparator分组,在Reduce端对key进行分组。当接收key为bean对象时,想让一个或多个字段的key进入到同一个reduce方法,可以采用分组排序。 (4)二次排序 在自定义排序过程中,如果comparaTo中的判断条件为两个即为二次排序 实例1——全排序: (1)FlowBean对象在在需求1基础上增加了比较功能 (2)编写Mapper类 (3)编写Reducer类 (4)编写Driver类 实例2——区内排序 (1)增加自定义分区类 (2)在驱动类中添加分区类 (1)Combiner是MR程序中Mapper和Reducer之外的一种组件 (2)Combiner组件的父类就是Reducer (3)Combiner和Reducer的区别在于位置 Combiner是在每一个MapTask所在的节点运行 Reducer是在接受全局所有Mapper的输出结果 (4)Combiner的意思就是对每一个MapTask的输出进行局部汇总 (5)Combiner能够应用的前提是不能影响最终的业务逻辑,且Combiner的KV应该可以跟Reducer中的输入KV类型对应起来 (6)实例: 方案一: (1)增加一个WordCountCombiner类继承Reducer (2)在WordcountDriver驱动类中指定Combiner 方案二: 将WordcountReducer作为Combiner在WordcountDriver驱动类中指定
public class ProvincePartitioner extends Partitioner
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);
//同时指定相应数量的ReduceTask
job.setNumReduceTasks(5);
FileInputFormat.setInputPaths(job, new Path("F:\phone_data.txt"));
FileOutputFormat.setOutputPath(job,new Path("F:\pp"));
job.waitForCompletion(true);
}
}
二、WritableComparable排序
1.默认排序
2.排序概述
3.排序分类
public class FlowBean implements WritableComparable
public class FlowMapper extends Mapper
public class FlowReducer extends Reducer
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//job.setPartitionerClass(ProvincePartitioner.class);
//job.setNumReduceTasks(5);
FileInputFormat.setInputPaths(job, new Path("F:、、input\part-r-00000"));
FileOutputFormat.setOutputPath(job,new Path("F:\cc"));
job.waitForCompletion(true);
}
}
public class ProvincePartitioner extends Partitioner
// 设置自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);
// 设置对应的ReduceTask的个数
job.setNumReduceTasks(5);
三、Combiner合并
public class WordCountCombiner extends Reducer
// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordCountCombiner.class);
// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑
job.setCombinerClass(WordCountReducer.class);



