HDFS、MapReduce、Yarn相关高频面试知识
HDFS读写流程
读流程写流程 HDFS小文件处理MapRuduce工作流程
Map阶段Reduce阶段Map后shuffle机制Shuffle 优化 Yarn资源调度工作机制Yarn调度器Hadoop解决数据倾斜的方法
HDFS读写流程 读流程- 客户端(HDFS Client)通过Distributed FileSystem分布式文件系统向NameNode请求下载文件,由NameNode返回目标文件的元数据;由FSDataInputStream请求读数据DataNode1上的block1,由DataNode1传输回数据;再向block2发送读数据请求…再想block3发送请求…
- 客户端(HDFS Client)通过Distributed FileSystem分布式文件系统向NameNode请求上传文件,由NameNode响应上传文件;请求上传第一个Block,请返回DataNode;返回data1, data2 ,data3表示用这三个DataNode存储数据;FSDataOutputStream请求建立Block传输通道,将三个DataNode进行串联;由末端DataNode向回传输应答成功信息;FsDataOutputStream向前传输数据Packet。
问题:一个Block占用NameNode 150字节,那么当小文件过多的时候则会占用大量内存空间
解决方法 1. 采用har归档方式,将小文件归类; 2. 采用CombineTextInputFormat,将小文件逻辑划分到一个切片中,使多个小文件被一个MapTask处理; 3. 在有小文件的情况下开启JVM重用,使JVM实例在同一个job中被重新使用N次,N的值可以在Hadoop的mapred-site.xml中进行配置。
*CombineTextInputFormat切片机制分为虚拟存储过程和切片过程(注意区分切片和block划分的概念):
- 虚拟存储过程:将输入目录下所有文件跟已设置的setMaxInputSplitSize值进行比较,如果不大于该值则逻辑上划分一个块,如果大于该值且大于两倍,则以最大值切割一块,当剩余数据大于该值且小于该值2倍,则将剩余数据平均划分;切片过程:判断虚拟存储的文件大小是否大于setMaxInputSplitSize,若大于等于则单独形成切片;如果不大于则跟下一个虚拟存储文件进行合并形成切片。
- 客户端提交前对文件进行分析,配置任务分配;提交信息后由Yarn的Resource Manager对文件进行MapTask数量计算;MapTask使用inputFormat(默认为TextinputFormat)的RecordReader进行数据读取;MapTask进行Mapper逻辑运算后写入outputColloector;ouputColloector写入环形缓冲区,缓冲区默认100M,其中一部分存储元数据(metadata),一部分存储数据;根据元数据中对数据标注的分区,在分区內部根据索引index进行快排;环形缓冲区写到80%时反向向磁盘进行溢写,如果溢写时新数据到达后超过环形缓冲区大小,则等待溢写完毕再写入;将产生的多个溢写文件,按照分区1,2进行归并排序(因为数据已经有顺序),保证每个分区内部数据有序;Combiner会进行一些预聚合,将分区内数据进行合并(, -> );启动相应数量的ReduceTask,并告知数据处理范围(数据分区)。 (可提前启动)
- ReduceTask主动pull MapTask中数据,且拉取的是指定分区(ReduceTask1拉取不同MapTask中的partition0)中的数据;合并文件,归并排序;ReduceTask使用outputFormat(默认为TextOutputFormat)向最终文件Part-r-000000进行write。(一个ReduceTask对应一个part-r-0000x)
总结:一次快排,两次归并
Map后shuffle机制
Shuffle是map之后、reduce之前的一个混洗流程(细节与上相同)
- 将数据存入环形缓冲区;对缓冲区内部数据根据分区进行快速排序;存储到80%进行反向溢写;对溢写结果进行Combiner(可选),根据Combiner进行分区合并、归并排序操作;对分区内数据进行压缩,写入磁盘等待ReduceTask进行pull;ReduceTask pull数据后先尝试放入内存,如果内存不够则放入磁盘;对内存和磁盘中的所有数据进行归并排序、分组,进入Reduce方法(每次都处理key相同的)。
1) Map阶段
- 增大环形缓冲区大小;增大环形缓冲区溢写比例,如80%->90%;减少对溢写文件的merge次数;采用Combiner提前进行合并。
2) Reduce阶段
- 合理设置Map和Reduce数量:太少会导致Task等待时间过长,太多会导致Map、Reduce形成资源竞争;设置Map和Reduce共存:调整slowstart.completedmaps参数,使map运行一段时间后,reduce也开始运行;集群性能允许前提下,增大reduce端存储数据内存大小;
3) IO传输阶段
主要采用数据压缩方式:
- map输入端考虑数据量大小和切片,使用如Bzip2、LZO等支持切片的压缩方式;map输出端考虑速度快的snappy、LZO;reduce输出端主要考虑整体需求,例如要作为下一个mr输入则需要考虑切片,如果要永久保存则考虑压缩率较大的gzip。
4) 整体
- 增加NodeManager内存,根据服务器实际配置灵活调整;增加单任务内存,根据服务器实际配置灵活调整;控制MapTask内存上限,调整mapreduce.map.memory.mb参数:如果数据量过大(>128M),则考虑增加内存上限;控制ReduceTask内存上限,调整mapreduce.reduce.memory.mb参数:如果数据量过大(>128M) ,则考虑增加内存上限;控制MapTask和ReduceTask的堆内存大小。mapreduce.map.java.opts、mapreduce.reduce.java.opts;增加MapTask和ReduceTask的CPU核数;增加Container的CPU核数和内存大小;在hdfs-site.xml文件中配置多目录;
- Mr程序提交到客户端所在节点后会创建一个YarnRunner(本地模式时是LocalRunner);由Yarn向ResourceManager申请一个Application ;RM返回Application资源提交路径和application_id;Yarn提交job运行所需资源:切片文件job.split,参数文件job.xml,代码xx.jar;申请mapreduceAppMaster,RM将请求初始化成一个Task,并放入任务队列中;NodeManager从RM中领取任务;NM创建一个Container,创建一个MRAppmaster,并从application集群路径中读取切片信息job.split;NM根据切片信息向RM申请相应数量的MapTask;其他NM也领取相应任务,但不建立mrAppMaster,由第一个NM中的mrAppMaster发起启动其他NM中MapTask和Yarn Child的指令;不同的NM的Container中执行各自的MapTask,将各自的数据按照分区拉到磁盘;(此时Map阶段已经结束)NM向RM申请Container运行ReduceTask,对应进程也是Yarn Child;mpAppMaster向RM申请注销。
调度器是在Yarn工作流程中管理调度任务队列的工具
1)Hadoop调度器主要分为三类:
FIFO、Capacity Scheduler(容量调度器)和Fair Sceduler(公平调度器);
Apache默认的资源调度器是容量调度器;
CDH默认的资源调度器是公平调度器。
2) 调度器区别:
FIFO调度器:支持单队列、先进先出,但生产环境不会使用。
容量调度器:支持多队列,保证先进入的任务优先执行。
公平调度器:支持多队列,保证每个任务公平享有队列资源。
3) 调度器的选择:
大厂:对并发度要求较高,选择公平调度器,因为其对服务器性能有一定要求。
中小厂:选择容量调度器,因为集群服务器资源不太充裕。
4) 在生产环境怎么创建队列?
调度器默认时只有一个default队列,不能满足生产需求。
1. 按照框架:hive/spark/flink每个框架的任务放入指定的队列。(企业用的不多)
2. 按照业务模块:如登录注册、购物车、下单等业务单独创建队列。
5) 多队列的好处?
1. 避免员工不小心写出死循环代码,将资源全部占用;
2. 实现任务的降级使用,特殊时期保证重要的任务队列资源充足。
数据倾斜:Hadoop中为了方便分布式计算对数据进行切分,但出现因数据本身而导致切分不均匀的情况,有些数据多,有些数据少,从而导致后续计算中不同reducer的负载不同,影响性能
- 提前在map进行combine,减少数据的传输量
在Mapper端加上combiner相当于提前进行了reduce,即把一个Mapper中相同的key进行聚合,减少了shuffle过程中传输的数据量以及reducer端的计算量。但如果数据倾斜的key大量分布在不同的mapper时效果不好。
- 导致数据倾斜的key大量分布在不同的mapper
①局部聚合加全局聚合
第一次在map阶段对那些导致了数据倾斜的key加上1到n的随机前缀,这样本来相同的key也会分到多个reducer中进行局部聚合,降低了数据倾斜数量;
第二次mapreduce去掉key的随机前缀,进行全局聚合。
该方法进行了两次mapreduce,性能稍差
- 增加reducer,提升并行度
JobConf.setNumReduceTasks(int)
- 实现自定义分区
根据数据分布情况,自定义散列函数,将key均匀分配到不同的Reducer。
- 自定义Shuffle分区算法
根据数据应用属性将数据重新进行分区划分。
参考:尚硅谷Hadoop教程



