RabbitMQ是基于AMQP协议的一种消息队列,常用于异步处理、解耦合的操作场景。
什么叫交换机?
发送者sender向外发送信息的过程中,并不直接投递到mq队列中来,而是先放在交换机上,由交换机进行投递,再把数据发送到队列上
Springboot集成RabbitMQ(Direct交换机模式实现)
- 添加依赖spring-boot-starter-amqp
org.springframework.boot spring-boot-starter-amqp
配置依赖
#rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ #u6D88u8D39u8005u6570u91CF消费者数量 spring.rabbitmq.listener.simple.concurrency= 10 spring.rabbitmq.listener.simple.max-concurrency= 10 #u6D88u8D39u8005u6BCFu6B21u4ECEu961Fu5217u83B7u53D6u7684u6D88u606Fu6570u91CF spring.rabbitmq.listener.simple.prefetch= 1 #u6D88u8D39u8005u81EAu52A8u542Fu52A8 spring.rabbitmq.listener.simple.auto-startup=true #u6D88u8D39u5931u8D25uFF0Cu81EAu52A8u91CDu65B0u5165u961F spring.rabbitmq.listener.simple.default-requeue-rejected= true #u542Fu7528u53D1u9001u91CDu8BD5重置 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000 spring.rabbitmq.template.retry.multiplier=1.0
- 创建RabbitMQ的相关配置package
MQConfig.class(RabbitMQ配置类) MQSender.class(RabbitMQ发送者类) MQReceiver.class(RabbitMQ消息接收者类)
- MQConfig.class类
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//mq的配置类
@Configuration
public class MQConfig {
public static final String QUEUE="queue";
//注入一个bean实体类
@Bean
public Queue queue(){
return new Queue(QUEUE,true);//true代表是否持续
}
}
4.创建消息发送者MQSender.class
//service注解必须要带上
@Service
public class MQSender {
//log为打印辅助工具,建议采用log打印,而不采用system打印
private static Logger log = LoggerFactory.getLogger(MQSender.class);
//注入依赖引擎
@Autowired
AmqpTemplate amqpTemplate;
//发送数据
public void send(String message){
//RedisService.beanToString转化为String类型的设计
log.info("发送的数据"+message);
//MQConfig.QUEUE为发送队列
amqpTemplate.convertAndSend(MQConfig.QUEUE,message);
}
}
5.创建消息发送者MQReceiver.class
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
//MQ的接收者
import com.imooc.miaosha.controller.GoodsController;
//设置服务注解
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
//对指定消息队列进行监听
@RabbitListener(queues=MQConfig.QUEUE)
public void receiver(String message){
log.info("接受的信息"+message);
}
}
6.创建一个Controller,进行模拟请求信息的发送接收处理
编写请求测试
//注入发送者
@Autowired
MQSender mqSender;
@RequestMapping("/mq")
@ResponseBody
public String home() {
mqSender.send("hello RabbitMQ");//发送消息
return "success";
}
浏览器输入
http://localhost:8080/mq
结果:
rabbitmq.MQReceiver : 发送给的数据hello RabbitMQ rabbitmq.MQReceiver : 接受的信息hello RabbitMQ
Springboot集成Topic交换机模式实现
- 引入依赖(上述依赖相同)实现思路:要发送的消息,首先由一个队列mq1-------> 交换机--------->然后将其消息进行转发到mq2创建两个模拟队列,queue1和queue2
public static final String TOPTIC_QUEUE1="topicqueue1";
public static final String TOPIC_QUEUE2="topicqueue2";
//Topic模式的消息队列1
@Bean
public Queue topicQueue1(){
return new Queue(TOPTIC_QUEUE1,true);
}
//Topic模式下的消息队列2
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE2,true);
}
- 创建Topic交换机
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
public static final String TOPIC_EXCHAGE="topicExchange";
//交换机Topic模式
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHAGE);
}
- 将其消息队列绑定在交换机上
public static final String ROUTING_KEY1="topic.Key1";
public static final String ROUTING_KEY2="topic.#";
//进行绑定
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);
}
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);
}
- 设置消息发送者MQSender.class
这里设置一个Topic交换机式的方法,目的在于检测消息的匹配度。
//topic交换方法
//发送两条信息
//第一条是携带着匹配符号topic.key1的信息
//第二条是携带者匹配符号topic.#的信息
public void sendTopic(String message){
log.info("topic模式交换机下的发送的信息:"+message);
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHAGE,"topic.Key1",message+"1");
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHAGE,"topic.Key2",message+"2");
}
推理下:
携带着topic.key1的信息只能让mq消息队列queue1来接收,而queue2却不能
原因:
public static final String ROUTING_KEY1="topic.Key1";
public static final String ROUTING_KEY2="topic.#";
//进行绑定
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);
}
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);
}
这两个队列一个绑定携带着只有topic.Key1,只能识别topic.Key1
而第二个是topic.#可以识别topic.XXXX任何信息。
因此得到的结果应该是
mq1和mq2都接受到了topic.Key1匹配幅的信息
而第二条信息(携带着topic.Key2的信息)只能由mqqueue2来接收
- 设置MQReciver.class
作为接收者
只需要设置两个监听RabbitMQListener进行监听两个消息队列。
@RabbitListener(queues=MQConfig.TOPTIC_QUEUE1)
public void receiverTopinc1(String message){
log.info("topicQueue1接收的信息:"+message);
}
@RabbitListener(queues=MQConfig.TOPIC_QUEUE2)
public void receiverTopic2(String message){
log.info("topicQueue2接收的信息:"+message);
}
- 下面是编写Controller请求进行测试是否是按照预期我们想的那样:
@Autowired
MQSender mqSender;
@RequestMapping("/mqTopic")
@ResponseBody
public String mqTopic() {
mqSender.sendTopic("hello RabbitMQ---Topic模式");
return "success";//象征性的输出一个结果
}
结果
浏览器输入:
http://localhost:8080/demo/mqTopic
查看控制台:
rabbitmq.MQReceiver : topic模式交换机下的发送的信息:hello RabbitMQ---Topic模式 rabbitmq.MQReceiver : topicQueue2接受的信息:hello RabbitMQ---Topic模式1 rabbitmq.MQReceiver : topicQueue2接受的信息:hello RabbitMQ---Topic模式2 rabbitmq.MQReceiver : topicQueue1接受的信息:hello RabbitMQ---Topic模式1
分析:
正如我们上面预测的一样:
hello RabbitMQ---Topic模式1 这一条信息 rabbitmq.MQReceiver : topicQueue2接受的信息:hello RabbitMQ---Topic模式1 rabbitmq.MQReceiver : topicQueue1接受的信息:hello RabbitMQ---Topic模式1 这两个队列都收到了,因此topic.#和topic.Key1都识别topic.Key1 但是hello RabbitMQ---Topic模式2这一条信息 只有queue2收到了, 说明,topic.#可以识别topic.Key2 而topic.Key1不可以识别topic.Key2
到此Direct交换机模式和Topic交换机模式的简单测试实现!
剩下的两个模式:
Fanout Exchange模式
Headers Exchage模式
下一篇再介绍!



