栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot项目使用RabbitMQ TTL+死信队列完成消息延迟

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot项目使用RabbitMQ TTL+死信队列完成消息延迟

文章目录

RabbitMQ配置

常量申明TTL交换机申明TTL队列TTL队列绑定到TTL交换机声明死信交换机声明死信队列死信队列绑定死信交换机并关联路由键配置解读运作逻辑说明 固定延迟演示动态延迟演示
在项目中,我们经常会遇到需要进行延迟的场景比如 延迟计算,延迟重试,延迟关闭订单等等,延迟的技术方案多种多样,我这里列举RabbitMq进行延迟的方案之一:TTL+死信

RabbitMQ配置 常量
    public static final String CHECK_POST_TTL_EXCHANGE = "check-post-ttl-exchange";
    public static final String CHECK_POST = "check-post";
    public static final String CHECK_POST_TTL_QUEUE = "check-post-ttl-queue";
    public static final String CHECK_POST_DEAD_EXCHANGE = "check-post-dead-exchange";
    public static final String CHECK_POST_DEAD_QUEUE = "check-post-dead-queue";
申明TTL交换机

实际就是一个普通的队列,我这里定义为fanout类型

    @Bean
    public FanoutExchange checkPostTtlExchange() {
        return new FanoutExchange(CHECK_POST_TTL_EXCHANGE);
    }

申明TTL队列

可定义统一的消息存活时间,死信交换机的名字,死信交换机的路由键

    @Bean
    public Queue checkPostTtlQueue() {
        Map args = new HashMap<>(8);
        // 消息存活时间 ,key 固定 value 必须为Int值,此种方式则消息存活时间固定
        args.put("x-message-ttl", 10000);
        //设置死信交换机,value为死信交换机的名字
        args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
        //设置死信 routing_key,value为死信路由键的名字
        args.put("x-dead-letter-routing-key", CHECK_POST);
        return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
    }

TTL队列绑定到TTL交换机
    @Bean
    Binding ttlBind() {
        return BindingBuilder.bind(checkPostTtlQueue()).to((checkPostTtlExchange()));
    }

声明死信交换机

死信交换机实际也就是一个普通的交换机,我们这里需要将其申明为直连类型交换机,需要结合路由键一起使用

    
    @Bean
    DirectExchange checkPostDeadExchange() {
        return new DirectExchange(CHECK_POST_DEAD_EXCHANGE, true, false);
    }

声明死信队列

死信队列实际也是一个普通的队列

    
    @Bean
    Queue checkPostDeadQueue() {
        return new Queue(CHECK_POST_DEAD_QUEUE, true, false, false);
    }

死信队列绑定死信交换机并关联路由键
    
    @Bean
    Binding dlxBinding() {
        return BindingBuilder.bind(checkPostDeadQueue()).to(checkPostDeadExchange()).with(CHECK_POST);
    }

配置解读

​ 我们定义的TTL交换机,实际就是一个普普通通的交换机,与其绑定的TTL队列额外加入了配置参数x-dead-letter-exchange 申明了死信交换机的名字,配置参数x-dead-letter-routing-key 申明了死信交换机的路由键

​ 定义的死信交换机本质上是一个普通的直连交换机,交换机名字需与TTL队列中配置属性x-dead-letter-exchange指定的值一致,死信队列则是一个普通的队列,其使用的路由键需与TTL队列中配置属性dead-letter-routing-key指定的值一致

运作逻辑说明

我们将需要延迟的消息发至TTL交换机中,TTL交换机将消息发送至绑定的TTL队列,且我们不设置消费者去监听这个TTL队列的消息,当消息在TTL队列中存活指定的时间后(上方TTL队列中x-message-ttl属性)消息将再次发送到x-dead-letter-exchange申明的死信交换机中,如果有队列(我们称之为死信队列)绑定了死信交换机且路由键为TTL队列中配置的x-dead-letter-routing-key,则会接收到消息,我们只监听对应的死信队列消息,就完成了消息的延迟

固定延迟演示

固定延迟无法根据消息选择延迟时间,适合延迟时间统一的场景,重点在于TTL队列申明中,配置属性x-message-ttl

上方配置后,消息将固定延迟10s

@Bean
public Queue checkPostTtlQueue() {
    Map args = new HashMap<>(8);
    // 消息存活时间 ,key 固定 value 必须为Int值,此种方式则消息存活时间固定
    args.put("x-message-ttl", 10000);
    //设置死信交换机,value为死信交换机的名字
    args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
    //设置死信 routing_key,value为死信路由键的名字
    args.put("x-dead-letter-routing-key", CHECK_POST);
    return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
}
    private void addDisableScheduleTimelyConfig(Integer configId, int expirationTime) {
        final long start = System.currentTimeMillis();
        int finalExpirationTime = expirationTime + 30000;
        rabbitTemplate.convertAndSend(RabbitMqConfig.CHECK_POST_TTL_EXCHANGE, "", configId.toString(), message -> {
            // 由于TTL队列中申明了x-message-ttl 为10000ms,下方延迟时间配置不会生效
            message.getMessageProperties().setExpiration((Integer.toString(finalExpirationTime)));
            message.getMessageProperties().setContentEncoding("UTF-8");
            System.out.println("延迟消息发送时间:" + LocalDateTime.now());
            return message;
        });
    }

死信队列消费者

    @RabbitListener(queues = RabbitMqConfig.CHECK_POST_DEAD_QUEUE)
    public void flushCheckConfigState(Message message) {
        try {
            System.out.println("死信队列消费者接受:" + LocalDateTime.now());
        } catch (Exception exception) {
            exception.printStackTrace();
            log.error("处理死信消息失败:{}",new String(message.getBody()));
        }
    }

动态延迟演示

动态延迟即延迟消息不固定,可为每一条消息设置延迟时间

需要去除TTL队列申明中的中x-message-ttl配置属性

    @Bean
    public Queue checkPostTtlQueue() {
        Map args = new HashMap<>(4);
        //设置死信交换机
        args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
        //设置死信 routing_key
        args.put("x-dead-letter-routing-key", CHECK_POST);
        return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
    }

发送消息指明延迟时间,我这一条消息是延迟了90000ms,即一分半

    private void addDisableScheduleTimelyConfig(Integer configId, int expirationTime) {
        int finalExpirationTime = expirationTime + 30000;
        rabbitTemplate.convertAndSend(RabbitMqConfig.CHECK_POST_TTL_EXCHANGE, "", configId.toString(), message -> {
            // 由于TTL队列中申明了x-message-ttl 为10000ms,下方延迟时间配置不会生效
            message.getMessageProperties().setExpiration((Integer.toString(finalExpirationTime)));
            message.getMessageProperties().setContentEncoding("UTF-8");
            System.out.println("延迟消息发送时间:" + LocalDateTime.now());
            return message;
        });
    }

发现从消息发送,到消息接收,中间时间间隔90000ms左右,说明延迟消息生效

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/703749.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号