在rabbitmq中,生产者发信息不会直接将信息投递到队列中,而是先将信息投递到交换机中,在交换机转发在具体的队列,队列再将信息推送或者拉取消费者进行消费
生产者将消息发送到Exchange,由Exchage再路由到一个或队列中
路由建(RoutingKey)绑定键(BindingKey)生产者将消息发送给交换机的时候,会指定RoutingKey指定路由规则
总结:通过绑定键将交换机与队列关联起来,这样RabbbitMQ就知道如何正确地将消息路由到队列
二、交换机类型 1、直连交换机:Direct exchange生产者将消息发送给哪个Exchange是需要RoutingKey决定的,生产者需要将Exchange与哪个队列绑定时需要由BindingKey决定的
直接交换机的路由算法非常简单:将信息推送到binding key与改信息的routing key相同队列
直连交换机x上绑定了两个队列,第一个列队绑定了绑定键orange,第二个队列有两个绑定键:black和green。
在这种情况下,一个消息在布时指定了路由键为orange将会只被路由到列队q1,路由键为black和green的消息都将被路由到队列q2,其他的消息都将被丢失
同一个绑定键可以绑定到不同的队列上去,可以增加一个交换机x与列队Q2的绑定键,在这种情况下,直连交换机将会和广播交换机有着相同行为,将消息推送到所有匹配到列队,一个路由键为black的消息将会同时被推送到列队Q1和Q2
2、主题交换机:Topic exchange ①直接交换机的特点:②主题交换机的特点: 3、 延申直连交换机的routing_key方案非常简单,如果我们希望一条消息发送多个队列,那么这个交换机需要绑定上非常多的routing_key
假设每个交换机上都绑定一堆的routing_key连接到各个队列上,那么消息管理就会异常困难
三、交换机的属性
四、直连交换机(代码) 直连交换机(生产者) ①创建配置类DirectConfig(创建交换机,队列,进行绑定)name:交换机名称type:交换机类型,direct,topic,tanout,headersdurability:是否需要持久化,如果持久性 则rabbitmq重启后 交换机还存在auto delete:当最后一个绑定到exchange上的队列删除后 自动删除该exchangeinternal:当前exchange是否用于rabbitmq内部使用 默认为falsearguments:扩展参数 用于扩展mop协议定制化使用
package com.xhy.provider.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class DirectConfig {
//新建队列
@Bean
public Queue directQueueA(){
return new Queue("directQueueA",true);
}
@Bean
public Queue directQueueB(){
return new Queue("directQueueB",true);
}
@Bean
public Queue directQueueC(){
return new Queue("directQueueC",true);
}
//创建交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
//进行交换机和队列的绑定
//设置绑定key
@Bean
public Binding BindingA(){
return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
}
@Bean
public Binding BindingB(){
return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
}
@Bean
public Binding BindingC(){
return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
}
}
②创建controller层模拟发送消息
package com.xhy.provider;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@SuppressWarnings("all")
public class ProviderController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/directSend")
public String directSend(String routingKey){
template.convertAndSend("directExchange",routingKey,"Hello World");
return "yes";
}
}
③把生产者跑起来
直连交换机(消费者)
①在消费者中创建接收者类,需要一直运行,表示一直处于接收状态
分别监听队列ABC
②先运行生产者发消息再运行消费者 接收消息 五、主题交换机 主题交换机(生产者) ①创建主题交换机
package com.xhy.provider.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class TopicConfig {
public final static String KEY_A="*.orange.*";
public final static String KEY_B="*.*.rabbit";
public final static String KEY_C="lazy.#";
//新建队列
@Bean
public Queue topicQueueA(){
return new Queue("topicQueueA",true);
}
@Bean
public Queue topicQueueB(){
return new Queue("topicQueueB",true);
}
@Bean
public Queue topicQueueC(){
return new Queue("topicQueueC",true);
}
//创建交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
//进行交换机和队列的绑定
//设置绑定key
@Bean
public Binding topicBindingA(){
return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
}
@Bean
public Binding topicBindingB(){
return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
}
@Bean
public Binding topicBindingC(){
return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
}
}
②创建controller层模拟发送消息
package com.xhy.provider;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@SuppressWarnings("all")
public class ProviderController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/directSend")
public String directSend(String routingKey){
template.convertAndSend("directExchange",routingKey,"Hello World");
return "yes";
}
@RequestMapping("/topicSend")
public String topicSend(String routingKey){
template.convertAndSend("topicExchange",routingKey,"Hello World");
return "yes";
}
}
③运行生产者发消息
主题交换机(消费者)
①在消费者中创建接收者类,需要一直运行,表示一直处于接收状态
分别监听队列ABC
②先运行生产者发消息再运行消费者 接收消息 六、扇形交换机(广播) 扇形交换机(生产者) ①创建扇形交换机
定义key的规则并与交换机进行绑定
package com.xhy.provider.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@SuppressWarnings("all")
public class FanoutConfig {
//新建队列
@Bean
public Queue fanoutQueueA(){
return new Queue("fanoutQueueA",true);
}
@Bean
public Queue fanoutQueueB(){
return new Queue("fanoutQueueB",true);
}
@Bean
public Queue fanoutQueueC(){
return new Queue("fanoutQueueC",true);
}
//创建交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
//进行交换机和队列的绑定
//设置绑定key
@Bean
public Binding fanoutBindingA(){
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingB(){
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingC(){
return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
}
}
②创建controller层模拟发送消息
@RequestMapping("/fanoutSend")
public String fanoutSend(){
template.convertAndSend("fanoutExchange",null,"Hello World");
return "yes";
}
③运行生产者发消息
扇形交换机(消费者) ①在消费者中创建接收者类,需要一直运行,表示一直处于接收状态
分别监听队列ABC
②先运行生产者发消息再运行消费者 接收消息



