栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 02.交换机的讲解

RabbitMQ 02.交换机的讲解

前言 交换机 (Exchange)

生产者将消息发送到Exchange,有Exchange再路由到一个或多个队列中

路由键 (RoutingKey)

生产者将信息发送给交换机时会指定RoutingKey指定路由规则

绑定键 (BindingKey)

通过绑定键将交换机和队列关联起来,这样RabbitMQ就知道如何正确的将消息路由到队列

关系小结

生产者将消息发送给哪个Exchange是需要由RoutingKey决定的,生产者需要将Exchange与哪个队列绑定时需要由BindingKey决定

交换机类型 直连交换机:Direct exchange

主题交换机:Topic exchange  

扇形交换机:Fanout exchange   首部交换机:Headers exchange

 默认交换机: 

Dead Letter Exchange (死信交换机)

 

交换机的属性

 

 一.直连交换机 1.新建DirectConfig类
 创建队列
 创建交换机
 行交换机和队列的绑定:设置BindingKey
package com.lj.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class DirectConfig {
    
    @Bean
    public Queue direcrQueueA(){
        return new Queue("direcrQueueA",true);
    }

    @Bean
    public Queue direcrQueueB(){
        return new Queue("direcrQueueB",true);
    }

    @Bean
    public Queue direcrQueueC(){
        return new Queue("direcrQueueC",true);
    }

    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    
    public Binding BindingA(){
        return BindingBuilder.bind(direcrQueueA()).to(directExchange()).with("aa");
    }
    public Binding BindingB(){
        return BindingBuilder.bind(direcrQueueA()).to(directExchange()).with("bb");
    }
    public Binding BindingC(){
        return BindingBuilder.bind(direcrQueueA()).to(directExchange()).with("cc");
    }
}
2.建一个ProviderController发送信息
package com.lj.provider;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SuppressWarnings("all")
public class ProviderController {

    @Autowired
    private RabbitTemplate template;

    @RequestMapping("/directSend")
    public String DirectSend(String routingKey){
        template.convertAndSend("directExchange",routingKey,"Hello World");
        return "yes";
    }
}

 这里面是没有Queue的

运行成功

 3.建立3个DirectReceiver类接收消息

package com.lj.consumer.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@RabbitListener(queues = "direcrQueueA")
@Slf4j
public class DirectReceiverA {

    @RabbitHandler
    public void process(String msg){
        log.warn("A接到"+msg);
    }
}

 

二.主题交换机  1.在生产者新建 TopicConfig
package com.lj.provider.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class TopicConfig {
    
    public final static String KEY_A="*.orange.*";
    public final static String KEY_B="*.*.rabbit";
    public final static String KEY_C="lazy.#";
    
    @Bean
    public Queue topicQueueA(){
        return new Queue("topicQueueA",true);
    }

    @Bean
    public Queue topicQueueB(){
        return new Queue("topicQueueB",true);
    }

    @Bean
    public Queue topicQueueC(){
        return new Queue("topicQueueC",true);
    }

    
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    
    @Bean
    public Binding topicBindingA(){
        return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
    }
    @Bean
    public Binding topicBindingB(){
        return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
    }
    @Bean
    public Binding topicBindingC(){
        return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
    }
}
2.在 Controller 里面加一个方法
    @RequestMapping("/topicSend")
    public String topicSend(String routingKey){
        template.convertAndSend("topicExchange",routingKey,"Hello World");
        return "yes";
    }
3.在消费者创建3个接收者TopicReceiver

package com.lj.consumer.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueA")
@Slf4j
public class TopicReceiverA {

    @RabbitHandler
    public void process(String msg){
        log.warn("A接到"+msg);
    }
}

 

 三.扇形交换机 1.建立FanoutConfig
package com.lj.provider.mq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class FanoutConfig {
    
    @Bean
    public Queue fanoutQueueA(){
        return new Queue("fanoutQueueA",true);
    }

    @Bean
    public Queue fanoutQueueB(){
        return new Queue("fanoutQueueB",true);
    }

    @Bean
    public Queue fanoutQueueC(){
        return new Queue("fanoutQueueC",true);
    }

    
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    
    @Bean
    public Binding fanoutBindingA(){
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBindingB(){
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBindingC(){
        return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    }
}
2.在controller加方法
  @RequestMapping("/fanoutSend")
    public String fanoutSend(){
        template.convertAndSend("fanoutExchange",null,"Hello World");
        return "yes";
    }
3.在消费者创建3个接收者FanoutReceiver
package com.lj.consumer.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@RabbitListener(queues = "fanoutQueueA")
@Slf4j
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg){
        log.warn("A接到"+msg);
    }
}

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/744932.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号