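Across Spark's different join types, some predicates can be pushed down and some cannot. In this post we run real queries and look at both the observed behavior and the source code to see why.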
Version & environment
- Spark 3.2.0
- MacBook Pro
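First, a few terms:
- Preserved Row table (preserved table): the table in a join whose rows are all returned
- Null Supplying table (null-supplying table): the table in a join that is filled with null for rows that have no match
- During Join predicate (during-join predicate): a predicate in the ON clause of the join. For example, in R1 join R2 on R1.x = 5, we call R1.x = 5 a during-join predicate
- After Join predicate (after-join predicate): a predicate in the WHERE clause that follows the join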
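To make the four terms concrete, here is a minimal sketch that models a left outer join over plain Scala collections instead of Spark. The `JoinTerms` object, the `leftOuterJoin` helper, and the sample values are illustrative assumptions, not part of Spark. It shows that the same predicate on the preserved table gives different results depending on whether it sits in ON or in WHERE.

```scala
object JoinTerms {
  // Left outer join over plain collections: every left row is kept (preserved table);
  // a left row without a match is paired with None (the null-supplying side).
  def leftOuterJoin(left: Seq[Int], right: Seq[Int])(on: (Int, Int) => Boolean): Seq[(Int, Option[Int])] =
    left.flatMap { l =>
      val matches = right.filter(r => on(l, r))
      if (matches.isEmpty) Seq((l, Option.empty[Int])) else matches.map(r => (l, Option(r)))
    }

  val r1 = Seq(1, 2, 3) // hypothetical preserved table
  val r2 = Seq(1, 2, 4) // hypothetical null-supplying table

  // During-join predicate: r1 > 1 is part of the ON condition.
  val duringJoin: Seq[(Int, Option[Int])] = leftOuterJoin(r1, r2)((l, r) => l == r && l > 1)

  // After-join predicate: r1 > 1 is applied as a WHERE filter after the join.
  val afterJoin: Seq[(Int, Option[Int])] =
    leftOuterJoin(r1, r2)((l, r) => l == r).filter { case (l, _) => l > 1 }

  def main(args: Array[String]): Unit = {
    println(duringJoin) // List((1,None), (2,Some(2)), (3,None))
    println(afterJoin)  // List((2,Some(2)), (3,None))
  }
}
```

In the during-join case the row with value 1 survives with a null right side, which is why filtering the preserved table before the join would change the result.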
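Predicate pushdown rules for outer joins:
| | Preserved table | Null-supplying table |
|---|---|---|
| During-join predicate | Not pushed down | Pushed down |
| After-join predicate | Pushed down | Not pushed down |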
Based on the current Spark version, joins fall into the following types, which are the candidates for our verification:
- inner
- outer | full | fullouter
- leftouter | left
- rightouter | right
- leftsemi | semi
- leftanti | anti
- cross
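For full outer join, inner join, and left semi/anti join, during-join and after-join predicates are treated here as making no difference, so we do not discuss them.
Cross join has no ON clause to speak of, so we do not discuss it either.
Note: theory is only theory. Real implementations apply optimizations, so actual behavior can differ from it.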
3. SQL parsing
In Spark, every SQL statement goes through the following stages:
Unresolved Logical Plan -> Analyzed Logical Plan -> Optimized Logical Plan -> SparkPlan -> ExecutedPlan
Analysis
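The cases below can be reproduced with a setup along these lines. This is a sketch under assumptions: the table contents are reconstructed from the result tables in this post, and the parquet paths and application name are hypothetical, since the original setup is not shown. Setting `spark.sql.planChangeLog.level` makes Spark log each rule that changes the plan, and `queryExecution` exposes the plan at each stage.

```scala
import org.apache.spark.sql.SparkSession

object PlanStages {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("join-predicate-pushdown") // hypothetical name
      .master("local[*]")
      // Log every rule that changes the plan ("=== Applying Rule ... ===") at WARN level
      .config("spark.sql.planChangeLog.level", "WARN")
      .getOrCreate()
    import spark.implicits._

    // Assumed test data, reconstructed from the result tables; paths are hypothetical
    Seq((BigDecimal(11), 1), (BigDecimal(22), 2), (BigDecimal(33), 3))
      .toDF("decNum1", "intNum1").write.mode("overwrite").parquet("/tmp/tbl1")
    Seq((BigDecimal(1111), 1), (BigDecimal(2222), 2), (BigDecimal(4444), 4))
      .toDF("decNum2", "intNum2").write.mode("overwrite").parquet("/tmp/tbl2")
    spark.read.parquet("/tmp/tbl1").createOrReplaceTempView("tbl1")
    spark.read.parquet("/tmp/tbl2").createOrReplaceTempView("tbl2")

    val df = spark.sql(
      "select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")

    val qe = df.queryExecution
    println(qe.logical)       // unresolved logical plan
    println(qe.analyzed)      // analyzed logical plan
    println(qe.optimizedPlan) // optimized logical plan
    println(qe.sparkPlan)     // physical plan before preparation rules
    println(qe.executedPlan)  // physical plan that is actually executed

    df.show(false)
    spark.stop()
  }
}
```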
- leftouter: during-join predicate on the preserved table
Run:
val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")
The ResolveRelations rule simply uses the catalog metadata to resolve the parquet tables:
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
'Project [*] 'Project [*]
+- 'Join LeftOuter, (('intNum1 = 'intNum2) AND ('intNum1 > 1)) +- 'Join LeftOuter, (('intNum1 = 'intNum2) AND ('intNum1 > 1))
! :- 'UnresolvedRelation [tbl1], [], false :- SubqueryAlias tbl1
! +- 'UnresolvedRelation [tbl2], [], false : +- View (`tbl1`, [decNum1#33,intNum1#34])
! : +- Relation [decNum1#33,intNum1#34] parquet
! +- SubqueryAlias tbl2
! +- View (`tbl2`, [decNum2#37,intNum2#38])
! +- Relation [decNum2#37,intNum2#38] parquet
The PushDownPredicates rule does change the plan, but only by reordering the two conditions in the ON clause:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join LeftOuter, ((intNum1#34 = intNum2#38) AND (intNum1#34 > 1)) Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
:- Relation [decNum1#33,intNum1#34] parquet :- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet +- Relation [decNum2#37,intNum2#38] parquet
InferFiltersFromConstraints does push a predicate down, but onto the null-supplying table, not the preserved table:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38)) Join LeftOuter, ((intNum1#34 > 1) AND (intNum1#34 = intNum2#38))
:- Relation [decNum1#33,intNum1#34] parquet :- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet +- Filter ((intNum2#38 > 1) AND isnotnull(intNum2#38))
! +- Relation [decNum2#37,intNum2#38] parquet
The source code shows how this is implemented:
case LeftOuter | LeftAnti =>
  // Left outer/anti join: inferred filters are added to the right child only
  val allConstraints = getAllConstraints(left, right, conditionOpt)
  val newRight = inferNewFilter(right, allConstraints)
  join.copy(right = newRight)
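Why filtering only the right side is safe: a right row that fails intNum2 > 1 can never satisfy the ON condition, while an unmatched left row must still be returned. The following is a simplified model of the inference step, not Spark's implementation; `InferSketch`, `Pred`, `GreaterThan`, `EqualCols`, and `infer` are hypothetical names introduced for illustration, and the real rule also handles isnotnull and many other expression types.

```scala
object InferSketch {
  sealed trait Pred
  case class GreaterThan(col: String, value: Int) extends Pred
  case class EqualCols(a: String, b: String) extends Pred

  // From `a = b` and `a > v`, derive `b > v` (and symmetrically); simplified model only.
  def infer(constraints: Seq[Pred]): Seq[Pred] = {
    val equalities = constraints.collect { case e: EqualCols => e }
    val comparisons = constraints.collect { case g: GreaterThan => g }
    val derived = for {
      e <- equalities
      g <- comparisons
      if g.col == e.a || g.col == e.b
    } yield GreaterThan(if (g.col == e.a) e.b else e.a, g.value)
    // Keep only predicates that are not already present
    derived.filterNot(p => constraints.contains(p))
  }

  // ON intNum1 = intNum2 AND intNum1 > 1
  val joinCondition: Seq[Pred] = Seq(EqualCols("intNum1", "intNum2"), GreaterThan("intNum1", 1))
  val rightColumns = Set("intNum2")

  // For a left outer join, only inferred predicates on right-side columns become a new filter.
  val newRightFilters: Seq[GreaterThan] =
    infer(joinCondition).collect { case g @ GreaterThan(c, _) if rightColumns(c) => g }

  def main(args: Array[String]): Unit =
    println(newRightFilters) // List(GreaterThan(intNum2,1))
}
```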
Result:
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|11.000000000000000000|1      |null                   |null   |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+
[Figure: corresponding physical plan]
- leftouter: during-join predicate on the null-supplying table
Run:
val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 and intNum2 > 1")
This time PushDownPredicates behaves differently and pushes the predicate straight down:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join LeftOuter, ((intNum1#34 = intNum2#38) AND (intNum2#38 > 1)) Join LeftOuter, (intNum1#34 = intNum2#38)
:- Relation [decNum1#33,intNum1#34] parquet :- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet +- Filter (intNum2#38 > 1)
! +- Relation [decNum2#37,intNum2#38] parquet
The relevant part of the source code:
case LeftOuter | LeftAnti | ExistenceJoin(_) =>
  // push down the right side only join filter for right sub query
  val newLeft = left
  val newRight = rightJoinConditions.
    reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
  val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
  Join(newLeft, newRight, joinType, newJoinCond, hint)
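The names leftJoinConditions, rightJoinConditions, and commonJoinCondition come from splitting the ON clause by which side's columns each conjunct references. Below is a simplified model of that split, not Spark's code: `SplitSketch`, `Pred`, `Pushed`, `split`, and `pushForLeftOuter` are hypothetical names, and a predicate is reduced to its text plus the set of columns it references.

```scala
object SplitSketch {
  // A predicate is modeled only by its SQL text and the columns it references.
  case class Pred(sql: String, refs: Set[String])
  case class Pushed(rightFilter: Seq[Pred], joinCondition: Seq[Pred])

  // Partition ON-clause conjuncts into left-only, right-only, and common (both sides).
  def split(conjuncts: Seq[Pred], leftCols: Set[String], rightCols: Set[String]): (Seq[Pred], Seq[Pred], Seq[Pred]) = {
    val (leftOnly, rest) = conjuncts.partition(_.refs.subsetOf(leftCols))
    val (rightOnly, common) = rest.partition(_.refs.subsetOf(rightCols))
    (leftOnly, rightOnly, common)
  }

  // Left outer join: right-only conjuncts become a filter on the right child;
  // left-only and common conjuncts stay in the join condition, in that order.
  def pushForLeftOuter(conjuncts: Seq[Pred], leftCols: Set[String], rightCols: Set[String]): Pushed = {
    val (leftOnly, rightOnly, common) = split(conjuncts, leftCols, rightCols)
    Pushed(rightFilter = rightOnly, joinCondition = leftOnly ++ common)
  }

  val leftCols = Set("decNum1", "intNum1")
  val rightCols = Set("decNum2", "intNum2")
  val equiJoin = Pred("intNum1 = intNum2", Set("intNum1", "intNum2"))

  // ON intNum1 = intNum2 AND intNum1 > 1 (predicate on the preserved table)
  val preservedSide: Pushed =
    pushForLeftOuter(Seq(equiJoin, Pred("intNum1 > 1", Set("intNum1"))), leftCols, rightCols)

  // ON intNum1 = intNum2 AND intNum2 > 1 (predicate on the null-supplying table)
  val nullSupplyingSide: Pushed =
    pushForLeftOuter(Seq(equiJoin, Pred("intNum2 > 1", Set("intNum2"))), leftCols, rightCols)
}
```

In this model the preserved-table predicate stays in the join condition but moves ahead of the equality, which matches the reordering seen in the first case, while the null-supplying-table predicate leaves the join condition and becomes a filter on the right child.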
The InferFiltersFromConstraints rule then only adds an isnotnull(intNum2#38) check:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Join LeftOuter, (intNum1#34 = intNum2#38) Join LeftOuter, (intNum1#34 = intNum2#38)
:- Relation [decNum1#33,intNum1#34] parquet :- Relation [decNum1#33,intNum1#34] parquet
!+- Filter (intNum2#38 > 1) +- Filter (isnotnull(intNum2#38) AND (intNum2#38 > 1))
+- Relation [decNum2#37,intNum2#38] parquet +- Relation [decNum2#37,intNum2#38] parquet
Result:
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|11.000000000000000000|1      |null                   |null   |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+
[Figure: corresponding physical plan]
- leftouter: after-join predicate on the preserved table
Run:
val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum1 > 1")
PushDownPredicates pushes the filter down below the join:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Filter (intNum1#34 > 1) Join LeftOuter, (intNum1#34 = intNum2#38)
!+- Join LeftOuter, (intNum1#34 = intNum2#38) :- Filter (intNum1#34 > 1)
! :- Relation [decNum1#33,intNum1#34] parquet : +- Relation [decNum1#33,intNum1#34] parquet
! +- Relation [decNum2#37,intNum2#38] parquet +- Relation [decNum2#37,intNum2#38] parquet
InferFiltersFromConstraints derives a further predicate, so a filter is pushed down to the null-supplying table as well:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Join LeftOuter, (intNum1#34 = intNum2#38) Join LeftOuter, (intNum1#34 = intNum2#38)
!:- Filter (intNum1#34 > 1) :- Filter (isnotnull(intNum1#34) AND (intNum1#34 > 1))
: +- Relation [decNum1#33,intNum1#34] parquet : +- Relation [decNum1#33,intNum1#34] parquet
!+- Relation [decNum2#37,intNum2#38] parquet +- Filter ((intNum2#38 > 1) AND isnotnull(intNum2#38))
! +- Relation [decNum2#37,intNum2#38] parquet
Result:
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
|33.000000000000000000|3      |null                   |null   |
+---------------------+-------+-----------------------+-------+
[Figure: corresponding physical plan]
- leftouter: after-join predicate on the null-supplying table
Run:
val dfResult = spark.sql("select * from tbl1 left outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")
In this case an additional rule, EliminateOuterJoin, fires and rewrites the left join as an inner join:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin ===
Filter (intNum2#38 > 1) Filter (intNum2#38 > 1)
!+- Join LeftOuter, (intNum1#34 = intNum2#38) +- Join Inner, (intNum1#34 = intNum2#38)
:- Relation [decNum1#33,intNum1#34] parquet :- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet +- Relation [decNum2#37,intNum2#38] parquet
PushDownPredicates and InferFiltersFromConstraints then behave as in the leftouter after-join/preserved-table case, except that the join type is now inner (rewritten by EliminateOuterJoin), so the predicate is pushed down here too.
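The rewrite is valid because the WHERE predicate rejects nulls: any row that the outer join pads with null on the right fails intNum2 > 1 and is dropped anyway. A small model over plain collections illustrates this; `EliminateSketch` and its helpers are hypothetical, and the sample values are assumed from the result tables in this post.

```scala
object EliminateSketch {
  val tbl1 = Seq(1, 2, 3) // assumed intNum1 values
  val tbl2 = Seq(1, 2, 4) // assumed intNum2 values

  def innerJoin(l: Seq[Int], r: Seq[Int]): Seq[(Int, Option[Int])] =
    for (a <- l; b <- r if a == b) yield (a, Option(b))

  def leftOuterJoin(l: Seq[Int], r: Seq[Int]): Seq[(Int, Option[Int])] =
    l.flatMap { a =>
      val matches = r.filter(_ == a)
      if (matches.isEmpty) Seq((a, Option.empty[Int])) else matches.map(b => (a, Option(b)))
    }

  // WHERE intNum2 > 1: a null intNum2 never satisfies the comparison, so the row is dropped.
  def where(rows: Seq[(Int, Option[Int])]): Seq[(Int, Option[Int])] =
    rows.filter { case (_, right) => right.exists(_ > 1) }

  val viaOuter: Seq[(Int, Option[Int])] = where(leftOuterJoin(tbl1, tbl2))
  val viaInner: Seq[(Int, Option[Int])] = where(innerJoin(tbl1, tbl2))

  def main(args: Array[String]): Unit = {
    println(viaOuter) // List((2,Some(2)))
    println(viaInner) // List((2,Some(2)))
  }
}
```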
Result:
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
+---------------------+-------+-----------------------+-------+
[Figure: corresponding physical plan]
- rightouter: during-join predicate on the preserved table
Run:
val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 and intNum2 > 1")
PushDownPredicates only reorders the join conditions:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join RightOuter, ((intNum1#34 = intNum2#38) AND (intNum2#38 > 1)) Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
:- Relation [decNum1#33,intNum1#34] parquet :- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet +- Relation [decNum2#37,intNum2#38] parquet
InferFiltersFromConstraints, however, derives a filter that is pushed down to the null-supplying table:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38)) Join RightOuter, ((intNum2#38 > 1) AND (intNum1#34 = intNum2#38))
!:- Relation [decNum1#33,intNum1#34] parquet :- Filter ((intNum1#34 > 1) AND isnotnull(intNum1#34))
!+- Relation [decNum2#37,intNum2#38] parquet : +- Relation [decNum1#33,intNum1#34] parquet
! +- Relation [decNum2#37,intNum2#38] parquet
Result:
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|null                 |null   |1111.000000000000000000|1      |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+
[Figure: corresponding physical plan]
- rightouter: during-join predicate on the null-supplying table
Run:
val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 and intNum1 > 1")
PushDownPredicates pushes the predicate down to the null-supplying table:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Join RightOuter, ((intNum1#34 = intNum2#38) AND (intNum1#34 > 1)) Join RightOuter, (intNum1#34 = intNum2#38)
!:- Relation [decNum1#33,intNum1#34] parquet :- Filter (intNum1#34 > 1)
!+- Relation [decNum2#37,intNum2#38] parquet : +- Relation [decNum1#33,intNum1#34] parquet
! +- Relation [decNum2#37,intNum2#38] parquet
InferFiltersFromConstraints adds an isnotnull check:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Join RightOuter, (intNum1#34 = intNum2#38) Join RightOuter, (intNum1#34 = intNum2#38)
!:- Filter (intNum1#34 > 1) :- Filter (isnotnull(intNum1#34) AND (intNum1#34 > 1))
: +- Relation [decNum1#33,intNum1#34] parquet : +- Relation [decNum1#33,intNum1#34] parquet
+- Relation [decNum2#37,intNum2#38] parquet +- Relation [decNum2#37,intNum2#38] parquet
Result:
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|null                 |null   |1111.000000000000000000|1      |
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+
[Figure: corresponding physical plan]
- rightouter: after-join predicate on the preserved table
Run:
val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 where intNum2 > 1")
PushDownPredicates pushes the preserved table's predicate down below the join:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
!Filter (intNum2#38 > 1) Join RightOuter, (intNum1#34 = intNum2#38)
!+- Join RightOuter, (intNum1#34 = intNum2#38) :- Relation [decNum1#33,intNum1#34] parquet
! :- Relation [decNum1#33,intNum1#34] parquet +- Filter (intNum2#38 > 1)
+- Relation [decNum2#37,intNum2#38] parquet +- Relation [decNum2#37,intNum2#38] parquet
InferFiltersFromConstraints then derives additional filters:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
Join RightOuter, (intNum1#34 = intNum2#38) Join RightOuter, (intNum1#34 = intNum2#38)
!:- Relation [decNum1#33,intNum1#34] parquet :- Filter ((intNum1#34 > 1) AND isnotnull(intNum1#34))
!+- Filter (intNum2#38 > 1) : +- Relation [decNum1#33,intNum1#34] parquet
! +- Relation [decNum2#37,intNum2#38] parquet +- Filter (isnotnull(intNum2#38) AND (intNum2#38 > 1))
! +- Relation [decNum2#37,intNum2#38] parquet
Result:
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
|null                 |null   |4444.000000000000000000|4      |
+---------------------+-------+-----------------------+-------+
[Figure: corresponding physical plan]
- rightouter: after-join predicate on the null-supplying table
Run:
val dfResult = spark.sql("select * from tbl1 right outer join tbl2 on intNum1 == intNum2 where intNum1 > 1")
The behavior of EliminateOuterJoin, PushDownPredicates, and InferFiltersFromConstraints is the same as in the leftouter after-join/null-supplying-table case, so it is not repeated here.
Result:
+---------------------+-------+-----------------------+-------+
|decNum1              |intNum1|decNum2                |intNum2|
+---------------------+-------+-----------------------+-------+
|22.000000000000000000|2      |2222.000000000000000000|2      |
+---------------------+-------+-----------------------+-------+
[Figure: corresponding physical plan]
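| left join | Preserved table | Null-supplying table |
|---|---|---|
| During-join predicate | Not pushed down | Pushed down |
| After-join predicate | Pushed down | **Pushed down** |

| right join | Preserved table | Null-supplying table |
|---|---|---|
| During-join predicate | Not pushed down | Pushed down |
| After-join predicate | Pushed down | **Pushed down** |

Merged, this gives:

| outer join | Preserved table | Null-supplying table |
|---|---|---|
| During-join predicate | Not pushed down | Pushed down |
| After-join predicate | Pushed down | **Pushed down** |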
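Comparing the two, the theory says an after-join predicate on the null-supplying table is not pushed down, while in practice it is (the difference is marked in bold). This bears out the earlier remark that practice optimizes on theory, much as with the CAP theorem.
The difference comes from the EliminateOuterJoin rule that Spark adds.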



