Connecting to Two RabbitMQ Brokers in a Spring Boot Project

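While working on a project today I ran into a new requirement: one MQ feature has to use the RabbitMQ broker on our company's server, while another has to use the broker on the server at the deployment site. Without further ado, here is the code.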

Configuration file application.yml:

spring:
  rabbitmq:
    yjdpeservice:
      host: xxx.xxx.xxx.xxx
      port: 5672
      username: admin
      password: admin
    yjservice:
      host: xxx.xxx.xxx.xxx
      port: 5672
      username: admin
      password: admin

Configuration class:

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class RabbitPlusConfig {

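    // Connection to the first broker (yjdpeservice). Marked @Primary so that
    // unqualified ConnectionFactory injection points resolve to this bean.
    @Bean(name="mergeConnectionFactory")
    @Primary
    public ConnectionFactory mergeConnectionFactory(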
            @Value("${spring.rabbitmq.yjdpeservice.host}") String host,
            @Value("${spring.rabbitmq.yjdpeservice.port}") int port,
            @Value("${spring.rabbitmq.yjdpeservice.username}") String username,
            @Value("${spring.rabbitmq.yjdpeservice.password}") String password
    ){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

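    // Connection to the second broker (yjservice); inject it with @Qualifier("LocalConnectionFactory").
    @Bean(name="LocalConnectionFactory")
    public ConnectionFactory localConnectionFactory(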
            @Value("${spring.rabbitmq.yjservice.host}") String host,
            @Value("${spring.rabbitmq.yjservice.port}") int port,
            @Value("${spring.rabbitmq.yjservice.username}") String username,
            @Value("${spring.rabbitmq.yjservice.password}") String password
    ){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    // Template that publishes to the first broker.
    @Bean(name="mergeRabbitTemplate")
    @Primary
    public RabbitTemplate mergeRabbitTemplate(
            @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
    ){
        return new RabbitTemplate(connectionFactory);
    }

    // Template that publishes to the second broker.
    @Bean(name="LocalRabbitTemplate")
    public RabbitTemplate localRabbitTemplate(
            @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
    ){
        return new RabbitTemplate(connectionFactory);
    }

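    // Listener container factory for the first broker; referenced from
    // @RabbitListener via containerFactory = "mergeFactory".
    @Bean(name="mergeFactory")
    public SimpleRabbitListenerContainerFactory mergeFactory(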
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

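    // Listener container factory for the second broker; referenced from
    // @RabbitListener via containerFactory = "LocalFactory".
    @Bean(name="LocalFactory")
    public SimpleRabbitListenerContainerFactory localFactory(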
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}
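
The post sends to CeshiQueue and consumes from my-dlx-queue-Ceshi, but it does not show how those queues are declared on either broker. The following is a minimal sketch of one way to do it, not necessarily how the author set things up. The exchange name and routing key are hypothetical, and making the queues durable is an assumption. Registering one RabbitAdmin per connection factory should cause the same queues, exchange, and binding to be declared on both brokers.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayQueueTopologyConfig {

    // Hypothetical names chosen for this sketch; they do not appear in the original post.
    private static final String DLX_EXCHANGE = "my-dlx-exchange-Ceshi";
    private static final String DLX_ROUTING_KEY = "my-dlx-routing-Ceshi";

    // One admin per broker. Declarables that are not restricted to a specific
    // admin are declared by every RabbitAdmin in the context.
    @Bean
    public RabbitAdmin mergeRabbitAdmin(
            @Qualifier("mergeConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitAdmin localRabbitAdmin(
            @Qualifier("LocalConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    // Delay queue: nothing consumes from it, so messages stay until their TTL
    // expires and are then routed to the dead-letter exchange.
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable("CeshiQueue")
                .withArgument("x-dead-letter-exchange", DLX_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DLX_ROUTING_KEY)
                .build();
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DLX_EXCHANGE);
    }

    // Queue that the listeners actually consume from.
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("my-dlx-queue-Ceshi").build();
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with(DLX_ROUTING_KEY);
    }
}
```

If either queue already exists on a broker with different arguments, the declaration will be rejected by that broker, so these settings would have to match whatever is already in place.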

Message-sending class:

import io.renren.common.utils.R;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;

@Api(tags = "Test dual MQ sending")
@RestController
@RequestMapping("rbt/mq")
public class RbtMqController {


    @Autowired
    @Qualifier(value = "mergeRabbitTemplate")
    private RabbitTemplate mergeRabbitTemplate;

    @Autowired
    @Qualifier(value = "LocalRabbitTemplate")
    private RabbitTemplate localRabbitTemplate;

    @ApiOperation("Test sending to both MQs")
    @PostMapping("/PostMq/{mqone}/{mqtwo}")
    public Object postMq(@RequestParam("token") String token, @PathVariable String mqone, @PathVariable String mqtwo){

        // Per-message TTL in milliseconds. The message waits in the delay queue
        // until it expires and is then handed over to the dead-letter queue.
        MessagePostProcessor withTtl = message -> {
            message.getMessageProperties().setExpiration(String.valueOf(40000L));
            return message;
        };

        // Publish mqone to CeshiQueue on the first broker.
        mergeRabbitTemplate.convertAndSend("CeshiQueue", (Object) mqone, withTtl);

        // Publish mqtwo to CeshiQueue on the second broker.
        localRabbitTemplate.convertAndSend("CeshiQueue", (Object) mqtwo, withTtl);
        return R.ok();
    }

}
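
The controller passes the TTL to setExpiration as a string of milliseconds. A small helper can make the unit explicit at the call site. This is only a sketch: the class name MessageTtl and the method toExpiration are hypothetical and not part of the original project or of Spring AMQP.

```java
import java.time.Duration;

// Hypothetical helper for this sketch; not part of the original project.
public final class MessageTtl {

    private MessageTtl() {
    }

    // Converts a TTL into the millisecond string expected by the
    // AMQP "expiration" message property.
    public static String toExpiration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        return String.valueOf(ttl.toMillis());
    }
}
```

Inside the controller it could be used as message.getMessageProperties().setExpiration(MessageTtl.toExpiration(Duration.ofSeconds(40))). Note that with per-message TTL, RabbitMQ only dead-letters a message once it reaches the head of the queue, so a short-TTL message queued behind a long-TTL one is delayed until the earlier message expires.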

Consumer side:

Consume my-dlx-queue-Ceshi on the broker that LocalFactory points to:

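import java.util.Date;

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "my-dlx-queue-Ceshi", containerFactory = "LocalFactory")
@Log4j2
public class LocalListener {

    // Receives messages dead-lettered into my-dlx-queue-Ceshi on the second broker.
    @RabbitHandler
    public void handleMessage(String msg) throws Exception {
        log.info(new Date() + "::LocalFactory received message::" + msg);
    }
}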

Consume my-dlx-queue-Ceshi on the broker that mergeFactory points to:

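import java.util.Date;

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "my-dlx-queue-Ceshi", containerFactory = "mergeFactory")
@Log4j2
public class MergeListener {

    // Receives messages dead-lettered into my-dlx-queue-Ceshi on the first broker.
    @RabbitHandler
    public void handleMessage(String msg) throws Exception {
        log.info(new Date() + "::mergeFactory received message::" + msg);
    }
}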

That's it. Tested and working.
