栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rabbitmq延时策略实现方式(延时队列、TTL、延迟队列插件)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rabbitmq延时策略实现方式(延时队列、TTL、延迟队列插件)

rabbitmq延时队列实现
  • 前言
    • 概念及区别
    • 死信队列
  • 流程
    • 延时队列和TTL模式
    • 延时插件模式
  • 正文
    • 公用配置
    • 延时队列方式
    • TTL方式
    • 延迟队列插件方式
      • 下载
      • 安装
      • 使用

前言

看了一些讲延迟队列的文章,发现很多文章说的都很乱,要么概念没搞清楚,要么讲混了,三种方式的内容都讲串了,这里正好实践一下,并且清晰的记录一下。
正文开始之前,先理解概念及各种实现方式的区别。
简单叙述一下三种方式的概念。

概念及区别
  • 延时队列
    rabbitmq自带功能,针对队列,直接创建延时队列,指定队列中的所有消息过期时间,比如下边这个:
arguments.put("x-message-ttl", 60000);

这就代表所有入队消息的过期时间都是60000ms。

  • TTL
    rabbitmq自带功能,针对每个消息,发送消息的时候给每个消息指定过期时间,
    会存在一个问题:rabbitmq默认对过期消息采用懒加载的方式扫描,如果第一条消息设置的是20s过期,第二条10s过期,那么第二条必须要等第一条过期了才能触发,也就是也需要等20s。
    所以个人觉得基于这种情况就出现了延迟队列插件。

  • 延迟队列插件rabbitmq_delayed_message_exchange
    这是rabbitmq插件市场的一个延时插件,通过自定义交换机,然后绑定队列,发送消息时指定消息过期时间实现延迟,跟延迟队列的区别是:
    1、过期中的消息这边是暂存在交换机,那边暂存到队列中;
    2、消息过期不会像TTL那样等待执行。

死信队列

死信队列是个概念性的东西,说白了就是普通的队列,只不过它用来接收所有的过期信息,无人消费的信息被称为死信,专门接收死信的队列就是死信队列,要是交换机,就叫死信交换机。
所以可以看出来,死信和延时结合使用才能实现真正的延时策略。

流程

切记不要对延时队列进行监听,否则消息直接被消费,就没有任何意义了,永远也到不了死信队列。

延时队列和TTL模式

延时插件模式


各自的区别在下文中会讲到。

正文

开始延时队列、TTL、延迟队列插件的实践、记录

