@[toc] 一、数据切片及Maptask并行度决定机制
1.1、问题引入1.2、Maptask并行度决定机制 二、Job提交流程简述三、切片机制概述
3.1、TextInputFormant切片机制3.2、CombineTextInputFormat切片机制
首先,我们需要了解MapRedurce的基本框架
我们都知道MapReduce主要分为Map阶段和Reduce阶段,我们将Map阶段处理的任务称之为MapTask,Reduce阶段处理的任务称之为ReduceTask
而在处理Mapper之前,需要经历一个数据如何输入的问题,即InputFormat阶段。
一、数据切片及Maptask并行度决定机制1.1、问题引入InputFormat:(中文翻译:输入格式):是org.apache.hadoop.mapreduce包下的一个抽象类。描述了 Map-Reduce 作业的输入规范。Map-Reduce 框架依赖作业的InputFormat来:验证作业的输入规范
Map-Reduce框架通过FileInputFormat的子类 将输入文件拆分为逻辑InputSplit(数据切片),后将每个切片分配给单独的Mapper,进而形成一个个的MapTask。
默认使用的输入方式为:TextInputFormat
注意:MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度,
如何理解:例:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
附:InputFormat层次图
首先,我们要区分数据切片和数据块的概念
数据块:Block,是HDFS物理上将文件分成一块一块进行存储,每一个Block都能在磁盘上找到,是真实存在的。数据块是HDFS存储数据的单位
数据切片:InputSplit,逻辑概念,只是在数据输入的时候逻辑上对数据进行切片处理,并不会在磁盘上将其进行分配分片存储。数据切片是MapReduce程序计算输入数据的单位
一个数据切片会对应启动一个MapTask,默认情况下,切片大小等于块大小(BlockSize)
由此可见:一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
此外:当输入数据为多个文件时,此时的切片操作是不考虑数据集的整体的,是逐个针对每个文件单独切片。
二、Job提交流程简述首先我们需要了解,用户的一个计算请求就是一个Job(例WordCount),而每一个Job都会被拆分为多个Task(MapTask、ReduceTask)进行处理。
注:在分布式环境下,多Task在多台主机下处理
每一个MapReduce程序中 都需要一个Driver类来执行提交,提交的是一个描述了各种必要信息的Job对象。
//1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2.设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3.关联mapper、reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.设置map输出的KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入路径 输出路径
FileInputFormat.setInputPaths(job, new Path("E:\hadooptest\input"));
FileOutputFormat.setOutputPath(job, new Path("E:\hadooptest\output\outputword"));
//7.提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
其中,boolean result = job.waitForCompletion(true);代码的具体底层实现功能大致为:
- 建立连接:其中需要判断 是本地运行环境还是yarn集群运行环境提交Job:
- 获取jobid,并创建job路径,该路径下要存放的是该job相关的一些文件,包括切片文件、job.xml,以及相关jar包等。计算切片,生成切片规划文件
参考:Job提交流程源码和切片源码详解 - 有心有梦 - 博客园 (cnblogs.com)
三、切片机制概述3.1、TextInputFormant切片机制通用的是:FileInputFormat
按照文件的内容长度进行切片,且默认切片大小 = blocksize每次切片时,都要判断剩下的部分是否大于块的1.1倍,不大于就将剩下的内容划分为一个切片切片信息会被写到一个切片规划文件中,提交切片规划信息到Yarn后,Yarn上的MrAppMaster会根据切片规划文件计算开启MapTask的个数
TextInputFormant为FileInputFormat的子类,是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
3.2、CombineTextInputFormat切片机制框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
CombineTextInputFormat切片过程分为两部分:虚拟存储过程和切片过程。具体切片机制参考:MapReduce-CombineTextInputFormat 切片机制 - 江湖小小白 - 博客园 (cnblogs.com)



