栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink SQL CDC数据同步最佳实践--多个数据库间数据同步方案

Flink SQL CDC数据同步最佳实践--多个数据库间数据同步方案

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

利用Flink SQL CDC实现多个数据库间数据同步
  • 一、案例
  • 二、实现

一、案例

数据库databaseA、databaseB、databaseC中的表tableA和tableB需同步汇总数据到databaseD中的tableTarget表中,此需求可通过Flink SQL CDC来进行实现

二、实现
CREATE TABLE table_target (
    user_name INT,
    user_code VARCHAR,
    age INT,
    tenant_code VARCHAR,
    PRIMARY KEY (data_time,user_name,user_code) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/databaseD',
    'table-name' = 'tableTarget',
    'username' = 'root',
    'password' = '123456',
    'scan.auto-commit' = 'false',
    'sink.buffer-flush.max-rows' = '1'
);

CREATE TABLE table_A_source ( 
    user_name VARCHAR,
    user_code VARCHAR,
    tenant_code VARCHAR
)WITH ( 
    'connector' = 'mysql-cdc',
   'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'database(A|B|C)',
    'table-name' = 'tableA',
    'debezium.snapshot.locking.mode' = 'none'
);

CREATE TABLE table_B_source ( 
    age INT,
    user_code VARCHAR,
    tenant_code VARCHAR
)WITH ( 
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'database(A|B|C)',
    'table-name' = 'tableB',
    'debezium.snapshot.locking.mode' = 'none'
);

set table.dynamic-table-options.enabled=true;
set table.exec.sink.not-null-enforcer=drop;

INSERT INTO table_target 
SELECT a.user_name,a.user_code,a.tenant_code,b.age
FROM table_A_source a LEFT JOIN table_B_source b 
ON a.user_code = b.user_code AND a.tenant_code = b.tenant_code
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278481.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号