val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
进入到 SparkContext 的 createTaskScheduler 方法中，根据 master 进行模式匹配
// Local mode: skipped in these notes.
// Standalone mode: the master URL matched SPARK_REGEX.
case SPARK_REGEX(sparkUrl) =>
// Create TaskSchedulerImpl, the TaskScheduler implementation used here.
val scheduler = new TaskSchedulerImpl(sc)
// Split the comma-separated address list and prefix every entry with "spark://".
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
// Hand the backend to the scheduler before returning the (backend, scheduler) pair.
scheduler.initialize(backend)
(backend, scheduler)
因为查看TaskScheduler的创建需要 backend 所以我们先看Backend的创建
// 因为是StandAlone 模式下的 所以实现类是StandaloneSchedulerBackend 构造方法中依旧没有重要内容
TaskSchedulerImpl 构造方法
/**
 * Auxiliary constructor: reads `config.MAX_TASK_FAILURES` from the context's
 * configuration and forwards it, together with `sc`, to the two-argument constructor.
 */
def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
// 构造方法中没有什么内容 我们 通过scheduler.initialize(backend)
// 查看重要的初始化实现 scheduler.initialize(backend)
/**
 * Binds this scheduler to its backend and builds the scheduling pools.
 *
 * Stores `backend` as a member, picks the schedulable builder that matches
 * `schedulingMode`, and then asks that builder to build its pools.
 *
 * @param backend the scheduler backend this scheduler keeps a reference to
 * @throws IllegalArgumentException if `schedulingMode` is neither FIFO nor FAIR
 */
def initialize(backend: SchedulerBackend): Unit = {
  // Keep the backend as a member so later calls (e.g. start()) can reach it.
  this.backend = backend
  schedulableBuilder = schedulingMode match {
    // NOTE(review): the original notes state FIFO is the configured default; not shown here.
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
    case _ =>
      throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: $schedulingMode")
  }
  schedulableBuilder.buildPools()
}
Yarn 模式
// External cluster manager mode (YARN in these notes): this variable pattern matches any remaining master URL.
case masterUrl => val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
// No cluster manager was found for this URL, so fail immediately.
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
// The cluster manager creates the scheduler first, then a backend that is given that scheduler.
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
// A SparkException is rethrown unchanged; any other non-fatal error is wrapped in a SparkException.
case se: SparkException => throw se
case NonFatal(e) => throw new SparkException("External scheduler cannot be instantiated", e)
}
/**
 * Looks up the external cluster manager able to handle `url`.
 *
 * Every `ExternalClusterManager` discovered through `ServiceLoader` is considered;
 * only those whose `canCreate(url)` returns true are kept.
 *
 * @param url the master URL to resolve
 * @return the single matching manager, or `None` when none matches
 * @throws SparkException when more than one manager matches the URL
 */
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val classLoader = Utils.getContextOrSparkClassLoader
  // Load all registered ExternalClusterManager implementations, then drop those that reject the URL.
  val candidates = ServiceLoader
    .load(classOf[ExternalClusterManager], classLoader)
    .asScala
    .filter(_.canCreate(url))
  if (candidates.size <= 1) {
    candidates.headOption
  } else {
    throw new SparkException(s"Multiple external cluster managers registered for the url $url: $candidates")
  }
}
// master 为 yarn 所以 cm就为 YarnClusterManager
// 调用了
createTaskScheduler
/**
 * Creates the task scheduler that corresponds to the context's deploy mode.
 *
 * @param sc        the context whose `deployMode` selects the scheduler
 * @param masterURL the master URL (not consulted by this implementation)
 * @return a `YarnClusterScheduler` for "cluster", a `YarnScheduler` for "client"
 * @throws SparkException for any other deploy mode
 */
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
  val mode = sc.deployMode
  if (mode == "cluster") {
    new YarnClusterScheduler(sc)
  } else if (mode == "client") {
    new YarnScheduler(sc)
  } else {
    throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}
// YarnClusterScheduler extends YarnScheduler, which in turn extends TaskSchedulerImpl,
// so both schedulers returned by createTaskScheduler are TaskSchedulerImpl instances.
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc)
private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc)
/**
 * Initializes the scheduler with its backend.
 *
 * The scheduler is cast to `TaskSchedulerImpl` and its own `initialize(backend)` is invoked.
 *
 * @param scheduler the scheduler to initialize; must be a `TaskSchedulerImpl`
 * @param backend   the backend handed to the scheduler
 */
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
  val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
  schedulerImpl.initialize(backend)
}
创建DAGScheduler
_dagScheduler = new DAGScheduler(this)
/** Builds the scheduler from the context alone, using the context's own task scheduler. */
def this(sc: SparkContext) = {
  this(sc, sc.taskScheduler)
}

