实验汇总
MapReduce计算案例
序列化
Writable常用的Writable类自定义Writable序列号案例分析总结 排序
排序方式自定义排序全排序案例区内排序 TextInputFormat切片
CombineTextInputFormat切片机制
代码CombineTextInputFormat 切片机制的原理 FileInputFormat实现类
TextInputFormatKeyValueTextInputFormat
KeyValueTextInputFormat具体实现样例 NLFileInputFormat
NLFileInputFormat具体实现样例 ==自定义InputFormat==
样例==自定义InputFormat====自定义RecordReader类==MapperReduceJob总结 Partition分区
自定义Partition流程案例 Combiner
自定义Combiner方法案例总结 GroupingComparator 分组(辅助排序)
案例 自己瞎写的公众号与博客
实验汇总 MapReduce计算案例 序列化Writable序列化:把结构化对象转换为字节流。
反序列化
java序列化Serializable
hadoop有自己的序列化机制Writable
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
让实体类实现Writable接口
MR的任意key必须实现WritableComparable接口
WritableComparables can be compared to each other, typically via Comparators. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.
@InterfaceAudience.Public @InterfaceStability.Stable public interface WritableComparable常用的Writable类extends Writable, Comparable { }
Text相当于String的Writable
@Stringable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Text extends BinaryComparable
implements WritableComparable {
}
自定义Writable
public class FlowBean implements Writable {
private Long upFlow;
private Long downFlow;
private Long sumFlow;
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
//记得加上get、set、构造方法、equals等方法!
}
序列号案例分析
需求:统计每一个手机号耗费的总上行流量、下行流量、总流量
输入数据格式
7 13560436666 120.196.100.99 1116 954 200
Id 手机号码 网络ip 上行流量 下行流量 网络状态码
Map阶段:抽取一行,切分字段,抽取手机号、上行、下行流量三个字段。
以手机号为key,bean对象(已实现序列号接口)为value输出给Reduce
期望输出给Reduce:
13560436666 1116 954 2070
手机号码 上行流量 下行流量 总流量(通过Map计算)
Reduce阶段:累积同一个号码的上行、下行流量得到总的流量。
期望输出的最终结果:
13560436666 1116 + 954 = 2070
手机号码 上行流量 下行流量 总流量
代码
FlowBean代码在上面噢!Mapper
public class FlowCountMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取value值 String line = value.toString(); //切割获取字段 String[] fields = line.split(" "); //封装对象 //0为id 1为手机号码 ... String phoneNum = fields[1]; //获取上下行流量 Long upFlow = Long.valueOf(fields[fields.length-3]); Long downFlow = Long.valueOf(fields[fields.length-2]); //封装上传下载流量到FlowBean FlowBean flowBean = new FlowBean(upFlow,downFlow); System.out.println("Mapper过程:电话号码"+phoneNum+"流量flowBean:"+flowBean); //mapper输出 <134....,1116,954> //<134....,FlowBean> context.write(new Text(phoneNum), flowBean); } }
Reduce
public class FlowCountReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { Long sum_upFlow= Long.valueOf(0); Long sum_downFlow= Long.valueOf(0); //遍历bean进行统计累加 for (FlowBean value : values) { sum_downFlow+=value.getDownFlow(); sum_upFlow+=value.getDownFlow(); } //获取的总和流量进行封装 FlowBean resultflowBean = new FlowBean(sum_upFlow,sum_downFlow); //输出 context.write(key, resultflowBean); } }
Job
public class FlowSumDriver {
public static void main(String[] args) throws Exception {
//输入输出路径根据电脑设置
args = new String[]{"E:/phone_data.txt","E:/"};
//配置信息 job对象实例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//入口jar包本地路径
job.setJarByClass(FlowSumDriver.class);
//MapperReduce配置
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
//输出格式指定
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//输入输出路径配置
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//将job和相关jar和类交给yarn运行
System.exit(job.waitForCompletion(true)?0:1);
}
}
输出结果
hadoop jar wordcount.jar cn.mr.WordCount [Inputpath] [OutputPath]
总结13736230513 2481 24681
13846544121 200 1432
13956435636 200 200
13966251146 404 200
18271575951 1527 2106
84188413 4116 1432
13590439668 954 200
15910133277 3156 2936
13729199489 200 200
13630577991 6960 690
15043685818 3659 3538
15959002129 1938 180
13560439638 200 500
13470253144 200 200
13682846555 2910 200
13992314666 3008 3720
13509468723 7335 110349
18390173782 9531 2412
13975057813 11058 48243
13768778790 200 200
13568436656 2481 24681
13568436656 954 200
1、什么是序列化,其使用场景是什么?
Java中序列化是将对象转换为字节码的过程,字节码能够更好得进行数据的传输,字节码文件在传输过程能够很好的保持完整性。
反序列化是将字节码文件转换为原来对象的方法
序列化的使用场景:跨平台使用,网络中流的形式将对象进行传输,序列化的字节码数据进行储存。
2、简述 hadoop 实现自定义对象序列化的步骤?
本实例中自定义FlowBean
排序(1)实现writable接口,实现其中的方法write为序列号、化方法,readFileds为反序列号方法
(2)默认空参构造方法,java反射通过字节码反射生成原来对象,需要空参构造方法。
(3)重写write方法,让bean中需要进行序列化的字段写入DataOutput中
(4)重写readFileds方法,读取输入的DataInput,赋值于本实例,进行反序列化
(5)复写toString()
(6)根据要求写入构造方法
Hadoop默认对key按照字典顺序快速排序;
排序方式对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
部分排序 根据输入的键排序,保证输出的每个文件内部有序。全排序 最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。辅助排序 在Reduce中对key进行分组,接收key为bean对象时让字段相同的key进入同一个reduce()方法二次排序 自定义排序,如果compareTo中判断条件为两个即为二次排序 自定义排序
通过bean对象实现WritableComparable接口重写compareTo方法实现排序。
@Override
public int compareTo(FlowBean o) {
return Long.compare(o.sumFlow,this.sumFlow);
// int result;
// //按流量大小倒序排列
// if (sumFlow > bean.getSumFlow()){
// //大
// result = -1;
// }else if (sumFlow < bean.getSumFlow()){
// //小
// result = 1;
// }else {
// //相等
// result = 0;
// }
// return result;
}
全排序案例
bean
public class FlowBean implements WritableComparable{ private long upFlow; private long downFlow; private long sumFlow; // 反序列化时,需要反射调用空参构造函数,所以必须有 //getset @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "t" + downFlow + "t" + sumFlow; } @Override public int compareTo(FlowBean o) { int result; // 按照总流量大小,倒序排列 if (sumFlow > bean.getSumFlow()) { result = -1; }else if (sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; } }
Mapper Reduce:参考如上案例
Job
public static void main(String[] args) throws Exception {
//配置信息 job对象实例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//入口jar包本地路径
job.setJarByClass(FlowSumDriver.class);
//MapperReduce配置
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
//输出格式指定
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定自定义数据分区
// job.setPartitionerClass(ProvincePartitioner.class);
//
// //同时指定相应数量的 reduce task
// job.setNumReduceTasks(5);
//输入路径
for (int i = 0; i < args.length-1 ; i++) {
FileInputFormat.setInputPaths(job,new Path(args[i]));
}
FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));
//将job和相关jar和类交给yarn运行
System.exit(job.waitForCompletion(true)?0:1);
}
总结
bean类实现比较的接口,在通过key、value的输入输出时会自动使用该方法来比较。
区内排序即分区(Partition)+排序的组合
在如上全排序基础上,增加分区代码即可
public class ProvincePartitioner extends Partitioner{ @Override public int getPartition(FlowBean flowBean,Text text, int numPartitions) { //获取前3位号码 String preNum = text.toString().substring(0, 3); int partition = 4; //判断省份 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } //返回结果 return partition; } }
job
//指定自定义数据分区 job.setPartitionerClass(ProvincePartitioner.class); //同时指定相应数量的 reduce task job.setNumReduceTasks(5);
执行结果
排序后实现了对不同号码进入不同文件的分类,并实现流量排序。
TextInputFormat切片hadoop框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
CombineTextInputFormat切片机制
输入格式
5个文件 0.5M + 1.12M + 0.4M + 0.9M + 0.04M
代码public class MapReduceWCDriver {
public static void main(String[] args) throws Exception {
//配置信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//job设置主类(本类)
job.setJarByClass(MapReduceWCDriver.class);
//Mapper reduce设置
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//map输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//最终输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 4m
CombineTextInputFormat.setMaxInputSplitSize(job,1024*1024*4);
//输入输出路径设置
for (int i = 0; i < args.length-1; i++) {
FileInputFormat.addInputPath(job,new Path(args[i]));
}
FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));
//提交
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
不做任何处理,运行 WordCount 案例程序,观察切片个数为5
即5个文件5个输出
MapReduceWCDriver增加上述代码后输出结果为1
调整切片大小为4M时只有1个输出
MapReduceWCDriver再次修改如下,结果为3
调整切片大小为1M时有3个输出
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置 1m CombineTextInputFormat.setMaxInputSplitSize(job,1024*1024*1);CombineTextInputFormat 切片机制的原理
虚拟储存过程
通过设置setMaxInputSplitSize设置切片输入最大值,输入文件时,这个最大值与文件的大小比较,当小于这个最大值时,自动划分为一个块;当大于这个最大值但小于最大值的2倍,则划分为2个等大小的块;当大于2倍最大值时,分割为一个最大值块,剩余的文件分割为2个大小相得的块。
实际切片过程
从(1)中获取到的虚拟存储过程的文件,当虚拟存储文件大于或等于设置的最大值则会形成一个切片块,当小于则会和下一个虚拟存储文件合并形成一个新的切片
常见的FileInputFormat实现类
TextInputFormat 按行读取KeyValueTextInputFormatNLineInputFormatCombineTextInputFormat自定义InputFormat
FIleInputFormat
FileInputFormat is the base class for all file-based InputFormats. This provides a generic implementation of {@link #getSplits(JobConf, int)}. Subclasses of FileInputFormat can also override the {@link #isSplitable(FileSystem, Path)} method to ensure input-files are not split-up and are processed as a whole by {@link Mapper}s.
@InterfaceAudience.Public @InterfaceStability.Stable public abstract class FileInputFormatTextInputFormatimplements InputFormat {
TextInputFormat是==默认的FileInputFormat实现类==。
按行读取每条记录。键是存储该行在整个文件中的
起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),
@InterfaceAudience.Public @InterfaceStability.Stable public class TextInputFormat extends FileInputFormatKeyValueTextInputFormatimplements JobConfigurable {...}
@InterfaceAudience.Public @InterfaceStability.Stable public class KeyValueTextInputFormat extends FileInputFormatKeyValueTextInputFormat具体实现样例implements JobConfigurable {
需求:统计输入文件中每一行第一个单词相同的行数。
Mapper
通过设置分割符为空格,直接将每一行第一个单词作为key,value为1输出给reduce
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
//写出
context.write(key,new LongWritable(1));
}
Reduce
统计,相同key(即初始第一个单词相同的会进入同一个reduce),只需要用一个全局变量v统计次数即可。
LongWritable v = new LongWritable(); @Override protected void reduce(Text key, Iterablevalues, Context context) throws IOException, InterruptedException { //初始为0 long sum = 0L; for (LongWritable value : values) { sum += value.get(); //获取数目 } v.set(sum); //统计结果输出 context.write(key,v); }
Job
//配置文件设置切割符 为空格 conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," "); // 使用 KeyValueTextInputFormat处理记录数 job.setInputFormatClass(KeyValueTextInputFormat.class);
public class MapperReduceDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//配置文件设置切割符 为空格
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," ");
Job job = Job.getInstance(conf);
//设置jar包位置,省略....
// 使用 KeyValueTextInputFormat处理记录数
job.setInputFormatClass(KeyValueTextInputFormat.class);
//输入路径
for (int i = 0; i < args.length-1 ; i++) {
FileInputFormat.setInputPaths(job,new Path(args[i])); }
FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));
//job提交
job.waitForCompletion(true);
}
}
输出
NLFileInputFormat按照NLFileInputFormat指定的行数划分切片,切片数目=总行数/N
NLFileInputFormat具体实现样例NLineInputFormat which splits N lines of input as one split.
In many “pleasantly” parallel applications, each process/mapper
processes the same input file (s), but with computations are
controlled by different parameters.(Referred to as “parameter sweeps”).
One way to achieve this, is to specify a set of parameters
(one set per line) as input in a control file
(which is the input path to the map-reduce application,
where as the input dataset is specified via a config variable in JobConf.).
The NLineInputFormat can be used in such applications, that splits
the input file such that by default, one line is fed as
a value to one map task, and key is the offset.
i.e. (k,v) is (LongWritable, Text).
The location hints will span the whole mapred cluster.
需求:对每个单词进行个数统计,要求根据每个输入文件的行数来规定输出多少个切片。要求每三行放入一个切片中。
Mapper
划分每个单词,并循环输出<[key],1>,相同key进入同一个Reduce
private Text k = new Text();
private LongWritable v = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取一行
String line = value.toString();
//2.切割
String[] splits = line.split(" ");
//3.循环写出
for (int i = 0; i < splits.length ; i++) {
k.set(splits[i]);
context.write(k,v);
}
}
Reduce
统计每个单词1的累加为value
@Override protected void reduce(Text key, Iterablevalues, Context context) throws IOException, InterruptedException { //初始为0 long sum = 0; for (LongWritable value : values) { sum += value.get(); } v.set(sum); //统计结果输出 context.write(key,v); }
Job 设置切片
//设置每个切片 InputSplit 中划分三条记录 NLineInputFormat.setNumLinesPerSplit(job, 3); //使用 NLineInputFormat 处理记录数 job.setInputFormatClass(NLineInputFormat.class);自定义InputFormat
在企业开发中, Hadoop 框架自带的 InputFormat 类型不能满足所有应用场景,需要自定义 InputFormat 来解决实际问题。
自定义 InputFormat 步骤如下:
自定义一个类继承FileInputFormat。改写RecordReader,实现一次读取一个完整文件封装为KV。在输出时使用SequenceFileOutPutFormat输出合并文件。 样例
需求: 将多个小文件合并成一个 SequenceFile 文件(SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对的文件格式),SequenceFile 里面存储着多个文件,存储的形式为文件路径+名称为 key,文件内容为 value。
实现过程
自定义一个类继承 FileInputFormat 。
重写==isSplitable()==方法,返回false不可切割重写createRecordReader(),创建自定义的RecordReader对象,并初始化
改写 RecordReader ,实现一次读取一个完整文件封装为 KV 。
采用IO流一次读取一个文件输出到value中,因为设置了不可切片,最终把所有文
件都封装到了value中
获取文件路径信息+名称,并设置key
在输出时使用 SequenceFileOutPutFormat 输出合并文件。
如下图:
自定义InputFormat// 定义类继承 FileInputFormat public class WholeFileInputformat extends FileInputFormat自定义RecordReader类{ @Override protected boolean isSplitable(JobContext context, Pathfilename) { return false;//返回false不可切割 } @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split, context); return recordReader; } }
一次读取一个文件输出到value中,由于如上isSplitable()已经返回false,故直接不分割文件。获取文件的路径信息与名称作为一个key
public class WholeRecordReader extends RecordReaderMapper{ private FileSplit split; private Configuration configuration; //返回结果值 private Text k = new Text(); private BytesWritable v = new BytesWritable(); private boolean isProgress = true; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit) split; configuration= context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { //isProgress if (isProgress) { //1.自定义缓存区 byte[] contexts = new byte[(int) split.getLength()]; FSDataInputStream fsDataInputStream = null; FileSystem fileSystem = null; try { //2.读取文件系统 //文件路径 Path path = split.getPath(); //文件系统获取 fileSystem= path.getFileSystem(configuration); //3.从文件系统读取数据 fsDataInputStream = fileSystem.open(path); //4.读取文件到缓存区 readFully全部读取 IOUtils.readFully(fsDataInputStream,contexts,0,contexts.length); //5.输出文件内容 values v.set(contexts,0,contexts.length); //6.文件路径+名称获取 String name = split.getPath().toString(); //输出k和v k.set(name); } catch (IOException e) { e.printStackTrace(); }finally { //流关闭 IOUtils.closeStream(fsDataInputStream); } isProgress=false; return true; } return false; } }
直接输出
@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
Reduce
直接输出
public class SequenceFileReducer extends ReducerJob{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key,values.iterator().next()); } }
job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class);
总结1.InputFormat是用于处理各种数据源的。下面实现自定义的InputFormat,数据源是来自于内存。
1.1 在程序的job.setInputFormatClass(MySelfInputFormat.class);
1.2 实现InputFormat extends InputFormat,实现其中的2个方法,分别是getSplits(…)和createRecordReader(…)
1.3 getSplits(…)返回的是java.util.List,里面中的每个元素是InputSplit。每个InputSpilt对应一个mapper任务。
1.4 InputSplit是对原始海量数据源的划分。本例中是在内存中产生数据,封装到InputSplit中。
1.5 InputSplit封装的必须是hadoop数据类型,实现Writable接口。
1.6 RecordReader读取每个InputSplit中的数据,解析成一个个的,供map处理 。
1.7 RecordReader有4个核心方法,分别是initialize(…),nextKeyValue(),getCurrentKey()和getCurrentValue()。
1.8 initialize(…)的重要性在于拿到InputSplit和定义临时变量。
1.9 nextKeyValue(…)方法的每次调用可以获得key和value值
1.10 当nextKeyValue(…)调用后,紧接着调用getCurrentKey()和getCurrentValue()。
————————————————
版权声明:本文为CSDN博主「波哥的技术积累」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/woshisap/article/details/42275305
总结来说,InputFormat是将文件切片----->再转化为对转交给Mapper处理。在InputFormat类中只有两个方法,一个负责切片,一个返回能将切片信息转化为相应的键值对的对象
而当使用KeyValueInputFormat并设置分隔符后,Mapper以分隔符前的内容作为Key来接收,以分隔符后面的内容作为Value来接收。那么在数据提交到Mapper之前,数据就必须被格式化为满足Mapper接收的格式,这个工作就是由InputFormat来完成的,而InputFormat实际上并不能完成这项工作,而是创建一个RecordReader来完成这项转换工作
Partition分区默认Partition 默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
@InterfaceAudience.Public @InterfaceStability.Stable public class HashPartitioner自定义Partition流程implements Partitioner { public void configure(JobConf job) {} public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
- 继承Partition,重写getPartition()方法job设置Partitionjob设置ReduceTask数量
分区数量
如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;如果1 例如:假设自定义分区数为5,则 job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件 job.setNumReduceTasks(2); 会报错 job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件 需求:安照要求将文本内容输出到不同文件中。 输入数据 电话号码文本 13736230513 2481 24681 13956435636 132 1512 13846544121 264 0 13630577991 6960 690 13560439638 918 4938 期望输出 5个输出文件 Partition分区 Mapper Reduce Job 输出结果 Combiner是MR程序中Mapper和Reducer之外的一种组件。 Combiner组件的父类就是Reducer。 Combiner和Reducer的区别在于运行的位置 Combiner是在每一个MapTask所在的节点运行; Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。 Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。 自定义一个 Combiner 继承 Reducer,重写 Reduce 方法 job设置job.setCombinerClass(....Combiner.class) 需求: 对每一个MapTask的输出局部汇总Combiner 方案1 Combiner继承Reducer 方案2 将Reducer直接作为Combiner
1、Hadoop MR 框架的 Combiner 机制有什么作用? 对Reduce前的MapTask的输出进行局部汇总,以减少网络传输量,“reduce的输入每个key所对应的value将是一大串1,但处理的文本很多时,这一串1已将占用很大的带宽,如果我们在map的输出给于reduce之前做一下合并或计算,那么传给reduce的数据就会少很多,减轻了网络压力。” 2 .自定义Combiner步骤 说白了,就是在Mapper后增加一个Combiner过程,构成整个MapTask,这个Combiner需要继承自Reduce 参考job内如下方法: 需求:输出如下的每个订单中最贵的商品 订单id 商品id 金额 0000001 Pdt_01 222.8 利用“订单 id 和成交金额”作为 key,可以将 Map 阶段读取到的所有订单数据按照 id 升序排序,如果 id 相同再按照金额降序排序,发送到 Reduce。在 Reduce 端利用 groupingComparator 将订单 id 相同的 kv 聚合成组,然后取第一个即是该订单中最贵商品, bean 注意compareTo,按照id升序排序,相同id,比较价格 Mapper GroupingComparator 我们需要欺骗Reducer 只靠order_id进行分组,不需要price 普通二本数据科学与大数据技术专业菜鸟一个,望各位大神多多指导!互相学习进步! whai的个人博客 whaifree.top 欢迎留言!
public class ProvincePartitioner extends Partitioner
public class FlowCountMapper extends Mapper
public static void main(String[] args) throws Exception {
//配置信息 job对象实例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//入口jar包本地路径
job.setJarByClass(FlowSumDriver.class);
//MapperReduce配置
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
//输出格式指定
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定自定义数据分区
job.setPartitionerClass(ProvincePartitioner.class);
//同时指定相应数量的 reduce task
job.setNumReduceTasks(5);
//输入路径
for (int i = 0; i < args.length-1 ; i++) {
FileInputFormat.setInputPaths(job,new Path(args[i]));
}
FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));
//将job和相关jar和类交给yarn运行
System.exit(job.waitForCompletion(true)?0:1);
}
}
案例
public class WordCountCombiner extends Reducer
// 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑
job.setCombinerClass(WordcountReducer.class);
总结
Combiner继承Reduce,重写reduce()job设置Combiner
public void setCombinerClass(Class extends Reducer> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
GroupingComparator 分组(辅助排序)
案例
0000002 Pdt_05 722.4
0000001 Pdt_02 33.8
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4public class OrderBean implements
WritableComparable
public class OrderMapper extends Mapper
public class OrderSortGroupingComparator extends WritableComparator {
@Override
public int compare(WritableComparable a,
WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
} else {
result = 0;
}
return result;
}
protected OrderSortGroupingComparator() {
super(OrderBean.class,true);
}
}
自己瞎写的公众号与博客



