可以使用
PairRDDFunctions或Spark数据帧来完成。由于数据帧操作受益于Catalyst
Optimizer,因此第二个选项值得考虑。
假设您的数据如下所示:
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])使用PairRDD:
内部联接:
rdd1.join(rdd2)
左外连接:
rdd1.leftOuterJoin(rdd2)
笛卡尔积(不需要
RDD[(T, U)]):
rdd1.cartesian(rdd2)
广播加入(不需要
RDD[(T, U)]):
- 请参见Spark:将2元组键RDD与单键RDD结合在一起的最佳策略是什么?
最后,
cogroup它没有直接的SQL等效项,但在某些情况下可能有用:
cogrouped = rdd1.cogroup(rdd2)cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]使用Spark数据框
您可以使用SQL DSL或使用来执行原始SQL
sqlContext.sql。
df1 = spark.createDataframe(rdd1, ('k', 'v1'))df2 = spark.createDataframe(rdd2, ('k', 'v2'))# Register temporary tables to be able to use `sparkSession.sql`df1.createOrReplaceTempView('df1')df2.createOrReplaceTempView('df2')内部联接:
# inner is a default value so it could be omitteddf1.join(df2, df1.k == df2.k, how='inner') spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')左外连接:
df1.join(df2, df1.k == df2.k, how='left_outer')spark.sql('SELECt * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')交叉连接(在Spark 2.0中需要显式交叉连接或配置更改-Spark
2.x的spark.sql.crossJoin.enabled):
df1.crossJoin(df2)spark.sql('SELECt * FROM df1 CROSS JOIN df2')~~~~
df1.join(df2)sqlContext.sql('SELECt * FROM df JOIN df2')从1.6(Scala中为1.5)开始,这些
broadcast功能都可以与功能结合使用:
from pyspark.sql.functions import broadcastdf1.join(broadcast(df2), df1.k == df2.k)
执行广播加入。另请参阅为什么我的BroadcastHashJoin比Spark中的ShuffledHashJoin慢



