目录
1.需求
2.需求分析
3.代码
(1)在之前的序列化案例实操的基础上,增加一个分区类
(2) 在driver类中增加自定义数据分区设置和ReduceTask设置
1.需求
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据:txt文件
(2)期望输出数据:
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
2.需求分析
在之前的序列化案例实操上进行修改。
3.代码
(1)在之前的序列化案例实操的基础上,增加一个分区类
public class ProvincePartitioner extends Partitioner {
@Override
public int getPartition(Text text, FlowBean flowBean, int i) {
//test是手机号
String phone = text.toString();
//取前三位,substring(0,3)包含左边,不包含右边
String prePhone = phone.substring(0, 3);
int partition;
if("136".equals(prePhone)){
partition = 0;
}else if("137".equals(prePhone)){
partition = 1;
}else if("138".equals(prePhone)){
partition = 2;
}else if("139".equals(prePhone)){
partition = 4;
}else{
partition = 5;
}
return partition;
}
}
(2) 在driver类中增加自定义数据分区设置和ReduceTask设置
public class ProvincePartitioner extends Partitioner{ @Override public int getPartition(Text text, FlowBean flowBean, int i) { //test是手机号 String phone = text.toString(); //取前三位,substring(0,3)包含左边,不包含右边 String prePhone = phone.substring(0, 3); int partition; if("136".equals(prePhone)){ partition = 0; }else if("137".equals(prePhone)){ partition = 1; }else if("138".equals(prePhone)){ partition = 2; }else if("139".equals(prePhone)){ partition = 4; }else{ partition = 5; } return partition; } }
(2) 在driver类中增加自定义数据分区设置和ReduceTask设置
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(6);
FileInputFormat.setInputPaths(job,new Path("D:\code\Hadoop\input\inputflow"));
FileOutputFormat.setOutputPath(job,new Path("D:\code\Hadoop\output55555"));
boolean result = job.waitForCompletion(true);
System.exit(result? 0:1);
}
}


