- 1. Program entry point
- 2. Stage submission
- 2.1. Stage dependencies
- 2.2. Stage division
- 3. Task generation
- 4. Summary
Link: Spark source tracing (3): stage division

1. Program entry point
// Driver program: a simple word count running locally with 3 partitions
val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[3]")
val sparkContext = new SparkContext(sparkConf)
val rdd = sparkContext.parallelize(Array("hello thank you thank you very much are you ok"), 3)
val words_rdd = rdd.flatMap(line => line.split(" "))
val keyValue_rdd = words_rdd.map(word => (word, 1))
// reduceByKey introduces a shuffle dependency, splitting the job into two stages
val result = keyValue_rdd.reduceByKey((x, y) => x + y)
// collect() is the action that triggers job submission
result.collect().foreach(x => println(x._1 + ":" + x._2))
sparkContext.stop()
2. Stage submission

2.1. Stage dependencies

2.2. Stage division: see Spark source tracing (3): stage division
After the stages have been divided, private def submitStage(stage: Stage): Unit is called to submit the ResultStage at the tail end of the DAG.
private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
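The shape of this recursion can be illustrated with a toy stage graph (a hedged sketch: ToyStage, submit, and the available flag are hypothetical stand-ins for the real Stage / isAvailable machinery, not Spark's API):

```scala
import scala.collection.mutable

// A toy stage: parents must be "available" before this stage can run.
case class ToyStage(id: Int, parents: List[ToyStage], available: Boolean = false)

val waiting = mutable.Set[Int]()          // mirrors waitingStages
val submitted = mutable.ListBuffer[Int]() // mirrors submitMissingTasks calls

def submit(stage: ToyStage): Unit = {
  // mirrors getMissingParentStages(stage).sortBy(_.id):
  // only parents whose output is not yet available count as missing
  val missing = stage.parents.filterNot(_.available).sortBy(_.id)
  if (missing.isEmpty) {
    submitted += stage.id
  } else {
    missing.foreach(submit)
    waiting += stage.id // the child stage just parks itself and waits
  }
}

val p1 = ToyStage(0, Nil)
val p2 = ToyStage(1, Nil)
val child = ToyStage(2, List(p1, p2))
submit(child)
// only the two parentless "head" stages are submitted; the child waits
```

Running this, submitted contains only stages 0 and 1, and stage 2 sits in the waiting set, which is exactly the puzzle the next paragraphs resolve.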
val missing = getMissingParentStages(stage).sortBy(_.id) returns the ancestor ShuffleMapStages, sorted by ShuffleMapStage.id so that parent stages come before child stages in the resulting queue. This code is a bit puzzling at first. The logic appears to be: if the stage has no parent ShuffleMapStages, submit it; otherwise, recursively submit each parent ShuffleMapStage. But after submitting the parents, the stage itself is not submitted. Does that mean only the first stage ever gets submitted? Certainly not. Two pieces of code are key.
The first key point: when collecting ancestor ShuffleMapStages, getMissingParentStages does not return all of them; there is a filter condition:
if (!mapStage.isAvailable)
Relevant code in Stage.scala / ShuffleMapStage.scala:

val numPartitions = rdd.partitions.length

def numAvailableOutputs: Int = mapOutputTrackerMaster.getNumAvailableOutputs(shuffleDep.shuffleId)

def isAvailable: Boolean = numAvailableOutputs == numPartitions
Relevant code in MapOutputTracker.scala:
def getNumAvailableOutputs(shuffleId: Int): Int = {
  shuffleStatuses.get(shuffleId).map(_.numAvailableOutputs).getOrElse(0)
}

def numAvailableOutputs: Int = withReadLock {
  _numAvailableOutputs
}

private[this] var _numAvailableOutputs: Int = 0

def addMapOutput(mapIndex: Int, status: MapStatus): Unit = withWriteLock {
  if (mapStatuses(mapIndex) == null) {
    _numAvailableOutputs += 1
    invalidateSerializedMapOutputStatusCache()
  }
  mapStatuses(mapIndex) = status
}
The second key point: during the lifecycle of the tasks of a submitted stage, the function

def addMapOutput(mapIndex: Int, status: MapStatus): Unit

is invoked, changing the value of _numAvailableOutputs and thereby controlling the if (!mapStage.isAvailable) filter. The call site is the function private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit in DAGScheduler.scala. As a result, once the stage at the head of the dependency chain has been submitted and its map outputs registered, it no longer shows up in the missing queue on the next invocation, and the next stage in the chain can be submitted.
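A minimal sketch of how registering map outputs flips isAvailable (ToyShuffleStatus is a hypothetical, lock-free stand-in for the ShuffleStatus bookkeeping shown above):

```scala
// Each shuffle tracks one MapStatus slot per map partition; the stage
// becomes "available" once every slot has been filled exactly once.
class ToyShuffleStatus(numPartitions: Int) {
  private val mapStatuses = new Array[AnyRef](numPartitions)
  private var _numAvailableOutputs = 0

  def addMapOutput(mapIndex: Int, status: AnyRef): Unit = {
    // only count the first registration for a partition
    if (mapStatuses(mapIndex) == null) _numAvailableOutputs += 1
    mapStatuses(mapIndex) = status
  }

  def isAvailable: Boolean = _numAvailableOutputs == numPartitions
}

val s = new ToyShuffleStatus(3)
s.addMapOutput(0, "status0")
s.addMapOutput(1, "status1")
val before = s.isAvailable // one partition's map output is still missing
s.addMapOutput(2, "status2")
s.addMapOutput(2, "status2-retry") // a re-registration does not double count
val after = s.isAvailable
```

Only when every map partition has reported does isAvailable become true, at which point the stage drops out of getMissingParentStages' result.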
As we saw, submitStage(stage: Stage): Unit submits only the "head" stage and does not go on to submit its child stages. So where do the child stages get submitted?
Tracing back, submitStage(stage: Stage): Unit is invoked on an eventThread that runs a while loop, so the method is called repeatedly; each invocation submits the current "head" stage of the dependency chain.
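This repeated-invocation idea can be modeled with a toy loop (a sketch only: St, loopOnce, and the assumption that tasks finish instantly are hypothetical, not Spark's EventLoop):

```scala
import scala.collection.mutable

// A linear dependency chain: stage 0 <- stage 1 <- stage 2
case class St(id: Int, parents: List[Int])

val stages = List(St(0, Nil), St(1, List(0)), St(2, List(1)))
val done = mutable.Set[Int]()
val order = mutable.ListBuffer[Int]()

// One pass over the "event loop": submit every stage whose parents
// are done, and pretend its tasks complete immediately.
def loopOnce(): Unit = {
  val ready = stages.filter(s => !done(s.id) && s.parents.forall(done))
  ready.foreach { s => order += s.id; done += s.id }
}

loopOnce(); loopOnce(); loopOnce()
// each pass unlocks exactly one more stage down the chain
```

Three passes are needed for a three-stage chain; each pass submits only the current head, just as the text describes.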
3. Task generation

Continuing to trace the stage submission process into submitMissingTasks: the method is long, so only the key part is shown.
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}

if (tasks.nonEmpty) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
    s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)

  stage match {
    case stage: ShuffleMapStage =>
      logDebug(s"Stage ${stage} is actually done; " +
        s"(available: ${stage.isAvailable}," +
        s"available outputs: ${stage.numAvailableOutputs}," +
        s"partitions: ${stage.numPartitions})")
      markMapStageJobsAsFinished(stage)
    case stage: ResultStage =>
      logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
  }
  submitWaitingChildStages(stage)
}
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

For both kinds of stage, this single line determines how many tasks are created.
ShuffleMapStage.scala
override def findMissingPartitions(): Seq[Int] = {
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}

val numPartitions = rdd.partitions.length
In the ShuffleMapStage case, partitionsToCompute covers the partitions of the parent RDD in the ShuffleDependency, so the number of ShuffleMapTasks in a ShuffleMapStage equals the number of partitions of that stage's last RDD.
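A small sketch of the ShuffleMapStage fallback path: when the tracker has no record for the shuffle yet, getOrElse makes every partition missing (trackerRecord is a hypothetical stand-in for the mapOutputTrackerMaster lookup):

```scala
val numPartitions = 3 // rdd.partitions.length for the stage's last RDD

// No shuffle output registered yet, as on the first submission attempt
val trackerRecord: Option[Seq[Int]] = None

// Mirrors findMissingPartitions: fall back to the full partition range
val partitionsToCompute = trackerRecord.getOrElse(0 until numPartitions)
// every partition needs a ShuffleMapTask
```

On a resubmission after partial failure, trackerRecord would instead hold only the partitions whose map output is missing, so fewer tasks would be created.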
ResultStage.scala
override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  (0 until job.numPartitions).filter(id => !job.finished(id))
}
Likewise, the number of ResultTasks equals the number of partitions of the RDD on which the Spark app invokes its action, i.e. the number of partitions of the last RDD in the ResultStage.
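The ResultStage rule can be sketched directly (the finished flags are hypothetical; in Spark they live on the ActiveJob):

```scala
val numPartitions = 3
// Partition 1's result has already been computed (e.g. in a prior attempt)
val finished = Array(false, true, false)

// Mirrors ResultStage.findMissingPartitions: only unfinished partitions
// get a ResultTask
val partitionsToCompute = (0 until numPartitions).filter(id => !finished(id))
// tasks are created for partitions 0 and 2 only
```

On a fresh job all flags are false, so the task count is simply the partition count of the last RDD.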
4. Summary

1. Tasks come in two kinds: ShuffleMapTask and ResultTask.
2. The total number of tasks equals the sum, over all ShuffleMapStages, of the partition count of each stage's last RDD, plus the partition count of the last RDD of the ResultStage.
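Applying rule 2 to the wordCount job above (assuming the whole lineage keeps the 3 partitions set in parallelize):

```scala
// One ShuffleMapStage ends at keyValue_rdd, before the reduceByKey shuffle
val shuffleMapStagePartitions = Seq(3)
// The ResultStage, triggered by collect(), ends at the reduced RDD
val resultStagePartitions = 3

val totalTasks = shuffleMapStagePartitions.sum + resultStagePartitions
// 3 ShuffleMapTasks + 3 ResultTasks = 6 tasks in total
```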



