栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq---TTL机制

rabbitmq---TTL机制

文章预览:

一、TTL介绍二、原始API三、springboot案例

3.1 pom.xml添加依赖3.2.application.yml添加rabbitmq连接信息3.3.主启动类3.4 RabbitConfig类3.5 测试类

3.5.1 整个队列的消息设置过期时间3.5.2局部设置消息过期
在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。

一、TTL介绍

TTL,Time to Live 的简称,即过期时间。
RabbitMQ 可以对消息和队列两个维度来设置TTL
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。
目前有两种方法可以设置消息的TTL。

    通过Queue属性设置,队列中所有消息都有相同的过期时间。对消息自身进行单独设置,每条消息的TTL 可以不同。

如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费

二、原始API
   try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
            // 设置队列属性
            Map arguments = new HashMap();
            // 设置队列的TTL
            arguments.put("x-message-ttl", 30000);
            // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
            arguments.put("x-expires", 10000);
            channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
            for (int i = 0; i < 1000000; i++) {
                String message = "Hello World!" + i;
                channel.basicPublish("", QUEUE_NAME, new
                        AMQP.BasicProperties().builder().expiration("30000").build(), message.getBytes()
                );
                System.out.println(" [X] Sent '" + message + "'");
            }
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

注意理解 x-message-ttl、 x-expires这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)

三、springboot案例 3.1 pom.xml添加依赖
        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
3.2.application.yml添加rabbitmq连接信息
spring:
  application:
    name: springboot_rabbitmq_ttl
  rabbitmq:
    host: 139.9.132.132
    virtual-host: /
    username: admin
    password: 123
    port: 5672
3.3.主启动类
@SpringBootApplication
public class RabbitmqttlApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqttlApplication.class, args);
    }
}
3.4 RabbitConfig类
@Configuration
public class RabbitConfig {
    
    
    @Bean
    public Queue queueTTLWaiting() {
        Map props = new HashMap<>();
        //对于该队列中的消息,设置都等待10s
        props.put("x-message-ttl", 10000);
        Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
        return queue;
    }

    
    @Bean
    public Exchange exchangeTTLWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.ttl-waiting", false, false);
        return exchange;
    }

    
    @Bean
    public Binding bindingTTLWaiting() {
        return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl-waiting").noargs();
    }

    
    @Bean
    public Queue queueWaiting() {
        Queue queue = new Queue("q.pay.waiting", false, false,
                false);
        return queue;
    }

    
    @Bean
    public Exchange exchangeWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
        return exchange;
    }
    @Bean
    public Binding bindingWaiting() {
        return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
    }
}

3.5 测试类 3.5.1 整个队列的消息设置过期时间
    @Autowired
    private AmqpTemplate rabbitTemplate;

    
    @Test
    void sendMessage() {
        rabbitTemplate.convertAndSend(
                "ex.pay.ttl-waiting",
                "pay.ttl-waiting",
                "发送了TTL-WAITING-MESSAGE");
        System.out.println("queue-ttl-ok = ");
    }

结果
发送消息时

10s后

3.5.2局部设置消息过期
    
    @Test
    void sendTTLMessage() throws UnsupportedEncodingException {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("5000");
        Message message = new Message("发送了WAITING-MESSAGE" .getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        System.out.println("msg-ttl-ok = ");
    }

结果
发送消息时

5s后

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774605.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号