栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark-shuffle源码细读一:ShuffleManager

Spark-shuffle源码细读一:ShuffleManager

@Author Jeffrey.miao
转载请标明出处: Spark-shuffle源码细读一:ShuffleManager_The_Inertia的博客-CSDN博客

欢迎关注个人知乎专栏:Spark源码学习成长 - 知乎

version:spark 3.0.1

ShuffleManager目前只有一个实现SortShuffleManager。分析其源码:

一.成员变量 1.taskIdMapsForShuffle
  
  private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()

taskIdMapsForShuffle,记录了ShuffleId和其对应的taskIds的映射关系。

一个ShuffleDependency对应一个shuffleId(从0开始累加)

SparkContext#_cleaner在清理shuffle数据(ContextCleaner#doCleanupShuffle)时,会发送RemoveShuffle(shuffleId)请求,BlockManagerSlaveEndpoint接收到request后,最终会由ShuffleManager调用unregisterShuffle方法,根据以上的taskIdMapsForShuffle清理shuffleId对应的task的shuffle数据文件和索引文件。

2.shuffleBlockResolver
  private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)

  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

这边会分别在ShuffleManager和LocalDiskShuffleExecutorComponents中实例化两个ShuffleBlockResolver,ShuffleBlockResolver不保存状态,只需要指定blockId来寻找数据文件和索引文件,即可保证写出文件和清理文件的一致性。如,以LocalDiskShuffleExecutorComponents的createSingleFileMapOutputWriter为例,该writer用于在shuffle write spill files只有一个的场景下,生成索引文件,通过其成员变量blockResolver获取数据文件和索引文件,而blockResolver又通过blockManager获取特定blockId的数据文件和索引文件,进行写入。同时,ShuffleManager在清理shuffle数据时,另一个shuffleBlockResolver实例也是通过blockId作为寻址方式寻找特定的数据文件和索引文件进行清理。

二.函数 2.1 registerShuffle

根据不同的条件判断shuffle writer的选型。

另外,ShuffleDependency会调用shuffleManager的registerShuffle生成ShuffleHandle,作为血缘元数据传递给shuffle reader,后续,shuffle reader可以根据ShuffleHandle中的ShuffleId,计算出blockId,然后去拉取相应的数据。同时ShuffleHandle也给shuffle reader传递了具体的计算函数,如aggregator等。

2.2 getReader 和 getReaderForRange

getReader根据startPartition和endPartition找到blockId,其中,startPartition是每个reducer所对应的Partition的indexId,endPartition一般是startPartition+1。

ShuffleBlockId(shuffleId, status.mapId, part),其中的part就是startPartition到endPartition,前闭后开。

而getReaderForRange根据startMapIndex,endMapIndex,startPartition和endPartition找到blockId。

      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) =>
        SparkEnv.get.shuffleManager.getReaderForRange(
          dependency.shuffleHandle,
          startMapIndex,
          endMapIndex,
          reducerIndex,
          reducerIndex + 1,
          context,
          sqlMetricsReporter)

如上,在org.apache.spark.sql.execution.ShuffledRowRDD#compute中,就调用了getReaderForRange。场景是,AQE(下一章会进一步介绍),在join遇到倾斜的partition时,先根据mapstatues计算出reducerId需要fetch的block size,如果发现有一些reducer可能产生倾斜,则会计算出targetSize,然后根据targetSize对该reducer需要fetch数据的mapper进行再次的split,split成多个range partition,由多个reducer去读取自己的range(startMapIndex~endMapIndex)对应的这些mapper中的数据,最后,与getReader相同,再加上自己的reducerId,算出blockId。

2.3 getWriter

主要是根据shuffleHandle去匹配相应的shuffle writer。需要注意,shuffle writer在写数据时,也会根据自己的mapId来拼接文件名。便于shuffle reader根据相同的shuffleId和mapId去读取block。

private def getIndexFile(
      shuffleId: Int,
      mapId: Long,
      dirs: Option[Array[String]] = None): File = {
    val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
    dirs
      .map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name))
      .getOrElse(blockManager.diskBlockManager.getFile(blockId))
  }

@DeveloperApi
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/722407.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号