在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?
代码框架:
需要一个回调接口,当消息发出去后交换机没有被接收,就会触发回调接口,然后将消息重发。
# 交换机确认消息 spring.rabbitmq.publisher-/confirm/i-type=correlated # 开启回退消息 spring.rabbitmq.publisher-returns=true
需要开启两个配置,第一个是确认机制,第二个是回退即通知生产者消息发送的情况
代码实现
1、配置类
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
// 注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if(ack){
log.info("交换机已经收到id为:{}的消息",id);
}else {
log.info("交换机还未收到id为:{}的消息,由于原因:{}",id,s);
}
}
@Override
public void returnedMessage(Message message, int i, String replyText, String exchange, String routingKey) {
log.info("消息{}被交换机{}回退,退回原因{},路由key{}",
new String(message.getBody()),exchange,replyText,routingKey);
}
}
2、生产者
@RestController
@Slf4j
@RequestMapping("/api/v1//confirm/i")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发消息
@GetMapping("sendMessage/{message}")
public void sendMesaage(@PathVariable String message){
// 设置回调
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(/confirm/iConfig./confirm/i_EXCHANGE_NAME,
/confirm/iConfig./confirm/i_ROUTING_KEY,message,correlationData);
log.info("发送消息:{}",message);
}
}



