RabbitMq提供了消息确认机制,主要分为生产者端发送消息确认和消费者端的消费消息确认。
1、生产者端发送消息确认又分为Confirm 消息确认机制和Return 消息机制
2、消费者端消息接收确认采用的是ack模式
-
AcknowledgeMode.NONE :自动确认
-
AcknowledgeMode.AUTO:根据情况确认
如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)
当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置
-
AcknowledgeMode.MANUAL:手动确认
2.2 application.yml4.0.0 org.example springboot-rabbitmq-fanout-producer 1.0-SNAPSHOT 8 8 org.springframework.boot spring-boot-dependencies 2.3.2.RELEASE pom import org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test org.projectlombok lombok 1.18.20
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
#由于开启了集群,不再建议使用此方式配置服务地址,若集群含有多个IP地址,不方便指定
# host: 192.168.229.128
# port: 5672
#集群地址配置:指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
addresses: 192.168.229.128:5672
# 开启发送确认(有些博文写的是publisher-/confirm/is: true,但其实此方式已经被遗弃,替换为publisher-/confirm/i-type)
publisher-/confirm/i-type: correlated
# # 开启发送失败退回(例如:消息有没有找到合适的队列)
publisher-returns: true
springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-/confirm/i-type = correlated 实现相同效果
在springboot2.2.0.RELEASE版本之前是amqp正式支持的属性,用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-/confirm/i-type属性配置代替,用来配置更多的确认类型:
-
NONE值是禁用发布确认模式,是默认值
-
CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
-
SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用,waitFor/confirm/is或waitFor/confirm/isOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitFor/confirm/isOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
package com.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
//创建队列
@Bean
public Queue directEmailQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("email.direct.queue", true);
}
@Bean
public Queue directSmsQueue() {
return new Queue("sms.direct.queue", true);
}
@Bean
public Queue directWeixinQueue() {
return new Queue("weixin.direct.queue", true);
}
//创建交换机
@Bean
public DirectExchange directOrderExchange() {
return new DirectExchange("direct_order_exchange", true, false);
}
//绑定关系
@Bean
public Binding directEmailBinding() {
return BindingBuilder.bind(directEmailQueue()).to(directOrderExchange()).with("email");
}
@Bean
public Binding directSmsBinding() {
return BindingBuilder.bind(directSmsQueue()).to(directOrderExchange()).with("sms");
}
@Bean
public Binding directWeixinBinding() {
return BindingBuilder.bind(directWeixinQueue()).to(directOrderExchange()).with("weixin");
}
}
2.4 消息发送确认
发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。
消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。(无论成功失败都有返回)
消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。(失败时才会有返回)
我们可以利用这两个Callback来确保消的100%送达。
2.4.1 /confirm/iCallback确认模式消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。
package com.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class /confirm/iCallbackService implements RabbitTemplate./confirm/iCallback {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息标识:" + correlationData.toString());
log.info("发送成功确认:"+ack);
log.info("错误原因:"+cause);
}
}
实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。
- correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
- ack:消息投递到broker 的状态,true表示成功。
- cause:表示投递失败的原因。
但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。
2.4.2 ReturnCallback 退回模式如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
package com.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息:"+message.toString());
log.info("返回码:"+replyCode);
log.info("返回描述:"+replyText);
log.info("交换机:"+exchange);
log.info("路由key:"+routingKey);
}
}
实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。
2.4.3 测试 @Test
public void contextLoads7() throws Exception {
myService.sendMessage("direct_order_exchange","email","我是发送者");
Thread.sleep(5000);
}
为何要沉睡5秒呢?因为使用这种单元测试方式,程序一运行完就会立即关闭应用,而回调函数的执行会有延迟,故为了保证能收到消息确认而设置了沉睡一下再关闭程序。
成功测试:
失败测试:
略
3.2 application.yml# 服务端口
server:
port: 8081
# 配置rabbitmq服务
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
addresses: 192.168.229.128:5672
# host: 192.168.229.128
# port: 5672
3.3 Exchange 和 Queue
若生产者端未配置,则需要配置;这里生产者已经配置了,故这里无需配置。
3.4 消息接收确认 3.4.1 默认ack代码:
package com.service.direct;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailService {
@RabbitHandler
public void messagerevice(String msg, Channel channel, Message message){
// 此处省略发邮件的逻辑
System.out.println("email-------------->" + message);
// int i = 10/0;
}
}
rabbitmq默认的是自动ack,无需添加其他配置。
特性:
消费者自行监控队列,若有队列存在未消费的消息,则进行消费。
若正常消费成功了,则会自动返回确认ack给队列,队列收到后即可将消息移除。
若消费过程中出现异常,则会触发重试消费,若一直报错则会出现死循环。
问题:使用自动ack时,如何解决出现死循环的情况?
方案一:控制重试次数
修改配置文件
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
addresses: 192.168.229.128:5672
# host: 192.168.229.128
# port: 5672
listener:
simple:
# acknowledge-mode: manual # 设置消费端手动 ack
retry:
enabled: true # 是否支持重试
max-attempts: 10 #最大重试次数
initial-interval: 2000ms #重试时间间隔
测试后,可发现重试了10次就停止了,但队列的消息也被移除掉了,会造成消息丢失。
方案二:控制重试次数+死信队列
加上死信队列,可以实现重试次数结束后,队列会将消息转移到死信队列,从而不会造成消息丢失。
开启方式简单,只需要放开此配置即可
acknowledge-mode: manual # 设置消费端手动 ack
消费消息有三种回执方法,我们来分析一下每种方法的含义。
1、basicAck
basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
void basicAck(long deliveryTag, boolean multiple)
deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
2、basicNack
basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag:表示消息投递序号。
multiple:是否批量确认。
requeue:值为 true 消息将重新入队列。
3、basicReject
basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
deliveryTag:表示消息投递序号。
requeue:值为 true 消息将重新入队列。
3.4.3 测试package com.service.direct;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailService {
@RabbitHandler
public void messagerevice(String msg, Channel channel, Message message) throws IOException {
try {
// 此处省略发邮件的逻辑
System.out.println("email-------------->" + message);
int i = 10/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
//注意:参数三若设置为true,会出现死循环
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
}
注意:
1、basicNack方法的参数三设置为true会造成死循环,true是允许重试。
2、使用了手动ack后,重试次数将不起作用
3、basicNack方法执行后,消息会被移除,若存在死信队列则转移到死信队列,反之则造成消息丢失。
https://blog.csdn.net/xinzhifu1/article/details/107016179
https://www.cnblogs.com/biehongli/p/11789098.html
https://www.cnblogs.com/haixiang/p/10900005.html#%E7%94%9F%E4%BA%A7%E7%AB%AF-/confirm/i-%E6%B6%88%E6%81%AF%E7%A1%AE%E8%AE%A4%E6%9C%BA%E5%88%B6
https://blog.csdn.net/dh554112075/article/details/90137869



