目录
1.写在前面
2.Maven依赖
3.代码实现
3.1 自定义反序列化器 CustomerDeserialization
3.2 主函数 FlinkCDCWithCustomerDeserialization
4.集群测试
4.1 环境准备
4.2 查看任务结果
1.写在前面
Flink CDC有两种实现方式,一种是DataStream,另一种是FlinkSQL方式。
DataStream方式:优点是可以应用于多库多表,缺点是需要自定义反序列化器(但更灵活)。
FlinkSQL方式:优点是不需要自定义反序列化器,缺点是只能应用于单表查询。
2.Maven依赖
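<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.7</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>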
3.代码实现
3.1 自定义反序列化器 CustomerDeserialization
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema {
@Override
public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
//1.创建JSON对象用于存储最终数据
JSonObject result = new JSonObject();
//2.获取库名&表名
String topic = sourceRecord.topic();
String[] fields = topic.split("\.");
String database = fields[1];
String tableName = fields[2];
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSonObject beforeJson = new JSonObject();
if (before != null) {
Schema beforeSchema = before.schema();
List beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforevalue = before.get(field);
beforeJson.put(field.name(), beforevalue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSonObject afterJson = new JSonObject();
if (after != null) {
Schema afterSchema = after.schema();
List afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5.获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//6.将字段写入JSON对象
result.put("database", database);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
//7.输出数据
collector.collect(result.toJSonString());
}
@Override
public TypeInformation getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
3.2 主函数 FlinkCDCWithCustomerDeserialization
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Entry point of a Flink job that reads MySQL change data through Flink CDC,
 * converts it with {@link CustomerDeserialization} and prints each record.
 */
public class FlinkCDCWithCustomerDeserialization {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the CDC source function and read the data.
        // NOTE(review): host and credentials are hard-coded; externalize them before real use.
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.0.111")
                .port(3306)
                .username("root")
                .password("Gmt123456@")
                .databaseList("gmall2021")
                // If tableList is omitted, all tables of the listed databases are consumed;
                // when given, each entry uses the form db.table.
                .tableList("gmall2021.base_trademark")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the records.
        streamSource.print();

        // 4. Start the job.
        env.execute("FlinkCDCWithCustomerDeserialization");
    }
}
4.集群测试
4.1 环境准备
启动ha-hadoop集群:sh ha-hadoop.sh start
创建作业归档目录(只需要创建一次):hdfs dfs -mkdir /flink-jobhistory
启动Flink集群和任务历史服务
- start-cluster.sh
- historyserver.sh start
运行该Flink任务:/opt/software/flink-1.12.7/bin/flink run -m yarn-cluster -ys 1 -ynm gmall-flink-cdc -c com.ucas.FlinkCDCWithCustomerDeserialization -d /root/mySoftware/gmall-flink-cdc.jar
4.2 查看任务结果
3.1 自定义反序列化器 CustomerDeserialization
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema {
@Override
public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
//1.创建JSON对象用于存储最终数据
JSonObject result = new JSonObject();
//2.获取库名&表名
String topic = sourceRecord.topic();
String[] fields = topic.split("\.");
String database = fields[1];
String tableName = fields[2];
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSonObject beforeJson = new JSonObject();
if (before != null) {
Schema beforeSchema = before.schema();
List beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforevalue = before.get(field);
beforeJson.put(field.name(), beforevalue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSonObject afterJson = new JSonObject();
if (after != null) {
Schema afterSchema = after.schema();
List afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5.获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//6.将字段写入JSON对象
result.put("database", database);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
//7.输出数据
collector.collect(result.toJSonString());
}
@Override
public TypeInformation getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
3.2 主函数 FlinkCDCWithCustomerDeserialization
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Entry point of a Flink job that reads MySQL change data through Flink CDC,
 * converts it with {@link CustomerDeserialization} and prints each record.
 */
public class FlinkCDCWithCustomerDeserialization {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the CDC source function and read the data.
        // NOTE(review): host and credentials are hard-coded; externalize them before real use.
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.0.111")
                .port(3306)
                .username("root")
                .password("Gmt123456@")
                .databaseList("gmall2021")
                // If tableList is omitted, all tables of the listed databases are consumed;
                // when given, each entry uses the form db.table.
                .tableList("gmall2021.base_trademark")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the records.
        streamSource.print();

        // 4. Start the job.
        env.execute("FlinkCDCWithCustomerDeserialization");
    }
}
4.集群测试
4.1 环境准备
启动ha-hadoop集群:sh ha-hadoop.sh start
创建作业归档目录(只需要创建一次):hdfs dfs -mkdir /flink-jobhistory
启动Flink集群和任务历史服务
- start-cluster.sh
- historyserver.sh start
运行该Flink任务:/opt/software/flink-1.12.7/bin/flink run -m yarn-cluster -ys 1 -ynm gmall-flink-cdc -c com.ucas.FlinkCDCWithCustomerDeserialization -d /root/mySoftware/gmall-flink-cdc.jar
4.2 查看任务结果
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Entry point of a Flink job that reads MySQL change data through Flink CDC,
 * converts it with {@link CustomerDeserialization} and prints each record.
 */
public class FlinkCDCWithCustomerDeserialization {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the CDC source function and read the data.
        // NOTE(review): host and credentials are hard-coded; externalize them before real use.
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.0.111")
                .port(3306)
                .username("root")
                .password("Gmt123456@")
                .databaseList("gmall2021")
                // If tableList is omitted, all tables of the listed databases are consumed;
                // when given, each entry uses the form db.table.
                .tableList("gmall2021.base_trademark")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the records.
        streamSource.print();

        // 4. Start the job.
        env.execute("FlinkCDCWithCustomerDeserialization");
    }
}
4.集群测试
4.1 环境准备
启动ha-hadoop集群:sh ha-hadoop.sh start
创建作业归档目录(只需要创建一次):hdfs dfs -mkdir /flink-jobhistory
启动Flink集群和任务历史服务
- start-cluster.sh
- historyserver.sh start
运行该Flink任务:/opt/software/flink-1.12.7/bin/flink run -m yarn-cluster -ys 1 -ynm gmall-flink-cdc -c com.ucas.FlinkCDCWithCustomerDeserialization -d /root/mySoftware/gmall-flink-cdc.jar
4.2 查看任务结果
- 启动ha-hadoop集群:sh ha-hadoop.sh start
- start-cluster.sh
- historyserver.sh start
创建作业归档目录,只需要创建一次:hdfs dfs -mkdir /flink-jobhistory
启动Flink集群和任务历史服务
4.2 查看任务结果
(1)打开yarn,查看任务:http://192.168.0.112:8088/cluster/apps,并且通过id点击进去
(2)点击Tracking URL,进入FlinkWeb界面
(3) 打开左侧TaskManagers中的Stdout查看控制台输出信息



