SourceRecord{
sourcePartition={server=mysql_binlog_source},
sourceOffset={ts_sec=1648039948, file=mysql-bin.000064, pos=2881, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.base_trademark',
kafkaPartition=null, key=Struct{id=12},
keySchema=Schema{mysql_binlog_source.flink.base_trademark.Key:STRUCT},
value=Struct{before=Struct{id=12,tm_name=yyds,logo_url=aaa},
source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1648039948000,db=flink,table=base_trademark,server_id=1,file=mysql-bin.000064,pos=3018,row=0,thread=26},op=d,ts_ms=1648039948679},
valueSchema=Schema{mysql_binlog_source.flink.base_trademark.Envelope:STRUCT},
timestamp=null,
headers=ConnectHeaders(headers=)
}
(2) 自定义反序列化器
package com.yyds; import com.alibaba.fastjson.JSONObject; import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import java.util.List; public class MyDeserialization implements DebeziumDeserializationSchema(3) 测试{ @Override public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception { JSONObject res = new JSONObject(); // 获取数据库和表名称 String topic = sourceRecord.topic(); String[] fields = topic.split("\."); String database = fields[1]; String tableName = fields[2]; Struct value = (Struct)sourceRecord.value(); // 获取before数据 Struct before = value.getStruct("before"); JSONObject beforeJson = new JSONObject(); if(before != null){ Schema beforeSchema = before.schema(); List beforeFields = beforeSchema.fields(); for (Field field : beforeFields) { Object beforevalue = before.get(field); beforeJson.put(field.name(),beforevalue); } } // 获取after数据 Struct after = value.getStruct("after"); JSONObject afterJson = new JSONObject(); if(after != null){ Schema afterSchema = after.schema(); List afterFields = afterSchema.fields(); for (Field field : afterFields) { Object afterValue = after.get(field); afterJson.put(field.name(),afterValue); } } //获取操作类型 READ DELETE UPDATE CREATE Envelope.Operation operation = Envelope.operationFor(sourceRecord); String type = operation.toString().toLowerCase(); if("create".equals(type)){ type = "insert"; } // 将字段写到json对象中 res.put("database",database); res.put("tableName",tableName); res.put("before",beforeJson); res.put("after",afterJson); res.put("type",type); //输出数据 collector.collect(res.toString()); } @Override public 
TypeInformation getProducedType() { return BasicTypeInfo.STRING_TYPE_INFO; } }
package com.yyds;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demo job that reads change events for the MySQL table {@code flink.base_trademark}
 * through Flink CDC, converts them with {@code MyDeserialization}, and prints the
 * resulting strings.
 */
public class FlinkCDCWithMyDeserialization {

    /**
     * Configures checkpointing and the state backend, builds the MySQL CDC source,
     * prints the stream and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Presumably so that HDFS access for checkpoints runs as "root" - verify for your cluster.
        System.setProperty("HADOOP_USER_NAME", "root");

        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Enable checkpointing, one checkpoint every 5 seconds.
        env.enableCheckpointing(5000L);
        // Checkpoint consistency semantics.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Timeout setting (disabled).
        // env.getCheckpointConfig().setAlignmentTimeout(10000L);
        // NOTE(review): the Flink docs suggest max-concurrent-checkpoints cannot be combined
        // with a minimum pause between checkpoints - confirm which of the two is intended.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // Restart strategy (disabled).
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
        // Keep the last checkpoint's data when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        // State backend.
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));

        // 2. Build the CDC SourceFunction and read the data.
        DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
                .hostname("centos01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                // Optional: when omitted, every table of the databases listed above is read.
                // Table names must be given in "db.table" form.
                .tableList("flink.base_trademark")
                .deserializer(new MyDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(mySQLSource);

        // 3. Print the data.
        streamSource.print();

        // 4. Start the job.
        env.execute("FlinkCDCWithMyDeserialization");
    }
}
结果:
{"database":"flink","before":{},"after":{"tm_name":"苹果","logo_url":"/static/default.jpg","id":2},"type":"insert","tableName":"base_trademark"}
{"database":"flink","before":{},"after":{"tm_name":"华为","logo_url":"/static/default.jpg","id":3},"type":"insert","tableName":"base_trademark"}
{"database":"flink","before":{},"after":{"tm_name":"TCL","logo_url":"/static/default.jpg","id":4},"type":"insert","tableName":"base_trademark"}



