手机号 总上行流量 总下行流量 总流量
11136230513 500 500 1000
11111111111 600 500 1100
12959002129 1938 180 2118
19943685818 3659 3538 7197
22246544121 300 250 550
22256435636 250 250 500
33366251146 240 0 240
33371575951 1527 2106 3633
33388413456 4116 1432 5548

代码说明
请参考《MapReduce统计流量案例(自定义排序-全排序)基本实现》中的代码;需要修改的部分已在本文给出,直接替换即可。
自定义Patition类(myPartition)package com.test.mapreduce.comparableAndpartition; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class myPartition extends Partitioner自定义Driver类(WordCountDriver){ @Override public int getPartition(FlowBean flowBean, Text text, int i) { // 获取手机号 String phone = text.toString(); // 获取手机号前缀 String prePhone = phone.substring(0, 3); // 定义一个分区号变量 int partition; // 不同开头数字去不同分区 if ("111".equals(prePhone)){ partition = 0; }else if ("222".equals(prePhone)){ partition = 1; }else if ("333".equals(prePhone)){ partition = 2; }else { partition = 3; } return partition; } }
package com.test.mapreduce.comparableAndpartition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.创建配置信息Configuration对象并获取Job单例对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.设置关联本Driver程序的jar
job.setJarByClass(FlowDriver.class);
// 3.设置关联Mapper和Reducer的jar
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4.设置Mapper输出的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
// 5. 设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6.设置自定义分区
job.setPartitionerClass(myPartition.class);
// 7.设置ReduceTask数量
job.setNumReduceTasks(4);
// 8.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path("D:\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\output"));
// 9.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
输出数据
part-r-00000
11111111111 600 500 1100
11136230513 500 500 1000
part-r-00001
22246544121 300 250 550
22256435636 250 250 500
part-r-00002
33388413456 4116 1432 5548
33371575951 1527 2106 3633
33366251146 240 0 240
part-r-00003
19943685818 3659 3538 7197
12959002129 1938 180 2118



