栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ消息可靠性投递解决方案

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ消息可靠性投递解决方案

谈到消息的可靠性投递,无法避免的,在实际的工作中会经常碰到,比如一些核心业务需要保障消息不丢失,接下来我们看一个可靠性投递的流程图,说明可靠性投递的概念:

  • Step 1: 首先把消息信息(业务数据)存储到数据库中,紧接着,我们再把这个消息记录也存储到一张消息记录表里(或者另外一个同源数据库的消息记录表)

  • Step 2:发送消息到MQ Broker节点(采用/confirm/i方式发送,会有异步的返回结果)

  • Step 3、4:生产者端接受MQ Broker节点返回的/confirm/i确认消息结果,然后进行更新消息记录表里的消息状态。比如默认Status = 0 当收到消息确认成功后,更新为1即可!

  • Step 5:但是在消息确认这个过程中可能由于网络闪断、MQ Broker端异常等原因导致 回送消息失败或者异常。这个时候就需要发送方(生产者)对消息进行可靠性投递了,保障消息不丢失,100%的投递成功!(有一种极限情况是闪断,Broker返回的成功确认消息,但是生产端由于网络闪断没收到,这个时候重新投递可能会造成消息重复,需要消费端去做幂等处理)所以我们需要有一个定时任务,(比如每5分钟拉取一下处于中间状态的消息,当然这个消息可以设置一个超时时间,比如超过1分钟 Status = 0 ,也就说明了1分钟这个时间窗口内,我们的消息没有被确认,那么会被定时任务拉取出来)

  • Step 6:接下来我们把中间状态的消息进行重新投递 retry send,继续发送消息到MQ ,当然也可能有多种原因导致发送失败

  • Step 7:我们可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)。

接下来,我们使用SpringBoot2.x 实现这一可靠性投递策略:

废话不多说,直接上代码:

  • 数据库库表结构:订单表和消息记录表

