## 发布确认高级
1. SpringBoot版本发布确认 1.1 方案图生产环境下发送异常问题,导致RabbitMQ重启,在重启器件生产者投递消息失败,导致消息丢失,需要手动处理。
当发送失败时,将消息存到缓存中,交换机接收到消息之后再从缓存中清除已经收到的消息。
在上次博客的SpringBoot版本的基础上测试本次项目。
1.2.1 配置类
@Configuration
public class BingConf {
public static final String /confirm/i_EXCHANGE = "/confirm/i_exchange";
public static final String /confirm/i_QUEUE = "/confirm/i_queue";
public static final String /confirm/i_ROTING_KEY = "/confirm/i_routing_key";
@Bean("c_ex")
public DirectExchange /confirm/iChange(){
return new DirectExchange(/confirm/i_EXCHANGE);
}
@Bean("c_qu")
public Queue /confirm/iQueue(){
return QueueBuilder.durable(/confirm/i_QUEUE).build();
}
@Bean()
public Binding bindingQueueAndExchange(@Qualifier("c_ex") DirectExchange exchange,
@Qualifier("c_qu") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with(/confirm/i_ROTING_KEY);
}
}
配置文件新增:
# 发布到交换机触发回调 spring.rabbitmq.publisher-/confirm/i-type=correlated1.2.2 生产者控制器
@Slf4j
@RestController
@RequestMapping("//confirm/i")
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("//confirm/i/{message}")
public void sendMsg(@PathVariable String message){
rabbitTemplate.convertAndSend(BingConf./confirm/i_EXCHANGE,
BingConf./confirm/i_ROTING_KEY,message);
log.info("消息内容:{}",message);
}
}
1.2.3 消费者监听
@Slf4j
@Component
public class /confirm/iConsumer {
@RabbitListener(queues = BingConf./confirm/i_QUEUE)
public void receive/confirm/iMessage(Message message){
log.info("接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8));
}
}
1.2.4 异常接口回调
上方情况仅仅完成了对正常情况下消息的接收和发送,但出现异常时需要进行新的处理。
@Slf4j
@Component
public class CallBackConf implements RabbitTemplate./confirm/iCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.set/confirm/iCallback(this);
}
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("接收消息成功,ID:{}", msgId);
}else {
log.error("接收消息失败,原因:{},ID:{}",cause,msgId);
}
}
}
1.2.5 测试
测试交换机:
访问:http://localhost:8080//confirm/i//confirm/i/test/confirm/i进行发送消息,正常情况下会接收到消息。
测试交换机接收不到:
在发送部分修改交换机名字:
public void sendMsg(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(BingConf./confirm/i_EXCHANGE + "1",
BingConf./confirm/i_ROTING_KEY,message,correlationData);
log.info("消息内容:{}",message);
}
再次进行发送:
会消失发送失败404找不到交换机,并且异常回调也完美回调成功。
测试队列接收不到:
在发送部分修改RotingKey:
public void sendMsg(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(BingConf./confirm/i_EXCHANGE,
BingConf./confirm/i_ROTING_KEY + "1",message,correlationData);
log.info("消息内容:{}",message);
}
结果:
显然只有交换机收到了消息,而队列没有接收到交换机发来的消息。
上方实战中有一个很明显的问题,如果发送到交换机成功,但是出现其他问题,交换机发送到消费者时出现问题,不会进行异常处理,所以消费者不知道当前消息丢失了。
使用Mandatory参数将不能达到目的地的消息回退给生产者。
1.3.1 完善方法在配置文件中新增:
spring.rabbitmq.publisher-returns=true
表示开启回退
修改CallBackConf额外实现RabbitTemplate.ReturnsCallback接口。
重写returnedMessage方法:
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息:{},被交换机{}回退:原因:{},RoutingKey:{}",
returned.getMessage(),
returned.getExchange(),
returned.getReplyText(),
returned.getRoutingKey());
}
修改init方法:
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("n消息:{}n被交换机{}回退:n原因:{},nRoutingKey:{}",
returned.getMessage(),
returned.getExchange(),
returned.getReplyText(),
returned.getRoutingKey());
}
再次访问错误RoutingKey的路径:
这样就实现了如何接收到发送失败的消息。
上述使用Mandatory参数仅仅能实现记录到接收失败的消息,无法达到再次发送消息的目的,这里可以使用备份交换机再次进行发送。在和Mandatory同时使用情况下,备份交换机优先级更高。
如果普通交换机的消息处理失败,可以转发给备份交换机。
新增了备份交换机、备份队列、异常队列的声明、绑定;修改了普通交换机转发到备份交换机。
@Configuration
public class BingConf {
public static final String /confirm/i_EXCHANGE = "/confirm/i_exchange";
public static final String /confirm/i_QUEUE = "/confirm/i_queue";
public static final String /confirm/i_ROTING_KEY = "/confirm/i_routing_key";
public static final String BACKUP_EXCHANGE = "backup_exchange";
public static final String BACKUP_QUEUE = "backup_queue";
public static final String WARNING_QUEUE = "warning_queue";
@Bean("c_ex")
public DirectExchange /confirm/iChange(){
return ExchangeBuilder.directExchange(/confirm/i_EXCHANGE).
withArgument("alternate-exchange",BACKUP_EXCHANGE)
.build();
}
@Bean("b_ex")
public FanoutExchange backupChange(){
return new FanoutExchange(BACKUP_EXCHANGE);
}
@Bean("c_qu")
public Queue /confirm/iQueue(){
return QueueBuilder.durable(/confirm/i_QUEUE).build();
}
@Bean("b_qu")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE).build();
}
@Bean("w_qu")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE).build();
}
@Bean()
public Binding bindingQueueAndExchange(@Qualifier("c_ex") DirectExchange exchange,
@Qualifier("c_qu") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with(/confirm/i_ROTING_KEY);
}
@Bean()
public Binding bindingBackupQueueAndBackupExchange(@Qualifier("b_ex") FanoutExchange exchange,
@Qualifier("b_qu") Queue backQueue){
return BindingBuilder.bind(backQueue).to(exchange);
}
@Bean()
public Binding bindingWarningQueueAndBackupExchange(@Qualifier("b_ex") FanoutExchange exchange,
@Qualifier("w_qu") Queue warningQueue){
return BindingBuilder.bind(warningQueue).to(exchange);
}
}
1.4.2 新增报警消费者和备份消费者
@Slf4j
@Component
public class BackupConsumer {
@RabbitListener(queues = BingConf.BACKUP_QUEUE)
public void receive/confirm/iMessage(Message message){
log.info("备份消费者接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8));
}
}
@Slf4j
@Component
public class WarningConsumer {
@RabbitListener(queues = BingConf.WARNING_QUEUE)
public void receive/confirm/iMessage(Message message){
log.info("报警消费者接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8));
}
}
访问错误RotingKey:



