1. 当 join 条件使用 df("col_1") === df2("col_2") 时, 由于 Spark SQL 执行计划的原因, 有可能会导致结果出现异常 (例如结果为空)
解决方案: 使用 SQL, 或者将两侧的关联字段重命名为相同的字段名
/**
 * Reproduces a DataFrame join in which both sides are derived from the same source DataFrame.
 *
 * Builds `rawDF` (columns `esn`, `esn_list`), explodes `esn_list` into `group_esn` to form
 * `tmpDF`, then inner-joins `tmpDF` with a projection of `rawDF` in which `esn` is aliased to
 * `group_esn`. Schemas, rows and the query plan are printed for manual inspection; nothing is
 * asserted.
 *
 * The commented-out variants are kept for comparison. Per the original author's notes, joining
 * directly on `rawDF("esn") === tmpDF("group_esn")` returned an empty result, while the
 * equivalent SQL query behaved correctly.
 */
@Test
def joinTest(): Unit = {
  spark.conf.set("spark.sql.crossJoin.enabled", true)
  // A threshold of -1 disables broadcast joins.
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
  spark.conf.set("spark.sql.orc.filterPushdown", false)

  val source = Seq(
    ("11111111", Seq("22222222", "33333333")),
    ("22222222", Seq("44444444", "33333333")),
    ("33333333", Seq("44444444", "22222222")),
    ("44444444", Seq("22222222"))
  )

  // Fixed: the method is `createDataFrame` (capital F); `createDataframe` does not exist.
  val rawDF = spark.createDataFrame(source).toDF("esn", "esn_list")
  rawDF.createOrReplaceTempView("raw_tb")
  rawDF.printSchema()
  rawDF.show(false)

  // One output row per element of esn_list, paired with the owning esn.
  val tmpDF = rawDF.select(
    col("esn"),
    explode(col("esn_list")).as("group_esn")
  )
  tmpDF.createOrReplaceTempView("tmp_db")
  tmpDF.printSchema()
  tmpDF.show(false)

  // Projection of rawDF whose key column carries the same name as tmpDF's exploded column.
  val rawTmpDF = rawDF.selectExpr("esn as group_esn", "esn_list")
  // val rawTmpDF = rawDF.selectExpr("esn as esn2", "esn_list")

  // val testDF = tmpDF.join(rawDF, rawDF("esn") === tmpDF("group_esn"), "inner") // result is empty
  val testDF = tmpDF.join(rawTmpDF, rawTmpDF("group_esn") === tmpDF("group_esn"), "inner")
  // val testDF = tmpDF.join(rawTmpDF, rawTmpDF("esn2") === tmpDF("group_esn"), "inner")
  testDF.printSchema()
  testDF.show(false)
  testDF.explain(true)

  // The SQL form works as expected:
  // val sql =
  // """
  // |select *
  // |from raw_tb t1
  // |inner join tmp_db t2
  // |on t1.esn = t2.group_esn
  // |
  // |""".stripMargin
  //
  // val frame = spark.sql(sql)
  //
  // frame.printSchema()
  // frame.show(false)
  // frame.explain(true)
}



