栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

canal整合rabbitmq

canal整合rabbitmq

canal1.1.5好像就开始支持rabbitmq了,然后我下载的是1.1.6,为啥要整合rabbitmq,首先其他mq我也
不会啊,其次各有所需对吧。

首先要修改canal.properties文件

## tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 修改为rabbitmq 
canal.serverMode = rabbitMQ

## 配置上我们的rabbitmq信息
rabbitmq.host = 8.142.188.187
rabbitmq.virtual.host = /
rabbitmq.exchange = canal_exchange
rabbitmq.username = admin
rabbitmq.password = admin
rabbitmq.deliveryMode =

再修改instance.properties文件

# 链接数据库的信息
canal.instance.master.address=127.0.0.1:3306

# username/password 链接数据库的账号密码
canal.instance.dbUsername=root
canal.instance.dbPassword=root

# mq config  这里是rabbitmq的routerkey
canal.mq.topic=canal_key

修改这几项基本就OK了

rabbitmq配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CanalConfig {

    @Bean
    Queue queue(){
        return  new Queue("canal_queue");
    }

    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("canal_exchange");
    }

    @Bean
    Binding binding(){
        return BindingBuilder.bind(queue()).to(directExchange()).with("canal_key");
    }
}

这样我每次变动数据库,都会把变动的信息投递给rabbitmq了


这里收到的消息都是ASCLL码,所以要转一下

@RabbitListener(queues = "canal_queue")
    public void  getMsg(Message message, Channel channel, String msg){
        String[]chars=msg.split(",");
        StringBuffer stringBuffer = new StringBuffer();
        for(int i=0;i 



打完收工

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780610.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号