/**
 * Builds the scheduler from a context and an explicit task scheduler; the remaining
 * constructor arguments are taken from `sc` and `sc.env`.
 */
def this(sc: SparkContext, taskScheduler: TaskScheduler) = this(
  sc,
  taskScheduler,
  sc.listenerBus,
  sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
  sc.env.blockManager.master,
  sc.env)
// The constructor chain above ends here; the members below are then initialized.
// Lookup from an Int key to a ShuffleMapStage (presumably keyed by shuffle id, per the name).
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
// Lookup from an Int key to an ActiveJob (presumably keyed by job id, per the name).
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
private val executorFailureEpoch = new HashMap[String, Long]
....... // Further data structures and components are initialized here; omitted from these notes as less important.
// Event loop bound to this scheduler.
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
// Register this DAGScheduler on the task scheduler, just as the backend was stored on it.
taskScheduler.setDAGScheduler(this)
我们来看看这个DAGSchedulerEventProcessLoop
1: 重写了父类的onReceive方法
2:父类初始化的时候就会创建并启动一个线程 从队列中获取到事件event 并调用onReceive
3: 然后就会调用doOnReceive 根据不同的事件类型 来进行不同的处理
/**
 * Event loop named "dag-scheduler-event-loop" that forwards every
 * `DAGSchedulerEvent` it receives to the matching handler on `dagScheduler`.
 *
 * @param dagScheduler the scheduler whose handlers process the events
 */
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /** Handles one event while timing it; the timer context is stopped even if handling throws. */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val running = timer.time()
    try doOnReceive(event)
    finally running.stop()
  }

  /** Dispatches `event` to the handler on `dagScheduler` that matches its type. */
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      // Only a SlaveLost whose second field is true counts as a lost worker.
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  /**
   * Logs the failure, tries to cancel all jobs (logging, not rethrowing, any error from
   * that attempt), and then stops the SparkContext in a new thread.
   */
  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case cancelFailure: Throwable =>
        logError("DAGScheduler failed to cancel all jobs.", cancelFailure)
    }
    dagScheduler.sc.stopInNewThread()
  }

  /** Cancels any active jobs in the postStop hook by delegating to the scheduler's clean-up. */
  override def onStop(): Unit = {
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
/** Constants shared by the DAG scheduler. */
private[spark] object DAGScheduler {

  /**
   * The time, in millis, to wait for fetch failure events to stop coming in after one is
   * detected. This is a simplistic way to avoid resubmitting tasks in the non-fetchable map
   * stage one by one as more failure events come in.
   */
  val RESUBMIT_TIMEOUT: Int = 200

  /** Number of consecutive stage attempts allowed before a stage is aborted. */
  val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS: Int = 4
}
EventLoop
// 我们 开启一个线程 来直接 获取event 队列中的事件 然后 进行处理
// 后面的任务提交代码中 我们会看到向event队列中添加事件
// private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
/**
 * Daemon thread that drains `eventQueue`: while `stopped` is false it takes the next
 * event and passes it to `onReceive`. A non-fatal failure in `onReceive` is routed to
 * `onError`; a non-fatal failure in `onError` itself is only logged.
 */
private[spark] val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        // Take the next event from the queue.
        val next = eventQueue.take()
        try onReceive(next)
        catch {
          case NonFatal(handlerFailure) =>
            try onError(handlerFailure)
            catch {
              case NonFatal(onErrorFailure) =>
                logError("Unexpected error in " + name, onErrorFailure)
            }
        }
      }
    } catch {
      case _: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(loopFailure) => logError("Unexpected error in " + name, loopFailure)
    }
  }
}
启动TaskScheduler 调用实现类的start方法
/**
 * Starts the backend and, when speculation is enabled, schedules the periodic
 * speculation check.
 *
 * The check is only scheduled when the scheduler is not local and
 * `spark.speculation` is true (it defaults to false here).
 */
