栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

canal应用三:启动多个Instance实例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

canal应用三:启动多个Instance实例

文章目录
  • 前言
  • 2个Instance监听不同的库
  • 2个Instance监听相同的库

前言

Canal通过 canal.destinations 来配置对应的 Instance 实例,这里允许配置多个Instance,中间用逗号分开即可。

2个Instance监听不同的库
  • 修改 canal.properties 文件,找到 canal.destinations 把他修改成 example1,example2
canal.destinations = example1,example2
  • 在 conf 目录下分别建2个文件夹:example1 和 example2,同时把 example 文件夹自带的 instance.properties 拷贝过去
  • 修改 instance.properties 这几个地方,这里以 example1 为例
canal.instance.master.address=192.168.25.129:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=example1
  • example2 同理,注意2个库都要开启binlog,并且创建canal账号
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECt, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

创建Java客户端

  • 引用依赖
    
      com.alibaba.otter
      canal.client
      1.1.0
    
  • 创建Canal连接Util
public class CanalUtil {

    public static void connect(Class clazz) throws IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchMethodException {
        // canal-server地址
        String addr = "192.168.25.129";
        //这里 example1 或者 example2 都可以
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(addr,11111), "example1", "", "");
        int batchSize = 1000;
        Long batchId = null;
        //取到数据之后,通过反射执行回调方法
        Object obj = clazz.getConstructors()[0].newInstance();
        Method method = clazz.getMethod("operate", CanalEntry.Entry.class, CanalEntry.RowChange.class);

        connector.connect();
        connector.subscribe();

        try{
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIonBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                            continue;
                        }
                        CanalEntry.RowChange rowChage =  CanalEntry.RowChange.parseFrom(entry.getStorevalue());
                        //回调处理数据
                        method.invoke(obj, entry, rowChage);
                    }

                }
                connector.ack(batchId); // 提交确认
            }
        }
        catch (Exception e){
            e.printStackTrace();
            if (batchId != null) {
                connector.rollback(batchId);
            }
        }
    }
}
  • 创建 main 程序
public class SimpleCanalClientExample {
    public static void main(String args[]) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
        CanalUtil.connect(SimpleCanalClientExample.class);
    }

    public void operate(CanalEntry.Entry entry, CanalEntry.RowChange rowChage) {
        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------> before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------> after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }

    private static void printColumn(List columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
  • 启动 main 程序,然后执行语法修改数据
update user set age = 20 where id = 1;

控制台如期输出监听到的结果:

2个Instance监听相同的库

如下图所示,把 example2 改成和 example1 相同的 MySQL

在每个 Instance 目录下都有一份 meta.dat 文件,维护了当前Instance消费的进度。虽然example1和example2都监听相同的MySQL,但是假设我们Client端只消费 example1,对 example2 不做任何处理,name他们的 meta.data 记录的进度肯定会不一致。

example1/meta.dat,position = 1401

example2/meta.dat,position = 1082

此时,只要我们把Client改成消费 example2,他的 position 就会更新成 1401,跟example1一致。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/318863.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号