package com.cn.demo_groupTopN;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupCompactor extends WritableComparator {
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean first = (OrderBean) a;
OrderBean second = (OrderBean) b;
return first.getOrder_id().compareTo(second.getOrder_id());
}
public MyGroupCompactor() {
super(OrderBean.class,true);
}
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable {
private String order_id;
private Double price;
@Override
public int compareTo(OrderBean orderBean) {
//如果订单号相同比较价格,否则比较无意义
if (this.order_id.compareTo(orderBean.getOrder_id())==0) {
return this.price.compareTo(orderBean.getPrice());
}
return 0;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(order_id);
dataOutput.writeDouble(price);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.order_id = dataInput.readUTF();
this.price = dataInput.readDouble();
}
public String getOrder_id() {
return order_id;
}
public void setOrder_id(String order_id) {
this.order_id = order_id;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return this.order_id + "t" + this.price;
}
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MyGroupMap extends Mapper {
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split("t");
OrderBean orderBean = new OrderBean();
orderBean.setOrder_id(splits[0]);
orderBean.setPrice(Double.parseDouble(splits[2]));
context.write(orderBean,new DoubleWritable(Double.parseDouble(splits[2])));
}
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyGroupReduce extends Reducer {
@Override
protected void reduce(OrderBean key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
int i = 0;
for (DoubleWritable value: values) {
i++;
if(i<=2){
context.write(key,value);
}else {
break;
}
}
}
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartion extends Partitioner {
@Override
public int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int i) {
return (orderBean.getOrder_id().hashCode() & Integer.MAX_VALUE)%i;
}
}
package com.cn.demo_groupTopN;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyGroupMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(),"group_demo");
job.setJarByClass(MyGroupMain.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义groupingComparator\input"));
job.setMapperClass(MyGroupMap.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(DoubleWritable.class);
//设置分区类
job.setPartitionerClass(MyPartion.class);
//设置分区数量
job.setNumReduceTasks(2);
//设置分组类
job.setGroupingComparatorClass(MyGroupCompactor.class);
//设置reduce类
job.setReducerClass(MyGroupReduce.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(DoubleWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义groupingComparator\output_TOPN"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(),new MyGroupMain(),args);
System.exit(run);
}
}