
Spark Source Code Analysis: local-cluster Mode Startup

Spark's commonly used deployment modes

  1. Local
  2. Standalone
  3. YARN
  4. Mesos

Local mode itself has several launch variants

  1. local - single-threaded local mode
  2. LOCAL_N_REGEX(threads) - local[N] / local[*]: run with N threads, or with as many threads as there are CPU cores
  3. LOCAL_N_FAILURES_REGEX(threads, maxFailures) - local[N, maxFailures]: N threads plus a maximum task-failure count
  4. LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) - local-cluster[N, cores, memory]: local pseudo-distributed mode
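To make the four variants concrete, here is a self-contained sketch that exercises the master-string patterns. The regexes are reproduced from `SparkContext`'s `SparkMasterRegex` in the 3.1.2 source; the object name `MasterRegexDemo` and the `describe` helper are illustrative, not part of Spark:

```scala
// Master-string patterns as defined in SparkContext.SparkMasterRegex,
// reproduced here so the snippet runs without a Spark dependency.
object MasterRegexDemo {
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  val LOCAL_CLUSTER_REGEX =
    """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r

  // Illustrative helper: report which launch variant a master string selects.
  def describe(master: String): String = master match {
    case "local" => "single-threaded local"
    case LOCAL_N_REGEX(threads) => s"local with $threads thread(s)"
    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      s"local with $threads thread(s), $maxFailures max failures"
    case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
      s"pseudo-distributed: $numWorkers worker(s), " +
        s"$coresPerWorker core(s) and $memoryPerWorker MiB each"
    case other => s"other: $other"
  }
}
```

For example, `MasterRegexDemo.describe("local-cluster[2, 1, 1024]")` selects the pseudo-distributed branch with 2 workers, 1 core and 1024 MiB per worker. Note the case order matters: `local[4,2]` must fall through `LOCAL_N_REGEX` (which has no comma) to reach `LOCAL_N_FAILURES_REGEX`.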

Below is a brief walkthrough of the startup flow for the local-cluster pseudo-distributed mode.

Note: the source code in this post is from Spark 3.1.2.

A match/case on the master string passed in determines the launch type:


case LOCAL_CLUSTER_REGEX(numWorkers, coresPerWorker, memoryPerWorker) =>
  checkResourcesPerTask(coresPerWorker.toInt)
  // Check to make sure memory requested <= memoryPerWorker. Otherwise Spark will just hang.
  val memoryPerWorkerInt = memoryPerWorker.toInt
  if (sc.executorMemory > memoryPerWorkerInt) {
    throw new SparkException(
      "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
        memoryPerWorkerInt, sc.executorMemory))
  }
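The guard above fails fast when each executor asks for more memory than a single worker offers, since such an executor could never be scheduled and the application would hang. A minimal standalone sketch of the same check (the function name and exception type here are illustrative, not Spark's):

```scala
// Sketch of the per-worker memory guard: reject configurations where the
// requested executor memory exceeds what one local-cluster worker provides.
def checkWorkerMemory(executorMemoryMiB: Int, memoryPerWorkerMiB: Int): Unit = {
  if (executorMemoryMiB > memoryPerWorkerMiB) {
    throw new IllegalArgumentException(
      s"Asked to launch cluster with $memoryPerWorkerMiB MiB RAM / worker " +
        s"but requested $executorMemoryMiB MiB/worker")
  }
}
```

So `checkWorkerMemory(512, 1024)` passes silently, while `checkWorkerMemory(2048, 1024)` throws.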

A LocalSparkCluster is then created with the specified number of workers, cores per worker, and memory per worker, and its start() method is called:

val scheduler = new TaskSchedulerImpl(sc) // create the TaskScheduler instance
val localCluster = new LocalSparkCluster(
  numWorkers.toInt, coresPerWorker.toInt, memoryPerWorkerInt, sc.conf)
val masterUrls = localCluster.start()

LocalSparkCluster.start() mainly starts the Master and the Workers:

val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
masterWebUIPort = webUiPort
masterRpcEnvs += rpcEnv
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + rpcEnv.address.port
val masters = Array(masterUrl)


for (workerNum <- 1 to numWorkers) {
  val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
    memoryPerWorker, masters, null, Some(workerNum), _conf,
    conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE))
  workerRpcEnvs += workerEnv
}

Master and Worker each set up RPC communication through their respective startRpcEnvAndEndpoint methods:

// Master.startRpcEnvAndEndpoint
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  // setupEndpoint registers the Master endpoint with this RpcEnv
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}

// Worker.startRpcEnvAndEndpoint
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf,
    resourceFileOpt: Option[String] = None): RpcEnv = {

  // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
  val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
  rpcEnv
}
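Note the systemName construction on the Worker side: SYSTEM_NAME (`"sparkWorker"` in the Worker object) plus the optional worker number, which is how a LocalSparkCluster's multiple workers get distinct RPC environment names. A standalone sketch of that one line (the helper name is illustrative):

```scala
// Mirrors the systemName construction in Worker.startRpcEnvAndEndpoint:
// LocalSparkCluster workers become "sparkWorker1", "sparkWorker2", ...,
// while a regular standalone worker (no number) is just "sparkWorker".
val SYSTEM_NAME = "sparkWorker"

def systemNameFor(workerNumber: Option[Int]): String =
  SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
```

This matters because two RPC environments in the same JVM cannot share a name, and local-cluster mode runs all workers in one process.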

Finally, the TaskScheduler and the application's scheduler backend are initialized:

val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend) // initialize the scheduler and build the scheduling pool
backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
  localCluster.stop()
}
(backend, scheduler) // finally return a (SchedulerBackend, TaskScheduler) tuple

Corrections and feedback are welcome.

Originally published at www.mshxw.com; please credit the source when reposting.
Original URL: https://www.mshxw.com/it/654211.html