
Flink CDC in Practice

1. Flink CDC project: GitHub - ververica/flink-cdc-connectors: Change Data Capture (CDC) Connectors for Apache Flink

Stars are welcome.

2. Project documentation: Welcome to Flink CDC — Flink CDC 2.0.0 documentation

3. Community forum: you are encouraged to ask questions in the discussions; committers keep watch, so every question gets an answer: https://github.com/ververica/flink-cdc-connectors/discussions

4. Feature requests & bug reports: for bug reports and new feature requests, open an issue directly at: Issues · ververica/flink-cdc-connectors · GitHub

(Please write in English so non-Chinese-speaking contributors can follow along.)

5. Community contributions: the issue tracker keeps three umbrella lists:

Development list: https://github.com/ververica/flink-cdc-connectors/issues/339

Bug list: https://github.com/ververica/flink-cdc-connectors/issues/340

Requirements list: Tracking User Requirements · Issue #341 · ververica/flink-cdc-connectors · GitHub

Anyone interested can pick an item from these lists to contribute. Contributed PRs need a reviewer; ping 雪尽 or 云邪 in the community group.


Community articles:

Flink Chinese Community | Chinese-language tutorials

GitHub code:

https://github.com/ververica/flink-cdc-connectors?spm=a2csy.flink.0.0.3d263bdcVv1mgE

Flink CDC documentation:

Streaming ETL for MySQL and Postgres with Flink CDC — Flink CDC 2.0.0 documentation

Using Flink CDC with Doris:

Flink CDC combined with the Doris Flink connector: streaming MySQL data into Apache Doris in real time

Pain points of earlier Flink CDC versions:

 

Open questions:

Dynamically adding tables:

https://github.com/ververica/flink-cdc-connectors/issues/55

The discussion in the issue above also answers how job recovery works.

A first, simple demo:

Main program:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        // Pass-through options for the embedded Debezium engine
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("snapshot.locking.mode", "none"); // do not lock tables during the snapshot
        debeziumProperties.put("serverTimezone", "UTC");
        debeziumProperties.put("characterEncoding", "UTF-8");

        SourceFunction<JSONObject> sourceFunction = MySqlSource.<JSONObject>builder()
                .hostname("mysql57-main.dev.xx.cc")
                .port(6612)
                .databaseList("xxx")   // set captured database
                .tableList("xxx.aaa")  // set captured table ("database.table")
                .username("canal")
                .password("canal")
                .deserializer(new JsonDeserializationSchema()) // converts SourceRecord to JSONObject
                .debeziumProperties(debeziumProperties)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction).print("===>>>").setParallelism(1);

        env.execute();
    }
}
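For reference, the demo roughly assumes the following dependencies; the coordinates are the published ones, but the versions shown are illustrative and may need adjusting to your Flink version:

```xml
<!-- Flink CDC MySQL connector (2.x line targets Flink 1.13) -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- fastjson, for the JSONObject used by the custom deserializer -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.78</version>
</dependency>
```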

Converting the Debezium change records into canal-style JSON:


import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class JsonDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {
    private static final long serialVersionUID = -3168848963265670603L;

    @Override
    public void deserialize(SourceRecord record, Collector<JSONObject> out) throws Exception {
        Struct valueStruct = (Struct) record.value();
        Struct sourceStruct = valueStruct.getStruct("source");
        // database and table this change came from
        String database = sourceStruct.getString("db");
        String table = sourceStruct.getString("table");

        // operation type (c -> create/insert, u -> update, d -> delete, r -> snapshot read)
        String type = Envelope.operationFor(record).toString().toLowerCase();
        if (type.equals("create")) {
            type = "insert";
        }

        JSONObject json = new JSONObject();
        json.put("database", database);
        json.put("table", table);
        json.put("type", type);

        // row image: "after" for inserts/updates, fall back to "before" for deletes
        Struct afterStruct = valueStruct.getStruct("after");
        Struct beforeStruct = valueStruct.getStruct("before");
        Struct rowStruct = afterStruct != null ? afterStruct : beforeStruct;

        JSONObject dataJson = new JSONObject();
        if (rowStruct != null) {
            for (Field field : rowStruct.schema().fields()) {
                dataJson.put(field.name(), rowStruct.get(field));
            }
        }
        json.put("data", dataJson);

        // emit downstream
        out.collect(json);
    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return TypeInformation.of(JSONObject.class);
    }
}
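The envelope shape this deserializer emits can be illustrated without Flink or Debezium on the classpath. A minimal plain-Java sketch of the same mapping, using `java.util.Map` in place of fastjson's `JSONObject` (class and method names here are illustrative, not part of the project):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CanalEnvelopeSketch {

    // Debezium reports inserts as "create"; canal-style consumers expect "insert"
    static String mapOperation(String op) {
        return op.equals("create") ? "insert" : op;
    }

    // Build the same {database, table, type, data} envelope the deserializer produces
    static Map<String, Object> envelope(String database, String table,
                                        String op, Map<String, Object> row) {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("database", database);
        json.put("table", table);
        json.put("type", mapOperation(op));
        json.put("data", row);
        return json;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "alice");
        System.out.println(envelope("xxx", "aaa", "create", row));
        // {database=xxx, table=aaa, type=insert, data={id=1, name=alice}}
    }
}
```

Downstream jobs that already consume canal JSON can then switch to Flink CDC without changing their parsing logic.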

Note: the connecting user needs canal-like replication privileges:

GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
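The statement above is MySQL 5.7 syntax. On MySQL 8.0, `GRANT ... IDENTIFIED BY` is no longer accepted, so the user has to be created first and granted the same privileges in a separate statement:

```sql
CREATE USER 'user'@'%' IDENTIFIED BY 'password';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
FLUSH PRIVILEGES;
```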

To be continued...

When reprinting, please credit the source: www.mshxw.com
Original article: https://www.mshxw.com/it/350351.html