栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ常用Exchange详解

RabbitMQ常用Exchange详解

目录

1.Exchange 介绍

1.2 路由键(RoutingKey)

1.3 绑定键(BindingKey)

 2. 直连交换机:Direct exchange

3. 主题交换机:Topic

4. 扇形交换机:Fanout exchange


 

1.Exchange 介绍

在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费

1.2 路由键(RoutingKey)

生产者将消息发送给交换机的时候,会指定RoutingKey指定路由规则。

1.3 绑定键(BindingKey)

通过绑定键将交换机与队列关联起来,这样RabbitMQ就知道如何正确消息路由到队列。

小结:

生产者将消息发送给哪个E ×change是需要由RoutingKey决定的,生产者需要将E× change与哪个队列绑定时需要由BindingKey决定的

 2. 直连交换机:Direct exchange

直连交换机的路由算法非常简单:将消息准送到binding key与该消息的routing key相同的队列

直连交换机x上绑定了两个队列。第一个队列绑定了绑定键orange,第二个队列有两个绑定踺:black和greeno在这种场景下,一个消息在指定了路由键为orange将会只被路由到队列QI,路由键为black和greeno的消息都将被路由到队列 Q2。其他的消息都将被丢失。

示例:

  • provider
    package com.wyy.provider;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DirectQueueConfig {
    
        
        @Bean
        public Queue directQueue(){
            return new Queue("DirectQueue",true);
        }
    
        
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange("directExchange");
        }
    
        
        @Bean
        public Binding directBindingA(){
            return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
        }
    }
    
    package com.wyy.provider.controller;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class DirectController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/senderDirect/{msg}")
        public Object senderDirect( @PathVariable String msg){
            rabbitTemplate.convertAndSend("directExchange",msg,"??????");
            return "yes";
        }
    }
    

     consumer

    package com.wyy.consumer.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @Slf4j
    @RabbitListener(queues = "DirectQueue")
    public class DirectController {
    
        @RabbitHandler
        public void process(String msg){
            log.warn("DirectQueue接受:"+msg);
        }
    }
    

    3. 主题交换机:Topic

    发送到主题交换机的消息不能有任意的routing key,必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特

    比如以下是几个有效的routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的单词可以有很多,最大限制是 255 byteso

    Topic交换机的逻辑与direct交换机有点相似,使用特定路由踺发送的消息将被发送到所有使用匹配绑定键绑定的队列,然而,绑定键有两个特殊的情况:

  •  *  表示匹配任意一个单词
  • #  表示匹配任意一个或多个单词

    示例:

    provider

    package com.wyy.provider;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class TopicQueueConfig {
        public static String KEY_A ="*.wyy.*";
        public static String KEY_B ="*.*.wyy";
        public static String KEY_C ="wyy.#";
    
        
        @Bean
        public Queue topicQueue(){
            return new Queue("TopicQueue",true);
        }
    
        
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange("topicExchange");
        }
    
        
        @Bean
        public Binding topicBindingA(){
            return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(KEY_A);
        }
    
        
        @Bean
        public Binding topicBindingB(){
            return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(KEY_B);
        }
    
        
        @Bean
        public Binding topicBindingC(){
            return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(KEY_C);
        }
    }
    
    package com.wyy.provider.controller;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class TopicController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/senderTopic/{msg}")
        public Object senderDirect( @PathVariable String msg){
            rabbitTemplate.convertAndSend("topicExchange",msg,"~~~~~~~");
            return "yes";
        }
    }
    

    consumer

    package com.wyy.consumer.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @Slf4j
    @RabbitListener(queues = "TopicQueue")
    public class TopicController {
    
        @RabbitHandler
        public void process(String msg){
            log.warn("TopicQueue接受:"+msg);
        }
    }
    

    4. 扇形交换机:Fanout exchange

    扇形交换机是最基本的交换机类型,它所能做的事清非常简单广播消息。

    扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要''思考",所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

    示例:

  • provider
    package com.wyy.provider;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutQueueConfig {
    
        
        @Bean
        public Queue fanoutQueue(){
            return new Queue("fanoutQueue",true);
        }
    
        
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("fanoutExchange");
        }
    
        
        @Bean
        public Binding fanoutBindingA(){
            return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
        }
    }
    
    package com.wyy.provider.controller;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class FanoutController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/senderFanout")
        public Object senderFanout(){
            rabbitTemplate.convertAndSend("fanoutExchange",null,"#######");
            return "yes";
        }
    }
    

     consumer

    package com.wyy.consumer.controller;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @Slf4j
    @RabbitListener(queues = "fanoutQueue")
    public class FanoutController {
    
        @RabbitHandler
        public void process(String msg){
            log.warn("fanoutQueue接受:"+msg);
        }
    }
    

  • 转载请注明:文章转载自 www.mshxw.com
    本文地址:https://www.mshxw.com/it/745100.html
    我们一直用心在做
    关于我们 文章归档 网站地图 联系我们

    版权所有 (c)2021-2022 MSHXW.COM

    ICP备案号:晋ICP备2021003244-6号