Flink CDC is an open-source Flink project. It builds on traditional CDC tooling (it embeds Debezium under the hood) and makes real-time change-data-capture development fit naturally into Flink. The details follow:
1: Preparation. Set up the Maven dependencies required for CDC development. Keep in mind that Flink version upgrades frequently introduce incompatibilities between the various connectors, so match the versions below to your cluster.
The pom.xml used in this walkthrough (duplicate fastjson and mysql-connector-java entries from the original have been removed):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lkr.flink</groupId>
    <artifactId>flink-cdc</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.5</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.1</scala.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <mysql.version>5.1.49</mysql.version>
        <flinkcdc.version>2.0.0</flinkcdc.version>
        <fastjson.version>1.2.75</fastjson.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.lkr.flink.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2: Enable MySQL's binlog.
The binlog settings usually live in /etc/my.cnf; the relevant configuration is as follows.
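A minimal sketch of what that configuration typically looks like. The exact values here are illustrative: server-id just has to be unique among the MySQL instances, and binlog-do-db is assumed to be the cdc_test database used later in this walkthrough.

[mysqld]
# unique server id, required for binlog replication
server-id=1
# enable the binlog with the given file prefix
log-bin=mysql-bin
# Flink CDC / Debezium requires row-based binlog
binlog_format=ROW
# optionally restrict the binlog to the database we want to capture
binlog-do-db=cdc_test

Restart MySQL afterwards (e.g. systemctl restart mysqld) for the change to take effect.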
After enabling the log, you can log in to MySQL and check whether the binlog is on (ON means it is enabled).
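For example (the exact table layout of the output depends on your MySQL version):

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+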
Next, create a test table (test_cdc) in MySQL.
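The original post does not show the table definition; any small table works. A hypothetical schema consistent with the rest of the walkthrough:

use cdc_test;
create table test_cdc (
    id   int primary key,
    name varchar(100)
);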
3: Create the Kafka topic that the synchronized data will ultimately be sinked to.
4: Start Kafka and ZooKeeper, and open the corresponding consumer on the target side; a sketch of both steps is shown below.
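A sketch of steps 3 and 4, assuming the broker address hadoop:9092 used in the code below, and a Kafka version whose scripts accept --bootstrap-server (older releases use --zookeeper instead):

# 3: create the target topic
bin/kafka-topics.sh --create --bootstrap-server hadoop:9092 \
    --topic test_cdc --partitions 1 --replication-factor 1
# 4: open a console consumer on the target side
bin/kafka-console-consumer.sh --bootstrap-server hadoop:9092 --topic test_cdc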
5: Flink CDC's default deserializers are not flexible enough, so we first define a custom deserialization schema:

package flink_cdc;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        Struct value = (Struct) sourceRecord.value();
        Struct source = value.getStruct("source");
        // database and table that produced this change event
        String db = source.getString("db");
        String table = source.getString("table");
        // operation type; normalize Debezium's "create" to "insert"
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if (type.equals("create")) {
            type = "insert";
        }
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("database", db);
        jsonObject.put("table", table);
        jsonObject.put("type", type);
        // row image after the change; note it is null for delete events,
        // which is why deletes make this job fail (see step 10)
        Struct after = value.getStruct("after");
        JSONObject jsonObject2 = new JSONObject();
        List<Field> fields = after.schema().fields();
        for (Field field : fields) {
            String fieldName = field.name();
            Object fieldValue = after.get(field);
            jsonObject2.put(fieldName, fieldValue);
        }
        jsonObject.put("data", jsonObject2);
        // emit the JSON string downstream
        collector.collect(jsonObject.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

6: For convenience, create a simple Kafka helper class:
package flink_cdc;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaUtil {

    private static final String KAFKA_SERVER = "hadoop:9092";
    private static final Properties properties = new Properties();

    static {
        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
    }

    // build a Kafka sink that writes plain strings to the given topic
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), properties);
    }
}
7: The entry-point class.
1: We are using the flink-cdc MySQL jar, so the remote end can only connect directly to MySQL; for other databases, the corresponding dependencies or jars can be downloaded from the official site.
package flink_cdc;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class demo_cdc {
    public static void main(String[] args) throws Exception {
        // Properties debeziumProperties = new Properties();
        // debeziumProperties.put("snapshot.locking.mode", "none");
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        DebeziumSourceFunction<String> source = MySqlSource.<String>builder()
                .hostname("192.168.144.130")
                .port(3306)
                .databaseList("cdc_test")
                .tableList("cdc_test.*")                             // capture every table in cdc_test
                .username("root")
                .password("123456")
                .deserializer(new MyDeserializationSchemaFunction()) // custom deserializer from step 5
                .startupOptions(StartupOptions.initial())            // full snapshot first, then read the binlog
                // .debeziumProperties(debeziumProperties)
                .build();
        DataStreamSource<String> mysql_source = environment.addSource(source);
        mysql_source.addSink(KafkaUtil.getKafkaSink("test_cdc"));
        environment.execute("flink-cdc");
    }
}
At this point we can start the main program, and a real-time CDC sync job is complete.
8: Insert or update one or more rows on the MySQL side.
9: The Kafka side then receives the corresponding JSON results, shaped roughly as shown below.
10: Note that this pipeline only supports upsert (insert/update) operations; if a delete occurs on the source side, the Java program will throw an error, because a delete event carries no "after" struct for our deserializer to read.
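With the deserializer above, an insert into test_cdc would produce a message along these lines (the field values are illustrative; the keys database/table/type/data come from MyDeserializationSchemaFunction):

{"database":"cdc_test","table":"test_cdc","type":"insert","data":{"id":1,"name":"tom"}}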


