栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq实现延迟队列

rabbitmq实现延迟队列

延迟队列

延迟队列,顾名思义就是消息进入消息队列后不会被立即消费,而是等一段时间后消费,这个时间可以自己设定,比如用户登录5分钟后向其推送消息。

rabbitmq并未提供延时队列的直接实现,但是rabbitmq提供了死信队列和TTL(消息过期时间)的实现方式,我们可以借助死信队列+TTL到达实现延迟队列的目的。下面先实现死信队列和TTL。

实现TTL

这里使用springboot实现TTL,通过ttl方法设置队列过期时间,配置类:

@Configuration
public class RabbitmqConfig {
    public static final String  EXCHANGE = "springboot-exchange";

    public static final String QUEUE = "springboot-queue";

   
    
    @Bean
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
    }

    @Bean
    public Queue itemQueue() {
    	//过期时间设置为10000毫秒
       return QueueBuilder.durable(QUEUE).ttl(10000).build();
    }

    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("topicExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }
}

也可以单独设置消息的过期时间,最终以时间短的为准。

 @Test
    void test01() {
        MessagePostProcessor postProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置5秒后过期,队列是10秒后过期,消息过期时间比队列过期时间短,所以消息最终是5秒后过期
                message.getMessageProperties().setExpiration("5000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE, "item.insert", "商品新增, routing key为item.insert");
    }
实现死信队列

当消息成为死信后,可以被重新发送到另一个交换机,与该交换机绑定的消息队列可以暂时存放该死信消息。一个消息成为死信可能是由于 队列消息长度到达限制, 消费者拒接消费消息,并且没把消息重新放入原目标队列,或者原队列消息到达超时时间还未被消费。

队列绑定死信交换机,只需要给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key,这里使用springboot实现死信队列。

配置类:

@Configuration
public class RabbitmqConfig {
    public static String DEAD_EXCHANGE = "dead-exchange";

    public static String DEAD_QUEUE = "dead-queue";

    public static String NORMAL_EXCHANGE = "normal-exchange";

    public static String NORMAL_QUEUE = "normal_queue";
    
    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
    }

    
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    
    @Bean
    public Binding deadExchangeQueue(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dead").noargs();
    }

    
    @Bean
    public Exchange normalExchange() {
        return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).durable(true).build();
    }

    
    @Bean
    public Queue normalQueue() {
        // 设置过期时间和队列长度,并且设置 x-dead-letter-exchange 和 x-dead-letter-routing-key参数
        Map map = new HashMap();
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        map.put("x-dead-letter-routing-key", "dead");
        return QueueBuilder.durable(NORMAL_QUEUE).ttl(10000).maxLength(10).withArguments(map).build();
    }

    
    @Bean
    public Binding normalExchangeQueue(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
    }
}

测试:

    @Test
    public void testDlx() {
        //1. 测试消息拒收,死信消息,需要在消费端拒收
        //rabbitTemplate.convertAndSend(RabbitmqConfig.NORMAL_EXCHANGE,"normal","我是一条消息,我会死吗?");
        //2. 测试长度限制后,消息死信
//        for (int i = 0; i < 20; i++) {
//            rabbitTemplate.convertAndSend(RabbitmqConfig.NORMAL_EXCHANGE, "normal", "我是一条消息,我会死吗?");
//        }
        //3. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend(RabbitmqConfig.NORMAL_EXCHANGE,"normal","我是一条消息,我会死吗?");
    }
实现延迟队列

利用TTL+死信队列就可以轻松实现延迟队列了,正常队列设置过期时间,过期后放到死信队列,监听该死信队列的消费者便可消费。

上面配置好的死信队列就可以用来做延迟队列,使用测试中的第三个,即:

 		//3. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend(RabbitmqConfig.NORMAL_EXCHANGE,"normal","我是一条消息,我会死吗?");

该消息过期后,便会转到死信队列,消费端监听该死信队列:

@Component
public class MyListener implements ChannelAwareMessageListener {
    @Override
    @RabbitListener(queues = "dead-queue")
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("message: " + message);
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            e.printStackTrace();
            channel.basicNack(deliveryTag, true, true);
        }
        Thread.sleep(1000);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/439157.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号