2021SC@SDUSC
Contents
Creating and Starting ExecutorAllocationManager
Creating and Starting ContextCleaner
Updating the Spark Environment
Creating and Starting ExecutorAllocationManager
ExecutorAllocationManager manages the Executors that have already been allocated. It is created and started as follows:
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
  if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    Some(new ExecutorAllocationManager(this, listenerBus, conf))
  } else {
    None
  }
executorAllocationManager.foreach(_.start())
By default no ExecutorAllocationManager is created; setting the property spark.dynamicAllocation.enabled to true enables it. ExecutorAllocationManager accepts configuration such as the minimum and maximum number of dynamically allocated Executors and the number of Tasks each Executor may run, and it validates these settings. Its start method registers an ExecutorAllocationListener with the listenerBus; by listening to events on the listenerBus, the listener dynamically adds and removes Executors. In addition, a dedicated Thread repeatedly runs the allocation logic, adding Executors as needed and killing and removing Executors that are no longer required. The rest of ExecutorAllocationManager's implementation resembles other SparkListeners and is not repeated here. Its key code is shown below (code 3-47):
// code 3-47
private val intervalMillis: Long = 100
private var clock: Clock = new RealClock
private val listener = new ExecutorAllocationListener

def start(): Unit = {
  listenerBus.addListener(listener)
  startPolling()
}

private def startPolling(): Unit = {
  val t = new Thread {
    override def run(): Unit = {
      while (true) {
        try {
          schedule()
        } catch {
          case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
        }
        Thread.sleep(intervalMillis)
      }
    }
  }
  t.setName("spark-dynamic-executor-allocation")
  t.setDaemon(true)
  t.start()
}
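The polling structure above can be sketched on its own. The following is a minimal, self-contained illustration of the same pattern: a daemon thread that repeatedly invokes a schedule() callback, catching exceptions so one bad iteration does not kill the loop. All names here (PollingDemo, tickCount) are illustrative, not Spark APIs.

```scala
object PollingDemo {
  @volatile private var ticks = 0
  def tickCount: Int = ticks

  // Stand-in for ExecutorAllocationManager.schedule()
  private def schedule(): Unit = { ticks += 1 }

  def startPolling(intervalMillis: Long): Thread = {
    val t = new Thread {
      override def run(): Unit = {
        while (true) {
          try {
            schedule()
          } catch {
            // Log and continue, as the original does with logError
            case e: Exception => System.err.println(s"Exception in polling thread: $e")
          }
          Thread.sleep(intervalMillis)
        }
      }
    }
    t.setName("demo-polling-thread")
    t.setDaemon(true) // a daemon thread does not keep the JVM alive on exit
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    startPolling(10)
    Thread.sleep(100)
    println(tickCount > 0)
  }
}
```

Marking the thread as a daemon is what lets the JVM shut down without explicitly stopping the poller, which is why the Spark loop can be an unconditional `while (true)`.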
Creating and Starting ContextCleaner
ContextCleaner cleans up the RDD, ShuffleDependency, and Broadcast objects that have gone out of the application's scope. Because the configuration property spark.cleaner.referenceTracking defaults to true, a ContextCleaner is constructed and started, as follows:
private[spark] val cleaner: Option[ContextCleaner] = {
  if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
}
cleaner.foreach(_.start())
ContextCleaner consists of the following parts:
- referenceQueue: caches the top-level AnyRef references
- referenceBuffer: caches the weak references to AnyRef objects
- listeners: an array of listeners for cleanup work
- cleaningThread: the thread that performs the actual cleanup
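The interplay of these parts rests on the JVM's weak-reference mechanism: a WeakReference registered with a ReferenceQueue is enqueued once its referent has been garbage-collected. The sketch below illustrates that mechanism in isolation; TaskWeakRef plays the role of Spark's CleanupTaskWeakReference, and the task string is purely illustrative.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

object RefQueueDemo {
  // Pairs a cleanup task with a weak reference, like CleanupTaskWeakReference
  class TaskWeakRef(val task: String, referent: AnyRef, q: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, q)

  def demo(): Option[String] = {
    val queue = new ReferenceQueue[AnyRef]
    // referenceBuffer's role: keep the weak references themselves reachable
    val buffer = scala.collection.mutable.Set.empty[TaskWeakRef]
    var payload: AnyRef = new Array[Byte](1024)
    buffer += new TaskWeakRef("CleanRDD(1)", payload, queue)

    payload = null // drop the only strong reference to the referent
    // keepCleaning's role: poll the queue with a timeout. System.gc() is
    // only a request, so retry a few times.
    var found: Option[TaskWeakRef] = None
    var attempts = 0
    while (found.isEmpty && attempts < 10) {
      System.gc()
      found = Option(queue.remove(200)).map(_.asInstanceOf[TaskWeakRef])
      attempts += 1
    }
    found.foreach(buffer -= _)
    found.map(_.task)
  }

  def main(args: Array[String]): Unit =
    println(demo()) // typically Some(CleanRDD(1)) once the referent is collected
}
```

This is why ContextCleaner needs both structures: referenceQueue tells it which objects are gone, while referenceBuffer keeps the weak-reference wrappers alive until their tasks have run.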
ContextCleaner works on the same principle as listenerBus: it also uses the listener pattern and delegates the work to a thread, which in practice just calls the keepCleaning method. keepCleaning is implemented as follows (code 3-48):
// code 3-48
private def keepCleaning(): Unit = Utils.logUncaughtException {
  while (!stopped) {
    try {
      val reference =
        Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.map(_.task).foreach { task =>
          logDebug("Got cleaning task " + task)
          referenceBuffer -= reference.get
          task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}
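The loop-and-stop structure of keepCleaning can be demonstrated independently: poll a queue with a timeout inside a `while (!stopped)` loop, and ignore InterruptedException once a stop has been requested. The sketch below uses a plain blocking queue in place of the reference queue; all names are illustrative.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object StoppableLoopDemo {
  @volatile private var stopped = false
  private val queue = new LinkedBlockingQueue[String]()
  @volatile var processed: List[String] = Nil

  private val thread = new Thread("demo-cleaning-thread") {
    override def run(): Unit = {
      while (!stopped) {
        try {
          // Poll with a timeout so the loop regularly re-checks the stopped flag
          Option(queue.poll(100, TimeUnit.MILLISECONDS)).foreach { task =>
            processed ::= task
          }
        } catch {
          case _: InterruptedException if stopped => // ignore: shutting down
          case e: Exception => System.err.println(s"Error in cleaning thread: $e")
        }
      }
    }
  }

  def start(): Unit = { thread.setDaemon(true); thread.start() }
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def submit(task: String): Unit = queue.put(task)

  def main(args: Array[String]): Unit = {
    start()
    submit("CleanShuffle(0)")
    Thread.sleep(300)
    stop()
    println(processed)
  }
}
```

The timeout on the poll matters: a blocking remove with no timeout could leave the thread stuck forever after stop() if no more references ever arrive, which is why ContextCleaner.REF_QUEUE_POLL_TIMEOUT exists.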
Updating the Spark Environment
Initializing the SparkContext may affect its environment, so the environment needs to be updated afterwards, as follows:
postEnvironmentUpdate()
postApplicationStart()
During SparkContext initialization, if the spark.jars property is set, the jar packages it specifies are added by the addJar method under the path given by httpFileServer's jarDir variable, and the files specified by spark.files are added by the addFile method under the path given by httpFileServer's fileDir variable. The code is as follows (code 3-49):
// code 3-49
val jars: Seq[String] =
  conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
val files: Seq[String] =
  conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(addJar)
}
if (files != null) {
  files.foreach(addFile)
}
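The getOption/split/filter/flatten chain above is worth unpacking: it turns an optional comma-separated string into a sequence of non-empty entries. A minimal, self-contained equivalent (parseList is a hypothetical helper, not a Spark API):

```scala
object ConfListDemo {
  // Equivalent to: conf.getOption(key).map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
  def parseList(value: Option[String]): List[String] =
    value.map(_.split(",").toList.filter(_.nonEmpty)).getOrElse(Nil)

  def main(args: Array[String]): Unit = {
    println(parseList(Some("a.jar,,b.jar"))) // List(a.jar, b.jar)
    println(parseList(None))                 // List()
  }
}
```

The inner filter drops the empty strings produced by stray commas, and flattening the Option means an unset property simply yields an empty sequence rather than a null.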
httpFileServer's addFile and addJar methods are implemented as follows (code 3-50):
// code 3-50
def addFile(file: File): String = {
  addFileToDir(file, fileDir)
  serverUri + "/files/" + file.getName
}

def addJar(file: File): String = {
  addFileToDir(file, jarDir)
  serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File): String = {
  if (file.isDirectory) {
    throw new IllegalArgumentException(s"$file cannot be a directory.")
  }
  Files.copy(file, new File(dir, file.getName))
  dir + "/" + file.getName
}
postEnvironmentUpdate is implemented in code 3-51; it proceeds as follows:
- Call SparkEnv's environmentDetails method to collect the JVM parameters, Spark properties, system properties, classpath entries, and other information that makes up the environment; see code 3-52.
- Generate a SparkListenerEnvironmentUpdate event and post it to the listenerBus. This event is handled by EnvironmentListener and ultimately determines what the EnvironmentPage displays.
// code 3-51
private def postEnvironmentUpdate() {
  if (taskScheduler != null) {
    val schedulingMode = getSchedulingMode.toString
    val addedJarPaths = addedJars.keys.toSeq
    val addedFilePaths = addedFiles.keys.toSeq
    val environmentDetails =
      SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
    val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
    listenerBus.post(environmentUpdate)
  }
}
// code 3-52
val jvmInformation = Seq(
  ("Java Version", s"$javaVersion ($javaVendor)"),
  ("Java Home", javaHome),
  ("Scala Version", versionString)
).sorted

val schedulerMode =
  if (!conf.contains("spark.scheduler.mode")) {
    Seq(("spark.scheduler.mode", schedulingMode))
  } else {
    Seq[(String, String)]()
  }
val sparkProperties = (conf.getAll ++ schedulerMode).sorted

// System properties that are not java classpaths
val systemProperties = Utils.getSystemProperties.toSeq
val otherProperties = systemProperties.filter { case (k, _) =>
  k != "java.class.path" && !k.startsWith("spark.")
}.sorted

// Class paths including all added jars and files
val classPathEntries = javaClassPath
  .split(File.pathSeparator)
  .filterNot(_.isEmpty)
  .map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

Map[String, Seq[(String, String)]](
  "JVM Information" -> jvmInformation,
  "Spark Properties" -> sparkProperties,
  "System Properties" -> otherProperties,
  "Classpath Entries" -> classPaths)
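The system-property filtering in code 3-52 can be isolated as a small example: drop the java.class.path entry and every spark.* key, then sort the remaining pairs by key. The input data below is made up for illustration.

```scala
object EnvDetailsDemo {
  def otherProperties(sysProps: Seq[(String, String)]): Seq[(String, String)] =
    sysProps.filter { case (k, _) =>
      // java.class.path is reported separately as "Classpath Entries",
      // and spark.* keys are reported as "Spark Properties"
      k != "java.class.path" && !k.startsWith("spark.")
    }.sorted

  def main(args: Array[String]): Unit = {
    val props = Seq(
      "java.class.path" -> "/opt/app.jar",
      "spark.master"    -> "local[*]",
      "user.name"       -> "alice",
      "os.name"         -> "Linux")
    println(otherProperties(props).map(_._1)) // List(os.name, user.name)
  }
}
```

The exclusions avoid duplication: each property appears in exactly one of the four sections of the resulting map, which is what the EnvironmentPage renders.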
The postApplicationStart method is simple: it just posts a SparkListenerApplicationStart event to the listenerBus, as follows:
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser))