
How Flink-Connector-DTS Works, with Source Code


Learning Flink
A quick DTS primer: Data Transmission Service (DTS) supports data interchange among RDBMS, NoSQL, OLAP, and other data sources. It combines data synchronization, migration, subscription, integration, and processing in one service, helping you build a secure, scalable, and highly available data architecture.
Official usage guide for the Alibaba Cloud DTS connector
How Flink-Connector-DTS works:
1. A first look at DTS through the POM file
From the POM (skipping provided/test scopes) it is clear that DTS is essentially a thin wrapper around the Kafka connector: DTS writes database change data into Kafka, and data synchronization is then implemented by consuming that Kafka data.

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2. Digging into DTS through its configuration parameters
The core parameters read by the DTS connector are: dts.server, topic, dts.sid, dts.user, dts.password, dts.checkpoint, dts-cdc.table.name, and format.
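For orientation, a Flink SQL DDL using these options typically looks like the sketch below. This is a hedged example, not taken from the source: the table schema, host, and credential values are placeholders, and the exact connector identifier and option spellings should be verified against the official connector guide.

```sql
CREATE TABLE dts_orders (
  order_id BIGINT,
  order_name STRING
) WITH (
  'connector' = 'dts',                          -- connector identifier: verify against the official guide
  'dts.server' = 'dts-broker-host:18001',       -- Kafka broker address exposed by DTS
  'topic' = 'your_dts_topic',
  'dts.sid' = 'your_subscription_sid',          -- subscription SID; enables JAAS authentication
  'dts.user' = 'your_user',
  'dts.password' = 'your_password',
  'dts.checkpoint' = '1636000000',              -- timestamp to start consuming from
  'dts-cdc.table.name' = 'your_db.your_table',  -- fully qualified source table name
  'format' = 'dts-cdc'
);
```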
In the source, the getScanRuntimeProvider method of the DtsDynamicSource class calls createDtsConsumer to create a FlinkDtsConsumer object. A few lines inside createDtsConsumer capture the essence of the dts-connector:

        //no sid means normal kafka
        if (StringUtils.isEmpty(this.sid)) {
            dtsConsumer = new FlinkDtsRawConsumer(this.server, this.topic, this.sid, this.group, this.user,
                    this.password, this.checkpoint, kafkaDeserializer);
        } else {
            dtsConsumer = new FlinkDtsKafkaConsumer(this.server, this.topic, this.sid, this.group, this.user,
                    this.password, this.checkpoint, kafkaDeserializer);
        }

        return dtsConsumer;
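The dispatch above boils down to one rule: an empty sid means a plain Kafka consumer, a non-empty sid means the DTS-authenticated consumer. A minimal standalone sketch of that rule (the class and enum here are illustrative helpers, not part of the connector's API):

```java
public class DtsConsumerDispatch {

    enum ConsumerKind { RAW_KAFKA, DTS_KAFKA }

    // Mirrors the branch in createDtsConsumer: no sid -> normal Kafka consumer
    // (FlinkDtsRawConsumer), sid present -> DTS consumer with JAAS auth
    // (FlinkDtsKafkaConsumer).
    static ConsumerKind choose(String sid) {
        return (sid == null || sid.isEmpty())
                ? ConsumerKind.RAW_KAFKA
                : ConsumerKind.DTS_KAFKA;
    }

    public static void main(String[] args) {
        System.out.println(choose(null));        // RAW_KAFKA
        System.out.println(choose("dts123abc")); // DTS_KAFKA
    }
}
```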

FlinkDtsRawConsumer:

    public FlinkDtsRawConsumer(
            String brokerUrl,
            String topic,
            String sid,
            String group,
            String user,
            String password,
            long startupOffsetsTimestamp,
            KafkaDeserializationSchema valueDeserializer,
            Properties kafkaExtraProps) {

        this.flinkKafkaConsumer =
                new FlinkKafkaConsumer(
                        topic,
                        valueDeserializer,
                        DtsKafkaUtil.getKafkaProperties(
                                brokerUrl, topic, sid, group, user, password, kafkaExtraProps));

        if (startupOffsetsTimestamp > 0) {
            this.flinkKafkaConsumer.setStartFromTimestamp(startupOffsetsTimestamp);
        } else {
            this.flinkKafkaConsumer.setStartFromGroupOffsets();
        }
    }

FlinkDtsKafkaConsumer:

    public FlinkDtsKafkaConsumer(
            String brokerUrl,
            String topic,
            String sid,
            String group,
            String user,
            String password,
            long startupOffsetsTimestamp,
            KafkaDeserializationSchema valueDeserializer,
            Properties kafkaExtraProps) {

        this.topicsDescriptor = new KafkaTopicsDescriptor(Collections.singletonList(topic), null);
        this.deserializer = valueDeserializer;

        Properties props = DtsKafkaUtil.getKafkaProperties(brokerUrl, topic, sid, group, user, password, kafkaExtraProps);

        this.properties = props;

        this.discoveryIntervalMillis = PropertiesUtil.getLong(Preconditions.checkNotNull(props, "props"),
                "flink.partition-discovery.interval-millis", Long.MIN_VALUE);

        this.useMetrics =  !PropertiesUtil.getBoolean(props, "flink.disable-metrics", false);

        if (startupOffsetsTimestamp > 0) {
            setStartFromTimestamp(startupOffsetsTimestamp);
        } else {
            setStartFromGroupOffsets();
        }

        // configure the polling timeout
        try {
            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
            }
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }
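The poll-timeout handling at the end of the constructor reduces to: read a long property, fall back to a default, fail loudly on garbage. A self-contained sketch of just that logic (the key and default below match the constants in Flink's Kafka consumer, but treat the exact values as an assumption to verify against your Flink version):

```java
import java.util.Properties;

public class PollTimeoutConfig {

    // Assumed to match FlinkKafkaConsumer's constants; verify for your version.
    static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    static final long DEFAULT_POLL_TIMEOUT = 100L;

    // Same shape as the constructor snippet: explicit value wins,
    // otherwise the default; unparseable values raise immediately.
    static long resolvePollTimeout(Properties props) {
        try {
            return props.containsKey(KEY_POLL_TIMEOUT)
                    ? Long.parseLong(props.getProperty(KEY_POLL_TIMEOUT))
                    : DEFAULT_POLL_TIMEOUT;
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolvePollTimeout(props)); // 100 (default)
        props.setProperty(KEY_POLL_TIMEOUT, "250");
        System.out.println(resolvePollTimeout(props)); // 250
    }
}
```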

Understanding the dts-cdc.table.name parameter: if the configured table name does not match, records are never passed to the collector. See the message deserialization logic in the source:

    @Override
    public void deserialize(byte[] message, Collector out) throws IOException {
        LazyParseRecordImpl record = new LazyParseRecordImpl(message);

        if (!OperationType.INSERT.equals(record.getOperationType()) && !OperationType.UPDATE.equals(
                record.getOperationType()) && !OperationType.DELETE.equals(record.getOperationType())) {
            return;
        }
        if (StringUtils.isBlank(tableName) || !tableName.equals(record.getSchema().getFullQualifiedName().get())) {
            return;
        }

        if (record.getOperationType() == OperationType.INSERT) {
            GenericRowData insert = extractAfterRow(record);
            insert.setRowKind(RowKind.INSERT);
            out.collect(insert);
        } else if (record.getOperationType() == OperationType.DELETE) {
            GenericRowData delete = extractBeforeRow(record);
            delete.setRowKind(RowKind.DELETE);
            out.collect(delete);
        } else {
            GenericRowData before = extractBeforeRow(record);
            before.setRowKind(RowKind.UPDATE_BEFORE);
            out.collect(before);

            GenericRowData after = extractAfterRow(record);
            after.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(after);
        }
    }
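Note that the filter compares dts-cdc.table.name against the record schema's fully qualified name, which for a database table takes the form database.table. A minimal sketch of that match rule (the class and method here are illustrative, not the connector's API):

```java
public class TableNameFilter {

    // Mirrors the guard in deserialize(): a blank configured name, or any
    // mismatch with the record's fully qualified name, drops the record.
    static boolean accepts(String configuredName, String fullQualifiedName) {
        return configuredName != null
                && !configuredName.trim().isEmpty()
                && configuredName.equals(fullQualifiedName);
    }

    public static void main(String[] args) {
        System.out.println(accepts("mydb.orders", "mydb.orders")); // true
        System.out.println(accepts("orders", "mydb.orders"));      // false: must be fully qualified
    }
}
```

This is why a bare table name like orders silently yields no output: it never equals the fully qualified mydb.orders that the record reports.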

To sum up:
dts.server is simply the Kafka broker address, and topic is the Kafka topic. dts.sid, dts.user, and dts.password are used for Kafka JAAS authentication. dts.checkpoint sets the consumption mode: consumption starts from the given TIMESTAMP position.
dts-cdc.table.name is the actual table name of the data subscription, and format is dts-cdc, the format of the data DTS produces when reading the database binlog.

When reprinting, please credit: reprinted from www.mshxw.com
Original URL: https://www.mshxw.com/it/782670.html