栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot整合rabbitmq-交换机模式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot整合rabbitmq-交换机模式

前言

了解完 单队列模式 之后,接下来开始交换机模式,交换机模式一共有四种:Fanout、headers、direct、topic,这里这介绍一种常用的定向direct和topic主题模式。
在实践代码逻辑之前,尽可能先了解rabbitmq的详情比较好,rabbitmq工作流程及模式详解。

准备配置
  • 依赖包

   org.springframework.boot
    spring-boot-starter-amqp
 
  • yml
server:
  port: 8081
#一个模块中不可有两个application文件,否则另一个会失效,多模块引用的时候尤其注意
spring:
  rabbitmq:
    host: 192.168.25.131
    port: 5672
    virtual-host: /
    #手动ack消息,手动确认消息才算消费
    listener:
      simple:
        acknowledge-mode: manual
    username: admin
    password: 123456
direct定向模式

  • 创建交换机
    @Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
    public Exchange oderEventExchange(){
        //durable:是否持久化 autoDelete:是否自动删除
        DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
        return directExchange;
    }
  • 声明队列
    @Bean("order_release_queue")
    public Queue orderReleaseQueue() {
        Queue queue = new Queue("order_release_queue", true, false, false);
        return queue;
    }
  • 绑定交换机和队列
@Bean
    public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
                                            @Qualifier("order_event_exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
    }

指定路由key【routingKey】,发消息的时候带上路由key才能被交换机转发到指定队列。

  • 创建测试接口
@RestController
@RequestMapping("order")
public class controller {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostMapping("/sendOrder")
    public String sentOrder(@RequestBody Order order){
		//发送消息指定交换机和路由键
        rabbitTemplate.convertAndSend("order_event_exchange","order.release.order", order);
        return "ok";
    }
}
  • 测试结果

Topic主题模式
  • 创建topic模式交换机,队列还是用前边的队列即可
 
    @Bean("order_topic_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
    public Exchange oderTopicExchange(){
        //durable:是否持久化 autoDelete:是否自动删除
        TopicExchange topicExchange = new TopicExchange("order_topic_exchange",true, false);
        return topicExchange;
    }
  • 绑定交换机队列
 @Bean
    public Binding orderTopicBinging(@Qualifier("order_release_queue") Queue queue,
                                            @Qualifier("order_topic_exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.topic.*").noargs();
    }

这里说明一下* 和 #的区别,匹配单个用 “*”,多个用 #;
举例说明:
a.#可以匹配a、a.b、a.b.c;
a.~ 只能匹配a.b(*打不出来,用~代替了);

  • 接口测试
@PostMapping("/sendOrder")
    public String sentOrder(@RequestBody Order order){
        rabbitTemplate.convertAndSend("order_topic_exchange","order.topic", order);
        return "ok";
    }

注意:在实验当中,如果发现用*和#测试发现怎么配置都能收到消息,可以去管理界面解除队列和交换机绑定的路由键关系,如图所示

完整测试代码
@Configuration
public class MyMQConfig {

    
    @Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
    public Exchange oderEventExchange(){
        //durable:是否持久化 autoDelete:是否自动删除
        DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
        return directExchange;
    }

    
    @Bean("order_delay_queue")
    public Queue oderDelayQueue(){
        Map arguments = new HashMap<>();
        
        //配置队列的死信应该发送给哪个交换机,过期消息发给谁
        arguments.put("x-dead-letter-exchange", "order_event_exchange");
        //发送给交换机使用的路由key
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        //队列消息变成死信的时间 10s
        arguments.put("x-message-ttl", 10000);
        //exclusive:是否排他  arguments:扩展参数
        Queue queue = new Queue("order_delay_queue", true, false, false, arguments);
        return queue;
    }

    
    @Bean("order_release_queue")
    public Queue orderReleaseQueue() {
        Queue queue = new Queue("order_release_queue", true, false, false);
        return queue;
    }

    
    @Bean
    public Binding orderCreateOrderBinging(@Qualifier("order_delay_queue") Queue queue,
                                           @Qualifier("order_event_exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.delay.order").noargs();
    }

    @Bean
    public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
                                            @Qualifier("order_event_exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
    }

    
    @Bean("order_topic_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
    public Exchange oderTopicExchange(){
        //durable:是否持久化 autoDelete:是否自动删除
        TopicExchange topicExchange = new TopicExchange("order_topic_exchange",true, false);
        return topicExchange;
    }

    @Bean
    public Binding orderTopicBinging(@Qualifier("order_release_queue") Queue queue,
                                            @Qualifier("order_topic_exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.topic.#").noargs();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/863364.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号