前言:本篇博客主要介绍SpringBoot中如何根据直连型交换机去使用RabbitMQTemplate,Direct模式的Exchange根据消息携带的路由值将消息投递给对应队列。
一、RabbitMQ通信模型和Exchange类型 1.1、RabbitMQ通信模型
在代码中使用MQ发送消息的过程是异步执行的,消息到达RabbitMQ后,会在通信模型中找到适合的队列进行入队。
下面来看看消息到达RabbitMQ会发生什么,Exchange会将消息通过RoutingKey将消息路由到相应的队列,每当有消息进入到队列中时,消费端就会监听到该消息进行消费。
1.2、Exchange类型RabbitMQ中对于Exchange有以下几种类型:
1.3、Direct直连型交换机这里着重说下Direct直连型交换机,根据消息携带的路由值将消息投递给对应绑定路由值的队列。
Direct交换机大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键routing key。然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
二、SpringBoot的RabbitMQTemplate实战
比如说我们想实现个多渠道消息通知服务系统,用来发送邮件、短信和微信通知,使用RabbitMQ做异步解耦和流量削峰。那么就可以使用Direct交换机把三种不同类型的通知消息放在对应路由值的三个队列EmailQueue、SmsQueue、WeChatQueue上。
2.1、引入Maven依赖2.2、修改yml配置文件org.springframework.boot spring-boot-starter-amqp2.3.7.RELEASE
SpringBoot的RabbitMQ具有丰富的配置,比如消费失败重试、消息确认模式和超时等,这里可以不做复杂的配置,用于简单的应用。
spring:
rabbitmq:
host: localhost
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用默认host为/
virtualHost: /
listener:
simple:
concurrency: 1 # Minimum number of consumers.
max-concurrency: 20 # Maximum number of consumers.
prefetch: 50
default-requeue-rejected: true #消息被拒后(即未消费),重新(true)放入队列
retry:
enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
max-attempts: 3 #消费端发现了异常,尝试了规定次数后,这条“问题消息”就会被解决(如果设置了死信队列,就被送到了死信队列;否则直接扔掉)。如果不开启消费者重试尝试模式,那么会无限的循环下去控制台一直报错
initial-interval: 5000ms
2.3、编写RabbitConfig配置类
采用Java Configuration的方式配置支持消息体为对象的RabbitTemplate、Exchange和Queue等信息
package com.cernet.notice.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig
{
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter jackson2JsonMessageConverter(){
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
return jackson2JsonMessageConverter;
}
//队列 起名:EmailDirectQueue
@Bean
public Queue EmailDirectQueue()
{
// 声明队列参数列表:new Queue("EmailDirectQueue",true,true,false);
// name 队列名称
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("EmailDirectQueue",true);
}
//Direct交换机 起名:EmailDirectExchange
@Bean
DirectExchange EmailDirectExchange() {
//声明交换机参数列表
//name:交换机名称。
// durable:是否持久化,否则RabbitMQ服务端重启,队列就不再存在。
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
return new DirectExchange("EmailDirectExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于路由匹配键:EmailDirectRouting
@Bean
Binding bindingDirectEmail() {
return BindingBuilder.bind(EmailDirectQueue()).to(EmailDirectExchange()).with("EmailDirectRouting");
}
//队列 起名:WeChatDirectQueue
@Bean
public Queue WeChatDirectQueue()
{
return new Queue("WeChatDirectQueue",true);
}
//Direct交换机 起名:WeChatDirectExchange
@Bean
DirectExchange WeChatDirectExchange() {
return new DirectExchange("WeChatDirectExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于路由匹配键:EmailDirectRouting
@Bean
Binding bindingDirectWeChat() {
return BindingBuilder.bind(WeChatDirectQueue()).to(WeChatDirectExchange()).with("WeChatDirectRouting");
}
//队列 起名:SmsDirectQueue
@Bean
public Queue SmsDirectQueue()
{
return new Queue("SmsDirectQueue",true);
}
//Direct交换机 起名:SmsDirectExchange
@Bean
DirectExchange SmsDirectExchange() {
return new DirectExchange("SmsDirectExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于路由匹配键:EmailDirectRouting
@Bean
Binding bindingDirectSms() {
return BindingBuilder.bind(SmsDirectQueue()).to(SmsDirectExchange()).with("SmsDirectRouting");
}
}
-
声明队列参数列表:
-
name:队列名称。
-
durable:是否持久化,否则RabbitMQ服务端重启,队列就不再存在。
-
exclusive:是否排他,即该队列是否只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参数优先级高于durable。
-
autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
-
-
声明交换机参数列表:
-
name:交换机名称。
-
durable:是否持久化,否则RabbitMQ服务端重启,队列就不再存在。
-
autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
-
生产者使用RabbitMQ的直连型交换机的形式,根据消息携带的路由值将消息投递给对应队列
RabbitTemplate是Spring AMQP和RabbitMQ整合的产物,是进行发送消息的关键类。使用RabbitTemplate,可以发送String、Map和对象类型的消息。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Component
@Slf4j
public class RabbitMQProducer {
//使用RabbitTemplate,提供了接收/发送等等方法,可以发送String、Map和对象
@Autowired
RabbitTemplate rabbitTemplate;
public boolean produceEmailMessage(String receiverEmail,String title,String content)
{
// Map map =new HashMap<>();
// map.put("senderID",messageId);
// map.put("receiverID",messageData);
// map.put("tilte",createTime);
// map.put("content",createTime);
// rabbitTemplate.convertAndSend("directExchange", "directRouting", map);
EmailNotice emailNotice = new EmailNotice();
emailNotice.setReceiverEmail(receiverEmail);
emailNotice.setTilte(title);
emailNotice.setContent(content);
try {
//将消息携带绑定键值:EmailirectRouting 发送到交换机EmailDirectExchange:convertAndSend(交换机名",“路由键”,“消息内容”)
rabbitTemplate.convertAndSend("EmailDirectExchange", "EmailDirectRouting", emailNotice);
}
catch (Exception e)
{
e.printStackTrace();
log.error("邮件生产者发送异常:{}",e.getMessage());
return false;
}
return true;
}
public boolean produceSmsMessage(String receiverPhone,String variableArray)
{
SmsNotice smsNotice = new SmsNotice();
smsNotice.setReceiverPhone(receiverPhone);
smsNotice.setVariableArray(variableArray);
try
{
//将消息携带绑定键值:SmsDirectRouting 发送到交换机SmsDirectExchange:convertAndSend(交换机名",“路由键”,“消息内容”)
rabbitTemplate.convertAndSend("SmsDirectExchange", "SmsDirectRouting", smsNotice);
}
catch (Exception e)
{
e.printStackTrace();
log.error("短信生产者发送异常:{}",e.getMessage());
return false;
}
return true;
}
public boolean produceWeChatMessage(String openid,String content)
{
WeChatNotice wechatNotice = new WeChatNotice();
wechatNotice.setOpenId(openid);
wechatNotice.setContent(content);
try
{
//将消息携带绑定键值:WeChatDirectRouting 发送到交换机WeChatDirectExchange:convertAndSend(交换机名",“路由键”,“消息内容”)
rabbitTemplate.convertAndSend("WeChatDirectExchange", "WeChatDirectRouting", wechatNotice);
}
catch (Exception e)
{
e.printStackTrace();
log.error("短信生产者发送异常:{}",e.getMessage());
return false;
}
return true;
}
}
convertAndSend函数发送到指定的交换机上,同时指定路由键。RabbitMQ会根据路由键把消息推送至和该交换机绑定的队列中。
2.5、消费者RabbitMQConsumerimport com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Slf4j
@Component
public class RabbitMQConsumer {
//监听的队列名称 EmailDirectQueue
@RabbitListener(queues = "EmailDirectQueue")
@RabbitHandler
public void process(EmailNotice emailNotice)
{
ObjectMapper objectMapper = new ObjectMapper();
String message = null;
try
{
message = objectMapper.writevalueAsString(emailNotice);
}
catch (JsonProcessingException e)
{
e.printStackTrace();
log.error("Json解析异常,参数格式不正确!");
}
catch (Exception exception)
{
exception.printStackTrace();
}
log.info("Email消费者收到消息:{}",message);
}
//监听的队列名称 SmsDirectQueue
@RabbitListener(queues = "SmsDirectQueue")
@RabbitHandler
public void process(SmsNotice smsNotice) throws JsonProcessingException
{
ObjectMapper objectMapper = new ObjectMapper();
String message = null;
try
{
message = objectMapper.writevalueAsString(smsNotice);
}
catch (JsonProcessingException e)
{
e.printStackTrace();
log.error("Json解析异常,参数格式不正确!");
}
catch (Exception exception)
{
exception.printStackTrace();
}
log.info("Sms消费者收到消息:{}",message);
}
//监听的队列名称 WeChatDirectQueue
@RabbitListener(queues = "WeChatDirectQueue")
@RabbitHandler
public void process(WeChatNotice weChatNotice) throws JsonProcessingException
{
ObjectMapper objectMapper = new ObjectMapper();
String message = null;
try
{
message = objectMapper.writevalueAsString(weChatNotice);
}
catch (JsonProcessingException e)
{
e.printStackTrace();
log.error("Json解析异常,参数格式不正确!");
}
catch (Exception exception)
{
exception.printStackTrace();
}
log.info("WeChat消费者收到消息:{}",message);
}
// @RabbitHandler
// public void process(Map testMessage)
// {
// log.info("DirectReceiver消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
// JSonObject jsonObject = new JSonObject(testMessage);
// String senderID = jsonObject.getStr("senderID");
// }
}
通过注解@RabbitListener指定要消费的队列,当监听到队列 XXX 中有消息时则会进行接收并处理
2.6、通过RabbitMQ的后台管理界面,查看交换机和队列可以看到交换机已经被创建出来了:
可以看到队列也已经被创建出来了:
可以看到和队列的绑定关系:
参考链接:
[RabbitMQ]SpringBoot的RabbitMQTemplate实战
Springboot整合RabbitMQ(一)——Direct直连交换机



