栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

实时同步mysql数据到doris,Mysql+kafka+flink+doris

实时同步mysql数据到doris,Mysql+kafka+flink+doris

1、使用背景

最近组里想要在现有的环境下搭一套实时数仓,综合分析之后doris将会作为实时数仓落地的数据库,数据源有消息数据,还有业务库的数据。

2、数据源接入

消息数据都好说,无论是pulsar,还是kafka,flink官方都已经提供了现成的source接口,照着官方文档去配置一下就ok,但是由于dba这个神秘组织的存在,他们担心会开启bin-log会增大他们的数据库压力,无法给我们开通访问bin-log的权限,作为大头兵的我只能默默接受,因此无法使用flink-cdc的方式去监控pg数据库的bin-log获取变动数据,只能自己写代码,通过查询数据的operation_time获取最新的数据,自己生成kafka消息。
相关代码如下:

public class MysqlToKafka {

    public static void main(String[] args) throws Exception {
        StudentInfo studentInfo = null;
        Properties pro = new Properties();
        pro.put("bootstrap.servers", "hadoop102:9092");
        pro.put("acks", "all");
        pro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pro.put("retries", 3);

        KafkaProducer kafkaProducer = new KafkaProducer<>(pro);
        MysqlJDBC mysqlJDBC = new MysqlJDBC();
        ResultSet resultSet = null;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String sql = null;
        mysqlJDBC.init();

        while(true){
            String timeStamp = sdf.format(new Date());
            sql = "select " +
                    "stu_id," +
                    "stu_name," +
                    "course," +
                    "stu_score," +
                    "stu_operation_time " +
                    "from student_info " +
                    "where timestampdiff(minute,stu_operation_time,""+timeStamp+"") <= ""+2+"";";
            resultSet = mysqlJDBC.select(sql);

            while(resultSet.next()){
                studentInfo = new StudentInfo(
                        resultSet.getString("stu_id"),
                        resultSet.getString("stu_name"),
                        resultSet.getString("course"),
                        resultSet.getString("stu_score"),
                        resultSet.getString("stu_operation_time"),
                        ""
                );
                System.out.println(studentInfo);
                kafkaProducer.send(new ProducerRecord<>("MysqlToKafka","mysql", JSON.toJSONString(studentInfo)));
            }
            Thread.sleep(60*1000);
        }
    }
}
3、数据处理和写入

数据处理
上面自己接入数据的逻辑是每过一分钟就去表里查询最近两分钟变化的数据,所以一定会出现重复的数据。
下面需要对数据进行去重处理,去重操作有两个地方可以进行
第一个:通过flink中的状态变量进行精准去重
第二个:在设计doris表的时候设计好字段数据的修改方式为replace,具体的建表语句可以参考doris官方文档

数据写入
往doris写入数据有很多种方式,可以参考doris官方文档,我们开始考虑通过jdbc的方式将数写入到doris中,但是insert into 方式并不适合大量数据的长时间插入,所有只能采用stream load或者使用doris扩展出来的flink-connector-doris,由于这个不是flink官方提供的sink组件所以在maven中央仓库并不能找到相关依赖,按照doris官方介绍可以通过自己编译一个doris的sink,偶然间在这里发现可以通过这种方式添加依赖,然后去调用DorisSink方法去实现,按着他的介绍底层也是Stream load的方式实现的,最后竟然调试成功了。

好奇这个DorisDB企业版文档和官方文档是什么关系??有知道的可以说一下
按照这里的方式添加依赖就可以


    
        dorisdb-maven-releases
        http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-releases/
    
    
        dorisdb-maven-snapshots
        http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-snapshots/
    


    com.dorisdb.connector
    flink-connector-doris
    1.0.32-SNAPSHOT  
    1.0.32_1.13-SNAPSHOT  

不知道这一步什么意思,所以没有操作这一步,好像并不影响

将
com.dorisdb.table.connector.flink.DorisDynamicTableSinkFactory
加入到:
src/main/resources/meta-INF/services/org.apache.flink.table.factories.Factory

相关代码如下:

public class SinkConnectorToDoris {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(3000);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "test");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("max.poll.records", "10000");

        DataStreamSource dataStream = env.addSource(new FlinkKafkaConsumer<>("MysqlToKafka", new SimpleStringSchema(), properties));
        dataStream
        		//.map(t->JSON.parseObject(t))(多此一举)
                .keyBy(t->t)
                //RichFlatMapFunction对消息进行去重
                .flatMap(new RichFlatMapFunction() {
                    private transient ValueState isExist;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor vsd = new ValueStateDescriptor<>("isExist", Boolean.class);
                        isExist = getRuntimeContext().getState(vsd);
                    }

                    @Override
                    public void flatMap(String s, Collector collector) throws Exception {
                        if(isExist.value() == null){
                            collector.collect(s);
                            isExist.update(true);
                        }
                    }
                })
                .addSink(
                        DorisSink.sink(
                            DorisSinkOptions.builder()
                                    .withProperty("jdbc-url", "jdbc:mysql://172.17.60.10:19030/doris_qa")
                                    .withProperty("load-url", "172.17.60.10:18030")
                                    .withProperty("username", "root")
                                    .withProperty("password", "")
                                    .withProperty("table-name", "student_info_gxd_test")
                                    .withProperty("database-name", "doris_qa")
                                    .withProperty("sink.properties.format", "json")
                                    .withProperty("sink.properties.strip_outer_array", "true")
                                    .withProperty("sink.buffer-flush.interval-ms","1000")
                            .build()
                    )
                ).setParallelism(1);
            env.execute();
    }
}

注意点:
addSink传入的数据流应该是String泛型的数据流,一开始将数据流进行了.map(t->JSON.parseObject(t)),怎么也插不进去数据,希望大家不要犯同样的错误。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584627.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号