栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink CDC 同步Oracle数据

Flink CDC 同步Oracle数据

一、启用归档日志 1. 用dba进入数据库

sqlplus / AS SYSDBA

2. 开启归档日志
-- Configure the fast recovery area (location and size) that will hold the
-- archived redo logs. The original set db_recovery_file_dest_size twice with
-- conflicting values (10G, then 41820M); set it once for your environment.
alter system set db_recovery_file_dest_size = 10G scope=spfile;
alter system set db_recovery_file_dest = '/oradata/dg01/recovery_area' scope=spfile;
-- Restart the instance in MOUNT state and switch the database to ARCHIVELOG mode.
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
-- Verify: "Database log mode" should now read "Archive Mode".
archive log list;
3. 开启补全日志
-- Enable supplemental logging for a single table (replace schema.table).
ALTER TABLE schema.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable minimal supplemental logging for the whole database.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Enable ALL-column supplemental logging database-wide (recommended for CDC,
-- so update/delete events carry the full row image).
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Check whether ALL-column supplemental logging is enabled (YES/NO).
-- Note: "ALL" is a reserved word in Oracle and cannot be used as a column alias.
SELECT supplemental_log_data_all FROM v$database;
-- Drop ALL-column supplemental logging again if no longer needed.
ALTER DATABASE DROP SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
二、创建Oracle用户并授权 1. 创建表空间

-- Dedicated tablespace for the CDC/LogMiner user's objects; starts at 25 MB
-- and auto-extends without an upper limit.
CREATE TABLESPACE logminer_tbs DATAFILE '/oradata/dg01/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

2. 创建用户并授权
-- CDC user with the privileges the Flink CDC / Debezium LogMiner reader needs.
CREATE USER flink IDENTIFIED BY flink DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flink;
-- Only required (and only valid) on a multitenant (CDB) database; the stray
-- "//" in the original was not a valid SQL comment.
GRANT SET CONTAINER TO flink;
GRANT SELECT ON V_$DATABASE TO flink;
GRANT FLASHBACK ANY TABLE TO flink;
GRANT SELECT ANY TABLE TO flink;
GRANT SELECT_CATALOG_ROLE TO flink;
GRANT EXECUTE_CATALOG_ROLE TO flink;
GRANT SELECT ANY TRANSACTION TO flink;
GRANT LOGMINING TO flink;
GRANT CREATE TABLE TO flink;
GRANT LOCK ANY TABLE TO flink;
GRANT ALTER ANY TABLE TO flink;
GRANT CREATE SEQUENCE TO flink;
-- LogMiner packages and the V$ views used to read redo/archive logs.
GRANT EXECUTE ON DBMS_LOGMNR TO flink;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flink;
GRANT SELECT ON V_$LOG TO flink;
GRANT SELECT ON V_$LOG_HISTORY TO flink;
GRANT SELECT ON V_$LOGMNR_LOGS TO flink;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flink;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flink;
GRANT SELECT ON V_$LOGFILE TO flink;
GRANT SELECT ON V_$ARCHIVED_LOG TO flink;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flink;
三、代码 1. pom.xml


    4.0.0

    org.example
    flink-cdc
    1.0-SNAPSHOT

    
        8
        8
        2.12
        1.12.1
        1.8
    

    
        
            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}
            provided
        
        
            com.ververica
            flink-connector-oracle-cdc
            2.1.1
        
        
            com.alibaba
            fastjson
            1.2.75
        
        
            junit
            junit
            4.11
            test
        
        
            org.projectlombok
            lombok
            1.16.22
            
        
        
            com.alibaba
            druid
            1.1.20
        
        
            mysql
            mysql-connector-java
            5.1.47
            
        

        
        
            org.apache.logging.log4j
            log4j-core
            2.8.2
        
        
            org.apache.hadoop
            hadoop-common
            2.9.2

        
        
            org.apache.hadoop
            hadoop-client
            2.9.2

        
        
            org.apache.hadoop
            hadoop-hdfs
            2.9.2

        
    

    
        

            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.1
                
                    ${target.java.version}
                    ${target.java.version}
                
            

            
            
            
                org.apache.maven.plugins
                maven-shade-plugin
                3.0.0
                
                    
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    org.apache.flink:force-shading
                                    com.google.code.findbugs:jsr305
                                    org.slf4j:*
                                    org.apache.logging.log4j:*
                                
                            
                            
                                
                                    
                                    *:*
                                    
                                        meta-INF
