Map阶段
reduce阶段
二、Shuffle机制流程图
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
2.1Partition分区分区指的是:将MapReduce统计的结果按照条件输出到不同的文件中。
public int getPartition(K key,V value,int numReduceTasks){
return (key.hashCode()&Intger.MAX_VALUE)%numReduceTasks;
}
默认分区:是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
自定义Partitioner步骤
- 自定义继承Partitioner,重写getPartition()方法在Job驱动中,设置自定义Partitioner自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
分区总结:
如果ReduceTask的数量>getPartition的结果数,则会多生产几个空的输出文件;如果1ReduceTask的数量如果ReduceTask的数量=1,无论MapTask端输出多少分区文件,最终都是ReduceTask,最终也只是产生一个结果文件;分区号必须从零开始,逐一累加; 2.1.1案例分析:
需求分析
在之前Mapreduce学习文章中的统计流量的案例基础上添加代码
1、添加分区类
public class ProvincePartitioner extends Partitioner{ @Override public int getPartition(Text key, FlowBean value, int numPartitions) { //1.获取电话号码前三位 String preNum = key.toString().substring(0, 3); int partition = 4; //2.判断是哪个省 if ("136".equals(preNum)) { partition = 0; } else if ("137".equals(preNum)) { partition = 1; } else if ("138".equals(preNum)) { partition = 2; } else if ("139".equals(preNum)) { partition = 3; } return partition; } }
2、驱动类中增加分区设置和ReduceTask设置
//8.指定自定义数据分区和指定reduce task数量 job.setPartitionerClass(ProvincePartitioner.class); job.setNumReduceTasks(5);2.2WritableCompare排序 2.2.1 概述
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据key进行排序。该操作属于Hadoop的默认行为。任何应用程序中和数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕之后,它会对磁盘上所有文件进行归并排序。对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。 2.2.2排序的分类
部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失 MapReduce所提供的并行架构。
辅助排序(分组排序)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
2.2.3自定义排序WritableComparable原理分析
bean对象作为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。
案例实操:WritableComparable排序案例实操(全排序)
需求分析
代码实现
1.FlowBean基础上增加比较功能
public class FlowBean implements WritableComparable{ //1 实现Writable接口 private long upFlow; private long downFlow; private long sumFlow; //2 反序列化时,需要反射调用空参构造函数 public FlowBean() { } public FlowBean(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } //3.写序列化方法 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } //4.写反序列化方法 //5.顺序要和写序列化方法一致 @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readLong(); this.downFlow = dataInput.readLong(); this.sumFlow = dataInput.readLong(); } //6.toString方法 @Override public String toString() { return upFlow + "t" + downFlow + "t" + sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public void set(long downFlow, long upFlow) { this.downFlow = downFlow; this.upFlow = upFlow; sumFlow = downFlow + upFlow; } //7.compare to 方法 @Override public int compareTo(FlowBean bean) { int result; //按照总流量大小,倒序排列 if (sumFlow > bean.getSumFlow()) { result = -1; } else if (sumFlow < bean.getSumFlow()) { result = 1; } else { result = 0; } return result; } }
2.编写mapper类
public class FlowCountSortMapper extends Mapper{ FlowBean bean = new FlowBean(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { //1.获取一行 String line = value.toString(); //2.切割字段 String[] fields = line.split("t"); //3.封装对象 //取出手机号 String phoneNum = fields[1]; //取出上行流量和下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); v.set(phoneNum); bean.set(downFlow, upFlow); //4.写出 context.write(bean, v); } }
3.Reducer
public class FlowCountSortReducer extends Reducer{ @Override protected void reduce(FlowBean key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { for (Text text:values){ context.write(text,key); } } }
4.Driver
public class FlowCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
args = new String[]{"d:/work/input1", "d:/work/output"};
//1.获取job实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//2.设置jar加载路径
job.setJarByClass(FlowCountDriver.class);
//3.设置Map类和Reduce类
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);
//4.设置Map输出
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
//5.设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//8.指定自定义数据分区和指定reduce task数量
// job.setPartitionerClass(ProvincePartitioner.class);
// job.setNumReduceTasks(5);
//6.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
WritableComparable排序案例实操(区内排序)
1.需求
要求每个省份手机号输出的文件中按照总流量内部排序。
2.需求分析
基于前一个需求,增加自定义分区类,分区按照省份手机号设置。
只需要将之前分区代码注释放开就ok了
//8.指定自定义数据分区和指定reduce task数量
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
2.3Combiner合并
(1)Combiner是MR程序中Mapper和Reducer之外的一种组件
(2)Combiner组件的父类就是Reducer。
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每一个MapTask所在的节点运行。
Reducer是接收全局所有的Mapper的输出结果;
(4)Combiner的意义就是对每个MapTask的输出进行局部汇总,以减小网络传输量。
(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出kv应该能和Reducer的输入kv对应起来。
(6)自定义Combiner实现:自定义Combiner继承Reducer,重写Reduce方法,然后在驱动类中配置。
案例实操:wordcount案例优化:对每个MapTask的输出进行局部汇总;期望经过Combiner合并,减少输出。
Combiner类
public class WordCountCombiner extends Reducer{ IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } v.set(sum); context.write(key, v); } }
driver类
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
args = new String[]{"D:/WORK/input3",
"D:/WORK/output"};
//1.获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//2.设置jar加载路径
job.setJarByClass(WordCountDriver.class);
//3.设置map和reduce类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置合并规则
job.setCombinerClass(WordCountCombiner.class);
//6.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
或者不用写新的Combiner类,因为内容和Reducer完全一样,可以在Driver类中直接指定Reducer为合并规则,这样效果也是相同的。
2.4GroupingComparator分组(辅助排序)对Reduce阶段的数据根据某一个或几个字段进行分组。
分组排序步骤:
(1)自定义类继承WritableComparator
(2)重写compare()方法
(3)创建一个构造将比较对象的类传给父类
protected OrderGroupingComparator(){
super(OrderBean.class,true);
}
案例实操:
需求分析:
代码实现:
Bean
public class OrderBean implements WritableComparable{ //订单id号 private int order_id; //价格 private double price; public OrderBean() { } public OrderBean(int order_id, double price) { this.order_id = order_id; this.price = price; } @Override public int compareTo(OrderBean o) { int result; if (order_id > o.getOrder_id()) { result = 1; } else if (order_id < o.getOrder_id()) { result = -1; } else { result = price > o.getPrice() ? -1 : 1; } return result; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(order_id); dataOutput.writeDouble(price); } @Override public void readFields(DataInput dataInput) throws IOException { order_id = dataInput.readInt(); price = dataInput.readDouble(); } @Override public String toString() { return order_id + "t" + price; } public int getOrder_id() { return order_id; } public void setOrder_id(int order_id) { this.order_id = order_id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } }
mapper
public class OrderSortMapper extends Mapper{ OrderBean k = new OrderBean(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { //1获取一行 String line = value.toString(); //2截取 String[] fields = line.split("t"); //3封装对象 k.setOrder_id(Integer.parseInt(fields[0])); k.setPrice(Double.parseDouble(fields[2])); //4写出 context.write(k, NullWritable.get()); } }
GroupingComparator
public class OrderSortGroupingComparator extends WritableComparator {
protected OrderSortGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = -1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = 1;
} else {
result = 0;
}
return result;
}
}
Reducer
public class OrderSortReducer extends Reducer{ @Override protected void reduce(OrderBean key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } }
driver
public class OrderSortDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"d:/WORK/input5",
"d:/output1"};
//1获取配置信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2设置jar包加载路径
job.setJarByClass(OrderSortDriver.class);
//3加载map/reduce类
job.setMapperClass(OrderSortMapper.class);
job.setReducerClass(OrderSortReducer.class);
//4设置map输出数据key和value类型
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
//5设置最终输出数据的key和value类型
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
//6设置输入数据和输出数据路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//8设置reduce端的分组
job.setGroupingComparatorClass(OrderSortGroupingComparator.class);
//7提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}



