栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ消息可靠性(一)

RabbitMQ消息可靠性(一)

一、消息发送可靠性(推荐使用消息确认模式,事务方式效率低) 1.1 通过事务确保消息发送成功
@Configuration
public class RabbitmqConfig {

    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("direct_change");
    }
    @Bean
    Queue queue1(){
        return new Queue("queue",true,false,false);
    }

    @Bean
    Binding binding(){
        return BindingBuilder.bind(queue1()).to(directExchange()).with("a");
    }


    @Bean
    RabbitTransactionManager rabbitTransactionManager(ConnectionFactory factory){
        return new RabbitTransactionManager(factory);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }
}

   @Autowired
    RabbitTemplate rabbitTemplate;

    @Transactional(rollbackFor = Exception.class)
    public void sendMsg(){
        rabbitTemplate.convertAndSend("direct_change","a","hello queue");
    }
1.2 消息确认机制
spring:
  rabbitmq:
    virtual-host: /
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-/confirm/i-type: correlated
    publisher-returns: true

@Configuration
public class RabbitConfig implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnsCallback {

    public static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.set/confirm/iCallback(this::/confirm/i);
        rabbitTemplate.setReturnsCallback(this::returnedMessage);
    }

    @Bean
    DirectExchange directExchange(){
        return new DirectExchange("exchange_name",true,false);
    }
    @Bean
    Queue queue1(){
        return new Queue("queue_name",true,false,false);
    }


    @Bean
    Binding binding(){
        return BindingBuilder.bind(queue1()).to(directExchange()).with("a");
    }

    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
      if(ack){
          logger.info("{}消息成功到达交换机",correlationData.getId());
      }else {
          logger.info("{}消息未到达交换机,{}",correlationData.getId(),cause);
      }
    }

    
    @Override
    public void returnedMessage(ReturnedMessage returned) {
         logger.info("消息未到达队列");
    }
}
二、消息失败重试 2.1 通过配置rabbit实现消息失败重试
spring:
  rabbitmq:
    template:
      retry:
        ## 启用重试
        enabled: true
        ## 重试时间间隔
        initial-interval: 1000ms
        ## 最大重试次数
        max-attempts: 5
        ## 重试时间累加  如第一次 1000ms  第二次就是1200 以此类推
        multiplier: 1.2
        ## 最大重试间隔
        max-interval: 5000ms
2.2 业务重试机制

针对消息没有成功到交换机,针对这种情况,回调做相关的业务逻辑即可.
具体思路:
可以使用数据库在每次发送消息的时候写入库里给此条消息增加一个状态,在交换机回调处,做相应的处理逻辑.
1.收到失败回调对此消息做数据库的修改,是重试几次后,还是发送失败那么就不在重新发送此条消息,改变此条消息的状态即可。
2.如果消息收到此条消息发送成功就改变次条消息的状态。
3.当然了,发送失败的消息,重试次数超过后,定时器去找失败的消息,做业务处理即可。
注意:这里会有消息重复发送的情况,在消费方做好消息的幂等性即可。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734892.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号