package demo05.flowOrder;
import demo04.flow.FlowNum;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Hadoop key type holding four integer flow counters: up flow, down flow,
 * accumulated up flow and accumulated down flow.
 *
 * <p>Ordering: beans sort by {@code upFlow} in descending order. Ties are
 * broken by {@code downFlow}, {@code upCountFlow} and {@code downCountFlow}
 * (ascending) so that {@link #compareTo(FlowBean)} returns 0 only for beans
 * whose four counters are all equal, as the {@link Comparable} contract
 * requires. Keys that compare equal are delivered to a single reduce call,
 * so a reducer must iterate over every value it receives.
 *
 * <p>Serialization: the four counters are written and read as four
 * consecutive ints, in declaration order.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    // Counters default to 0 so a freshly constructed bean can be serialized
    // and compared without a NullPointerException from auto-unboxing.
    private Integer upFlow = 0;
    private Integer downFlow = 0;
    private Integer upCountFlow = 0;
    private Integer downCountFlow = 0;

    /**
     * Compares two beans: larger {@code upFlow} first, then the remaining
     * counters ascending as tie-breakers.
     *
     * @param o the bean to compare against
     * @return a negative value, zero, or a positive value as this bean sorts
     *         before, equal to, or after {@code o}
     */
    @Override
    public int compareTo(FlowBean o) {
        // Arguments are swapped on purpose: descending order on upFlow.
        int result = Integer.compare(o.upFlow, this.upFlow);
        if (result != 0) {
            return result;
        }
        result = Integer.compare(this.downFlow, o.downFlow);
        if (result != 0) {
            return result;
        }
        result = Integer.compare(this.upCountFlow, o.upCountFlow);
        if (result != 0) {
            return result;
        }
        return Integer.compare(this.downCountFlow, o.downCountFlow);
    }

    /**
     * Two beans are equal when all four counters are equal, which keeps
     * {@code equals} consistent with {@link #compareTo(FlowBean)}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        return compareTo((FlowBean) obj) == 0;
    }

    /**
     * Value-based hash over the four counters, so equal beans hash alike
     * (needed wherever the key is hashed, e.g. hash-based partitioning).
     */
    @Override
    public int hashCode() {
        int result = upFlow;
        result = 31 * result + downFlow;
        result = 31 * result + upCountFlow;
        result = 31 * result + downCountFlow;
        return result;
    }

    /**
     * Serializes the four counters as ints, in declaration order.
     *
     * @param dataOutput sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(downFlow);
        dataOutput.writeInt(upCountFlow);
        dataOutput.writeInt(downCountFlow);
    }

    /**
     * Restores the four counters; must mirror the order used by
     * {@link #write(DataOutput)}.
     *
     * @param dataInput source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
        this.upCountFlow = dataInput.readInt();
        this.downCountFlow = dataInput.readInt();
    }

    /**
     * Renders the four counters separated by tab characters.
     */
    @Override
    public String toString() {
        // Separator is a real tab; the previous literal "t" was a tab escape
        // that had lost its backslash.
        return this.upFlow + "\t" + this.downFlow + "\t" + this.upCountFlow + "\t" + this.downCountFlow;
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }
}
package demo05.flowOrder;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that turns each tab-separated input line into a
 * ({@link FlowBean}, {@link Text}) pair so the records can be ordered by the
 * bean's {@code compareTo}.
 *
 * <p>Expected line layout (5 tab-separated fields): field 0 is emitted
 * unchanged as the output value; fields 1-4 are parsed as ints and stored as
 * up flow, down flow, total up flow and total down flow respectively.
 */
public class FlowOrderMap extends Mapper<LongWritable, Text, FlowBean, Text> {

    /** Number of tab-separated fields a valid input line must contain. */
    private static final int EXPECTED_FIELDS = 5;

    /**
     * Parses one input line and emits the flow counters as the key and the
     * first field as the value.
     *
     * @param key     input key supplied by the framework (not used here)
     * @param value   one line of input text
     * @param context sink for the output pair
     * @throws IOException          if the line has fewer than 5 fields or the write fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the record into its fields (tab-separated).
        String[] strings = value.toString().split("\t");
        if (strings.length < EXPECTED_FIELDS) {
            // Fail with a descriptive message instead of a bare
            // ArrayIndexOutOfBoundsException.
            throw new IOException("Malformed record: expected at least " + EXPECTED_FIELDS
                    + " tab-separated fields but found " + strings.length + ": " + value);
        }

        // Populate up flow, down flow, total up flow and total down flow.
        FlowBean flowBean = new FlowBean();
        flowBean.setUpFlow(Integer.parseInt(strings[1]));
        flowBean.setDownFlow(Integer.parseInt(strings[2]));
        flowBean.setUpCountFlow(Integer.parseInt(strings[3]));
        flowBean.setDownCountFlow(Integer.parseInt(strings[4]));

        context.write(flowBean, new Text(strings[0]));
    }
}
package demo05.flowOrder;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer that passes every (key, value) pair through unchanged.
 */
public class FlowOrderReduce extends Reducer<FlowBean, Text, FlowBean, Text> {

    /**
     * Writes each value received for {@code key} back out with that key.
     *
     * @param key     the flow counters for this group
     * @param values  all values grouped under {@code key}
     * @param context sink for the output pairs
     * @throws IOException          if a write fails
     * @throws InterruptedException if a write is interrupted
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Records are emitted as-is; Hadoop has already ordered the keys
        // according to FlowBean.compareTo before they reach this method.
        // Emit every value so records that share a key are not dropped.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}