1. Flink version: 1.13.0
2. Oracle version: 12c
3. Oracle CDC connector version: 2.1.0
pom dependency
com.ververica:flink-connector-oracle-cdc:2.1.0

Full pom file (listed as groupId:artifactId:version)
Project: flinkTest:flinkTest:1.0-SNAPSHOT (modelVersion 4.0.0)
Properties: flink.version = 1.13.0, scala.version = 2.12
Dependencies:
  org.apache.flink:flink-java:${flink.version}
  com.ververica:flink-connector-mysql-cdc:2.1.0
    exclusions: mysql:mysql-connector-java, org.slf4j:slf4j-api, com.fasterxml.jackson.core:jackson-databind, com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider, javax.xml.bind:jaxb-api, org.eclipse.jetty:jetty-servlet
  org.apache.flink:flink-streaming-java_${scala.version}:${flink.version}
    exclusions: org.slf4j:slf4j-api
  org.apache.flink:flink-clients_${scala.version}:${flink.version}
  org.apache.flink:flink-table-api-java-bridge_${scala.version}:${flink.version}
  org.apache.hudi:hudi-flink-bundle_2.11:0.8.0
  org.apache.flink:flink-hadoop-compatibility_2.11:1.11.0
  org.apache.hadoop:hadoop-client:3.1.1
  org.apache.flink:flink-connector-jdbc_2.12:1.13.0
  org.apache.flink:flink-table-planner-blink_${scala.version}:${flink.version}
  org.slf4j:slf4j-simple:1.7.26
  com.ververica:flink-connector-oracle-cdc:2.1.0
  org.apache.flink:flink-sql-avro:1.12.3
  org.apache.flink:flink-connector-hive_2.11:1.13.0
    exclusions: org.apache.flink:flink-connector-base
  mysql:mysql-connector-java:8.0.23
  org.apache.flink:flink-csv:${flink.version}
  org.apache.flink:flink-json:1.13.0
  org.apache.flink:flink-jdbc_2.12:1.10.3
    exclusions: org.apache.flink:force-shading
Build plugins:
  org.apache.maven.plugins:maven-shade-plugin:3.1.0
  org.apache.maven.plugins:maven-jar-plugin:2.6
    manifest settings: true, lib/, com.testCdC (presumably addClasspath, classpathPrefix, and mainClass)
  org.apache.maven.plugins:maven-dependency-plugin:2.10
    execution copy-dependencies, phase package, goal copy-dependencies, output directory ${project.build.directory}/lib
  org.apache.maven.plugins:maven-compiler-plugin
    settings: 8, 8 (presumably source and target)

Main code implementation
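import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Class name follows the com.testCdC entry in the pom; add "package com;" if the class lives there.
public class testCdC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Register the Oracle table as a CDC source table.
        tableEnv.executeSql("CREATE TABLE flink (\n" +
                " id STRING,\n" +
                " name STRING,\n" +
                " age STRING,\n" +
                " sex STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'oracle-cdc',\n" +
                " 'hostname' = '127.0.0.1',\n" +
                " 'port' = '6045',\n" +
                " 'username' = 'liuyun',\n" +
                " 'password' = '123456',\n" +
                " 'database-name' = 'xe',\n" +
                " 'schema-name' = 'LIUYUN',\n" +
                // The two debezium.* options reduce read latency (see troubleshooting item 1).
                " 'debezium.log.mining.continuous.mine' = 'true',\n" +
                " 'debezium.log.mining.strategy' = 'online_catalog',\n" +
                " 'table-name' = 'Flink')");
        // executeSql() submits the query as a job; print() then streams the change rows to stdout.
        TableResult tableResult = tableEnv.executeSql("select * from flink");
        tableResult.print();
        // No env.execute() here: the program defines no DataStream operators,
        // so calling it would fail with "No operators defined in streaming topology".
    }
}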
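The hand-concatenated DDL above is easy to get wrong: one missing comma or newline breaks the statement. As a minimal sketch, the same kind of statement could be assembled from ordered maps instead. The class `OracleCdcDdlSketch` and the method `buildCreateTable` are hypothetical names introduced only for this illustration; they are not part of Flink or Flink CDC.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class OracleCdcDdlSketch {
    // Hypothetical helper: joins columns and options so commas and newlines are always consistent.
    static String buildCreateTable(String table, Map<String, String> columns, Map<String, String> options) {
        StringJoiner cols = new StringJoiner(",\n  ", "  ", "");
        columns.forEach((name, type) -> cols.add(name + " " + type));
        StringJoiner opts = new StringJoiner(",\n  ", "  ", "");
        options.forEach((key, value) -> opts.add("'" + key + "' = '" + value + "'"));
        return "CREATE TABLE " + table + " (\n" + cols + "\n) WITH (\n" + opts + "\n)";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the DDL lists entries as written here.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "STRING");
        columns.put("name", "STRING");
        columns.put("age", "STRING");
        columns.put("sex", "STRING");

        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "oracle-cdc");
        options.put("hostname", "127.0.0.1");
        options.put("port", "6045");
        options.put("username", "liuyun");
        options.put("password", "123456");
        options.put("database-name", "xe");
        options.put("schema-name", "LIUYUN");
        options.put("table-name", "Flink");

        System.out.println(buildCreateTable("flink", columns, options));
    }
}
```

The resulting string can be passed to `tableEnv.executeSql(...)` in place of the concatenated literal. The sketch does not escape quotes inside values, so it only suits simple option values like the ones shown.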
Run result
Troubleshooting
Related documentation: https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/oracle-cdc.html
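1. Change data is read with a delay
Add the following two options to the WITH clause of the CREATE statement:
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'
How Debezium adjusts the mining window range and the sleep time between mining rounds:
Flow chart reference:
https://mp.weixin.qq.com/s/IQiK7enF5fX0ighRE_i2sg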
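The idea of adapting the mining window and the sleep time can be sketched roughly as follows. This is a simplified illustration, not Debezium's actual code: the class `MiningWindowSketch`, the method `nextEndScn`, and all constants are hypothetical and chosen only to show the feedback loop. When the reader lags behind the database's current SCN, it mines a wider window and polls sooner; when it has caught up, it narrows the window and sleeps longer.

```java
class MiningWindowSketch {
    // Hypothetical bounds and step sizes, chosen only for illustration.
    static final long MIN_BATCH = 1_000, MAX_BATCH = 100_000, BATCH_STEP = 1_000;
    static final long MIN_SLEEP_MS = 0, MAX_SLEEP_MS = 3_000, SLEEP_STEP_MS = 200;

    long batchSize = 20_000; // number of SCNs covered by one mining round
    long sleepMs = 1_000;    // pause between two mining rounds

    /** Picks the upper SCN bound for the next round and adapts batchSize and sleepMs. */
    long nextEndScn(long startScn, long currentScn) {
        long windowEnd = startScn + batchSize;
        if (currentScn <= windowEnd) {
            // Caught up: mine only up to the database's current SCN, then back off.
            batchSize = Math.max(MIN_BATCH, batchSize - BATCH_STEP);
            sleepMs = Math.min(MAX_SLEEP_MS, sleepMs + SLEEP_STEP_MS);
            return currentScn;
        }
        // Lagging: mine a full window, widen the next one, and poll sooner.
        batchSize = Math.min(MAX_BATCH, batchSize + BATCH_STEP);
        sleepMs = Math.max(MIN_SLEEP_MS, sleepMs - SLEEP_STEP_MS);
        return windowEnd;
    }

    public static void main(String[] args) {
        MiningWindowSketch window = new MiningWindowSketch();
        long start = 0;
        long currentScn = 50_000; // pretend the database is 50,000 SCNs ahead
        for (int round = 1; round <= 4; round++) {
            long end = window.nextEndScn(start, currentScn);
            System.out.println("round " + round + ": mine (" + start + ", " + end + "], batchSize="
                    + window.batchSize + ", sleepMs=" + window.sleepMs);
            start = end;
        }
    }
}
```

In a loop like this, the delay a reader sees depends on how quickly the window catches up with the current SCN and on how long each round sleeps, which is why the tuning options matter for latency.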
2. Supplemental logging error
[ERROR] Could not execute SQL statement. Reason: io.debezium.DebeziumException: Supplemental logging not configured for table LIUYUN.flink
Use command: ALTER TABLE LIUYUN.flink ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
The message reports the table name in lowercase (LIUYUN.flink), which suggests a table-name case mismatch. Reference: https://docs.oracle.com/cd/E11882_01/server.112/e41084/sql_elements008.htm
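The case behaviour behind this kind of mismatch can be illustrated with a small sketch. Oracle stores an unquoted identifier in uppercase and keeps a double-quoted identifier exactly as written. The class `OracleIdentifierSketch` and the method `storedName` are hypothetical names for this illustration only; the sketch does not show how the connector itself compares names.

```java
import java.util.Locale;

class OracleIdentifierSketch {
    /** Returns the name Oracle would store for an identifier as written in SQL. */
    static String storedName(String identifier) {
        boolean quoted = identifier.length() >= 2
                && identifier.startsWith("\"")
                && identifier.endsWith("\"");
        if (quoted) {
            // Quoted identifiers keep their exact case; only the quotes are dropped.
            return identifier.substring(1, identifier.length() - 1);
        }
        // Unquoted identifiers are folded to uppercase.
        return identifier.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(storedName("Flink"));     // prints FLINK
        System.out.println(storedName("\"Flink\"")); // prints Flink
        // A lowercased name such as "flink" matches neither stored form.
        System.out.println("flink".equals(storedName("Flink"))); // prints false
    }
}
```

If a tool lowercases the table name before looking it up, the lookup can miss the table even though supplemental logging is configured for it, which would be consistent with the error above.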
The following option can be added to the CREATE statement:
'debezium.database.tablename.case.insensitive'='false'
3. Package conflicts
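These mainly show up as conflicts between Kafka-related packages.
Resolving them in Maven (for example with dependency exclusions, as in the pom above) is enough.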



