- 一、探索之路
- 升级canal.client支持rabbitmq
- 探索结果
- 二、最终方案
- canal server升级1.1.5
- 使用方法
- 1. 新建Handler,用@CanalTable注解标注
- 2. rabbitmq消费者端监听队列,调用对应的Handler进行消息消费
- 三、参考资料
canal-client-springboot-starter引用jar包canal-client升级1.1.5。
com.alibaba.otter canal.client ${canal-client.version}
1.1.5中把protocol包单独设置了一个模块。
所以,需要加一个包。
探索结果com.alibaba.otter canal.protocol ${canal-client.version}
本来想在canal-client-springboot-starter中集成rabbitmq,但是发现官方提供的canal.client jar包中没有rabbitmq相关的连接包,自己再去扩展的话还得去编译canal的源码——暂时只好放弃。
Exception in thread "canal-client-thread" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory at com.alibaba.otter.canal.client.rabbitmq.RabbitMQCanalConnector.connect(RabbitMQCanalConnector.java:67) at top.javatool.canal.client.client.RabbitMqCanalClient.process(RabbitMqCanalClient.java:34) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 3 more
RabbitMQCanalConnector.java连接rabbitmq的部分源码。
public void connect() throws CanalClientException {
ConnectionFactory factory = new ConnectionFactory();
if (accessKey.length() > 0 && secretKey.length() > 0) {
factory.setCredentialsProvider(new AliyunCredentialsProvider(accessKey, secretKey, resourceOwnerId));
} else {
factory.setUsername(username);
factory.setPassword(password);
}
factory.setHost(nameServer);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(vhost);
try {
connect = factory.newConnection();
channel = connect.createChannel();
} catch (IOException | TimeoutException e) {
throw new CanalClientException("Start RabbitMQ producer error", e);
}
}
不过也是,截止2021年10月26日,canal官方推荐的正式版本仍然是1.1.4,对客户端支持rabbitmq还没有做足够的支持。
二、最终方案参考canal-client-springboot-starter自己构建了一个easy-canal-client的项目。
canal server升级1.1.5源码已开源:https://gitee.com/cowboy2014/easy-canal-client.git
- 升级canal-1.1.5,binlog数据解析完成后直接进入rabbitmq中了——1.1.5可以把数据直接投递给rabbitmq;
- 业务模块直接集成rabbitmq,作为消费者进行数据的解析、同步。
示意图如下:
@Component @CanalTable(value = "lpm_park") @Slf4j public class ParkHandler implements EntryHandler2. rabbitmq消费者端监听队列,调用对应的Handler进行消息消费{ @Resource private ElSearchParkServiceImpl elSearchParkService; @Resource private EsIndexes esIndexes; @Override public void insert(Park park) { log.info("insert message {}", park); try { elSearchParkService.synchronous(esIndexes.getPark(), park.getId()); } catch (IOException e) { log.error("es insert wrong!"); } } @Override public void update(Park before, Park park) { try { if (ObjectUtil.isNotEmpty(before.getDeleted()) && !before.getDeleted().equals(park.getDeleted()) && park.getDeleted() == 1){ this.delete(park); } if (ObjectUtil.isNotEmpty(before.getDeleted()) && !before.getDeleted().equals(park.getDeleted()) && park.getDeleted() == 0){ this.insert(park); } elSearchParkService.synchronous(esIndexes.getPark(), park.getId()); } catch (IOException e) { log.error("es insert wrong!"); } log.info("update after {}", park); } @Override public void delete(Park park) { log.info("delete {}", park); elSearchParkService.deleteById(esIndexes.getPark(), park.getId().intValue()); } }
@RabbitListener(queues = {"shangwtQueue"},containerFactory = "multiListenerContainer")
public void consumeMsg(FlatMessage info){
try {
handlerUtil.handleMessage(info);
}catch (Exception e){
log.error("canal消息-监听者-发生异常:",e.fillInStackTrace());
}
}
三、参考资料
https://github.com/NormanGyllenhaal/canal-client
感谢您的赏读~
如果您对我的文章感兴趣的话,欢迎留下您的问题让我们一起探讨!一起进步!!
还可以关注我的微信公众号,回复“Canal”获取我的Canal学习脑图哦~