公用配置
  • 普通交换机和队列:
    order_event_exchange、order_release_queue

  • 死信交换机和队列:
    error_event_exchange、error_queue

	@Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
    public Exchange oderEventExchange(){
        //durable:是否持久化 autoDelete:是否自动删除
        DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
        return directExchange;
    }
 	@Bean("order_release_queue")
    public Queue orderReleaseQueue() {
        Queue queue = new Queue("order_release_queue", true, false, false);
        return queue;
    }
    @Bean
    public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
                                            @Qualifier("order_event_exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
    }
    
 	@Bean("error_event_exchange")
    public DirectExchange errorMessageExchange(){
        DirectExchange directExchange = new DirectExchange("error_event_exchange",true, false);
        return directExchange;
    }
    @Bean("error_queue")
    public Queue errorQueue(){
        Queue queue = new Queue("error_queue", true, false, false);
        return queue;
    }
    @Bean
    public Binding errorBinding(@Qualifier("error_queue") Queue queue,
                                @Qualifier("error_event_exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("error").noargs();
    }
延时队列方式
  • 创建
    在公用基础上创建延迟队列,指定死信交换机(error_event_exchange)和路由键(error),并且绑定公用交换机order_event_exchange。
	@Bean("order_delay_queue")
    public Queue oderDelayQueue(){
        Map arguments = new HashMap<>();
        
        //配置队列的死信应该发送给哪个交换机,过期消息发给谁
        arguments.put("x-dead-letter-exchange", "order_event_exchange");
        //发送给交换机使用的路由key
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        //队列消息变成死信的时间 10s
        arguments.put("x-message-ttl", 10000);
        //exclusive:是否排他  arguments:扩展参数
        Queue queue = new Queue("order_delay_queue", true, false, false, arguments);
        return queue;
    }
    
   @Bean
    public Binding orderCreateOrderBinging(@Qualifier("order_delay_queue") Queue queue,
                                           @Qualifier("order_event_exchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.delay.order").noargs();
    }
  • 测试


    死信队列监听器
@Service
@RabbitListener(queues = {"error_queue"})
public class OrderErrorListener {

    @RabbitHandler
    public void listener(Order order) throws IOException {
        System.out.println("收到error-order信息");
        System.out.println(order);
    }
 }

测试接口

	@Autowired
    RabbitTemplate rabbitTemplate;

    @PostMapping("/sendOrder")
    public String sentOrder(@RequestBody Order order){
        rabbitTemplate.convertAndSend("order_event_exchange","order.delay.order", order,new CorrelationData(order.getId()));
        return "ok";
    }
  • 测试结果

    10s之后死信队列收到消息,并被消费。
  • 优缺点
    优点:直接使用rabbit内置机制,而且还保证队列里的消息到期就自动投递到死信,不会受mq懒加载的影响,区别于TTL机制。
    缺点:如果存在很多不同过期时间的情况,比如,a消息需要10s过期,b消息需要50s过期等等,如果用这种方式实现,就需要创建很多延时队列,不划算,性能也受影响。
TTL方式
  • 单纯使用消息过期机制
    删除mq中的延时队列,注释消息过期时间设置

    修改接口发送消息代码:
  @PostMapping("/sendOrder")
  public String sentOrder(@RequestBody Order order) {
      rabbitTemplate.convertAndSend("order_event_exchange", "order.delay.order", order, message -> {
          //消息有效期5秒
          long time = 1000 * 5;
          message.getMessageProperties().setExpiration(String.valueOf(time));
          return message;
      },new CorrelationData(order.getId()));
      System.out.println("当前时间1:"+System.currentTimeMillis());
      return "ok";
  }

测试结果:

可以看到时间差正好5s。
再测试一下,发两条消息,过期时间分别是10s和5s。
稍微改下代码:


测试结果:

可以看到,即使我们设置了5s的消息:id=111,还是需要等10s才能被消费。

  • 延时队列+消息过期机制
    打开延时队列对于消息过期时间的设置,并且删除延时队列,重启项目:

    先发送id=222过期时间为10s的消息,在发送id=111过期时间为5s的消息。


    测试结果:

    可以看到,都设置过期时间的情况下,会以消息的过期时间为准,队列过期时间失效,同时消息过期检测还是得等前一条消息过期才能检测到。
  • 优缺点
    优点:在消息过期需求很多,且过期时间不同的情况下,无需创建大量队列,比较灵活。
    缺点:同一个队列中的消息过期时间由队列的第一条信息的过期时间为准,依次向后推进,也就是上边测试的结果。
延迟队列插件方式 下载

官网下载

根据自己的rabbit版本选择对应的插件版本下载,我这里用的3.9.x。

安装

找到安装rabbitmq的路径,由于之前用rpm包,所以需要使用命令:

rpm -ql rabbitmq-server-3.9.16-1.el7.noarch

找到这个路径即可

将插件上传到plugins中

启用插件即可

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在管理端页面看见这个代表成功

使用
  • 创建延迟交换机和普通队列
@Bean("lazy_exchange")
    public Exchange lazyExchange(){
        Map pros = new HashMap<>();
        //设置交换机支持延迟消息推送,可取值:direc、topic、fanout
        pros.put("x-delayed-type", "direct");
        //type类型必须设置为 x-delayed-message
        CustomExchange exchange = new CustomExchange("lazy_exchange", "x-delayed-message",true,false,pros);
        return exchange;
    }

    @Bean("lazy_queue")
    public Queue pluginDelayQueue(){
        return new Queue("lazy_queue", true, false, false);
    }

    @Bean
    public Binding pluginDelayBinding(@Qualifier("lazy_queue") Queue queue,@Qualifier("lazy_exchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
    }
  • 接口测试代码
@Autowired
    RabbitTemplate rabbitTemplate;

    @PostMapping("/sendOrder")
    public String sentOrder(@RequestBody Order order) {
        rabbitTemplate.convertAndSend("lazy_exchange", "delay", order, message -> {
            //消息有效期5秒
            message.getMessageProperties().setHeader("x-delay", 15000);
            return message;
        },new CorrelationData(order.getId()));
        System.out.println("发送消息:"+order.getId());
        System.out.println("当前发送时间:"+System.currentTimeMillis());
        return "ok";
    }
  • 监听器
@Service
@RabbitListener(queues = {"lazy_queue"})
public class OrderDelayListener {

    @RabbitHandler
    public void listener(Order order) {
        System.out.println("收到lazy_queue信息:"+order.getId());
        System.out.println("当前接收时间:"+System.currentTimeMillis());
        System.out.println(order);
    }
}
  • 测试结果

    算一下时间差,正好是15s.
  • 测试多消息设置不同延时参数的效果
    准备参数:


    接口改造:

    测试结果:

    可以看到,每个消息都是到期就被消费,并没有像TTL模式那样,还需要等待前一个消息到期,其实原因很简单,因为之前的两个模式,都是将消息存储到队列queue中,而延迟插件是将消息存储在交换机(exchange)中。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/872340.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号