栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

山东大学软件工程应用与实践——Spark项目(十二)

山东大学软件工程应用与实践——Spark项目(十二)

2021SC@SDUSC

文章目录
  • 2021SC@SDUSC
  • 前言
  • 一.状态更新
  • 二.任务还原
    • 2.1 反序列化
    • 2.2 更新依赖
  • 三.任务运行
  • 四.任务执行后续处理


前言

    继续沿着前面博客的思路,我们继续分析Spark的任务提交过程。本次博客,我们正式分析任务的执行过程。


一.状态更新

    任务执行的开始阶段,Spark会调用Executor的launchTask方法,其代码实现如下:

def launchTask(
		context: ExecutorBackend, taskld: Long, taskName: String, serializedTask: ByteBuffer) {
	val tr = new TaskRunner(context, taskld, taskName, serializedTask) 
	runningTasks.put(taskld, tr) 
	threadPool.execute(tr)
}

    分析上述代码可知,该方法的执行过程如下:

  1. 创建 Task Runner,并将其与 taskld、taskName 及 serializedTask 添加到 runningTasks = new ConcurrentHashMap[Long, TaskRunner]中。
  2. TaskRunner实现了 Runnable接口,然后使用线程池执行TaskRunner。

    在线程执行时,会调用TaskRunner的run方法,该方法实现了状态更新、任务反序列化和任务运行的过程。下面我们分别来看一下这三个过程的实现:
    首先是调用execBackend的statusUpdate方法实现任务状态更新,代码如下:

execBackend.statusUpdate(taskId, Taskstate.RUNNING, EMPTY_BYTE_BUFFER)

    该过程会向LocaLActor发送StatusUpdate消息,代码如下:

override def StatusUpdate (taskld: Long, state: Taskstate, serializeddata: ByteBuffer) {
	locaLActor ! StatusUpdate(taskld, state, serializedData)
}

    LocalActor 在接收到 StatusUpdate 消息时,会匹配执行 TaskSchedulerlmpl 的 StatusUpdate 方 法,并根据Task的最新状态做一系列处理。


二.任务还原

    这里所谓的任务还原就是将Driver提交的Task在Executor上通过反序列化、更新依赖达到Task还原效果的过程。

2.1 反序列化

    我们对之前博客中的序列化的serializedTask执行反序列化操作,代码如下:

val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
2.2 更新依赖

    更新依赖的文件或者jar包,代码如下:

updateDependencies(taskFiles, taskJars)

    updateDependencies方法的实现如下:

private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap [String, Long]) {
	lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) 
	synchronized {
		// Fetch missing dependencies
		for ((name, timestarrp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { 
			loginfo ("Fetching " + name + " with timestamp " + timestamp) 
			// Fetch file with useCache mode, close cache for local mode. 
			Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager, hadoopConf, timestamp, useCache = !isLocal) 
			currentFiles(name) = timestamp
		}
		for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
			loginfo ("Fetching" + name + " with time stamp " + timestamp)
			// Fetch file with useCache mode, close cache for local mode.
			Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager, hadoopConf, timestamp, useCache = IisLocal) 
			currentJars(name) = timestamp
			// Add it to our class loader
			val localName = name. split ("/") .Last
			val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL 
			if (!urlClassLoader.getURLs.contains(url)) { 
				loginfo("Adding " + url + " to class loader") 
				urlClassLoader.addURL(url)
			}
		}
	}
}

    分析上述代码可以知道,updateDependencies方法是利用Utils.fetchFile方法来获取依赖的,并且会把下载的jar文件添加到Executor的URL中。

    最后将Task的ByteBufter反序列化为Task实例,实现如下:

task = ser.deserialize[Task[Any]](taskBytes, Thread.curreThread.getContext-ClassLoader)

三.任务运行

    经过状态更新和任务还原的过程,最后我们开始运行任务。TaskRunner会调用Task的run方法来运行任务,run方法的实现如下:

