A use case for Partition: writing statistics to different output files by condition (partitioning)
1) Requirement
Output the statistics to different files according to the leading digits of the phone number.
Expected output: numbers starting with 136, 137, 138, and 139 each go to a separate file (four files in total), and numbers with any other prefix go to one more file.
2) Requirement analysis
The first three digits of the phone number determine the partition: 136 → 0, 137 → 1, 138 → 2, 139 → 3, and everything else → 4. Five partitions means the job needs five ReduceTasks, one output file per partition.
3) Implementation
Building on the original flow-statistics program, add a partitioner class:
1. Create the Partitioner class
package com.yingzi.mapreduce.partitioner2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        // text is the phone number (the map output key)
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
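Before wiring this into a job, the prefix-to-partition mapping can be sanity-checked in isolation by calling getPartition directly, with no cluster involved. A minimal sketch, not part of the original post (the class name ProvincePartitionerCheck and the sample numbers are made up):

package com.yingzi.mapreduce.partitioner2;

import org.apache.hadoop.io.Text;

public class ProvincePartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        FlowBean dummy = new FlowBean();   // getPartition does not look at the value here
        String[] phones = {"13630577991", "13736230513", "13846544121", "13956435636", "15043685858"};
        for (String phone : phones) {
            // numPartitions = 5 matches job.setNumReduceTasks(5) in the Driver below
            int p = partitioner.getPartition(new Text(phone), dummy, 5);
            System.out.println(phone + " -> partition " + p);
        }
    }
}

Expected output: partitions 0, 1, 2, 3, and 4, in that order.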
2. Create the Bean class
package com.yingzi.mapreduce.partitioner2;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {
private long upFlow;   // upstream traffic
private long downFlow; // downstream traffic
private long sumFlow;  // total traffic
// no-arg constructor (Hadoop needs it to instantiate the bean via reflection)
public FlowBean(){
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
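Since write and readFields must process the fields in the same order, a quick local round trip can catch serialization mistakes before the job ever runs. A minimal sketch, not from the original post (the class name FlowBeanRoundTrip and the sample values are made up; it only needs the Hadoop client jars on the classpath):

package com.yingzi.mapreduce.partitioner2;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean();
        in.setUpFlow(2481);
        in.setDownFlow(24681);
        in.setSumFlow();   // 2481 + 24681 = 27162

        // Serialize the same way Hadoop would
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean and print it
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(out);   // expected: 2481, 24681, 27162, tab-separated
    }
}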
3. Create the Mapper class
package com.yingzi.mapreduce.partitioner2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line, e.g.:
        // 1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
        String line = value.toString();

        // 2. Split on tabs
        String[] split = line.split("\t");

        // 3. Pick out the fields we need
        // phone number: 13736230513
        // upstream: 2481, downstream: 24681
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];

        // 4. Populate the output key and value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        // 5. Emit
        context.write(outK, outV);
    }
}
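To make the data flow concrete: for the sample record shown in the comments above, this mapper emits the key 13736230513 with the value 2481 / 24681 / 27162 (upstream, downstream, and their sum).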
4. Create the Reducer class
package com.yingzi.mapreduce.partitioner2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 1. Accumulate the values for this phone number
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        // 2. Populate the output value
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        // 3. Emit
        context.write(key, outV);
    }
}
5. Create the Driver class
package com.yingzi.mapreduce.partitioner2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. Get the job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. Set the jar
job.setJarByClass(FlowDriver.class);
// 3. Associate the Mapper and Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4. Set the mapper output key and value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5. Set the final output key and value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6. Register the custom partitioner
job.setPartitionerClass(ProvincePartitioner.class);
// 7. Set a matching number of ReduceTasks (one per partition)
job.setNumReduceTasks(5);
// 8. Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\11_input\\inputflow"));
FileOutputFormat.setOutputPath(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output7"));
// 9. Submit the job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0:1);
}
}
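One detail worth spelling out (standard MapReduce behavior, not stated in the original post): the number of ReduceTasks has to line up with the partition numbers the partitioner can return. With exactly 5 ReduceTasks, each partition 0 through 4 gets its own output file. Setting more than 5 just produces extra empty part-r-000xx files; setting a value between 2 and 4 makes the job fail with an "Illegal partition" error as soon as a record maps to a partition with no matching ReduceTask; and setting 1 bypasses the partitioner entirely, so everything lands in a single part-r-00000.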
6. View the results
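With five ReduceTasks, the output directory should contain part-r-00000 through part-r-00004: the first four files hold the flow totals for numbers starting with 136, 137, 138, and 139 respectively, and part-r-00004 holds all other prefixes.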



