
Spark source code tracing (2): stage division


Stage division
  • 1. Stage class diagram
  • 2. Source code tracing
  • 3. Conclusion

1. Stage class diagram
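
The class diagram itself is not reproduced here. As a rough textual substitute (a sketch inferred from the constructors used later in this post, not the verbatim class definitions), the hierarchy in org.apache.spark.scheduler looks like this:

// org.apache.spark.scheduler (simplified, field lists abbreviated):
//
//   abstract class Stage(id, rdd, numTasks, parents: List[Stage], firstJobId, callSite)
//     |
//     +-- class ShuffleMapStage(..., shuffleDep: ShuffleDependency[_, _, _], mapOutputTracker)
//     |     one per ShuffleDependency; produces the map output for that shuffle
//     |
//     +-- class ResultStage(..., func: (TaskContext, Iterator[_]) => _, partitions: Array[Int])
//           exactly one per job; runs the action's function on the final RDD's partitions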

2. Source code tracing

Entry point

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[3]")
val sparkContext = new SparkContext(sparkConf)

// Word-count lineage: parallelize -> flatMap -> map -> reduceByKey
val rdd = sparkContext.parallelize(Array("hello thank you thank you very much are you ok"), 3)
val words_rdd = rdd.flatMap(line => line.split(" "))
val keyValue_rdd = words_rdd.map(word => (word, 1))
val result = keyValue_rdd.reduceByKey((x, y) => x + y)

// collect() is the action that triggers job submission and stage division
result.collect().foreach(x => println(x._1 + ":" + x._2))
sparkContext.stop()

Step into result.collect() and trace the method calls.
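
The key call path (a hedged sketch of the Spark 3.x sources, simplified rather than copied verbatim) is roughly:

// Simplified call chain from the action to the scheduler's event loop:
// RDD.collect()
//   -> SparkContext.runJob(rdd, func)
//     -> DAGScheduler.runJob(rdd, func, partitions, ...)
//       -> DAGScheduler.submitJob(rdd, func, partitions, ...)
//         -> eventProcessLoop.post(JobSubmitted(...))   // hands the job to the event loop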



Along the way we encounter a thread named eventThread.

// Exposed for testing.
private[spark] val eventThread = new Thread(name) {
  setDaemon(true)


  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) =>
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}

eventThread is a daemon thread; once started, it keeps taking the events submitted via the post function mentioned above from eventQueue and hands each one to the subclass's onReceive for processing.

DAGScheduler.submitJob wraps the job into a JobSubmitted event and posts it to the event loop:

JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, Utils.cloneProperties(properties))
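
On the receiving side, the EventLoop subclass here is DAGSchedulerEventProcessLoop. A hedged sketch (simplified from the Spark 3.x sources, not verbatim): its onReceive delegates to doOnReceive, which pattern-matches the event and, for JobSubmitted, calls handleJobSubmitted; handleJobSubmitted in turn calls createResultStage, which is where stage division actually begins.

// DAGSchedulerEventProcessLoop (simplified sketch):
override def onReceive(event: DAGSchedulerEvent): Unit = doOnReceive(event)

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    // handleJobSubmitted builds the ResultStage (and, recursively, its parent stages)
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case _ => // other scheduler events elided
}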


private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

The key call here is val parents = getOrCreateParentStages(rdd, jobId), which builds the parent stages of the ResultStage:

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

getShuffleDependencies(rdd) walks the lineage backwards from rdd and returns, for each dependency chain, only the nearest ShuffleDependency (the last one before rdd); it does not traverse past a shuffle.


private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  parents
}
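
To make this concrete with the wordcount job from the entry point (a small illustrative check, not part of the DAGScheduler): result is the ShuffledRDD produced by reduceByKey, so its only dependency is a ShuffleDependency, while the upstream flatMap/map RDDs contribute only narrow OneToOneDependency instances that the traversal walks straight through.

// Inspecting the wordcount lineage from the entry example:
result.dependencies.foreach(println)        // ShuffleDependency
keyValue_rdd.dependencies.foreach(println)  // OneToOneDependency
words_rdd.dependencies.foreach(println)     // OneToOneDependency
// getShuffleDependencies(result) therefore returns a HashSet containing just
// the single ShuffleDependency created by reduceByKey.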


For a dependency graph like the one in the original figure, this returns a HashSet containing ShuffleDependency1 and ShuffleDependency3.
Next, trace into getOrCreateShuffleMapStage(shuffleDep, firstJobId):

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage


    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

getMissingAncestorShuffleDependencies starts from the parent RDD of the last-level ShuffleDependency found above and walks the dependency chains backwards, collecting every ShuffleDependency it finds into a queue. Because prepend inserts new elements at the head of the queue, the returned ancestors queue has the ancestor ShuffleDependency at the head and the child ShuffleDependency at the tail, matching their order in the DAG.
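
A hedged sketch of that function, reconstructed from the behaviour just described rather than copied verbatim from the Spark sources:

private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = {
  val ancestors = new ListBuffer[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      // Find the nearest shuffle dependencies of this RDD; any that do not yet
      // have a ShuffleMapStage are prepended, so ancestors ends up in DAG order.
      getShuffleDependencies(toVisit).foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          ancestors.prepend(shuffleDep)
          waitingForVisit.prepend(shuffleDep.rdd)
        }
      }
    }
  }
  ancestors
}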

createShuffleMapStage is then called once per ShuffleDependency, in the order the dependencies appear in the DAG, creating one ShuffleMapStage each and storing it in stageIdToStage (a HashMap[Int, Stage]) under an id that increases with every call. If there are several dependency branches, they are handled one branch at a time, so an ancestor ShuffleMapStage always gets a smaller id than its descendant ShuffleMapStages.

private val nextStageId = new AtomicInteger(0)

private[scheduler] val stageIdToStage = new HashMap[Int, Stage]


def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)


  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)


  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " +
      s"shuffle ${shuffleDep.shuffleId}")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

3. Conclusion

ShuffleMapStages and ShuffleDependencies are in one-to-one correspondence: one ShuffleMapStage is created per ShuffleDependency.
Total number of stages = number of ShuffleMapStages + 1 (the single ResultStage).
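
Applied to the wordcount job from the entry point: reduceByKey introduces the only ShuffleDependency, so the job is split into ShuffleMapStage 0 (parallelize -> flatMap -> map) and ResultStage 1 (the shuffled output collected by collect()), two stages in total. The shuffle boundary is also visible in the RDD lineage (the printed output below is approximate):

println(result.toDebugString)
// (3) ShuffledRDD[3] at reduceByKey ...
//  +-(3) MapPartitionsRDD[2] at map ...
//     |  MapPartitionsRDD[1] at flatMap ...
//     |  ParallelCollectionRDD[0] at parallelize ...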
