栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink实战-(1)Flink-CDC MySQL同步到MySQL(select)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink实战-(1)Flink-CDC MySQL同步到MySQL(select)

背景

基于 select 语句的同步方式适用于全量数据同步的场景,可以结合 Azkaban 或者 DolphinScheduler 做定时调度,实现 T+1 数据同步。

1、maven
   
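<properties>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>

                    <configuration>
                        <transformers>

                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- 主类 -->
                                <mainClass>com.flink.cdc.demo.MysqlCdcMysql</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                        <filters>
                            <filter>
                                <artifact>*:*:*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>6</source>
                <target>6</target>
            </configuration>
        </plugin>
    </plugins>
</build>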
2、MysqlReader
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlReader extends RichSourceFunction> {

    private Connection connection = null;
    private PreparedStatement ps = null;


    //该方法主要用于打开数据库连接,下面的ConfigKeys类是获取配置的类
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
        connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");//获取连接
        ps = connection.prepareStatement("select id,username,password from flink_cdc_test.t_test");
    }


    @Override
    public void run(SourceContext> sourceContext) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Tuple3 tuple = new Tuple3();
            tuple.setFields(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3));
            sourceContext.collect(tuple);
        }
    }

    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3、MysqlWriter
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;



public class MysqlWriter extends RichSinkFunction> {
    private Connection connection = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
            connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");//获取连接
        }

        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
        System.out.println("完成");
    }

    @Override
    public void invoke(Tuple3 value, Context context) throws Exception {
        //获取JdbcReader发送过来的结果
        try {
            ps.setInt(1, value.f0);
            ps.setString(2, value.f1);
            ps.setString(3, value.f2);
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
4、主类MysqlCdcMysql
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Job entry point: wires {@code MysqlReader} (source) to {@code MysqlWriter} (sink),
 * also printing every record, and submits the job.
 */
public class MysqlCdcMysql {
    /**
     * Builds and executes the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment() is used here; StreamExecutionEnvironment.createRemoteEnvironment(
        // host, port, jarPath) is the alternative for submitting to a remote cluster from the IDE.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Author's note: prefer not to hard-code the parallelism in the program. If it is set
        // (e.g. to 8), the installed Flink must be configured to provide at least that much
        // parallelism, otherwise the job fails with resource errors.

        DataStreamSource<Tuple3<Integer, String, String>> dataStream = env.addSource(new MysqlReader());
        dataStream.print();
        dataStream.addSink(new MysqlWriter());
        env.execute("Flink cost MySQL data to write MySQL");
    }
}
5、本地运行

6、打成jar包进行上传

注意:集群上的 Flink 版本要和 maven 里声明的版本一致,Scala 版本也要保持一致。

7、运行

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/825689.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号