The Analyzer binds the Unresolved LogicalPlan to the data dictionary (catalog), producing a Resolved LogicalPlan.
The Optimizer optimizes the Resolved LogicalPlan, producing an Optimized LogicalPlan.
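The following stand-alone Scala sketch illustrates the two stages without using Spark. The plan node types (UnresolvedRelation, Relation, Filter), the analyze and optimize functions, and the single "drop always-true filters" rule are all hypothetical simplifications. Spark's real Analyzer and Optimizer instead run batches of Rule[LogicalPlan] objects.

```scala
// Hypothetical, heavily simplified plan nodes; not Spark's actual classes.
sealed trait Plan
case class UnresolvedRelation(name: String) extends Plan
case class Relation(name: String, columns: Seq[String]) extends Plan
case class Filter(condition: Boolean, child: Plan) extends Plan

// "Analyzer": bind unresolved table names against a catalog (table -> columns).
def analyze(plan: Plan, catalog: Map[String, Seq[String]]): Plan = plan match {
  case UnresolvedRelation(name) =>
    Relation(name, catalog.getOrElse(name, sys.error(s"Table not found: $name")))
  case Filter(cond, child) => Filter(cond, analyze(child, catalog))
  case resolved => resolved
}

// "Optimizer": a single rewrite rule that removes always-true filters.
def optimize(plan: Plan): Plan = plan match {
  case Filter(true, child) => optimize(child)
  case Filter(cond, child) => Filter(cond, optimize(child))
  case other => other
}

val catalog = Map("people" -> Seq("name", "age"))
val unresolved = Filter(true, UnresolvedRelation("people"))
val resolved = analyze(unresolved, catalog)
val optimized = optimize(resolved)
println(resolved)   // Filter(true,Relation(people,List(name, age)))
println(optimized)  // Relation(people,List(name, age))
```

In this sketch, analysis only looks up names and optimization only rewrites an already-resolved tree. This mirrors the division of labour described above.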
In SQLContext, the only entry point to the Analyzer and Optimizer is the executePlan method shown below.
// Wraps the plan in a new QueryExecution; no analysis or optimization runs yet.
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
  new this.QueryExecution { val logical = plan }
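executePlan relies on two Scala features. The first is an inner abstract class, so `this.QueryExecution` is a path-dependent type tied to one SQLContext instance. The second is an anonymous subclass that supplies the abstract `logical` member with a `val`. The minimal sketch below isolates that pattern. Context, owner and the String "plans" are hypothetical stand-ins, and `self` plays the role of the outer-instance reference that sparkPlan passes to SparkPlan.currentContext.

```scala
// Hypothetical stand-ins for SQLContext and QueryExecution, reduced to the bare pattern.
class Context(val name: String) { self =>
  // Abstract inner class: each Context instance has its own QueryExecution type.
  abstract class QueryExecution {
    def logical: String               // supplied by the anonymous subclass
    def owner: String = self.name     // the inner class can reach its outer instance
  }

  // Same shape as SQLContext.executePlan: an anonymous subclass fills in `logical`.
  def executePlan(plan: String): this.QueryExecution =
    new this.QueryExecution { val logical = plan }
}

val ctx = new Context("ctx-1")
val qe = ctx.executePlan("SELECT 1")
println(qe.logical) // SELECT 1
println(qe.owner)   // ctx-1
```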
executePlan creates an instance of an anonymous subclass of QueryExecution. QueryExecution uses the Analyzer, Optimizer, SparkPlanner and prepareForExecution lazily, as the following code shows:
Implementation of QueryExecution in SQLContext:
@DeveloperApi
protected abstract class QueryExecution {
  def logical: LogicalPlan

  // Each stage is a lazy val: it runs on first access and is then cached.
  lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
  lazy val withCachedData = useCachedData(analyzed)
  lazy val optimizedPlan = optimizer(withCachedData)

  lazy val sparkPlan = {
    SparkPlan.currentContext.set(self)
    // The planner yields candidate physical plans; only the first one is used.
    planner(optimizedPlan).next()
  }

  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  // Accessing toRdd forces the whole chain above.
  lazy val toRdd: RDD[Row] = executedPlan.execute()

  protected def stringOrError[A](f: => A): String =
    try f.toString catch { case e: Throwable => e.toString }

  def simpleString: String =
    s"""== Physical Plan ==
       |${stringOrError(executedPlan)}
    """.stripMargin.trim

  override def toString: String =
    s"""== Parsed Logical Plan ==
       |${stringOrError(logical)}
       |== Analyzed Logical Plan ==
       |${stringOrError(analyzed)}
       |== Optimized Logical Plan ==
       |${stringOrError(optimizedPlan)}
       |== Physical Plan ==
       |${stringOrError(executedPlan)}
       |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
       |== RDD ==
    """.stripMargin.trim
}
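Declaring every stage as a lazy val has an observable effect, and the small stand-alone Scala sketch below makes it visible. The `stage` helper and the string "plans" are hypothetical placeholders for analyzer, useCachedData, optimizer, planner, prepareForExecution and execute, and ExtractPythonUdfs is omitted. Only the lazy-val chaining mirrors the listing above.

```scala
import scala.collection.mutable.ListBuffer

// Records which stage ran, in order, so laziness is observable.
val log = ListBuffer.empty[String]

// Hypothetical stage function: logs its name and wraps its input.
def stage(name: String)(input: String): String = { log += name; s"$name($input)" }

abstract class QueryExecution {
  def logical: String
  lazy val analyzed = stage("analyze")(logical)
  lazy val withCachedData = stage("useCache")(analyzed)
  lazy val optimizedPlan = stage("optimize")(withCachedData)
  lazy val sparkPlan = stage("plan")(optimizedPlan)
  lazy val executedPlan = stage("prepare")(sparkPlan)
  lazy val toRdd = stage("execute")(executedPlan)
}

val qe = new QueryExecution { val logical = "query" }
assert(log.isEmpty)         // constructing QueryExecution runs no stage
println(qe.optimizedPlan)   // optimize(useCache(analyze(query)))
println(log.toList)         // List(analyze, useCache, optimize)
val finalResult = qe.toRdd  // forces only the remaining stages
println(log.toList)         // List(analyze, useCache, optimize, plan, prepare, execute)
```

A side benefit of `lazy` here is safe initialization order. The anonymous subclass defines `logical` after the parent constructor runs, and the parent's stages are not evaluated until then.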
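toRdd is lazy as well. The conversion is actually triggered during job submission, when runJob first calls the RDD's partitions method. As we know, this eventually calls SchemaRDD's dependencies method. SchemaRDD inherits that method from RDD, and RDD's dependencies in turn calls SchemaRDD's getDependencies method, shown below: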
override protected def getDependencies: Seq[Dependency[_]] = {
  schema // Force reification of the schema so it is available on executors.
  // The single parent is the RDD produced by the (lazily) executed physical plan.
  List(new OneToOneDependency(queryExecution.toRdd))
}
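The trigger chain can be imitated without Spark. In this sketch, MiniRDD, SourceRDD, MiniSchemaRDD and the planningRuns counter are hypothetical. The sketch takes three ideas from the text: dependencies are computed once from getDependencies, the SchemaRDD analogue's getDependencies is what touches the lazy toRdd, and asking for partitions is assumed to reach the parent through dependencies.

```scala
// Hypothetical, stripped-down analogue of RDD's memoized dependencies.
abstract class MiniRDD {
  private var deps: Seq[MiniRDD] = null
  // Computed once on first access, then reused.
  final def dependencies: Seq[MiniRDD] = {
    if (deps == null) deps = getDependencies
    deps
  }
  protected def getDependencies: Seq[MiniRDD]
  def partitions: Int
}

class SourceRDD(n: Int) extends MiniRDD {
  protected def getDependencies: Seq[MiniRDD] = Nil
  def partitions: Int = n
}

var planningRuns = 0
class QueryExecution {
  // Stands in for the analyze -> optimize -> plan -> execute chain.
  lazy val toRdd: MiniRDD = { planningRuns += 1; new SourceRDD(4) }
}

// Analogue of SchemaRDD: its only parent is whatever toRdd produces.
class MiniSchemaRDD(queryExecution: QueryExecution) extends MiniRDD {
  protected def getDependencies: Seq[MiniRDD] = List(queryExecution.toRdd)
  // Assumed, as described above: partitions are obtained via the parent dependency.
  def partitions: Int = dependencies.head.partitions
}

val rdd = new MiniSchemaRDD(new QueryExecution)
println(planningRuns)    // 0: building the RDD does not plan the query
println(rdd.partitions)  // 4: the "job" asks for partitions
println(planningRuns)    // 1: planning ran exactly once
val again = rdd.partitions
println(planningRuns)    // still 1
```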
Calling QueryExecution's toRdd method is therefore what finally causes the Analyzer and Optimizer to apply their series of Rules to the LogicalPlan.
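The useCachedData method called in QueryExecution is actually the useCachedData method of CacheManager, which lives in the same package as SQLContext (org.apache.spark.sql). It replaces fragments of the LogicalPlan tree with their cached counterparts. This fragment replacement is done with TreeNode's transformDown method, as the following code shows: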
CacheManager's useCachedData method:
private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
  // Visit every fragment top-down; swap in the cached plan when one matches,
  // reusing the fragment's output attributes.
  plan transformDown {
    case currentFragment =>
      lookupCachedData(currentFragment)
        .map(_.cachedRepresentation.withOutput(currentFragment.output))
        .getOrElse(currentFragment)
  }
}

// Finds a cached entry whose plan produces the same result as `plan`.
private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
  cachedData.find(cd => plan.sameResult(cd.plan))
}
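The stand-alone Scala sketch below reproduces the shape of useCachedData on a toy tree. Plan, Scan, Filter, InMemory, CachedData and the simplified transformDown are hypothetical. Plain structural equality stands in for Spark's sameResult, and the read lock and withOutput call are omitted. transformDown follows the pre-order idea: apply the rule to a node first, then recurse into the children of whatever the rule returned.

```scala
// Hypothetical mini plan tree; Spark's TreeNode is far richer.
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan

  // Pre-order rewrite: rule on this node first, then on the resulting node's children.
  def transformDown(rule: PartialFunction[Plan, Plan]): Plan = {
    val afterRule = rule.applyOrElse(this, identity[Plan])
    afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
  }

  // Simplified stand-in for sameResult: structural equality.
  def sameResult(other: Plan): Boolean = this == other
}

case class Scan(table: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}
case class Filter(cond: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}
// Stands in for a cached, in-memory representation of some sub-plan.
case class InMemory(id: Int) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

case class CachedData(plan: Plan, cachedRepresentation: Plan)
val cachedData = Seq(CachedData(Filter("age > 20", Scan("people")), InMemory(1)))

def lookupCachedData(plan: Plan): Option[CachedData] =
  cachedData.find(cd => plan.sameResult(cd.plan))

// Same shape as CacheManager.useCachedData, minus locking and withOutput.
def useCachedData(plan: Plan): Plan = plan transformDown {
  case currentFragment =>
    lookupCachedData(currentFragment).map(_.cachedRepresentation).getOrElse(currentFragment)
}

val query = Filter("name = 'a'", Filter("age > 20", Scan("people")))
println(useCachedData(query)) // Filter(name = 'a',InMemory(1))
```

The outer filter has no cached match and is kept. Its child matches the cached plan and is replaced, so the rest of the query reads from the cached copy.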



