MQ消息丢失场景主要有三个:
1.消息生产者,发送消息后,rabbitmq服务器没有收到;导致消息丢失
2.rabbitmq收到消息后,没有持久化保存,导致消息丢失
3.消费者收到消息后,没来得及处理,消费者宕机,导致消息丢失
解决方案:消息异步确认机制(/confirm/i机制)
在集成springboot项目中;通过配置文件开启/confirm/i机制
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=123123 spring.rabbitmq.virtual-host=/test spring.rabbitmq.connection-timeout=15000 #开启 confirm 确认机制 spring.rabbitmq.publisher-confirms=true #开启 return 确认机制 spring.rabbitmq.publisher-returns=true #设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除 spring.rabbitmq.template.mandatory=true
开启/confirm/i机制后,在生产者每次发送消息,都会调用回调代码;开发人员,需要写回调函数的逻辑,处理发送失败的消息
@Component
@Slf4j
public class RabbitMQ/confirm/iAndReturn implements RabbitTemplate./confirm/iCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.set/confirm/iCallback(this);
}
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
// 失败,一般解决方案,是将发送失败消息,存入定时任务队列;尝试重新发送消息;再多次失败,
// 就不再发送,转为人工处理
if (!ack) {
log.error("rabbitmq confirm fail,cause:{}", cause);
// ...... 失败处理逻辑
}
}
}
2、交换机没有发送到队列
解决方案:Return模式,确保消息从交换机发送到队列。
1.开启return模式
#开启 return 机制 spring.rabbitmq.publisher-returns=true
2.开发回调函数代码
@Component
public class Sender implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
}
//通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息主体message: " + message);
System.out.println("消息replyCode: " + replyCode);
System.out.println("描述: " + replyText);
System.out.println("消息使用的交换器exchange: " + exchange);
System.out.println("消息使用的路由键routing: " + routingKey);
}
}
3、交换机、队列、消息没有设置持久化
交换机、队列、消息没有持久化,当rabbitmq的服务重启之后,这些信息就会丢失。
交换机持久化
在声明交换机的时候,设置持久化属性
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchangeName", true, false);
}
队列持久化
在声明队列的时候,设置持久化属性
public Queue queue() {
return new Queue("queueName", true, false, false, args);
}
消息持久化
消息的持久化是默认持久的。无需配置
4、消费者接收到消息没有执行业务逻辑,导致消息丢失解决方案:手动确认消息机制
配置文件配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring:
rabbitmq:
host: localhost
port: 5672
username: root
password: 123123
virtual-host: /test
publisher-/confirm/is: true # 开启发送确认
publisher-returns: true # 开启发送失败回退
#开启ack
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual #采取手动应答
#concurrency: 1 # 指定最小的消费者数量
#max-concurrency: 1 #指定最大的消费者数量
retry:
enabled: true # 是否支持重试
消费者代码
@Component
public class Consumer {
@RabbitHandler
public void consumeMsg(String msg, Channel channel, Message message) throws IOException {
//拿到消息延迟消费
try {
// .... 消费消息业务逻辑
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
当业务出现意料之外的一场;消息就会重新回到队列中;会分发到其他正常consumer中进行消费



