- 1 Summary
- 2 Common exchange types
- 2.1 Direct exchange
- 2.2 Topic exchange
- 2.3 Fanout exchange
- 3 Core code
- 3.1 RabbitMQ configuration
- 3.2 RabbitMQ producer
- 3.3 RabbitMQ consumer
- 3.4 Test class (Controller)
- 4 RabbitMQ parameter configuration
- 5 Testing
- 6 Recommended references
- 7 GitHub source code
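1 Summary
RabbitMQ provides four exchange types: direct exchange, topic exchange, fanout exchange, and headers exchange. The ones used most often are the direct, topic, and fanout exchanges. This article walks through an example of using these three common exchange types from a Spring Boot application.
For the basic Spring Boot and RabbitMQ integration, see: Quickly Integrating the RabbitMQ Message Queue Framework with Spring Boot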
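2 Common exchange types
2.1 Direct exchange
Direct exchange model:
A direct exchange is bound to queues through routing keys. One exchange can hold several bindings with different keys, and a message reaches a queue only when the routing key the producer specifies when sending is identical to the key that queue was bound with.
After the producer sends a message, the exchange uses the routing key and its bindings to forward the message to the matching queue, where the corresponding consumer receives it.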
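The exact-match rule can be pictured as a plain map lookup. The following is a minimal in-memory sketch, not RabbitMQ or Spring AMQP code; the class `DirectExchangeSketch` and its methods are hypothetical names introduced only for illustration, while the queue name and keys are the ones used later in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange (illustration only).
public class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("rabbitmq_spring_boot_demo", "rabbitmq.spring.boot.demo");

        // Identical key: the queue is selected.
        System.out.println(exchange.route("rabbitmq.spring.boot.demo"));
        // Different key: no queue is selected.
        System.out.println(exchange.route("rabbitmq.spring.boot.demo.2"));
    }
}
```

With these bindings, the first call selects `rabbitmq_spring_boot_demo` and the second selects nothing, because a direct exchange never does partial matching.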
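2.2 Topic exchange
Topic exchange model:
A topic exchange is also bound to queues through routing keys, and one exchange can hold several bindings. The difference is that the keys defined on a topic exchange support wildcard matching: any message whose routing key satisfies the topic pattern is forwarded to the corresponding consumer.
RabbitMQ wildcard rules for topics:
* matches exactly one word
# matches zero or more words
Note: define routing keys in the dot-separated form wordA.wordB.* and not in the form wordA_wordB, otherwise they cannot be matched effectively. RabbitMQ splits a key into words at the dots; it does not match keys as ordinary regular expressions.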
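To make the two wildcards concrete, here is a small stand-alone matcher written for this article. It is only a sketch of the matching rules, not RabbitMQ's actual implementation, and `TopicMatchSketch` is a hypothetical name. The pattern `rabbitmq.spring.boot.*` is added purely for comparison and is not part of the demo configuration.

```java
// Hypothetical matcher illustrating topic wildcard rules (not RabbitMQ's own code).
public class TopicMatchSketch {

    // Returns true if the routing key satisfies the binding pattern.
    public static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;          // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible length
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                  // pattern still expects a word, key has none left
        }
        // '*' accepts any single word; otherwise the words must be equal
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("rabbitmq.spring.boot.#", "rabbitmq.spring.boot.demo"));   // true
        System.out.println(matches("rabbitmq.spring.boot.#", "rabbitmq.spring.boot.demo.2")); // true
        System.out.println(matches("rabbitmq.spring.boot.*", "rabbitmq.spring.boot.demo"));   // true
        System.out.println(matches("rabbitmq.spring.boot.*", "rabbitmq.spring.boot.demo.2")); // false
        System.out.println(matches("wordA.*", "wordA_wordB"));                                // false
    }
}
```

The last line shows why underscore-separated keys are discouraged: `wordA_wordB` is a single word, so no dot-based pattern can address its parts.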
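2.3 Fanout exchange
Fanout exchange model:
A fanout exchange needs no routing key; any key supplied is ignored. Queues are bound directly to the exchange, and when a producer sends a message through a fanout exchange, the message is delivered to every queue bound to that exchange and consumed by the consumers of those queues.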
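The broadcast behaviour can be sketched with an in-memory model in which every bound queue receives its own copy of the message. This is an illustration only; `FanoutExchangeSketch` and its methods are hypothetical names, and the queue names are the two used later in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange (illustration only).
public class FanoutExchangeSketch {

    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    public void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key parameter is accepted but deliberately ignored.
    public void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("rabbitmq_spring_boot_demo");
        exchange.bind("rabbitmq_api");

        exchange.publish("", "send fanout message");

        // Both queues hold one copy of the message.
        System.out.println(exchange.messagesIn("rabbitmq_spring_boot_demo"));
        System.out.println(exchange.messagesIn("rabbitmq_api"));
    }
}
```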
3 Core code
3.1 RabbitMQ configuration
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/config/RabbitMQConfig.java

    package com.ljq.demo.springboot.baseweb.config;

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * Declares the queues, exchanges, and bindings used by the demo.
     */
    @Configuration
    public class RabbitMQConfig {

        // Queue names
        public static final String QUEUE_NAME_DEMO = "rabbitmq_spring_boot_demo";
        public static final String QUEUE_NAME_API = "rabbitmq_api";

        // Exchange names
        public static final String DIRECT_EXCHANGE_NAME_DEMO = "rabbitmq_direct_exchange_demo";
        public static final String TOPIC_EXCHANGE_NAME_DEMO = "rabbitmq_topic_exchange_demo";
        public static final String TOPIC_EXCHANGE_NAME_API = "rabbitmq_topic_exchange_api";
        public static final String FANOUT_EXCHANGE_NAME_DEMO = "rabbitmq_fanout_exchange_demo";

        // Exchange routing keys, used when binding a queue to an exchange
        public static final String DIRECT_EXCHANGE_ROUT_KEY_DEMO = "rabbitmq.spring.boot.demo";
        public static final String TOPIC_EXCHANGE_ROUT_KEY_DEMO = "rabbitmq.spring.boot.#";
        public static final String TOPIC_EXCHANGE_ROUT_KEY_API = "rabbitmq.api.#";

        // Producer routing keys, used when sending a message
        public static final String QUEUE_SENDER_ROUTING_KEY_DEMO = "rabbitmq.spring.boot.demo";
        public static final String QUEUE_SENDER_ROUTING_KEY_DEMO_2 = "rabbitmq.spring.boot.demo.2";
        public static final String QUEUE_SENDER_ROUTING_KEY_API_USER = "rabbitmq.api.user";

        @Bean("queueDemo")
        public Queue queueDemo() {
            return new Queue(QUEUE_NAME_DEMO);
        }

        @Bean("queueApi")
        public Queue queueApi() {
            return new Queue(QUEUE_NAME_API);
        }

        // All exchanges below are created with durable = false, autoDelete = true.
        @Bean("directExchangeDemo")
        public DirectExchange directExchangeDemo() {
            return new DirectExchange(DIRECT_EXCHANGE_NAME_DEMO, false, true);
        }

        @Bean("topicExchangeDemo")
        public TopicExchange topicExchangeDemo() {
            return new TopicExchange(TOPIC_EXCHANGE_NAME_DEMO, false, true);
        }

        @Bean("topicExchangeApi")
        public TopicExchange topicExchangeApi() {
            return new TopicExchange(TOPIC_EXCHANGE_NAME_API, false, true);
        }

        @Bean("fanoutExchangeDemo")
        public FanoutExchange fanoutExchangeDemo() {
            return new FanoutExchange(FANOUT_EXCHANGE_NAME_DEMO, false, true);
        }

        // Direct binding: the demo queue receives only exact key matches.
        @Bean
        public Binding bindingDirectExchangeDemo(@Qualifier("queueDemo") Queue queue,
                                                 @Qualifier("directExchangeDemo") DirectExchange directExchange) {
            return BindingBuilder.bind(queue).to(directExchange).with(DIRECT_EXCHANGE_ROUT_KEY_DEMO);
        }

        // Topic bindings: each queue receives keys that match its pattern.
        @Bean
        public Binding bindingTopicExchangeDemo(@Qualifier("queueDemo") Queue queue,
                                                @Qualifier("topicExchangeDemo") TopicExchange topicExchange) {
            return BindingBuilder.bind(queue).to(topicExchange).with(TOPIC_EXCHANGE_ROUT_KEY_DEMO);
        }

        @Bean
        public Binding bindingTopicExchangeApi(@Qualifier("queueApi") Queue queueApi,
                                               @Qualifier("topicExchangeApi") TopicExchange topicExchangeApi) {
            return BindingBuilder.bind(queueApi).to(topicExchangeApi).with(TOPIC_EXCHANGE_ROUT_KEY_API);
        }

        // Fanout bindings: both queues are bound to the same exchange, no key needed.
        @Bean
        public Binding bingingFanoutExchangeDemo(@Qualifier("queueDemo") Queue queue,
                                                 @Qualifier("fanoutExchangeDemo") FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }

        @Bean
        public Binding bingingFanoutExchangeApi(@Qualifier("queueApi") Queue queue,
                                                @Qualifier("fanoutExchangeDemo") FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    }

3.2 RabbitMQ producer
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/rabbitmq/RabbitMQSender.java

    package com.ljq.demo.springboot.baseweb.rabbitmq;

    import com.ljq.demo.springboot.baseweb.config.RabbitMQConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    /**
     * Sends messages through the exchanges declared in RabbitMQConfig.
     */
    @Service
    public class RabbitMQSender {

        private static final Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);

        @Autowired
        private AmqpTemplate rabbitTemplate;

        // No exchange given: the queue name is used as the routing key.
        public void send(String message) {
            logger.info("sent by RabbitMQ ... ...{}", message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME_DEMO, "Hello world ---RabbitMQ demo" + message);
        }

        // Direct exchange: the routing key must equal the binding key.
        public void sendDirectDemo(String message) {
            logger.info("exchangeName = {}, queue sender outing key = {}, message = {}",
                    RabbitMQConfig.DIRECT_EXCHANGE_NAME_DEMO, RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DEMO, message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME_DEMO,
                    RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DEMO, message);
        }

        // Topic exchange: the routing key only has to satisfy the binding pattern.
        public void sendTopicDemo(String message) {
            logger.info("exchangeName = {}, queue sender outing key = {}, message = {}",
                    RabbitMQConfig.TOPIC_EXCHANGE_NAME_DEMO, RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DEMO, message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME_DEMO,
                    RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DEMO, message);
        }

        public void sendTopicDemo2(String message) {
            logger.info("exchangeName = {}, queue sender outing key = {}, message = {}",
                    RabbitMQConfig.TOPIC_EXCHANGE_NAME_DEMO, RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DEMO_2, message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME_DEMO,
                    RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_DEMO_2, message);
        }

        public void sendTopicApiUser(String message) {
            logger.info("exchangeName = {}, queue sender outing key = {}, message = {}",
                    RabbitMQConfig.TOPIC_EXCHANGE_NAME_API, RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_API_USER, message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME_API,
                    RabbitMQConfig.QUEUE_SENDER_ROUTING_KEY_API_USER, message);
        }

        // Fanout exchange: the routing key is ignored, so an empty string is passed.
        public void sendFanoutDemo(String message) {
            logger.info("exchangeName = {}, message = {}", RabbitMQConfig.FANOUT_EXCHANGE_NAME_DEMO, message);
            rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_DEMO, "", message);
        }
    }

3.3 RabbitMQ consumer
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/rabbitmq/RabbitMQReceiver.java

    package com.ljq.demo.springboot.baseweb.rabbitmq;

    import com.ljq.demo.springboot.baseweb.config.RabbitMQConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;

    /**
     * Consumes messages from the two demo queues.
     */
    @Service
    public class RabbitMQReceiver {

        private static final Logger logger = LoggerFactory.getLogger(RabbitMQReceiver.class);

        // Method-level @RabbitListener is sufficient here; @RabbitHandler is only
        // needed when @RabbitListener is placed on the class.
        @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME_DEMO)
        public void receiveDemo(String message) {
            logger.info("Received queueName = {}, message = {}", RabbitMQConfig.QUEUE_NAME_DEMO, message);
        }

        @RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME_API})
        public void receiveApi(String message) {
            logger.info("Received queueName = {}, message = {}", RabbitMQConfig.QUEUE_NAME_API, message);
        }
    }

3.4 Test class (Controller)
./demo-web/src/main/java/com/ljq/demo/springboot/web/controller/RabbitMQController.java

    package com.ljq.demo.springboot.web.controller;

    import com.ljq.demo.springboot.baseweb.api.ApiResult;
    import com.ljq.demo.springboot.baseweb.rabbitmq.RabbitMQSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * HTTP endpoints that trigger the producer for manual testing.
     */
    @RestController
    @RequestMapping(value = "api/demo/rabbitmq")
    public class RabbitMQController {

        @Autowired
        private RabbitMQSender rabbitMQSender;

        // Sends five messages straight to the demo queue.
        @GetMapping(value = "/send")
        public ApiResult send() {
            int count = 5;
            for (int i = 0; i < count; i++) {
                rabbitMQSender.send("" + (i + 1));
            }
            return ApiResult.success();
        }

        // Sends one message through each exchange configured in RabbitMQConfig.
        @GetMapping(value = "/send/exchange")
        public ApiResult send2() {
            rabbitMQSender.sendDirectDemo("send direct demo");
            rabbitMQSender.sendTopicDemo("send topic demo");
            rabbitMQSender.sendTopicDemo2("send topic demo2");
            rabbitMQSender.sendTopicApiUser("send topic api user");
            rabbitMQSender.sendFanoutDemo("send fanout message");
            return ApiResult.success();
        }
    }

4 RabbitMQ parameter configuration
./demo-base-web/src/main/java/com/ljq/demo/springboot/baseweb/config/RabbitMQConfig.java

    public static final String QUEUE_NAME_DEMO = "rabbitmq_spring_boot_demo";
    public static final String QUEUE_NAME_API = "rabbitmq_api";

    public static final String DIRECT_EXCHANGE_NAME_DEMO = "rabbitmq_direct_exchange_demo";
    public static final String TOPIC_EXCHANGE_NAME_DEMO = "rabbitmq_topic_exchange_demo";
    public static final String TOPIC_EXCHANGE_NAME_API = "rabbitmq_topic_exchange_api";
    public static final String FANOUT_EXCHANGE_NAME_DEMO = "rabbitmq_fanout_exchange_demo";

    public static final String DIRECT_EXCHANGE_ROUT_KEY_DEMO = "rabbitmq.spring.boot.demo";
    public static final String TOPIC_EXCHANGE_ROUT_KEY_DEMO = "rabbitmq.spring.boot.#";
    public static final String TOPIC_EXCHANGE_ROUT_KEY_API = "rabbitmq.api.#";

    public static final String QUEUE_SENDER_ROUTING_KEY_DEMO = "rabbitmq.spring.boot.demo";
    public static final String QUEUE_SENDER_ROUTING_KEY_DEMO_2 = "rabbitmq.spring.boot.demo.2";
    public static final String QUEUE_SENDER_ROUTING_KEY_API_USER = "rabbitmq.api.user";

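QUEUE_NAME_: the name of a message queue. Several queues can be defined, and each must have a distinct name. A queue has to be defined before a consumer can consume from it.
EXCHANGE_NAME_: the name of an exchange. A producer pushes messages by specifying the name of the exchange to send to.
EXCHANGE_ROUT_KEY_: the exchange routing key, that is, the key a queue is bound with. A producer must specify a routing key (the producer routing key) when it pushes a message; when the message reaches the exchange, that key is matched against the exchange routing keys. Messages that match are forwarded by the exchange to the consumer's queue; messages that do not match are not routed there and are never consumed.
QUEUE_SENDER_ROUTING_KEY: the producer routing key, specified by the producer when it sends a message. It is not the same concept as the exchange routing key. Think of fish and a net: the producer routing key is the fish and the exchange routing key is the net. The fish caught as they swim past are the messages whose producer routing key satisfies the exchange routing key rule.
Note: with a direct exchange, the producer routing key and the exchange routing key must be identical for the message to reach the corresponding consumer. With a topic exchange, the producer routing key only has to satisfy the exchange routing key pattern.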
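The difference between the two modes can be checked against the constants above with a few lines of plain Java. This is a simplified sketch: `RoutingKeyCheck` is a hypothetical helper, and `topicPrefixMatches` only handles patterns of the form `prefix.#`, which is the only form this demo's configuration uses. It is not a general topic matcher.

```java
// Hypothetical helper comparing direct and topic matching for this demo's keys.
public class RoutingKeyCheck {

    // Direct mode: the producer key must equal the exchange key.
    public static boolean directMatches(String exchangeKey, String producerKey) {
        return exchangeKey.equals(producerKey);
    }

    // Topic mode, simplified: supports only patterns ending in ".#".
    public static boolean topicPrefixMatches(String exchangeKey, String producerKey) {
        if (!exchangeKey.endsWith(".#")) {
            throw new IllegalArgumentException("sketch only supports patterns of the form prefix.#");
        }
        String prefix = exchangeKey.substring(0, exchangeKey.length() - 2);
        // '#' stands for zero or more further words after the prefix
        return producerKey.equals(prefix) || producerKey.startsWith(prefix + ".");
    }

    public static void main(String[] args) {
        String directKey = "rabbitmq.spring.boot.demo";   // DIRECT_EXCHANGE_ROUT_KEY_DEMO
        String topicDemo = "rabbitmq.spring.boot.#";      // TOPIC_EXCHANGE_ROUT_KEY_DEMO
        String topicApi = "rabbitmq.api.#";               // TOPIC_EXCHANGE_ROUT_KEY_API

        System.out.println(directMatches(directKey, "rabbitmq.spring.boot.demo"));        // true
        System.out.println(directMatches(directKey, "rabbitmq.spring.boot.demo.2"));      // false
        System.out.println(topicPrefixMatches(topicDemo, "rabbitmq.spring.boot.demo"));   // true
        System.out.println(topicPrefixMatches(topicDemo, "rabbitmq.spring.boot.demo.2")); // true
        System.out.println(topicPrefixMatches(topicApi, "rabbitmq.api.user"));            // true
        System.out.println(topicPrefixMatches(topicApi, "rabbitmq.spring.boot.demo"));    // false
    }
}
```

In the fish-and-net picture, the direct net has a hole shaped like exactly one fish, while the topic net catches every fish whose name starts with the right words.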
5 Testing
Start the application and request the test endpoint:
http://127.0.0.1:8088/api/demo/rabbitmq/send/exchange
Application log:

    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 29| preHandle
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | com.ljq.demo.springboot.web.acpect.LogAspect 66| [AOP-LOG-START] requestMark: b8e8911b-d0e4-410d-9a18-f09be390d01b requestIP: 127.0.0.1 contentType:null requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/exchange requestMethod: GET requestParams: targetClassAndMethod: com.ljq.demo.springboot.web.controller.RabbitMQController#send2
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQSender 42| exchangeName = rabbitmq_direct_exchange_demo, queue sender outing key = rabbitmq.spring.boot.demo, message = send direct demo
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQSender 56| exchangeName = rabbitmq_topic_exchange_demo, queue sender outing key = rabbitmq.spring.boot.demo, message = send topic demo
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQSender 70| exchangeName = rabbitmq_topic_exchange_demo, queue sender outing key = rabbitmq.spring.boot.demo.2, message = send topic demo2
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQSender 84| exchangeName = rabbitmq_topic_exchange_api, queue sender outing key = rabbitmq.api.user, message = send topic api user
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQSender 97| exchangeName = rabbitmq_fanout_exchange_demo, message = send fanout message
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | com.ljq.demo.springboot.web.acpect.LogAspect 72| [AOP-LOG-END] requestMark: b8e8911b-d0e4-410d-9a18-f09be390d01b requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/exchange response: ApiResult(code=200, msg=成功, data=null, extraData=null, timestamp=1633946321726)
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | com.ljq.demo.springboot.baseweb.log.LogService 44| [LOG-RESPONSE] requestIp: 127.0.0.1 requestUrl: http://127.0.0.1:8088/api/demo/rabbitmq/send/exchange response: ApiResult(code=200, msg=成功, data=null, extraData=null, timestamp=1633946321726)
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 38| postHandle
    2021-10-11 17:58:41 | INFO | http-nio-8088-exec-3 | c.ljq.demo.springboot.web.acpect.SimpleInterceptor 44| afterCompletion
    2021-10-11 17:58:41 | INFO | SimpleAsyncTaskExecutor-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQReceiver 28| Received queueName = rabbitmq_spring_boot_demo, message = send direct demo
    2021-10-11 17:58:41 | INFO | SimpleAsyncTaskExecutor-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQReceiver 39| Received queueName = rabbitmq_api, message = send topic api user
    2021-10-11 17:58:41 | INFO | SimpleAsyncTaskExecutor-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQReceiver 28| Received queueName = rabbitmq_spring_boot_demo, message = send topic demo
    2021-10-11 17:58:41 | INFO | SimpleAsyncTaskExecutor-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQReceiver 28| Received queueName = rabbitmq_spring_boot_demo, message = send topic demo2
    2021-10-11 17:58:41 | INFO | SimpleAsyncTaskExecutor-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQReceiver 39| Received queueName = rabbitmq_api, message = send fanout message
    2021-10-11 17:58:41 | INFO | SimpleAsyncTaskExecutor-1 | c.l.d.springboot.baseweb.rabbitmq.RabbitMQReceiver 28| Received queueName = rabbitmq_spring_boot_demo, message = send fanout message

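6 Recommended references
- Using RabbitMQ in Spring Boot
- RabbitMQ Topics: topic exchanges (official documentation)
- AMQP 0-9-1 Model Explained (official documentation)
- Integrating RabbitMQ with Spring Boot (Fanout mode)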
7 GitHub source code
GitHub repository: https://github.com/Flying9001/springBootDemo



