栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink(58):Flink之FlinkCDC(上)

Flink(58):Flink之FlinkCDC(上)

目录

0. 相关文章链接

1. CDC简介

1.1. 什么是CDC

1.2. CDC的种类

2. 基于DataStream方式的FlinkCDC应用

2.1. 导入依赖

2.2. 编写代码

2.2.1. 主类-从业务库中获取数据并写入到kafka中

2.2.2. 自定义反序列化器

2.2.3. 各方法参数详解

3. FlinkSQL方式的应用


0. 相关文章链接

Flink文章汇总

1. CDC简介

1.1. 什么是CDC

        CDC 是 Change Data Capture(变更数据获取) 的简称。 核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、 更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

1.2. CDC的种类

CDC 主要分为 基于查询 和 基于Binlog 两种方式,我们主要了解一下这两种之间的区别:

2. 基于DataStream方式的FlinkCDC应用

2.1. 导入依赖

    
    UTF-8
    UTF-8
    8
    8

    
    1.12
    1.12.0
    1.2.0
    2.12

    
    3.1.3

    
    1.2.17
    1.7.21

    
    5.1.49
    1.2.75
    1.9.4
    29.0-jre
    3.6.0
    2.4.1
    2.6.1
    3.10





    

        

        
        
            org.apache.flink
            flink-clients_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-java
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}
        

        
        
            org.apache.flink
            flink-table-api-java
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner-blink_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge_${scala.binary.version}
            ${flink.version}
        

        
        
            org.apache.flink
            flink-statebackend-rocksdb_${scala.binary.version}
            ${flink.version}
        

        
        
            org.apache.flink
            flink-connector-kafka_${scala.binary.version}
            ${flink.version}
        
        
            com.alibaba.ververica
            flink-connector-mysql-cdc
            ${flink.cdc.version}
        


        

        
        
            org.apache.flink
            flink-hadoop-compatibility_2.11
            ${flink.version}
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        


        

        
            log4j
            log4j
            ${log4j.version}
        
        
            org.slf4j
            slf4j-api
            ${slf4j.version}
        
        
            org.slf4j
            slf4j-log4j12
            ${slf4j.version}
        
        
            org.slf4j
            jcl-over-slf4j
            ${slf4j.version}
        


        

        
        
            commons-beanutils
            commons-beanutils
            ${commons.beanutils.version}
        

        
        
            com.google.guava
            guava
            ${guava.version}
        

        
        
            mysql
            mysql-connector-java
            ${mysql.version}
        

        
        
            com.alibaba
            fastjson
            ${fastjson.version}
        

        
        
            com.squareup.okhttp3
            okhttp
            ${okhttp.version}
        

        
        
            org.springframework.boot
            spring-boot-starter-jdbc
            ${springboot.version}
        

        
        
            com.zaxxer
            HikariCP
            ${HikariCP.version}
        

        
        
            org.apache.commons
            commons-lang3
            ${commons.lang3.version}
        

    

2.2. 编写代码

2.2.1. 主类-从业务库中获取数据并写入到kafka中
package com.ouyang.gmall.realtime.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ouyang.gmall.realtime.app.function.CustomerDeserialization;
import com.ouyang.gmall.realtime.utils.FlinkUtil;
import com.ouyang.gmall.realtime.utils.ModelUtil;
import com.ouyang.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class GmallCDC {

    public static Logger logger = LoggerFactory.getLogger(GmallCDC.class);
    public static final String ODS_base_DB_TOPIC_NAME = ModelUtil.getConfigValue("kafka.topic.ods.base.db");

    public static void main(String[] args) throws Exception {

        String applicationName = "gmall-cdc";
        long interval = 5000L;

        // 1.获取执行环境,并配置checkpoint
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.deployRocksdbCheckpoint(env, applicationName, interval, true);


        // 2.通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction sourceFunction = MySQLSource.builder()
                .hostname(ModelUtil.getConfigValue("mysql.hostname"))
                .port(Integer.parseInt(ModelUtil.getConfigValue("mysql.port")))
                .username(ModelUtil.getConfigValue("mysql.username"))
                .password(ModelUtil.getConfigValue("mysql.password"))
                .databaseList(ModelUtil.getConfigValue("mysql.database.gmall"))
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource streamSource = env.addSource(sourceFunction);


        //3.对数据进行日志打印,并将数据输出到Kafka中
        streamSource
                .map(new MapFunction() {
                    @Override
                    public String map(String value) throws Exception {
                        logger.warn(value);
                        return value;
                    }
                })
                .addSink(MyKafkaUtil.getKafkaProducerExactlyonce(ODS_base_DB_TOPIC_NAME));


        //4.启动任务
        env.execute(applicationName);


    }

}

