
Shandong University Software Engineering Application and Practice: Spark (12) Code Analysis

2021SC@SDUSC


Contents

Creating and starting ExecutorAllocationManager

Creating and starting ContextCleaner

Updating the Spark environment


Creating and starting ExecutorAllocationManager

ExecutorAllocationManager manages the Executors that have already been allocated. It is created and started as follows:

private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
        Some(new ExecutorAllocationManager(this, listenerBus, conf))
    } else {
        None
    }
executorAllocationManager.foreach(_.start())

By default no ExecutorAllocationManager is created; setting the property spark.dynamicAllocation.enabled to true enables it. ExecutorAllocationManager accepts and validates configuration such as the minimum and maximum number of dynamically allocated Executors and the number of tasks each Executor may run. Its start method registers an ExecutorAllocationListener with listenerBus; by listening to events on listenerBus, the listener dynamically adds and removes Executors. In addition, a background thread repeatedly requests Executors, iterates over them, and kills and removes those that exceed their limits. The implementation of ExecutorAllocationManager otherwise resembles other SparkListeners and is not repeated here. Its key code is shown below (code 3-47):

// code 3-47
private val intervalMillis: Long = 100
private var clock: Clock = new RealClock
private val listener = new ExecutorAllocationListener

def start(): Unit = {
    listenerBus.addListener(listener)
    startPolling()
}

private def startPolling(): Unit = {
    val t = new Thread {
        override def run(): Unit = {
            while (true) {
                try {
                    schedule()
                } catch {
                    case e: Exception =>
                        logError("Exception in dynamic executor allocation thread!", e)
                }
                Thread.sleep(intervalMillis)
            }
        }
    }
    t.setName("spark-dynamic-executor-allocation")
    t.setDaemon(true)
    t.start()
}
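The polling pattern in startPolling (a named daemon thread that calls schedule() and then sleeps for intervalMillis) can be sketched outside Spark as follows. This is a simplified stand-in: a CountDownLatch replaces the real schedule() so that the number of scheduling rounds is observable.

```scala
import java.util.concurrent.CountDownLatch

object PollingSketch {
    // Runs a daemon polling thread until `rounds` iterations have completed,
    // mirroring ExecutorAllocationManager.startPolling().
    def runRounds(rounds: Int, intervalMillis: Long = 10L): Boolean = {
        val latch = new CountDownLatch(rounds)
        val t = new Thread {
            override def run(): Unit = {
                while (true) {
                    try {
                        latch.countDown() // the real code calls schedule() here
                    } catch {
                        case e: Exception =>
                            System.err.println(s"Exception in polling thread: $e")
                    }
                    Thread.sleep(intervalMillis)
                }
            }
        }
        t.setName("demo-polling-thread")
        t.setDaemon(true) // daemon: will not keep the JVM alive after main exits
        t.start()
        latch.await()     // block until `rounds` iterations have run
        true
    }
}
```

Marking the thread as a daemon is what lets the JVM exit without the allocation loop ever terminating on its own.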

Creating and starting ContextCleaner

ContextCleaner cleans up RDDs, ShuffleDependencys, and Broadcast objects that have gone out of application scope. Because the configuration property spark.cleaner.referenceTracking defaults to true, a ContextCleaner is constructed and started:

private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
    } else {
        None
    }
}
cleaner.foreach(_.start())

ContextCleaner consists of:

  • referenceQueue: caches the top-level AnyRef references whose referents have been collected
  • referenceBuffer: caches the weak references to AnyRef objects
  • listeners: array of listeners notified of cleanup work
  • cleaningThread: thread that performs the actual cleanup

ContextCleaner works the same way as listenerBus: it also uses the listener pattern, with a dedicated thread doing the processing. That thread actually does nothing but call the keepCleaning method, implemented as follows (code 3-48):

// code 3-48

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
    while (!stopped) {
        try {
            val reference =
                Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
                    .map(_.asInstanceOf[CleanupTaskWeakReference])
            // Synchronize here to avoid being interrupted on stop()
            synchronized {
                reference.map(_.task).foreach { task =>
                    logDebug("Got cleaning task " + task)
                    referenceBuffer -= reference.get
                    task match {
                        case CleanRDD(rddId) =>
                            doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                        case CleanShuffle(shuffleId) =>
                            doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                        case CleanBroadcast(broadcastId) =>
                            doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                    }
                }
            }
        } catch {
            case ie: InterruptedException if stopped => // ignore
            case e: Exception => logError("Error in cleaning thread", e)
        }
    }
}
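The cleanup above hinges on the JVM's weak-reference machinery: each tracked object is wrapped in a weak reference registered with referenceQueue, and once the referent is garbage collected the reference surfaces on the queue for keepCleaning to drain. The following is a minimal standalone sketch of that mechanism. TaskWeakReference and the "CleanRDD(1)" task string are hypothetical stand-ins for Spark's CleanupTaskWeakReference and CleanupTask; note that System.gc() is only a best-effort request, hence the retry loop.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

object CleanerSketch {
    // Pairs a weak reference with the cleanup task to run when the referent
    // is collected, mirroring Spark's CleanupTaskWeakReference.
    class TaskWeakReference(referent: AnyRef, val task: String,
                            queue: ReferenceQueue[AnyRef])
        extends WeakReference[AnyRef](referent, queue)

    def demo(): Option[String] = {
        val queue = new ReferenceQueue[AnyRef]
        var payload: AnyRef = new Array[Byte](1 << 20) // object "in scope"
        val ref = new TaskWeakReference(payload, "CleanRDD(1)", queue)

        payload = null // drop the only strong reference to the payload
        var polled: Option[String] = None
        var attempts = 0
        while (polled.isEmpty && attempts < 10) {
            System.gc() // best-effort request; may need several tries
            // Same remove-with-timeout that keepCleaning performs.
            polled = Option(queue.remove(200))
                .map(_.asInstanceOf[TaskWeakReference].task)
            attempts += 1
        }
        // `ref` must stay strongly reachable until the reference is enqueued,
        // otherwise the reference object itself could be collected first.
        require(ref ne null)
        polled
    }
}
```

Spark's referenceBuffer plays exactly this keep-the-reference-alive role for the real CleanupTaskWeakReferences.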

Updating the Spark environment

Initialization of SparkContext may affect its environment, so the environment needs to be updated with the following calls:

postEnvironmentUpdate()
postApplicationStart()

During SparkContext initialization, if the spark.jars property is set, the jar packages it specifies are added by the addJar method to the path specified by httpFileServer's jarDir variable, and the files specified by spark.files are added by the addFile method to the path specified by httpFileServer's fileDir variable. The code is as follows (code 3-49):

// code 3-49
val jars: Seq[String] =
    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).
        toSeq.flatten

val files: Seq[String] =
    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).
        toSeq.flatten

// Add each JAR given through the constructor
if (jars != null) {
    jars.foreach(addJar)
}

if (files != null) {
    files.foreach(addFile)
}
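The jars and files expressions share one pattern: an optional comma-separated property is split into entries, empty segments are dropped, and the absent case collapses to an empty Seq. A self-contained sketch of the same logic, using a plain Map in place of SparkConf:

```scala
object JarListSketch {
    // Mirrors how SparkContext turns the optional "spark.jars" property
    // into a Seq[String], dropping empty segments left by stray commas.
    def parseJars(conf: Map[String, String]): Seq[String] =
        conf.get("spark.jars")
            .map(_.split(",").filter(_.nonEmpty).toSeq)
            .getOrElse(Seq.empty)
}
```

For example, a value of "a.jar,,b.jar" yields Seq("a.jar", "b.jar"), and a missing property yields an empty Seq rather than null, which is why the null checks in code 3-49 never actually fire.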

The addFile and addJar methods of httpFileServer are shown below (code 3-50):

// code 3-50
def addFile(file: File): String = {
    addFileToDir(file, fileDir)
    serverUri + "/files/" + file.getName
}

def addJar(file: File): String = {
    addFileToDir(file, jarDir)
    serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File): String = {
    if (file.isDirectory) {
        throw new IllegalArgumentException(s"$file cannot be a directory.")
    }
    Files.copy(file, new File(dir, file.getName))
    dir + "/" + file.getName
}
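addFileToDir can be mirrored with the JDK alone. The sketch below follows the same contract (reject directories, copy the file into the target directory, return the served path) but substitutes java.nio.file.Files.copy for the Files.copy helper used in code 3-50:

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

object FileServerSketch {
    // Mirrors httpFileServer.addFileToDir: refuses directories, copies the
    // file into the target dir, and returns the path it would be served from.
    def addFileToDir(file: File, dir: File): String = {
        if (file.isDirectory) {
            throw new IllegalArgumentException(s"$file cannot be a directory.")
        }
        Files.copy(file.toPath, new File(dir, file.getName).toPath,
            StandardCopyOption.REPLACE_EXISTING)
        dir + "/" + file.getName
    }
}
```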

The implementation of postEnvironmentUpdate is shown in code 3-51. It proceeds as follows:

  1. Call SparkEnv's environmentDetails method to collect the settings that affect the environment: JVM parameters, Spark properties, system properties, the classpath, and so on (see code 3-52).
  2. Create a SparkListenerEnvironmentUpdate event and post it to listenerBus. This event is handled by EnvironmentListener and ultimately determines what the EnvironmentPage displays.

// code 3-51
private def postEnvironmentUpdate() {
    if (taskScheduler != null) {
        val schedulingMode = getSchedulingMode.toString
        val addedJarPaths = addedJars.keys.toSeq
        val addedFilePaths = addedFiles.keys.toSeq
        val environmentDetails =
            SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
                addedFilePaths)
        val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
        listenerBus.post(environmentUpdate)
    }
}
// code 3-52
val jvmInformation = Seq(
    ("Java Version", s"$javaVersion ($javaVendor)"),
    ("Java Home", javaHome),
    ("Scala Version", versionString)
).sorted

val schedulerMode =
    if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
    } else {
        Seq[(String, String)]()
    }

val sparkProperties = (conf.getAll ++ schedulerMode).sorted

// System properties that are not java classpaths
val systemProperties = Utils.getSystemProperties.toSeq
val otherProperties = systemProperties.filter { case (k, _) =>
    k != "java.class.path" && !k.startsWith("spark.")
}.sorted

// Class paths including all added jars and files
val classPathEntries = javaClassPath
    .split(File.pathSeparator)
    .filterNot(_.isEmpty)
    .map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

Map[String, Seq[(String, String)]](
    "JVM Information" -> jvmInformation,
    "Spark Properties" -> sparkProperties,
    "System Properties" -> otherProperties,
    "Classpath Entries" -> classPaths)
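The classpath step of environmentDetails, which turns java.class.path into labeled entries for the EnvironmentPage, uses nothing but the JDK and can be exercised directly. A sketch of just that expression:

```scala
import java.io.File

object ClasspathSketch {
    // Mirrors how environmentDetails splits the JVM classpath string into
    // ("entry", "System Classpath") pairs, skipping empty segments.
    def classPathEntries(javaClassPath: String): Seq[(String, String)] =
        javaClassPath
            .split(File.pathSeparator)
            .filterNot(_.isEmpty)
            .map((_, "System Classpath"))
            .toSeq
}
```

Using File.pathSeparator rather than a hard-coded ":" keeps the split correct on Windows, where the separator is ";".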

The postApplicationStart method is simple: it just posts a SparkListenerApplicationStart event to listenerBus, as follows:

listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser))
