2021SC@SDUSC
目录
创建DAGSchedulerSource和BlockManagerSource
将SparkContext标记为激活
创建DAGSchedulerSource和BlockManagerSource
在创建DAGSchedulerSource、blockManagerSource之前首先调用taskScheduler的postStartHook方法,其目的是为了等待backend就绪,见代码清单3-53。 poststartHook的实现见代码清单3-54。
创建DAGSchedulerSource和BlockManagerSource的过程类似于ExecutorSource,只不过DAGSchedulerSource测量的信息是stage.failedStages、stage.runningStages、stage.waiting-Stages、stage.allJobs、stage.activeJobs,BlockManagerSource测员的信息是memory. maxMem_MB、 memory. remainingMem_MB. memory. memUsed_MB、 memory. diskSpaceUsed_MB。
// code 3-53
taskScheduler.postStartHook()
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriverMetrics ()
// code 3-54
override def postStartHook() {
waitBackendReady()
}
private def waitBackendReady(): Unit = {
if (backend.isReady) {
return
}
while (!backend.isReady) (
synchronized {
this.wait(100)
}
}
}
将SparkContext标记为激活
SparkContext 初始化的最后将当前 SparkContext 的状态从 contextBeingConstructed (正在构建中)改为 activeContext (已激活), 代码如下。
SparkContext.setActiveContext(this, allowMultipleContexts)
setActiveContext 方法的实现如下
private[spark] def setActiveContext(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextisRunning(sc, allowMultipleContexts)
contextBeingConstructed = None
activeContext =Some(sc)
}
}



