栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

oracle Debezium

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

oracle Debezium

Debezium是什么

Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back)。

Debezium连接oracle方式

Debezium提供了两种监控数据库的方式,对应了oracle的两种连接方式。
1.LogMiner:本质是jdbc thin driver,纯Java开发,与平台无关。
2.XStream API:本质是jdbc oci driver,通过调用oci客户端c动态库实现。

从官网上的描述可以看出来thin和oci的区别
thin 底层是通过tcp/ip协议实现的。oci是通过调用oci客户端c动态库实现的。
oci jdbc使用之前必须要安装oci 的客户端,所以我们通常会选择thin驱动来连接oracle数据库。
理论上oci jdbc驱动要优与thin的驱动。

默认情况下,Debezium 使用 Oracle LogMiner 从 Oracle 摄取更改事件。可以调整连接器配置以使连接器能够使用 Oracle XStreams 适配器。

"database.connection.adapter": "xstream"
Debezium 优缺点

优点:Debezium 是一个分布式系统,可以捕获多个上游数据库中的所有变化;它永远不会错过或丢失事件。当系统正常运行或被仔细管理时,Debezium 提供每个更改事件记录的一次交付。
如果发生故障,Debezium 不会丢失任何事件。在这些异常情况下,Debezium 和 Kafka 一样,至少提供一次变更事件的交付。
由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。
缺点:当它从故障中恢复时,它可能会重复一些更改事件。

java POM::
  
            io.debezium
            debezium-embedded
            1.6.2.Final
        
  
        
        
            io.debezium
            debezium-connector-oracle
            1.6.2.Final
        
        
            io.debezium
            debezium-api
            1.6.2.Final
        
Code:
 // 配置
            Properties props = new Properties();
            props.setProperty("name", "oracle-engine-0036"); //唯一实例
            props.setProperty("connector.class", OracleConnector.class.getName());
            props.setProperty("offset.storage", FileOffsetBackingStore.class.getName());
            // 指定 offset 存储目录
            props.setProperty("offset.storage.file.filename", "D:/data/oracledebezium/oracle4.txt");
            // 指定 Topic offset 写入磁盘的间隔时间
            props.setProperty("offset.flush.interval.ms", "60000");
            //设置数据库连接信息
            props.setProperty("database.hostname", "10.69.72.112");
            props.setProperty("database.port", "11522");
            props.setProperty("database.user", "source1");
            props.setProperty("database.password", "oracle");
            props.setProperty("database.server.id", "85701");
            //C##DBZUSER.STU可以用正则
            //props.setProperty("table.include.list", "STUDENT1");
            props.setProperty("snapshot.include.collection.list", "C##SOURCE1");
            props.setProperty("database.history",
                    FileDatabaseHistory.class.getCanonicalName());
            props.setProperty("database.history.file.filename",
                    "D:/data/oracledebezium/oracle5.txt");
            //每次运行需要对此参数进行修改,因为此参数唯一
            props.setProperty("database.server.name", "my-oracle-connector-0026");
            //指定 CDB 模式的实例名
            props.setProperty("database.dbname", "zujian4");
            //是否输出 schema 信息
            props.setProperty("key.converter.schemas.enable", "false");
            props.setProperty("value.converter.schemas.enable", "false");
            props.setProperty("database.serverTimezone", "UTC"); // 时区
            props.setProperty("database.connection.adapter", "logminer"); // 模式
			
			
	ExecutorService executor = Executors.newSingleThreadExecutor();
           executor.execute(engine);
	// 2. 构建 DebeziumEngine
	// 使用 Json 格式
	DebeziumEngine> engine =DebeziumEngine
                        .create(Json.class)
                        .using(props)
                        .notifying(record -> {
                            // record中会有操作的类型(增、删、改)和具体的数据
                            System.out.println(record);
                            System.out.println("record.key() = " + record.key());
                            System.out.println("record.value() = " + record.value());
                        })
                        .using((success, message, error) -> {
                            //查看错误信息
                            if (!success && error != null) {
                                // 报错回调
                                System.out.println("----------error------");
                                System.out.println(message);
                                System.out.println(error);
                                error.printStackTrace();
                            }
                        })
                        .build();

        return engine;
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/340572.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号