package com.lcy.app.customer;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Locale;

/**
 * Converts a Debezium change record into a JSON string.
 *
 * <p>The emitted JSON object has the keys {@code dbName}, {@code tableName},
 * {@code type}, {@code before} and {@code after}. {@code before} and
 * {@code after} are JSON objects holding the row's column values; each is an
 * empty object when the corresponding struct is absent from the record.
 */
public class CustomerBinlogDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * Builds the JSON representation of one change record and emits it.
     *
     * <p>The database and table names are taken from the second and third
     * dot-separated segments of the record's topic; the first segment is
     * not used.
     *
     * @param sourceRecord the change record to convert
     * @param collector receives the resulting JSON string
     * @throws IllegalArgumentException if the topic has fewer than three
     *     dot-separated segments
     * @throws Exception if the record cannot be converted
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        String topic = sourceRecord.topic();
        // The dot must be escaped: String.split takes a regular expression.
        String[] tableInfos = topic.split("\\.");
        if (tableInfos.length < 3) {
            throw new IllegalArgumentException(
                    "Unexpected topic name '" + topic
                            + "': expected at least three dot-separated segments");
        }
        String dbName = tableInfos[1];
        String tableName = tableInfos[2];

        Struct value = (Struct) sourceRecord.value();
        // Either struct may be null; structToJson maps null to an empty object.
        JSONObject beforeJson = structToJson(value.getStruct("before"));
        JSONObject afterJson = structToJson(value.getStruct("after"));

        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase(Locale.ROOT);
        // Report row creation under the name "insert".
        if ("create".equals(type)) {
            type = "insert";
        }

        JSONObject result = new JSONObject();
        result.put("dbName", dbName);
        result.put("tableName", tableName);
        result.put("type", type);
        result.put("before", beforeJson);
        result.put("after", afterJson);

        collector.collect(result.toJSONString());
    }

    /**
     * Copies every field of {@code struct} into a new JSON object keyed by field name.
     *
     * @param struct the row image to copy; may be {@code null}
     * @return a JSON object with one entry per schema field, or an empty
     *     object when {@code struct} is {@code null}
     */
    private static JSONObject structToJson(Struct struct) {
        JSONObject json = new JSONObject();
        if (struct == null) {
            return json;
        }
        List<Field> fields = struct.schema().fields();
        for (Field field : fields) {
            json.put(field.name(), struct.get(field.name()));
        }
        return json;
    }

    /**
     * Declares that this schema produces {@link String} elements.
     *
     * @return the string type information
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}



