FlinkCDC-Hudi:Mysql数据实时入湖全攻略一:初试风云
FlinkCDC-Hudi:Mysql数据实时入湖全攻略二:Hudi与Spark整合时所遇异常与解决方案
FlinkCDC-Hudi:Mysql数据实时入湖全攻略三:探索实现FlinkCDC mysql 主从库同步高可用
FlinkCDC-Hudi:Mysql数据实时入湖全攻略四:两种FlinkSql kafka connector的特征与应用
到目前为止,我们掌握了FlinkCDC分别同步数据到Hudi和Kafka的方法。
在生产应用中,会存在同时同步入湖和入kafka的需求。一般会怎么实现呢?
mysql> create table mysql_test_1( id bigint primary key not enforced, data String, create_time Timestamp(3), ) with ( 'connector'='mysql-cdc', 'hostname'='mysqlhost', 'port'='3306', 'server-id'='5800-5804', 'username'='user', 'password'='user_password', 'server-time-zone'='Asia/Shanghai', 'debezium.snapshot.mode'='initial', 'database-name'='flink_cdc', 'table-name'='test_1' );2.2 hudi ddl
create table hudi_test_1( id bigint, data String, create_time Timestamp(3), PRIMARY KEY (`id`) NOT ENFORCED ) with( 'connector'='hudi', 'path'='hdfs://hdfs-namespace/tmp/flink/cdcdata/hudi_test_1', 'hoodie.datasource.write.recordkey.field'='id', 'hoodie.parquet.max.file.size'='268435456', 'write.precombine.field'='create_time', 'write.tasks'='1', 'write.bucket_assign.tasks'='1', 'write.task.max.size'='1024', 'write.rate.limit'='30000', 'table.type'='MERGE_ON_READ', 'compaction.tasks'='1', 'compaction.async.enabled'='true', 'compaction.delta_commits'='1', 'compaction.max_memory'='500', 'changelog.enabled'='true', 'read.streaming.enabled'='true', 'read.streaming.check.interval'='3', 'hive_sync.enable'='true', 'hive_sync.mode'='hms', 'hive_sync.metastore.uris'='thrift://hiveserver2:9083', 'hive_sync.db'='test', 'hive_sync.table'='hudi_test_1', 'hive_sync.username'='flinkcdc', 'hive_sync.support_timestamp'='true' );2.3 kafka ddl
create table upsert_kafka_test_1( id bigint, data String, create_time Timestamp(3), PRIMARY KEY (`id`) NOT ENFORCED ) with ( 'connector'='upsert-kafka', 'topic'='test1', 'properties.bootstrap.servers' = 'broker:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'key.format'='json', 'value.format'='json' );二、需求实现
面对这个需求,我们可以使用数据pipeline,先将数据从mysql同步到kafka,再从kafka读取数据同步到hudi。
Flink SQL> insert into upsert_kafka_test_1 select * from mysql_test_1; Flink SQL> insert into hudi_test_1 select * from upsert_kafka_test_1 ;
另一种实现是,从Mysql同步后,分别输出到kafka 和hudi。
Flink SQL> insert into upsert_kafka_test_1 select * from mysql_test_1; Flink SQL> insert into hudi_test_1 select * from mysql_test_1 ;
在Flink Sql中,正常语境下,一条insert语句将触发一个flink 作业,当我们以上述方法执行sql时会触发2个flink作业。如果想节省运行资源,可以把两个作业合并成一个。同一个作业进行多输出时,可能会互相影响进度,触发反压。
三、 statement set语法
在Flink sql中,将多个insert作业合并为一个作业,需要用到statement set语法。先用"begin statement set;"启动语句集,然后执行任意条insert语句,最后以"end;"结束以触发作业执行。
以前述一个实现为例,使用statement set实现如下:
Flink SQL> begin statement set; --开启语句集 Flink SQL> insert into upsert_kafka_test_1 select * from mysql_test_1; Flink SQL> insert into hudi_test_1 select * from mysql_test_1 ; Flink SQL> end; --结束语句集,执行作业提交
至此,我们可以根据需求结合statement set语句可以轻松实现同一作业多个输出的需求。



