// step 1: define the Mapper
// step 2: define a custom Partitioner
// step 3: define the Reducer logic
// step 4: in the driver class, set the partitioner class and the number of ReduceTasks
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class Partition {

    /**
     * Mapper: passes each input line through unchanged, emitting the whole
     * line as the output key (k2) with a NullWritable value (v2).
     */
    public static class PartitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // map turns k1,v1 (offset, line) into k2,v2 (line, null).
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Routes records to one of two partitions based on the 4th comma-separated
     * field (the "winning number"): values &gt; 15 go to partition 1, the rest
     * to partition 0.
     */
    public static class MyPartitioner extends Partitioner<Text, NullWritable> {
        @Override
        public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
            // 1. Split the line (k2) and pick the winning-number field.
            String[] split = text.toString().split(",");
            String num = split[3];
            // 2. Compare against 15 and return the matching partition index.
            return Integer.parseInt(num) > 15 ? 1 : 0;
        }
    }

    /**
     * Reducer: identity — writes each distinct key once with a NullWritable
     * value (deduplicates identical lines within a partition).
     */
    public static class PartitionReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Driver: wires the mapper, custom partitioner and reducer together, sets
     * two reduce tasks (one per partition) and runs the job.
     *
     * NOTE(review): main() was previously nested inside PartitionReducer; it
     * belongs on the driver class so `hadoop jar ... Partition` works.
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create and name the job.
        Job job = Job.getInstance();
        job.setJarByClass(Partition.class);

        // 2. Configure the job.
        // Input/output formats. Fixed: the output FORMAT must be set with
        // setOutputFormatClass — the original called setOutputKeyClass with
        // TextOutputFormat.class, which set the wrong property.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Mapper and its output key/value types.
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Custom partitioner plus a matching number of reduce tasks
        // (the partitioner returns 0 or 1, so two reducers are needed).
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);

        // Reducer and final output types. Fixed: the reducer emits
        // NullWritable values, not Text.
        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output paths.
        String inpath = "./datas/cmdlist";
        String outpath = "./output001";
        TextInputFormat.addInputPath(job, new Path(inpath));
        TextOutputFormat.setOutputPath(job, new Path(outpath));

        // Remove a pre-existing output directory so the job can run.
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(new Path(outpath))) {
            fs.delete(new Path(outpath), true);
        }

        // Propagate success/failure to the caller's exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
/* Sample contents of the cmdlist input file (fields: phone,group,gender,number):
17365849247,1001,man,13 17365569247,1001,man,15 17365849247,1001,man,12 17365849247,1001,man,11 17366549247,1001,man,17 17365849247,1001,man,17 17365878247,1001,man,14 17365849247,1001,man,13 17365379247,1001,man,10 17365849247,1001,man,11 17365945247,1001,man,12 17365924547,1001,man,15 17365459247,1001,man,18 17361559247,1001,man,17 17365849247,1001,man,19 17365849247,1001,man,18 17365849247,1001,man,16 17365849247,1001,man,13
*/



