栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq(二)一文彻底弄懂RabbitMq的四种交换机原理及springboot实战应用

RabbitMq(二)一文彻底弄懂RabbitMq的四种交换机原理及springboot实战应用

四大交换机工作原理及实战应用
  • 交换机概念
  • direct 直连交换机
    • 工作模式图解
    • springboot代码
  • Fanout扇出交换机
    • 工作模式图解
    • springboot代码
  • Topic主题交换机
    • 工作模式图解
    • springboot代码
  • header交换机

交换机概念

交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。

最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。

交换机的作用: 生产者向broker(rabbitmq服务器)发送消息,交换机通过生产者绑定的路由键,将消息推送到不同的消息队列中。而消费者,只绑定队列,从队列中获取消息。

direct 直连交换机 工作模式图解

生产者发送的消息;交换机会根据不同的路由键,发送到对应的队列;

springboot代码

MQ配置类,声明交换机、队列,路由键绑定

	@Bean
	public DirectExchange createExchange() {
	    // 交换机名字;是否持久化;是否自动删除
		return new DirectExchange("testE", true, false);
	}
	
	@Bean
	public Queue createQueue() {
	    // 交换机名字;是否持久化;是否自动删除
		return new Queue ("testQ", true, false, false);
	}
	
	@Bean
	public Binding createBinding() {
	    // 交换机名字;是否持久化;是否自动删除
		return BindingBuilder
		.bind(this.createQueue())
		.to(this.createExchange())
		.with("testR");
	}
}

生产者

@Service
public class ProduceMsg {
	@Autowire
	private RabbitTemplate rabbitTemplate;
	public void sendMsg(Object msg){
		// 消息唯一标识
	 	CorrelationData correlationData= new CorrelationData();
	 	correlationData.setId(msg.getId());
		rabbitTemplate.converAndSend("testE", "testR", msg, correlationData);
	}
}

消费者

@Conponent
public class ConsumeMsg {
	
	@RabbitListener(queues = "testQ")
	public void sendMsg(String msg){
		log.info("接收到消息:{}", msg);
		// ......业务逻辑消费消息;
	}
}
Fanout扇出交换机 工作模式图解

生产者发送到交换机的消息;会发送到绑定到该交换机的所有队列

springboot代码

MQ配置类,声明交换机、队列,绑定

@Configuration
public class RabbitMqConfig {
    @Bean
    public Queue fanoutQueueA()
    {
        return new Queue("queueA", true, false, false);
    }
 
    @Bean
    public Queue fanoutQueueB()
    {
        return new Queue("queueB", true, false, false);
    }
 
    @Bean
    public Queue fanoutQueueC()
    {
        return new Queue("queueC", true, false, false);
    }
 
    @Bean
    FanoutExchange fanoutExchange()
    {
        return new FanoutExchange("exchangeFanout");
    }
 
    @Bean
    Binding bindingExchangeA()
    {
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }
 
    @Bean
    Binding bindingExchangeB()
    {
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }
 
    @Bean
    Binding bindingExchangeC()
    {
        return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    }
}

生产者

@Service
public class ProduceMsg {
	@Autowire
	private RabbitTemplate rabbitTemplate;
	public void sendMsg(Object msg){
		// 消息唯一标识
	 	CorrelationData correlationData= new CorrelationData();
	 	correlationData.setId(msg.getId());
		rabbitTemplate.converAndSend("exchangeFanout", "", msg, correlationData);
	}
}

消费者

@Conponent
public class ConsumeMsg {
	
	@RabbitListener(queues = "testQ")
	public void sendMsg(String msg){
		log.info("接收到消息:{}", msg);
		// ......业务逻辑消费消息;
	}
}
Topic主题交换机

topic模式跟direct的区别是,topic模式可以用通配符的方式,对路由键进行绑定;达到更灵活路由消息的效果。
交换机的routingKey不能随意写;必须是以点号分隔;如aa.bb; cc.dd.ee等形式
*号代表一个单词;#号代表0个或多个单词

工作模式图解

图中队列1绑定的路由键是 *.*.routeA
图中队列2绑定的路由键是 routeA.#
生产者向该交换机的routeA.xxx.routeA路由键发送消息;那么队列1和2都会收到消息

springboot代码

MQ配置类,声明交换机、队列,绑定

@Configuration
public class TopicRabbitMqConfig
{
    
    @Bean
    public Queue topicQueueA() {
        return new Queue("topic_queue_A", true, false, false);
    }
 
    
    @Bean
    public Queue topicQueueB() {
        return new Queue("topic_queue_B", true, false, false);
    }
 
    
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topic_exchange", true, false);
    }
 
    
    @Bean
    Binding bindingExchangeQueueA() {
        //将队列和交换机绑定, 并设置用于匹配键:routingKey
        return BindingBuilder.bind(topicQueueA()).to(exchange()).with("*.*.routeKey");
    }
 
    
    @Bean
    Binding bindingExchangeQueueB(Queue topicQueueB, TopicExchange exchange) {
        //将队列和交换机绑定, 并设置用于匹配键:routingKey
        return BindingBuilder.bind(topicQueueB()).to(exchange()).with("routeKey.#");
    }
}

生产者

@Service
public class ProduceMsg {
	@Autowire
	private RabbitTemplate rabbitTemplate;
	public void sendMsg(Object msg){
		// 消息唯一标识
	 	CorrelationData correlationData= new CorrelationData();
	 	correlationData.setId(msg.getId());
		rabbitTemplate.converAndSend("topic_exchange", "routeKey.test.routeKey", 
	
	msg, correlationData);
	}
}

这个生产者发送的消息;队列topic_queue_Atopic_queue_B都会接收到该生产者发送的消息

header交换机

该交换机不同于其他机制,且实际开发不常用,此处不作讲解

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/433900.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号