override def start(): Unit = {
  // Delegate to the backend's own start().
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        // Speculative execution, per the original notes (mechanism not shown here): when a
        // task in a stage is unusually slow, a copy is launched on another executor; if the
        // copy finishes first its result is used and the original attempt is killed.
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
StandAloneBackend 的start方法做的事
1:创建了一个DriverEndPoint通信终端
2:给自己发送一条消息ReviveOffers
当启动 TaskScheduler 的时候，就会调用其内部维护的 StandaloneSchedulerBackend 的 start 方法。
该方法启动时会先执行父类的 start() 方法，然后开启一个定时任务，把
内存中维护的 Executor 队列中不可用的 Executor 删除掉。
接着拼接 java 命令，命令中写死了要启动的类 org.apache.spark.executor.CoarseGrainedExecutorBackend，
然后调用这个类伴生对象的 main 方法，其中再调用 run 方法
然后执行
env.rpcEnv
.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
来创建一个 CoarseGrainedExecutorBackend，它是 Executor 工作节点的一种实现类
在节点启动的时候就会发送请求了
/**
 * Connects to the driver and registers this executor with it.
 *
 * Resolves the driver endpoint from `driverUrl`, remembers it in `driver`, and sends a
 * `RegisterExecutor` request. If any step fails the executor exits with code 1.
 */
override def onStart(): Unit = {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // `ask` expects a reply. NOTE(review): per the original notes the request is answered by
    // CoarseGrainedSchedulerBackend's receiveAndReply (RegisterExecutor case) -- not shown here.
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(_) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
然后接收到请求后 就会往一个HashMap中存数据 维护起来
3: 接收到消息 申请资源 加载Task (之后的Task执行源码再详解)
然后我们来看通用父类 CoarseGrainedSchedulerBackend 的 start() 方法，这里就是我们方法的调用栈 start()->createDriverEndpointRef()->createDriverEndpoint()->new DriverEndpoint(rpcEnv, properties) 来创建一个Driver终端
在创建Driver的时候就会 创建定时任务 给自己发送 ReviveOffers 消息
然后自己接受到消息后 就会去队列中 过滤掉 死掉的节点
ps:因为这里没task提交 所以也不会去执行Task的加载
然后
// start方法
/**
 * Collects every configuration entry whose key starts with "spark." and creates the
 * driver endpoint reference from those properties.
 */
override def start(): Unit = {
  val properties = new ArrayBuffer[(String, String)]
  // Only "spark."-prefixed entries are forwarded to the driver endpoint.
  properties ++= scheduler.sc.conf.getAll.filter { case (key, _) => key.startsWith("spark.") }

  // Create and keep the driver endpoint reference built from the collected properties.
  driverEndpoint = createDriverEndpointRef(properties)
}
// The call chain ends in new DriverEndpoint(rpcEnv, properties).
// NOTE(review): per the original notes the endpoint is then registered with rpcEnv; the registration is not shown in this excerpt.
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging {
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]
// Lookup from an RpcAddress to a String id (presumably the executor at that address, per the name -- confirm against callers).
protected val addressToExecutorId = new HashMap[RpcAddress, String]
/**
 * Schedules a recurring task that sends `ReviveOffers` to this endpoint.
 *
 * The task runs immediately (initial delay 0) and then every
 * `spark.scheduler.revive.interval` (default "1s").
 */
override def onStart(): Unit = {
  // Periodically revive offers to allow delay scheduling to work
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      // `self` is wrapped in Option so a null reference is skipped instead of dereferenced.
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
/**
 * Handles the one-way messages sent to this endpoint: task status updates, revive
 * requests, task and executor kills, delegation-token updates, and executor removal.
 */
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(
            s"Ignored task status update ($taskId state $state) " +
              s"from unknown executor with ID $executorId")
        case Some(info) =>
          // The finished task frees its cores, so offer this executor again.
          info.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
      }
    }

  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread, reason) =>
    executorDataMap.get(executorId) match {
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
      case Some(info) =>
        info.executorEndpoint.send(KillTask(taskId, executorId, interruptThread, reason))
    }

  case KillExecutorsOnHost(host) =>
    scheduler.getExecutorsAliveOnHost(host).foreach { aliveOnHost =>
      killExecutors(
        aliveOnHost.toSeq,
        adjustTargetNumExecutors = false,
        countFailures = false,
        force = true)
    }

  case UpdateDelegationTokens(newDelegationTokens) =>
    for (data <- executorDataMap.values) {
      data.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
    }

  case RemoveExecutor(executorId, reason) =>
    // We will remove the executor's state and cannot restore it. However, the connection
    // between the driver and the executor may be still alive so that the executor won't exit
    // automatically, so try to tell the executor to stop itself. See SPARK-13519.
    executorDataMap.get(executorId).foreach { info =>
      info.executorEndpoint.send(StopExecutor)
    }
    removeExecutor(executorId, reason)
}
/**
 * Builds a work offer for every live executor, passes the offers to the scheduler, and
 * launches whatever task descriptions the scheduler returns.
 */
private def makeOffers(): Unit = {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
        Some(executorData.executorAddress.hostPort))
    }.toIndexedSeq
    // Offer the executors' free resources to the scheduler.
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}



