This article describes how to integrate Flink with Oracle CDC.
pom.xml

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.51</version>
</dependency>
```
```java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("database.tablename.case.insensitive", "false");
    props.setProperty("database.connection.adapter", "logminer");
    // Required for fast synchronization; without these two settings the source is extremely slow
    props.setProperty("log.mining.strategy", "online_catalog");
    props.setProperty("log.mining.continuous.mine", "true");

    SourceFunction<String> sourceFunction = OracleSource.<String>builder()
            .hostname("ip")
            .port(1521)
            .database("ORCL")
            .schemaList("schema")
            .tableList("schema.table")
            .debeziumProperties(props)
            .username("username")
            .password("password")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(sourceFunction).process(new Process()).setParallelism(1);
    env.execute("OracleSource Demo");
}
```
```java
public class Process extends ProcessFunction<String, String> {
    private static AtomicInteger count = new AtomicInteger();

    @Override
    public void processElement(String s, ProcessFunction<String, String>.Context context,
                               Collector<String> collector) {
        // Minimal body (the original was truncated here): count and print each change record
        System.out.println(count.incrementAndGet() + " -> " + s);
        collector.collect(s);
    }
}
```
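Each element the `Process` function receives from `JsonDebeziumDeserializationSchema` is a Debezium change-event JSON string with `before`, `after` and `op` fields. As a rough stdlib-only illustration of that envelope (`OpExtractor` and the sample string are hypothetical; a real job would parse with fastjson from the pom above), the operation code can be pulled out like this:

```java
public class OpExtractor {
    // Naive extraction of the Debezium "op" field:
    // "c" = insert, "u" = update, "d" = delete, "r" = snapshot read.
    // Illustration only: assumes the field is serialized exactly as "op":"x";
    // use a real JSON parser in production.
    public static String op(String event) {
        int i = event.indexOf("\"op\":\"");
        return i < 0 ? null : event.substring(i + 6, i + 7);
    }

    public static void main(String[] args) {
        String sample = "{\"before\":null,\"after\":{\"ID\":1},\"op\":\"c\"}";
        System.out.println(op(sample)); // prints "c"
    }
}
```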
Enable archive log mode in Oracle

```sql
sqlplus / as sysdba
archive log list;            -- check whether archive mode is enabled
shutdown immediate;
startup mount;
alter database archivelog;   -- enable archive mode
alter database open;         -- open the database
```
If the archive log area is too small, Oracle raises errors and the CDC connector fails to connect. To fix this:

- force-stop the Oracle instance
- start it in mount mode
- increase db_recovery_file_dest_size (sized against the available disk space):

```sql
alter system set db_recovery_file_dest_size=100g scope=both;
```
Note that the flink-connector-oracle-cdc version must match the Flink version; a mismatched pair will fail with errors at startup.
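For example, the Maven properties could be pinned together. The exact numbers below are an assumption based on the Flink CDC 2.1.x release line, which targets Flink 1.13; check the connector's release notes for the version you actually use:

```xml
<properties>
    <!-- Assumed pairing: flink-connector-oracle-cdc 2.1.1 against Flink 1.13.x / Scala 2.11 -->
    <flink.version>1.13.5</flink.version>
    <scala.version>2.11</scala.version>
</properties>
```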



