sqlplus / AS SYSDBA
2. Enable archive logging (adjust the archive log size and destination)
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/oradata/dg01/recovery_area' scope=spfile;
# the spfile value below supersedes the 10G setting above once the instance restarts
alter system set db_recovery_file_dest_size = 41820M scope=spfile;
# Restart the instance and switch to archive log mode
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
# Check the archiving status
archive log list;
3. Enable supplemental logging
# Enable for a single table
ALTER TABLE schema.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
# Enable for the whole database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
# All-column supplemental logging
## enable ALL-column supplemental logging (recommended)
alter database add supplemental log data (all) columns;
## check whether it is enabled
select supplemental_log_data_all from v$database;
## drop ALL-column supplemental logging
alter database drop supplemental log data (all) columns;
II. Create the Oracle user and grant privileges
1. Create a tablespace
CREATE TABLESPACE logminer_tbs DATAFILE '/oradata/dg01/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
2. Create the user and grant privileges
CREATE USER flink IDENTIFIED BY flink DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flink;
GRANT SET CONTAINER TO flink;
GRANT SELECT ON V_$DATABASE TO flink;
GRANT FLASHBACK ANY TABLE TO flink;
GRANT SELECT ANY TABLE TO flink;
GRANT SELECT_CATALOG_ROLE TO flink;
GRANT EXECUTE_CATALOG_ROLE TO flink;
GRANT SELECT ANY TRANSACTION TO flink;
GRANT LOGMINING TO flink;
GRANT CREATE TABLE TO flink;
GRANT LOCK ANY TABLE TO flink;
GRANT ALTER ANY TABLE TO flink;
GRANT CREATE SEQUENCE TO flink;
GRANT EXECUTE ON DBMS_LOGMNR TO flink;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flink;
GRANT SELECT ON V_$LOG TO flink;
GRANT SELECT ON V_$LOG_HISTORY TO flink;
GRANT SELECT ON V_$LOGMNR_LOGS TO flink;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flink;
GRANT SELECT ON V_$LOGFILE TO flink;
GRANT SELECT ON V_$ARCHIVED_LOG TO flink;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flink;
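Before moving on to the Flink job, it is worth verifying that the new account can log in and read the V$ views it was just granted. Below is a minimal JDBC sketch, assuming an Oracle JDBC driver (ojdbc, not in the pom.xml below) on the classpath; the host, the DG01 service name, and the FlinkUserCheck class name are placeholders for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class FlinkUserCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder host/service name; adjust to your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//ip:1521/DG01", "flink", "flink");
             Statement st = conn.createStatement();
             // Readable only if the V_$DATABASE grant above took effect.
             ResultSet rs = st.executeQuery(
                     "SELECT log_mode, supplemental_log_data_all FROM v$database")) {
            while (rs.next()) {
                // Expect ARCHIVELOG and YES after the steps in section I.
                System.out.println("LOG_MODE = " + rs.getString(1)
                        + ", SUPPLEMENTAL_LOG_DATA_ALL = " + rs.getString(2));
            }
        }
    }
}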
III. Code
1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.12.1</flink.version>
        <target.java.version>1.8</target.java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.20</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. Demo.java
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class Demo {
    public static void main(String[] args) throws Exception {
        Properties pros = new Properties();
        // These keys are handed straight to Debezium, so they carry no "debezium." prefix.
        pros.setProperty("log.mining.strategy", "online_catalog");
        pros.setProperty("log.mining.continuous.mine", "true");

        DebeziumSourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("ip")
                .port(1521)
                .database("database")
                .schemaList("schema")
                .tableList("schema.table1", "schema.table2")
                .username("flink")
                .password("flink")
                .debeziumProperties(pros)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // User to access Hadoop as
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // Enable checkpointing
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Where checkpoints are stored
        env.setStateBackend(new FsStateBackend("hdfs://ip:9000/user/bd/flink/checkpoint/", true));
        // Retain externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Source
        DataStreamSource<String> source = env.addSource(sourceFunction);
        SingleOutputStreamOperator<String> archiveLog = source.map((MapFunction<String, String>) json -> {
            ArchiveLog archiveLog1 = JSON.parseObject(json, ArchiveLog.class);
            return JSON.toJSONString(archiveLog1);
        });
        // Write the events to TiDB
        archiveLog.addSink(new SinkToTiDB());

        env.execute("flink cdc");
    }

    private static class SinkToTiDB extends RichSinkFunction<String> {

        private transient DruidDataSource dataSource = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Database connection pool
            dataSource = new DruidDataSource();
            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource.setUsername("username");
            dataSource.setPassword("password");
            dataSource.setUrl("jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false");
            dataSource.setMaxActive(5);
        }

        @Override
        public void invoke(String json, Context context) throws Exception {
            ArchiveLog archiveLog = JSON.parseObject(json, ArchiveLog.class);
            String op = archiveLog.getOp();
            ArchiveLogSource source = archiveLog.getSource();
            String after = archiveLog.getAfter();
            JSONObject jsonObject = JSON.parseObject(after);
            String sql = "";
            switch (op) {
                // insert
                case "c":
                    System.out.println("insert logic");
                    StringBuilder keyBuilder = new StringBuilder();
                    StringBuilder valueBuilder = new StringBuilder();
                    for (String item : jsonObject.keySet()) {
                        keyBuilder.append(item).append(",");
                        valueBuilder.append("'").append(jsonObject.get(item)).append("'").append(",");
                    }
                    String key = keyBuilder.substring(0, keyBuilder.length() - 1);
                    String value = valueBuilder.substring(0, valueBuilder.length() - 1);
                    sql = "insert into " + source.getSchema() + "." + source.getTable()
                            + "(" + key + ") values(" + value + ")";
                    break;
                // update
                case "u":
                    System.out.println("update logic");
                    StringBuilder updateBuilder = new StringBuilder();
                    StringBuilder idBuilder = new StringBuilder();
                    for (String item : jsonObject.keySet()) {
                        if (item.equalsIgnoreCase("id")) {
                            idBuilder.append("'").append(jsonObject.get(item)).append("'");
                        } else {
                            updateBuilder.append(item).append("='").append(jsonObject.get(item)).append("',");
                        }
                    }
                    String keyValue = updateBuilder.substring(0, updateBuilder.length() - 1);
                    String id = idBuilder.toString();
                    sql = "update " + source.getSchema() + "." + source.getTable()
                            + " set " + keyValue + " where id = " + id;
                    break;
                // delete: the row image lives in "before", not "after"
                case "d":
                    System.out.println("delete logic");
                    String before = archiveLog.getBefore();
                    JSONObject deleteObj = JSON.parseObject(before);
                    String deleteId = deleteObj.get("ID").toString();
                    sql = "delete from " + source.getSchema() + "." + source.getTable()
                            + " where id = '" + deleteId + "'";
                    break;
                // snapshot read
                case "r":
                    System.out.println("snapshot read logic");
                    break;
            }
            if (sql.isEmpty()) {
                // "r" (and any unknown op) produces no statement to execute
                return;
            }
            Connection conn = null;
            PreparedStatement ps = null;
            try {
                conn = dataSource.getConnection();
                ps = conn.prepareStatement(sql);
                ps.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
}
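One caveat about SinkToTiDB: its SQL strings are assembled by concatenating values from the change event, which breaks on values containing quotes and is open to SQL injection. A safer variant of the insert branch, sketched here under the same assumptions (fastjson "after" payload, any JDBC DataSource; the ParameterizedInsert class and insert method are illustrative names), binds the values as parameters instead:

import com.alibaba.fastjson.JSONObject;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;

public class ParameterizedInsert {
    static void insert(DataSource dataSource, String schema, String table, JSONObject after) throws Exception {
        StringBuilder cols = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        for (String col : after.keySet()) {
            cols.append(col).append(",");
            marks.append("?,");
        }
        String sql = "insert into " + schema + "." + table
                + " (" + cols.substring(0, cols.length() - 1) + ")"
                + " values (" + marks.substring(0, marks.length() - 1) + ")";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            int i = 1;
            for (String col : after.keySet()) {
                ps.setObject(i++, after.get(col)); // values bound, not string-concatenated
            }
            ps.execute();
        }
    }
}

The same pattern carries over to the update and delete branches: keep the identifiers in the statement text, move every value into a "?" placeholder.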
3. ArchiveLog.java
import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class ArchiveLog {
private String before;
private String after;
private ArchiveLogSource source;
private String op;
private String ts_ms;
private String transaction;
}
4. ArchiveLogSource.java
import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class ArchiveLogSource {
private String version;
private String connector;
private String name;
private String ts_ms;
private String snapshot;
private String db;
private String sequence;
private String schema;
private String table;
private String txId;
private String scn;
private String commit_scn;
private String lcr_position;
}
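As a quick sanity check of the two POJOs, one of the sample events from the appendix below can be fed through fastjson directly. Note that fastjson stores the nested "before"/"after" objects into the String fields as JSON text, which is why the sink can re-parse them with JSON.parseObject. A minimal sketch (the ParseDemo class name is illustrative, and the event is trimmed to the fields used here):

import com.alibaba.fastjson.JSON;

public class ParseDemo {
    public static void main(String[] args) {
        // Trimmed version of the insert event from the appendix.
        String event = "{\"before\":null,\"after\":{\"ID\":\"1\",\"NAME\":\"1\"},"
                + "\"source\":{\"db\":\"DG01\",\"schema\":\"test\",\"table\":\"CDCTEST\"},"
                + "\"op\":\"c\",\"ts_ms\":1646652622456,\"transaction\":null}";
        ArchiveLog log = JSON.parseObject(event, ArchiveLog.class);
        System.out.println(log.getOp());                // c
        System.out.println(log.getSource().getTable()); // CDCTEST
        // The nested object arrives as JSON text, e.g. {"ID":"1","NAME":"1"}
        System.out.println(log.getAfter());
    }
}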
IV. Appendix
1. Archive log change events
# Insert
{
"before": null,
"after": {
"ID": "1",
"NAME": "1"
},
"source": {
"version": "1.5.4.Final",
"connector": "oracle",
"name": "oracle_logminer",
"ts_ms": 1646652622448,
"snapshot": "last",
"db": "DG01",
"sequence": null,
"schema": "test",
"table": "CDCTEST",
"txId": null,
"scn": "46495548600",
"commit_scn": null,
"lcr_position": null
},
"op": "c",
"ts_ms": 1646652622456,
"transaction": null
}
# Update
{
"before": {
"ID": "1",
"NAME": "1"
},
"after": {
"ID": "1",
"NAME": "2"
},
"source": {
"version": "1.5.4.Final",
"connector": "oracle",
"name": "oracle_logminer",
"ts_ms": 1646680890000,
"snapshot": "false",
"db": "DG01",
"sequence": null,
"schema": "test",
"table": "CDCTEST",
"txId": "0a0009007f231200",
"scn": "46495572789",
"commit_scn": "46495590649",
"lcr_position": null
},
"op": "u",
"ts_ms": 1646652829683,
"transaction": null
}
# Delete
{
"before": {
"ID": "1",
"NAME": "2"
},
"after": null,
"source": {
"version": "1.5.4.Final",
"connector": "oracle",
"name": "oracle_logminer",
"ts_ms": 1646819782000,
"snapshot": "false",
"db": "DG01",
"sequence": null,
"schema": "Flink",
"table": "CDC2",
"txId": "0a00140054270000",
"scn": "2491112",
"commit_scn": "2491120",
"lcr_position": null
},
"op": "d",
"ts_ms": 1646791645954,
"transaction": null
}
# Read (snapshot)
{
"before": null,
"after": {
"ID": "1",
"NAME": "1"
},
"source": {
"version": "1.5.4.Final",
"connector": "oracle",
"name": "oracle_logminer",
"ts_ms": 1646652622448,
"snapshot": "last",
"db": "DG01",
"sequence": null,
"schema": "test",
"table": "CDCTEST",
"txId": null,
"scn": "46495548600",
"commit_scn": null,
"lcr_position": null
},
"op": "r",
"ts_ms": 1646652622456,
"transaction": null
}