At the end of the previous article, once the stages had been split and the best locations for the tasks had been computed, the DAGScheduler calls taskScheduler.submitTasks, wrapping the stage's tasks into a TaskSet object and submitting it:
if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
}
By default (running on standalone), TaskScheduler is only a trait; the implementation that is actually used is TaskSchedulerImpl, so open TaskSchedulerImpl. (A minimal sketch of this trait/implementation split is shown right below.)
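To make that split concrete, here is a tiny illustrative sketch, not the real Spark signatures: a trait only declares submitTasks, and the concrete scheduler class provides the behavior.

// Sketch only: simplified stand-ins for TaskScheduler / TaskSchedulerImpl.
trait SimpleTaskScheduler {
  def submitTasks(taskSet: Seq[String]): Unit   // the trait only declares the contract
}

class SimpleTaskSchedulerImpl extends SimpleTaskScheduler {
  // the concrete implementation decides how submitted tasks are actually scheduled
  override def submitTasks(taskSet: Seq[String]): Unit =
    println(s"scheduling ${taskSet.size} tasks")
}

object TraitImplDemo extends App {
  val scheduler: SimpleTaskScheduler = new SimpleTaskSchedulerImpl
  scheduler.submitTasks(Seq("task-0", "task-1"))
}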
Then find its submitTasks method:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager for each TaskSet; it is responsible for monitoring and managing that TaskSet
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    // Then put it into the cache, keyed by stage id and stage attempt id
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // This already appeared in the SparkContext source: when the TaskScheduler is created, a
  // SparkDeploySchedulerBackend is created for the TaskSchedulerImpl. The backend here is that
  // previously created SparkDeploySchedulerBackend, which is responsible for creating the
  // AppClient and registering the Application with the Master.
  backend.reviveOffers()
}
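The starvation warning above is just a java.util.Timer that keeps firing until the first task has launched and then cancels itself. A minimal, self-contained sketch of that pattern (the field names and the 15-second interval here are illustrative, not taken from Spark's configuration):

// Sketch only: mimics how TaskSchedulerImpl uses its starvation timer.
import java.util.{Timer, TimerTask}

object StarvationTimerDemo extends App {
  @volatile var hasLaunchedTask = false   // would be flipped once resourceOffers assigns a task

  val starvationTimer = new Timer("starvation-timer", true)
  starvationTimer.scheduleAtFixedRate(new TimerTask {
    override def run(): Unit = {
      if (!hasLaunchedTask) {
        println("Initial job has not accepted any resources; check the cluster UI")
      } else {
        this.cancel()   // stop warning once the first task has launched
      }
    }
  }, 15000L, 15000L)

  Thread.sleep(40000)      // let the warning fire a couple of times
  hasLaunchedTask = true   // simulate the first task being launched
}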
Step into createTaskSetManager:
// Label as private[scheduler] to allow tests to swap in different task set managers if necessary
private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager = {
  new TaskSetManager(this, taskSet, maxTaskFailures)
}
So what is a TaskSetManager?
It schedules the tasks of a single TaskSet inside TaskSchedulerImpl. This class tracks each task, retries tasks if they fail (up to a limited number of times), and handles locality-aware scheduling for this TaskSet via delay scheduling. Its main interfaces are resourceOffer, which asks the TaskSet whether it wants to run a task on a given node, and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
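To make those two interfaces concrete, here is a heavily simplified, illustrative sketch of a manager that answers resource offers and records status updates. This is not the real TaskSetManager, whose delay scheduling, retry limits, and zombie handling are far more involved:

// Illustrative sketch only, not Spark's TaskSetManager.
import scala.collection.mutable

class ToyTaskSetManager(taskIds: Seq[Long]) {
  private val pending  = mutable.Queue(taskIds: _*)
  private val running  = mutable.Set[Long]()
  private val finished = mutable.Set[Long]()

  // "resourceOffer": asked whether we want to run a task on the given executor/host
  def resourceOffer(execId: String, host: String): Option[Long] =
    if (pending.nonEmpty) {
      val tid = pending.dequeue()
      running += tid
      Some(tid)                       // a real manager would also check locality here
    } else None

  // "statusUpdate": told that one of our tasks changed state (e.g. finished)
  def statusUpdate(tid: Long, finishedOk: Boolean): Unit = {
    running -= tid
    if (finishedOk) finished += tid else pending.enqueue(tid)   // naive retry
  }

  def isDone: Boolean = pending.isEmpty && running.isEmpty
}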
Now look at the last line of submitTasks: backend.reviveOffers().
Go into CoarseGrainedSchedulerBackend and find the reviveOffers method:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
You can see that it uses driverEndpoint to send a ReviveOffers message.
Step into driverEndpoint:
var driverEndpoint: RpcEndpointRef = null

protected def minRegisteredRatio: Double = _minRegisteredRatio

override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }
  // TODO (prashant) send conf instead of properties
  driverEndpoint = createDriverEndpointRef(properties)
}
Follow createDriverEndpointRef all the way down to DriverEndpoint,
and find where it handles the ReviveOffers message:
case ReviveOffers =>
  makeOffers()
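This send/receive pair is the usual Spark RPC pattern: the backend sends a case-object message to the driver endpoint, and the endpoint pattern-matches on it in its receive handler. A rough sketch of that shape, using plain Scala stand-ins rather than Spark's RpcEndpoint classes:

// Sketch only: models the message flow with plain Scala types, not Spark's RPC framework.
sealed trait SchedulerMessage
case object ReviveOffers extends SchedulerMessage
case class StatusUpdate(taskId: Long, state: String) extends SchedulerMessage

class ToyDriverEndpoint {
  // corresponds to DriverEndpoint.receive: one pattern match per message type
  def receive(msg: SchedulerMessage): Unit = msg match {
    case ReviveOffers          => makeOffers()
    case StatusUpdate(tid, st) => println(s"task $tid is now $st")
  }
  private def makeOffers(): Unit = println("making resource offers on all alive executors")
}

object RpcDemo extends App {
  val endpoint = new ToyDriverEndpoint
  endpoint.receive(ReviveOffers)   // what backend.reviveOffers() boils down to
}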
Step into makeOffers:
// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  // First filter down to the executors that are still alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // For the alive executors, collect the resources currently available on them
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  // Call the scheduler's resourceOffers to run the task allocation algorithm and assign tasks to executors;
  // then call launchTasks to send a LaunchTask message for each assigned task to its executor,
  // which starts and runs the task
  launchTasks(scheduler.resourceOffers(workOffers))
}
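Conceptually a WorkerOffer is just "this executor, on this host, has this many free cores". A quick hedged sketch of building such offers from an executor table (the types and field names here are made up for illustration, not Spark's ExecutorData):

// Sketch only: a made-up executor table, with WorkerOffer-like records built from free cores.
case class ToyExecutorData(host: String, freeCores: Int, alive: Boolean)
case class ToyWorkerOffer(executorId: String, host: String, cores: Int)

object MakeOffersDemo extends App {
  val executorDataMap = Map(
    "exec-1" -> ToyExecutorData("node-a", 4, alive = true),
    "exec-2" -> ToyExecutorData("node-b", 0, alive = true),
    "exec-3" -> ToyExecutorData("node-c", 8, alive = false))   // being killed, filtered out

  val offers = executorDataMap
    .filter { case (_, d) => d.alive }                              // only alive executors
    .map { case (id, d) => ToyWorkerOffer(id, d.host, d.freeCores) }
    .toIndexedSeq

  offers.foreach(println)   // these offers are what resourceOffers receives
}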
First look at resourceOffers; what gets passed in is exactly the executors' available resources:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  // Shuffle the executors first, for load balancing
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  // One task buffer per offer, i.e. the tasks that will be assigned to each executor
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // Take the sorted TaskSets out of the scheduling pool; this pool is the one created when
  // SparkContext was initialized. Every TaskSet that gets created is put into this pool,
  // and the task allocation algorithm pulls TaskSets out of it,
  // so allocation is done one TaskSet at a time.
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Below is the core task allocation algorithm
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  // Each TaskSet is tried starting from its most local locality level
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      // For each TaskSet, try each locality level in turn,
      // launching the TaskSet's tasks on the executors.
      // Once nothing more can be launched at the current level,
      // exit the do-while loop and fall back to the next locality level.
      do {
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
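The core idea of the allocation loop is: for each TaskSet, start at the most local level (PROCESS_LOCAL), keep offering resources at that level until nothing more can be launched, then fall back to the next, less local level. A simplified illustration of that control flow (the locality level names are Spark's, everything else, including tryLaunch, is a stand-in for resourceOfferSingleTaskSet):

// Sketch only: the "most local first, then fall back" loop.
object LocalityDescentDemo extends App {
  val localityLevels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

  // Pretend each level can launch a fixed number of tasks before it is exhausted.
  val launchable = scala.collection.mutable.Map(
    "PROCESS_LOCAL" -> 2, "NODE_LOCAL" -> 1, "NO_PREF" -> 0, "RACK_LOCAL" -> 0, "ANY" -> 3)

  def tryLaunch(level: String): Boolean = {
    val remaining = launchable(level)
    if (remaining > 0) {
      launchable(level) = remaining - 1
      println(s"launched one task at $level")
      true
    } else false
  }

  var launchedAnyTask = false
  for (currentMaxLocality <- localityLevels) {
    var launchedAtThisLevel = false
    do {
      launchedAtThisLevel = tryLaunch(currentMaxLocality)   // keep going while tasks still launch
      launchedAnyTask |= launchedAtThisLevel
    } while (launchedAtThisLevel)
  }
  println(s"launched any task: $launchedAnyTask")
}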
Step into the resourceOfferSingleTaskSet method:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  // Iterate over all executors (offers)
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // If this executor still has enough CPUs to run one more task
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Ask the TaskSetManager for a task to run on this executor,
        // subject to the locality level that was passed in
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          // Add it to tasks, i.e. the tasks to launch on this executor.
          // This is where the task-to-executor assignment actually happens.
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          // Mark that at least one task was launched
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
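The bookkeeping in this method is essentially: for every offer with at least CPUS_PER_TASK free cores, ask the manager for a task, record the assignment, and deduct the cores. A minimal stand-alone sketch of that accounting, with offerOneTask standing in for taskSet.resourceOffer and CPUS_PER_TASK fixed at 1 just for the demo:

// Sketch only: toy CPU accounting loop in the spirit of resourceOfferSingleTaskSet.
import scala.collection.mutable.ArrayBuffer

object CpuAccountingDemo extends App {
  val CPUS_PER_TASK = 1
  val offers        = IndexedSeq(("exec-1", 2), ("exec-2", 1))    // (executorId, free cores)
  val availableCpus = offers.map(_._2).toArray
  val tasks         = offers.map(_ => new ArrayBuffer[String]())  // tasks assigned per executor
  var nextTaskId    = 0

  def offerOneTask(): Option[String] =                            // stand-in for taskSet.resourceOffer
    if (nextTaskId < 3) { val t = s"task-$nextTaskId"; nextTaskId += 1; Some(t) } else None

  // One pass over the offers: at most one task per executor per pass,
  // exactly like one call to resourceOfferSingleTaskSet.
  for (i <- offers.indices) {
    if (availableCpus(i) >= CPUS_PER_TASK) {
      offerOneTask().foreach { task =>
        tasks(i) += task                    // assign this task to executor i
        availableCpus(i) -= CPUS_PER_TASK   // deduct the cores it will use
      }
    }
  }

  offers.indices.foreach(i => println(s"${offers(i)._1} -> ${tasks(i).mkString(", ")}"))
}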
Back in makeOffers, now look at launchTasks:
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Serialize the task into a byte buffer
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      // Look up the executor this task was assigned to
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      // Send a LaunchTask message to the executor
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
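The guard at the top of the loop just compares the serialized size of each task against the RPC message limit and aborts the TaskSet instead of trying to ship an oversized task. A hedged sketch of that check, with plain Java serialization standing in for Spark's serializer (the property name spark.rpc.message.maxSize is Spark's; the rest is illustrative):

// Sketch only: size-checks a serialized payload before "sending" it, in the spirit of launchTasks.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object LaunchSizeCheckDemo extends App {
  val maxRpcMessageSize = 128 * 1024 * 1024   // illustrative cap, in the spirit of spark.rpc.message.maxSize

  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out   = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  val serializedTask = serialize("pretend this is a TaskDescription")
  if (serializedTask.length >= maxRpcMessageSize) {
    println(s"aborting: serialized task of ${serializedTask.length} bytes exceeds $maxRpcMessageSize")
  } else {
    println(s"sending LaunchTask message of ${serializedTask.length} bytes to the executor")
  }
}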