-- 表 order 订单结构
CREATE TABLE IF NOT EXISTS `t_order` (
  `id` varchar(128) NOT NULL, -- 订单ID
  `name` varchar(128), -- 订单名称 其他业务熟悉忽略
  `message_id` varchar(128) NOT NULL, -- 消息唯一ID
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 表 broker_message_log 消息记录结构
CREATE TABLE IF NOT EXISTS `broker_message_log` (
  `message_id` varchar(128) NOT NULL, -- 消息唯一ID
  `message` varchar(4000) DEFAULT NULL, -- 消息内容
  `try_count` int(4) DEFAULT '0', -- 重试次数
  `status` varchar(10) DEFAULT '', -- 消息投递状态  0 投递中 1 投递成功   2 投递失败
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',  -- 下一次重试时间 或 超时时间
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 创建时间
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 更新时间
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 整合SpringBoot 实现生产端代码如下:pom.xml配置



    4.0.0

    com.bfxy
    rabbitmq-springboot-producer
    0.0.1-SNAPSHOT
    jar

    rabbitmq-springboot-producer
    rabbitmq-springboot-producer

    
        org.springframework.boot
        spring-boot-starter-parent
        2.0.2.RELEASE
         
    

    
        UTF-8
        UTF-8
        1.8
    

    
        
            org.springframework.boot
            spring-boot-starter-web
         

        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            org.springframework.boot
            spring-boot-starter-amqp
                

         
        
          org.mybatis.spring.boot
          mybatis-spring-boot-starter
          1.1.1
        
        
          tk.mybatis
          mapper-spring-boot-starter
          1.1.0
            
        
            com.alibaba
            druid
            1.0.24
        
        
            mysql
            mysql-connector-java
        
        
          
            com.github.miemiedev  
            mybatis-paginator  
            1.2.17  
            
                
                     org.mybatis
                    mybatis
                
                        
                            
        
            org.apache.commons
            commons-lang3
        
        
            commons-io
            commons-io
            2.4
        
        
            com.alibaba
            fastjson
            1.1.26
            
        
            javax.servlet
            javax.servlet-api
            provided    
          
                
                    log4j
                    log4j
                    1.2.17
                              
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


  • application.properties配置:

  • spring.rabbitmq.addresses=192.168.11.76:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    
    server.servlet.context-path=/
    server.port=8001
    
    spring.http.encoding.charset=UTF-8
    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
    spring.jackson.time-zone=GMT+8
    spring.jackson.default-property-inclusion=NON_NULL
    
    spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
    spring.datasource.url=jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.username=root
    spring.datasource.password=root
    
    mybatis.type-aliases-package=com.bfxy.springboot
    mybatis.mapper-locations=classpath:com/bfxy/springboot/mapping
    }
    • 消息记录表核心业务:

    package com.bfxy.springboot.mapper;
    
    import java.util.Date;
    
    import org.apache.ibatis.annotations.Param;
    
    import com.bfxy.springboot.entity.BrokerMessageLog;
    import com.sun.tools.javac.util.List;
    
    public interface BrokerMessageLogMapper {
        
        List query4StatusAndTimeoutMessage();
        
        
        void update4ReSend(@Param("messageId")String messageId, @Param("updateTime")Date updateTime);
        
        void changeBrokerMessageLogStatus(@Param("messageId")String messageId, @Param("status")String status, @Param("updateTime")Date updateTime);
    
    
        
    }
    • 对应的SQL代码:

        
               
        
      
      
        update broker_message_log bml
        set bml.try_count = bml.try_count + 1,
          bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
        where bml.message_id = #{messageId,jdbcType=VARCHAR}
      
      
      
        update broker_message_log bml
        set bml.status = #{status,jdbcType=VARCHAR},
              bml.update_time = #{updateTime, jdbcType=TIMESTAMP}
        where bml.message_id = #{messageId,jdbcType=VARCHAR}  
      
    • 核心发送代码:orderService

    • package com.bfxy.springboot.service;
      
      import java.util.Date;
      
      import org.apache.commons.lang3.time.DateUtils;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Service;
      
      import com.bfxy.springboot.constant.Constants;
      import com.bfxy.springboot.entity.BrokerMessageLog;
      import com.bfxy.springboot.entity.Order;
      import com.bfxy.springboot.mapper.BrokerMessageLogMapper;
      import com.bfxy.springboot.mapper.OrderMapper;
      import com.bfxy.springboot.producer.RabbitOrderSender;
      import com.bfxy.springboot.utils.FastJsonConvertUtil;
      
      @Service
      public class OrderService {
      
          @Autowired
          private OrderMapper orderMapper;
          
          @Autowired
          private BrokerMessageLogMapper brokerMessageLogMapper;
          
          @Autowired
          private RabbitOrderSender rabbitOrderSender;
          
          public void createOrder(Order order) throws Exception {
              // 使用当前时间当做订单创建时间(为了模拟一下简化)
              Date orderTime = new Date();
              // 插入业务数据
              orderMapper.insert(order);
              // 插入消息记录表数据
              BrokerMessageLog brokerMessageLog = new BrokerMessageLog();
              // 消息唯一ID
              brokerMessageLog.setMessageId(order.getMessageId());
              // 保存消息整体 转为JSON 格式存储入库
              brokerMessageLog.setMessage(FastJsonConvertUtil.convertObjectToJSON(order));
               // 设置消息状态为0 表示发送中
              brokerMessageLog.setStatus("0");
               // 设置消息未确认超时时间窗口为 一分钟 
              brokerMessageLog.setNextRetry(DateUtils.addMinutes(orderTime, Constants.ORDER_TIMEOUT));
              brokerMessageLog.setCreateTime(new Date());
              brokerMessageLog.setUpdateTime(new Date());
              brokerMessageLogMapper.insert(brokerMessageLog);
              // 发送消息
              rabbitOrderSender.sendOrder(order);
          }
          
      }
    • MQ消息发送核心代码:

    package com.bfxy.springboot.producer;
    
    import java.util.Date;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate./confirm/iCallback;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.bfxy.springboot.constant.Constants;
    import com.bfxy.springboot.entity.Order;
    import com.bfxy.springboot.mapper.BrokerMessageLogMapper;
    import com.bfxy.springboot.mapper.OrderMapper;
    
    @Component
    public class RabbitOrderSender {
    
        //自动注入RabbitTemplate模板类
        @Autowired
        private RabbitTemplate rabbitTemplate;  
        
        @Autowired
        private BrokerMessageLogMapper brokerMessageLogMapper;
        
        //回调函数: /confirm/i确认
        final /confirm/iCallback /confirm/iCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.err.println("correlationdata: " + correlationData);
                String messageId = correlationData.getId();
                if(ack){
                    //如果/confirm/i返回成功 则进行更新
                    brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
                } else {
                    //失败则进行具体的后续操作:重试 或者补偿等手段
                    System.err.println("异常处理...");
                }
            }
        };
        
        //发送消息方法调用: 构建自定义对象消息
        public void sendOrder(Order order) throws Exception {
            rabbitTemplate.setConfirmCallback(/confirm/iCallback);
            //消息唯一ID
            CorrelationData correlationData = new CorrelationData(order.getMessage_id());
            rabbitTemplate.convertAndSend("order-exchange", "order.ABC", order, correlationData);
        }
        
    }
    • 消息重试、最大努力尝试策略(定时任务):

    • package com.bfxy.springboot.task;
      
      import java.util.Date;
      import java.util.List;
      
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.scheduling.annotation.Scheduled;
      import org.springframework.stereotype.Component;
      
      import com.bfxy.springboot.constant.Constants;
      import com.bfxy.springboot.entity.BrokerMessageLog;
      import com.bfxy.springboot.entity.Order;
      import com.bfxy.springboot.mapper.BrokerMessageLogMapper;
      import com.bfxy.springboot.producer.RabbitOrderSender;
      import com.bfxy.springboot.utils.FastJsonConvertUtil;
      
      @Component
      public class RetryMessageTasker {
      
          
          @Autowired
          private RabbitOrderSender rabbitOrderSender;
          
          @Autowired
          private BrokerMessageLogMapper brokerMessageLogMapper;
          
          @Scheduled(initialDelay = 5000, fixedDelay = 10000)
          public void reSend(){
              //pull status = 0 and timeout message 
              List list = brokerMessageLogMapper.query4StatusAndTimeoutMessage();
              list.forEach(messageLog -> {
                  if(messageLog.getTryCount() >= 3){
                      //update fail message 
                      brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLog.getMessageId(), Constants.ORDER_SEND_FAILURE, new Date());
                  } else {
                      // resend 
                      brokerMessageLogMapper.update4ReSend(messageLog.getMessageId(),  new Date());
                      Order reSendOrder = FastJsonConvertUtil.convertJSONToObject(messageLog.getMessage(), Order.class);
                      try {
                          rabbitOrderSender.sendOrder(reSendOrder);
                      } catch (Exception e) {
                          e.printStackTrace();
                          System.err.println("-----------异常处理-----------");
                      }
                  }            
              });
          }
      }
    • 测试发送订单:

    • 代码如下:

        @Autowired
        private RabbitOrderSender rabbitOrderSender;
        
        @Test
        public void testSender2() throws Exception {
             Order order = new Order();
             order.setId("2018080400000001");
             order.setName("测试订单");
             order.setMessage_id(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
             rabbitOrderSender.sendOrder(order);
        }
    • 监控台查看消息:

    • 发送成功! 现在测试 发送订单并且入库(业务库和消息记录库)

        @Autowired
        private OrderService orderService;
        
        @Test
        public void testCreateOrder() throws Exception {
             Order order = new Order();
             order.setId("2018080400000002");
             order.setName("测试创建订单");
             order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
            orderService.createOrder(order);
        }
    • 发送成功 并且入库OK:业务表 和 消息记录表均有数据 且status状态=1 为成功!

    • 业务表:

    • 消息记录表:

    • 测试失败情况:修改路由KEY为 无法路由即可!

    • 这样消息就算失败的情况了。然后ACK的时候就会走异常处理,消息记录表如下:


    • 最后我们测试重试策略:直接启动生产者应用,开启定时任务,重试几次后,库表信息变化如下:


    • 最终重试3次 失败结果更新 status = 2


    RabbitMQ消息中间件技术精讲:RabbitMQ是目前主流的消息中间件,非常适用于高并发环境。各大互联网公司都在使用的MQ技术,晋级技术骨干、团队核心的必备技术!


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/239156.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号