使用 Flink CDC 监控 MySQL 的好处在于：项目中无需像 Canal 和 Maxwell 那样先将数据写入 Kafka，而是可以直接将变更数据拉取到实时流中。
Flink - API方式监控
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Captures change events from the MySQL table {@code project_realtime.t_user} with the Flink CDC
 * DataStream API and prints each event, rendered as a string, to stdout.
 *
 * <p>Checkpointing is enabled (exactly-once, every 5 s) with the state backend on
 * {@code hdfs:///flinkCDC}, and the last checkpoint is retained when the job is cancelled.
 */
public class FlinkCDC01_DS {
    public static void main(String[] args) throws Exception {
        // Set the HDFS user before anything touches HDFS (the state backend below); the Hadoop
        // client reads this property when it first initializes.
        System.setProperty("HADOOP_USER_NAME", "zyj");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Take an exactly-once checkpoint every 5 seconds.
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // Abort any checkpoint that has not completed within 1 minute.
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // Job restart strategy on failure: at most 2 restarts, 2 seconds apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2000L));
        // Keep the last externalized checkpoint when the job is cancelled so it can be resumed from.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("hdfs:///flinkCDC"));

        // NOTE(review): credentials are hard-coded; load them from configuration outside of demos.
        // The explicit <String> witness is needed: type inference does not flow through the chain.
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .databaseList("project_realtime") // database(s) to capture; accepts several
                .tableList("project_realtime.t_user") // table(s) to capture, as "database.table"
                .username("root")
                .password("root")
                // initial(): snapshot the captured tables first, then continue from the latest binlog.
                .startupOptions(StartupOptions.initial())
                // Emit each change record as its string representation.
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        env.addSource(sourceFunction).print();

        env.execute();
    }
}
Flink - SQL方式监控
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Reads change events from the MySQL table {@code project_realtime.t_user} through the Flink SQL
 * {@code mysql-cdc} connector and prints the resulting rows to stdout.
 */
public class FlinkCDC02_SQL {
    public static void main(String[] args) throws Exception {
        // 1. Environments: a streaming environment and a table environment on top of it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Register a dynamic table backed by the mysql-cdc connector.
        // NOTE(review): the host here is "hadoop110" while FlinkCDC01_DS uses "hadoop101";
        // confirm which one is intended.
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id INT," +
                " name STRING," +
                " age INT" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop110'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = 'root'," +
                " 'database-name' = 'project_realtime'," +
                " 'table-name' = 't_user'" +
                ")");

        // 3. executeSql() submits the SELECT as its own job, and print() blocks while rows stream in.
        // Do NOT call env.execute() afterwards: no DataStream operators are defined on env, so it
        // would fail with "No operators defined in streaming topology".
        tableEnv.executeSql("select * from user_info").print();
    }
}
pom依赖
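<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.gmall</groupId>
    <artifactId>gmall0224-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>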