import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite MapReduce key: sorts primarily by {@code first} (lexicographic,
 * ascending) and breaks ties by {@code second} (numeric, ascending).
 *
 * <p>Note: the original declared the raw type {@code WritableComparable},
 * under which {@code compareTo(SortWritable)} does not implement
 * {@code Comparable.compareTo(Object)} and the {@code @Override} fails to
 * compile. The type parameter below fixes that.
 */
public class SortWritable implements WritableComparable<SortWritable> {
    private String first;
    private int second;

    /** Orders by {@code first}; ties are broken by {@code second}. */
    @Override
    public int compareTo(SortWritable o) {
        int cmp = this.first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        // Integer.compare replaces the original's boxing round-trip
        // (Integer.valueOf(this.second + "")) with a direct primitive compare.
        return Integer.compare(this.second, o.second);
    }

    /** Serializes both fields; order must mirror {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.first);
        out.writeInt(this.second);
    }

    /** Deserializes in the exact order written by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /**
     * equals/hashCode are required for a MapReduce key type: the default
     * HashPartitioner routes records by hashCode(), which must be consistent
     * with equals() and with compareTo() above.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SortWritable)) {
            return false;
        }
        SortWritable other = (SortWritable) obj;
        return this.second == other.second
                && (this.first == null ? other.first == null : this.first.equals(other.first));
    }

    @Override
    public int hashCode() {
        return 31 * (first == null ? 0 : first.hashCode()) + second;
    }

    /**
     * Tab-separated rendering, matching the tab-delimited input format.
     * Bug fix: the original concatenated the literal letter "t" instead of "\t".
     */
    @Override
    public String toString() {
        return this.first + "\t" + this.second;
    }
}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that parses one tab-separated input line ("first\tsecond") into a
 * {@link SortWritable} key and emits it with a {@link NullWritable} value,
 * so the shuffle phase sorts records by the key's compareTo().
 *
 * <p>Bug fixes versus the original: the raw {@code extends Mapper} made the
 * {@code @Override} on the typed map() signature invalid (it did not match
 * the erased {@code map(Object, Object, Context)}), and the line was split
 * on the literal letter "t" instead of a tab.
 */
public class SortMap extends Mapper<LongWritable, Text, SortWritable, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Custom counter: number of records seen on the map side.
        Counter counter = context.getCounter("MR_COUNT", "MapRecordCounter");
        counter.increment(1L);

        // Split on a real tab character ("\t"), not the letter "t".
        String[] fields = value.toString().split("\t");

        // Skip (and count) malformed lines instead of failing the task
        // with an ArrayIndexOutOfBoundsException.
        if (fields.length < 2) {
            context.getCounter("MR_COUNT", "MalformedRecords").increment(1L);
            return;
        }

        SortWritable sortWritable = new SortWritable();
        sortWritable.setFirst(fields[0]);
        sortWritable.setSecond(Integer.parseInt(fields[1]));

        context.write(sortWritable, NullWritable.get());
    }
}
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer that writes the sorted keys back out, one output record per input
 * value, tracking input/output record counts via a custom counter enum.
 *
 * <p>Bug fixes versus the original: the raw {@code extends Reducer} made the
 * {@code @Override} on the typed reduce() signature invalid, and the
 * REDUCE_INPUT_RECORDS counter was incremented once per key group rather
 * than once per input record.
 */
public class SortReduce extends Reducer<SortWritable, NullWritable, SortWritable, NullWritable> {

    @Override
    protected void reduce(SortWritable key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Fully-qualified because the nested Counter enum below shadows
        // org.apache.hadoop.mapreduce.Counter. Lookups hoisted out of the loop.
        org.apache.hadoop.mapreduce.Counter inputCounter =
                context.getCounter(Counter.REDUCE_INPUT_RECORDS);
        org.apache.hadoop.mapreduce.Counter outputCounter =
                context.getCounter(Counter.REDUCE_OUTPUT_RECORDS);

        for (NullWritable ignored : values) {
            // Counted inside the loop so the counter reflects records, not groups.
            inputCounter.increment(1L);
            context.write(key, NullWritable.get());
            outputCounter.increment(1L);
        }
    }

    /** Custom counter names reported in the job's counter output. */
    public static enum Counter {
        REDUCE_INPUT_RECORDS, REDUCE_OUTPUT_RECORDS,
    }
}