一、RabbitMQ运作原理 二、/confirm/i确认模式介绍及实现消息可靠性投递
在使用RabbitMQ时,消息生产者发送消息时会出现消息丢失或者投递失败的现象;
RabbitMQ在消息投递可靠性方面提供了两种模式:
1.Confirm 确认模式 producter->exchange
2.Return 退回模式 exchange->queue
producer -> exchange
消息确认,生产者消息投递后,如果exchange收到消息,则会给生产者一个答应。
生产者接收应答,以确定消息成功发送到exchange。
spring:
rabbitmq:
publisher-/confirm/i-type: correlated
2.实现RabbitTemplate./confirm/iCallback接口,实现/confirm/i方法/confirm/i-type有none、correlated、simple这三种类型
none:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发 /confirm/iCallback回调。correlated:表示消息成功到达Broker后触发/confirm/iCalllBack回调simple:simple模式下如果消息成功到达Broker后一样会触发
@Configuration
public class RabbitConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
// 消息是否成功发送到Exchange
rabbitTemplate.set/confirm/iCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功发送到Exchange");
} else {
//通过correlationData携带的id可定位哪条消息发送失败,做补发操作
log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
}
});
}
}
3.生产者发送消息
@Service
public class Producter {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObjectMapper objectMapper;
public void sendMessage(Dto Dto) throws JsonProcessingException {
mailDto.setMsgId(RandomUtil.randomUUID());
String msgJson = objectMapper.writevalueAsString(Dto);
Message message = MessageBuilder
.withBody(msgJson.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
.build();
CorrelationData correlationData = new CorrelationData(mailDto.getMsgId());
rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME,message,correlationData);
}
}
二、Return退回模式介绍和实现
exchange->queue
如果消息未能路由到目标队列则将触发回调 ReturnCallback
rabbitmq: publisher-/confirm/is: true publisher-returns: true2.实现returnedMessage方法
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
三、消费端ACK手动确认机制
1.修改配置文件,开启ACK手动确认消息确认三种模式:manual、none、auto
手动确认manual:消费消息后需要根据消费情况返回一个回执,成功手动调用channel.basicAck()手动签收,失败则调用channel.basicNack()方法拒收,让MQ重新发送该消息自动确认 none:默认消费者正确处理所有请求。(不设置时默认方式)根据情况确认 auto:不太常用
spring:
rabbitmq:
publisher-/confirm/i-type: correlated
listener:
simple:
acknowledge-mode: manual
没有确认,消息为Unack状态
2.消费者接收消息并手动确认消息
@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE})
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try{
System.out.println("业务处理流程,成功则ACK");
channel.basicAck(tag,true);
}catch (Exception e){
System.out.println("业务处理流程,失败则NACK");
channel.basicNack(tag,false,true);
}
}



