1.数据准备
2.基本思路:
Map阶段:
a.读取一行数据,切分字段;
b.抽取手机号、套餐基本费、语音通信费、短信彩信费、流量费;
c.以手机号为key,bean对象为value输出,即context.write(手机号,bean)。
Reduce阶段:
a.累加套餐基本费、语音通信费、短信彩信费、流量费得到总花费;
b.实现自定义的bean(freebean)来封装各项费用信息,并将bean作为map输出的value来传输(map输出的key为手机号);
c.MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key,即手机号。
3.freebean
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Hadoop {@link Writable} holding one subscriber's charges: base plan fee, voice fee,
 * SMS/MMS fee, data fee, and their total.
 *
 * <p>The driver uses it as both the map output value and the final output value;
 * {@link #toString()} produces the tab-separated text written to the result file.
 *
 * <p>Note: the individual setters ({@code setbaseFee}, {@code setCommunicateFee}, ...)
 * do NOT recompute {@code sumFee}; use {@link #setFee(long, long, long, long)} or the
 * 4-argument constructor when the total must stay consistent.
 */
public class freebean implements Writable {
    /** Base plan fee. */
    private long baseFee;
    /** Voice call fee. */
    private long communicateFee;
    /** SMS/MMS fee. */
    private long msgFee;
    /** Data (traffic) fee. */
    private long flowFee;
    /** Total fee: baseFee + communicateFee + msgFee + flowFee. */
    private long sumFee;

    /**
     * No-arg constructor. Required because Hadoop creates instances reflectively during
     * deserialization and then populates them via {@link #readFields(DataInput)}.
     */
    public freebean() {
    }

    /**
     * Creates a bean from the four individual fees and computes their total.
     *
     * @param baseFee        base plan fee
     * @param communicateFee voice call fee
     * @param msgFee         SMS/MMS fee
     * @param flowFee        data fee
     */
    public freebean(long baseFee, long communicateFee, long msgFee, long flowFee) {
        this.baseFee = baseFee;
        this.communicateFee = communicateFee;
        this.msgFee = msgFee;
        this.flowFee = flowFee;
        // Bug fix: this used to be `this.sumFee = sumFee;` (a self-assignment),
        // which left the total at 0 for every bean built with this constructor.
        this.sumFee = baseFee + communicateFee + msgFee + flowFee;
    }

    /**
     * Convenience method that sets all four fees at once and recomputes the total.
     *
     * @param baseFee        base plan fee
     * @param communicateFee voice call fee
     * @param msgFee         SMS/MMS fee
     * @param flowFee        data fee
     */
    public void setFee(long baseFee, long communicateFee, long msgFee, long flowFee) {
        this.baseFee = baseFee;
        this.communicateFee = communicateFee;
        this.msgFee = msgFee;
        this.flowFee = flowFee;
        this.sumFee = baseFee + communicateFee + msgFee + flowFee;
    }

    /**
     * Serializes the fields. The order here defines the wire format and must match
     * {@link #readFields(DataInput)} exactly.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(baseFee);
        out.writeLong(communicateFee);
        out.writeLong(msgFee);
        out.writeLong(flowFee);
        out.writeLong(sumFee);
    }

    /**
     * Deserializes the fields in the same order {@link #write(DataOutput)} wrote them.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        baseFee = in.readLong();
        communicateFee = in.readLong();
        msgFee = in.readLong();
        flowFee = in.readLong();
        sumFee = in.readLong();
    }

    /**
     * Returns the fees as a tab-separated line:
     * {@code baseFee \t communicateFee \t msgFee \t flowFee \t sumFee}.
     */
    @Override
    public String toString() {
        // Bug fix: the separator was the literal "t" instead of a tab character.
        return baseFee + "\t" + communicateFee + "\t" + msgFee + "\t" + flowFee + "\t" + sumFee;
    }

    /** @return the base plan fee */
    public long getbaseFee() {
        return baseFee;
    }

    /** Sets the base plan fee (does not update sumFee). */
    public void setbaseFee(long baseFee) {
        this.baseFee = baseFee;
    }

    /** @return the voice call fee */
    public long getCommunicateFee() {
        return communicateFee;
    }

    /** Sets the voice call fee (does not update sumFee). */
    public void setCommunicateFee(long communicateFee) {
        this.communicateFee = communicateFee;
    }

    /** @return the SMS/MMS fee */
    public long getMsgFee() {
        return msgFee;
    }

    /** Sets the SMS/MMS fee (does not update sumFee). */
    public void setMsgFee(long msgFee) {
        this.msgFee = msgFee;
    }

    /** @return the data fee */
    public long getFlowFee() {
        return flowFee;
    }

    /** Sets the data fee (does not update sumFee). */
    public void setFlowFee(long flowFee) {
        this.flowFee = flowFee;
    }

    /** @return the total fee */
    public long getSumFee() {
        return sumFee;
    }

    /** Overrides the total fee directly. */
    public void setSumFee(long sumFee) {
        this.sumFee = sumFee;
    }
}
4.freemap
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

/**
 * Mapper: parses one tab-separated line of phone-fee data and emits
 * (phone number, freebean holding that line's fees).
 *
 * <p>Type parameters: input key = byte offset (LongWritable), input value = line (Text),
 * output key = phone number (Text), output value = fees (freebean). Without these type
 * arguments the {@code map} method below would not override {@code Mapper.map}.
 */
public class freemap extends Mapper<LongWritable, Text, Text, freebean> {

    // Reused across map() calls: context.write serializes key and value immediately,
    // so one instance of each avoids allocating two objects per input record.
    private final Text phoneNum = new Text();
    private final freebean fee = new freebean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Fields are tab-separated (the original split("t") split on the letter 't').
        String[] datas = value.toString().split("\t");
        // Assumed column layout, inferred from the indices used:
        // [1]=phone number, [2]=base fee, [3]=voice fee, [4]=SMS/MMS fee, [5]=data fee.
        phoneNum.set(datas[1]);
        // setFee also computes the total, so the emitted bean is internally consistent.
        fee.setFee(
                Long.parseLong(datas[2]),
                Long.parseLong(datas[3]),
                Long.parseLong(datas[4]),
                Long.parseLong(datas[5]));
        context.write(phoneNum, fee);
    }
}
5.freeReduce
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

/**
 * Reducer: for each phone number, sums every fee component across all of that number's
 * records and emits (phone number, freebean with the summed fees and their total).
 *
 * <p>The type arguments are required: with a raw {@code Reducer} the {@code reduce} method
 * below would not override {@code Reducer.reduce}, and iterating a raw {@code Iterable}
 * as {@code freebean} would not compile.
 */
public class freeReduce extends Reducer<Text, freebean, Text, freebean> {

    // Reused across reduce() calls; context.write serializes it immediately.
    private final freebean total = new freebean();

    @Override
    protected void reduce(Text key, Iterable<freebean> values, Context context)
            throws IOException, InterruptedException {
        long baseFee = 0;
        long communicateFee = 0;
        long msgFee = 0;
        long flowFee = 0;
        // Hadoop may reuse one freebean instance for every element of `values`,
        // so the fields are read immediately instead of keeping references.
        for (freebean value : values) {
            baseFee += value.getbaseFee();
            communicateFee += value.getCommunicateFee();
            msgFee += value.getMsgFee();
            flowFee += value.getFlowFee();
        }
        // setFee computes sumFee from the four components.
        total.setFee(baseFee, communicateFee, msgFee, flowFee);
        context.write(key, total);
    }
}
6.freeDrive
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
/**
 * Driver: configures and runs the phone-fee aggregation job (freemap -> freeReduce).
 *
 * <p>Usage: {@code freeDrive [inputPath [outputPath]]}. Missing arguments fall back to
 * the original hard-coded local paths. An existing output directory is deleted before
 * the job starts, because FileOutputFormat refuses to write into an existing directory.
 *
 * <p>Exits with status 0 on success and 1 on failure.
 */
public class freeDrive {
    private static final String DEFAULT_INPUT = "F:/test_shixun/word/phoneFee.txt";
    private static final String DEFAULT_OUTPUT = "F:/test_shixun/output3";

    public static void main(String[] args) throws Exception {
        // A single Configuration shared by the job and the FileSystem lookup,
        // so both resolve paths with the same settings.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(freeDrive.class);
        job.setMapperClass(freemap.class);
        job.setReducerClass(freeReduce.class);

        // Map output: (phone number, per-record fees).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(freebean.class);
        // Final output: (phone number, summed fees).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(freebean.class);

        Path input = new Path(args.length > 0 ? args[0] : DEFAULT_INPUT);
        Path output = new Path(args.length > 1 ? args[1] : DEFAULT_OUTPUT);

        // Destructive: recursively removes any previous output at this path.
        FileSystem fileSystem = output.getFileSystem(conf);
        if (fileSystem.exists(output)) {
            fileSystem.delete(output, true);
        }

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean succeeded = job.waitForCompletion(true);
        // Conventional non-zero status; -1 would surface as 255 on most shells.
        System.exit(succeeded ? 0 : 1);
    }
}
7.输出结果
8.原来的数据



