栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

StructedStreaming消费Kafka数据突然存储不到HDFS

StructedStreaming消费Kafka数据突然存储不到HDFS

问题描述:

StructedStreaming消费Kafka数据存储到HDFS中,以前正常存储,突然就存储不进去了,可以新建文件夹,但是数据写入不进去了。
分析:通过流写出到控制台,能消费数据,说明消费正常,但是就是写不进HDFS中,说明写时不能触发HDFS保存。

val query = spark.sql(sql)
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
解决方案

网上也没找到相关解决方案,所以自己只能考虑,换种写数据的方式,通过foreachBatch这种方式,可以对每一条流进行触发。
foreachBatch允许在每个微批次的输出上进行任意操作和自定义逻辑。
foreachBatch(…)允许您指定对流式查询的每个微批次的输出数据执行的函数。从 Spark 2.4 开始,Scala、Java 和 Python 都支持这一点。它有两个参数:一个 Dataframe 或 Dataset,其中包含微批次的输出数据和微批次的唯一 ID。
using-foreach-and-foreachbatch - 官网详解

// 原来写数据方式
val query = spark.sql(sql)
    .coalesce(1)
    .writeStream
    .format("parquet")
    .outputMode("append")
    .option("truncate", "false")
    .option("path", hdfs_save_path)
    .option("checkpointLocation", checkPointDir)
    .partitionBy("part_date")
    .trigger(Trigger.ProcessingTime("1 minutes"))
    .queryName("game_event")
    .start()
// 修改为foreachBatch的方式
 val query = spark.sql(sql)
            .coalesce(1)
            .writeStream.foreachBatch((batchDF: Dataframe, batchId: Long) =>
            batchDF.
              write.format("parquet")
              .mode("append")
              .partitionBy("part_date")
              .save(hdfs_save_path))
            .outputMode("append")
            .option("checkpointLocation", checkPointDir)
            .trigger(Trigger.ProcessingTime("1 minutes"))
            .queryName("game_event")
            .start()

修改后,意外的发现可以正常些数据了。

注意事项:

修改写入方式后,有可能还会报一个

Offsets out of range with no configured reset policy for partitions

错误。
解决方案:
将checkPointDir更换一个目录,或者将原来checkPointDir目录中的数据全部清掉。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761815.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号