
Spark Source Code Walkthrough (4): Stage Submission and Task Generation

Stage Submission and Task Generation
  • 1. Program entry point
  • 2. Stage submission
    • 2.1 Only unavailable parents count as missing
    • 2.2 The event loop resubmits stages
  • 3. Task generation
  • 4. Summary

Related: Spark Source Code Walkthrough (3): Stage Splitting.

1. Program entry point
val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[3]")
val sparkContext = new SparkContext(sparkConf)

val rdd = sparkContext.parallelize(Array("hello thank you thank you very much are you ok"), 3)
val words_rdd = rdd.flatMap(line => line.split(" "))
val keyValue_rdd = words_rdd.map(word => (word, 1))
val result = keyValue_rdd.reduceByKey((x, y) => x + y)
result.collect().foreach(x => println(x._1 + ":" + x._2))
sparkContext.stop()
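For reference, the `flatMap`/`map`/`reduceByKey` pipeline above can be reproduced with plain Scala collections (a sketch without Spark, so no partitioning or shuffling is involved; `WordCountSketch` is an invented name):

```scala
// Word count over the same input string, using plain Scala collections.
object WordCountSketch {
  def wordCount(line: String): Map[String, Int] =
    line.split(" ")              // same split as the flatMap above
      .map(word => (word, 1))    // same pairing as the map above
      .groupBy(_._1)             // groupBy + sum plays the role of reduceByKey
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val counts = wordCount("hello thank you thank you very much are you ok")
    counts.foreach { case (w, c) => println(w + ":" + c) }
  }
}
```

The Spark program prints the same key/count pairs (e.g. you:3, thank:2); the difference is that Spark computes them in parallel across partitions and stages.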

Stage dependency graph (shown as a figure in the original post): Stage 0, a ShuffleMapStage covering parallelize → flatMap → map, feeds Stage 1, the ResultStage covering reduceByKey → collect, across a shuffle boundary.

2. Stage submission

Stage splitting was covered in Spark Source Code Walkthrough (3): Stage Splitting.
After the DAG has been split into stages, `private def submitStage(stage: Stage): Unit` is called to submit the ResultStage at the tail of the DAG.

private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}


`val missing = getMissingParentStages(stage).sortBy(_.id)` returns the ancestor ShuffleMapStages, sorted by `ShuffleMapStage.id`, which yields a queue with parent ShuffleMapStages ahead of their children. The logic here is easy to misread: if the stage has no missing parent ShuffleMapStages it is submitted directly; otherwise each missing parent is submitted recursively, while the stage itself is merely parked in `waitingStages`, not submitted. Does that mean only the first stage in the chain ever runs? Certainly not. Two pieces of code explain why.

2.1 Only unavailable parents count as missing

`getMissingParentStages` does not return every ancestor ShuffleMapStage; it applies a filter:
if (!mapStage.isAvailable)

Relevant code (Stage.scala / ShuffleMapStage.scala):

val numPartitions = rdd.partitions.length


def numAvailableOutputs: Int = mapOutputTrackerMaster.getNumAvailableOutputs(shuffleDep.shuffleId)



def isAvailable: Boolean = numAvailableOutputs == numPartitions

Relevant code in MapOutputTracker.scala:

def getNumAvailableOutputs(shuffleId: Int): Int = {
  shuffleStatuses.get(shuffleId).map(_.numAvailableOutputs).getOrElse(0)
}


def numAvailableOutputs: Int = withReadLock {
  _numAvailableOutputs
}

private[this] var _numAvailableOutputs: Int = 0


def addMapOutput(mapIndex: Int, status: MapStatus): Unit = withWriteLock {
  if (mapStatuses(mapIndex) == null) {
    _numAvailableOutputs += 1
    invalidateSerializedMapOutputStatusCache()
  }
  mapStatuses(mapIndex) = status
}

During the lifecycle of the tasks launched for a submitted stage, `def addMapOutput(mapIndex: Int, status: MapStatus): Unit` is called to increment `_numAvailableOutputs`, which in turn controls the `if (!mapStage.isAvailable)` filter. The call site is in DAGScheduler.scala, inside `private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit`. The net effect is that once a stage at the head of the dependency chain has been submitted and its tasks have completed, it no longer appears in the `missing` queue of the next submission attempt, so the next stage in the chain can be submitted.
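The counting mechanism can be sketched with a toy model (a sketch with invented names such as `ToyShuffleStatus`, not Spark's actual classes):

```scala
// Toy model of the map-output counting that backs ShuffleMapStage.isAvailable.
class ToyShuffleStatus(numPartitions: Int) {
  private val mapStatuses = new Array[AnyRef](numPartitions)
  private var numAvailableOutputs = 0

  // Mirrors addMapOutput: only the first registration for a map index
  // increments the counter; re-registration just overwrites the status.
  def addMapOutput(mapIndex: Int, status: AnyRef): Unit = {
    if (mapStatuses(mapIndex) == null) numAvailableOutputs += 1
    mapStatuses(mapIndex) = status
  }

  // Mirrors isAvailable: the stage counts as done once every partition
  // has registered a map output.
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}
```

As the 3 tasks of the example's ShuffleMapStage complete, `addMapOutput` fires once per partition; after the third call `isAvailable` flips to true and the stage drops out of the `missing` queue.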

So `submitStage(stage: Stage): Unit` submits the "head" stage of the chain but never submits the child stages directly. Where, then, are the child stages submitted?

2.2 The event loop resubmits stages

Tracing backwards, `submitStage(stage: Stage): Unit` is invoked from an `eventThread` that runs a `while` loop, so the method is called repeatedly; each call submits whatever stage is currently the "head" of the dependency chain, and `submitWaitingChildStages` re-enqueues the parked child stages once a stage finishes.
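The overall hand-off can be sketched as a toy scheduler (a simplified model with invented names, not Spark's real DAGScheduler):

```scala
import scala.collection.mutable

// Toy model of the submitStage / submitWaitingChildStages hand-off.
case class ToyStage(id: Int, parents: Seq[ToyStage]) {
  var available = false // becomes true when all of its tasks finish
}

class ToyScheduler {
  val waitingStages = mutable.Set[ToyStage]()
  val submitted = mutable.ArrayBuffer[Int]()

  def submitStage(stage: ToyStage): Unit = {
    val missing = stage.parents.filter(!_.available).sortBy(_.id)
    if (missing.isEmpty) submitted += stage.id // head of the chain: run it
    else {
      missing.foreach(submitStage)             // submit ancestors first
      waitingStages += stage                   // park this stage for later
    }
  }

  // Called (conceptually) from handleTaskCompletion when a stage finishes.
  def onStageFinished(stage: ToyStage): Unit = {
    stage.available = true
    val children = waitingStages.toSeq
    waitingStages.clear()
    children.foreach(submitStage)              // retry the parked stages
  }
}
```

For a parent → child chain, `submitStage(child)` runs the parent and parks the child; `onStageFinished(parent)` then resubmits the child, which now has no missing parents and runs.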

3. Task generation

Continuing along the stage submission path, into `submitMissingTasks`. There is a lot of code, so only the key part is shown:

val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }


    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}


if (tasks.nonEmpty) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
    s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)


  stage match {
    case stage: ShuffleMapStage =>
      logDebug(s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})")
      markMapStageJobsAsFinished(stage)
    case stage : ResultStage =>
      logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
  }
  submitWaitingChildStages(stage)
}

`val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()`
In both kinds of stage, this line determines the number of tasks.

ShuffleMapStage.scala 


override def findMissingPartitions(): Seq[Int] = {
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}

val numPartitions = rdd.partitions.length

In the ShuffleMapStage case, `partitionsToCompute` covers the partitions of the parent RDD of the ShuffleDependency, so the number of ShuffleMapTasks in a ShuffleMapStage equals the number of partitions of that stage's last RDD.
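The `getOrElse` fallback above can be sketched as follows (invented names; `tracked` stands in for what `mapOutputTrackerMaster.findMissingPartitions` knows about the shuffle): on the first submission nothing is registered yet, so every partition is "missing"; on a retry only the unregistered partitions are recomputed.

```scala
object MissingPartitionsSketch {
  // Mirrors ShuffleMapStage.findMissingPartitions: the tracker returns None
  // before any map output is registered, so all partitions are missing.
  def findMissingPartitions(tracked: Option[Array[Boolean]], numPartitions: Int): Seq[Int] =
    tracked
      .map(done => (0 until numPartitions).filter(id => !done(id)))
      .getOrElse(0 until numPartitions)
}
```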

ResultStage.scala


override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  (0 until job.numPartitions).filter(id => !job.finished(id))
}

Likewise, the number of ResultTasks equals the number of partitions of the RDD on which the Spark app invokes its action, i.e. the partition count of the ResultStage's last RDD.

4. Summary

1. Tasks come in two kinds: ShuffleMapTask and ResultTask.

2. The total number of tasks equals the sum of the last-RDD partition counts over all ShuffleMapStages, plus the last-RDD partition count of the single ResultStage.
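A worked check against the word-count program above (assuming, as in that example, that the default partitioning carries the 3 partitions through `reduceByKey`): the one ShuffleMapStage ends in a 3-partition RDD and the ResultStage ends in a 3-partition RDD. `TaskCountSketch` is an invented helper for the arithmetic:

```scala
object TaskCountSketch {
  // Total tasks = sum over ShuffleMapStages of last-RDD partition counts
  //             + the ResultStage's last-RDD partition count.
  def totalTasks(shuffleStageParts: Seq[Int], resultStageParts: Int): Int =
    shuffleStageParts.sum + resultStageParts

  def main(args: Array[String]): Unit =
    // word count: one ShuffleMapStage with 3 partitions + a 3-partition ResultStage
    println(totalTasks(Seq(3), 3)) // 6 tasks in total
}
```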

Source: https://www.mshxw.com/it/349901.html