众所周知,Join的种类丰富:
按照**关联形式(**Join type)划分:
有内关联,外关联,左关联,右关联,半关联,逆关联等,由业务逻辑决定的关联形式决定了Spark任务的运行结果;
按照关联机制(Join Mechanisms)划分:
有NLJ(Nested Loop Join) , SMJ(Sort Merge Join)和HJ(Hash Join),由数据内容决定的实现机制,则决定了Spark任务的运行效率;
关联形式Spark支持的关联形式:
| 关联形式 | Join Type关键字 |
|---|---|
| 内连接 | inner |
| 左外关联 | left / left outer / left_outer |
| 右外关联 | right/ right outer/right_outer |
| 全外关联 | outer/ full / full outer / full_outer |
| 左半关联 | left semi/ left_semi |
| 左逆关联 | left anti / left_anti |
| 交叉关联 | crossJoin |
import spark.implicits._
import org.apache.spark.sql.Dataframe
// 创建员工信息表
val seq = Seq((1, "Mike", 28, "Male"), (2, "Lily", 30, "Female"), (3, "Raymond", 26, "Male"), (5, "Dave", 36, "Male"))
val employees: Dataframe = seq.toDF("id", "name", "age", "gender")
// 创建薪资表
val seq2 = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
val salaries:Dataframe = seq2.toDF("id", "salary")
// 左表
salaries.show
// 右表
employees.show
内连接(Inner join)
内连接是默认关联形式,可以省略写成join. 左右表按照join key连接, 舍弃未匹配的行,仅仅保留左右表中满足关联条件的那些数据记录.
// 内关联
val jointDF: Dataframe = salaries.join(employees, salaries("id") === employees("id"), "inner")
jointDF.show
外连接(Outer join)
val jointDF: Dataframe = salaries.join(employees, salaries("id") === employees("id"), "left")
jointDF.show
外连接的左右指的是不满足join条件的数据来自于哪张表,上述的"left"左外连接,就让第三行数据来自于左表.
半关联(semi join) // 左半关联
val jointDF: Dataframe = salaries.join(employees, salaries("id") === employees("id"), "leftsemi")
jointDF.show
半关联是inner join的一半返回,left semi join返回左表数据, right semi join返回右表数据
逆关联(anti join)// 左逆关联
val jointDF: Dataframe = salaries.join(employees, salaries("id") === employees("id"), "leftanti")
jointDF.show
逆关联返回的是未关联上的行.
关联机制假设我们将join表称为"驱动表",将被join的表称为"基表",基于这两个称呼:
spark的关联机制| join实现机制 | 工作原理 | 适用场景 |
|---|---|---|
| Nested Loop Join | 在驱动表与基表之上,使用嵌套的双层for循环实现关联,效率最低,算法复杂度为O(M*N) | 其他策略不使用的情况 |
| Sort Merge Join | 首先将两张表排序,然后以双指针遍历的方式实现关联,关联阶段的算法复杂度为O(M+N) | 两张表都按照join key排序的情况 |
| Hash Join | 关联过程分为两个阶段 Build + Probe , Build 使用Hash算法生成哈希表O(N),Probe 查表获得值O(M) | 哈希表较小且易生成. |
| 对比项 | Shuffle Join | Broadcast Join |
|---|---|---|
| 实现原理 | * 根据Join Keys 计算哈希值 * 将哈希值按照并行度(parallelism)取模 | 封装一张小表为广播变量,发送到所有Executor. |
| 优点 | 不论数据的体量是大是小、不管内存是否足够,Shuffle Join 在功能上都能成功地完成数据关联的计算 | 通过分发较小数据表,SQL的执行性能高效. |
| 适用场景 | 任何场景都能完成 | 广播表较小 |
| 前提条件 | 无 | 基表需要足够小(小于Excutor内存) |
| 缺点 | shuffle IO 带来的性能瓶颈 | 无 |
3种关联机制跟 2中关联策略的组合,出现了6中join.由于Broadcast SMJ < Broadcast HJ ,去掉毫无用武之处的Broadcast SMJ,余下了以下5种join方式.
| join方式 | 原理 | 适用场景 |
|---|---|---|
| Broadcast HJ | 基表加工成哈希表,广播到所有Executor,内部查表连接 | 基表小,等值join查表快 |
| Broadcast NLJ | 广播小表,Excutor内部用双重循环连接 | 基表小,可以用于不等值join |
| Shuffle SMJ | 因为Shuffle要按照join key排序,所以spark 优先选择SMJ | 基表大 |
| Shuffle HJ | 因为SMJ的原因,Shuffle HJ基本不用 | 同Shuffle SMJ |
| Shuffle NLJ | 两张表都比较大,而且join是非等值join | 其他场景都不适用的情况 |



