
Shandong University Software Engineering Application and Practice: Spark (5) Code Analysis


2021SC@SDUSC


Contents

SparkEnv

1. Creating the cache manager CacheManager

2. The HTTP file server HttpFileServer

3. Creating the metrics system MetricsSystem

4. Creating SparkEnv


SparkEnv

1. Creating the cache manager CacheManager

CacheManager caches the intermediate results computed for individual RDD partitions; this caching takes place during iterative computation. The code that creates the CacheManager is:

val cacheManager = new CacheManager(blockManager)
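The gist of what CacheManager does can be illustrated with a minimal sketch (the class and method names below are illustrative, not Spark's actual API): look up a partition's cached result and compute it only on a miss.

```scala
import scala.collection.mutable

// Illustrative cache-or-compute sketch: `store` stands in for block storage,
// keyed by (rddId, partitionId).
class SimpleCacheManager {
  private val store = mutable.Map.empty[(Int, Int), Seq[Int]]

  // Return the cached result for a partition, computing and caching it
  // only if it is missing.
  def getOrCompute(rddId: Int, partition: Int)(compute: => Seq[Int]): Seq[Int] =
    store.getOrElseUpdate((rddId, partition), compute)
}

val cm = new SimpleCacheManager
var computations = 0
def body: Seq[Int] = { computations += 1; Seq(1, 2, 3) }
cm.getOrCompute(0, 0)(body)
cm.getOrCompute(0, 0)(body)  // served from the cache, body is not re-run
println(computations)        // prints 1
```

In Spark itself the lookup goes through the BlockManager with a StorageLevel and the values are iterators; this sketch only captures the miss-then-cache control flow.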

2. The HTTP file server HttpFileServer

HttpFileServer serves jars and other files over HTTP; the jars include those uploaded by the user. Its port is configured by the property spark.fileserver.port and defaults to 0, which means a random port is assigned. The code that creates HttpFileServer is:

val httpFileServer = 
    if (isDriver) {
        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
        val server = new HttpFileServer(conf, securityManager, fileServerPort)
        server.initialize()
        conf.set("spark.fileserver.uri", server.serverUri)
        server
    } else {
        null
    }
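The default of 0 relies on standard socket behaviour: binding to port 0 asks the operating system for any free ephemeral port, and the actual port can then be read back. A quick illustration with a plain ServerSocket (rather than Jetty):

```scala
import java.net.ServerSocket

val socket = new ServerSocket(0)      // 0 = let the OS pick a free port
val actualPort = socket.getLocalPort  // read back the assigned port
println(s"bound to port $actualPort")
socket.close()
```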

HttpFileServer's initialization, shown below, consists of the following steps:

  1. Use the Utils helper to create the file server's root directory and a temporary directory (the temporary directory is deleted when the runtime environment shuts down).
  2. Create the directories that hold jars and other files.
  3. Create and start the HTTP server.

def initialize() {
    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
    fileDir = new File(baseDir, "files")
    jarDir = new File(baseDir, "jars")
    fileDir.mkdir()
    jarDir.mkdir()
    logInfo("HTTP File server directory is " + baseDir)
    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
    httpServer.start()
    serverUri = httpServer.uri
    logDebug("HTTP file server started at: " + serverUri)
}
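Step 1's "deleted when the runtime environment shuts down" behaviour can be sketched as follows (createTempDirSketch is an illustrative stand-in for Utils.createTempDir, which registers the directory for deletion when the JVM exits):

```scala
import java.nio.file.{Files, Path}

// Create a temporary directory and register a JVM shutdown hook that
// deletes it, deepest entries first.
def createTempDirSketch(prefix: String): Path = {
  val dir = Files.createTempDirectory(prefix)
  sys.addShutdownHook {
    Files.walk(dir).sorted(java.util.Comparator.reverseOrder())
      .forEach(p => Files.delete(p))
  }
  dir
}

val base = createTempDirSketch("httpd")
println(s"serving from $base")  // directory is removed when the JVM exits
```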

HttpServer's start method again relies on the Utils static method startServiceOnPort, which calls back into the doStart method. The code is:

def start() {
    if (server != null) {
        throw new ServerStateException("Server is already started")
    } else {
        logInfo("Starting HTTP Server")
        val (actualServer, actualPort) = 
            Utils.startServiceOnPort[Server] (requestedPort, doStart, conf, serverName)
        server = actualServer
        port = actualPort
    }
}
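A simplified sketch of the retry behaviour behind Utils.startServiceOnPort (the signature below is illustrative; Spark's real version also handles port wrap-around and logs each failure): try to bind on successive ports until one succeeds or the retries run out.

```scala
import java.net.BindException

// Try startService on startPort, then startPort + 1, ..., up to maxRetries
// extra attempts; port 0 means "random", so it is never offset.
def startServiceOnPortSketch[T](startPort: Int, maxRetries: Int)
                               (startService: Int => (T, Int)): (T, Int) = {
  for (offset <- 0 to maxRetries) {
    val tryPort = if (startPort == 0) 0 else startPort + offset
    try {
      return startService(tryPort)
    } catch {
      case _: BindException => // port in use, try the next one
    }
  }
  throw new BindException(
    s"Could not start service on port $startPort after $maxRetries retries")
}

// Example: a fake startService that fails twice before binding succeeds.
var attempts = 0
val (_, boundPort) = startServiceOnPortSketch[String](5000, 3) { p =>
  attempts += 1
  if (attempts < 3) throw new BindException("port in use")
  ("server", p)
}
println(boundPort)  // prints 5002
```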

The doStart method starts the HTTP service provided by the embedded Jetty server, as follows:

private def doStart(startPort: Int): (Server, Int) = {
    val server = new Server()
    val connector = new SocketConnector
    connector.setMaxIdleTime(60*1000)
    connector.setSoLingerTime(-1)
    connector.setPort(startPort)
    server.addConnector(connector)
    
    val threadPool = new QueuedThreadPool
    threadPool.setDaemon(true)
    server.setThreadPool(threadPool)
    val resHandler = new ResourceHandler
    resHandler.setResourceBase(resourceBase.getAbsolutePath)

    val handlerList = new HandlerList
    handlerList.setHandlers(Array(resHandler, new DefaultHandler))

    if (securityManager.isAuthenticationEnabled()) {
        logDebug("HttpServer is using security")
        val sh = setupSecurityHandler(securityManager)
        // make sure we go through security handler to get resources
        sh.setHandler(handlerList)
        server.setHandler(sh)
    } else {
        logDebug("HttpServer is not using security")
        server.setHandler(handlerList)
    }

    server.start()
    val actualPort = server.getConnectors()(0).getLocalPort
    
    (server, actualPort)
}
    

3. Creating the metrics system MetricsSystem

MetricsSystem is Spark's metrics system. The code that creates it is:

val metricsSystem = if (isDriver) {
        MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
        conf.set("spark.executor.id", executorId)
        val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
        ms.start()
        ms
    }

The createMetricsSystem method called above actually constructs the MetricsSystem:

def createMetricsSystem(
    instance: String, conf: SparkConf, securityMgr: SecurityManager):
        MetricsSystem = {
    new MetricsSystem(instance, conf, securityMgr)
}

The most important part of constructing a MetricsSystem is the call to MetricsConfig's initialize method:

def initialize() {
    setDefaultProperties(properties)

    var is: InputStream = null
    try {
        is = configFile match {
            case Some(f) => new FileInputStream(f)
            case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
        }

        if (is != null) {
            properties.load(is)
        }
    } catch {
        case e: Exception => logError("Error loading configure file", e)
    } finally {
        if (is != null) is.close()
    }

    propertyCategories = subProperties(properties, INSTANCE_REGEX)
    if (propertyCategories.contains(DEFAULT_PREFIX)) {
        import scala.collection.JavaConversions._

        val defaultProperty = propertyCategories(DEFAULT_PREFIX)
        for { (inst, prop) <- propertyCategories
              if (inst != DEFAULT_PREFIX)
              (k, v) <- defaultProperty
              if (prop.getProperty(k) == null) } {
            prop.setProperty(k, v)
        }
    }
}

MetricsConfig's initialize method loads the property settings from the metrics.properties file and normalizes them during initialization.

For example, it converts the properties

{*.sink.servlet.path=/metrics/json, applications.sink.servlet.path=/metrics/applications/json, *.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet,
master.sink.servlet.path=/metrics/master/json}

into

Map(applications -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/applications/json}, master -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/master/json}, * -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json})
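The transformation above can be sketched with plain Scala maps in place of java.util.Properties (the function names and regex here are illustrative, not Spark's exact code): split each flat "instance.rest" key by its instance prefix, then fill every instance with the "*" defaults it does not override.

```scala
val InstanceRegex = "^(\\*|[a-zA-Z]+)\\.(.+)".r

// Group flat keys like "master.sink.servlet.path" by their instance prefix.
def subPropertiesSketch(flat: Map[String, String]): Map[String, Map[String, String]] =
  flat.toSeq
    .collect { case (InstanceRegex(inst, key), v) => (inst, key, v) }
    .groupBy(_._1)
    .map { case (inst, entries) =>
      inst -> entries.map { case (_, k, v) => k -> v }.toMap
    }

// Copy each "*" default into instances that do not set the key themselves.
def applyDefaultsSketch(cats: Map[String, Map[String, String]]): Map[String, Map[String, String]] = {
  val defaults = cats.getOrElse("*", Map.empty)
  cats.map {
    case ("*", props)  => "*" -> props
    case (inst, props) => inst -> (defaults ++ props) // instance keys win
  }
}

val flat = Map(
  "*.sink.servlet.class" -> "org.apache.spark.metrics.sink.MetricsServlet",
  "*.sink.servlet.path" -> "/metrics/json",
  "master.sink.servlet.path" -> "/metrics/master/json")

val merged = applyDefaultsSketch(subPropertiesSketch(flat))
println(merged("master")("sink.servlet.class"))  // inherited from "*"
println(merged("master")("sink.servlet.path"))   // master's own override
```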

4. Creating SparkEnv

Once all the basic components are ready, the execution environment SparkEnv is finally created with the following code:

new SparkEnv(executorId, actorSystem, serializer, closureSerializer,
    cacheManager, mapOutputTracker, shuffleManager, broadcastManager,
    blockTransferService, blockManager, securityManager, httpFileServer,
    sparkFilesDir, metricsSystem, shuffleMemoryManager, conf)

Reprinted from www.mshxw.com
Original article: https://www.mshxw.com/it/652798.html