栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ——发布确认高级

RabbitMQ——发布确认高级

## 发布确认高级

生产环境下发送异常问题,导致RabbitMQ重启,在重启器件生产者投递消息失败,导致消息丢失,需要手动处理。

1. SpringBoot版本发布确认 1.1 方案图


当发送失败时,将消息存到缓存中,交换机接收到消息之后再从缓存中清除已经收到的消息。

1.2 实战

在上次博客的SpringBoot版本的基础上测试本次项目。

1.2.1 配置类
@Configuration
public class BingConf {

    public static final String /confirm/i_EXCHANGE = "/confirm/i_exchange";

    public static final String /confirm/i_QUEUE = "/confirm/i_queue";

    public static final String /confirm/i_ROTING_KEY = "/confirm/i_routing_key";

    @Bean("c_ex")
    public DirectExchange /confirm/iChange(){
        return new DirectExchange(/confirm/i_EXCHANGE);
    }

    @Bean("c_qu")
    public Queue /confirm/iQueue(){
        return QueueBuilder.durable(/confirm/i_QUEUE).build();
    }

    @Bean()
    public Binding bindingQueueAndExchange(@Qualifier("c_ex") DirectExchange exchange,
                                           @Qualifier("c_qu") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(/confirm/i_ROTING_KEY);
    }
}

配置文件新增:

# 发布到交换机触发回调
spring.rabbitmq.publisher-/confirm/i-type=correlated
1.2.2 生产者控制器
@Slf4j
@RestController
@RequestMapping("//confirm/i")
public class SendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("//confirm/i/{message}")
    public void sendMsg(@PathVariable String message){
        rabbitTemplate.convertAndSend(BingConf./confirm/i_EXCHANGE,
                BingConf./confirm/i_ROTING_KEY,message);
        log.info("消息内容:{}",message);
    }
}

1.2.3 消费者监听
@Slf4j
@Component
public class /confirm/iConsumer {

    
    @RabbitListener(queues = BingConf./confirm/i_QUEUE)
    public void receive/confirm/iMessage(Message message){
        log.info("接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8));
    }

}
1.2.4 异常接口回调

上方情况仅仅完成了对正常情况下消息的接收和发送,但出现异常时需要进行新的处理。

@Slf4j
@Component
public class CallBackConf implements RabbitTemplate./confirm/iCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @PostConstruct
    public void init(){
        rabbitTemplate.set/confirm/iCallback(this);
    }

    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        String msgId = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("接收消息成功,ID:{}", msgId);
        }else {
            log.error("接收消息失败,原因:{},ID:{}",cause,msgId);
        }
    }

}

1.2.5 测试

测试交换机:
访问:http://localhost:8080//confirm/i//confirm/i/test/confirm/i进行发送消息,正常情况下会接收到消息。

测试交换机接收不到:
在发送部分修改交换机名字:

    public void sendMsg(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(BingConf./confirm/i_EXCHANGE + "1",
                BingConf./confirm/i_ROTING_KEY,message,correlationData);
        log.info("消息内容:{}",message);
    }

再次进行发送:

会消失发送失败404找不到交换机,并且异常回调也完美回调成功。

测试队列接收不到:
在发送部分修改RotingKey:

    public void sendMsg(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(BingConf./confirm/i_EXCHANGE,
                BingConf./confirm/i_ROTING_KEY + "1",message,correlationData);
        log.info("消息内容:{}",message);
    }

结果:

显然只有交换机收到了消息,而队列没有接收到交换机发来的消息。

1.3 回退消息

上方实战中有一个很明显的问题,如果发送到交换机成功,但是出现其他问题,交换机发送到消费者时出现问题,不会进行异常处理,所以消费者不知道当前消息丢失了。

使用Mandatory参数将不能达到目的地的消息回退给生产者。

1.3.1 完善方法

在配置文件中新增:

spring.rabbitmq.publisher-returns=true

表示开启回退

修改CallBackConf额外实现RabbitTemplate.ReturnsCallback接口。
重写returnedMessage方法:

    
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息:{},被交换机{}回退:原因:{},RoutingKey:{}",
                returned.getMessage(),
                returned.getExchange(),
                returned.getReplyText(),
                returned.getRoutingKey());
    }

修改init方法:

    
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("n消息:{}n被交换机{}回退:n原因:{},nRoutingKey:{}",
                returned.getMessage(),
                returned.getExchange(),
                returned.getReplyText(),
                returned.getRoutingKey());
    }

再次访问错误RoutingKey的路径:

这样就实现了如何接收到发送失败的消息。

1.4 备份交换机

上述使用Mandatory参数仅仅能实现记录到接收失败的消息,无法达到再次发送消息的目的,这里可以使用备份交换机再次进行发送。在和Mandatory同时使用情况下,备份交换机优先级更高。

如果普通交换机的消息处理失败,可以转发给备份交换机。

1.4.1 修改配置类

新增了备份交换机、备份队列、异常队列的声明、绑定;修改了普通交换机转发到备份交换机。

@Configuration
public class BingConf {

    public static final String /confirm/i_EXCHANGE = "/confirm/i_exchange";

    public static final String /confirm/i_QUEUE = "/confirm/i_queue";

    public static final String /confirm/i_ROTING_KEY = "/confirm/i_routing_key";

    public static final String BACKUP_EXCHANGE = "backup_exchange";

    public static final String BACKUP_QUEUE = "backup_queue";

    public static final String WARNING_QUEUE = "warning_queue";


    @Bean("c_ex")
    public DirectExchange /confirm/iChange(){
        return ExchangeBuilder.directExchange(/confirm/i_EXCHANGE).
                withArgument("alternate-exchange",BACKUP_EXCHANGE)
                .build();
    }

    @Bean("b_ex")
    public FanoutExchange backupChange(){
        return new FanoutExchange(BACKUP_EXCHANGE);
    }

    @Bean("c_qu")
    public Queue /confirm/iQueue(){
        return QueueBuilder.durable(/confirm/i_QUEUE).build();
    }

    @Bean("b_qu")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }

    @Bean("w_qu")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }

    @Bean()
    public Binding bindingQueueAndExchange(@Qualifier("c_ex") DirectExchange exchange,
                                           @Qualifier("c_qu") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(/confirm/i_ROTING_KEY);
    }

    @Bean()
    public Binding bindingBackupQueueAndBackupExchange(@Qualifier("b_ex") FanoutExchange exchange,
                                                 @Qualifier("b_qu") Queue backQueue){
        return BindingBuilder.bind(backQueue).to(exchange);
    }

    @Bean()
    public Binding bindingWarningQueueAndBackupExchange(@Qualifier("b_ex") FanoutExchange exchange,
                                                 @Qualifier("w_qu") Queue warningQueue){
        return BindingBuilder.bind(warningQueue).to(exchange);
    }
}

1.4.2 新增报警消费者和备份消费者
@Slf4j
@Component
public class BackupConsumer {

    @RabbitListener(queues = BingConf.BACKUP_QUEUE)
    public void receive/confirm/iMessage(Message message){
        log.info("备份消费者接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8));
    }

}

@Slf4j
@Component
public class WarningConsumer {

    @RabbitListener(queues = BingConf.WARNING_QUEUE)
    public void receive/confirm/iMessage(Message message){
        log.info("报警消费者接收到消息:{}",new String(message.getBody(), StandardCharsets.UTF_8));
    }

}

访问错误RotingKey:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761874.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号