
Flink CDC + Oracle Demo

This article shows how to integrate Flink with the Oracle CDC connector.

pom.xml

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>com.ververica</groupId>
			<artifactId>flink-connector-oracle-cdc</artifactId>
			<version>2.1.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-cep_${scala.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.51</version>
		</dependency>
	</dependencies>
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class OracleSourceDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("database.tablename.case.insensitive", "false");
        props.setProperty("database.connection.adapter", "logminer");
        // Required for acceptable sync speed; without it, the source is extremely slow
        props.setProperty("log.mining.strategy", "online_catalog");
        props.setProperty("log.mining.continuous.mine", "true");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
            .hostname("ip")
            .port(1521)
            .database("ORCL")
            .schemaList("schema")
            .tableList("schema.table")
            .username("username")
            .password("password")
            .debeziumProperties(props)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(sourceFunction).process(new Process()).setParallelism(1);

        env.execute("OracleSource Demo");
    }
}
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Process extends ProcessFunction<String, String> {
    private static final AtomicInteger count = new AtomicInteger();

    @Override
    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
        // Count and print each change event, then forward it downstream
        System.out.println(count.incrementAndGet());
        System.out.println(s);
        collector.collect(s);
    }
}
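Each record that reaches the `Process` function is a Debezium change-event JSON string (with `before`, `after`, `source`, `op`, and `ts_ms` fields). A minimal, dependency-free sketch of routing on the operation code is shown below; the class and method names (`OpExtractor`, `opOf`) are illustrative only, and a real job would parse the full event with a JSON library such as the fastjson dependency already in the pom:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: pull the Debezium "op" code out of a change-event JSON string.
// c = create (insert), u = update, d = delete, r = read (snapshot).
public class OpExtractor {
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([cudr])\"");

    public static String opOf(String changeEvent) {
        Matcher m = OP.matcher(changeEvent);
        return m.find() ? m.group(1) : "unknown";
    }

    public static void main(String[] args) {
        String sample = "{\"before\":null,\"after\":{\"ID\":1},"
            + "\"source\":{\"table\":\"T\"},\"op\":\"c\",\"ts_ms\":0}";
        System.out.println(opOf(sample)); // prints "c"
    }
}
```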

Enable archive log mode in Oracle

sqlplus / as sysdba

archive log list; -- check whether archiving is already enabled

shutdown immediate;
startup mount;
alter database archivelog; -- enable archive log mode

alter database open; -- reopen the database

If the archive log area is too small, Oracle raises errors and the connector can no longer connect.

To fix this, force-stop Oracle, restart it in mount mode, and increase db_recovery_file_dest_size (sized against the available disk):

alter system set db_recovery_file_dest_size=100g scope=both;

Note that the flink-connector-oracle-cdc version must match the Flink version; a version mismatch will cause errors at runtime.
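As one reference point (verify against the Flink CDC compatibility table for your release): the 2.1.x line of the connector was built against Flink 1.13, so the pom properties might look like the following. The exact patch versions here are assumptions, not taken from the original article:

```xml
<properties>
    <flink.version>1.13.6</flink.version>
    <scala.version>2.11</scala.version>
</properties>
```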

