flink 1.12.1
flink-cdc 1.2.0
mysql 7+
最近使用公司的大数据平台做实时ETL,发现平台提供的实时消费binlog到kafka的服务有点坑,莫名其妙失败重启,日志也没有效的信息,于是打算另辟蹊径。主要考虑能控制整个流程,能查看完整日志,不新增技术栈,最好还是基于flink,于是选择了flink-cdc,它底层基于debezium。
- sterp 1
首先需要确保MySQL开启binlog,并且有采集binlog的全限
#需要的mysql配置 log-bin = on #开启BINLOG binlog-format = ROW #选择ROW模式 gtid_mode = ON #开启GTID #可以使用数据库用户执行以下命令确认是否有采集binlog全限 #1、是否打开binlog特性,on表示开启 show global variables where Variable_name = 'log_bin' #2、是否为row模式,row表示已开启row模式 show global variables where Variable_name = 'binlog_format' #3、是否打开gtid特性,on表示开启 show global variables where Variable_name = 'gtid_mode' #4、查询是否有获取binlog日志全限,为空则表示全选不够 show binary logs #获取binlog文件列表 show master logs #查看master的binlog日志 show master status #查看当前写入的binlog文件 show binlog events in 'mysql-bin.000001' #查看指定binlog文件内容 #如果全限不够,可以参看下面的语句授权: GRANT SELECt, SHOW DATAbaseS, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'
有时候会报需要RELOAD权限,这时有两种办法:
1、给用户赋予reload权限
2、编写flinksql时配置debezium.snapshot.locking.mode=node参数,跳过全局锁
- step2
依赖
com.alibaba.ververica flink-connector-mysql-cdc 1.2.0 mysql mysql-connector-java 8.0.18
demo
-- register a MySQL table 'orders' in Flink SQL CREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'orders' ); -- read snapshot and binlogs from orders table SELECt * FROM orders;
这样就能采集binlog日志了,下来就可以将采集的数据写入到下游,比如kafka,es,hudi等。
参考:
https://ververica.github.io/flink-cdc-connectors/release-1.4/index.html
https://nightlies.apache.org/flink/flink-docs-release-1.12/