public class Demo {
    /**
     * Streams change events from Oracle via Flink CDC (LogMiner) and replays
     * them as insert/update/delete statements against a downstream TiDB
     * (MySQL-protocol) database.
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Read the data dictionary from the online catalog and mine redo logs
        // continuously instead of per-archive sessions.
        props.setProperty("debezium.log.mining.strategy", "online_catalog");
        props.setProperty("debezium.log.mining.continuous.mine", "true");
        DebeziumSourceFunction<String> sourceFunction = OracleSource.<String>builder()
                .hostname("ip")
                .port(1521)
                .database("数据库")
                .schemaList("schema")
                .tableList("schema.table1, schema.table2")
                .username("flink")
                .password("flink")
                .debeziumProperties(props)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Identity used when writing checkpoint files to HDFS.
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // Checkpoint every second with exactly-once semantics.
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint storage location on HDFS.
        env.setStateBackend(new FsStateBackend("hdfs://ip:9000/user/bd/flink/checkpoint/", true));
        // Keep externalized checkpoints when the job is cancelled so it can be resumed.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DataStreamSource<String> source = env.addSource(sourceFunction);
        // Round-trip the raw Debezium JSON through the ArchiveLog POJO to normalize it.
        SingleOutputStreamOperator<String> archiveLog = source.map((MapFunction<String, String>) json -> {
            ArchiveLog parsed = JSON.parseObject(json, ArchiveLog.class);
            return JSON.toJSONString(parsed);
        });

        // Persist every change event into TiDB.
        archiveLog.addSink(new SinkToTiDB());

        env.execute("flink cdc");
    }

    /** Sink that applies each change event as a DML statement via a Druid pool. */
    private static class SinkToTiDB extends RichSinkFunction<String> {
        private transient DruidDataSource dataSource;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Connection pool for the target database.
            dataSource = new DruidDataSource();
            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource.setUsername("username");
            dataSource.setPassword("password");
            dataSource.setUrl("jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false");
            dataSource.setMaxActive(5);
        }

        @Override
        public void close() throws Exception {
            // Release the pool on task shutdown (the original leaked it).
            if (dataSource != null) {
                dataSource.close();
            }
        }

        @Override
        public void invoke(String json, Context context) throws Exception {
            ArchiveLog archiveLog = JSON.parseObject(json, ArchiveLog.class);
            String sql = buildSql(archiveLog);
            // Snapshot reads ("r") and unknown ops produce no SQL; the original
            // still called prepareStatement("") for them, which fails at runtime.
            if (sql.isEmpty()) {
                return;
            }
            // NOTE(review): the SQL is assembled by string concatenation from
            // event values. That is tolerable for trusted CDC data but a
            // PreparedStatement with bind parameters would be safer and would
            // handle embedded quotes correctly.
            try (Connection conn = dataSource.getConnection();
                 PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.execute();
            } catch (Exception e) {
                // Best-effort sink, matching the original: log and continue.
                e.printStackTrace();
            }
        }

        /**
         * Translates one change event into the matching DML statement.
         * Returns "" when there is nothing to replay (snapshot read / unknown op).
         * Fixes the original's JSonObject typo (fastjson's class is JSONObject).
         */
        private static String buildSql(ArchiveLog event) {
            ArchiveLogSource source = event.getSource();
            String target = source.getSchema() + "." + source.getTable();
            switch (event.getOp()) {
                // insert: build "insert into t(cols) values(vals)" from the after-image
                case "c": {
                    System.out.println("新增逻辑");
                    JSONObject after = JSON.parseObject(event.getAfter());
                    StringBuilder cols = new StringBuilder();
                    StringBuilder vals = new StringBuilder();
                    for (String col : after.keySet()) {
                        cols.append(col).append(",");
                        vals.append("'").append(after.get(col)).append("',");
                    }
                    return "insert into " + target
                            + "(" + cols.substring(0, cols.length() - 1)
                            + ") values(" + vals.substring(0, vals.length() - 1) + ")";
                }
                // update: keyed on the (case-insensitive) ID column of the after-image
                case "u": {
                    System.out.println("更新逻辑");
                    JSONObject after = JSON.parseObject(event.getAfter());
                    StringBuilder sets = new StringBuilder();
                    String id = "";
                    for (String col : after.keySet()) {
                        if (col.equalsIgnoreCase("id")) {
                            id = "'" + after.get(col) + "'";
                        } else {
                            sets.append(col).append("=").append("'").append(after.get(col)).append("',");
                        }
                    }
                    String keyValue = sets.substring(0, sets.length() - 1);
                    System.out.println(keyValue);
                    return "update " + target + " set " + keyValue + " where id =" + id;
                }
                // delete: keyed on the ID taken from the before-image
                case "d": {
                    System.out.println("删除逻辑");
                    JSONObject before = JSON.parseObject(event.getBefore());
                    String id = before.get("ID").toString();
                    return "delete from " + target + " where id = '" + id + "'";
                }
                // snapshot read: nothing to replay downstream
                case "r":
                    System.out.println("读取逻辑");
                    return "";
                default:
                    return "";
            }
        }
    }
}
3. ArchiveLog.java
/**
 * One Debezium change event as emitted by the Oracle CDC source.
 * "before"/"after" hold the row image as raw JSON text and are parsed into
 * JSON objects by the sink. Field order is preserved because Lombok's
 * generated toString() follows declaration order.
 */
