两张mysql 的表通过cdc 进行同步

2. 准备条件
Flink 1.12.4
Mysql 5.7
CDC 1.2
[root@basenode lib]# ll
总用量 350476
-rw-r--r-- 1 root root    661717 9月  25 21:01 fastjson-1.2.78.jar
-rw-r--r-- 1 root root    194725 9月  25 20:18 flink-connector-jdbc_2.11-1.12.4.jar
-rw-r--r-- 1 root root  27135783 9月  25 20:38 flink-connector-mysql-cdc-1.2.0.jar
-rw-r--r-- 1 501  games     89597 5月  11 05:03 flink-csv-1.12.4.jar
-rw-r--r-- 1 501  games 114594794 5月  11 05:07 flink-dist_2.11-1.12.4.jar
-rw-r--r-- 1 root root     81363 9月  25 20:18 flink-hadoop-compatibility_2.12-1.12.0.jar
-rw-r--r-- 1 root root    136663 9月  25 21:00 flink-json-1.12.0.jar
-rw-r--r-- 1 501  games    134826 5月  11 05:03 flink-json-1.12.4.jar
-rw-r--r-- 1 root root  43317025 9月  25 20:18 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 501  games   7709741 10月  8 2020  flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root  38101480 9月  25 20:18 flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
-rw-r--r-- 1 501  games  36096225 5月  11 05:06 flink-table_2.11-1.12.4.jar
-rw-r--r-- 1 root root    118412 9月  25 20:40 flink-table-api-java-bridge_2.11-1.12.4.jar
-rw-r--r-- 1 501  games  40258604 5月  11 05:06 flink-table-blink_2.11-1.12.4.jar
-rw-r--r-- 1 root root    822850 9月  25 20:50 flink-table-common-1.12.4.jar
-rw-r--r-- 1 root root  23265394 9月  25 20:18 iceberg-flink-runtime-0.12.0.jar
-rw-r--r-- 1 root root  23083607 9月  25 20:18 iceberg-hive-runtime-0.12.0.jar
-rw-r--r-- 1 501  games     67114 2月  21 2020  log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 501  games    276771 2月  21 2020  log4j-api-2.12.1.jar
-rw-r--r-- 1 501  games   1674433 2月  21 2020  log4j-core-2.12.1.jar
-rw-r--r-- 1 501  games     23518 2月  21 2020  log4j-slf4j-impl-2.12.1.jar
-rw-r--r-- 1 root root   1007502 9月  25 20:18 mysql-connector-java-5.1.47.jar
[root@basenode lib]# pwd
/opt/module/flink/flink-1.12.4/lib
[root@basenode lib]#

3.1 配置mysql
[root@basenode ~]# vi /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
max_allowed_packet=1024M
log-bin=mysql-bin
server-id=180
binlog-format=row
binlog-do-db=test
expire_logs_days=30

4. Mysql 建表

4.1 原表
CREATE TABLE `Flink_iceberg` (
  `id` bigint(64) NOT NULL,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `dt` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

4.2 目标表
CREATE TABLE `Flink_iceberg-cdc` (
  `id` bigint(64) NOT NULL,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `dt` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

5. 进入flink sql
./sql-client.sh embedded

5.1 flink sql cdc 连接
create table Flink_icebergcdc05(id bigint, name string, age int,dt string)
with(
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.180',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'Flink_iceberg'
);

5.2 flink mysql 连接
-- Flink JDBC sink table mapped onto the MySQL target table `Flink_iceberg-cdc`.
-- NOTE(review): declaring `id` as primary key makes the Flink JDBC connector
-- write in upsert mode rather than append-only — confirm against the
-- Flink 1.12 JDBC connector docs for this deployment.
create table Flink_iceberg07(id bigint primary key, name string, age int,dt string)
with(
'connector' = 'jdbc',
-- target MySQL instance and database that receive the replicated rows
'url' = 'jdbc:mysql://192.168.1.180:3306/test',
'username'='root',
'password'='123456',
-- physical table name in MySQL (note the dash in the name)
'table-name' = 'Flink_iceberg-cdc',
-- flush every row immediately (no batching): lowest latency, fine for a
-- demo; raise both values for production throughput
'sink.buffer-flush.max-rows'='1',
'sink.buffer-flush.interval'='0'
);
5.3 插入数据

insert into Flink_iceberg07 select * from Flink_icebergcdc05;

6. 整体操作
Flink SQL> create table Flink_icebergcdc05(id bigint, name string, age int,dt string)
> with(
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.168.1.180',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'test',
> 'table-name' = 'Flink_iceberg'
> );
[INFO] Table has been created.

Flink SQL> create table Flink_iceberg07(id bigint primary key, name string, age int,dt string)
> with(
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.168.1.180:3306/test',
> 'username'='root',
> 'password'='123456',
> 'table-name' = 'Flink_iceberg-cdc',
> 'sink.buffer-flush.max-rows'='1',
> 'sink.buffer-flush.interval'='0'
> );
[INFO] Table has been created.

Flink SQL> select * from Flink_iceberg07;
[INFO] Result retrieval cancelled.

Flink SQL> insert into Flink_iceberg07 select * from Flink_icebergcdc05;
[INFO] Submitting SQL update statement to the cluster...
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 1a86e138627179f6f44dd332871e39df7

成功
Flink SQL> insert into Flink_iceberg07 select * from Flink_icebergcdc05;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 325c988dd896e261c167f90e18b5f879
Flink SQL> select * from Flink_iceberg07;
SQL Query Result (Table)
Table program finished. Page: Last of 1 Updated: 23:35:03.544
id name age dt
10002 flink-cdc-update 22 2021-09-25
10011 flink-mysql 19 2021-09-24
10012 flink-mysqA 19 2021-09-24



