
What types of JOIN does Spark SQL support?

Preface

This article belongs to my original column 《大数据技术体系》 (Big Data Technology System). Please credit the source when quoting, and please point out any shortcomings or mistakes in the comments. Thanks!

For the column's table of contents and references, see 大数据技术体系.


The 7 types of JOIN

Spark 3.3.0 supports 7 types of JOIN:

    INNER JOIN
    FULL OUTER JOIN
    LEFT OUTER JOIN
    RIGHT OUTER JOIN
    LEFT SEMI JOIN
    LEFT ANTI JOIN
    CROSS JOIN

PySpark also supports pandas' merge_asof, i.e. ASOF JOIN. For details, see my post "What is ASOF JOIN? How do you use pandas' merge_asof()?"


The differences

INNER JOIN

INNER JOIN is usually translated as 内连接 (inner join).

An inner join returns only the rows of the left and right tables that can be matched with each other.

For example:

SELECT A.PK AS A_PK, B.PK AS B_PK,
       A.Value AS A_Value, B.Value AS B_Value
FROM Table_A A
INNER JOIN Table_B B
ON A.PK = B.PK;

Result:

+------+------+---------+---------+
| A_PK | B_PK | A_Value | B_Value |
+------+------+---------+---------+
|    1 |    1 | both ab | both ba |
+------+------+---------+---------+
1 row in set (0.00 sec)

FULL OUTER JOIN

FULL OUTER JOIN is usually translated as 外连接 or 全连接 (full outer join), and can also be written as FULL JOIN in queries.

A full outer join returns all records from both tables; the records that can be matched between the two tables are joined together in the result.

For example:

SELECT A.PK AS A_PK, B.PK AS B_PK,
       A.Value AS A_Value, B.Value AS B_Value
FROM Table_A A
FULL OUTER JOIN Table_B B
ON A.PK = B.PK;

Result:

+------+------+---------+---------+
| A_PK | B_PK | A_Value | B_Value |
+------+------+---------+---------+
|    1 |    1 | both ab | both ba |
|    2 | NULL | only a  | NULL    |
| NULL |    3 | NULL    | only b  |
+------+------+---------+---------+
3 rows in set (0.00 sec)

LEFT OUTER JOIN

LEFT OUTER JOIN is usually translated as 左外连接 (left outer join) and can also be written as LEFT JOIN.

A left join returns all records from the left table, whether or not they have matching data in the right table.

The matching columns found in the right table are returned as well.

For example:

SELECT A.PK AS A_PK, B.PK AS B_PK,
       A.Value AS A_Value, B.Value AS B_Value
FROM Table_A A
LEFT JOIN Table_B B
ON A.PK = B.PK;

Result:

+------+------+---------+---------+
| A_PK | B_PK | A_Value | B_Value |
+------+------+---------+---------+
|    1 |    1 | both ab | both ba |
|    2 | NULL | only a  | NULL    |
+------+------+---------+---------+
2 rows in set (0.00 sec)

RIGHT OUTER JOIN

RIGHT OUTER JOIN is usually translated as 右外连接 (right outer join) and can also be written as RIGHT JOIN.

A right join returns all records from the right table, whether or not they have matching data in the left table.

The matching columns found in the left table are returned as well.

For example:

SELECT A.PK AS A_PK, B.PK AS B_PK,
       A.Value AS A_Value, B.Value AS B_Value
FROM Table_A A
RIGHT JOIN Table_B B
ON A.PK = B.PK;

Result:

+------+------+---------+---------+
| A_PK | B_PK | A_Value | B_Value |
+------+------+---------+---------+
|    1 |    1 | both ab | both ba |
| NULL |    3 | NULL    | only b  |
+------+------+---------+---------+
2 rows in set (0.00 sec)

LEFT SEMI JOIN

LEFT SEMI JOIN (左半连接, left semi join) is a more efficient implementation of the IN/EXISTS subquery.

For example:

SELECT A.PK, A.Value
FROM A
WHERE A.PK IN (SELECT B.PK FROM B)

can be rewritten as:

SELECT A.PK, A.Value
FROM A LEFT SEMI JOIN B
ON (A.PK = B.PK)

Characteristics
    The right table in a LEFT SEMI JOIN may only be filtered in the ON clause; filtering it in the WHERE clause, the SELECT clause, or anywhere else is not allowed.
    LEFT SEMI JOIN passes only the join keys of the right table to the map stage, so the final SELECT may only reference columns from the left table.
    Because LEFT SEMI JOIN acts like in(keySet), when the right table contains duplicate keys the left row is emitted once and the scan moves on, whereas a plain JOIN keeps traversing. With duplicate values on the right, LEFT SEMI JOIN therefore produces a single row where JOIN produces several, which is also why LEFT SEMI JOIN performs better.
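
The duplicate-key behavior is easy to verify. Below is a minimal sketch (the tables and the duplicated key are hypothetical, made up for illustration):

// Hypothetical data: the right table contains the join key "Tom" twice.
val left = spark.createDataFrame(List(("Tom", "猫"), ("Spike", "狗"))).toDF("name", "occupation")
val right = spark.createDataFrame(List(("Tom", 10), ("Tom", 99))).toDF("name", "age")

// INNER JOIN traverses every match: "Tom" is returned twice.
left.join(right, Seq("name"), "inner").show()

// LEFT SEMI JOIN stops at the first match: "Tom" is returned exactly once,
// and only the left table's columns appear in the result.
left.join(right, Seq("name"), "left_semi").show()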

LEFT ANTI JOIN

LEFT ANTI JOIN is usually translated as 左反连接 (left anti join).

A LEFT ANTI JOIN B removes from table A the rows that intersect with table B.

LEFT SEMI JOIN returns only the left-table rows that can be matched to the right table; LEFT ANTI JOIN returns only the left-table rows that cannot.
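
For example, reusing the hypothetical Table_A / Table_B data from the sections above (loaded here as the assumed DataFrames tableA and tableB):

// tableA holds PK 1 ("both ab") and PK 2 ("only a"); tableB holds PK 1 and PK 3.
// LEFT ANTI JOIN keeps only the tableA rows with no match in tableB.
tableA.join(tableB, Seq("PK"), "left_anti").show()
// +---+------+
// | PK| Value|
// +---+------+
// |  2|only a|
// +---+------+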

In fact, the effect of both INNER JOIN and LEFT SEMI/ANTI JOIN can be reproduced with a LEFT OUTER JOIN plus a restriction in the WHERE clause, but doing so makes the query less efficient.
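
As a sketch of that rewrite, using the df1/df2 DataFrames from the example at the end of this post (and assuming the age column in df2 is never null):

import org.apache.spark.sql.functions.col

// Dedicated join type: Spark plans this as an anti join.
val anti = df1.join(df2, Seq("name"), "left_anti")

// LEFT OUTER JOIN rewrite: keep the rows where no match was found
// (df2's "age" came back null), then drop the right-side column.
val rewritten = df1.join(df2, Seq("name"), "left_outer")
  .where(col("age").isNull)
  .select("name", "occupation")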


CROSS JOIN

Without a WHERE clause, it returns the Cartesian product of the two joined tables: the number of rows in the result equals the product of the two tables' row counts.

For example, queries A, B, and C below produce the same result but differ in efficiency:

A:

SELECT a.*, b.* FROM t1 a, t2 b WHERE a.id=b.id

B:

SELECT * FROM t1 a CROSS JOIN t2 b WHERE a.id=b.id

Note: a condition after CROSS JOIN is usually added with WHERE rather than ON (Spark SQL does also parse CROSS JOIN ... ON, as the example at the end of this post shows).

C:

SELECT * FROM t1 a INNER JOIN t2 b ON a.id=b.id

A and B are generally not recommended: when there is a WHERE clause, the engine often first materializes a table whose row count is the product of the two tables' row counts, and only then filters it by the WHERE condition.

So if the two tables being joined are large, this will be extremely slow and is best avoided.
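
When a Cartesian product is really what you want, the Dataset API also exposes it explicitly through crossJoin, which takes no join condition. A minimal sketch with the df1/df2 DataFrames from the example at the end of this post:

// Explicit Cartesian product: the result has df1.count() * df2.count() rows,
// i.e. 3 * 3 = 9 rows for the df1/df2 defined below.
df1.crossJoin(df2).show()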


joinType

The joinType string(s) corresponding to each type are:

Type         joinType
Inner        inner
FullOuter    outer / full / fullouter / full_outer
LeftOuter    leftouter / left / left_outer
RightOuter   rightouter / right / right_outer
LeftSemi     leftsemi / left_semi / semi
LeftAnti     leftanti / left_anti / anti
Cross        cross

We can pick a joinType through the Dataset API:

def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
Practice

Source code download

The spark-examples code is open source. The project aims to provide the most practice-oriented guide to Apache Spark code development.

Click the link to download the source code from GitHub: spark-examples


package com.shockang.study.spark.sql.join

import com.shockang.study.spark.util.Utils.formatPrint
import org.apache.spark.sql.SparkSession


object JoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JoinExample").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df1 = spark.createDataFrame(List(
      ("Shockang", "程序员"),
      ("Tom", "猫"),
      ("Jerry", "老鼠")
    )).toDF("name", "occupation").cache()

    df1.createTempView("t1")

    val df2 = spark.createDataFrame(List(
      ("Alice", 12),
      ("Tom", 10),
      ("Jerry", 11)
    )).toDF("name", "age").cache()

    df2.createTempView("t2")

    // INNER JOIN
    formatPrint("""df1.join(df2, Seq("name"), joinType = "inner").show()""")
    df1.join(df2, Seq("name"), joinType = "inner").show()

    formatPrint("""spark.sql("SELECt * FROM t1 INNER JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 INNER JOIN t2 ON t1.name=t2.name").show()
    formatPrint("""spark.sql("SELECt * FROM t1 JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 JOIN t2 ON t1.name=t2.name").show()

    // FULL OUTER JOIN
    formatPrint("""df1.join(df2, Seq("name"), joinType = "outer").show()""")
    df1.join(df2, Seq("name"), joinType = "outer").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "full").show()""")
    df1.join(df2, Seq("name"), joinType = "full").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "fullouter").show()""")
    df1.join(df2, Seq("name"), joinType = "fullouter").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "full_outer").show()""")
    df1.join(df2, Seq("name"), joinType = "full_outer").show()

    formatPrint("""spark.sql("SELECt * FROM t1 FULL OUTER JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 FULL OUTER JOIN t2 ON t1.name=t2.name").show()
    formatPrint("""spark.sql("SELECt * FROM t1 FULL JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 FULL JOIN t2 ON t1.name=t2.name").show()

    // LEFT OUTER JOIN
    formatPrint("""df1.join(df2, Seq("name"), joinType = "leftouter").show()""")
    df1.join(df2, Seq("name"), joinType = "leftouter").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "left").show()""")
    df1.join(df2, Seq("name"), joinType = "left").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "left_outer").show()""")
    df1.join(df2, Seq("name"), joinType = "left_outer").show()

    formatPrint("""spark.sql("SELECt * FROM t1 LEFT OUTER JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 LEFT OUTER JOIN t2 ON t1.name=t2.name").show()
    formatPrint("""spark.sql("SELECt * FROM t1 LEFT JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 LEFT JOIN t2 ON t1.name=t2.name").show()

    // RIGHT OUTER JOIN
    formatPrint("""df1.join(df2, Seq("name"), joinType = "rightouter").show()""")
    df1.join(df2, Seq("name"), joinType = "rightouter").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "right").show()""")
    df1.join(df2, Seq("name"), joinType = "right").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "right_outer").show()""")
    df1.join(df2, Seq("name"), joinType = "right_outer").show()

    formatPrint("""spark.sql("SELECt * FROM t1 RIGHT OUTER JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 RIGHT OUTER JOIN t2 ON t1.name=t2.name").show()
    formatPrint("""spark.sql("SELECt * FROM t1 RIGHT JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 RIGHT JOIN t2 ON t1.name=t2.name").show()

    // LEFT SEMI JOIN
    formatPrint("""df1.join(df2, Seq("name"), joinType = "leftsemi").show()""")
    df1.join(df2, Seq("name"), joinType = "leftsemi").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "left_semi").show()""")
    df1.join(df2, Seq("name"), joinType = "left_semi").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "semi").show()""")
    df1.join(df2, Seq("name"), joinType = "semi").show()

    formatPrint("""spark.sql("SELECt * FROM t1 LEFT SEMI JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 LEFT SEMI JOIN t2 ON t1.name=t2.name").show()
    formatPrint("""spark.sql("SELECt * FROM t1 SEMI JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 SEMI JOIN t2 ON t1.name=t2.name").show()

    // LEFT ANTI JOIN
    formatPrint("""df1.join(df2, Seq("name"), joinType = "leftanti").show()""")
    df1.join(df2, Seq("name"), joinType = "leftanti").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "left_anti").show()""")
    df1.join(df2, Seq("name"), joinType = "left_anti").show()
    formatPrint("""df1.join(df2, Seq("name"), joinType = "anti").show()""")
    df1.join(df2, Seq("name"), joinType = "anti").show()

    formatPrint("""spark.sql("SELECt * FROM t1 LEFT ANTI JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 LEFT ANTI JOIN t2 ON t1.name=t2.name").show()
    formatPrint("""spark.sql("SELECt * FROM t1 ANTI JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 ANTI JOIN t2 ON t1.name=t2.name").show()

    // CROSS JOIN
    formatPrint("""df1.join(df2, Seq("name"), joinType = "cross").show()""")
    df1.join(df2, Seq("name"), joinType = "cross").show()

    formatPrint("""spark.sql("SELECt * FROM t1 CROSS JOIN t2 ON t1.name=t2.name").show()""")
    spark.sql("SELECt * FROM t1 CROSS JOIN t2 ON t1.name=t2.name").show()

    spark.stop()
  }
}
Console output
========== df1.join(df2, Seq("name"), joinType = "inner").show() ==========
+-----+----------+---+
| name|occupation|age|
+-----+----------+---+
|  Tom|        猫| 10|
|Jerry|      老鼠| 11|
+-----+----------+---+

========== spark.sql("SELECt * FROM t1 INNER JOIN t2 ON t1.name=t2.name").show() ==========
+-----+----------+-----+---+
| name|occupation| name|age|
+-----+----------+-----+---+
|  Tom|        猫|  Tom| 10|
|Jerry|      老鼠|Jerry| 11|
+-----+----------+-----+---+

========== spark.sql("SELECt * FROM t1 JOIN t2 ON t1.name=t2.name").show() ==========
+-----+----------+-----+---+
| name|occupation| name|age|
+-----+----------+-----+---+
|  Tom|        猫|  Tom| 10|
|Jerry|      老鼠|Jerry| 11|
+-----+----------+-----+---+

========== df1.join(df2, Seq("name"), joinType = "outer").show() ==========
+--------+----------+----+
|    name|occupation| age|
+--------+----------+----+
|   Alice|      null|  12|
|   Jerry|      老鼠|  11|
|Shockang|    程序员|null|
|     Tom|        猫|  10|
+--------+----------+----+

========== df1.join(df2, Seq("name"), joinType = "full").show() ==========
+--------+----------+----+
|    name|occupation| age|
+--------+----------+----+
|   Alice|      null|  12|
|   Jerry|      老鼠|  11|
|Shockang|    程序员|null|
|     Tom|        猫|  10|
+--------+----------+----+

========== df1.join(df2, Seq("name"), joinType = "fullouter").show() ==========
+--------+----------+----+
|    name|occupation| age|
+--------+----------+----+
|   Alice|      null|  12|
|   Jerry|      老鼠|  11|
|Shockang|    程序员|null|
|     Tom|        猫|  10|
+--------+----------+----+

========== df1.join(df2, Seq("name"), joinType = "full_outer").show() ==========
+--------+----------+----+
|    name|occupation| age|
+--------+----------+----+
|   Alice|      null|  12|
|   Jerry|      老鼠|  11|
|Shockang|    程序员|null|
|     Tom|        猫|  10|
+--------+----------+----+

========== spark.sql("SELECt * FROM t1 FULL OUTER JOIN t2 ON t1.name=t2.name").show() ==========
+--------+----------+-----+----+
|    name|occupation| name| age|
+--------+----------+-----+----+
|    null|      null|Alice|  12|
|   Jerry|      老鼠|Jerry|  11|
|Shockang|    程序员| null|null|
|     Tom|        猫|  Tom|  10|
+--------+----------+-----+----+

========== spark.sql("SELECt * FROM t1 FULL JOIN t2 ON t1.name=t2.name").show() ==========
+--------+----------+-----+----+
|    name|occupation| name| age|
+--------+----------+-----+----+
|    null|      null|Alice|  12|
|   Jerry|      老鼠|Jerry|  11|
|Shockang|    程序员| null|null|
|     Tom|        猫|  Tom|  10|
+--------+----------+-----+----+

========== df1.join(df2, Seq("name"), joinType = "leftouter").show() ==========
+--------+----------+----+
|    name|occupation| age|
+--------+----------+----+
|Shockang|    程序员|null|
|     Tom|        猫|  10|
|   Jerry|      老鼠|  11|
+--------+----------+----+

========== df1.join(df2, Seq("name"), joinType = "left").show() ==========
+--------+----------+----+
|    name|occupation| age|
+--------+----------+----+
|Shockang|    程序员|null|
|     Tom|        猫|  10|
|   Jerry|      老鼠|  11|
+--------+----------+----+

========== df1.join(df2, Seq("name"), joinType = "left_outer").show() ==========
+--------+----------+----+
|    name|occupation| age|
+--------+----------+----+
|Shockang|    程序员|null|
|     Tom|        猫|  10|
|   Jerry|      老鼠|  11|
+--------+----------+----+

========== spark.sql("SELECt * FROM t1 LEFT OUTER JOIN t2 ON t1.name=t2.name").show() ==========
+--------+----------+-----+----+
|    name|occupation| name| age|
+--------+----------+-----+----+
|Shockang|    程序员| null|null|
|     Tom|        猫|  Tom|  10|
|   Jerry|      老鼠|Jerry|  11|
+--------+----------+-----+----+

========== spark.sql("SELECt * FROM t1 LEFT JOIN t2 ON t1.name=t2.name").show() ==========
+--------+----------+-----+----+
|    name|occupation| name| age|
+--------+----------+-----+----+
|Shockang|    程序员| null|null|
|     Tom|        猫|  Tom|  10|
|   Jerry|      老鼠|Jerry|  11|
+--------+----------+-----+----+

========== df1.join(df2, Seq("name"), joinType = "rightouter").show() ==========
+-----+----------+---+
| name|occupation|age|
+-----+----------+---+
|Alice|      null| 12|
|  Tom|        猫| 10|
|Jerry|      老鼠| 11|
+-----+----------+---+

========== df1.join(df2, Seq("name"), joinType = "right").show() ==========
+-----+----------+---+
| name|occupation|age|
+-----+----------+---+
|Alice|      null| 12|
|  Tom|        猫| 10|
|Jerry|      老鼠| 11|
+-----+----------+---+

========== df1.join(df2, Seq("name"), joinType = "right_outer").show() ==========
+-----+----------+---+
| name|occupation|age|
+-----+----------+---+
|Alice|      null| 12|
|  Tom|        猫| 10|
|Jerry|      老鼠| 11|
+-----+----------+---+

========== spark.sql("SELECt * FROM t1 RIGHT OUTER JOIN t2 ON t1.name=t2.name").show() ==========
+-----+----------+-----+---+
| name|occupation| name|age|
+-----+----------+-----+---+
| null|      null|Alice| 12|
|  Tom|        猫|  Tom| 10|
|Jerry|      老鼠|Jerry| 11|
+-----+----------+-----+---+

========== spark.sql("SELECt * FROM t1 RIGHT JOIN t2 ON t1.name=t2.name").show() ==========
+-----+----------+-----+---+
| name|occupation| name|age|
+-----+----------+-----+---+
| null|      null|Alice| 12|
|  Tom|        猫|  Tom| 10|
|Jerry|      老鼠|Jerry| 11|
+-----+----------+-----+---+

========== df1.join(df2, Seq("name"), joinType = "leftsemi").show() ==========
+-----+----------+
| name|occupation|
+-----+----------+
|  Tom|        猫|
|Jerry|      老鼠|
+-----+----------+

========== df1.join(df2, Seq("name"), joinType = "left_semi").show() ==========
+-----+----------+
| name|occupation|
+-----+----------+
|  Tom|        猫|
|Jerry|      老鼠|
+-----+----------+

========== df1.join(df2, Seq("name"), joinType = "semi").show() ==========
+-----+----------+
| name|occupation|
+-----+----------+
|  Tom|        猫|
|Jerry|      老鼠|
+-----+----------+

========== spark.sql("SELECt * FROM t1 LEFT SEMI JOIN t2 ON t1.name=t2.name").show() ==========
+-----+----------+
| name|occupation|
+-----+----------+
|  Tom|        猫|
|Jerry|      老鼠|
+-----+----------+

========== spark.sql("SELECt * FROM t1 SEMI JOIN t2 ON t1.name=t2.name").show() ==========
+-----+----------+
| name|occupation|
+-----+----------+
|  Tom|        猫|
|Jerry|      老鼠|
+-----+----------+

========== df1.join(df2, Seq("name"), joinType = "leftanti").show() ==========
+--------+----------+
|    name|occupation|
+--------+----------+
|Shockang|    程序员|
+--------+----------+

========== df1.join(df2, Seq("name"), joinType = "left_anti").show() ==========
+--------+----------+
|    name|occupation|
+--------+----------+
|Shockang|    程序员|
+--------+----------+

========== df1.join(df2, Seq("name"), joinType = "anti").show() ==========
+--------+----------+
|    name|occupation|
+--------+----------+
|Shockang|    程序员|
+--------+----------+

========== spark.sql("SELECt * FROM t1 LEFT ANTI JOIN t2 ON t1.name=t2.name").show() ==========
+--------+----------+
|    name|occupation|
+--------+----------+
|Shockang|    程序员|
+--------+----------+

========== spark.sql("SELECt * FROM t1 ANTI JOIN t2 ON t1.name=t2.name").show() ==========
+--------+----------+
|    name|occupation|
+--------+----------+
|Shockang|    程序员|
+--------+----------+

========== df1.join(df2, Seq("name"), joinType = "cross").show() ==========
+-----+----------+---+
| name|occupation|age|
+-----+----------+---+
|  Tom|        猫| 10|
|Jerry|      老鼠| 11|
+-----+----------+---+

========== spark.sql("SELECt * FROM t1 CROSS JOIN t2 ON t1.name=t2.name").show() ==========
+-----+----------+-----+---+
| name|occupation| name|age|
+-----+----------+-----+---+
|  Tom|        猫|  Tom| 10|
|Jerry|      老鼠|Jerry| 11|
+-----+----------+-----+---+