FlinkCDC — first, a brief look at the concept of Change Data Capture (CDC):
The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes completely in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.
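That core idea can be sketched in a few lines of plain Java: replaying captured change events strictly in the order they occurred reconstructs the table's current state. The event model below (op codes "c"/"u"/"d", an id, and the new row image) is a simplified stand-in for a real change record, not any library's API:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CdcReplay {
    // Minimal change-event model; op follows the common convention:
    // "c" = create, "u" = update, "d" = delete. "after" is the new row image.
    record ChangeEvent(String op, long id, String after) {}

    // Applying events strictly in capture order reconstructs the current state
    static Map<Long, String> replay(List<ChangeEvent> log) {
        Map<Long, String> table = new LinkedHashMap<>();
        for (ChangeEvent e : log) {
            if (e.op().equals("d")) table.remove(e.id());
            else table.put(e.id(), e.after());
        }
        return table;
    }

    public static void main(String[] args) {
        List<ChangeEvent> log = List.of(
                new ChangeEvent("c", 1, "Apple"),
                new ChangeEvent("c", 2, "Banana"),
                new ChangeEvent("u", 1, "Apricot"),   // update row 1
                new ChangeEvent("d", 2, null));       // delete row 2
        System.out.println(replay(log)); // {1=Apricot}
    }
}
```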
CDC comes in two main flavors: query-based and binlog-based. Query-based CDC polls the source tables periodically with SQL, which puts extra load on the database, cannot reliably capture deletes, and misses any intermediate changes between two polls. Binlog-based CDC reads the database's change log instead, capturing every insert, update, and delete with low latency and no extra query load.
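The key weakness of the query-based approach can be shown with a tiny simulation (plain Java, no database): a row's value changes several times, but each poll only observes the value at poll time, so intermediate changes are lost. Binlog-based CDC would see every change:

```java
import java.util.ArrayList;
import java.util.List;

public class QueryBasedCdc {
    // Simulates query-based CDC: rowHistory is the row's value over time,
    // pollIndexes are the points in time at which the poller runs a query.
    // Only the values at poll time are observed.
    static List<String> poll(List<String> rowHistory, int... pollIndexes) {
        List<String> observed = new ArrayList<>();
        for (int i : pollIndexes) observed.add(rowHistory.get(i));
        return observed;
    }

    public static void main(String[] args) {
        // Row value over time: A -> B -> C
        List<String> history = List.of("A", "B", "C");
        // Polls happen at time 0 and time 2; the intermediate update to "B" is missed
        System.out.println(poll(history, 0, 2)); // [A, C]
    }
}
```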
Flink, not to be outdone, answered with Flink CDC: the flink-cdc-connectors component provides a source connector that reads both the full snapshot and the incremental changes directly from databases such as MySQL.
Repository: https://github.com/ververica/flink-cdc-connectors
Versions
Create a Maven project and add the required dependencies to the pom file:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
Open IDEA and write the following class:
```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Enable checkpointing, one checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Set the checkpoint timeout (10 s here)
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://centos01:8020/flinkCDC"));

        // 2. Build a SourceFunction via CDC and read the data
        DebeziumSourceFunction<String> mySQLSource = MySQLSource.<String>builder()
                .hostname("centos01")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flink")
                // Optional: without this, all tables in the databases above are read.
                // Note: tables must be given in "db.table" form.
                .tableList("flink.base_trademark")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(mySQLSource);

        // 3. Print the data
        streamSource.print();

        // 4. Start the job
        env.execute("FlinkCDC");
    }
}
```
```shell
# Start Flink in standalone mode (web UI on port 8081)
[root@centos01 flink-1.13.1]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host centos01.
Starting taskexecutor daemon on host centos02.
Starting taskexecutor daemon on host centos03.

# Submit the jar
[root@centos01 flink-1.13.1]# bin/flink run -m centos01:8081 -c com.yyds.FlinkCDC ./flink-cdc-1.0.jar

# Trigger a savepoint
[root@centos01 flink-1.13.1]# bin/flink savepoint 88acb63b3d39ab4b6749e7378259676c hdfs://centos01:8020/flinkCDC/savepoint
Triggering savepoint for job 88acb63b3d39ab4b6749e7378259676c.
Waiting for response...
Savepoint completed. Path: hdfs://centos01:8020/flinkCDC/savepoint/savepoint-88acb6-97b82909494b
You can resume your program from this savepoint with the run command.

# Restart from the savepoint to resume where the job left off
[root@centos01 flink-1.13.1]# bin/flink run -m centos01:8081 -s hdfs://centos01:8020/flinkCDC/savepoint/savepoint-88acb6-97b82909494b -c com.yyds.FlinkCDC ./flink-cdc-1.0.jar
```
2.2 The FlinkSQL approach
Again, first add the dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
```
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCWithSql {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the table with DDL
        tableEnv.executeSql("CREATE TABLE binlog (" +
                " id BIGINT NOT NULL," +
                " tm_name STRING," +
                " logo_url STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'centos01'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'flink'," +
                " 'table-name' = 'base_trademark'" +
                ")");

        // 3. Query the data
        Table table = tableEnv.sqlQuery("select * from binlog");

        // 4. Convert the dynamic table to a retract stream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();
        // tableEnv.executeSql("select * from binlog").print();

        // 5. Start the job
        env.execute("FlinkCDCSQL");
    }
}
```
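toRetractStream encodes updates as pairs: a Boolean flag (true = add the row, false = retract a previously emitted row) plus the row itself. A minimal sketch of how a consumer can fold such a stream back into current state — plain Java with no Flink dependency, the "row" reduced to a string:

```java
import java.util.ArrayList;
import java.util.List;

public class RetractFold {
    // One retract-stream message: add == true emits a row, add == false
    // retracts a previously emitted row (mirrors Tuple2<Boolean, Row>).
    record Msg(boolean add, String row) {}

    static List<String> fold(List<Msg> stream) {
        List<String> state = new ArrayList<>();
        for (Msg m : stream) {
            if (m.add()) state.add(m.row());
            else state.remove(m.row()); // retract the old version of the row
        }
        return state;
    }

    public static void main(String[] args) {
        // An update from (1, Apple) to (1, Apricot) arrives as a retraction
        // of the old row followed by an addition of the new row
        List<Msg> stream = List.of(
                new Msg(true, "1,Apple"),
                new Msg(false, "1,Apple"),
                new Msg(true, "1,Apricot"));
        System.out.println(fold(stream)); // [1,Apricot]
    }
}
```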
II. Flink connectors
As a compute engine, Flink has no storage of its own, so where data comes from and where it goes is the job of connectors: they link Flink to various databases and systems for extraction, computation, and storage. Let's look at which connectors Flink offers and how to use them.
Introduction
Here are the currently supported connectors, as listed in the official documentation. Whether a given connector can act as a source or only as a sink, and whether it supports unbounded (streaming) reads, varies per connector — see the connector matrix in the official docs.
The databases most commonly used today roughly break down as follows:
```
# Support JDBC
mysql, mongodb, postgresql, oracle, db2, sybase, sqlserver, hive
# Do not support JDBC
hbase, es, files, message queues (kafka, rabbitmq, rocketmq)
```
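For the JDBC-capable systems above, Flink ships a generic `jdbc` connector. A sketch of a MySQL sink table as an illustration — the table name, columns, and credentials here are placeholders, and the `flink-connector-jdbc` dependency plus the MySQL driver are assumed to be on the classpath:

```sql
CREATE TABLE jdbc_sink (
  id BIGINT,
  tm_name STRING,
  PRIMARY KEY (id) NOT ENFORCED  -- with a primary key, the sink runs in upsert mode
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://centos01:3306/flink',
  'table-name' = 'base_trademark_copy',
  'username' = 'root',
  'password' = '123456'
);
```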
Usage
Kafka

```sql
CREATE TABLE MyUserTable (
  -- declare the schema of the table
  `user` BIGINT,
  `message` STRING
) WITH (
  -- declare the external system to connect to
  'connector' = 'kafka',
  'topic' = 'topic_name',
  'scan.startup.mode' = 'earliest-offset',  -- or latest-offset to consume from the most recent offset
  'properties.bootstrap.servers' = 'localhost:9092',  -- Kafka broker connection string
  'format' = 'json'  -- declare a format for this system
)
```
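With `'format' = 'json'`, each Kafka record's value is expected to be a JSON object whose fields match the declared columns. A minimal sketch of building such a payload by hand — plain string formatting only; a real producer would use a JSON library and the Kafka client, both omitted here:

```java
public class KafkaJsonPayload {
    // Builds the JSON value for one row of MyUserTable (user BIGINT, message STRING).
    // Quotes inside the message are escaped so the output stays valid JSON.
    static String toJson(long user, String message) {
        return String.format("{\"user\": %d, \"message\": \"%s\"}",
                user, message.replace("\"", "\\\""));
    }

    public static void main(String[] args) {
        // This string is what would be written as the Kafka record value
        System.out.println(toJson(42, "hello"));
        // {"user": 42, "message": "hello"}
    }
}
```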
For full details, see the official Flink website.