栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq延时消费(死信队列)

RabbitMq延时消费(死信队列)

业务背景,创建订单后进行延迟回调确认订单

思想:定义AExchange、BExchange;AQueue、BQueue;AKey、BKey。AQueue绑定在AExchange上(路由键为AKey),没有消费者消费AQueue;
BQueue绑定在BExchange上(路由键为BKey),有消费者消费BQueue。当AQueue队列中的消息到了过期时间(TTL)后,就会通过死信交换机BExchange自动转到死信队列BQueue中,且对应的key为BKey,由BQueue的消费者去消费消息。具体设置见下代码:

AQueue :设置队列的三个参数

 		// x-dead-letter-exchange: the dead-letter exchange attached to this queue
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_BC);
        // x-dead-letter-routing-key: the routing key used when this queue dead-letters a message
        args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY_BC_CREATE_ORDER);
        // x-message-ttl: the per-queue message TTL
        args.put("x-message-ttl", 10000);// milliseconds
@Component
public class RabbitConfig {

    // ---- BC: delay exchange ----
    // Delay exchange / queue / routing key used when an order is created
    public static final String DELAY_EXCHANGE_BC = "delay.exchange.bc";
    public static final String DELAY_QUEUE_BC_CREATE_ORDER = "delay.queue.bc.createOrder";
    public static final String DELAY_KEY_BC_CREATE_ORDER = "delay.key.bc.createOrder";

    // BC: dead-letter exchange / queue / routing key for create-order messages
    public static final String DEAD_LETTER_EXCHANGE_BC = "deadLetter.exchange.bc";
    public static final String DEAD_LETTER_QUEUE_BC_CREATE_ORDER = "deadLetter.queue.bc";
    public static final String DEAD_LETTER_KEY_BC_CREATE_ORDER = "deadLetter.key.bc";

    // BC: order cancellation is executed immediately (plain direct exchange, no delay)
    public static final String DIRECT_EXCHANGE_BC = "direct.exchange.bc";
    public static final String QUEUE_BC_CANCEL_ORDER = "queue.bc.cancelOrder";
    public static final String KEY_BC_CANCEL_ORDER = "key.bc.cancelOrder";


    // ---- JSJ: delay exchange ----
    // Delay exchange / queue / routing key used when an order is created
    public static final String DELAY_EXCHANGE_JSJ = "delay.exchange.jsj";
    public static final String DELAY_QUEUE_JSJ_CREATE_ORDER = "delay.queue.jsj.createOrder";
    public static final String DELAY_KEY_JSJ_CREATE_ORDER = "delay.key.jsj.createOrder";

    // JSJ: dead-letter exchange / queue / routing key for create-order messages
    public static final String DEAD_LETTER_EXCHANGE_JSJ = "deadLetter.exchange.jsj";
    public static final String DEAD_LETTER_QUEUE_JSJ_CREATE_ORDER = "deadLetter.queue.jsj";
    public static final String DEAD_LETTER_KEY_JSJ_CREATE_ORDER = "deadLetter.key.jsj";

