- Flink-CDC简介
- DataStream方式应用
- FlinkSQL方式应用
- 自定义反序列化器
官网地址:https://ververica.github.io/flink-cdc-connectors/master/
CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
CDC主要分为基于查询和基于Binlog两种方式
| | 基于查询的CDC | 基于Binlog的CDC |
|---|---|---|
| 开源产品 | Sqoop、Kafka JDBC Source | Canal、Maxwell、Debezium |
| 执行模式 | Batch批处理 | Streaming流处理 |
| 是否可以捕获所有数据变化 | 否 | 是 |
| 延迟性 | 高延迟 | 低延迟 |
| 是否增加数据库压力 | 是 | 否 |
引入依赖
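```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
```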
打包方式:全量包
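```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```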
监控数据库变化
/**
 * Entry point of the DataStream example: reads change events of the MySQL table
 * {@code gmall-210325-flink.base_trademark} through Flink CDC and prints each event.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the job cannot be built or executed
 */
public static void main(String[] args) throws Exception {
    // 1. Obtain the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Flink CDC stores the binlog read position as state in checkpoints. To resume from
    // the last read position, the job must be started from a checkpoint or a savepoint.
    // User name used when accessing HDFS
    System.setProperty("HADOOP_USER_NAME", "root");

    // Checkpoint every 5 s, exactly-once, retained when the job is cancelled
    env.enableCheckpointing(5000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // Checkpoint timeout 10 s, at most 2 concurrent checkpoints, minimum pause of 1 ms between checkpoints
    env.getCheckpointConfig().setCheckpointTimeout(10000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1L);
    // Restart strategy: up to 3 attempts with a 2 s delay
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-210325/ck"));

    // 2. Build the SourceFunction with Flink CDC and read the data
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("gmall-210325-flink")
            // Without tableList, all tables of the listed databases are consumed.
            // When given, each entry must be written as db.table (database name is required).
            .tableList("gmall-210325-flink.base_trademark")
            .deserializer(new StringDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);

    // 3. Print the data
    streamSource.print();

    // 4. Start the job
    env.execute("FlinkCDC");
}
启动任务
bin/flink run -m hadoop102:8081 -c com.atguigu.FlinkCDC ./gmall-flink-cdc.jar
保存savepoint
# bin/flink savepoint 任务id hdfs地址
bin/flink savepoint eaebb93839f0c66014b34a9bf21b4cfa hdfs://hadoop102:8020/gmall-210325/sv
从savepoint处启动
bin/flink run -m hadoop102:8081 -s hdfs://hadoop102:8020/gmall-210325/sv/savepoint-eaebb9-0759b72fba40 -c com.atguigu.FlinkCDC ./gmall-flink-cdc.jar

FlinkSQL方式应用
引入依赖
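```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
```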
监控数据库变化
/**
 * Entry point of the Flink SQL example: declares a {@code mysql-cdc} table over
 * {@code gmall-210325-flink.base_trademark}, queries it and prints the resulting
 * retract stream.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the job cannot be built or executed
 */
public static void main(String[] args) throws Exception {
    // Obtain the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Create the source table with DDL
    tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
            " id STRING NOT NULL, " +
            " tm_name STRING, " +
            " logo_url STRING " +
            ") WITH ( " +
            " 'connector' = 'mysql-cdc', " +
            " 'hostname' = 'hadoop102', " +
            " 'port' = '3306', " +
            " 'username' = 'root', " +
            " 'password' = 'root', " +
            " 'database-name' = 'gmall-210325-flink', " +
            " 'table-name' = 'base_trademark' " +
            ")");

    // Query the data
    Table table = tableEnv.sqlQuery("select * from mysql_binlog");

    // Convert the dynamic table into a retract stream.
    // Tuple2 is org.apache.flink.api.java.tuple.Tuple2; the Boolean flag marks
    // whether the row is an add (true) or a retract (false) message.
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
    retractStream.print();

    // Start the job
    env.execute("FlinkCDCWithSQL");
}
自定义反序列化器
public class CustomerDeserialization implements DebeziumDeserializationSchema{ // 封装的数据格式 json @Override public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception { // 创建JSON对线用于存储最终数据 JSONObject result = new JSONObject(); // 获取库名和表名 String topic = sourceRecord.topic(); String[] fields = topic.split("\."); String database = fields[1]; String tableName = fields[2]; Struct value = (Struct) sourceRecord.value(); // 获取before数据 Struct before = value.getStruct("before"); JSONObject beforeJson = new JSONObject(); if (before != null) { Schema beforeSchema = before.schema(); List beforeFields = beforeSchema.fields(); for (int i = 0; i < beforeFields.size(); i++) { Field field = beforeFields.get(i); Object beforevalue = before.get(field); beforeJson.put(field.name(), beforevalue); } } // 获取after数据 Struct after = value.getStruct("after"); JSONObject afterJson = new JSONObject(); if (after != null) { Schema afterSchema = after.schema(); List afterFields = afterSchema.fields(); for (Field field : afterFields) { Object afterValue = after.get(field); afterJson.put(field.name(), afterValue); } } // 获取操作类型 Envelope.Operation operation = Envelope.operationFor(sourceRecord); String type = operation.toString().toLowerCase(); if("create".equals(type)){ type="insert"; } // 将字段写入JSON对线 result.put("database", database); result.put("tableName", tableName); result.put("before", beforeJson); result.put("after", afterJson); result.put("type", type); // 输出数据 collector.collect(result.toJSONString()); } @Override public TypeInformation getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; } }
使用自定义反序列化器
/**
 * Entry point of the custom-deserializer example: reads change events of the MySQL table
 * {@code gmall-210325-flink.base_trademark} through Flink CDC, converts them with
 * {@code CustomerDeserialization} and prints each result.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the job cannot be built or executed
 */
public static void main(String[] args) throws Exception {
    // 1. Obtain the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Flink CDC stores the binlog read position as state in checkpoints. To resume from
    // the last read position, the job must be started from a checkpoint or a savepoint.
    // User name used when accessing HDFS
    System.setProperty("HADOOP_USER_NAME", "root");

    // Checkpoint every 5 s, exactly-once, retained when the job is cancelled
    env.enableCheckpointing(5000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // Checkpoint timeout 10 s, at most 2 concurrent checkpoints, minimum pause of 1 ms between checkpoints
    env.getCheckpointConfig().setCheckpointTimeout(10000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1L);
    // Restart strategy: up to 3 attempts with a 2 s delay
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-210325/ck"));

    // 2. Build the SourceFunction with Flink CDC and read the data
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("gmall-210325-flink")
            // Without tableList, all tables of the listed databases are consumed.
            // When given, each entry must be written as db.table (database name is required).
            .tableList("gmall-210325-flink.base_trademark")
            .deserializer(new CustomerDeserialization())
            .startupOptions(StartupOptions.initial())
            .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);

    // 3. Print the data
    streamSource.print();

    // 4. Start the job
    env.execute("FlinkCDCWithCustomerDeserialization");
}



