一.成员变量 1.taskIdMapsForShuffle@Author Jeffrey.miao
转载请标明出处: Spark-shuffle源码细读一:ShuffleManager_The_Inertia的博客-CSDN博客欢迎关注个人知乎专栏:Spark源码学习成长 - 知乎
version:spark 3.0.1
ShuffleManager目前只有一个实现SortShuffleManager。分析其源码:
private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()
taskIdMapsForShuffle,记录了ShuffleId和其对应的taskIds的映射关系。
一个ShuffleDependency对应一个shuffleId(从0开始累加)
SparkContext#_cleaner在清理shuffle数据(ContextCleaner#doCleanupShuffle)时,会发送RemoveShuffle(shuffleId)请求,BlockManagerSlaveEndpoint接收到request后,最终会由ShuffleManager调用unregisterShuffle方法,根据以上的taskIdMapsForShuffle清理shuffleId对应的task的shuffle数据文件和索引文件。
2.shuffleBlockResolverprivate lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf) override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
这边会分别在ShuffleManager和LocalDiskShuffleExecutorComponents中实例化两个ShuffleBlockResolver,ShuffleBlockResolver不保存状态,只需要指定blockId来寻找数据文件和索引文件,即可保证写出文件和清理文件的一致性。如,以LocalDiskShuffleExecutorComponents的createSingleFileMapOutputWriter为例,该writer用于在shuffle write spill files只有一个的场景下,生成索引文件,通过其成员变量blockResolver获取数据文件和索引文件,而blockResolver又通过blockManager获取特定blockId的数据文件和索引文件,进行写入。同时,ShuffleManager在清理shuffle数据时,另一个shuffleBlockResolver实例也是通过blockId作为寻址方式寻找特定的数据文件和索引文件进行清理。
二.函数 2.1 registerShuffle根据不同的条件判断shuffle writer的选型。
另外,ShuffleDependency会调用shuffleManager的registerShuffle生成ShuffleHandle,作为血缘元数据传递给shuffle reader,后续,shuffle reader可以根据ShuffleHandle中的ShuffleId,计算出blockId,然后去拉取相应的数据。同时ShuffleHandle也给shuffle reader传递了具体的计算函数,如aggregator等。
2.2 getReader 和 getReaderForRangegetReader根据startPartition和endPartition找到blockId,其中,startPartition是每个reducer所对应的Partition的indexId,endPartition一般是startPartition+1。
ShuffleBlockId(shuffleId, status.mapId, part),其中的part就是startPartition到endPartition,前闭后开。
而getReaderForRange根据startMapIndex,endMapIndex,startPartition和endPartition找到blockId。
case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) =>
SparkEnv.get.shuffleManager.getReaderForRange(
dependency.shuffleHandle,
startMapIndex,
endMapIndex,
reducerIndex,
reducerIndex + 1,
context,
sqlMetricsReporter)
如上,在org.apache.spark.sql.execution.ShuffledRowRDD#compute中,就调用了getReaderForRange。场景是,AQE(下一章会进一步介绍),在join遇到倾斜的partition时,先根据mapstatues计算出reducerId需要fetch的block size,如果发现有一些reducer可能产生倾斜,则会计算出targetSize,然后根据targetSize对该reducer需要fetch数据的mapper进行再次的split,split成多个range partition,由多个reducer去读取自己的range(startMapIndex~endMapIndex)对应的这些mapper中的数据,最后,与getReader相同,再加上自己的reducerId,算出blockId。
2.3 getWriter主要是根据shuffleHandle去匹配相应的shuffle writer。需要注意,shuffle writer在写数据时,也会根据自己的mapId来拼接文件名。便于shuffle reader根据相同的shuffleId和mapId去读取block。
private def getIndexFile(
shuffleId: Int,
mapId: Long,
dirs: Option[Array[String]] = None): File = {
val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
dirs
.map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name))
.getOrElse(blockManager.diskBlockManager.getFile(blockId))
}
@DeveloperApi
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}



