RDD task division breaks down into four levels: Application, Job, Stage, and Task.
- Application: initializing a SparkContext creates one Application;
- Job: each Action operator triggers one Job;
- Stage: the number of Stages equals the number of wide dependencies (ShuffleDependency) plus 1, because the last Stage is the ResultStage;
- Task: within a Stage, the number of partitions of the last RDD is the number of Tasks. The Driver builds a TaskSet from the partition count of the last RDD in each Stage and dispatches it to the corresponding Executors.
Note: each step in Application -> Job -> Stage -> Task is a one-to-many relationship.
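The counting rules above can be sketched as a toy model (all names here are hypothetical, not Spark's API): each Action yields one Job, and a Job's stage count is its number of wide (shuffle) dependencies plus one for the final ResultStage.

```scala
// Toy model of the stage-counting rule (hypothetical names, not Spark classes):
// stages = wide (shuffle) dependencies in the lineage + 1 (the ResultStage).
sealed trait Dependency
case object NarrowDependency extends Dependency
case object WideDependency extends Dependency // i.e. a ShuffleDependency

def numStages(lineage: Seq[Dependency]): Int =
  lineage.count(_ == WideDependency) + 1 // the +1 is the final ResultStage
```

For example, a lineage like flatMap -> map -> reduceByKey has one wide dependency (the reduceByKey shuffle), so `numStages` gives 2: one ShuffleMapStage plus the ResultStage.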
Next, let's look at how a Stage generates its Tasks.
We start from the runJob call triggered by collect — covered in a previous post, so we won't repeat it here — and follow the call chain down to handleJobSubmitted.
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}
As you can see, the last statement of this function is a submitStage call that submits our finalStage. Let's step inside.
It first calls getMissingParentStages to check whether finalStage still has unfinished upstream Stages.
If there are none, it calls submitMissingTasks to generate the Tasks for the current Stage;
if there are, it calls submitStage(parent) for each missing parent Stage (the ones created at wide dependencies), recursing up the lineage.
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
Why look for parent Stages first? Because Tasks necessarily execute from upstream to downstream, the upstream Stages must generate their Tasks first so that the submission order matches the execution order.
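The effect of this recursion can be sketched with a toy model (ToyStage and submitOrder are hypothetical, not Spark classes): every ancestor is emitted before its descendant, which is exactly the ordering guarantee described above. The real scheduler additionally deduplicates via waitingStages/runningStages and re-submits children only when parents complete; this sketch keeps just the ordering.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of submitStage's recursion (hypothetical classes, not Spark's):
// a stage is appended to the submission order only after all of its parents.
case class ToyStage(id: Int, parents: Seq[ToyStage])

def submitOrder(stage: ToyStage, out: ListBuffer[Int] = ListBuffer.empty): ListBuffer[Int] = {
  stage.parents.foreach(p => submitOrder(p, out)) // recurse into parents first
  out += stage.id                                 // then "submit" this stage
  out
}
```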
submitMissingTasks is quite long, so let's jump straight to the key code that creates the Tasks.
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}
A different Task type is created depending on the Stage type, with two cases: ShuffleMapStage and ResultStage. These are the only two kinds of Stage, because narrow dependencies do not create a Stage boundary.
In both cases the Tasks are produced by partitionsToCompute.map, with each invocation of the function body creating one Task. Since map does not change the number of elements, exactly as many Tasks are generated as there are entries in partitionsToCompute.
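This one-to-one mapping is easy to see in isolation (FakeTask here is a stand-in, not Spark's Task class):

```scala
// Stand-in for Spark's Task: map yields exactly one task per partition id,
// so the task count always equals the size of partitionsToCompute.
case class FakeTask(stageId: Int, partitionId: Int)

val partitionsToCompute: Seq[Int] = Seq(0, 1, 2, 3)
val tasks: Seq[FakeTask] =
  partitionsToCompute.map(id => FakeTask(stageId = 0, partitionId = id))
```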
The name partitionsToCompute alone suggests that this count should match the number of partitions. To verify that, let's keep reading.
partitionsToCompute is simply the return value of the current Stage's findMissingPartitions method.
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
Stepping into findMissingPartitions, we find it is an abstract method declared on the Stage abstract class.
In practice, though, both implementations amount to the same thing: on a first submission they return every partition of the Stage's last RDD. This confirms that the number of Tasks generated equals the partition count of the Stage's final RDD.
ShuffleMapStage's findMissingPartitions:
override def findMissingPartitions(): Seq[Int] = {
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}
ResultStage's findMissingPartitions:
override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  (0 until job.numPartitions).filter(id => !job.finished(id))
}
The (0 until numPartitions) in both bodies is enough to establish the correspondence: that range has exactly numPartitions elements, one id per partition (partitions that already finished, or whose map output is already tracked, are simply excluded from re-computation).
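ResultStage's variant can be modeled in a few lines (a standalone sketch with the stage's state passed in explicitly, rather than Spark's ActiveJob): on a first submission every id in 0 until numPartitions is missing, and finished partitions drop out on later attempts.

```scala
// Sketch of ResultStage.findMissingPartitions with the state made explicit:
// returns the ids of unfinished partitions out of 0 until numPartitions.
def findMissingPartitions(numPartitions: Int, finished: Array[Boolean]): Seq[Int] =
  (0 until numPartitions).filter(id => !finished(id))
```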
With that, we have traced the full path from Stage to Tasks and can state the conclusion:
within a Stage, the number of partitions of the last RDD is the number of Tasks that Stage generates.



