栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ实现延时队列

RabbitMQ实现延时队列

什么是延时队列

指消息进入队列后不会立即被消费,可以被延迟一定的时间,再进行消费.RabbitMQ没有提供延迟队列功能,但是可以使用TTL+DLX来实现延迟队列效果

使用场景

电商平台下单后,30分钟未支付,取消订单回滚库存;新用户注册成功一周后,发送问候短信等等.

延时队列实现

模拟电商平台下单后,30分钟后未支付,取消订单回滚库存

创建配置类
@Configuration
public class DelayConfig {

    
    @Bean
    public Queue createNormalQueue() {
        return QueueBuilder.durable("order_queue").build();
    }

    
    @Bean
    public Queue createDeadQueue() {
        return QueueBuilder.durable("order_dead_queue")
                .withArgument("x-dead-letter-exchange", "order_dead_exchange") //设置死信交换机
                .withArgument("x-dead-letter-routing-key", "order_dead")//设置死信路由key
                .withArgument("x-message-ttl", 30000)// 队列中消息30秒过期
                .build();
    }

    
    @Bean
    public DirectExchange createNormalExchange() {
        return new DirectExchange("order_exchange");
    }

    
    @Bean
    public DirectExchange createDeadExchange() {
        return new DirectExchange("order_dead_exchange");
    }

    
    @Bean
    public Binding createDeadBinding(@Qualifier(value = "createNormalQueue") Queue queue,
                                     @Qualifier(value = "createDeadExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order_dead");
    }

    
    @Bean
    public Binding binding(@Qualifier(value = "createDeadQueue")Queue queue,
                           @Qualifier(value = "createNormalExchange")DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order");
    }
}
创建监听类
@Component
public class DelayListener {


    @RabbitListener(queues = "order_queue")
    public void listener(Message message, Channel channel, String msg) throws IOException {
        // 模拟业务代码执行
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date()) + "收到消息:" + msg);
        System.out.println("检查订单是否付款操作开始::没有支付就取消订单,回滚库存");
        // 签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
创建controller用于测试
@RestController
public class DelayController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/send")
    public void send(){
        // 模拟业务代码执行
        String orderId = UUID.randomUUID().toString().replace("-","");
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date())+"创建订单:"+orderId);
        // 通过正常的交换机和routingKey把orderId发送到死信队列
        rabbitTemplate.convertAndSend("order_exchange","order",orderId);
    }
}
注意
  • 为了方便测试,我在配置类中的死信队列消息过期时间设置的是30秒,再真实的场景中根据自己的需求来就好了.
  • 发送消息要发送给order_dead_queue(死信队列),监听要监听order_queue(正常队列)
测试

http://localhost:18081/send:再发出信息后,延迟了30秒后,消费到了信息

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/349777.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号