栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark Structured Streaming基于数据动态写入ES Index

Spark Structured Streaming基于数据动态写入ES Index

        Spark structured streaming(注意和spark streaming的区别)写入ES的基本代码如下:

// df为需要写入es的Dataset
df.writeStream()
		.format("org.elasticsearch.spark.sql.checkpoint")
//		检查点地址,可不配置
		.option("checkpointLocation", CHECKPOINT_PATH)
//		ES节点
		.option("es.nodes", ES_HOST)
		.option("es.post", "9200")
		.option("es.net.http.auth.user", ES_USER)
		.option("es.net.http.auth.pass", ES_PASSWORD)
//		自动创建不存在的index
		.option("es.index.auto.create", "true")
//		es.nodes为域名而非节点时,该项必须为true
		.option("es.nodes.wan.only", "true")
//		mapping.id存在则覆盖,不存在则创建
		.option("es.write.operation", "upsert")
		.option("es.mapping.id", PRIMARY_KEY)
		.option("es.resource", INDEX_NAME)
		.outputMode(OutputMode.Append())
		.start();

        由于Spark streaming作业数据是连续不断,并且只会有一次启停,因此一般是将整个流的数据写入到ES指定的index中。但实际中可能会遇到各种场景,需要将连续的数据路由到不同的index中。比如根据业务、地域、时间等特性创建多个index,然后基于数据的一个或多个字段路由到所属的index中。

本文以最容易理解的场景为例:按天生成 ES index,spark streaming将处理后的数据根据业务时间写入所属index中。应该如何实现这个需求呢?

        最容易想到的一个方案(实际工作中确实遇到这么干的!)是写一个定时脚本,负责在每天0点的时候杀掉当前程序,然后重新启动程序以达到将数据写入具有新日期的index。这么做的缺点有很多:

  • 虽然可以通过checkpoint恢复作业防止数据丢失,但是0点附近早到和晚到数据会写入错误index,造成数据的准确性问题
  • 定时启停逻辑上将流程序切分开来,并且启停程序出现问题将直接影响流作业的准确性
  • 只适用这种时间域切分的index,不适用其它域上切分的index

总之,这种方式虽然简单,但是极不可靠而且很low!

通过实践,我总结了两种支持动态写入ES index的方式:

  1. 基于spark structured streaming特性的方式
  2. 基于spark ES配置的方式
 1. 基于spark structured streaming特性的方式

        这里用到的spark structured streaming的特性是foreachBatch Api。使用foreachBatch可以实现下面两个目标:

  • 重用数据源中的每批次数据 
  • 写入多个目标sink

        因此在foreachBatch中,可以将流数据以批的方式处理,根据字段拆分成不同域的数据集,然后写入对应域的index中,具体实现如下。

// df为需要写入es的Dataset
df.writeStream()
    .foreachBatch((Dataset dataset, Long batchId)-> {
        long currentTs = System.currentTimeMillis();
		String todayIndex = INDEX_PREFIX + DateUtil.parseTimestampToStr(currentTs, "yyyyMMdd");
		String yesterdayIndex = INDEX_PREFIX + DateUtil.parseTimestampToStr(currentTs-86400000, "yyyyMMdd");
		String tomorrowIndex = INDEX_PREFIX + DateUtil.parseTimestampToStr(currentTs+86400000, "yyyyMMdd");

		Dataset todayDs = dataset.filter("data_time>="+clock0Ts + " and data_time<"+clock24Ts);
		Dataset yesterdayDs = dataset.filter("data_time<"+clock0Ts);
		Dataset tomorrowDs = dataset.filter("data_time>="+clock24Ts);

//      写入es代码参考上文
		writeToEs(todayDs,todayIndex);
		writeToEs(yesterdayDs,yesterdayIndex);
		writeToEs(tomorrowDs,tomorrowIndex);
}.start();
		
 2.基于spark ES配置的方式

        使用foreachBatch的方式虽然解决了问题,但是稍显繁杂。在域划分很多的时候就要拆分出很多数据集。那么ES本身是否能支持根据指定字段,自动写入目标index中呢,答案当然是可以的!!

df.writeStream()
    .foreachBatch((Dataset dataset, Long batchId)-> {
        dataset.write()
            .format("org.elasticsearch.spark.sql")
            .option("checkpointLocation", CheckPointPath)
            .option("es.nodes", ES_HOST)
            .option("es.post", "9200")
            .option("es.net.http.auth.user", ES_USER)
            .option("es.net.http.auth.pass", ES_PASSWORD)
            .option("es.index.auto.create", "true")
            .option("es.nodes.wan.only", "true")
            .option("es.write.operation", "upsert")
            .option("es.mapping.id", PrimaryKey)
            .option("es.resource", IndexPrefix+"{date}")
            .mode(SaveMode.Append)
            .save();
    })
    .start();

        注意:这里的date就是df中数据的日期字段

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/654206.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号