为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制,只要将自动ack设置为false,消费者有足够的时间处理消息数据,消费者可以在处理完后续的业务逻辑后再进行提交ack,确保消息确实是被消费了,防止服务宕机可能导致的消息丢失。而MQ会一直等待消费者手动提交ack!!
在rabbitmq管理页面上可以详细看到队列中的消息情况:
- ready: 队列中存在的消息,可以提供给消费者的消息数量
- Unacked: 表示是发送给消费者了,但是消费者还未将ack反馈给MQ的消息数量(一般只有设置了手动ack时,当消费者获取到消息时才会有值)
自动ACK: 消费者配置中如果是自动ack机制,MQ将消息发送给消费者后直接就将消息给删除了,这个的前提条件是消费者程序没有出现异常,如果消费者接收消息后处理时出现异常,那么MQ将会尝试重发消息给消费者直至达到了消费者服务中配置的最大重试次数后将会直接抛出异常不再重试。
手动ACK:消费者设置了手动ACK机制后,可以显式的提交/拒绝消息(这一步骤叫做发送ACK),如果消息被消费后正常被提交了ack,那么此消息可以说是流程走完了,然后MQ将此消息从队列中删除。而如果消息被消费后被拒绝了,消费者可选择让MQ重发此消息或者让MQ直接移除此消息。后面可以使用死信队列来进行接收这些被消费者拒绝的消息,再进行后续的业务处理。
认知RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。
其中发送方确认又分为:生产者到交换机到确认、交换机到队列的确认。(借用下大佬的图)
实践 发送方确认-
ConfirmCallback()方法,是一个回调方法,生产者将消息发送给Broker(RabbitMQ服务),然后Broker给回调生产者的ConfirmCallback()方法告知生产者消息是否接收到。也就是确认消息是否正常到达 Exchange 中。
# 我们需要在生产者中添加配置,表示开启发布者确认(注意新旧版本) spring.rabbitmq.publisher-confirm-type=correlated # 新版本 spring.rabbitmq.publisher-confirms=true # 老版本
-
ReturnCallback()方法同样是一个回调方法,是交换机和队列之间的消息确认方式。启动消息失败返回,此方法是在交换器路由不到队列时触发回调,这个可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了
# 在生产者中配置,表示发布者返回 spring.rabbitmq.publisher-returns=true
使用
application.yml
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin # 消息确认(ACK) publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) publisher-returns: true #确认消息已发送到队列(Queue)
理解:springboot中需要给RabbitTemplate设置一些方法的回调即可。
通常情况下我们可以直接在配置类中设置好这些东西,但是可能由于某些业务需求,并不是所有的消息都使用常用的方式,也可以将我们的消息发送服务实现接口然后重写这些回调。
配置类方式(全局方式):@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); //确认消息送到交换机(Exchange)回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("n确认消息送到交换机(Exchange)结果:"); System.out.println("相关数据:" + correlationData); System.out.println("是否成功:" + ack); System.out.println("错误原因:" + cause); }); //确认消息送到队列(Queue)回调 rabbitTemplate.setReturnsCallback(returnedMessage -> { System.out.println("n确认消息送到队列(Queue)结果:"); System.out.println("发生消息:" + returnedMessage.getMessage()); System.out.println("回应码:" + returnedMessage.getReplyCode()); System.out.println("回应信息:" + returnedMessage.getReplyText()); System.out.println("交换机:" + returnedMessage.getExchange()); System.out.println("路由键:" + returnedMessage.getRoutingKey()); }); return rabbitTemplate; }
以接口的形式访问发送一下。注意:确认消息送到队列(Queue)回调,只有在出现错误时才回调。
发送服务类实现(局部方式)将发送的服务类实现接口,实现回调
@Service public class SendMessageService implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { private static Logger logger = LoggerFactory.getLogger(SendMessageService.class); @Autowired public RabbitTemplate rabbitTemplate; public void sendMessage(String str){ rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(this); rabbitTemplate.setConfirmCallback(this); // CorrelationData构造函数中的id可以随便写,但是必须要非null而且是唯一的 rabbitTemplate.convertAndSend("exchange","routingKey", str,new CorrelationData(UUID.randomUUID().toString())); } @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("sender return success" + returnedMessage.toString()); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { logger.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); if (!b) { logger.error("消息发送异常!"); // 进行处理 } else { logger.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s); } } }
接口方式访问下。没问题
需要注意的是:配置类方式和局部方式只能选择其一,如果一个RabbitTemplate设置了两个或者多个ConfirmCallback/ReturnCallback,会报错的不支持。类似这样的报错:Only one ConfirmCallback/ReturnCallback is supported by each RabbitTemplate。在开发过程中需要注意!!
接收方确认消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理,比如重新发送或者丢弃。
RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
消息确认模式有:
- AcknowledgeMode.NONE:自动确认。
- AcknowledgeMode.AUTO:根据情况确认。
- AcknowledgeMode.MANUAL:手动确认。
消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。
- Basic.Ack 命令:用于确认当前消息。
- Basic.Nack 命令:用于否定当前消息(批量拒绝) 。
- Basic.Reject 命令:用于拒绝当前消息(单量拒绝)。
配置,注意是simple模式的ack还是direct模式,或者两个都设置上
server: port: 9000 spring: rabbitmq: username: admin password: admin virtual-host: / listener: simple: acknowledge-mode: manual direct: acknowledge-mode: manual # 集群配置,集群配置时使用 rabbitmq.addresses即可,不用配置rabbitmq.port rabbitmq.host了 addresses: 192.168.0.101:5672,192.168.0.101:5673,192.168.0.101:5673
消费者接收数据
其中在调用basiAck或basicNack时必须要携带一个tag,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。而在接收者方法上使用@Header(AmqpHeaders.DELIVERY_TAG)可以直接获取到这个tag。
@Component public class MQConsumer { @Autowired private DispatcherService dispatcherService; @RabbitListener(queues = "order.queue") public void messageConsumer(String orderMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { try { System.out.println("消息:" + orderMsg); JSONObject order = JSONObject.parseObject(orderMsg); String orderId = order.getString("orderId"); // 派单处理 dispatcherService.dispatch(orderId); System.out.println(1 / 0); // 出现异常 // 手动确认 channel.basicAck(tag, false); } catch (Exception e) { // 如果出现异常的情况下 根据实际情况重发 // 重发一次后,丢失 // 参数1:消息的tag // 参数2:多条处理 // 参数3:重发 // false 不会重发,会把消息打入到死信队列 // true 重发,建议不使用try/catch 否则会死循环 // 手动拒绝消息 channel.basicNack(tag, false, false); } } }扩展
消息的接收者也可使用普通类实现ChannelAwareMessageListener接口,重写方法完成,这种是直接全局性接收的。没有最好的,只有最合适的,根据项目情况选择全局接收还是单个类接收自己监听的。
@Component public class Consumer implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { if ("queue_name".equals(message.getMessageProperties().getConsumerQueue())) { System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue()); System.out.println("接收消息: " + new String(message.getBody(), "UTF-8")); System.out.println("执行queue_name中的消息的业务处理流程......"); } if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())) { System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue()); System.out.println("接收消息: " + new String(message.getBody(), "UTF-8")); System.out.println("执行fanout.A中的消息的业务处理流程......"); } // 手动提交ack,并且批量确认消息 channel.basicAck(deliveryTag, true); } catch (Exception e) { e.printStackTrace(); channel.basicReject(deliveryTag, true); } } }参考
参考自
RabbitMQ消息确认机制(ACK)
springboot rabbitmq ACK手动确认