如上图所示:
一、publisher /confirm/iCallBack确认模式
springboot开启rabbitmq可靠抵达 ——/confirm/iCallBack
spring:
rabbitmq:
publisher-/confirm/i-type: correlated
当我们的publisher 到达 broker (服务器时候) ,返回/confirm/iCallback,当消息没有抵达broker的时候返回true,并会给出失败原因。
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate. setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
}
});
}
}
二、publisher returnCallBack 确认模式
springboot开启rabbitmq可靠抵达 —— returnCallBack
当我们开启publisher-returns 时候,将 spring.rabbitmq.template.mandatory 开启
作用:只要消息抵达队列 ,以异步方式优先回调 rerun/confirm/i
spring:
rabbitmq:
publisher-returns: true
#只要消息抵达队列 ,以异步方式优先回调 rerunConfirm
template:
mandatory: true
设置broker 抵达 queue 时候的returnCallBack回调
只要消息没有投递给指定的队列,就触发这个失败回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
}
});
贴上RetrunedMessage实体返回的内容(此处为rabbitMQ自带实体,非自建)
public class ReturnedMessage {
//投递失败的消息详细信息
private final Message message;
//回复的状态码
private final int replyCode;
//回复的文本内容
private final String replyText;
//当时这个消息发给哪个交换机
private final String exchange;
//当时这个消息用哪个路由键
private final String routingKey;
public ReturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
this.message = message;
this.replyCode = replyCode;
this.replyText = replyText;
this.exchange = exchange;
this.routingKey = routingKey;
}
public Message getMessage() {
return this.message;
}
public int getReplyCode() {
return this.replyCode;
}
public String getReplyText() {
return this.replyText;
}
public String getExchange() {
return this.exchange;
}
public String getRoutingKey() {
return this.routingKey;
}
public String toString() {
return "ReturnedMessage [message=" + this.message + ", replyCode=" + this.replyCode + ", replyText=" + this.replyText + ", exchange=" + this.exchange + ", routingKey=" + this.routingKey + "]";
}
}
路由键绑定为 :
@Test
void bindingQueueExchange(){
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,exchange,"chendazui.#",null);
amqpAdmin.declareBinding(binding);
log.info("[{}]绑定成功","chendazui-binding");
}
当我们将路由键修改后:
@Test
void sendMessage(){
MessageUtil messageUtil = new MessageUtil();
messageUtil.setCode("1");
messageUtil.setMsg("我到了");
messageUtil.setOrder("陈大嘴的订单");
messageUtil.setUser("陈大嘴");
messageUtil.setDateTime(new Date());
rabbitTemplate.convertAndSend(exchange,"chendazui1.#",messageUtil);
log.info("[{}]消息已经发出",messageUtil);
}
这个时候我们测试结果如下:
当前指向的交换机==>chendazui-exchange投递失败消息详情(Body:'{"code":"1","msg":"我到了","order":"陈大嘴的订单","user":"陈大嘴","dateTime":1634780951481}' MessageProperties [headers={__TypeId__=com.example.demo.util.MessageUtil}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])回复的文本内容NO_ROUTE路由键chendazui1.#回复的状态码=>312
这时候我们returnCallBack机制捕捉到失败消息,消息未抵达队列queue。
相关代码地址:
RabbitMQ Demo: rabbitmq 代码示例



