Requirement: within the output file for each province's phone numbers, records must be sorted internally by total traffic. Add a custom partitioner class that assigns partitions according to the province prefix of the phone number.
Input data (phone number, upstream traffic, downstream traffic, total traffic):
13470253144 180 180 360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548
1.1.2、Analysis
1.1.3、Code implementation
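Briefly, the pieces below fit together as follows: the Mapper swaps each record into (FlowBean, phone number) so that the shuffle sorts records by FlowBean.compareTo, i.e. by descending total traffic; ProvincePartitioner routes each record to one of five partitions based on the first three digits of the phone number; and the Reducer swaps key and value back so the output reads (phone number, traffic) again.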
Entity bean
package com.song.writablecomparable;

import lombok.Data;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    //2 Provide a no-arg constructor
    public FlowBean() {
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    //4 Implement serialization and deserialization; the field order must be the same in both
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    //5 Override toString
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean bean) {
        // Primary sort: total traffic, descending
        if (this.sumFlow > bean.getSumFlow()) {
            return -1;
        } else if (this.sumFlow < bean.getSumFlow()) {
            return 1;
        } else {
            // Secondary sort: upstream traffic, descending
            if (this.upFlow > bean.upFlow) {
                return -1;
            } else if (this.upFlow < bean.upFlow) {
                return 1;
            } else {
                return 0;
            }
        }
    }
}
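As a quick sanity check of the ordering logic, here is a minimal sketch (a hypothetical demo class, not part of the original project) that compares two beans; it relies on the Lombok-generated setters from @Data:

// Hypothetical demo, not part of the original project
public class FlowBeanCompareDemo {
    public static void main(String[] args) {
        FlowBean big = new FlowBean();
        big.setUpFlow(100);
        big.setDownFlow(200);
        big.setSumFlow();   // sumFlow = 300

        FlowBean small = new FlowBean();
        small.setUpFlow(50);
        small.setDownFlow(100);
        small.setSumFlow(); // sumFlow = 150

        // Descending order by total traffic: the larger bean sorts first
        System.out.println(big.compareTo(small)); // -1
        System.out.println(small.compareTo(big)); // 1
    }
}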
mapper
package com.song.writablecomparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1 Read one line of data
        String line = value.toString();

        //2 Split the line on "\t"
        String[] split = line.split("\t");

        //3 Populate outK and outV
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        //4 Write out outK and outV
        context.write(outK, outV);
    }
}
reducer
package com.song.writablecomparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Iterate over the values and write each one out, so records with the same total traffic are not lost
        for (Text value : values) {
            // Swap key and value back, writing (phone number, FlowBean)
            context.write(value, key);
        }
    }
}
partitioner
package com.song.partitionerandwritablecomparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        // Take the first three digits of the phone number as prePhone
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        // Choose a partition number based on prePhone
        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }

        // Return the partition number
        return partition;
    }
}
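A small, hypothetical sketch (not in the original code) of how getPartition maps phone prefixes to partition numbers; it assumes FlowBean and ProvincePartitioner are available in the same package:

// Hypothetical demo, not part of the original project
import org.apache.hadoop.io.Text;

public class ProvincePartitionerDemo {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        FlowBean dummy = new FlowBean(); // the key is ignored by getPartition

        System.out.println(partitioner.getPartition(dummy, new Text("13630577991"), 5)); // 0, prefix 136
        System.out.println(partitioner.getPartition(dummy, new Text("13975057813"), 5)); // 3, prefix 139
        System.out.println(partitioner.getPartition(dummy, new Text("15910133277"), 5)); // 4, all other prefixes
    }
}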
driver
package com.song.partitionerandwritablecomparable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1 Get the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 Associate this Driver class
        job.setJarByClass(FlowDriver.class);

        //3 Associate the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4 Set the map-side output KV types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //5 Set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Register the custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);

        // And set a matching number of ReduceTasks
        job.setNumReduceTasks(5);

        //6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\test_data\\data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\test_data\\output8888"));

        //7 Submit the Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
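With five ReduceTasks and the partitioner above, the job should produce five output files, part-r-00000 through part-r-00004, one per phone-prefix group (136, 137, 138, 139, and everything else), and within each file the records are sorted in descending order of total traffic.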
2、Combiner Merging
2.1、Combiner Merging Demo
2.1.1、Requirement
During the counting process, perform a local aggregation of each MapTask's output to reduce the amount of data shuffled over the network, i.e. use the Combiner feature.
2.1.2、Data
banzhang ni hao xihuan hadoop banzhang banzhang ni hao xihuan hadoop banzhang
2.1.3、Analysis
2.1.4、Code implementation
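Before the code, a quick illustration of the Combiner's effect (assuming, for simplicity, that the whole file is read by a single MapTask): without a Combiner the Mapper emits one (word, 1) pair per occurrence, e.g. (banzhang, 1) four times, twelve pairs in total; with the Combiner, each MapTask pre-sums its own output before the shuffle, so only one pair per word crosses the network, e.g. (banzhang, 4), (ni, 2), (hao, 2), (xihuan, 2), (hadoop, 2).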
mapper
package com.song.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();

        // 2 Split on spaces
        String[] words = line.split(" ");

        // 3 Write out one (word, 1) pair per word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
reducer
package com.song.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 1 Sum the counts
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // 2 Write out the total
        v.set(sum);
        context.write(key, v);
    }
}
combiner
package com.song.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        // Populate outV with the partial sum
        outV.set(sum);

        // Write out the partial result
        context.write(key, outV);
    }
}
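Because word counting is a plain summation, which is commutative and associative, aggregating partial data early does not change the final result; that is why the driver below can register either WordCountCombiner or WordCountReducer itself as the combiner class.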
driver
package com.song.combiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 Get the configuration and the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Associate this Driver's jar
        job.setJarByClass(WordCountDriver.class);

        // 3 Associate the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 Set the Mapper output KV types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 Set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Option 1: use the dedicated Combiner class
        // job.setCombinerClass(WordCountCombiner.class);
        // Option 2: reuse the Reducer as the Combiner
        job.setCombinerClass(WordCountReducer.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\test_data\\hello.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\test_data\\out11"));

        // 7 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
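To verify that the Combiner actually ran, compare the "Combine input records" and "Combine output records" counters printed in the job's console output; with the Combiner enabled, the number of records reaching the Reducer should drop accordingly.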
3、OutputFormat Data Output
3.1、Custom OutputFormat Case Study
3.1.1、Requirement
Filter the input log file: lines for websites containing atguigu are written to e:/atguigu.log, and lines for websites not containing atguigu are written to e:/other.log.
3.1.2、Input data
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
3.1.3、Analysis
3.1.4、Code implementation
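In outline, the code below works as follows: the Mapper and Reducer simply pass each log line through unchanged; LogOutputFormat overrides getRecordWriter to return a custom LogRecordWriter, which opens two output streams and writes each line to atguigu.log or other.log depending on whether it contains the string atguigu.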
mapper
package com.song.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // No processing; write the log line out as-is
        context.write(value, NullWritable.get());
    }
}
reducer
package com.song.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Duplicate lines arrive as one key with several values; iterate so none are dropped
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
LogOutputFormat
package com.song.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // Create and return an instance of the custom RecordWriter
        return new LogRecordWriter(job);
    }
}
LogRecordWriter
package com.song.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) {
        try {
            // Get a FileSystem object
            FileSystem fs = FileSystem.get(job.getConfiguration());

            // Use it to create two output streams pointing at different files
            atguiguOut = fs.create(new Path("d:\\test_data\\hadoop\\atguigu.log"));
            otherOut = fs.create(new Path("d:\\test_data\\hadoop\\other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();

        // Route the line to one of the two streams depending on whether it contains "atguigu"
        if (log.contains("atguigu")) {
            atguiguOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // Close the streams
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}
driver
package com.song.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the custom OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\test_data"));

        // Although we use a custom OutputFormat, it extends FileOutputFormat,
        // and FileOutputFormat still writes a _SUCCESS file, so an output directory must be set here
        FileOutputFormat.setOutputPath(job, new Path("D:\\test_data\\logoutput"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
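Note that the business output, atguigu.log and other.log, is written by LogRecordWriter to the paths hard-coded in its constructor, while the directory passed to FileOutputFormat.setOutputPath only receives the _SUCCESS marker file.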



