TopN Ranking

The approach: each Mapper keeps a local array of N+1 slots. Every incoming value overwrites slot 0 and the array is re-sorted ascending, so the current minimum always ends up back in slot 0 and is the one discarded on the next insert; slots 1..N therefore always hold that task's top N. In cleanup each map task emits its local top N, and a single Reducer merges those lists the same way to produce the global top N.
Map class
package topN_01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

public class MyMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    int len;
    int[] top;

    @Override
    protected void setup(Context context) {
        // Read N from the configuration; default to 10 if it is not set
        len = context.getConfiguration().getInt("N", 10);
        // Slot 0 is the insertion slot; slots 1..len hold the current top N.
        // Note: the zero-initialized array assumes non-negative input values.
        top = new int[len + 1];
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is expected to carry 4 comma-separated fields; rank on the 3rd
        String[] arr = value.toString().split(",", -1);
        if (arr.length == 4) {
            int x = Integer.parseInt(arr[2]);
            add(x);
        }
    }

    protected void add(int val) {
        // Overwrite the current minimum (slot 0) with the new value
        top[0] = val;
        // Ascending sort pushes the smallest value back into slot 0,
        // where it will be discarded on the next call
        Arrays.sort(top);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this task's local top N once all of its input has been seen
        for (int i = 1; i < len + 1; i++) {
            context.write(new IntWritable(top[i]), new IntWritable(top[i]));
        }
    }
}
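For reference, the mapper above assumes each input line carries four comma-separated fields and ranks on the third one (index 2). A hypothetical word.txt could look like this (the field values are made up for illustration):

1001,tom,85,beijing
1002,jack,92,shanghai
1003,mary,78,guangzhou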
Reduce class
package topN_01;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Arrays;

public class MyReduce extends Reducer<IntWritable, IntWritable, Text, Text> {
    int len;
    int[] top;

    @Override
    protected void setup(Context context) {
        // Read N from the configuration; default to 10 if it is not set
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];
    }

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) {
        // Merge the local top-N lists emitted by the map tasks
        for (IntWritable val : values) {
            add(val.get());
        }
    }

    protected void add(int val) {
        // Same trick as in the Mapper: replace the current minimum, then re-sort
        top[0] = val;
        Arrays.sort(top);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Output rank/value pairs, starting from the largest value (rank 1)
        for (int i = len; i > 0; i--) {
            context.write(new Text(String.valueOf(len - i + 1)), new Text(String.valueOf(top[i])));
        }
    }
}
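One caveat worth noting: this merge only produces a correct global top N when a single reduce task sees every mapper's local list. Hadoop's default is already one reduce task, but it can be pinned explicitly in the job setup; a one-line sketch:

// Ensure a single reducer merges all local top-N lists (1 is the default)
job.setNumReduceTasks(1);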
Job class
package topN_01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TestJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // 1. get a job
        Job job = Job.getInstance(conf);
        // 2. set the jar's main class
        job.setJarByClass(TestJob.class);
        // 3. set mapper and reducer classes
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);
        // 4. set map/reduce output types; these must match what
        //    MyMap and MyReduce actually write
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 5. set input and output paths
        FileInputFormat.setInputPaths(job, new Path("file:///simple/word.txt"));
        FileOutputFormat.setOutputPath(job, new Path("file:///simple/result"));
        // 6. submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
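Since both MyMap and MyReduce read N from the configuration, it has to be set on the Configuration object before the Job is created; a minimal sketch, assuming a top-5 run:

Configuration conf = new Configuration();
conf.setInt("N", 5);   // rank the top 5 instead of the default 10
Job job = Job.getInstance(conf);

With the default TextOutputFormat, each output line is then a rank and a value separated by a tab, with rank 1 paired with the largest value.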