final def run(attemptld: Long): T = {
	context = new TaskContextlmpl(stageId, partitionld, attemptld, runningLocally = false)
	TaskContextHelper.setTaskContext(context) 
	context.taskMetrics.hostname = Utils.localHostName() 
	taskThread = Thread.currentThread()
	if (_killed) {
		kill(interruptThread = false)
	}
	try {
		runTask(context)
	} finally {
		context.markTaskCompleted()
		TaskContextHelper.unset()
	}
}

    分析以上代码我们可以知道,run方法中创建了 TaskContextlmpl,并将其设置到TaskContext的ThreadLocal中,最后调用了runTask方法,改方法的实现如下:

override def runTask(context: TaskContext): MapStatus = {
	val ser = SparkEnv.get.closureSerializer.newlnstance()
	val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
	metrics = Some(context.taskMetrics)
	var writer: ShuffleWriter[Any, Any] = null
	try {
		val manager = SparkEnv.get.shuffleManager
		writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionld, context)
		writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
		return writer.stop(success = true).get
	} catch {
		case e: Exception =>
		try {
			if (writer != null) {
			writer.stop(success = false)
		}
	} catch {
		case e: Exception =>
			log.debug("Could not stop writer", e)
		}
		throw e
	}
}

    那么上面的runTask方法完成了哪些工作呢?曾经分析submitMissingTasks的时候,对任务的RDD和ShufifleDependency进行过序列化操作,因此现在要进行反序列化了,这样就可以得到 RDD 和 ShuffleDependency。接下来会调用 SortShuffleManager 的 getWriter 方法获取 partitionld指定分区的SortShuffleWriter。之后便利用此Writer将计算的中间结果写入文件中。下面我们来看一下SortShuffle-Manager 的 getWriter 方法的代码实现:

override def getWriter[K, V] (handle: ShuffleHandle, mapld: Int, context: TaskContext)
		: ShuffleWriter[K, V] = {
	val baseShuffleHandle = handle.asInstanceOf[baseShuffleHandle [K, V, _]] 		
	shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffield, baseShuffleHandle.numMaps)
	new SortShuffleWriter (
		shuffleBlockManager, baseShuffleHandle, mapld, context)
}

    在上面的代码中,参数mapId实际传入的是partitionId,由此我们也可以看到partition和map任务的关系。
    在这里,SortShuffleWriter主要负责计算结果的缓存处理及持久化,具体来说即map任务的Stage的任务执行结果将通过SortShuffleManager持久化到存储体系,更详细的内容在之前介绍MapReduce时已经有过详细的介绍,在此不作赘述。


四.任务执行后续处理

    接下来我们介绍任务执行后续处理的一些方法,这里主要包括计量统计与执行结果序列化、内存回收、执行结果处理三部分内容,本篇博客先分析其中的计量统计与执行结果序列化,剩余两部分在下篇博客进行分析。
    在Spark的任务处理过程中,当任务执行结束后,还会通过下列程序对任务进行一些后续统计和处理:

val taskFinish = System.currentTimeMiIlls()

	if (task.killed) {
		throw new TaskKilledException
	}
	
	val resultSer = SparkEnv.get.serializer.newlnstance()
	val beforeSerialization = System.currentTimeMillis()
	val valueBytes = resultSer.serialize(value)
	val afterSerialization = System.currentTimeMillis()
	
	for (m <- task.metrics) {
		m.executorDeserializeTime = taskstart - deserializestartTime
		m.executorRunTime = taskFinish - taskstart
		m.jvmGCTime = gcTime - startGCTime
		m.resultSerializationTime = afterSerialization - beforeSerialization
	}

	val accumUpdates = Accumulators.values

	val directResult = new DirectTaskResult(valueBytes, accumUpdates, task, metrics.orNull)
	val serializedDirectResult = ser.serialize(directResult)
	val resultsize = serializedDirectResult.limit

    通过分析以上代码,我们可以知道Spark主要进行了以下后续处理:

  1. 任务执行结果的简单序列化。
  2. 计量统计,统计的指标包括:
指标描述
executorDeserializeTime反序列化消耗的时间
executorRunTime实际执行任务消耗的时间
jvmGCTime执行垃圾回收消耗的时间
resultSerializationTime执行结果序列化消耗的时间
  1. 将前两步得到的简单序列化结果和计量统计内容封装为DirectTaskResult,然后对其执行序列化。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/677241.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号