
Shandong University Software Engineering Application and Practice - Spark (10) Code Analysis

2021SC@SDUSC 8.6 Design and Implementation of the Analyzer and Optimizer


        The Analyzer binds the Unresolved LogicalPlan to the data dictionary (catalog), producing a Resolved LogicalPlan.
        The Optimizer then rewrites the Resolved LogicalPlan into an Optimized LogicalPlan.
        Within SQLContext, the sole entry point to the Analyzer and Optimizer is the executePlan method:

        protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
                new this.QueryExecution { val logical = plan }

        executePlan creates an instance of an anonymous subclass of QueryExecution. QueryExecution invokes the Analyzer, Optimizer, SparkPlanner, and prepareForExecution lazily, as shown in the following code.

The implementation of QueryExecution in SQLContext:


@DeveloperApi
protected abstract class QueryExecution {
    def logical: LogicalPlan

    lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
    lazy val withCachedData = useCachedData(analyzed)
    lazy val optimizedPlan = optimizer(withCachedData)

    lazy val sparkPlan = {
        SparkPlan.currentContext.set(self)
        planner(optimizedPlan).next()
    }
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
    lazy val toRdd: RDD[Row] = executedPlan.execute()

    protected def stringOrError[A](f: => A): String =
        try f.toString catch { case e: Throwable => e.toString }

    def simpleString: String =
        s"""== Physical Plan ==
           |${stringOrError(executedPlan)}
        """.stripMargin.trim

    override def toString: String =
        s"""== Parsed Logical Plan ==
           |${stringOrError(logical)}
           |== Analyzed Logical Plan ==
           |${stringOrError(analyzed)}
           |== Optimized Logical Plan ==
           |${stringOrError(optimizedPlan)}
           |== Physical Plan ==
           |${stringOrError(executedPlan)}
           |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
           |== RDD ==
        """.stripMargin.trim
}
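The pipeline above hinges on Scala's lazy vals: accessing executedPlan (or toRdd) forces each upstream stage exactly once, in dependency order, and later accesses reuse the cached result. Below is a minimal sketch of this pattern in plain Scala, with hypothetical String "plans" and stage names standing in for Spark's types:

```scala
import scala.collection.mutable.ListBuffer

// Minimal sketch (plain Scala, no Spark) of QueryExecution's lazy pipeline:
// each stage runs at most once, and only when it is first demanded.
// MiniQueryExecution, the String "plans", and stage names are illustrative
// stand-ins, not Spark types.
class MiniQueryExecution(logical: String) {
  val stagesRun = ListBuffer.empty[String] // records which stages executed

  lazy val analyzed: String =
    { stagesRun += "analyze"; s"analyzed($logical)" }
  lazy val optimizedPlan: String =
    { val a = analyzed; stagesRun += "optimize"; s"optimized($a)" }
  lazy val executedPlan: String =
    { val o = optimizedPlan; stagesRun += "plan"; s"physical($o)" }
}

object MiniQueryExecutionDemo extends App {
  val qe = new MiniQueryExecution("parsed")
  assert(qe.stagesRun.isEmpty)          // nothing runs until first access
  val plan = qe.executedPlan            // forces the whole chain, in order
  assert(plan == "physical(optimized(analyzed(parsed)))")
  assert(qe.stagesRun.toList == List("analyze", "optimize", "plan"))
  qe.executedPlan                       // cached: no stage re-executes
  assert(qe.stagesRun.size == 3)
}
```

This is why the real QueryExecution can declare the whole analyze/optimize/plan chain up front without paying any cost until a result is actually requested.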

        toRdd is also lazy: it is not materialized until the job-submission stage, when runJob is executed and the RDD's partitions method is first invoked. That call reaches the RDD's dependencies method, which in turn ends up calling SchemaRDD's getDependencies method, shown below:
        override protected def getDependencies: Seq[Dependency[_]] = {
                schema // Force reification of the schema so it is available on executors.
                List(new OneToOneDependency(queryExecution.toRdd))
        }
        Calling QueryExecution's toRdd method is therefore what finally causes the Analyzer and Optimizer to apply their series of Rules to the LogicalPlan.
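Concretely, "applying a series of Rules" means running batches of plan-to-plan rewrites until the plan stops changing (or an iteration cap is hit). The following is a minimal fixed-point sketch in plain Scala; the Plan type and the rules are hypothetical stand-ins, not Spark's actual RuleExecutor:

```scala
// Minimal sketch of batch rule application to a fixed point.
// Plan is a stand-in for a LogicalPlan: just a list of node names.
object RuleBatchSketch {
  type Plan = List[String]
  type Rule = Plan => Plan

  // Run every rule once per iteration; stop when the plan no longer
  // changes or the iteration cap is reached.
  def execute(rules: Seq[Rule], plan: Plan, maxIterations: Int = 100): Plan = {
    var current   = plan
    var iteration = 0
    var changed   = true
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }
}
```

For example, a rule that drops "noop" nodes combined with a rule that deduplicates nodes reduces List("scan", "noop", "scan") to List("scan") in one iteration, and the second iteration confirms the fixed point.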
        The useCachedData method invoked in QueryExecution is actually the useCachedData of CacheManager, which lives in the same package as SQLContext (org.apache.spark.sql). It replaces fragments of the LogicalPlan tree with their cached counterparts; the replacement itself is carried out by TreeNode's transformDown method, as the following code shows.

The useCachedData method of CacheManager:


private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
    plan transformDown {
        case currentFragment =>
            lookupCachedData(currentFragment)
                .map(_.cachedRepresentation.withOutput(currentFragment.output))
                .getOrElse(currentFragment)
    }
}

private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
    cachedData.find(cd => plan.sameResult(cd.plan))
}
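transformDown walks the tree top-down with a partial function: where the function matches a node, that whole subtree is replaced, which is exactly how a plan fragment gets swapped for its cached representation above. A minimal sketch of the idea on a toy tree, in plain Scala (Node and the replacement rule are hypothetical, not Spark's TreeNode):

```scala
// Minimal sketch of pre-order tree rewriting in the style of
// TreeNode.transformDown: apply the rule to the current node first,
// then recurse into the children of whatever the rule produced.
object TransformDownSketch {
  case class Node(name: String, children: List[Node] = Nil)

  def transformDown(node: Node)(rule: PartialFunction[Node, Node]): Node = {
    // If the rule does not match, keep the node unchanged.
    val replaced = rule.applyOrElse(node, identity[Node])
    replaced.copy(children = replaced.children.map(transformDown(_)(rule)))
  }
}
```

With a tree project(filter(scan)), a rule that matches the "filter" node and returns an "inMemoryRelation" leaf yields project(inMemoryRelation): the matched subtree is replaced wholesale, just as useCachedData substitutes a cached fragment.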

Source: https://www.mshxw.com/it/618076.html