工具选型着重点在本地调试flink-cdc踩坑
- MySQL 5.7.31
- flink 1.13.3 , flink-connector-mysql-cdc 2.2-SNAPSHOT
- Flink-CDC文档
选择flink-cdc的原因是为简化cdc过程中依赖的工具链,flink-cdc通过复用debezium的connect和kafka-connect实现直连flink,再者可通过flink平台适配的各种source sink和SQL client 轻松实现数据源同步。
参考过程本地调试也需要flink的依赖, 注意冲突此处使用1.13.3 [国内仓库下载]
(https://mirrors.huaweicloud.com/apache/flink/)
需要把flink包下lib文件夹拷贝到本地项目调试,idea启动项也需要勾选 include provided
项目骨架
MySQL 配置
[mysqld] server-id = 223344 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10
示例代码
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class Application {
public static void main(String[] args) throws Exception {
mysql_cdc();
}
public static void mysql_cdc() throws Exception {
Properties properties = new Properties();
properties.setProperty("decimal.handling.mode", "double"); //debezium 小数转换处理策略
properties.setProperty("database.serverTimezone", "GMT+8"); //debezium 配置以database. 开头的属性将被传递给jdbc url
MySqlSource mySqlSource = MySqlSource.builder()
.hostname("192.168.31.233")
.port(3306)
.databaseList("test_database") // set captured database
.tableList("test_database.test_table") // set captured table
.username("root")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.debeziumProperties(properties)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env.getCheckpointConfig().setCheckpointStorage(
new FileSystemCheckpointStorage("file:///flink-ck/checkpoints"));
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
}
}
依赖参考, 注意冲突天坑
完了开跑4.0.0 org.example flink-cdc 1.0-SNAPSHOT UTF-8 1.13.3 1.8 2.11 ${java.version} ${java.version} 1.5.4.Final 2.2.0 org.apache.flink flink-table-common ${flink.version} provided org.apache.flink flink-core ${flink.version} provided org.apache.flink flink-table-api-java-bridge_${scala.binary.version} ${flink.version} provided org.apache.flink flink-core ${flink.version} provided org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} provided com.ververica flink-connector-mongodb-cdc 2.2-SNAPSHOT com.ververica flink-connector-mysql-cdc 2.2-SNAPSHOT mysql mysql-connector-java 8.0.22 org.apache.maven.plugins maven-compiler-plugin 3.1 ${java.version} ${java.version} org.apache.maven.plugins maven-shade-plugin 3.0.0 package shade false org.apache.flink:force-shading com.google.code.findbugs:jsr305 org.slf4j:* log4j:* *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA com.demo.cdc.Application
第一次是全量快照, 之后是binlog的offset拉取,Flink Checkpoint 持久化断点续传
{"before":null,"after":{"id":1,"name":"zhangsan"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288811,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288815,"transaction":null}
{"before":null,"after":{"id":3,"name":"wangwu"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288816,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288816,"transaction":null}
{"before":null,"after":{"id":2,"name":"lisi"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288816,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288816,"transaction":null}
十二月 28, 2021 3:41:30 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.31.233:3308 at mysql-bin.000003/2526 (sid:6257, cid:36)
{"before":null,"after":{"id":5,"name":"qianqi"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677322000,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":2724,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1640677323291,"transaction":null}
{"before":{"id":5,"name":"qianqi"},"after":{"id":5,"name":"钱七"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677338000,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":3001,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1640677338533,"transaction":null}



