
4.3.4 Flink Stream-Processing Framework - Flink CDC Real-Time Data Synchronization - Hands-On with the DataStream Approach - Custom Deserializer Implementation


Contents

1. Introduction

2. Maven Dependencies

3. Code Implementation

3.1 Custom Deserializer: CustomerDeserialization

3.2 Main Class: FlinkCDCWithCustomerDeserialization

4. Cluster Test

4.1 Environment Setup

4.2 Viewing the Results


1. Introduction

Flink CDC can be used in two ways: the DataStream API and FlinkSQL.

DataStream approach: can capture multiple databases and tables (flexible), but requires a custom deserializer.
FlinkSQL approach: no custom deserializer is needed, but it only works on a single table.
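For contrast, the FlinkSQL approach boils down to a table declaration; the sketch below uses the connector options typical for flink-connector-mysql-cdc 1.2.0, and the column list and connection details are illustrative, matching the example database used later in this article:

```sql
-- Declares a CDC-backed table; no deserializer code is needed,
-- but the declaration is tied to this one table.
CREATE TABLE base_trademark (
    id BIGINT,
    tm_name STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.0.111',
    'port' = '3306',
    'username' = 'root',
    'password' = 'Gmt123456@',
    'database-name' = 'gmall2021',
    'table-name' = 'base_trademark'
);
```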

2. Maven Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.7</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.7</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

3. Code Implementation

3.1 Custom Deserializer: CustomerDeserialization
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1. Create a JSON object to hold the final record
        JSONObject result = new JSONObject();

        //2. Extract the database and table names from the topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        //3. Extract the "before" data
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4. Extract the "after" data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5. Determine the operation type: CREATE / UPDATE / DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6. Write the fields into the JSON object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7. Emit the record
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
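One detail worth flagging above: Debezium record topics take the form server-name.database.table, and because String.split takes a regular expression, the dot must be escaped as "\\." or the split silently matches every character. A minimal standalone check (the topic value is illustrative):

```java
public class TopicSplitDemo {
    public static void main(String[] args) {
        // Debezium record topics look like <server-name>.<database>.<table>
        String topic = "mysql_binlog_source.gmall2021.base_trademark";
        // "." is a regex metacharacter, so it must be escaped as "\\."
        String[] fields = topic.split("\\.");
        System.out.println(fields[1]);  // database name
        System.out.println(fields[2]);  // table name
    }
}
```

With the unescaped "." the resulting array is empty and fields[1] throws ArrayIndexOutOfBoundsException at runtime.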

3.2 Main Class: FlinkCDCWithCustomerDeserialization
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCWithCustomerDeserialization {

    public static void main(String[] args) throws Exception {

        //1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Build the Flink CDC SourceFunction and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.0.111")
                .port(3306)
                .username("root")
                .password("Gmt123456@")
                .databaseList("gmall2021")
                .tableList("gmall2021.base_trademark")   // If omitted, all tables in the listed databases are captured; the format is db.table
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3. Print the records
        streamSource.print();

        //4. Launch the job
        env.execute("FlinkCDCWithCustomerDeserialization");

    }

}
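With StartupOptions.initial(), the job first snapshots the existing rows and then switches to the binlog. Each line printed by the job is one record in the shape produced by the deserializer above; the sample below is illustrative (column names and values assumed from the example table, and snapshot reads carry the operation type "read"):

```json
{
    "database": "gmall2021",
    "tableName": "base_trademark",
    "before": {},
    "after": {"id": 1, "tm_name": "Redmi"},
    "type": "read"
}
```

For an update, "before" holds the old row and "type" is "update"; for a delete, "after" is empty and "type" is "delete".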

4. Cluster Test

4.1 Environment Setup
    Start the HA Hadoop cluster: sh ha-hadoop.sh start

    Create the job archive directory on HDFS (only needed once): hdfs dfs -mkdir /flink-jobhistory

    Start the Flink cluster and the history server:

    1. start-cluster.sh
    2. historyserver.sh start

    Submit the Flink job: /opt/software/flink-1.12.7/bin/flink run -m yarn-cluster -ys 1 -ynm gmall-flink-cdc -c com.ucas.FlinkCDCWithCustomerDeserialization -d /root/mySoftware/gmall-flink-cdc.jar

4.2 Viewing the Results

(1) Open the YARN web UI at http://192.168.0.112:8088/cluster/apps, find the job, and click its application ID.

(2) Click the Tracking URL to enter the Flink web UI.

(3) Open Stdout under TaskManagers on the left to view the console output.
