package ks;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class mybean implements Writable {
    // fields
    private long upFlow;   // upstream packet count
    private long downFlow; // downstream packet count
    private long sumFlow;  // total

    // no-arg constructor (Hadoop needs it to instantiate the bean via reflection)
    public mybean() {
        super();
    }
    // constructor with arguments
    public mybean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }
    // set method (optional, but lets one instance be reused across records)
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }
    // override toString; TextOutputFormat uses it to format the output value
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
    // serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    // deserialization; the field order must match write() exactly
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
}
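Because readFields() must consume fields in exactly the order write() produced them, a quick local round-trip is a useful sanity check. The sketch below is not from the original post (the class name mybeantest is made up); it serializes a bean into an in-memory buffer and reads it back:

package ks;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class mybeantest {
    public static void main(String[] args) throws IOException {
        mybean original = new mybean(100L, 200L);
        // Serialize into an in-memory buffer.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));
        // Deserialize from the same bytes; each readLong() lines up with a writeLong().
        mybean restored = new mybean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(restored); // prints "100\t200\t300"
    }
}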
# Override the map method
package ks;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// The first type parameter is the input key type (usually LongWritable),
// the second is the input value type (usually Text),
// the third is the output key type, and the fourth is the output value type.
public class mymapper extends Mapper<LongWritable, Text, Text, mybean> {
    Text k = new Text();
    mybean v = new mybean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line.
        String line = value.toString();
        // Split the line on tabs.
        String[] splited = line.split("\t");
        // The phone number (second field) is the output key.
        k.set(splited[1]);
        long upFlow = Long.parseLong(splited[splited.length - 3]);
        long downFlow = Long.parseLong(splited[splited.length - 2]);
        // Fill the output value.
        v.set(upFlow, downFlow);
        // Emit the record.
        context.write(k, v);
    }
}
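The index arithmetic assumes a tab-separated record in which the phone number is the second field and the up and down flow are the third- and second-to-last fields. The original post does not show the input file, so the sample line below is a made-up record in that layout:

package ks;
public class mapparsedemo {
    public static void main(String[] args) {
        // Hypothetical input record; only the field positions matter.
        String line = "1363157985066\t13726230503\t120.196.100.99\t2481\t24681\t200";
        String[] splited = line.split("\t");
        System.out.println(splited[1]);                  // 13726230503 (the key)
        System.out.println(splited[splited.length - 3]); // 2481  (upFlow)
        System.out.println(splited[splited.length - 2]); // 24681 (downFlow)
    }
}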
# Override the reduce method
package ks;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// The first two type parameters are the input key/value types and must match
// the mapper's output types; the last two are the output key/value types.
public class myReducer extends Reducer<Text, mybean, Text, mybean> {
    // One reusable bean for the output value.
    mybean v = new mybean();

    @Override
    protected void reduce(Text key, Iterable<mybean> values, Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        // Add up the flows of every record that arrived for this key.
        for (mybean flow : values) {
            sumUpFlow += flow.getUpFlow();
            sumDownFlow += flow.getDownFlow();
        }
        // Initialize v with the totals via the set method.
        v.set(sumUpFlow, sumDownFlow);
        // Emit the aggregated record.
        context.write(key, v);
    }
}
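To make the aggregation concrete (the figures are made up, not from the original post): if the shuffle delivers two beans for key 13726230503 carrying flows (2481, 24681) and (100, 200), the loop accumulates sumUpFlow = 2581 and sumDownFlow = 24881, and the output file receives the key, a tab, and v.toString():

13726230503	2581	24881	27462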
# Partitioning
package ks;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// The type parameters are the map output key and value types.
public class mypartition extends Partitioner<Text, mybean> {
    @Override
    public int getPartition(Text key, mybean value, int numPartitions) {
        // Take the first three digits of the phone number.
        String pre = key.toString().substring(0, 3);
        // Everything not matched below lands in partition 4.
        int partition = 4;
        if ("136".equals(pre)) {
            // numbers starting with 136 go to partition 0
            partition = 0;
        } else if ("137".equals(pre)) {
            // 137 goes to partition 1
            partition = 1;
        } else if ("138".equals(pre)) {
            // 138 goes to partition 2
            partition = 2;
        } else if ("139".equals(pre)) {
            // 139 goes to partition 3
            partition = 3;
        }
        // Return the partition number.
        return partition;
    }
}
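Since getPartition() only inspects the key, it can be exercised without running a job. The quick check below is not from the original post (the phone numbers are invented); it confirms each prefix lands in the intended partition:

package ks;
import org.apache.hadoop.io.Text;
public class partitiondemo {
    public static void main(String[] args) {
        mypartition p = new mypartition();
        String[] phones = {"13626230503", "13760778710", "13826544101", "13922314466", "15013685858"};
        for (String phone : phones) {
            // The value argument is unused by getPartition(), so null is fine here;
            // 5 mirrors job.setNumReduceTasks(5) in the driver.
            System.out.println(phone + " -> partition " + p.getPartition(new Text(phone), null, 5));
        }
    }
}

Expected output: the 136/137/138/139 numbers map to partitions 0 through 3, and 15013685858 falls through to partition 4.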
# Main class
package ks;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
public class mydriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Configure basic log4j output before the job starts.
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        // Backslashes in a Java string literal must be escaped.
        conf.set("mapred.jar", "D:\\Hadoop\\untitled1\\untitled\\target\\untitled1-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(conf);
        // Locate the job jar from this class.
        job.setJarByClass(mydriver.class);
        // Input path.
        FileInputFormat.setInputPaths(job, new Path("file:///d:/value.txt"));
        // Bind the mapper class.
        job.setMapperClass(mymapper.class);
        // Shuffle: custom partitioner plus a matching number of reduce tasks.
        job.setPartitionerClass(mypartition.class);
        job.setNumReduceTasks(5);
        // Map output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(mybean.class);
        // Bind the reducer class.
        job.setReducerClass(myReducer.class);
        // Reduce output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(mybean.class);
        // Output path (must not exist yet).
        FileOutputFormat.setOutputPath(job, new Path("file:///d:/output1"));
        // Submit the job and block until it finishes; exit nonzero on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
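With mypartition plus job.setNumReduceTasks(5), every partition gets its own reduce task, so the output directory holds five result files (standard MapReduce naming; the mapping follows from the partitioner above):

part-r-00000  phones starting with 136
part-r-00001  phones starting with 137
part-r-00002  phones starting with 138
part-r-00003  phones starting with 139
part-r-00004  all other prefixes

If the reduce task count were smaller than the largest partition number plus one, records routed past the last reducer would fail the job at runtime (the one-reducer case is the exception: everything then goes to a single file).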



