1. Flink CDC project: https://github.com/ververica/flink-cdc-connectors (Change Data Capture (CDC) Connectors for Apache Flink)
Feel free to star the repo.
2. Project documentation: Welcome to Flink CDC — Flink CDC 2.0.0 documentation
3. Community forum: questions are encouraged; technical experts are on duty, so every question gets an answer: https://github.com/ververica/flink-cdc-connectors/discussions
4. Feature requests & bug reports: for bug reports and new feature requests, please open an issue directly at https://github.com/ververica/flink-cdc-connectors/issues
(Please write in English, for the benefit of non-Chinese-speaking contributors.)
5. Community contributions: the issue tracker has three tracking lists:
Development list: https://github.com/ververica/flink-cdc-connectors/issues/339
Bug list: https://github.com/ververica/flink-cdc-connectors/issues/340
Requirements list: https://github.com/ververica/flink-cdc-connectors/issues/341 (Tracking User Requirements)
Anyone interested is welcome to pick an item from these lists and contribute. Contributed PRs need a reviewer; you can ping 雪尽 or 云邪 in the group chat.
Community articles:
Flink Chinese community | Chinese-language tutorials
GitHub repo:
https://github.com/ververica/flink-cdc-connectors
Flink CDC documentation:
Streaming ETL for MySQL and Postgres with Flink CDC — Flink CDC 2.0.0 documentation
Using Flink CDC with Doris:
Flink CDC + the Doris Flink connector for real-time ingestion of MySQL data into Apache Doris
Pain points of the older Flink CDC:
Open questions:
Dynamic table addition:
https://github.com/ververica/flink-cdc-connectors/issues/55
The thread above also answers the question of how jobs recover after a restart.
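Dynamically adding tables was a pain point of the 1.x connector. Later 2.x releases of the incremental `MySqlSource` (the `com.ververica.cdc.connectors.mysql.source` package, not the legacy builder used in the demo) expose a switch for it. A hedged configuration sketch follows; the `scanNewlyAddedTableEnabled` option only appeared in later 2.x releases, so verify it exists in your flink-cdc version:

```java
// Incremental-snapshot source (flink-cdc 2.x, new API package). With
// scanNewlyAddedTableEnabled, a job restarted from a savepoint picks up
// tables that newly match tableList and backfills them without
// disturbing the tables it was already capturing.
MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("mysql57-main.dev.xx.cc")
        .port(6612)
        .databaseList("xxx")
        .tableList("xxx.*")                 // widen the pattern, then restart from a savepoint
        .username("canal")
        .password("canal")
        .scanNewlyAddedTableEnabled(true)   // check availability in your flink-cdc release
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
```

The workflow is: stop the job with a savepoint, extend `tableList`, and restart from that savepoint.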
A first, minimal demo. Main program:
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        Properties debeziumProperties = new Properties();
        debeziumProperties.put("snapshot.locking.mode", "none");
        debeziumProperties.put("serverTimezone", "UTC");
        debeziumProperties.put("characterEncoding", "UTF-8");

        SourceFunction<JSONObject> sourceFunction = MySqlSource.<JSONObject>builder()
                .hostname("mysql57-main.dev.xx.cc")
                .port(6612)
                .databaseList("xxx")                            // set captured database
                .tableList("xxx.aaa")                           // set captured table ("db.table" format)
                .username("canal")
                .password("canal")
                .deserializer(new JsonDeserializationSchema())  // converts SourceRecord to JSONObject
                .debeziumProperties(debeziumProperties)
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction).print("===>>>").setParallelism(1);
        env.execute();
    }
}
Converting the Debezium records into canal-style JSON:
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class JsonDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {

    private static final long serialVersionUID = -3168848963265670603L;

    public JsonDeserializationSchema() {
    }

    @Override
    public void deserialize(SourceRecord record, Collector<JSONObject> out) throws Exception {
        Struct valueStruct = (Struct) record.value();
        Struct sourceStruct = valueStruct.getStruct("source");
        // database name
        String database = sourceStruct.getString("db");
        // table name
        String table = sourceStruct.getString("table");
        // operation type; Debezium reports inserts as "create", canal calls them "insert"
        String type = Envelope.operationFor(record).toString().toLowerCase();
        if (type.equals("create")) {
            type = "insert";
        }

        JSONObject json = new JSONObject();
        json.put("database", database);
        json.put("table", table);
        json.put("type", type);

        // row data: "after" holds the new row; on deletes it is null, so fall back to "before"
        Struct afterStruct = valueStruct.getStruct("after");
        Struct beforeStruct = valueStruct.getStruct("before");
        Struct dataStruct = afterStruct != null ? afterStruct : beforeStruct;
        JSONObject dataJson = new JSONObject();
        if (dataStruct != null) {
            for (Field field : dataStruct.schema().fields()) {
                dataJson.put(field.name(), dataStruct.get(field));
            }
        }
        json.put("data", dataJson);

        // emit downstream
        out.collect(json);
    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return TypeInformation.of(JSONObject.class);
    }
}
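Stripped of the Flink and Kafka Connect types, the envelope logic above boils down to the following plain-Java sketch (the class and method names here are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EnvelopeSketch {

    // Builds the same canal-style envelope the deserializer emits:
    // {"database": ..., "table": ..., "type": ..., "data": {...}}
    static Map<String, Object> envelope(String database, String table, String op,
                                        Map<String, Object> after, Map<String, Object> before) {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("database", database);
        json.put("table", table);
        // Debezium reports inserts as "create"; canal calls them "insert"
        json.put("type", "create".equals(op) ? "insert" : op);
        // "after" holds the new row; on deletes it is null, so fall back to "before"
        json.put("data", after != null ? after : before);
        return json;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("name", "alice");
        System.out.println(envelope("xxx", "aaa", "create", row, null));
    }
}
```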
Note: the connecting user needs canal-style privileges, e.g.:
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
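The `GRANT ... IDENTIFIED BY` shorthand above only works on MySQL 5.7 and earlier; MySQL 8.0 requires creating the user first (the user name, host pattern, and password here are placeholders):

```sql
-- MySQL 8.0: create the user, then grant the replication privileges
CREATE USER 'user'@'%' IDENTIFIED BY 'password';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
FLUSH PRIVILEGES;
```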
To be continued...



