MapReduce框架原理一、InputFormat数据输入
切片与MapTask并行度决定机制FileInputFormat切片机制TextInputFormatCombineTextInputFormat切片机制 二、MapReduce工作流程三、Shuffle机制
Partition分区MapReduce中的排序Combiner合并 四、总结
MapReduce框架原理
一、InputFormat数据输入 切片与MapTask并行度决定机制
MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
1.Job提交流程源码详解
waitForCompletion()
submit();
- 建立连接connect();
创建提交Job的代理
new Cluster(getConfiguration());然后判断该任务是本地运行还是yarn集群环境运行
initialize(jobTrackAddr, conf); 提交jobsubmitter.submitJobInternal(Job.this, cluster)
创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);获取jobid ,并创建Job路径
JobID jobId = submitClient.getNewJobID();拷贝jar包到集群(本地运行不需要)
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2. FileInputFormat切片源码解析(input.getSplits(job))
- 源码中计算切片大小的公式
在FileInputFormat.java中computeSplitSize方法
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
这里的blockSize、minSize和maxSize又在getSplits方法中获取
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); long blockSize = file.getBlockSize();
getFormatMinSplitSize()默认为1,getMinSplitSize()中SPLIT_MINSIZE="mapreduce.input.fileinputformat.split.minsize"默认为0
可在官网中查看,所以minSize 为1maxSize从getMaxSplitSize(job)获取,而SPLIT_MAXSIZE="mapreduce.input.fileinputformat.split.maxsize"在官网中没有默认值,所以取long的最大值blockSize 大小默认128M(本地默认为32M)根据Math.max(minSize, Math.min(maxSize, blockSize));可得切片大小=block块大小
protected long getFormatMinSplitSize() {
return 1;
}
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}
TextInputFormat
- FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
1)应用场景:
由于InputFormat针对每一个文件都会生成一个切片的机制,无法处理许多小文件CombineTextInputFormat则用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个MapTask处理。
2)虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
3)切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。
工作流程(一)
工作流程(二)
说明:
1.客户端向MR上传一个200M的待处理文件,在submit()前会获取待处理的数据信息,然后根据参数配置对数据文件进行切片规划
2.job提交之后会提供job.split、jar包、job.xml信息,yarn此时调用Mrappmaster,Mrappmaster会查看job.split的切片信息,然后计算出需要的Map任务数(2个)并开启对应的MapTask
3.Map开启之后默认通过TextInputFormat中RecordReader方法按行读取kv数据,key为数据偏移量,value为一行内容,读完之后返回给mapper进行业务逻辑处理
4.mapper数据处理完之后会进入到环形缓冲区中,环形缓冲区就属于内存,默认100M,但是它会将一般内存用来存放数据,另一半用来存储该数据的元数据信息,包括索引、分区、kv索引开始位置信息;
5.环形缓冲区在内存达到80%后便会反向溢写到磁盘,而剩下的20%会继续接收map端传过来的数据,不至于等待;数据进入到环形缓冲区后,会将数据进行分区、排序、归并、合并等操作。
6.等到所有的MapTask都完成后(并不绝对),Reduce便会启动相应Reduce task按照分区拉取数据并还会合并文件、归并排序等操作,操作完之后会处理reduce业务逻辑
7.Reduce Task 完成之后进行写出操作
默认partition分区是通过key的hashcode对ReduceTask的任务数取模得到的,无法控制key值存储到哪个分区
public int getPartition(K2 key, V2 value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks;
}
MapReduce中的排序
- 输入数据接口:InputFormat
(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
(3)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。逻辑处理接口:Mapper
用户根据业务需求实现其中三个方法:map() setup() cleanup ()Partitioner分区
(1)有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。Comparable排序
(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
(2)部分排序:对最终输出的每一个文件进行内部排序。
(3)全排序:对所有数据进行排序,通常只有一个Reduce。
(4)二次排序:排序的条件有两个。Combiner合并
Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。逻辑处理接口:Reducer
用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()输出数据接口:OutputFormat
(1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
(2)用户还可以自定义OutputFormat。MapReduce工作流程



