1.MySql创建表:
CREATE TABLE `student` ( `id` int(10) NOT NULL, `name` varchar(128) CHARACTER SET latin1 COLLATE latin1_swedish_ci DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = latin1;
2.maven依赖:
1.13.0 1.8 1.8 org.projectlombok lombok 1.18.2 org.apache.flink flink-java ${flink-version} org.apache.flink flink-streaming-java_2.12 ${flink-version} org.apache.flink flink-clients_2.12 ${flink-version} org.apache.hadoop hadoop-client 3.1.3 mysql mysql-connector-java 5.1.49 org.apache.flink flink-table-planner-blink_2.12 ${flink-version} com.ververica flink-connector-mysql-cdc 2.0.2 com.alibaba fastjson 1.2.75 org.apache.flink flink-connector-jdbc_2.12 1.13.3
3.代码实现:
public class FlinkCDCToMySql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String sourceSql = "CREATE TABLE IF NOT EXISTS mySqlSource (" +
"id Int primary key, " +
"name String ," +
"age Int" +
") with ( " +
" 'connector' = 'mysql-cdc', " +
" 'scan.startup.mode' = 'latest-offset', " +
" 'hostname' = '192.168.0.101', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = 'szzzcheng', " +
" 'database-name' = 'flinktest', " +
" 'table-name' = 'student' " +
")";
String sinkSql = " CREATE TABLE IF NOT EXISTS mySqlSink (" +
"id Int primary key , " +
"name String ," +
"age Int" +
") with (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://192.168.0.101:3306/flinktest'," +
"'table-name' = 'flinktest_cdc'," +
" 'username' = 'root'," +
" 'password' = 'szzzcheng' " +
" )";
tableEnv.executeSql(sourceSql);
tableEnv.executeSql(sinkSql);
tableEnv.executeSql("insert into mySqlSink select * from mySqlSource ");
}
}
之后在student表里增删改数据,flinktest_cdc表都会体现出来。
当然也可以改为打印在控制台的方式,只需要把sinkSql改为:
String sinkSql = " CREATE TABLE IF NOT EXISTS mySqlSink (" +
"id Int primary key , " +
"name String ," +
"age Int" +
") with (" +
" 'connector' = 'print'" +
" )";
结果如下:
+I[3, tom, 12] +I[6, jack, 16] -U[6, jack, 16] +U[6, jack2, 18]



