栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark - shuffle运行机制

Spark - shuffle运行机制

SortShuffleManager 运行原理
如何确定分区规则?
map():输出record,并计算其partitionId

partitionId = hash(key)%partitionNum,一个partitionId 就是一个分区

SortShuffleManager 运行机制有两种,一种是普通运行机制,另一种是 bypass 运行机制。当 shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数值时 (默认是 200 ) ,就会启用 bypass 机制。

1、普通机制

在该模式下,Shuffle Write 阶段会将数据写入一个内存的数据结构中,此时根据不同的算子会有不同的数据结构。比如是 reduceByKey 这种聚合类的 shuffle 算子,会选用 Map 数据结构,一遍用 Map 进行聚合(HashShuffleManager 聚合操作是放在 Shuffle Read 阶段),一遍写入内存;如果是 join 相关的普通 shuffle 算子的话,会用 Array 数据结构,直接写入内存。当内存达到临界阈值之后,会将内存中的数据进行排序,然后分批次写入磁盘 (默认每批次有 1W 条数据),在写入磁盘的时候不会像 HashShuffleManager 那样直接写入磁盘,这里会先写入内存缓冲流,当缓冲流满溢之后一次性写入磁盘。

此时也会生成大批量的文件,最后会将之前所有的临时磁盘文件进行合并,这就是 merge 过程 (就是将所有的临时磁盘文件中的数据读取出来,然后依次写入最终的文件中)。每个 task 最终会生成一份磁盘文件和一份索引文件,索引文件中标示了下游每个 task 的数据在文件中的 start offset 和 end offset。

比如,当前 stage 有 5 个 Executor,每个 Executor 分配 1 个 cpu core,共有 50 个 task,每个 Executor 执行 10 个 task;下一个 stage 有 100 个 task。那么每个 Executor 创建 10 个磁盘文件,一共有 50 个磁盘文件。

具体如下图所示:

2、bypass 机制

触发该机制的条件:

· shuffle reduce 端的 task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数值的时候;

· 不是聚合类的shuffle算子(比如reduceByKey);

该机制下,当前 stage 的每个 task 会将数据的 key 进行 hash,然后将相同 hash 的 key 锁对应的数据写入到同一个内存缓冲区,缓冲写满后会溢写到磁盘文件,这里和 HashShuffleManager一致。然后会进入 merge 阶段,将所有的磁盘文件合并成一个磁盘文件,并创建一个索引文件。

相比较于普通机制,这里有两个地方不同:

· 将数据写入内存时候,普通模式是将数据写入 Map 或者 Array 这样的内存数据结构中,这里是根据 key 的 Hash 值直接写入内存;

· 该模式下在写入磁盘之前不会排序;

· 磁盘写机制不同。

具体如图示:

map端的combine问题
combine的目的是减少shuffle read的数据量。

解决方案跟shuffle read阶段的聚合有细微区别区别。
区别一:不同点在于处理的数据量。
区别二:对于无法进行预聚合的算子(avg)不会进行预聚合。

shuffle read端的聚合处理的是所有map task输出的数据,combine聚合的是单个task输出的数据。

在哪里进行排序?
shuffle read阶段必须进行排序,因为从map task获取的数据不是全局有序的;shuffle write阶段可以选择性排序,如果shuffle read获取的数据已经是部分有序的,可以加速全局排序。

作用是方便shuffle read阶段直接从硬盘上读取数据,减少磁盘IO,减少磁盘巡道时间。且输出的数据是有序的,如果不需要排序也可以通过其他的方式(bypass机制)取消排序操作。

解决内存不足问题
shuffle数据量过大导致内存不足怎么办?

采用内存+磁盘混合存储的方案,现在用内存聚合(combine),如果内存空间不足,则将内存中的数据spill到磁盘上,空闲出来的内存可以继续处理新的数据。

这样的问题是spill到磁盘上的数据是部分聚合的结果,为了得到完整的聚合结果,我们需要对磁盘和内存上的数据进行全局聚合。为了加速全局聚合,在spill到磁盘的时候先排序,这样全局聚合的时候就能顺序读取磁盘上的数据,减少磁盘IO。

一个Executor包含多个task,每一个reducetask对应一个分区
HashShuffleManager:每一个 maptask生成的文件数都与后面的reducetask数相同
优化HashShuffleManager:每一个 Executor生成的文件数都与后面的reducetask数相同
SortShuffleManager:每个maptask生成一个带索引的文件,文件的个数与后面的reducetask的个数无关

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671486.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号