栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【HADOOP】浅谈Hadoop中MapReduce工作机制

【HADOOP】浅谈Hadoop中MapReduce工作机制

这可能是我最不想写的一章。


首先,如Hadoop 官网 Shuffle 图所示,从map的输出到reduce输入的中间这个数据处理过程叫做Shuffle

那么我们先大体的介绍一下:

map task:

  1. input split :输入切片信息。我们知道,MapTask的数量是根据文件的切片,而这个输入的切片信息从哪里来呢?这是根据客户端在submit() 前,获取待处理数据的信息,然后根据参数配置,对文件形成一个任务分配的划分,它会将一个文件以默认的切片方式进行切片,生成一个job.split文件给map方法。

  2. map:map方法。在Mapper中,调用context.write(key,value)实际是调用代理NewOutPutCollector的wirte方法,然后调用了MapOutPutBuffer的collect(),且在进行收集前,会调用partitioner来计算每个key-value的分区号。

  3. buffer in memory: 内存中的缓冲区。也就是俗称的环形缓冲区,默认大小是100M(io.sort.mb)的字节数组(kvbuffer = new byte[100 << 20]),数据的默认开始位置是0(bufstart = bufend = bufindex = equator = 0),当kvindex == kvend == kvstart时,缓冲区为空。

    数据和元数据以相反的方向进行输入,当缓冲区内空间不足20%(io.sort.spill.percent 默认为0.8,80%)时,他就开始准备往外溢写数据

  4. partition sort and spill to disk:分区排序并溢出到磁盘。而在写文件之前缓冲区内会进行一个分区和快速排序(这个快排只是对元数据进行了排序,真实数据未变)。排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
    然后按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。每次溢写都会产生一个临时文件。
    如果定义了Combiner,使用CombinerRunner.conbine()对key-value处理(combiner)后再次溢写!
    最后将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

  5. merge on disk:在磁盘上合并。当 Shuffle 机制结束后,会在磁盘上看到很多溢写的临时文件。而当所有数据处理(这里所有数据指当前一个MapTask需要处理的数据)结束后,MapTask会以分区为单位进行合并,对于某个分区,它将采用多轮递归合并的方式。每轮合并10(io.sort.factor默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。这之后,如果用户设置了Combiner(前提是溢写的文件个数大于3 min.num.splits.for.combine默认为3),则写入文件之前,对每个分区中的数据再进行一次区内聚集操作。并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

    最终生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待ReduceTask来拷贝相应分区的数据
    若定义了压缩,则在生成最终文件之前数据将会被压缩。

reduce task:

  1. copy phase:复制阶段。map任务可以在不同时间完成,因此只要由一个任务完成,reduce任务就会复制其输出到磁盘的文件,reduce任务有少量复制线程,因此能够并行地取得map的输出。默认是5个线程(mapred.reduce.parallel.copies默认是5个)。
    如果Copy的Map输出磁盘数据相当小,则会直接复制到Reduce Task的内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent默认是堆空间的百分比),否则,被复制到磁盘中。若复制的数据让内存缓冲区达到阈值大小(由mapred.job.shuffle.merge.percent决定阈值大小)或达到map输出阈值(由mapred.inmem.merge.threshold控制)时,会被合并,进而写到磁盘中。 这个合并的过程是在复制Map数据同时做的。
    如果Map端数据是压缩格式的,都会在内存中被解压缩,以便于合并。
  2. sort phase:排序阶段。ReduceTask对所有数据进行一次归并排序(合并Map输出,维持其按顺序排序)。具体点是:比如,如果有50个map输出,而合并系数是10(10为默认设置,由io.sort.facotr属性设置,与map的合并类似),合并进行5轮。每轮将10个文件合并成一个文件,因此最后有5个中间文件。 这个过程是循环进行的。
  3. reduce phase:Reduce阶段。首先,排序阶段会最后有5个中间文件,最后一轮会直接将5个文件进行合并将数据并输送入Reduce() 中,而不会先合并成一个已排序的文件,然后再让这个排序文件作为Reduce() 的输入。
    这个地方省略了一次磁盘的往返。

    参考资料:《Hadoop权威指南》、《尚硅谷大数据技术之Hadoop》等各大网站程序员大佬。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671971.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号