使用FlinkSQL(1.13.6)纯SQL方式,通过FlinkCDC(2.1.1)获取MySQL的Binlog数据,以流的形式同步到Hive表中。小文件问题可使用FlinkSQL批处理定期执行表合并来解决。
步骤:
- 启动MySQL的Binlog功能(略);
- FlinkCDC获取MySQL Binlog并写入Kafka表;
-- Source table: read the MySQL binlog via the Flink CDC connector.
DROP TABLE IF EXISTS mysql_cdc;
CREATE TABLE mysql_cdc (
    `do_date` STRING,
    `do_min`  STRING,
    `pv`      INT,
    -- FIX: the original was missing the comma before PRIMARY KEY, which is a
    -- parse error. NOTE(review): the PK on do_date alone assumes one row per
    -- date; if the source key is (do_date, do_min), widen the PK — confirm.
    PRIMARY KEY (`do_date`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'xxx.xxx.xxx.xxx',
    'port' = '3306',
    'username' = 'root',
    'password' = 'xxxxxxxx',
    'database-name' = 'test',
    'table-name' = 'result_total_pvuv_min_mysql',
    'scan.startup.mode' = 'initial',
    'debezium.snapshot.mode' = 'initial'
);

-- Sink table: publish the change stream to Kafka in debezium-json format.
DROP TABLE IF EXISTS kafka_mysql_cdc;
CREATE TABLE kafka_mysql_cdc (
    `do_date` STRING,
    `do_min`  STRING,
    `pv`      INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.group.id' = 'flinkTest',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'collector101:9092,collector102:9092',
    'format' = 'debezium-json'
);

-- Continuous streaming job: forward every change event to Kafka.
-- Explicit column list instead of SELECT * so a schema change fails loudly.
INSERT INTO kafka_mysql_cdc
SELECT
    `do_date`,
    `do_min`,
    `pv`
FROM mysql_cdc;
- FlinkSQL Kafka表解析Map
-- Alternative parsing table: read the debezium-json topic as plain JSON so the
-- before/after images arrive as key-value maps plus the operation flag.
DROP TABLE IF EXISTS kafka_mysql_parser;
CREATE TABLE kafka_mysql_parser (
    -- FIX: Flink SQL requires parameterized map types; a bare `MAP` does not
    -- compile. All payload fields are read back as strings here.
    `before` MAP<STRING, STRING>,
    `after`  MAP<STRING, STRING>,
    `op`     STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.group.id' = 'flinkTest',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'collector101:9092,collector102:9092',
    'format' = 'json'
);
- 将解析后的数据存入Hive
-- Hive sink table: must be created under the Hive dialect.
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS kafka_hive_cdc;
CREATE TABLE kafka_hive_cdc (
    do_date STRING,
    do_min STRING,
    pv STRING,
    op STRING,
    process_time STRING
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern'='$dt',
    'sink.partition-commit.trigger'='partition-time',
    'sink.partition-commit.delay'='0S',
    'sink.partition-commit.policy.kind'='metastore,success-file',
    'auto-compaction'='true',
    'compaction.file-size'='128MB'
);

-- FIX: switch back to the default dialect before the streaming INSERT —
-- PROCTIME() and the Kafka source table are Flink SQL constructs, not HiveQL.
SET table.sql-dialect=default;

-- Flatten each change event into a row. The before-image is present only for
-- delete-side events, otherwise the after-image carries the values.
-- FIX: the original aliased all three columns `as a` (duplicate aliases);
-- name them after the target columns instead.
INSERT INTO kafka_hive_cdc
SELECT
    CASE WHEN `before` IS NOT NULL THEN `before`['do_date'] ELSE `after`['do_date'] END AS do_date,
    CASE WHEN `before` IS NOT NULL THEN `before`['do_min']  ELSE `after`['do_min']  END AS do_min,
    CASE WHEN `before` IS NOT NULL THEN `before`['pv']      ELSE `after`['pv']      END AS pv,
    `op`,
    CAST(PROCTIME() AS STRING),                -- ingestion timestamp, used downstream to pick the newest row
    DATE_FORMAT(PROCTIME(), 'yyyy-MM-dd')      -- partition value dt
FROM kafka_mysql_parser;
- 小文件合并,注意这步需要关闭Checkpoint,否则会IllegalArgumentException: Checkpoint is not supported for batch jobs.
-- Small-file compaction: rewrite the Hive table onto itself as a batch job,
-- merging the many small streaming files into fewer large ones.
-- Checkpointing must be disabled first, otherwise the batch job fails with
-- "IllegalArgumentException: Checkpoint is not supported for batch jobs".
SET execution.runtime-mode=batch;

INSERT OVERWRITE kafka_hive_cdc
SELECT
    do_date,
    do_min,
    pv,
    op,
    process_time,
    dt                 -- dynamic partition column must come last
FROM kafka_hive_cdc;
- 从Hive中读取最终数据
-- Read the current snapshot out of the Hive changelog table: keep the newest
-- event per key, then drop keys whose newest event is a delete.
-- NOTE(review): this relies on Flink's debezium-json writer encoding an
-- UPDATE as a 'd' (before-image) plus a 'c' (after-image) pair, so op is
-- effectively limited to 'c'/'d' here — confirm against the actual topic.
SELECT
    `do_date`,
    `do_min`,
    `pv`
FROM (
    SELECT
        kafka_hive_cdc.*,
        ROW_NUMBER() OVER (
            PARTITION BY `do_date`
            ORDER BY `process_time` DESC, `op`
        ) AS `rn`
    FROM kafka_hive_cdc
) AS latest
WHERE `rn` = 1
  AND `op` = 'c';



