栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

4.3.3 Flink-流处理框架-Flink CDC数据实时数据同步-Flink CDC实操-DataStream方式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

4.3.3 Flink-流处理框架-Flink CDC数据实时数据同步-Flink CDC实操-DataStream方式

目录

1.写在前面

2.相关依赖

3.代码实现

4.运行效果


1.写在前面

        Flink CDC有两种实现方式,一种是DataStream方式,一种是FlinkSQL方式。

2.相关依赖

        
            org.apache.flink
            flink-java
            1.12.7
        

        
            org.apache.flink
            flink-streaming-java_2.12
            1.12.7
        

        
            org.apache.flink
            flink-clients_2.12
            1.12.7
        

        
            org.apache.hadoop
            hadoop-client
            2.7.7
        

        
            mysql
            mysql-connector-java
            5.1.49
        

        
            com.alibaba.ververica
            flink-connector-mysql-cdc
            1.2.0
        

        
            com.alibaba
            fastjson
            1.2.75
        

        
            org.apache.flink
            flink-table-planner-blink_2.12
            1.12.7
        
    

    
        
            
                org.apache.maven.plugins
                maven-assembly-plugin
                3.0.0
                
                    
                        jar-with-dependencies
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    8
                    8
                
            
        
    

3.代码实现
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启CK并指定状态后端为FS    memory  fs  rocksdb
        env.setStateBackend(new FsStateBackend("hdfs://192.168.0.111:9000/gmall-flink-cdc/ck"));
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart());

        //2.通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction sourceFunction = MySQLSource.builder()
                .hostname("192.168.0.111")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall2021")
                .tableList("gmall2021.user_info")   //如果不添加该参数,则消费指定数据库中所有表的数据.如果指定,指定方式为db.table
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource streamSource = env.addSource(sourceFunction);
        //3.打印数据
        streamSource.print();
        //4.启动任务
        env.execute("FlinkCDC");
    }
}

4.运行效果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/770946.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号