flink-connector-mysql-cdc

Official reference: MySQL CDC Connector (Flink CDC documentation), https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

1. Test steps:

1) Enable the MySQL binlog, then restart MySQL
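Step 1 does not say which binlog settings matter. The MySQL CDC connector reads row-level changes, so the server is generally expected to report log_bin = ON, binlog_format = ROW and binlog_row_image = FULL. The sketch below checks a set of variables against those three values. The class and method names are hypothetical, and the map is assumed to hold what `SHOW VARIABLES` returns on the MySQL server; it is a plain-Java illustration, not part of the connector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BinlogSettingsCheck {

    /**
     * Returns one message per setting that does not have the expected value.
     * The map is assumed to hold variable names and values as reported by
     * SHOW VARIABLES; an empty result means the three settings look usable.
     */
    public static List<String> problems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        expect(vars, "log_bin", "ON", problems);
        expect(vars, "binlog_format", "ROW", problems);
        expect(vars, "binlog_row_image", "FULL", problems);
        return problems;
    }

    // Compares one variable with its expected value, ignoring case.
    private static void expect(Map<String, String> vars, String name,
                               String wanted, List<String> problems) {
        String actual = vars.get(name);
        if (actual == null || !actual.equalsIgnoreCase(wanted)) {
            problems.add(name + " should be " + wanted + " but is " + actual);
        }
    }

    public static void main(String[] args) {
        // Hypothetical values; in practice they would be read from the server.
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "MIXED");
        vars.put("binlog_row_image", "FULL");
        problems(vars).forEach(System.out::println);
    }
}
```

With the sample values above, the only reported problem is the binlog format, which would have to be changed to ROW before restarting MySQL.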

2) Start the HDFS cluster

[hadoop@linux100 flink-1.13.5]$ start-dfs.sh

3) Start the Flink cluster

[hadoop@linux100 flink-1.13.5]$ ./bin/start-cluster.sh

4) Package the Flink program as a jar and upload it to the server

5) Start the program

[hadoop@linux100 flink-1.13.5]$ ./bin/flink run -c com.proj.other.FlinkCDCSql20220119 ./../jars/flink-v1_13_5-1.0-SNAPSHOT-jar-with-dependencies.jar

6) In MySQL, insert, update, or delete rows in the target tables to test the job
Web UI of the HDFS NameNode: http://linux100:9870/
Web UI of the Flink job: http://linux100:8081/

7) Create a savepoint for the running Flink program

[hadoop@linux100 flink-1.13.5]$ ./bin/flink savepoint 54e9288c149466b0915e8b3d8f067204 hdfs://linux100:8020/flink/save
# e.g.: ./bin/flink savepoint <JobId> hdfs://hadoop102:8020/flink/save

8) Stop the program, then restart it from the savepoint

[hadoop@linux100 flink-1.13.5]$ ./bin/flink run -s hdfs://linux100:8020/flink/save/savepoint-54e928-86a029a5383a -c com.proj.other.FlinkCDCSql20220119 ./../jars/flink-v1_13_5-1.0-SNAPSHOT-jar-with-dependencies.jar
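Steps 7 and 8 use two command shapes: `flink savepoint <jobId> <targetDirectory>` to trigger, and `flink run -s <savepointPath> -c <mainClass> <jar>` to resume. The following sketch assembles both argument lists in plain Java. The class and method names are hypothetical. The `savepointPrefix` helper encodes an observation from the commands above, where job `54e9288c...` produced a directory named `savepoint-54e928-...`; treating the first six characters of the job ID as the prefix is an assumption drawn from that example.

```java
import java.util.Arrays;
import java.util.List;

public class SavepointCommands {

    /** Arguments for triggering a savepoint: flink savepoint <jobId> <targetDirectory>. */
    public static List<String> trigger(String jobId, String targetDir) {
        return Arrays.asList("./bin/flink", "savepoint", jobId, targetDir);
    }

    /** Arguments for resuming: flink run -s <savepointPath> -c <mainClass> <jarPath>. */
    public static List<String> resume(String savepointPath, String mainClass, String jarPath) {
        return Arrays.asList("./bin/flink", "run", "-s", savepointPath, "-c", mainClass, jarPath);
    }

    /** Directory-name prefix assumed for a savepoint of the given job (see lead-in). */
    public static String savepointPrefix(String jobId) {
        return "savepoint-" + jobId.substring(0, 6) + "-";
    }

    public static void main(String[] args) {
        String jobId = "54e9288c149466b0915e8b3d8f067204";
        // Print the trigger command as one shell line.
        System.out.println(String.join(" ", trigger(jobId, "hdfs://linux100:8020/flink/save")));
        // Print the prefix to look for under the target directory before resuming.
        System.out.println(savepointPrefix(jobId));
    }
}
```

The prefix is useful when looking up the exact savepoint directory under `hdfs://linux100:8020/flink/save` before passing it to `-s`.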
2. Maven dependencies


    
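<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>flink-soaring</artifactId>
        <groupId>com.proj</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>flink-v1_13_5</artifactId>

    <properties>
        <!-- The property names were lost when the page was extracted; the two names below are assumed. -->
        <flink.version>1.13.5</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!-- A third property had the value 2.2.0; its name cannot be recovered. -->
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.13.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.5</version>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.13.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.13.5</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.13.5</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>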
3. Flink SQL source program
package com.proj.other;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCDCSql20220119 {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        env.enableCheckpointing(3000);

//        String path = "file:///Idea_Projects/workspace/flink-soaring/flink-v1_13_5/cp";
        String path = "hdfs://linux100:8020/ck/cp";
        env.getCheckpointConfig().setCheckpointStorage(path);

        // Minimum pause between two checkpoints, in milliseconds (default: 0)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // Number of checkpoint failures tolerated before the whole job fails; the default of 0 tolerates none
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);

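        // Checkpoints exist for failure recovery. By default a job's checkpoints are removed when the job is cancelled,
        // so an externalized-checkpoint policy decides what happens on cancellation:
        // DELETE_ON_CANCELLATION: the checkpoint state is deleted when the job is cancelled, so the job cannot be restored from a checkpoint afterwards
        // RETAIN_ON_CANCELLATION (the more common choice): the checkpoint state is kept when the job is cancelled manually and has to be cleaned up by hand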
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Flink supports two checkpointing modes, exactly-once and at-least-once.
        // EXACTLY_ONCE is the default; it is set explicitly here.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Checkpoint timeout in milliseconds: a checkpoint that has not completed in time is abandoned (default: 10 minutes)
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // Maximum number of checkpoints that may run at the same time; the default of 1 avoids taking resources away from normal data processing
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Restart strategy: after a failure the job restarts automatically, at most 3 times, with 10 s between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

        // Recover from the latest checkpoint even when a more recent savepoint exists
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        String productsSourceDDL = "CREATE TABLE products (\n" +
                "    id INT,\n" +
                "    name STRING,\n" +
                "    description STRING,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = '192.168.10.100',\n" +
                "    'port' = '3306',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456',\n" +
                "    'database-name' = 'mydb',\n" +
                "    'table-name' = 'products'\n" +
                ")";

        String ordersSourceDDL = "CREATE TABLE orders (\n" +
                "   order_id INT,\n" +
                "   order_date TIMESTAMP(0),\n" +
                "   customer_name STRING,\n" +
                "   price DECIMAL(10, 5),\n" +
                "   product_id INT,\n" +
                "   order_status BOOLEAN,\n" +
                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'mysql-cdc',\n" +
                "   'hostname' = '192.168.10.100',\n" +
                "   'port' = '3306',\n" +
                "   'username' = 'root',\n" +
                "   'password' = '123456',\n" +
                "   'database-name' = 'mydb',\n" +
                "   'table-name' = 'orders'\n" +
                ")";

        String enriched_ordersSinkDDL = "CREATE TABLE enriched_orders (\n" +
                "   order_id INT,\n" +
                "   order_date TIMESTAMP(0),\n" +
                "   customer_name STRING,\n" +
                "   price DECIMAL(10, 5),\n" +
                "   product_id INT,\n" +
                "   order_status BOOLEAN,\n" +
                "   product_name STRING,\n" +
                "   product_description STRING,\n" +
                "   PRIMARY KEY (order_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://192.168.10.100:3306/mydb',\n" +
                "   'table-name' = 'enriched_orders',\n" +
                "   'password' = '123456',\n" +
                "   'username' = 'root'\n" +
                ")";

        String transformSql = "INSERT INTO enriched_orders\n" +
                "SELECT o.*,\n" +
                "       p.name,\n" +
                "       p.description\n" +
                "FROM orders AS o\n" +
                "LEFT JOIN products AS p ON o.product_id = p.id";

        tableEnv.executeSql(productsSourceDDL);
        tableEnv.executeSql(ordersSourceDDL);
        tableEnv.executeSql(enriched_ordersSinkDDL);

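        // executeSql() submits the INSERT job on its own. No DataStream operators are defined on env,
        // so a following env.execute("sync-flink-cdc") would fail with
        // "No operators defined in streaming topology"; the call is therefore omitted.
        tableEnv.executeSql(transformSql).print();
        System.out.println("=============================================================================");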
    }
}
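
The two mysql-cdc source tables in the program above repeat the same connection options and differ only in columns and table name. As a possible way to cut that repetition, the sketch below renders a `CREATE TABLE ... WITH (...)` statement from a column block and an option map. It is plain Java with no Flink dependency; `DdlSketch`, `createTable`, and `mysqlCdcOptions` are hypothetical names, and the option values are copied from the program above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class DdlSketch {

    /** Renders a CREATE TABLE statement from a column block and connector options. */
    public static String createTable(String table, String columns, Map<String, String> options) {
        // Each option becomes one line of the form 'key' = 'value', separated by commas.
        StringJoiner with = new StringJoiner(",\n", ") WITH (\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add("    '" + e.getKey() + "' = '" + e.getValue() + "'");
        }
        return "CREATE TABLE " + table + " (\n" + columns + "\n" + with;
    }

    /** Connection options shared by the mysql-cdc source tables; only the table name varies. */
    public static Map<String, String> mysqlCdcOptions(String tableName) {
        Map<String, String> o = new LinkedHashMap<>(); // keeps insertion order in the output
        o.put("connector", "mysql-cdc");
        o.put("hostname", "192.168.10.100");
        o.put("port", "3306");
        o.put("username", "root");
        o.put("password", "123456");
        o.put("database-name", "mydb");
        o.put("table-name", tableName);
        return o;
    }

    public static void main(String[] args) {
        String columns = "    id INT,\n"
                + "    name STRING,\n"
                + "    description STRING,\n"
                + "    PRIMARY KEY (id) NOT ENFORCED";
        System.out.println(createTable("products", columns, mysqlCdcOptions("products")));
    }
}
```

The resulting string could be passed to `tableEnv.executeSql(...)` in place of `productsSourceDDL`; the `orders` table would reuse `mysqlCdcOptions("orders")` with its own column block.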
