栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flinkcdc基于mysql亲测验证有效

flinkcdc基于mysql亲测验证有效

1.修改mysql配置文件

window下的my.ini

# 验证flink cdc 可以自定义,但必须唯一
server-id = 1
# 可以自定义,binlog文件的前缀名
log_bin=mysql-bin
#必须是row 
binlog_format=ROW
# 必须是full          
# binlog_row_image  = FULL
binlog-do-db=bigdata

2.cmd在mysql  bin目录下重启mysql

net stop mysql
net start mysql

2.mysql数据库表结构

3.flink代码

package com.atguigu.gmall.cdc.app;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.atguigu.gmall.cdc.map.MyMapFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;


@Slf4j
public class FlinkCDC01_DS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        ///TODO 2.开启检查点   Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,
        // 需要从Checkpoint或者Savepoint启动程序
        //2.1 开启Checkpoint,每隔5秒钟做一次CK  ,并指定CK的一致性语义
        // env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //2.2 设置超时时间为1分钟
        // env.getCheckpointConfig().setCheckpointTimeout(60000);
        //2.3 指定从CK自动重启策略
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000L));
        //2.4 设置任务关闭的时候保留最后一次CK数据
        // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.5 设置状态后端
        // env.setStateBackend(new FsStateBackend("hdfs://hadoop202:8020/flinkCDC"));
        // //2.6 设置访问HDFS的用户名
        // System.setProperty("HADOOP_USER_NAME", "atguigu");


        SourceFunction sourceFunction = MySQLSource.builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("bigdata")
                .tableList("bigdata.t_student")
                .username("root")
                .password("123456")
                .serverTimeZone("UTC")
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();


        env
                .addSource(sourceFunction)
                .map(new MyMapFunction())
                .print();

        env.execute();
    }


}

gitee源码: gmall0224-parent: flink实时数仓项目

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389387.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号