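FlinkCDC: first, a quick look at the concept of Change Data Capture (CDC).
CDC monitors and captures changes made to a database, records those changes in the order they occur, and writes them to message middleware so that other services can subscribe to and consume them.
Types of CDC: there are two main approaches, query-based and Binlog-based. Query-based capture polls tables with periodic queries, so it can miss changes that happen between two polls and adds query load to the database. Binlog-based capture reads the database's change log as a stream, so it sees every change with low latency.
Flink, naturally, was not to be outdone, and FlinkCDC was born: through the flink-cdc-connectors component it provides a source that reads both the full snapshot and the incremental change data directly from MySQL and other databases.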
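To make the difference between query-based and Binlog-based capture concrete, here is a minimal sketch using a toy in-memory table. Every name in it (CdcModesSketch, pollSnapshot, readLogFrom) is hypothetical and exists only for this illustration; it is not how MySQL or FlinkCDC are implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a table plus its change log. All names are hypothetical;
// nothing below is a Flink or MySQL API.
class CdcModesSketch {

    // Current table contents: primary key -> value.
    private final Map<Integer, String> table = new LinkedHashMap<>();
    // Append-only log of every write, standing in for the MySQL Binlog.
    private final List<String> changeLog = new ArrayList<>();

    void insert(int id, String value) {
        table.put(id, value);
        changeLog.add("INSERT " + id + "=" + value);
    }

    void update(int id, String value) {
        table.put(id, value);
        changeLog.add("UPDATE " + id + "=" + value);
    }

    void delete(int id) {
        table.remove(id);
        changeLog.add("DELETE " + id);
    }

    // Query-based capture: a periodic "SELECT *" only sees the current state.
    Map<Integer, String> pollSnapshot() {
        return new LinkedHashMap<>(table);
    }

    // Log-based capture: read every change, in order, starting at an offset.
    List<String> readLogFrom(int offset) {
        return new ArrayList<>(changeLog.subList(offset, changeLog.size()));
    }

    public static void main(String[] args) {
        CdcModesSketch db = new CdcModesSketch();
        db.insert(1, "alice");
        // Three writes happen between two polls.
        db.update(1, "alicia");
        db.insert(2, "bob");
        db.delete(2);

        System.out.println("poll sees:  " + db.pollSnapshot());
        System.out.println("log yields: " + db.readLogFrom(1));
    }
}
```

The poll only reports the final row for key 1, while the log still contains the short-lived row 2 that was inserted and deleted between polls.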
2. Hands-on coding
Learn how to use FlinkCDC through a simple demo.
2.1 DataStream approach
Create a Maven project and add the required dependencies in the pom file:
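<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>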
With the dependencies in place, we can start coding… (happily opening IDEA)
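import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2.1 Enable checkpointing, one checkpoint every five seconds (the interval is in milliseconds)
        env.enableCheckpointing(5000L);
        // 2.2 Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(
                CheckpointingMode.EXACTLY_ONCE);
        // 2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Set the strategy for restarting automatically from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        // 2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://master:8020/flinkCDC"));
        // 2.6 Set the user name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "root");
        // 3. Create the Flink-MySQL-CDC source
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("master")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("mall-flink")
                // tableList is optional. If it is not specified, all tables in the
                // databases configured above are read. Note: table names must be
                // given in "db.table" form.
                .tableList("mall-flink.z_user_info")
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
        // 4. Read data from MySQL through the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
        // 5. Print the data
        mysqlDS.print();
        // 6. Execute the job
        env.execute();
    }
}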
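Step 2.4 above configures fixedDelayRestart(3, 2000L), meaning the job may be restarted at most 3 times with a 2000 ms pause before each restart. The sketch below only mirrors that counting rule in plain Java. FixedDelayRetrySketch and its run method are hypothetical helpers written for this illustration, not Flink's implementation, and the demo uses a 10 ms delay so that it finishes quickly.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink: it only mirrors the counting rule
// behind RestartStrategies.fixedDelayRestart(restartAttempts, delayMillis).
class FixedDelayRetrySketch {

    // Runs the task; after a failure, waits delayMillis and tries again,
    // allowing at most restartAttempts restarts before giving up.
    static <T> T run(Supplier<T> task, int restartAttempts, long delayMillis) {
        int restarts = 0;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (restarts >= restartAttempts) {
                    throw e; // restarts exhausted: the failure is final
                }
                restarts++;
                sleepQuietly(delayMillis);
            }
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to restart", ie);
        }
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // The task fails twice and succeeds on the third call.
        String result = run(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated failure " + calls[0]);
            }
            return "recovered after " + (calls[0] - 1) + " restarts";
        }, 3, 10L);
        System.out.println(result);
    }
}
```

With 3 allowed restarts, a task that always fails runs 4 times in total (the first run plus 3 restarts) before the failure is final.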
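OK, the code is done at this point; next comes testing.
Build the jar with mvn clean package and upload it to the server.
(Make sure the MySQL Binlog is enabled; if you are enabling it for the first time, MySQL has to be restarted.)
Start Flink and the HDFS cluster, and finally start the program (java -jar FlinkCDC.jar).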
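For the Binlog prerequisite mentioned above, a typical MySQL configuration fragment might look like the following. This is a sketch under assumptions: the file location (often /etc/my.cnf), the server-id value, and the log file prefix are placeholders and depend on your installation. Only the database name comes from the demo.

```ini
[mysqld]
# Any ID that is unique within the replication topology (placeholder value)
server-id     = 1
# Turn on the binary log; "mysql-bin" is just a file name prefix
log-bin       = mysql-bin
# Row-based logging, which Debezium-based connectors rely on
binlog_format = ROW
# Optional: only log changes for the demo database
binlog-do-db  = mall-flink
```

After restarting MySQL, running SHOW VARIABLES LIKE 'log_bin'; in a MySQL client should report ON.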
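2.2 FlinkSQL approach
As before, first add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>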
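import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL_CDC {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 2. Create the Flink-MySQL-CDC source table
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id INT," +
                " name STRING," +
                " phone_num STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'master'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '000000'," +
                " 'database-name' = 'mall-flink'," +
                " 'table-name' = 'z_user_info'" +
                ")");
        // 3. Query the table and print the result. executeSql submits the job
        //    by itself, so no separate env.execute() call is needed; calling it
        //    with no DataStream operators defined would fail.
        tableEnv.executeSql("select * from user_info").print();
    }
}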
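The print() call in the FlinkSQL job emits changelog rows tagged with an op column (+I for insert, -U and +U for the old and new image of an update, -D for delete). As a minimal sketch of how a consumer could turn such rows back into the current table state, the code below replays them into a map. ChangelogReplaySketch, Kind, and Change are hypothetical names for this illustration, and the sample rows are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not Flink code: replays changelog rows into a
// keyed view, the way a downstream consumer could materialize the table.
class ChangelogReplaySketch {

    // The four row kinds, in the same order as the +I, -U, +U, -D tags.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One changelog row for a table with the columns (id, name).
    static final class Change {
        final Kind kind;
        final int id;
        final String name;

        Change(Kind kind, int id, String name) {
            this.kind = kind;
            this.id = id;
            this.name = name;
        }
    }

    // Applies the rows in arrival order and returns the resulting table state.
    static Map<Integer, String> replay(List<Change> changes) {
        Map<Integer, String> view = new TreeMap<>();
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    view.put(c.id, c.name);   // new or updated row image
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    view.remove(c.id);        // retract the old row image
                    break;
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change(Kind.INSERT, 1, "zhangsan"),
                new Change(Kind.INSERT, 2, "lisi"),
                new Change(Kind.UPDATE_BEFORE, 1, "zhangsan"),
                new Change(Kind.UPDATE_AFTER, 1, "wangwu"),
                new Change(Kind.DELETE, 2, "lisi"));
        System.out.println(replay(changes));
    }
}
```

Because the rows are applied strictly in order, the ordering guarantee of the change stream is what keeps the replayed view consistent with the source table.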
2.3 Custom deserializer
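import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class Flink_CDCWithCustomerSchema {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Create the Flink-MySQL-CDC source
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("master")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("mall-flink")
                .tableList("mall-flink.z_user_info")
                .startupOptions(StartupOptions.initial())
                .deserializer(new DebeziumDeserializationSchema<String>() { // custom data parser
                    @Override
                    public void deserialize(SourceRecord sourceRecord,
                                            Collector<String> collector) throws Exception {
                        // Get the topic, which contains the database and table name,
                        // e.g. mysql_binlog_source.mall-flink.z_user_info
                        String topic = sourceRecord.topic();
                        String[] arr = topic.split("\\.");
                        String db = arr[1];
                        String tableName = arr[2];
                        // Get the operation type: READ, DELETE, UPDATE, or CREATE
                        Envelope.Operation operation =
                                Envelope.operationFor(sourceRecord);
                        // Get the record value and cast it to a Struct
                        Struct value = (Struct) sourceRecord.value();
                        // Get the row data after the change ("after" is null for DELETE events)
                        Struct after = value.getStruct("after");
                        // Create a JSON object to hold the row data
                        JSONObject data = new JSONObject();
                        if (after != null) {
                            for (Field field : after.schema().fields()) {
                                Object o = after.get(field);
                                data.put(field.name(), o);
                            }
                        }
                        // Create a JSON object that wraps the final result
                        JSONObject result = new JSONObject();
                        result.put("operation", operation.toString().toLowerCase());
                        result.put("data", data);
                        result.put("database", db);
                        result.put("table", tableName);
                        // Send the data downstream
                        collector.collect(result.toJSONString());
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();
        // 3. Read data from MySQL through the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
        // 4. Print the data
        mysqlDS.print();
        // 5. Execute the job
        env.execute();
    }
}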
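The deserializer relies on splitting the topic on dots. In Java, String.split takes a regular expression, so the dot has to be escaped as "\\."; an unescaped "." matches every character and leaves nothing behind. The sketch below shows both cases with plain JDK code. TopicSplitSketch and parse are hypothetical names, and the three-part topic layout is taken from the example topic in the deserializer comment.

```java
import java.util.Arrays;

// Hypothetical helper for illustration: splits a topic of the form
// "<server>.<database>.<table>" on literal dots.
class TopicSplitSketch {

    static String[] parse(String topic) {
        // "\\." is the regex for one literal dot.
        String[] parts = topic.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected topic: " + topic);
        }
        return parts;
    }

    public static void main(String[] args) {
        String topic = "mysql_binlog_source.mall-flink.z_user_info";

        // Unescaped "." treats every character as a delimiter, so only empty
        // strings remain and split drops them all.
        System.out.println(topic.split(".").length);      // prints 0

        String[] parts = parse(topic);
        System.out.println(Arrays.toString(parts));
        System.out.println("database=" + parts[1] + ", table=" + parts[2]);
    }
}
```

Note that this simple split assumes neither the database name nor the table name contains a dot.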



