2021SC@SDUSC
Contents
SparkEnv
1. Creating the cache manager CacheManager
2. The HTTP file server HttpFileServer
3. Creating the metrics system MetricsSystem
4. Creating SparkEnv
SparkEnv
1. Creating the cache manager CacheManager
CacheManager caches the intermediate results computed for individual RDD partitions; this caching takes place during iterative computation. The CacheManager is created as follows:
val cacheManager = new CacheManager(blockManager)
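The idea behind CacheManager can be illustrated with a small self-contained sketch. Note that SimpleCacheManager and its Map-backed store below are hypothetical stand-ins for CacheManager and BlockManager, not Spark's actual API: the point is only that a partition's result is computed at most once, and later iterations read it back from the store.

```scala
import scala.collection.mutable

// Minimal sketch of the cache-manager idea (hypothetical types, not Spark's API):
// compute a partition only once, afterwards serve it from the block store.
class SimpleCacheManager[K, V](blockStore: mutable.Map[K, V]) {
  def getOrCompute(key: K)(compute: => V): V =
    blockStore.get(key) match {
      case Some(cached) => cached     // hit: reuse the stored result
      case None =>
        val value = compute           // miss: compute the partition
        blockStore.put(key, value)    // store it for later iterations
        value
    }
}
```

In the real CacheManager the key is a block id derived from the RDD id and partition index, and the store is the BlockManager, which may keep the block in memory, on disk, or on another node.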
2. The HTTP file server HttpFileServer
HttpFileServer provides HTTP access to jars and other files, including jars uploaded by the user. Its port is configured by the property spark.fileserver.port; the default is 0, which means a random port is chosen. HttpFileServer is created as follows:
val httpFileServer =
  if (isDriver) {
    val fileServerPort = conf.getInt("spark.fileserver.port", 0)
    val server = new HttpFileServer(conf, securityManager, fileServerPort)
    server.initialize()
    conf.set("spark.fileserver.uri", server.serverUri)
    server
  } else {
    null
  }
HttpFileServer's initialization, shown below, consists of the following steps:
- Use the Utils helper class to create the file server's root directory and a temporary directory (the temporary directory is deleted when the runtime environment shuts down).
- Create the directories that hold jars and other files.
- Create and start the HTTP server.
def initialize() {
  baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
  fileDir = new File(baseDir, "files")
  jarDir = new File(baseDir, "jars")
  fileDir.mkdir()
  jarDir.mkdir()
  logInfo("HTTP File server directory is " + baseDir)
  httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
  httpServer.start()
  serverUri = httpServer.uri
  logDebug("HTTP file server started at: " + serverUri)
}
In the implementation of HttpServer's start method, the static method Utils.startServiceOnPort is used once again; it calls back the doStart method. The code is as follows:
def start() {
  if (server != null) {
    throw new ServerStateException("Server is already started")
  } else {
    logInfo("Starting HTTP Server")
    val (actualServer, actualPort) =
      Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
    server = actualServer
    port = actualPort
  }
}
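The value of startServiceOnPort is that a failed bind does not kill the service: it retries on nearby ports. The retry policy below (a fixed maxRetries parameter and a simple +1 increment) is an assumption for illustration, not Spark's exact code, but the overall shape matches what start relies on:

```scala
import java.net.BindException

// Sketch of the retry idea behind Utils.startServiceOnPort:
// call startService (here, doStart) on successive ports until one binds.
def startServiceOnPort[T](startPort: Int, maxRetries: Int)
                         (startService: Int => (T, Int)): (T, Int) = {
  for (offset <- 0 to maxRetries) {
    // port 0 asks the OS for a random free port, so it is not incremented
    val tryPort = if (startPort == 0) startPort else startPort + offset
    try {
      return startService(tryPort)   // doStart binds Jetty here
    } catch {
      case e: BindException if offset < maxRetries =>
        // port taken: fall through and try the next one
    }
  }
  throw new BindException(s"Failed to start service on port $startPort")
}
```

Because doStart returns the port Jetty actually bound (important when startPort is 0), the caller records the real port rather than the requested one.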
The doStart method starts the HTTP service provided by embedded Jetty. The code is as follows:
private def doStart(startPort: Int): (Server, Int) = {
  val server = new Server()
  val connector = new SocketConnector
  connector.setMaxIdleTime(60 * 1000)
  connector.setSoLingerTime(-1)
  connector.setPort(startPort)
  server.addConnector(connector)

  val threadPool = new QueuedThreadPool
  threadPool.setDaemon(true)
  server.setThreadPool(threadPool)
  val resHandler = new ResourceHandler
  resHandler.setResourceBase(resourceBase.getAbsolutePath)
  val handlerList = new HandlerList
  handlerList.setHandlers(Array(resHandler, new DefaultHandler))

  if (securityManager.isAuthenticationEnabled()) {
    logDebug("HttpServer is using security")
    val sh = setupSecurityHandler(securityManager)
    // make sure we go through security handler to get resources
    sh.setHandler(handlerList)
    server.setHandler(sh)
  } else {
    logDebug("HttpServer is not using security")
    server.setHandler(handlerList)
  }

  server.start()
  val actualPort = server.getConnectors()(0).getLocalPort
  (server, actualPort)
}
3. Creating the metrics system MetricsSystem
MetricsSystem is Spark's metrics system. It is created as follows:
val metricsSystem = if (isDriver) {
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
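Under the hood, MetricsSystem is built on the Codahale (Dropwizard) Metrics library: sources register metrics in a MetricRegistry, and sinks periodically report everything in the registry. A bare-bones sketch of that wiring (the metric name here is made up for illustration):

```scala
import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// Sources register metrics into a shared registry...
val registry = new MetricRegistry
val tasksCompleted = registry.counter("executor.tasksCompleted")
tasksCompleted.inc()

// ...and a sink reports whatever is registered. ConsoleReporter plays the
// role of a Spark sink such as ConsoleSink here.
val reporter = ConsoleReporter.forRegistry(registry)
  .convertRatesTo(TimeUnit.SECONDS)
  .build()
reporter.report() // a long-running sink would call start(period, unit) instead
```

Which sinks are attached, and how often they report, is exactly what the metrics.properties configuration described below controls.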
The createMetricsSystem method called above actually constructs the MetricsSystem:
def createMetricsSystem(
    instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
  new MetricsSystem(instance, conf, securityMgr)
}
The most important step in constructing MetricsSystem is the call to MetricsConfig's initialize method:
def initialize() {
  setDefaultProperties(properties)
  var is: InputStream = null
  try {
    is = configFile match {
      case Some(f) => new FileInputStream(f)
      case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
    }
    if (is != null) {
      properties.load(is)
    }
  } catch {
    case e: Exception => logError("Error loading configure file", e)
  } finally {
    if (is != null) is.close()
  }
  propertyCategories = subProperties(properties, INSTANCE_REGEX)
  if (propertyCategories.contains(DEFAULT_PREFIX)) {
    import scala.collection.JavaConversions._
    val defaultProperty = propertyCategories(DEFAULT_PREFIX)
    for { (inst, prop) <- propertyCategories
          if (inst != DEFAULT_PREFIX)
          (k, v) <- defaultProperty
          if (prop.getProperty(k) == null) } {
      prop.setProperty(k, v)
    }
  }
}
MetricsConfig's initialize method loads the property configuration from the metrics.properties file and then groups the properties by instance, filling each instance in from the "*" defaults.
For example, the properties
{*.sink.servlet.path=/metrics/json, applications.sink.servlet.path=/metrics/applications/json, *.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, master.sink.servlet.path=/metrics/master/json}
are transformed into
Map(applications -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/applications/json}, master -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/master/json}, * -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json})
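The default-propagation step at the end of initialize can be shown in isolation. The sketch below is self-contained and illustrative (mergeDefaults is a made-up name, and it mutates the per-instance Properties in place just as the real loop does): every key under the "*" instance is copied into each other instance unless that instance already sets it.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

// Sketch of MetricsConfig's default propagation: "*" keys are defaults
// that each named instance inherits unless it overrides them itself.
def mergeDefaults(categories: Map[String, Properties]): Map[String, Properties] =
  categories.get("*") match {
    case Some(defaults) =>
      for ((inst, props) <- categories if inst != "*";
           key <- defaults.stringPropertyNames.asScala
           if props.getProperty(key) == null) {
        // instance does not set this key: inherit the "*" default
        props.setProperty(key, defaults.getProperty(key))
      }
      categories
    case None => categories
  }
```

This is why, in the example above, both applications and master end up with sink.servlet.class from "*" while keeping their own sink.servlet.path.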
4. Creating SparkEnv
Once all of the underlying components are ready, the execution environment SparkEnv is finally created with the following code:
new SparkEnv(executorId, actorSystem, serializer, closureSerializer, cacheManager,
  mapOutputTracker, shuffleManager, broadcastManager, blockTransferService,
  blockManager, securityManager, httpFileServer, sparkFilesDir, metricsSystem,
  shuffleMemoryManager, conf)



