- 消息发送确认
- 配置文件
- 消息确认配置类
- 消息找不到交换机
- 消息到了交换机,找不到队列
- 消息啥都找不到
- 消息推送成功
- 消息接收确认
- 消息确认模式
- 手动确认消息
- `channel.basicAck()`
- `channel.basicNack()`
- `channel.basicReject()`
- 消息的确认实现
- 配置类
- 测试
本篇文章是继 springboot整合rabbitmq(一) 的续篇
消息发送确认生产者发送消息,先将消息发送到 Exchange,然后由 Exchange 再路由到 Queue,这中间就需要确认两个事情
- 确认消息是否成功发送到 Exchange
- 确认消息是否从 Exchange 成功路由到 Queue
spring 提供了两个回调函数来处理这两种消息发送确认
配置文件server.port=8080 #配置rabbitmq服务器 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #确认消息已发送到交换机 spring.rabbitmq.publisher-/confirm/is=true #确认消息已发送到队列 spring.rabbitmq.publisher-returns=true消息确认配置类
@Slf4j
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.set/confirm/iCallback((correlationData, ack, cause) -> {
log.info("/confirm/iCallback:" + "相关数据:" + correlationData);
log.info("/confirm/iCallback:" + "确认情况:" + ack);
log.info("/confirm/iCallback:" + "原因:" + cause);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("ReturnCallback:" + "消息:" + message);
log.info("ReturnCallback:" + "回应码:" + replyCode);
log.info("ReturnCallback:" + "回应信息:" + replyText);
log.info("ReturnCallback:" + "交换机:" + exchange);
log.info("ReturnCallback:" + "路由键:" + routingKey);
});
return rabbitTemplate;
}
}
可以看到上面写了两个回调函数 /confirm/iCallback 和 RetrunCallback,那么以上这两种回调函数都是在什么情况会触发呢?先从总体的情况分析,推送消息存在四种情况
- 消息推送到 server,但是在 server 里找不到交换机
- 消息推送到 server,找到交换机了,但是没找到队列
- 消息推送到 sever,交换机和队列啥都没找到
- 消息推送成功
写个测试接口,把消息推送到名为 non-existent-exchange 的交换机上(这个交换机是没有创建没有配置的)
@GetMapping("/TestMessageAck")
@ResponseBody
public String TestMessageAck() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: non-existent-exchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
return "消息已发送至rabbitmq server";
}
调用接口 http://localhost:8080/TestMessageAck,查看 rabbitmq-provuder 项目的日志输出情况
结论:这种情况触发的是 /confirm/iCallback 回调函数
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,在 DirectRabitConfig 里面新增一个交换机 lonelyDirectExchange,但没给它做任何绑定配置操作
@Bean
public DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange", true, false);
}
然后写个测试接口,把消息推送到名为 lonelyDirectExchange 的交换机上(这个交换机是没有任何队列绑定的)
@GetMapping("/TestMessageAck2")
@ResponseBody
public String TestMessageAck2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: lonelyDirectExchange test message ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);
return "消息已发送至rabbitmq server";
}
调用接口 http://localhost:8080/TestMessageAck2,查看 rabbitmq-provuder 项目的日志输出情况
可以看到这种情况,两个函数都被调用了,消息是推送成功到服务器了的,所以 /confirm/iCallback 对消息确认情况是 true;而在 RetrunCallback 回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE
结论:这种情况触发的是 /confirm/iCallback 和 RetrunCallback 两个回调函数
消息啥都找不到这种情况其实一看就觉得跟 1 很像,,所以不做结果说明了。结论:这种情况触发的是 /confirm/iCallback 回调函数
消息推送成功那么测试下,按照正常调用之前消息推送的接口就行,就调用下 http://localhost:8080/sendFanoutMessage 接口,可以看到日志输出
结论:这种情况触发的是 /confirm/iCallback 回调函数
- RabbitMQ 默认自动确认消息被正确消费,即消息投递到消费者后就自动确认消息被处理完毕,并且会将该消息删除,即使消费者意外宕机,或者抛出异常,如果消费者接收到消息,还没处理完成就 down 掉或者抛出异常。那么,这条消息就丢失了
- 问题就出在 RabbitMQ 只管将消息投递出去,而不管消息是否被正确处理就自动删除消息。所以,只要将自动 ack 修改为手动 ack,消费成功才通知 RabbitMQ 可以删除该消息即可。如果消费者宕机消费失败,由于 RabbitMQ 并未收到 ack 通知,且感知到该消费者状态异常(如抛出异常),就会将该消息重新推送给其他消费者,让其他消费者继续执行,这样就保证消费者挂掉但消息不会丢失
- AcknowledgeMode.NONE:默认使用自动确认,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。一般这种情况我们都是使用 try catch 捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理
- AcknowledgeMode.AUTO:根据情况确认
- AcknowledgeMode.MANUAL:手动确认,我们配置接收消息确认机制时,多数选择的模式。消费者收到消息后,手动调用 basicAck() 或 basicNack() 或 basicReject() 方法, RabbitMQ 收到这些消息后,才认为本次投递成功
用于消息的肯定确认,表示消息已经被正确处理
channel.basicNack()用于消息的否定确认,表示没有被正确处理。
方法 channel.basicNack(deliveryTag, false, true):设置不消费某条消息
- 第一个参数是当前消息到的数据的唯一 id
- 第二个参数是指是否针对 多条消息;如果是 true,将一次性拒绝所有小于 deliveryTag 的消息
- 第三个参数如果传入 true,则重新入队列,否则进入死信队列(多条消息)
使用不确认后,重新入对列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压
channel.basicReject()用于消息的否定确认,basicReject() 不支持批量拒绝,而 basicNack() 可以
方法 channel.basicReject(deliveryTag, true):拒绝消费当前消息
- 第一个参数是当前消息到的数据的唯一 id
- 第二参数如果传入 true,则重新入队列,否则进入死信队列
该方法执行后,该消费者还是有机会消费到该条消息。使用拒绝后,重新入对列这个确认模式要谨慎,因为一般都是出现异常的时候,catch 异常再拒绝入列,选择是否重入列。但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压
消息的确认实现 配置类在 rabbitmq-consumer 项目中,增加配置类
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames("DirectQueue");
//如果同时设置多个如下: 前提是队列都是必须已经创建存在的
// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
container.setMessageListener(myAckReceiver);
return container;
}
}
自定义消息接收转换类
@Slf4j
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(@NotNull Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String msg = message.toString();
String[] msgArray = msg.split("'");
Map msgMap = mapStringToMap(msgArray[1].trim());
String messageId = msgMap.get("messageId");
String messageData = msgMap.get("messageData");
String createTime = msgMap.get("createTime");
log.info("MyAckReceiver messageId:" + messageId + " messagedata:" + messageData + " createTime:" + createTime);
log.info("消费的主题消息来自:" + message.getMessageProperties().getConsumerQueue());
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
@NotNull
private Map mapStringToMap(String str) {
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",", 3);
Map map = new HashMap<>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
测试
调用接口 http://localhost:8080/sendDirectMessage,给交换机 directExchange 的队列 DirectQueue 推送一条消息,看下 rabbitmq-consumer 日志输出情况
参考:https://blog.csdn.net/qq_35387940/article/details/100514134
源码:https://gitee.com/chaojiangcj/springboot-rabbitmq



