消息丢失的情况
(1)生产者方面:生产者发送消息至MQ的过程中数据丢失
(2)RabbitMQ方面:MQ收到消息后暂存在内存中,还没被消费,MQ自己挂掉,数据会全部丢失
(3)消费者方面:消费者刚拿到消息,还没处理就挂掉了,MQ却以为消费者已经处理完
解决方法
1.配置文件中添加
# 消息已发送到交换机(Exchange)时返回确认
spring.rabbitmq.publisher-confirm-type=correlated
# 消息在未被队列收到的情况下返回
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true
# 开启消息手动确认机制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2.config类配置
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class RabbitmqConfig {
@Autowired
AmqpAdmin amqpAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//发送到exchange时调用回调函数
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("/confirm/iCallback:"+"相关数据:"+correlationData+" /confirm/iCallback:"+"确认情况:"+ack+" /confirm/iCallback:"+"原因:"+cause);
}
});
//设置消息抵达队列的失败回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("二位热热我热太热");
}
});
return rabbitTemplate;
}
@Bean
public void createNormalExchange(){
DirectExchange mydirect = new DirectExchange("mydirect3", true, false);
amqpAdmin.declareExchange(mydirect);
}
}
3.监听类改写
package com.example.demo.service.impl;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Consumer for the {@code mybe} queue that acknowledges messages manually.
 *
 * <p>A delivery is acked only after it has been processed; if processing
 * fails it is nacked, so a delivery is never left unacknowledged on the channel.
 */
@Service
public class RabbitmqTest2Impl {

    /**
     * Processes one delivery and settles it explicitly on the channel.
     *
     * @param message the received message
     * @param channel the channel the message was delivered on; used for ack/nack
     * @throws Exception if the ack/nack call itself fails
     */
    @RabbitListener(queues = {"mybe"})
    public void getMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Simulated processing time.
            Thread.sleep(2000);
            // Decode with an explicit charset rather than the platform default.
            String s = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("消费消息完成"+s);
        } catch (InterruptedException e) {
            // The thread was interrupted before the message was processed.
            // Restore the interrupt flag and requeue the message, since the
            // message itself is not at fault.
            Thread.currentThread().interrupt();
            channel.basicNack(deliveryTag, false, true);
            return;
        } catch (Exception e) {
            // Processing failed: reject this delivery only.
            //   deliveryTag - identifies the delivery on this channel
            //   multiple    - false: reject just this delivery, not all up to the tag
            //   requeue     - false: do not put the message back on the queue
            channel.basicNack(deliveryTag, false, false);
            e.printStackTrace();
            return;
        }
        // Ack outside the try block: if the ack itself fails, the error
        // propagates instead of being followed by a nack on the same delivery.
        channel.basicAck(deliveryTag, false);
    }
}



