提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
利用 Flink SQL CDC 实现多个数据库间数据同步
- 一、案例
- 二、实现
数据库 databaseA、databaseB、databaseC 中的表 tableA 和 tableB 需要将数据同步并汇总到 databaseD 的 tableTarget 表中,此需求可以通过 Flink SQL CDC 来实现。
二、实现
-- Sink table: JDBC connector writing to `tableTarget` in `databaseD`
-- (MySQL at 127.0.0.1:3306). Columns are matched to the INSERT query by
-- position, so the declaration order below is significant.
CREATE TABLE table_target (
    -- VARCHAR to match table_A_source.user_name, which feeds this column.
    user_name   VARCHAR,
    user_code   VARCHAR,
    age         INT,
    tenant_code VARCHAR,
    -- The key may only reference declared columns; the previous definition
    -- listed an undeclared `data_time` column, which fails DDL validation.
    -- NOTE(review): rows are joined per (user_code, tenant_code) -- confirm
    -- whether tenant_code should also be part of this key so that rows from
    -- different source databases do not overwrite each other.
    PRIMARY KEY (user_name, user_code) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/databaseD',
    'table-name' = 'tableTarget',
    'username' = 'root',
    'password' = '123456',
    'scan.auto-commit' = 'false',
    -- Flush to the database after every single buffered row.
    'sink.buffer-flush.max-rows' = '1'
);
-- CDC source over `tableA`. The 'database-name' option is a pattern, so
-- one table definition covers databaseA, databaseB and databaseC.
CREATE TABLE table_A_source (
    user_name   VARCHAR,
    user_code   VARCHAR,
    tenant_code VARCHAR
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'database(A|B|C)',
    'table-name' = 'tableA',
    -- Passed through to Debezium: take no table locks during the snapshot.
    'debezium.snapshot.locking.mode' = 'none'
);
-- CDC source over `tableB`. The 'database-name' option is a pattern, so
-- one table definition covers databaseA, databaseB and databaseC.
CREATE TABLE table_B_source (
    age         INT,
    user_code   VARCHAR,
    tenant_code VARCHAR
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'database(A|B|C)',
    'table-name' = 'tableB',
    -- Passed through to Debezium: take no table locks during the snapshot.
    'debezium.snapshot.locking.mode' = 'none'
);
-- Session options for the job below.
SET table.dynamic-table-options.enabled=true;
-- Drop (instead of failing on) rows that carry NULL in a NOT NULL sink column.
SET table.exec.sink.not-null-enforcer=drop;

-- Merge user rows from tableA with their age from tableB and write the result
-- to the sink. Sink columns are matched by position, so the SELECT list must
-- follow table_target's declaration order:
--   user_name, user_code, age, tenant_code
-- (previously tenant_code and age were swapped, sending a VARCHAR into the
-- INT `age` column).
INSERT INTO table_target
SELECT
    a.user_name,
    a.user_code,
    b.age,
    a.tenant_code
FROM table_A_source AS a
-- LEFT JOIN keeps users that have no matching tableB row; their age is NULL.
LEFT JOIN table_B_source AS b
    ON a.user_code = b.user_code
    AND a.tenant_code = b.tenant_code;



