
Real-Time Data Warehouse (4): Custom sinks for business data flowing from ODS to DWD

(1) Writing dimension data to HBase (via Phoenix)

package com.yyds.app.function;

import com.alibaba.fastjson.JSONObject;
import com.yyds.common.FlinkConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Set;



// Dimension sink: writes each record into its target Phoenix (HBase) table via an upsert
public class DimSinkFunction extends RichSinkFunction<JSONObject> {


    private Connection connection;

    // Initialize the Phoenix JDBC connection
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(FlinkConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(
                FlinkConfig.PHOENIX_SERVER
        );
    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // value = {"sinkTable":"", "database":"","before":{},"after":{},"type":"insert","tableName":""}
        // Phoenix upsert SQL to build:
        // upsert into db.tn (id,name) values('...','...')
        PreparedStatement preparedStatement = null;

        try {
            // 1. Build the upsert SQL from the record
            String upsertSql = getUpsertSql(
                    value.getString("sinkTable"),
                    value.getJSONObject("after")
            );
            System.out.println("phoenix sql ===> " + upsertSql);
            // 2. Prepare the statement
            preparedStatement = connection.prepareStatement(upsertSql);
            // 3. Execute the upsert, then commit (Phoenix does not autocommit by default)
            preparedStatement.executeUpdate();

            connection.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if(preparedStatement != null){
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    // Build: upsert into db.tn (id,name) values('...','...')
    private String getUpsertSql(String sinkTable, JSONObject after) {
        String sql = "upsert into " + FlinkConfig.Hbase_SCHEMA + "." + sinkTable + " (";
        // keySet() and values() iterate the underlying map in the same order,
        // so columns and values stay aligned
        Set<String> keySet = after.keySet();
        Collection<Object> values = after.values();
        sql += StringUtils.join(keySet, ",") + ") values ('" +
                StringUtils.join(values, "','") + "')";
        return sql;
    }
}
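The sink depends on a FlinkConfig constants class that is not shown in this post. A minimal sketch of what it would need to contain is below; the ZooKeeper quorum and schema name are placeholder assumptions, not the author's actual values.

package com.yyds.common;

// Connection constants used by DimSinkFunction.
// The concrete values here are placeholders; adjust them to your own environment.
public class FlinkConfig {

    // Phoenix JDBC driver class
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    // Phoenix JDBC URL (ZooKeeper quorum of the HBase cluster) -- assumed hosts
    public static final String PHOENIX_SERVER = "jdbc:phoenix:centos01,centos02,centos03:2181";

    // Schema that holds the dimension tables -- assumed name
    public static final String Hbase_SCHEMA = "GMALL_REALTIME";
}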

 
(2) Writing business (fact) data to Kafka topics
package com.yyds.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class MyKafkaUtils {


    private static String brokers = "centos01:9092,centos02:9092,centos03:9092";


    private static String defaultTopic = "DWD_DEFAULT_TOPIC";

    

    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema){

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers);

        return new FlinkKafkaProducer<>(
                defaultTopic,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    }




    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId){
        Properties properties = new Properties();

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers);

        return new FlinkKafkaConsumer<>(
                topic,
                new SimpleStringSchema(),
                properties
        );
    }


}
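One caveat with Semantic.EXACTLY_ONCE: Flink's Kafka producer opens transactions, and its default transaction timeout (1 hour) exceeds the broker's default transaction.max.timeout.ms (15 minutes), so the producer fails at startup against an unmodified broker. A common adjustment (not shown in the original code) is to cap the producer-side timeout inside getKafkaProducer(), for example:

        // Keep the producer transaction timeout within the broker's
        // transaction.max.timeout.ms (15 minutes by default)
        properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");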

(3) Main program (wiring the two sinks)
//TODO 8. Write the Kafka-bound stream to Kafka topics and the HBase-bound stream to Phoenix tables
        hbaseDataStream.addSink(new DimSinkFunction());
        kafkaDataStream.addSink(MyKafkaUtils.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject element, @Nullable Long timestamp) {
                // Route each record to the topic named by its "sinkTable" field,
                // with the "after" payload as the value
                return new ProducerRecord<>(
                        element.getString("sinkTable"),
                        element.getString("after").getBytes()
                );
            }
        }));
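For context, kafkaDataStream and hbaseDataStream are the two outputs of the dynamic-split step covered in the previous post (see Other references below). A rough sketch of how the ODS source side of this job might be wired with MyKafkaUtils, using placeholder topic and group names:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read raw business data from the ODS Kafka topic
        // ("ods_base_db" and "dwd_db_app" are placeholder names)
        DataStreamSource<String> odsStream =
                env.addSource(MyKafkaUtils.getKafkaConsumer("ods_base_db", "dwd_db_app"));

        // ... parse to JSONObject, filter, and split into kafkaDataStream (main output)
        //     and hbaseDataStream (side output), as described in the previous post ...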

Other references:
Real-Time Data Warehouse (3): Dynamic splitting of business data from ODS to DWD:
https://blog.csdn.net/qq_44665283/article/details/123724897
