一种选择是通过rdd1向驾驶员收集并将其广播给所有映射器来执行广播联接。如果正确完成,这将使我们避免大型rdd2RDD的昂贵改组:
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())val joined = rdd2.mapPartitions({ iter => val m = rdd1Broadcast.value for { ((t, w), u) <- iter if m.contains(t) } yield ((t, w), (u, m.get(t).get))}, preservesPartitioning = true)该preservesPartitioning = true告诉星火此映射函数不修改的键rdd2; 这样,Spark可以避免rdd2对基于该(t, w)密钥加入的任何后续操作进行重新分区。
由于广播涉及驾驶员的通信瓶颈,因此广播效率可能很低。原则上,可以在不涉及驱动程序的情况下将一个RDD广播到另一个。我有一个原型,希望对此进行概括并添加到Spark中。
另一种选择是重新映射的键rdd2并使用Sparkjoin方法。这将涉及rdd2(可能rdd1)的全部改组:
rdd1.join(rdd2.map { case ((t, w), u) => (t, (w, u))}).map { case (t, (v, (w, u))) => ((t, w), (u, v))}.collect()在我的示例输入中,这两种方法都产生相同的结果:
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
第三种选择是重组rdd2,t使其成为关键,然后执行上述连接。



