栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

实时数仓(二)业务数据库到ods(kafka)

实时数仓(二)业务数据库到ods(kafka)

利用 Flink CDC 将业务数据库(MySQL)中的变更数据同步到 ODS 层(Kafka)

(1)主要代码

package com.yyds.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.yyds.app.function.MyDeserialization;
import com.yyds.utils.MyKafkaUtils;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Job entry point that reads MySQL change events through Flink CDC and forwards them to Kafka.
 *
 * <p>Each record produced by the CDC source (already converted to a string by
 * {@link MyDeserialization}) is printed and written to the {@code ods_base_db} topic.
 */
public class FlinkCDC {

    /**
     * Configures checkpointing and the state backend, wires the CDC source to the Kafka sink
     * and starts the job.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // NOTE(review): presumably needed so the HDFS checkpoint directory below is accessed
        // as this user - confirm against the cluster setup.
        System.setProperty("HADOOP_USER_NAME", "root");

        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Enable checkpointing: one checkpoint every 5 seconds.
        env.enableCheckpointing(5000L);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Consistency guarantee used for checkpoints.
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Alignment timeout (left disabled):
        // checkpointConfig.setAlignmentTimeout(10000L);

        checkpointConfig.setMaxConcurrentCheckpoints(2);
        checkpointConfig.setMinPauseBetweenCheckpoints(3000L);

        // Restart strategy (left disabled):
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        // Keep the last checkpoint when the job is cancelled.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );

        // State backend location.
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC/ck"));

        // 2. Build the CDC SourceFunction and read from it.
        // The explicit <String> keeps the element type instead of falling back to raw types.
        DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
                .hostname("centos01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                // Optional: .tableList("flink.base_trademark")
                // When omitted, all tables of the databases configured above are read.
                // When given, tables must be written as "db.table".
                .deserializer(new MyDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();

        DataStreamSource<String> streamSource = env.addSource(mySQLSource);

        // 3. Print the records and write them to Kafka.
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtils.getKafkaProducer(sinkTopic));

        // 4. Start the job.
        env.execute("FlinkCDCWithMyDeserialization");
    }
}

(2)自定义反序列化器

package com.yyds.app.function;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;


public class MyDeserialization implements DebeziumDeserializationSchema {



    
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

        JSONObject res = new JSONObject();

        // 获取数据库和表名称
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\.");
        String database = fields[1];
        String tableName = fields[2];



        Struct value = (Struct)sourceRecord.value();
        // 获取before数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if(before != null){
            Schema beforeSchema = before.schema();
            List beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforevalue = before.get(field);
                beforeJson.put(field.name(),beforevalue);
            }
        }


        // 获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if(after != null){
            Schema afterSchema = after.schema();
            List afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(),afterValue);
            }
        }

        //获取操作类型 READ DELETE UPDATE CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if("create".equals(type)){
            type = "insert";
        }

        // 将字段写到json对象中
        res.put("database",database);
        res.put("tableName",tableName);
        res.put("before",beforeJson);
        res.put("after",afterJson);
        res.put("type",type);

        //输出数据
        collector.collect(res.toString());
    }

    @Override
    public TypeInformation getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

(3)工具类

package com.yyds.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Factory helpers that create string-based Flink Kafka producers and consumers
 * against a fixed broker list.
 */
public class MyKafkaUtils {

    /** Comma-separated Kafka broker addresses used by both factories. */
    private static final String BROKERS = "centos01:9092,centos02:9092,centos03:9092";

    /**
     * Creates a producer that writes strings to the given topic.
     *
     * @param topic the target Kafka topic
     * @return a producer using {@link SimpleStringSchema} for serialization
     */
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<>(
                BROKERS,
                topic,
                new SimpleStringSchema()
        );
    }

    /**
     * Creates a consumer that reads strings from the given topic.
     *
     * @param topic   the Kafka topic to read
     * @param groupId the consumer group id
     * @return a consumer using {@link SimpleStringSchema} for deserialization
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);

        return new FlinkKafkaConsumer<>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780710.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号