Scenario: MySQL binlog is captured by Canal and pushed into Kafka; the goal is to build a continuously merged result table.
Task: Iceberg handling of a change-log stream, i.e. the insert / update / delete operations produced by the business database are landed into an Iceberg table.
Approach: mainly rely on Spark's MERGE INTO.
```sql
-- Flink DDL: sample_stream_test01 is the merged result table,
-- sample_stream_test02 simulates the Canal change-log table ingested from Kafka
-- (its op column carries insert | update | delete)
CREATE TABLE `sample_stream_test01` (
  `id`   BIGINT NOT NULL,
  `data` VARCHAR(2147483647)
) PARTITIONED BY (`id`) WITH (
  'catalog-database' = 'test001',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'warehouse' = 'hdfs://nameservice2/user/hive/warehouse/',
  'uri' = 'thrift://10.8.49.114:9083,thrift://10.8.49.115:9083',
  'write.metadata.previous-versions-max' = '2',
  'catalog-table' = 'sample_stream_test01',
  'catalog-type' = 'hive',
  'write.distribution-mode' = 'hash'
);

CREATE TABLE `sample_stream_test02` (
  `id`   BIGINT NOT NULL,
  `data` VARCHAR(2147483647),
  `op`   VARCHAR(2147483647)
) PARTITIONED BY (`id`) WITH (
  'catalog-database' = 'test001',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'warehouse' = 'hdfs://nameservice2/user/hive/warehouse/',
  'uri' = 'thrift://10.8.49.114:9083,thrift://10.8.49.115:9083',
  'write.metadata.previous-versions-max' = '2',
  'catalog-table' = 'sample_stream_test02',
  'catalog-type' = 'hive',
  'write.distribution-mode' = 'hash'
);

-- seed data
INSERT INTO sample_stream_test01 VALUES (1, '1');
INSERT INTO sample_stream_test01 VALUES (2, '2');
INSERT INTO sample_stream_test02 VALUES (1, '111', 'update');
INSERT INTO sample_stream_test02 VALUES (2, '22',  'delete');
INSERT INTO sample_stream_test02 VALUES (3, '333', 'update');
INSERT INTO sample_stream_test02 VALUES (4, '4',   'insert');

-- executed with Spark SQL (Iceberg MERGE INTO)
MERGE INTO test001.sample_stream_test01 t
USING (SELECT * FROM test001.sample_stream_test02) s
ON t.id = s.id
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND s.op = 'update' THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```
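With the seed data above, the expected end state can be checked with a quick query. The result below is inferred from the MERGE semantics (id = 1 matches the update branch, id = 2 matches the delete branch, id = 3 and id = 4 fall into NOT MATCHED and are inserted; the extra op column of the source should be ignored by the star actions):

```scala
// Assumes a SparkSession `spark` configured for Iceberg (see the sketch at the end of this section).
spark.sql("SELECT * FROM test001.sample_stream_test01 ORDER BY id").show()
// expected rows:
//   1, '111'   -- matched, op = 'update'
//   3, '333'   -- not matched, inserted even though op = 'update' (see notes 3 and 4 below)
//   4, '4'     -- not matched, op = 'insert'
// id = 2 was removed by the matched delete branch
```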
Notes:
1. Within a single batch, the records of sample_stream_test02 must be unique under the ON join condition; otherwise the MERGE fails.
2. The snapshot produced by MERGE INTO is an overwrite, so the result table can no longer be consumed as an incremental stream.
3. If a row does not exist in sample_stream_test01 but sample_stream_test02 carries a record with op = 'delete', that record falls into the NOT MATCHED branch and gets inserted instead of deleted. That is just how the official MERGE INTO semantics work: WHEN MATCHED covers delete and update, WHEN NOT MATCHED covers insert. In this situation, handle deletes separately and let MERGE INTO handle only insert and update.
4. When Kafka delivers several records with the same primary key in one batch, only one record is kept. For example, id = 1 arriving as (1, insert) and then (1, update) collapses to (1, update); since the target table has no such row yet, it cannot match the update branch. Therefore do not add AND s.op = 'insert' to the WHEN NOT MATCHED branch. This interacts with note 3, so watch the details; a sketch of the combined handling follows below.
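A minimal sketch of notes 3 and 4 combined: apply deletes with a MERGE that has no NOT MATCHED branch, then let a second MERGE cover only insert and update. Table names reuse the example above, and `spark` is assumed to be a SparkSession with the Iceberg extensions enabled; this is an illustration, not the original article's code.

```scala
// Sketch: handle deletes separately so that a delete for a row that never
// reached the target table is simply ignored instead of being re-inserted.
def mergeChangeLogBatch(spark: org.apache.spark.sql.SparkSession): Unit = {
  // 1. deletes only: no NOT MATCHED branch, so unmatched deletes are no-ops
  spark.sql(
    """MERGE INTO test001.sample_stream_test01 t
      |USING (SELECT * FROM test001.sample_stream_test02 WHERE op = 'delete') s
      |ON t.id = s.id
      |WHEN MATCHED THEN DELETE""".stripMargin)

  // 2. inserts and updates: WHEN NOT MATCHED deliberately has no `AND s.op = 'insert'`
  //    filter, so an update whose target row does not exist yet (note 4) is still inserted
  spark.sql(
    """MERGE INTO test001.sample_stream_test01 t
      |USING (SELECT * FROM test001.sample_stream_test02 WHERE op <> 'delete') s
      |ON t.id = s.id
      |WHEN MATCHED THEN UPDATE SET *
      |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
}
```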
Reference article: 业务数据实时流入iceberg数据湖的方法 (Zhihu)
1. Stream-read the Kafka source
```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "bootstrap servers address")
  .option("subscribe", "topic name")
  // .option("startingOffsets", "latest")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
```
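The value column read from Kafka carries the Canal JSON envelope that step 3 parses. For context, a minimal sketch of what such a message might look like; the field layout is an assumption based on the schema used in step 3 (database, table, type, ts, id, data), and real Canal output typically upper-cases the type field, so normalize it if the merge compares lower-case values:

```scala
// Hypothetical Canal-style payload; only the fields parsed in step 3 are shown.
val sampleCanalMessage: String =
  """{
    |  "database": "test001",
    |  "table": "order_source",
    |  "type": "update",
    |  "ts": 1690000000000,
    |  "id": 123,
    |  "data": [
    |    {"oms_order_id": "1001", "order_id": "1001", "order_create_time": "2023-07-20 12:00:00"}
    |  ]
    |}""".stripMargin
```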
2. Streaming write with foreachBatch
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val query = df.writeStream
  // .format("console")                                  // print to the console for debugging
  .trigger(Trigger.ProcessingTime("2 seconds"))          // trigger interval
  // .outputMode(OutputMode.Append())                    // output mode
  .option("checkpointLocation", DEFAULTFS + "/tmp/lz")   // must be an HDFS path, otherwise a NullPointerException is thrown
  // .option("checkpointLocation", "~/tmp/ll")
  .option("fanout-enabled", "true")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    executorBatchDf(spark, batchDF.selectExpr("CAST(value AS STRING)"))
  }
  .start()

query.awaitTermination()
```
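Steps 2 and 3 reference DEFAULTFS and RecordItem without defining them. A minimal sketch of the assumed definitions, inferred from how they are used (adjust the HDFS prefix to your cluster):

```scala
// Assumed helper definitions, not part of the original snippets.
// HDFS prefix used for the checkpoint location above; assumption: same nameservice as the warehouse path.
val DEFAULTFS = "hdfs://nameservice2"

// One change-log row flattened out of a Canal message: dedup key, operation type,
// binlog timestamp, source database/table and the row serialized as JSON.
case class RecordItem(key: String, op: String, ts: Long,
                      database: String, table: String, jsonStr: String)
```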
3. Per-micro-batch processing and MERGE INTO
```scala
import com.alibaba.fastjson.{JSON, JSONArray}
import com.alibaba.fastjson.serializer.SerializerFeature
import org.apache.iceberg.spark.{Spark3Util, SparkSchemaUtil}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

def executorBatchDf(spark: SparkSession, batchDF: DataFrame) = {
  // schema of the Canal envelope carried in the Kafka value column
  val schema = new StructType()
    .add("database", StringType)
    .add("table", StringType)
    .add("type", StringType)
    .add("ts", LongType)
    .add("id", LongType)
    .add("data", StringType)

  batchDF
    .withColumn("value", from_json(col("value"), schema))
    .select(col("value.*"))
    .createOrReplaceTempView("batch_all_data")
  batchDF.sparkSession.sqlContext.cacheTable("batch_all_data")

  import spark.implicits._
  import scala.collection.JavaConverters._

  // flatten every Canal message into one RecordItem per row, then keep only the
  // latest record per key within this micro-batch (see note 4 above)
  val tableData = batchDF.sparkSession.sql(
    """
      |select database, table, type, ts, id, data from batch_all_data
    """.stripMargin
  ).flatMap { line =>
    // println(line)  // debug
    val rows     = JSON.parseArray(line.getAs[String]("data"))
    val database = line.getAs[String]("database")
    val table    = line.getAs[String]("table")
    val op       = line.getAs[String]("type")
    val ts       = line.getAs[Long]("ts")
    rows.asScala.map { r =>
      val row     = JSON.parseObject(r.toString)
      val key     = s"${database}:${table}:${row.getString("oms_order_id")}"
      val jsonStr = JSON.toJSONString(r, SerializerFeature.WriteMapNullValue)
      RecordItem(key, op, ts, database, table, jsonStr)
    }
  }
    .rdd
    .groupBy(_.key)
    .map { case (_, records) =>
      val latest = records.toSeq.sortBy(_.ts).last
      (latest.jsonStr, latest.op)   // keep only the row JSON and the operation type
    }
    .toDF("data", "op")

  // tableData.collect().foreach(println)   // debug: print the deduplicated records

  val tableName = "test001.order_source"
  val whereSql  = " t.order_id = s.order_id "
  val specName  = " order_create_time "

  // use the Iceberg table schema to turn the row JSON back into typed columns
  val icebergTable  = Spark3Util.loadIcebergTable(spark, tableName)
  val icebergSchema = SparkSchemaUtil.convert(icebergTable.schema())
  tableData
    .select(from_json(col("data"), icebergSchema) as "data", col("op"))
    .createOrReplaceTempView("tmp_merge_data")

  val mergeSql = new StringBuffer()
  mergeSql.append(s"MERGE INTO $tableName t \n")
  mergeSql.append(s"USING (SELECT data.*, op FROM tmp_merge_data ORDER BY $specName DESC) s \n")
  mergeSql.append(s" ON $whereSql " +
    s" AND DATE_FORMAT(t.$specName, 'yyyy-MM-dd') >= DATE_SUB(DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd'), 30)\n")
  mergeSql.append(" WHEN MATCHED AND s.op = 'delete' THEN DELETE \n")
  mergeSql.append(" WHEN MATCHED AND s.op = 'update' THEN UPDATE SET * \n")
  mergeSql.append(" WHEN NOT MATCHED THEN INSERT * \n")

  tableData.sparkSession.sql(mergeSql.toString)
  icebergTable.refresh()
}
```
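For the MERGE INTO in step 3 to run at all, the SparkSession has to be created with the Iceberg SQL extensions and a catalog pointing at the same Hive metastore the Flink DDL used. A minimal sketch; the metastore URI is copied from the example above, everything else is an assumption about the environment:

```scala
import org.apache.spark.sql.SparkSession

// SparkSession wired for Iceberg MERGE INTO against the Hive catalog.
val spark = SparkSession.builder()
  .appName("canal-binlog-to-iceberg")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hive")
  .config("spark.sql.catalog.spark_catalog.uri", "thrift://10.8.49.114:9083,thrift://10.8.49.115:9083")
  .enableHiveSupport()
  .getOrCreate()
```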



