栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark的Stage到Task的生成过程

Spark的Stage到Task的生成过程

RDD任务的划分主要可以分为四块:Application、Job、Stage 和 Task。

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1,因为最后一个Stage是ResultStage;
  • Task:一个 Stage 中,最后一个 RDD 的分区个数就是 Task 的个数。Driver根据Stage内的最后RDD的分区数确定TaskSet,从而发给对应的Executors。

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

接下来我们来看看Stage是如何生成Task的。
首先,我们从collect的runJob入手,这个之前的博客已经讲过了,这里不再赘述,我们一路到handleJobSubmitted。

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

可以看到,我们在该函数的最后有一个submitStage操作,提交我们的finalStage。我们进去看看。

首先,这里有一个getMissingParentStages的操作,来寻找我们的finalStage里面是否还有更前面的Stage。
如果没有,我们就调用submitMissingTasks函数生成当前Stage的Tasks;
如果有,就执行父宽依赖的Stage的submitStage(parent),做一个递归。

  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

这里为什么要寻找父依赖的Stage呢?是因为我们的Tasks毫无疑问是从头往后执行的,所以这里也需要让前面的Stage先生成Tasks,才能保证执行顺序一致。

进入submitMissingTasks后,有很长的一段代码,我们直接看生成Task的关键代码。

    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $en${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

这里根据我们Stage的类型创建不同的Task,分成了ShuffleMapStage和ResultStage两类,因为也只有这两种Stage,窄依赖不生成Stage。

这里我们可以看到,两个case都是按照partitionsToCompute.map生成Task,每个方法体生成一个Task。由于map并不会改变数据的数量,所以partitionsToCompute有多少个,Task就会生成多少个。

case stage: ShuffleMapStage =>
  stage.pendingPartitions.clear()
  partitionsToCompute.map { id =>
    val locs = taskIdToLocations(id)
    val part = partitions(id)
    stage.pendingPartitions += id
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
      taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
      Option(sc.applicationId), sc.applicationAttemptId)
  }

case stage: ResultStage =>
  partitionsToCompute.map { id =>
    val p: Int = stage.partitions(id)
    val part = partitions(p)
    val locs = taskIdToLocations(id)
    new ResultTask(stage.id, stage.latestInfo.attemptNumber,
      taskBinary, part, locs, id, properties, serializedTaskMetrics,
      Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
  }

我们看这个名字就觉得这个数量应该和分区数一致,为了验证想法,我们接着往下看。

这个partitionsToCompute的值就是当前Stage的findMissingPartitions函数返回结果。

// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

我们进入findMissingPartitions函数,发现它是Stage抽象类的抽象函数。

但实际上,两个Stage实现的结果都相同,就是返回该Stage最后一个RDD的分区。这就验证了这里生成Task的数量就是该Stage最后RDD的分区数。

ShuffleMapStage的findMissingPartitions:

  override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }

ResultStage的findMissingPartitions:

  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }

这里的(0 until numPartitions)就足够证明数量和分区数对应。

因此,我们捋清了Stage生成Tasks的过程,并且得出结论:
一个 Stage 中,最后一个 RDD 的分区个数就是该Stage生成Tasks 的个数。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389226.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号