目录
1.写在前面
2.相关依赖
3.代码实现
4.运行效果
1.写在前面
Flink CDC有两种实现方式:一种是基于DataStream API的方式,另一种是基于Flink SQL的方式。本文演示DataStream方式。
2.相关依赖
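<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.7</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>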
3.代码实现
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Minimal Flink CDC example using the DataStream API: reads change records for the
 * MySQL table {@code gmall2021.user_info} and prints each record as a string.
 */
public class FlinkCDC {

    /**
     * Configures checkpointing, builds the MySQL CDC source, and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to start or terminates with an error
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment; a single parallel instance is enough for this demo.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 Enable checkpointing and store state on the filesystem backend
        //     (Flink's built-in choices are memory, fs and rocksdb).
        env.setStateBackend(new FsStateBackend("hdfs://192.168.0.111:9000/gmall-flink-cdc/ck"));
        env.enableCheckpointing(5000L); // checkpoint interval in ms
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L); // ms
        // NOTE(review): Flink's checkpointing docs state that max-concurrent-checkpoints cannot be
        // combined with a minimum pause between checkpoints - confirm which of the two is intended.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000); // ms
        // A restart strategy can be configured here via env.setRestartStrategy(...), e.g. with
        // RestartStrategies.fixedDelayRestart(attempts, delayMs); it is left at the default.

        // 2. Build the CDC source. The explicit <String> type witness keeps the builder chain,
        //    the source function and the resulting stream fully typed instead of raw.
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.0.111")
                .port(3306)
                .username("root")
                .password("123456") // demo credentials only; do not hard-code secrets in real jobs
                .databaseList("gmall2021")
                // If tableList is omitted, every table of the listed databases is captured;
                // when it is given, each entry must use the db.table form.
                .tableList("gmall2021.user_info")
                .deserializer(new StringDebeziumDeserializationSchema())
                // initial(): per the Flink CDC docs, snapshot the tables first, then follow the binlog.
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print every change record.
        streamSource.print();

        // 4. Submit the job.
        env.execute("FlinkCDC");
    }
}
4.运行效果
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Minimal Flink CDC example using the DataStream API: reads change records for the
 * MySQL table {@code gmall2021.user_info} and prints each record as a string.
 */
public class FlinkCDC {

    /**
     * Configures checkpointing, builds the MySQL CDC source, and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to start or terminates with an error
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment; a single parallel instance is enough for this demo.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 Enable checkpointing and store state on the filesystem backend
        //     (Flink's built-in choices are memory, fs and rocksdb).
        env.setStateBackend(new FsStateBackend("hdfs://192.168.0.111:9000/gmall-flink-cdc/ck"));
        env.enableCheckpointing(5000L); // checkpoint interval in ms
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L); // ms
        // NOTE(review): Flink's checkpointing docs state that max-concurrent-checkpoints cannot be
        // combined with a minimum pause between checkpoints - confirm which of the two is intended.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000); // ms
        // A restart strategy can be configured here via env.setRestartStrategy(...), e.g. with
        // RestartStrategies.fixedDelayRestart(attempts, delayMs); it is left at the default.

        // 2. Build the CDC source. The explicit <String> type witness keeps the builder chain,
        //    the source function and the resulting stream fully typed instead of raw.
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.0.111")
                .port(3306)
                .username("root")
                .password("123456") // demo credentials only; do not hard-code secrets in real jobs
                .databaseList("gmall2021")
                // If tableList is omitted, every table of the listed databases is captured;
                // when it is given, each entry must use the db.table form.
                .tableList("gmall2021.user_info")
                .deserializer(new StringDebeziumDeserializationSchema())
                // initial(): per the Flink CDC docs, snapshot the tables first, then follow the binlog.
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print every change record.
        streamSource.print();

        // 4. Submit the job.
        env.execute("FlinkCDC");
    }
}