    // JSJ: order cancellation is executed immediately (plain direct exchange, no delay)
    public static final String DIRECT_EXCHANGE_JSJ = "direct.exchange.jsj";
    public static final String QUEUE_JSJ_CANCEL_ORDER = "queue.jsj.cancelOrder";
    public static final String KEY_JSJ_CANCEL_ORDER = "key.jsj.cancelOrder";
    /**
     * Declares the BC delay exchange, named {@link #DELAY_EXCHANGE_BC}.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean("delayExchangeBC")
    public DirectExchange delayExchangeBC() {
        // Same settings the single-argument constructor applies: durable = true, autoDelete = false.
        return new DirectExchange(DELAY_EXCHANGE_BC, true, false);
    }

    /**
     * Declares the BC dead-letter exchange, named {@link #DEAD_LETTER_EXCHANGE_BC}.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean("deadLetterExchangeBC")
    public DirectExchange deadLetterExchangeBC() {
        // Same settings the single-argument constructor applies: durable = true, autoDelete = false.
        return new DirectExchange(DEAD_LETTER_EXCHANGE_BC, true, false);
    }

    /**
     * Declares the BC direct exchange, named {@link #DIRECT_EXCHANGE_BC}.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean("directExchangeBC")
    public DirectExchange directExchangeBC() {
        // Same settings the single-argument constructor applies: durable = true, autoDelete = false.
        return new DirectExchange(DIRECT_EXCHANGE_BC, true, false);
    }

    /**
     * Declares the BC dead-letter queue, named {@link #DEAD_LETTER_QUEUE_BC_CREATE_ORDER}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean("deadLetterQueueBCCreateOrder")
    public Queue deadLetterQueueBCCreateOrder() {
        // Same settings the single-argument constructor applies:
        // durable = true, exclusive = false, autoDelete = false.
        return new Queue(DEAD_LETTER_QUEUE_BC_CREATE_ORDER, true, false, false);
    }

    /**
     * Declares the BC delay queue, named {@link #DELAY_QUEUE_BC_CREATE_ORDER}.
     *
     * <p>The queue is given a 10-second message TTL and is wired to the BC dead-letter
     * exchange, so a message dead-lettered from this queue is republished to
     * {@link #DEAD_LETTER_EXCHANGE_BC} with routing key {@link #DEAD_LETTER_KEY_BC_CREATE_ORDER}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue carrying the arguments above
     */
    @Bean("delayQueueBCCreateOrder")
    public Queue delayQueueBCCreateOrder() {
        // Typed map instead of a raw Map: matches the Queue constructor's Map<String, Object> parameter.
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange: the dead-letter exchange attached to this queue
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_BC);
        // x-dead-letter-routing-key: the routing key used when this queue dead-letters a message
        args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY_BC_CREATE_ORDER);
        // x-message-ttl: per-queue message TTL in milliseconds (10000 ms = 10 s)
        args.put("x-message-ttl", 10000);
        // Flags: durable = true, exclusive = false, autoDelete = false
        return new Queue(DELAY_QUEUE_BC_CREATE_ORDER, true, false, false, args);
    }

    /**
     * Declares the BC cancel-order queue, named {@link #QUEUE_BC_CANCEL_ORDER}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean("queueBCCancelOrder")
    public Queue queueBCCancelOrder() {
        // Same settings the single-argument constructor applies:
        // durable = true, exclusive = false, autoDelete = false.
        return new Queue(QUEUE_BC_CANCEL_ORDER, true, false, false);
    }

    /**
     * Binds the BC dead-letter queue to the BC dead-letter exchange
     * with routing key {@link #DEAD_LETTER_KEY_BC_CREATE_ORDER}.
     *
     * @param deadLetterExchangeBC         the bean qualified as {@code deadLetterExchangeBC}
     * @param deadLetterQueueBCCreateOrder the bean qualified as {@code deadLetterQueueBCCreateOrder}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingDeadLetterQueueBCCreateOrder(
            @Qualifier("deadLetterExchangeBC") DirectExchange deadLetterExchangeBC,
            @Qualifier("deadLetterQueueBCCreateOrder") Queue deadLetterQueueBCCreateOrder) {
        final Binding binding = BindingBuilder
                .bind(deadLetterQueueBCCreateOrder)
                .to(deadLetterExchangeBC)
                .with(DEAD_LETTER_KEY_BC_CREATE_ORDER);
        return binding;
    }

    /**
     * Binds the BC delay queue to the BC delay exchange
     * with routing key {@link #DELAY_KEY_BC_CREATE_ORDER}.
     *
     * @param delayExchangeBC         the bean qualified as {@code delayExchangeBC}
     * @param delayQueueBCCreateOrder the bean qualified as {@code delayQueueBCCreateOrder}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingDelayQueueBCCreateOrder(
            @Qualifier("delayExchangeBC") DirectExchange delayExchangeBC,
            @Qualifier("delayQueueBCCreateOrder") Queue delayQueueBCCreateOrder) {
        final Binding binding = BindingBuilder
                .bind(delayQueueBCCreateOrder)
                .to(delayExchangeBC)
                .with(DELAY_KEY_BC_CREATE_ORDER);
        return binding;
    }

    /**
     * Binds the BC cancel-order queue to the BC direct exchange
     * with routing key {@link #KEY_BC_CANCEL_ORDER}.
     *
     * @param directExchangeBC   the bean qualified as {@code directExchangeBC}
     * @param queueBCCancelOrder the bean qualified as {@code queueBCCancelOrder}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingQueueBCCancelOrder(
            @Qualifier("directExchangeBC") DirectExchange directExchangeBC,
            @Qualifier("queueBCCancelOrder") Queue queueBCCancelOrder) {
        final Binding binding = BindingBuilder
                .bind(queueBCCancelOrder)
                .to(directExchangeBC)
                .with(KEY_BC_CANCEL_ORDER);
        return binding;
    }

    /**
     * Declares the JSJ delay exchange, named {@link #DELAY_EXCHANGE_JSJ}.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean("delayExchangeJSJ")
    public DirectExchange delayExchangeJSJ() {
        // Same settings the single-argument constructor applies: durable = true, autoDelete = false.
        return new DirectExchange(DELAY_EXCHANGE_JSJ, true, false);
    }

    /**
     * Declares the JSJ dead-letter exchange, named {@link #DEAD_LETTER_EXCHANGE_JSJ}.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean("deadLetterExchangeJSJ")
    public DirectExchange deadLetterExchangeJSJ() {
        // Same settings the single-argument constructor applies: durable = true, autoDelete = false.
        return new DirectExchange(DEAD_LETTER_EXCHANGE_JSJ, true, false);
    }

    /**
     * Declares the JSJ direct exchange, named {@link #DIRECT_EXCHANGE_JSJ}.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean("directExchangeJSJ")
    public DirectExchange directExchangeJSJ() {
        // Same settings the single-argument constructor applies: durable = true, autoDelete = false.
        return new DirectExchange(DIRECT_EXCHANGE_JSJ, true, false);
    }

    /**
     * Declares the JSJ dead-letter queue, named {@link #DEAD_LETTER_QUEUE_JSJ_CREATE_ORDER}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean("deadLetterQueueJSJCreateOrder")
    public Queue deadLetterQueueJSJCreateOrder() {
        // Same settings the single-argument constructor applies:
        // durable = true, exclusive = false, autoDelete = false.
        return new Queue(DEAD_LETTER_QUEUE_JSJ_CREATE_ORDER, true, false, false);
    }

    /**
     * Declares the JSJ delay queue, named {@link #DELAY_QUEUE_JSJ_CREATE_ORDER}.
     *
     * <p>The queue is given a 10-second message TTL and is wired to the JSJ dead-letter
     * exchange, so a message dead-lettered from this queue is republished to
     * {@link #DEAD_LETTER_EXCHANGE_JSJ} with routing key {@link #DEAD_LETTER_KEY_JSJ_CREATE_ORDER}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue carrying the arguments above
     */
    @Bean("delayQueueJSJCreateOrder")
    public Queue delayQueueJSJCreateOrder() {
        // Typed map instead of a raw Map: matches the Queue constructor's Map<String, Object> parameter.
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange: the dead-letter exchange attached to this queue
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_JSJ);
        // x-dead-letter-routing-key: the routing key used when this queue dead-letters a message
        args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY_JSJ_CREATE_ORDER);
        // x-message-ttl: per-queue message TTL in milliseconds (10000 ms = 10 s)
        args.put("x-message-ttl", 10000);
        // Flags: durable = true, exclusive = false, autoDelete = false
        return new Queue(DELAY_QUEUE_JSJ_CREATE_ORDER, true, false, false, args);
    }

    /**
     * Declares the JSJ cancel-order queue, named {@link #QUEUE_JSJ_CANCEL_ORDER}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue
     */
    @Bean("queueJSJCancelOrder")
    public Queue queueJSJCancelOrder() {
        // Same settings the single-argument constructor applies:
        // durable = true, exclusive = false, autoDelete = false.
        return new Queue(QUEUE_JSJ_CANCEL_ORDER, true, false, false);
    }

    /**
     * Binds the JSJ dead-letter queue to the JSJ dead-letter exchange
     * with routing key {@link #DEAD_LETTER_KEY_JSJ_CREATE_ORDER}.
     *
     * @param deadLetterExchangeJSJ         the bean qualified as {@code deadLetterExchangeJSJ}
     * @param deadLetterQueueJSJCreateOrder the bean qualified as {@code deadLetterQueueJSJCreateOrder}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingDeadLetterQueueJSJCreateOrder(
            @Qualifier("deadLetterExchangeJSJ") DirectExchange deadLetterExchangeJSJ,
            @Qualifier("deadLetterQueueJSJCreateOrder") Queue deadLetterQueueJSJCreateOrder) {
        final Binding binding = BindingBuilder
                .bind(deadLetterQueueJSJCreateOrder)
                .to(deadLetterExchangeJSJ)
                .with(DEAD_LETTER_KEY_JSJ_CREATE_ORDER);
        return binding;
    }

    /**
     * Binds the JSJ delay queue to the JSJ delay exchange
     * with routing key {@link #DELAY_KEY_JSJ_CREATE_ORDER}.
     *
     * @param delayExchangeJSJ         the bean qualified as {@code delayExchangeJSJ}
     * @param delayQueueJSJCreateOrder the bean qualified as {@code delayQueueJSJCreateOrder}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingDelayQueueJSJCreateOrder(
            @Qualifier("delayExchangeJSJ") DirectExchange delayExchangeJSJ,
            @Qualifier("delayQueueJSJCreateOrder") Queue delayQueueJSJCreateOrder) {
        final Binding binding = BindingBuilder
                .bind(delayQueueJSJCreateOrder)
                .to(delayExchangeJSJ)
                .with(DELAY_KEY_JSJ_CREATE_ORDER);
        return binding;
    }

    /**
     * Binds the JSJ cancel-order queue to the JSJ direct exchange
     * with routing key {@link #KEY_JSJ_CANCEL_ORDER}.
     *
     * @param directExchangeJSJ   the bean qualified as {@code directExchangeJSJ}
     * @param queueJSJCancelOrder the bean qualified as {@code queueJSJCancelOrder}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingQueueJSJCancelOrder(
            @Qualifier("directExchangeJSJ") DirectExchange directExchangeJSJ,
            @Qualifier("queueJSJCancelOrder") Queue queueJSJCancelOrder) {
        final Binding binding = BindingBuilder
                .bind(queueJSJCancelOrder)
                .to(directExchangeJSJ)
                .with(KEY_JSJ_CANCEL_ORDER);
        return binding;
    }
    /**
     * Listener for the queue named by {@code RabbitConfig.DEAD_LETTER_QUEUE_BC_CREATE_ORDER}.
     * Prints the payload to standard output and then hands the delivery tag to
     * {@code rabbitHandler.basicAck}.
     *
     * @param params  the message payload as a string
     * @param channel the channel the message was delivered on
     * @param tag     the delivery tag taken from the {@code AmqpHeaders.DELIVERY_TAG} header
     */
    @RabbitListener(queues = RabbitConfig.DEAD_LETTER_QUEUE_BC_CREATE_ORDER)
    public void createBCOrder(String params, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        final String line = "DEAD_LETTER_QUEUE_BC_CREATE_ORDER" + params;
        try {
            System.out.println(line);
            // presumably acknowledges the delivery on the channel; rabbitHandler is declared elsewhere - verify
            rabbitHandler.basicAck(channel, tag, "");
        } catch (IOException e) {
            // NOTE(review): the failure is only printed, not rethrown or otherwise handled.
            e.printStackTrace();
        }
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/650662.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号