After SqlParser, Analyzer, and Optimizer have done their work, the resulting logical execution plan cannot yet be handled as an ordinary job. To treat it the same way as any other job, the logical execution plan must be converted into a physical execution plan.
The Physical Execution Plan: SparkPlan
The conversion starts with SparkPlanner, which in practice runs QueryPlanner's apply method:
def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  val iter = strategies.view.flatMap(_(plan)).toIterator
  assert(iter.hasNext, s"No plan for $plan")
  iter
}
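The key detail in apply is that strategies is traversed through a lazy view: a strategy that returns Nil for a node simply lets the next strategy try, and only the first successful conversion is consumed. The following self-contained sketch illustrates this mechanism with invented toy classes (StrategySketch, Filter, Project, and their physical counterparts are not Spark's actual types):

```scala
// Toy sketch of QueryPlanner-style strategy dispatch. All names here are
// invented for illustration; only the view/flatMap/iterator pattern mirrors
// the real apply method shown above.
object StrategySketch {
  sealed trait LogicalPlan
  case class Filter(cond: String) extends LogicalPlan
  case class Project(cols: Seq[String]) extends LogicalPlan

  sealed trait PhysicalPlan
  case class PhysFilter(cond: String) extends PhysicalPlan
  case class PhysProject(cols: Seq[String]) extends PhysicalPlan

  // A strategy either converts a node or returns Nil to pass it along.
  type Strategy = LogicalPlan => Seq[PhysicalPlan]

  val filterStrategy: Strategy = {
    case Filter(c) => PhysFilter(c) :: Nil
    case _         => Nil
  }
  val projectStrategy: Strategy = {
    case Project(cs) => PhysProject(cs) :: Nil
    case _           => Nil
  }

  val strategies: Seq[Strategy] = filterStrategy :: projectStrategy :: Nil

  // Lazy view: later strategies are only evaluated if earlier ones return Nil.
  def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    val iter = strategies.view.flatMap(s => s(plan)).iterator
    assert(iter.hasNext, s"No plan for $plan")
    iter
  }
}
```

Because the view is lazy, asking the iterator for only its first element means strategies after the first match never run at all.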
The strategies used by SparkPlanner are defined as follows:
def strategies: Seq[Strategy] =
  extraStrategies ++ (
    CommandStrategy(self) ::
    DataSourceStrategy ::
    TakeOrdered ::
    HashAggregation ::
    LeftSemiJoin ::
    HashJoin ::
    InMemoryScans ::
    ParquetOperations ::
    BasicOperators ::
    CartesianProduct ::
    BroadcastNestedLoopJoin :: Nil)
Every Strategy implements its own apply method. Among these strategies, the most commonly used is BasicOperators; its implementation in SparkStrategies is listed below, and it covers the most common SQL operators. Each matching branch first calls planLater (see the QueryPlanner code further below), which applies SparkPlanner to the child node's LogicalPlan. The planner thus recurses down the tree, and in the end the entire LogicalPlan tree is converted by SparkPlanner.
The implementation of BasicOperators in SparkStrategies:
object BasicOperators extends Strategy {
  def numPartitions = self.numPartitions

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.Distinct(child) =>
      execution.Distinct(partial = false,
        execution.Distinct(partial = true, planLater(child))) :: Nil
    case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
      execution.ExternalSort(sortExprs, global = true, planLater(child)) :: Nil
    case logical.Sort(sortExprs, child) =>
      execution.Sort(sortExprs, global = true, planLater(child)) :: Nil
    case logical.SortPartitions(sortExprs, child) =>
      execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
    case logical.Project(projectList, child) =>
      execution.Project(projectList, planLater(child)) :: Nil
    case logical.Filter(condition, child) =>
      execution.Filter(condition, planLater(child)) :: Nil
    case logical.Aggregate(group, agg, child) =>
      execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
    case logical.Sample(fraction, withReplacement, seed, child) =>
      execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil
    case SparkLogicalPlan(alreadyPlanned) => alreadyPlanned :: Nil
    case logical.LocalRelation(output, data) =>
      val nPartitions = if (data.isEmpty) 1 else numPartitions
      PhysicalRDD(
        output,
        RDDConversions.productToRowRdd(
          sparkContext.parallelize(data, nPartitions),
          StructType.fromAttributes(output))) :: Nil
    case logical.Limit(IntegerLiteral(limit), child) =>
      execution.Limit(limit, planLater(child)) :: Nil
    case Unions(unionChildren) =>
      execution.Union(unionChildren.map(planLater)) :: Nil
    case logical.Except(left, right) =>
      execution.Except(planLater(left), planLater(right)) :: Nil
    case logical.Intersect(left, right) =>
      execution.Intersect(planLater(left), planLater(right)) :: Nil
    case logical.Generate(generator, join, outer, _, child) =>
      execution.Generate(generator, join = join, outer = outer,
        planLater(child)) :: Nil
    case logical.NoRelation =>
      execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
    case logical.Repartition(expressions, child) =>
      execution.Exchange(HashPartitioning(expressions, numPartitions),
        planLater(child)) :: Nil
    case e @ EvaluatePython(udf, child, _) =>
      BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
    case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
    case _ => Nil
  }
}
The planLater method of QueryPlanner:
protected def planLater(plan: LogicalPlan) = apply(plan).next()
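The recursion that planLater sets up can be shown in isolation. In this sketch the plan-node names (LFilter, LScan, PFilter, PScan) are invented for illustration; only the shape of the recursion mirrors the real code: a strategy converts one node, and planLater re-enters the planner on each child subtree.

```scala
// Toy sketch of the planLater recursion (invented node types, not Spark's).
object PlanLaterSketch {
  sealed trait LogicalPlan
  case class LFilter(cond: String, child: LogicalPlan) extends LogicalPlan
  case class LScan(table: String) extends LogicalPlan

  sealed trait SparkPlan
  case class PFilter(cond: String, child: SparkPlan) extends SparkPlan
  case class PScan(table: String) extends SparkPlan

  type Strategy = LogicalPlan => Seq[SparkPlan]

  // Each branch converts one node and defers its children to planLater.
  val basicOperators: Strategy = {
    case LFilter(c, child) => PFilter(c, planLater(child)) :: Nil
    case LScan(t)          => PScan(t) :: Nil
  }

  val strategies: Seq[Strategy] = basicOperators :: Nil

  def apply(plan: LogicalPlan): Iterator[SparkPlan] = {
    val iter = strategies.view.flatMap(s => s(plan)).iterator
    assert(iter.hasNext, s"No plan for $plan")
    iter
  }

  // planLater re-enters the planner on the child subtree; this is the
  // step that extends the conversion to the whole LogicalPlan tree.
  def planLater(plan: LogicalPlan): SparkPlan = apply(plan).next()
}
```

Planning LFilter("a > 1", LScan("t")) converts the filter node, then recurses into the scan child, yielding a fully physical tree.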
In short, BasicOperators maps each logical.XXX operator to its execution.XXX counterpart and converts a LogicalRDD into a PhysicalRDD.
prepareForExecution carries out the final preparation before execution. It follows the same rule-based design as the Analyzer and Optimizer, so it is not covered in detail here.
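The shape of such a rule-based preparation pass can be sketched as follows. The Plan representation (a flat list of node names) and the addExchange rule are invented purely for illustration; Spark's actual prepareForExecution runs rules such as adding Exchange operators over a real SparkPlan tree via a rule executor.

```scala
// Toy sketch of a rule-based preparation pass (assumed structure only).
object PreparationSketch {
  // Stand-in for a physical-plan tree: a flat list of operator names.
  type Plan = List[String]
  type Rule = Plan => Plan

  // Hypothetical rule: insert an "Exchange" before each "Aggregate",
  // loosely analogous to adding shuffle boundaries before execution.
  val addExchange: Rule = plan =>
    plan.flatMap {
      case "Aggregate" => List("Exchange", "Aggregate")
      case other       => List(other)
    }

  // Rules are applied in order, each once, to produce the executable plan.
  val batches: Seq[Rule] = Seq(addExchange)

  def prepare(plan: Plan): Plan =
    batches.foldLeft(plan)((p, rule) => rule(p))
}
```

The point of the sketch is only the structure: like the Analyzer and Optimizer, the preparation step is a sequence of plan-to-plan rules applied by a small driver, not ad-hoc mutation of the tree.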