2.2.2. 自定义反序列化器
package com.ouyang.gmall.realtime.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;


public class CustomerDeserialization implements DebeziumDeserializationSchema {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

        

        

        //1.创建JSON对象用于存储最终数据
        JSonObject result = new JSonObject();

        //2.获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\.");
        String database = fields[1];
        String tableName = fields[2];


        //3.获取 "before"数据 和 "after"数据
        Struct value = (Struct) sourceRecord.value();

        // 3.1. "before"数据
        Struct before = value.getStruct("before");
        JSonObject beforeJson = new JSonObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            for (Field field : beforeSchema.fields()) {
                Object beforevalue = before.get(field);
                beforeJson.put(field.name(), beforevalue);
            }
        }

        // 3.2. "after"数据
        Struct after = value.getStruct("after");
        JSonObject afterJson = new JSonObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            for (Field field : afterSchema.fields()) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }


        //4.获取timestamp
        long ts = (long) value.get("ts_ms");


        //5.获取操作类型  CREATE UPDATe DELETE,并转换为小写,其中create转换为insert,方便后续写入
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6.将字段写入JSON对象
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);
        result.put("ts", ts);


        //7.输出数据
        collector.collect(result.toJSonString());

    }

    @Override
    public TypeInformation getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

}

2.2.3. 各方法参数详解
package com.ouyang.gmall.realtime.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Test {
    
    public static void main(String[] args) throws Exception {

        //1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序
        //2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
        env.enableCheckpointing(5000L);
        //2.2 指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
        //2.6 设置访问 HDFS 的用户名
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        
        //3.创建 Flink-MySQL-CDC 的 Source
        //initial (default): Performs an initial snapshot on the monitored database tables uponfirst startup, and continue to read the latest binlog.
        //latest-offset: Never to perform snapshot on the monitored database tables upon firststartup, just read from the end of the binlog which means only have the changes since theconnector was started.
        //timestamp: Never to perform snapshot on the monitored database tables upon firststartup, and directly read binlog from the specified timestamp. The consumer will traverse thebinlog from the beginning and ignore change events whose timestamp is smaller than thespecified timestamp.
        //specific-offset: Never to perform snapshot on the monitored database tables uponfirst startup, and directly read binlog from the specified offset.
        DebeziumSourceFunction mysqlSource = MySQLSource.builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                //可选配置项,如果不指定该参数,则会 读取上一个配置下的所有表的数据, 注意:指定的时候需要使用"db.table"的方式
                .tableList("gmall-flink.z_user_info") 
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
        
        //4.使用 CDC Source 从 MySQL 读取数据
        DataStreamSource mysqlDS = env.addSource(mysqlSource);
        
        //5.打印数据
        mysqlDS.print();

        //6.执行任务
        env.execute();
    }
    
}

3. FlinkSQL方式的应用
package com.ouyang.gmall.cdc.app;

import com.ouyang.gmall.common.utils.FlinkUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


public class GmallCDCWithSQL {

    public static void main(String[] args) throws Exception {

        String applicationName = "gmall-cdc-sql";
        long interval = 5000L;

        // 1.获取执行环境,并配置checkpoint
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.deployRocksdbCheckpoint(env, applicationName, interval, true);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        //2.DDL方式建表
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " tm_name STRING, " +
                " logo_url STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'bigdata1', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'database-name' = 'gmall', " +
                " 'table-name' = 'base_trademark' " +
                ")");

        //3.查询数据
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");

        //4.将动态表转换为流
        DataStream> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //5.启动任务
        env.execute(applicationName);

    }
}

注:此博客根据某马2020年贺岁视频改编而来 -> B站网址

注:其他相关文章链接由此进 -> Flink文章汇总

注:此博文为介绍FlinkCDC的详细使用,如果需要了解FlinkCDC2.x的新特性可以查看 Flink(59):Flink之FlinkCDC(下) 博文 


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/728298.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号