栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink-CDC 动态监控 mysql 数据表

Flink-CDC 动态监控 mysql 数据表

使用 Flink-CDC 监控 MySQL 的好处在于:项目中无需像 Canal 和 Maxwell 那样先将变更数据写入 Kafka,而是可以直接将数据拉取到实时流当中。
Flink - API方式监控
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Flink CDC example using the DataStream API: reads change events of one MySQL table
 * through the {@code mysql-cdc} source and prints each event (as a String) to stdout.
 *
 * <p>Checkpointing is enabled with a filesystem state backend located at
 * {@code hdfs:///flinkCDC}, and externalized checkpoints are retained when the job is cancelled.
 */
public class FlinkCDC01_DS {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // User name for HDFS access; set first so it is in place before anything touches HDFS.
        System.setProperty("HADOOP_USER_NAME", "zyj");

        // Create the Flink streaming environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run everything with parallelism 1.
        env.setParallelism(1);

        // Trigger a checkpoint every 5 seconds in exactly-once mode.
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // A checkpoint that takes longer than 1 minute is timed out.
        checkpointConfig.setCheckpointTimeout(60000L);
        // Keep the last checkpoint data when the job is cancelled.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy: at most 2 restart attempts, 2 seconds apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000L));
        // Store state/checkpoints on HDFS.
        env.setStateBackend(new FsStateBackend("hdfs:///flinkCDC"));

        // The explicit <String> type argument is required: without it the builder's type
        // parameter is inferred as Object and the String deserializer below does not type-check.
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101") // MySQL host name
                .port(3306) // MySQL port
                .databaseList("project_realtime") // database(s) to monitor; several may be given
                .tableList("project_realtime.t_user") // table(s) to monitor, in "database.table" form
                .username("root") // MySQL user name
                .password("root") // MySQL password
                // Per the connector documentation, initial() snapshots the monitored tables on
                // first startup and then continues reading from the binlog.
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema()) // emit each change event as a String
                .build();

        env
                .addSource(sourceFunction)
                .print();

        env.execute();
    }
}
Flink - SQL方式监控
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


/**
 * Flink CDC example using Flink SQL: declares a dynamic table backed by the
 * {@code mysql-cdc} connector and prints the result of {@code select * from user_info}.
 */
public class FlinkCDC02_SQL {
    /**
     * Builds the table environment, registers the CDC table and runs the query.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the environment cannot be created or the query fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environments.
        // 1.1 Stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Table environment on top of the stream environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 1.3 Run with parallelism 1.
        env.setParallelism(1);

        // 2. Create the dynamic table backed by the MySQL CDC connector.
        // NOTE(review): the DataStream example in this article uses host 'hadoop101',
        // while this one uses 'hadoop110' — confirm which host name is intended.
        tableEnv.executeSql("CREATE TABLE user_info (" +
                "  id INT," +
                "  name STRING," +
                "  age INT" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'hadoop110'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'root'," +
                "  'database-name' = 'project_realtime'," +
                "  'table-name' = 't_user'" +
                ")");

        // 3. Query the table and print the changelog.
        // executeSql() submits the job by itself, so no env.execute() call follows: no
        // DataStream operators are defined on env, and calling execute() would fail with
        // "No operators defined in streaming topology".
        tableEnv.executeSql("select * from user_info").print();
    }
}
pom依赖


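<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.gmall</groupId>
    <artifactId>gmall0224-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>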
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698744.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号