栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink CDC

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink CDC

文章目录

1.CDC概述

1.1 CDC1.2 CDC 分类1.3 Flink-CDC1.4 ETL 分析 2.Flink CDC 编码

2.1 提前准备2.2 mysql 的设置2.3 java 代码编写 3.利用自定义格式编码4.Flink Sql 编码5.Flink CDC 2.0 的新特性

1.CDC概述 1.1 CDC

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2 CDC 分类

分为查询CDC 和 Binlog CDC


常见的CDC 方案比较

1.3 Flink-CDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。

Flink CDC 的流式过程

1.4 ETL 分析
    传统的ETL 分析


E: Mysql - kafka Connect - Kafka
T: Flink
L: TiDB, kafka, ClickHouse, Hive, 数据湖: Hudi, Iceberg

    基于Flink CDC 的ETL 分析


E和L 都是Flink 去完成

2.Flink CDC 编码 2.1 提前准备


防火墙关闭


ip 地址为 192.168.66.66

2.2 mysql 的设置
vim /etc/my.cnf

必须开启为row 模式, 然后添加下面的数据库为binlog模式

2.3 java 代码编写
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        // 开启CK
        environment.enableCheckpointing(5000);
        environment.getCheckpointConfig().setCheckpointTimeout(10000);
        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // 状态后端
//        environment.setStateBackend(new FsStateBackend())


        DebeziumSourceFunction sourceFunction = MySqlSource.builder()
                .hostname("192.168.66.66")
                .username("root")
                .password("root")
                .databaseList("cdc_test")
                .tableList("cdc_test.test1")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        environment.addSource(sourceFunction).print();

        environment.execute("FlinkCDC");
    }
}

这里相当于开启了database = cdc_test 中的test1表的cdc 功能

这里用的是StringDebeziumDeserializationSchema 的格式
因为是initial 的模式。

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1648277764, file=mysql-bin.000037, pos=154, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.cdc_test.test1', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=1,name=1},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1648277764498,snapshot=true,db=cdc_test,table=test1,server_id=0,file=mysql-bin.000037,pos=154,row=0},op=r,ts_ms=1648277764502}, valueSchema=Schema{mysql_binlog_source.cdc_test.test1.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1648277764, file=mysql-bin.000037, pos=154}} ConnectRecord{topic='mysql_binlog_source.cdc_test.test1', kafkaPartition=null, key=null, keySchema=null, value=Struct{after=Struct{id=2,name=2},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1648277764506,snapshot=last,db=cdc_test,table=test1,server_id=0,file=mysql-bin.000037,pos=154,row=0},op=r,ts_ms=1648277764506}, valueSchema=Schema{mysql_binlog_source.cdc_test.test1.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

添加以后会出现

3.利用自定义格式编码
public class CustomerDeserializationSchema implements DebeziumDeserializationSchema {
    // db, tableName, after, before, op
    // before: id, name
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
        // 修改为json 形式的
        JSONObject jsonObject = new JSONObject();

        // 获取表名称
        String topic = sourceRecord.topic();
        String[] split = topic.split("\.");
        jsonObject.put("db", split[1]);
        jsonObject.put("tableName", split[2]);
        // before:
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null){
            Schema schema = before.schema();
            List fields = schema.fields();
            for (Field field : fields) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        jsonObject.put("before", beforeJson);
        // After:
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            //获取列信息
            Schema schema = after.schema();
            List fieldList = schema.fields();

            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        jsonObject.put("after", afterJson);

        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        jsonObject.put("op", operation);


        // 输出数据
        collector.collect(jsonObject.toJSONString());

    }

    @Override
    public TypeInformation getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}


4.Flink Sql 编码
public class FlinkSqlCDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING primary key, " +
                " name STRING, " +
                " sex STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = '192.168.66.66', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'database-name' = 'cdc_test', " +
                " 'table-name' = 'user_info' " +
                ")");

        //3.查询数据并转换为流输出
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //4.启动
        env.execute("FlinkSQLCDC");

    }
}

5.Flink CDC 2.0 的新特性

Flink 1.0 的不足


1.加锁
2.无法并发
3.全量不支持checkpoint

Flink 2.0 修改


1.无锁
2.多并发
3.支持checkpoint

1.Chunk切分 2.Chunk读取 3.Chunk分配 4.Chunk汇报 5.Chunl读取

这里的读取利用的无锁。
分配和汇报形成一个闭环


读取:
1.先记录低位点
2.修改数据到buffer
3.记录高位点
4.低位点到高位点为binlog
5.修改buffer

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781163.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号