An Explanation of RabbitMQ Exchanges


Exchange

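In RabbitMQ, a producer does not deliver messages directly to a queue. It first publishes the message to an exchange, the exchange forwards it to specific queues, and each queue then hands the message to consumers, either by pushing it to them or by letting them pull it.

The producer sends a message to the Exchange, and the Exchange routes it to one or more queues.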

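Routing key (Routingkey)

When a producer sends a message to an exchange, it supplies a routing key, which the exchange uses as its routing rule.

Binding key (Bindingkey)

A binding key associates an exchange with a queue, so that RabbitMQ knows how to route messages to the correct queue.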

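Summary of the relationships

The producer chooses the exchange when it publishes a message. Which queues that exchange then forwards the message to is decided by the routing key the producer supplies. Which queue is attached to the exchange, and under what condition, is decided by the binding key given when the queue is bound.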
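To make the relationship concrete, here is a minimal in-memory sketch of a direct-style exchange. It is an illustration only and does not use the RabbitMQ client; the class `MiniDirectExchange` and its methods are hypothetical names, and the queue names and keys are borrowed from the DirectConfig listing later in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration only; this is not the RabbitMQ client API. */
public class MiniDirectExchange {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Bind a queue to this exchange under a binding key. */
    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Return the queues a message with this routing key would be delivered to. */
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        MiniDirectExchange exchange = new MiniDirectExchange();
        exchange.bind("directQueueA", "AA");
        exchange.bind("directQueueB", "BB");
        exchange.bind("directQueueC", "CC");

        System.out.println(exchange.route("BB")); // [directQueueB]
        System.out.println(exchange.route("ZZ")); // [] (no binding key equals the routing key)
    }
}
```

The point of the sketch is that the routing key travels with the message while the binding key is stored on the exchange side, and a direct exchange delivers only where the two are equal.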

 

 

 

 

1. Direct exchange (Direct)

provider

Create two classes

ProviderController 

package com.smy.provider;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProviderController {

    @Autowired
    private RabbitTemplate template;

    @RequestMapping("/directSend")
    public String directSend(String routingkey){
        // Publish to directExchange; routingkey comes from the request parameter
        template.convertAndSend("directExchange",routingkey,"Hello World");
        return "yes";
    }

}

Create an mq package

DirectConfig  

package com.smy.provider.mq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    
    // Three durable queues (the second constructor argument is durable)
    @Bean
    public Queue directQueueA(){
        return new Queue("directQueueA",true);
    }
    @Bean
    public Queue directQueueB(){
        return new Queue("directQueueB",true);
    }
    @Bean
    public Queue directQueueC(){
        return new Queue("directQueueC",true);
    }

    

    // The direct exchange
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    

    // Bind each queue to the exchange under its own binding key
    @Bean
    public Binding bindingA(){
        return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
    }
    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
    }
    @Bean
    public Binding bindingC(){
        return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
    }


}

Run the provider

comsuner (the consumer module)

Create a new mq package

In it, create three classes that receive from the queues

DirectReceiverA 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQueueA")
@Slf4j
public class DirectReceiverA {

    @RabbitHandler
    public void process(String message){
        log.info("A received: "+message);
    }

}

DirectReceiverB 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQueueB")
@Slf4j
public class DirectReceiverB {

    @RabbitHandler 
    public void process(String message){
        log.info("B received: "+message);
    }

}

 DirectReceiverC 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "directQueueC")
@Slf4j
public class DirectReceiverC {

    @RabbitHandler 
    public void process(String message){
        log.info("C received: "+message);
    }

}

Rerun the producer (provider) and the consumer (comsuner)

2. Topic exchange (topic)

provider

Create TopicConfig

package com.smy.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

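    // Binding patterns: words are separated by dots,
    // * matches exactly one word, # matches zero or more words
    public static final String KEY_A="*.orange.*";
    public static final String KEY_B="*.*.rabbit";
    public static final String KEY_C="lazy.#";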


    
    // Three durable queues
    @Bean
    public Queue topicQueueA(){
        return new Queue("topicQueueA",true);
    }
    @Bean
    public Queue topicQueueB(){
        return new Queue("topicQueueB",true);
    }
    @Bean
    public Queue topicQueueC(){
        return new Queue("topicQueueC",true);
    }

    

    // The topic exchange
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    

    // Bind each queue to the exchange with its pattern
    @Bean
    public Binding TopicbindingA(){
        return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
    }
    @Bean
    public Binding TopicbindingB(){
        return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
    }
    @Bean
    public Binding TopicbindingC(){
        return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
    }


}
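The three patterns in TopicConfig rely on the topic wildcard rules: a routing key is a list of dot-separated words, `*` stands for exactly one word and `#` for zero or more words. The sketch below reimplements that matching rule in plain Java so the patterns can be tried without a broker. `TopicMatcher` is a hypothetical helper written for this illustration, not part of Spring AMQP or RabbitMQ, and the sample routing keys are made up.

```java
/** Illustration of topic pattern matching; not the broker's actual implementation. */
public class TopicMatcher {

    /** Returns true if routingKey matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        // limit -1 keeps trailing empty words, so "a." is two words, not one
        String[] p = pattern.split("\\.", -1);
        String[] k = routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either matches nothing (move past it) or absorbs one more word
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // the pattern still expects a word but the key has none left
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit")); // true
        System.out.println(matches("*.*.rabbit", "quick.orange.rabbit")); // true
        System.out.println(matches("lazy.#", "lazy.brown.fox.jumps"));    // true
        System.out.println(matches("lazy.#", "lazy"));                    // true
        System.out.println(matches("*.orange.*", "orange"));              // false
    }
}
```

With the bindings above, a key such as quick.orange.rabbit matches both KEY_A and KEY_B, so topicQueueA and topicQueueB would each receive a copy of the message.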

Add this method to the ProviderController class

    @RequestMapping("/topicSend")
    public String topicSend(String routingkey){
        // Publish to topicExchange; the key is matched against the binding patterns
        template.convertAndSend("topicExchange",routingkey,"Hello World");
        return "yes";
    }

Create three classes in comsuner

topicReceiverA 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topicQueueA")
@Slf4j
public class topicReceiverA {

    @RabbitHandler
    public void process(String message){
        log.warn("A received: "+message);
    }

}

topicReceiverB  

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topicQueueB")
@Slf4j
public class topicReceiverB {

    @RabbitHandler
    public void process(String message){
        log.warn("B received: "+message);
    }

}

topicReceiverC  

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topicQueueC")
@Slf4j
public class topicReceiverC {

    @RabbitHandler
    public void process(String message){
        log.warn("C received: "+message);
    }

}

 

3. Fanout exchange (fanout)

provider

FanoutConfig (none of the bindings needs a key)

package com.smy.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

  


    
    // Three durable queues
    @Bean
    public Queue fanoutQueueA(){
        return new Queue("fanoutQueueA",true);
    }
    @Bean
    public Queue fanoutQueueB(){
        return new Queue("fanoutQueueB",true);
    }
    @Bean
    public Queue fanoutQueueC(){
        return new Queue("fanoutQueueC",true);
    }

    

    // The fanout exchange
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    

    // Bind every queue to the exchange; a fanout binding takes no key
    @Bean
    public Binding fanoutbindingA(){
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutbindingB(){
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutbindingC(){
        return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    }


}
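The effect of this configuration can be pictured with a small in-memory sketch: a fanout exchange copies every message to all bound queues and never looks at the routing key. `MiniFanoutExchange` is a hypothetical class written for illustration and does not use the RabbitMQ client; the queue names are the ones declared in FanoutConfig.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration only; this is not the RabbitMQ client API. */
public class MiniFanoutExchange {
    private final List<String> boundQueues = new ArrayList<>();

    /** Bind a queue; a fanout binding carries no key. */
    public void bind(String queueName) {
        boundQueues.add(queueName);
    }

    /** The routing key is accepted but ignored: every bound queue gets a copy. */
    public List<String> route(String routingKey) {
        return new ArrayList<>(boundQueues);
    }

    public static void main(String[] args) {
        MiniFanoutExchange exchange = new MiniFanoutExchange();
        exchange.bind("fanoutQueueA");
        exchange.bind("fanoutQueueB");
        exchange.bind("fanoutQueueC");

        System.out.println(exchange.route(""));         // [fanoutQueueA, fanoutQueueB, fanoutQueueC]
        System.out.println(exchange.route("anything")); // [fanoutQueueA, fanoutQueueB, fanoutQueueC]
    }
}
```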

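Keep the routing-key argument but pass an empty value. If the argument is left out, the exchange name is treated as the routing key.

    @RequestMapping("/fanoutSend")
    public String fanoutSend(){
        // A fanout exchange ignores the routing key, so an empty string is enough
        template.convertAndSend("fanoutExchange","","Hello World");
        return "yes";
    }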
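The reason the key argument has to stay is overload resolution. RabbitTemplate offers both convertAndSend(routingKey, message) and convertAndSend(exchange, routingKey, message), so dropping the middle argument silently selects the two-argument form. The sketch below mimics only the shape of those two overloads in plain Java; `OverloadSketch`, `send` and `DEFAULT_EXCHANGE` are hypothetical stand-ins, and the assumption is that the template's default exchange has been left at its default, the empty string.

```java
/** Mimics the shape of two RabbitTemplate overloads; illustration only. */
public class OverloadSketch {
    // Assumption: stands in for the template's default exchange setting
    public static final String DEFAULT_EXCHANGE = "";

    /** Same shape as convertAndSend(routingKey, message). */
    public static String send(String routingKey, Object message) {
        return send(DEFAULT_EXCHANGE, routingKey, message);
    }

    /** Same shape as convertAndSend(exchange, routingKey, message). */
    public static String send(String exchange, String routingKey, Object message) {
        return "exchange='" + exchange + "', routingKey='" + routingKey + "'";
    }

    public static void main(String[] args) {
        // Key omitted: the first argument is read as the routing key
        System.out.println(send("fanoutExchange", "Hello World"));
        // exchange='', routingKey='fanoutExchange'

        // Key kept but empty: the first argument is the exchange, as intended
        System.out.println(send("fanoutExchange", "", "Hello World"));
        // exchange='fanoutExchange', routingKey=''
    }
}
```

In the first call the message would never reach fanoutExchange at all, which is why the listing above passes an explicit empty key.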

comsuner

FanoutReceiverA 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanoutQueueA")
@Slf4j
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String message){
        log.error("A received: "+message);
    }

}

 FanoutReceiverB 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanoutQueueB")
@Slf4j
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String message){
        log.error("B received: "+message);
    }

}

FanoutReceiverC 

package com.smy.comsuner.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanoutQueueC")
@Slf4j
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String message){
        log.error("C received: "+message);
    }

}

 

 

 

 

Because BB is bound to both A and B

 

When reposting, please credit the source: reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/745097.html