- 前言
- 2个Instance监听不同的库
- 2个Instance监听相同的库
Canal通过 canal.destinations 来配置对应的 Instance 实例,这里允许配置多个Instance,中间用逗号分开即可。
- 修改 canal.properties 文件,找到 canal.destinations 把他修改成 example1,example2
canal.destinations = example1,example2
- 在 conf 目录下分别建2个文件夹:example1 和 example2,同时把 example 文件夹自带的 instance.properties 拷贝过去
- 修改 instance.properties 这几个地方,这里以 example1 为例
canal.instance.master.address=192.168.25.129:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.mq.topic=example1
- example2 同理,注意2个库都要开启binlog,并且创建canal账号
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECt, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
创建Java客户端
- 引用依赖
com.alibaba.otter canal.client 1.1.0
- 创建Canal连接Util
public class CanalUtil {
public static void connect(Class> clazz) throws IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchMethodException {
// canal-server地址
String addr = "192.168.25.129";
//这里 example1 或者 example2 都可以
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(addr,11111), "example1", "", "");
int batchSize = 1000;
Long batchId = null;
//取到数据之后,通过反射执行回调方法
Object obj = clazz.getConstructors()[0].newInstance();
Method method = clazz.getMethod("operate", CanalEntry.Entry.class, CanalEntry.RowChange.class);
connector.connect();
connector.subscribe();
try{
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIonBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = CanalEntry.RowChange.parseFrom(entry.getStorevalue());
//回调处理数据
method.invoke(obj, entry, rowChage);
}
}
connector.ack(batchId); // 提交确认
}
}
catch (Exception e){
e.printStackTrace();
if (batchId != null) {
connector.rollback(batchId);
}
}
}
}
- 创建 main 程序
public class SimpleCanalClientExample {
public static void main(String args[]) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
CanalUtil.connect(SimpleCanalClientExample.class);
}
public void operate(CanalEntry.Entry entry, CanalEntry.RowChange rowChage) {
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
private static void printColumn(List columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
- 启动 main 程序,然后执行语法修改数据
update user set age = 20 where id = 1;
控制台如期输出监听到的结果:
2个Instance监听相同的库如下图所示,把 example2 改成和 example1 相同的 MySQL
在每个 Instance 目录下都有一份 meta.dat 文件,维护了当前Instance消费的进度。虽然example1和example2都监听相同的MySQL,但是假设我们Client端只消费 example1,对 example2 不做任何处理,name他们的 meta.data 记录的进度肯定会不一致。
example1/meta.dat,position = 1401
example2/meta.dat,position = 1082
此时,只要我们把Client改成消费 example2,他的 position 就会更新成 1401,跟example1一致。



