手机号 总上行流量 总下行流量 总流量
11136230513 500 500 1000
12959002129 1938 180 2118
19943685818 3659 3538 7197
22246544121 250 250 500
22256435636 250 250 500
33366251146 240 0 240
33371575951 1527 2106 3633
33388413456 4116 1432 5548

Maven必须配置
参考 MapReduce统计流量案例 的Maven配置
resources目录下的log4j.properties配置参考 MapReduce统计流量案例 中的log4j.properties配置
// Custom Writable implementation (FlowBean)
package com.test.mapreduce.comparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Map output key holding one record's upstream, downstream and total traffic.
 *
 * <p>The natural ordering is by {@code sumFlow}, descending, so the shuffle phase
 * delivers keys to the reducer from the largest to the smallest total traffic.
 * {@link #equals(Object)} and {@link #hashCode()} are defined on {@code sumFlow}
 * as well, so they are consistent with {@link #compareTo(FlowBean)}.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    // Primitive longs, so a bean that was never populated still serializes (as zeros)
    // instead of throwing a NullPointerException in write().
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic, see setSumFlow()

    /** Public no-arg constructor; Hadoop instantiates key objects reflectively. */
    public FlowBean() {
    }

    /** @return the upstream traffic */
    public Long getUpFlow() {
        return upFlow;
    }

    /**
     * @param upFlow the upstream traffic
     * @throws NullPointerException if {@code upFlow} is null
     */
    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return the downstream traffic */
    public Long getDownFlow() {
        return downFlow;
    }

    /**
     * @param downFlow the downstream traffic
     * @throws NullPointerException if {@code downFlow} is null
     */
    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    /** @return the total traffic as last computed by {@link #setSumFlow()} */
    public Long getSumFlow() {
        return sumFlow;
    }

    /** Recomputes the total traffic as {@code upFlow + downFlow}. */
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    /** Serializes the fields; the order must match {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /** Deserializes the fields in the same order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    /** Tab-separated {@code up \t down \t sum}; this is the text written to the job output. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Orders by total traffic, descending: a bean with the larger {@code sumFlow} sorts first.
     * Beans with equal totals compare as 0 and are therefore grouped into one reduce call.
     */
    @Override
    public int compareTo(FlowBean o) {
        // Arguments swapped relative to ascending order to get a descending sort.
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    /** Equal iff the total traffic is equal, consistent with {@link #compareTo(FlowBean)}. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        return sumFlow == ((FlowBean) obj).sumFlow;
    }

    /** Hash of {@code sumFlow}; used e.g. by hash-based partitioning of map output keys. */
    @Override
    public int hashCode() {
        return Long.hashCode(sumFlow);
    }
}
package com.test.mapreduce.comparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses tab-separated lines of the form {@code phone \t up \t down [\t sum]} and emits
 * a {@link FlowBean} key with the phone number as value, so that the shuffle phase sorts
 * records by total traffic.
 *
 * <p>Lines with fewer than three fields, or with non-numeric traffic values (for example
 * a header row), are skipped and counted under counter {@code FlowMapper/MALFORMED_LINES}.
 */
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private static final String COUNTER_GROUP = "FlowMapper";
    private static final String MALFORMED_LINES = "MALFORMED_LINES";

    // Output key/value objects reused across map() calls to avoid per-record allocation.
    private FlowBean k = new FlowBean();
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Split the line on TAB: phone, upstream, downstream[, total].
        String[] fields = value.toString().split("\t");
        if (fields.length < 3) {
            context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
            return;
        }

        // 2. Populate the key. The total is recomputed from up + down rather than read
        //    from the (optional) fourth column.
        try {
            k.setUpFlow(Long.parseLong(fields[1].trim()));
            k.setDownFlow(Long.parseLong(fields[2].trim()));
        } catch (NumberFormatException e) {
            // Not a data row (e.g. a header): count it and skip instead of failing the task.
            context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
            return;
        }
        k.setSumFlow();
        v.set(fields[0].trim());

        // 3. Emit (traffic, phone); the FlowBean key drives the sort order.
        context.write(k, v);
    }
}
package com.test.mapreduce.comparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Writes one output line per phone number: {@code phone \t up \t down \t sum}.
 *
 * <p>Keys arrive in the order defined by {@code FlowBean.compareTo} (descending total
 * traffic); this reducer only swaps key and value back so the phone number comes first.
 */
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Beans with the same total traffic compare as equal and are grouped into a single
        // reduce() call, so every phone number in the group must be written, not just one.
        // NOTE: relies on Hadoop refreshing `key` as each value is read, so each line
        // carries that record's own up/down figures.
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
package com.test.mapreduce.comparable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * Configures and submits the job that lists phone numbers sorted by total traffic,
 * in descending order.
 *
 * <p>Usage: {@code FlowDriver [inputPath outputPath]}. When fewer than two arguments are
 * given, the local paths {@code D:\input} and {@code D:\output} are used.
 */
public class FlowDriver {

    // Backslashes must be escaped in Java string literals; "D:\input" does not compile.
    private static final String DEFAULT_INPUT = "D:\\input";
    private static final String DEFAULT_OUTPUT = "D:\\output";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length >= 2 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length >= 2 ? args[1] : DEFAULT_OUTPUT;

        // 1. Create the configuration and a new Job instance (not a singleton).
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Locate the job jar via the class containing this driver.
        job.setJarByClass(FlowDriver.class);

        // 3. Register the Mapper and Reducer classes.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Map output types: FlowBean key (drives the sort), phone number as value.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Final output types: phone number first, then the traffic figures.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // A single reduce task is required for one globally sorted output file; with more
        // reducers each output file would only be sorted within itself.
        job.setNumReduceTasks(1);

        // 6. Input and output paths (the output directory must not exist yet).
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 7. Submit the job, wait for it, and reflect the outcome in the exit code.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
输出数据
19943685818 3659 3538 7197
33388413456 4116 1432 5548
33371575951 1527 2106 3633
12959002129 1938 180 2118
11136230513 500 500 1000
22256435636 250 250 500
22246544121 250 250 500
33366251146 240 0 240



