栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink CDC(二)自定义反序列化器

Flink CDC(二)自定义反序列化器

自定义反序列化器
(1) 默认反序列化器(StringDebeziumDeserializationSchema)输出的原始数据,即 SourceRecord 的 toString() 结果:
SourceRecord{
sourcePartition={server=mysql_binlog_source}, 
sourceOffset={ts_sec=1648039948, file=mysql-bin.000064, pos=2881, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.base_trademark', 
kafkaPartition=null, key=Struct{id=12}, 
keySchema=Schema{mysql_binlog_source.flink.base_trademark.Key:STRUCT},
value=Struct{before=Struct{id=12,tm_name=yyds,logo_url=aaa},
source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1648039948000,db=flink,table=base_trademark,server_id=1,file=mysql-bin.000064,pos=3018,row=0,thread=26},op=d,ts_ms=1648039948679},
 valueSchema=Schema{mysql_binlog_source.flink.base_trademark.Envelope:STRUCT},
timestamp=null, 
headers=ConnectHeaders(headers=)
}
(2) 自定义反序列化器
package com.yyds;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Locale;


public class MyDeserialization implements DebeziumDeserializationSchema {



    
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

        JSONObject res = new JSONObject();

        // 获取数据库和表名称
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\.");
        String database = fields[1];
        String tableName = fields[2];



        Struct value = (Struct)sourceRecord.value();
        // 获取before数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if(before != null){
            Schema beforeSchema = before.schema();
            List beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforevalue = before.get(field);
                beforeJson.put(field.name(),beforevalue);
            }
        }


        // 获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if(after != null){
            Schema afterSchema = after.schema();
            List afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(),afterValue);
            }
        }

        //获取操作类型 READ DELETE UPDATE CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if("create".equals(type)){
            type = "insert";
        }

        // 将字段写到json对象中
        res.put("database",database);
        res.put("tableName",tableName);
        res.put("before",beforeJson);
        res.put("after",afterJson);
        res.put("type",type);

        //输出数据
        collector.collect(res.toString());
    }

    @Override
    public TypeInformation getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

(3) 测试
package com.yyds;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo job: reads change events from the MySQL table {@code flink.base_trademark} with Flink CDC,
 * converts each event to a JSON string with {@link MyDeserialization} and prints it.
 *
 * <p>Connection and checkpoint settings default to the values used in this example and can be
 * overridden with system properties: {@code cdc.mysql.host}, {@code cdc.mysql.port},
 * {@code cdc.mysql.user}, {@code cdc.mysql.password} and {@code cdc.checkpoint.dir}.
 */
public class FlinkCDCWithMyDeserialization {

    public static void main(String[] args) throws Exception {
        // User name used when accessing HDFS (the checkpoint directory below lives on HDFS).
        System.setProperty("HADOOP_USER_NAME", "root");

        // 1. Execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpoint every 5 seconds with exactly-once semantics.
        env.enableCheckpointing(5000L);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Flink does not allow concurrent checkpoints when a minimum pause between checkpoints
        // is configured, so at most one checkpoint is in flight.
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.setMinPauseBetweenCheckpoints(3000L);
        // Keep the last checkpoint when the job is cancelled so the job can be restored from it.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // State backend: checkpoints are written to the (HDFS) directory below.
        env.setStateBackend(new FsStateBackend(
                System.getProperty("cdc.checkpoint.dir", "hdfs://centos01:8020/flinkCDC/ck")));

        // 2. Build the CDC source. The explicit <String> witness is required so the builder's
        // type parameter matches MyDeserialization's produced type.
        DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
                .hostname(System.getProperty("cdc.mysql.host", "centos01"))
                .port(Integer.parseInt(System.getProperty("cdc.mysql.port", "3306")))
                .username(System.getProperty("cdc.mysql.user", "root"))
                .password(System.getProperty("cdc.mysql.password", "123456"))
                .databaseList("flink")
                // Optional: without tableList every table of the databases above is read.
                // Table names must be written as "db.table".
                .tableList("flink.base_trademark")
                .deserializer(new MyDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> streamSource = env.addSource(mySQLSource);

        // 3. Print the JSON records.
        streamSource.print();

        // 4. Run the job.
        env.execute("FlinkCDCWithMyDeserialization");
    }
}

结果:

{"database":"flink","before":{},"after":{"tm_name":"苹果","logo_url":"/static/default.jpg","id":2},"type":"insert","tableName":"base_trademark"}
{"database":"flink","before":{},"after":{"tm_name":"华为","logo_url":"/static/default.jpg","id":3},"type":"insert","tableName":"base_trademark"}
{"database":"flink","before":{},"after":{"tm_name":"TCL","logo_url":"/static/default.jpg","id":4},"type":"insert","tableName":"base_trademark"}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774257.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号