@Data
@ToString
public class ArchiveLog {
    private String before;           // row image before the change; null for inserts/snapshot reads
    private String after;            // row image after the change; null for deletes
    private ArchiveLogSource source; // Debezium source metadata (schema, table, SCN, ...)
    private String op;               // operation: c=insert, u=update, d=delete, r=snapshot read
    private String ts_ms;            // event timestamp, epoch milliseconds (per the sample payloads)
    private String transaction;      // transaction metadata; null in all observed events
}
4. ArchiveLogSource.java
/**
 * The "source" metadata block of a Debezium Oracle change event,
 * identifying where and when the change originated.
 */
@Data
@ToString
public class ArchiveLogSource {
    private String version;      // Debezium connector version, e.g. "1.5.4.Final"
    private String connector;    // connector type; "oracle" in these events
    private String name;         // logical source name, e.g. "oracle_logminer"
    private String ts_ms;        // source timestamp, epoch milliseconds
    private String snapshot;     // snapshot phase marker: "last", "false", ...
    private String db;           // database/SID, e.g. "DG01"
    private String sequence;     // null in the observed events
    private String schema;       // owning schema of the changed table
    private String table;        // changed table name
    private String txId;         // originating transaction id; null during snapshot
    private String scn;          // system change number of the change
    private String commit_scn;   // SCN at commit; null during snapshot
    private String lcr_position; // null in the observed events
}
四、补充 1. 归档日志
# 新增
{
  "before": null,
  "after": {
    "ID": "1",
    "NAME": "1"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646652622448,
    "snapshot": "last",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": null,
    "scn": "46495548600",
    "commit_scn": null,
    "lcr_position": null
  },
  "op": "c",
  "ts_ms": 1646652622456,
  "transaction": null
}

# 更新
{
  "before": {
    "ID": "1",
    "NAME": "1"
  },
  "after": {
    "ID": "1",
    "NAME": "2"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646680890000,
    "snapshot": "false",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": "0a0009007f231200",
    "scn": "46495572789",
    "commit_scn": "46495590649",
    "lcr_position": null
  },
  "op": "u",
  "ts_ms": 1646652829683,
  "transaction": null
}


# 删除
{
  "before": {
    "ID": "1",
    "NAME": "2"
  },
  "after": null,
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646819782000,
    "snapshot": "false",
    "db": "DG01",
    "sequence": null,
    "schema": "Flink",
    "table": "CDC2",
    "txId": "0a00140054270000",
    "scn": "2491112",
    "commit_scn": "2491120",
    "lcr_position": null
  },
  "op": "d",
  "ts_ms": 1646791645954,
  "transaction": null
}

# 读取
{
  "before": null,
  "after": {
    "ID": "1",
    "NAME": "1"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "oracle",
    "name": "oracle_logminer",
    "ts_ms": 1646652622448,
    "snapshot": "last",
    "db": "DG01",
    "sequence": null,
    "schema": "test",
    "table": "CDCTEST",
    "txId": null,
    "scn": "46495548600",
    "commit_scn": null,
    "lcr_position": null
  },
  "op": "r",
  "ts_ms": 1646652622456,
  "transaction": null
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/757830.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号