文章目录
- 2021SC@SDUSC
- 前言
- 一.状态更新
- 二.任务还原
- 2.1 反序列化
- 2.2 更新依赖
- 三.任务运行
- 四.任务执行后续处理
前言
继续沿着前面博客的思路,我们继续分析Spark的任务提交过程。本次博客,我们正式分析任务的执行过程。
一.状态更新
任务执行的开始阶段,Spark会调用Executor的launchTask方法,其代码实现如下:
def launchTask(
context: ExecutorBackend, taskld: Long, taskName: String, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskld, taskName, serializedTask)
runningTasks.put(taskld, tr)
threadPool.execute(tr)
}
分析上述代码可知,该方法的执行过程如下:
- 创建 Task Runner,并将其与 taskld、taskName 及 serializedTask 添加到 runningTasks = new ConcurrentHashMap[Long, TaskRunner]中。
- TaskRunner实现了 Runnable接口,然后使用线程池执行TaskRunner。
在线程执行时,会调用TaskRunner的run方法,该方法实现了状态更新、任务反序列化和任务运行的过程。下面我们分别来看一下这三个过程的实现:
首先是调用execBackend的statusUpdate方法实现任务状态更新,代码如下:
execBackend.statusUpdate(taskId, Taskstate.RUNNING, EMPTY_BYTE_BUFFER)
该过程会向LocaLActor发送StatusUpdate消息,代码如下:
override def StatusUpdate (taskld: Long, state: Taskstate, serializeddata: ByteBuffer) {
locaLActor ! StatusUpdate(taskld, state, serializedData)
}
LocalActor 在接收到 StatusUpdate 消息时,会匹配执行 TaskSchedulerlmpl 的 StatusUpdate 方 法,并根据Task的最新状态做一系列处理。
二.任务还原
这里所谓的任务还原就是将Driver提交的Task在Executor上通过反序列化、更新依赖达到Task还原效果的过程。
2.1 反序列化我们对之前博客中的序列化的serializedTask执行反序列化操作,代码如下:
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)2.2 更新依赖
更新依赖的文件或者jar包,代码如下:
updateDependencies(taskFiles, taskJars)
updateDependencies方法的实现如下:
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap [String, Long]) {
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
for ((name, timestarrp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
loginfo ("Fetching " + name + " with timestamp " + timestamp)
// Fetch file with useCache mode, close cache for local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
loginfo ("Fetching" + name + " with time stamp " + timestamp)
// Fetch file with useCache mode, close cache for local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager, hadoopConf, timestamp, useCache = IisLocal)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name. split ("/") .Last
val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
if (!urlClassLoader.getURLs.contains(url)) {
loginfo("Adding " + url + " to class loader")
urlClassLoader.addURL(url)
}
}
}
}
分析上述代码可以知道,updateDependencies方法是利用Utils.fetchFile方法来获取依赖的,并且会把下载的jar文件添加到Executor的URL中。
最后将Task的ByteBufter反序列化为Task实例,实现如下:
task = ser.deserialize[Task[Any]](taskBytes, Thread.curreThread.getContext-ClassLoader)
三.任务运行
经过状态更新和任务还原的过程,最后我们开始运行任务。TaskRunner会调用Task的run方法来运行任务,run方法的实现如下:
final def run(attemptld: Long): T = {
context = new TaskContextlmpl(stageId, partitionld, attemptld, runningLocally = false)
TaskContextHelper.setTaskContext(context)
context.taskMetrics.hostname = Utils.localHostName()
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)
}
try {
runTask(context)
} finally {
context.markTaskCompleted()
TaskContextHelper.unset()
}
}
分析以上代码我们可以知道,run方法中创建了 TaskContextlmpl,并将其设置到TaskContext的ThreadLocal中,最后调用了runTask方法,改方法的实现如下:
override def runTask(context: TaskContext): MapStatus = {
val ser = SparkEnv.get.closureSerializer.newlnstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionld, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
return writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
那么上面的runTask方法完成了哪些工作呢?曾经分析submitMissingTasks的时候,对任务的RDD和ShufifleDependency进行过序列化操作,因此现在要进行反序列化了,这样就可以得到 RDD 和 ShuffleDependency。接下来会调用 SortShuffleManager 的 getWriter 方法获取 partitionld指定分区的SortShuffleWriter。之后便利用此Writer将计算的中间结果写入文件中。下面我们来看一下SortShuffle-Manager 的 getWriter 方法的代码实现:
override def getWriter[K, V] (handle: ShuffleHandle, mapld: Int, context: TaskContext)
: ShuffleWriter[K, V] = {
val baseShuffleHandle = handle.asInstanceOf[baseShuffleHandle [K, V, _]]
shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffield, baseShuffleHandle.numMaps)
new SortShuffleWriter (
shuffleBlockManager, baseShuffleHandle, mapld, context)
}
在上面的代码中,参数mapId实际传入的是partitionId,由此我们也可以看到partition和map任务的关系。
在这里,SortShuffleWriter主要负责计算结果的缓存处理及持久化,具体来说即map任务的Stage的任务执行结果将通过SortShuffleManager持久化到存储体系,更详细的内容在之前介绍MapReduce时已经有过详细的介绍,在此不作赘述。
四.任务执行后续处理
接下来我们介绍任务执行后续处理的一些方法,这里主要包括计量统计与执行结果序列化、内存回收、执行结果处理三部分内容,本篇博客先分析其中的计量统计与执行结果序列化,剩余两部分在下篇博客进行分析。
在Spark的任务处理过程中,当任务执行结束后,还会通过下列程序对任务进行一些后续统计和处理:
val taskFinish = System.currentTimeMiIlls()
if (task.killed) {
throw new TaskKilledException
}
val resultSer = SparkEnv.get.serializer.newlnstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
m.executorDeserializeTime = taskstart - deserializestartTime
m.executorRunTime = taskFinish - taskstart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
}
val accumUpdates = Accumulators.values
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task, metrics.orNull)
val serializedDirectResult = ser.serialize(directResult)
val resultsize = serializedDirectResult.limit
通过分析以上代码,我们可以知道Spark主要进行了以下后续处理:
- 任务执行结果的简单序列化。
- 计量统计,统计的指标包括:
| 指标 | 描述 |
|---|---|
| executorDeserializeTime | 反序列化消耗的时间 |
| executorRunTime | 实际执行任务消耗的时间 |
| jvmGCTime | 执行垃圾回收消耗的时间 |
| resultSerializationTime | 执行结果序列化消耗的时间 |
- 将前两步得到的简单序列化结果和计量统计内容封装为DirectTaskResult,然后对其执行序列化。



