栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark DataFrame API 踩坑记

Spark DataFrame API 踩坑记

1. 当join 条件使用 df("col_1") === df2("col_2"), 由于spark sql 执行计划的原因,有可能会导致结果出现一异常

解决方案: 使用sql 或者 重命名为一样的字段名字

 @Test
  def joinTest() = {
    spark.conf.set("spark.sql.crossJoin.enabled", true)
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.orc.filterPushdown", false)

    val source = Seq(
      ("11111111",  Seq("22222222", "33333333")),
      ("22222222",  Seq("44444444", "33333333")),
      ("33333333",  Seq("44444444", "22222222")),
      ("44444444",  Seq( "22222222"))
    )

    val rawDF = spark.createDataframe(source).toDF("esn", "esn_list")

    rawDF.createOrReplaceTempView("raw_tb")

    rawDF.printSchema()
    rawDF.show(false)

    val tmpDF = rawDF.select(
      col("esn")
      , explode(col("esn_list")).as("group_esn")
    )
    tmpDF.createOrReplaceTempView("tmp_db")

    tmpDF.printSchema()
    tmpDF.show(false)

    val rawTmpDF = rawDF.selectExpr("esn as group_esn", "esn_list")
    //val rawTmpDF = rawDF.selectExpr("esn as esn2", "esn_list")

    // val testDF = tmpDF.join(rawDF, rawDF("esn") === tmpDF("group_esn"), "inner") // 结果为空



    val testDF = tmpDF.join(rawTmpDF, rawTmpDF("group_esn") === tmpDF("group_esn"), "inner")
// val testDF = tmpDF.join(rawTmpDF, rawTmpDF("esn2") === tmpDF("group_esn"), "inner")

    testDF.printSchema()
    testDF.show(false)

    testDF.explain(true)

// sql 正常没有问题
//    val sql =
//      """
//        |select *
//        |from raw_tb t1
//        |inner join tmp_db t2
//        |on t1.esn = t2.group_esn
//        |
//        |""".stripMargin
//
//    val frame = spark.sql(sql)
//
//    frame.printSchema()
//    frame.show(false)
//    frame.explain(true)


  }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734864.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号