SortShuffleManager 运行原理
如何确定分区规则?
map():输出record,并计算其partitionId
partitionId = hash(key)%partitionNum,一个partitionId 就是一个分区
SortShuffleManager 运行机制有两种,一种是普通运行机制,另一种是 bypass 运行机制。当 shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数值时 (默认是 200 ) ,就会启用 bypass 机制。
1、普通机制
在该模式下,Shuffle Write 阶段会将数据写入一个内存的数据结构中,此时根据不同的算子会有不同的数据结构。比如是 reduceByKey 这种聚合类的 shuffle 算子,会选用 Map 数据结构,一遍用 Map 进行聚合(HashShuffleManager 聚合操作是放在 Shuffle Read 阶段),一遍写入内存;如果是 join 相关的普通 shuffle 算子的话,会用 Array 数据结构,直接写入内存。当内存达到临界阈值之后,会将内存中的数据进行排序,然后分批次写入磁盘 (默认每批次有 1W 条数据),在写入磁盘的时候不会像 HashShuffleManager 那样直接写入磁盘,这里会先写入内存缓冲流,当缓冲流满溢之后一次性写入磁盘。
此时也会生成大批量的文件,最后会将之前所有的临时磁盘文件进行合并,这就是 merge 过程 (就是将所有的临时磁盘文件中的数据读取出来,然后依次写入最终的文件中)。每个 task 最终会生成一份磁盘文件和一份索引文件,索引文件中标示了下游每个 task 的数据在文件中的 start offset 和 end offset。
比如,当前 stage 有 5 个 Executor,每个 Executor 分配 1 个 cpu core,共有 50 个 task,每个 Executor 执行 10 个 task;下一个 stage 有 100 个 task。那么每个 Executor 创建 10 个磁盘文件,一共有 50 个磁盘文件。
具体如下图所示:
2、bypass 机制
触发该机制的条件:
· shuffle reduce 端的 task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数值的时候;
· 不是聚合类的shuffle算子(比如reduceByKey);
该机制下,当前 stage 的每个 task 会将数据的 key 进行 hash,然后将相同 hash 的 key 锁对应的数据写入到同一个内存缓冲区,缓冲写满后会溢写到磁盘文件,这里和 HashShuffleManager一致。然后会进入 merge 阶段,将所有的磁盘文件合并成一个磁盘文件,并创建一个索引文件。
相比较于普通机制,这里有两个地方不同:
· 将数据写入内存时候,普通模式是将数据写入 Map 或者 Array 这样的内存数据结构中,这里是根据 key 的 Hash 值直接写入内存;
· 该模式下在写入磁盘之前不会排序;
· 磁盘写机制不同。
具体如图示:
map端的combine问题
combine的目的是减少shuffle read的数据量。
解决方案跟shuffle read阶段的聚合有细微区别区别。
区别一:不同点在于处理的数据量。
区别二:对于无法进行预聚合的算子(avg)不会进行预聚合。
shuffle read端的聚合处理的是所有map task输出的数据,combine聚合的是单个task输出的数据。
在哪里进行排序?
shuffle read阶段必须进行排序,因为从map task获取的数据不是全局有序的;shuffle write阶段可以选择性排序,如果shuffle read获取的数据已经是部分有序的,可以加速全局排序。
作用是方便shuffle read阶段直接从硬盘上读取数据,减少磁盘IO,减少磁盘巡道时间。且输出的数据是有序的,如果不需要排序也可以通过其他的方式(bypass机制)取消排序操作。
解决内存不足问题
shuffle数据量过大导致内存不足怎么办?
采用内存+磁盘混合存储的方案,现在用内存聚合(combine),如果内存空间不足,则将内存中的数据spill到磁盘上,空闲出来的内存可以继续处理新的数据。
这样的问题是spill到磁盘上的数据是部分聚合的结果,为了得到完整的聚合结果,我们需要对磁盘和内存上的数据进行全局聚合。为了加速全局聚合,在spill到磁盘的时候先排序,这样全局聚合的时候就能顺序读取磁盘上的数据,减少磁盘IO。
一个Executor包含多个task,每一个reducetask对应一个分区
HashShuffleManager:每一个 maptask生成的文件数都与后面的reducetask数相同
优化HashShuffleManager:每一个 Executor生成的文件数都与后面的reducetask数相同
SortShuffleManager:每个maptask生成一个带索引的文件,文件的个数与后面的reducetask的个数无关



