栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

山东大学软件工程应用与实践: Spark(二) 代码分析

山东大学软件工程应用与实践: Spark(二) 代码分析

2021SC@SDUSC


目录

1.SparkContext概述

2.创建执行环境SparkEnv


1.SparkContext概述

Spark Driver 用于提交用户应用程序,实际可以看作Spark的客户端。所以了解Spark Driver的初始化可以帮助我们理解用户应用程序在客户端的处理过程。

Spark Driver的初始化始终围绕着SparkContext的初始化。SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车想要跑起来,发动机首先要启动。SparkContext初始化完成,才能向Spark集群提供任务。比如在平坦的公路上,发动机只需以较低的转速、较低的功率就可以游刃有余;在山区,你可能需要一台能够提供大功率的发动机才能满足你的需求。这些参数都是驾驶员操纵油门、档位等传送给发动机的,而SparkContext的配置参数则由SparkConf负责,SparkConf就是你的操作面板。

SparkConf的构造很简答,主要是通过ConcurrentHashMap来维护各种Spark的配置属性。SparkConf代码结构如下图所示

class SparkConf(loadDefaults:Boolean) extends Cloneable with Logging{
    import SparkConf.
    def this(0 = this(true)
    private val settings = new ConcurrentHashMap[String, String] ()
    if (loadDefaults){
    //加载任何以spark . 开头的系统属性
    for ((key, value) <- Utils.getSystemProperties if key.startsWith ("spark.")) {
        set(key, value)
    }
  }
}

现在开始介绍SparkContext。SparkContext的初始化步骤如下:

  1. 创建Spark执行环境SparkEnv;
  2. 创建RDD清理器metadataCleaner;
  3. 创建并初始化SparkUI;
  4. Hadoop相关配置及Executor环境变量的设置;
  5. 创建任务调度TaskScheduler;
  6. 创建和启动DAGScheduler;
  7. TaskScheduler的启动;
  8. 初始化管理器BlockManager(BlockManager是存储体系的主要组件之一);
  9. 启动测量系统MetricsSystem;
  10. 创建和启动Executor分配管理器ExecutorAllocationManager;
  11. ContextCleaner的创建和启动;
  12. Spark环境更新;
  13. 创建DAGScheduler和BlockManagerSource;
  14. 将SparkContext标记为激活。

SparkContext的主构造器参数为SparkConf,代码如下:

class SparkContext(config: SparkConf) ex七ends Logging with Execu七orAllocationClient { 
private val creationSite: CallSite = Utils.getCallSite() 
    private valprivate val allowMultipleContexts: Boolean = 
        config.getBoolean("spark.driver.allowMultipleContexts", false)                 
    SparkContext.markPartiallyConstructed(this, allowMultipleContexts) 

上面代码中的CallSite存储了线程栈中最靠近栈顶的用户类及最靠近栈底的Scala或者Spark核心类消息。SparkContext默认只有一个实例(由属性spark.driver.allowMultipleContexts来控制),永华需要多个SparkContext实例时,可以将其设置为true),方法markPartiallyConstructed用来确保实例的唯一性,并将当前SparkContext标记为正在构建中。

接下来是对SparkConf进行复制,然后对各种配置信息进行校验,代码如下:

private[spark] val conf = config.clone()
conf.validateSettings()

if (!conf.contains("spark.master")){
    throw new SparkException("A master URL must be set in your configuration")
}
if (!conf.contains("spark.app.name")){
    throw new SparkException("An application name must be set in your configuration")
}

从上面的校验的代码看到必须指定属性spark.master和spark.app.name,否则会抛出异常,结束初始化过程。spark.master用于设置部署模式,spark.app.name用于指定应用程序名称。

2.创建执行环境SparkEnv

SparkEnv是Spark的执行环境对象,其中包含众多与Executor执行相关的对象。由于在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend进程中也会创建Executor,所以SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中。创建SparkEnv主要使用SparkEnv的createDriverEnv,SparkEnv.createDriverEnv方法有三个参数:conf、isLocal和listenerBus。

val isLocal = (master == "local" || master. startsWith (" local [")) 
private[spark] val listenerBus = new LiveListenerBus 
    conf.set("spark.executor.id","driver")
   
    private [spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenBus)
    SparkEnv.set(env)

上面的代码中的conf是对SparkConf的复制,isLocal标识是否是单机模式,listenBus采用监听器模式维护各类事件的处理。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/630382.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号