
Overloaded method foreachBatch with alternatives

Foreword

This article belongs to the column "Spark异常问题汇总" (Spark Exception Roundup), which is the author's original work. Please credit the source when reposting, and please point out any shortcomings or mistakes in the comments. Thanks!

For the column's table of contents and references, see Spark异常问题汇总.

Problem Description

The Spark build fails with the following compile error:

    Error:(34, 25) overloaded method foreachBatch with alternatives:
      (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
      (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
     cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame)
        askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>

My code is as follows:

    val properties = new java.util.Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    val query = wordCounts.writeStream
      .outputMode("complete")
      .foreachBatch((ds, batchID) => {  // this untyped lambda is what the compiler rejects
        println("BatchID:" + batchID)
        if (ds.count() != 0) {
          ds.cache()
          ds.write.json(PATH_PREFIX + batchID)
          ds.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://node1:3306/spark_bigdata_analyze", "t_word_count", properties)
          ds.unpersist()
        }
      }).start()

    query.awaitTermination()
  }
Root Cause

The error is caused by upgrading Scala from 2.11 to 2.12.

In Scala 2.12, function literals can be compiled to Java SAM (single abstract method) interfaces. A bare lambda passed to DataStreamWriter.foreachBatch therefore matches both overloads (the Scala `(Dataset[Row], Long) => Unit` one and the Java `VoidFunction2` one), and the call becomes ambiguous unless the code is updated.
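The same ambiguity can be reproduced outside Spark. Below is a minimal sketch; the `Handler` trait and the `run` overloads are hypothetical names standing in for `VoidFunction2` and the two `foreachBatch` overloads:

```scala
// A Java-style SAM interface, standing in for VoidFunction2
trait Handler { def call(s: String, n: java.lang.Long): Unit }

object AmbiguityDemo {
  // Two overloads mirroring foreachBatch: one Scala function, one SAM
  def run(f: (String, Long) => Unit): Unit = f("batch", 1L)
  def run(f: Handler): Unit = f.call("batch", 1L)

  // In Scala 2.12 a bare lambda is eligible for both overloads, so the
  // next line fails with "overloaded method run with alternatives":
  // run((s, n) => println(s + n))

  // Pinning the function type picks the Scala overload and compiles:
  val scalaFn: (String, Long) => Unit = (s, n) => println(s + n)
  def ok(): Unit = run(scalaFn)
}
```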

Both foreachBatch overloads are documented here:

https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamWriter.html
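Simplified, the two overloads on `DataStreamWriter[T]` look like this (signatures paraphrased from the docs linked above):

```scala
// Scala API: takes a Scala function
def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

// Java API: takes a SAM interface; in Scala 2.12 a lambda can convert to this too
def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T]
```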

Solution

Either stay on Scala 2.11, or update the code as described in the Databricks release notes, which cover this exact issue:

https://docs.databricks.com/release-notes/runtime/7.0.html

Code Fix

Passing an explicitly typed lambda that delegates to a named method removes the ambiguity, because annotating the second parameter as scala.Long rules out the `VoidFunction2[..., java.lang.Long]` overload:

    import org.apache.spark.sql.{Dataset, Row, SaveMode}

    val properties = new java.util.Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    val query = wordCounts.writeStream
      .outputMode("complete")
      .foreachBatch((ds: Dataset[Row], batchId: Long) => myFunc(ds, batchId, properties))
      .start()

    query.awaitTermination()
  }

  private def myFunc(ds: Dataset[Row], batchID: Long, properties: java.util.Properties): Unit = {
    println("BatchID:" + batchID)
    if (ds.count() != 0) {
      ds.cache()
      ds.write.json(PATH_PREFIX + batchID)
      ds.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://node1:3306/spark_bigdata_analyze", "t_word_count", properties)
      ds.unpersist()
    }
  }
}
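An alternative that avoids the helper method is to bind the lambda to an explicitly typed function value first. A non-literal function value is not eligible for SAM conversion, so only the Scala overload applies. A sketch reusing the names from the snippet above:

```scala
val writeBatch: (Dataset[Row], Long) => Unit = (ds, batchId) => {
  println("BatchID:" + batchId)
  if (ds.count() != 0) {
    ds.write.json(PATH_PREFIX + batchId)
  }
}

val query = wordCounts.writeStream
  .outputMode("complete")
  .foreachBatch(writeBatch)  // unambiguous: writeBatch already has a function type
  .start()
```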

Source: https://www.mshxw.com/it/350568.html