RabbitMQ Delay Queues

1. Implementation with a delay queue plus a dead-letter queue

(1) Configuration
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    private static final String EXCHANGE = "test_exchange";

    private static final String DELAY_QUEUE = "delay_queue";

    private static final String DELAY_QUEUE_ROUTING_KEY = "delay.queue.routing.key";

    private static final String DEAD_LETTER_QUEUE = "dead_letter_queue";

    private static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "dead.letter.queue.routing.key";

    @Bean
    Exchange exchange() {
        return new TopicExchange(EXCHANGE);
    }

    
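    // Delay queue: nothing consumes from it, so messages wait here until their
    // TTL elapses and are then dead-lettered to EXCHANGE with the dead-letter routing key.
    @Bean
    Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE)
                .withArgument("x-dead-letter-exchange", EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY)
                .build();
    }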

    
    // Dead-letter queue: consumers listen here and receive each message after its delay.
    @Bean
    Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    @Bean
    Binding delayQueueBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(exchange())
                .with(DELAY_QUEUE_ROUTING_KEY)
                .noargs();
    }

    @Bean
    Binding deadLetterQueueBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(exchange())
                .with(DEAD_LETTER_QUEUE_ROUTING_KEY)
                .noargs();
    }

}

(2) Sender
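import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // delayMilli is the per-message TTL in milliseconds, passed as a string.
    public void send(String exchange, String routingKey, String message, String delayMilli) {
        MessagePostProcessor postProcessor = (msg) -> {
            msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            msg.getMessageProperties().setExpiration(delayMilli);
            return msg;
        };
        rabbitTemplate.convertAndSend(exchange, routingKey, message, postProcessor);
    }

}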
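
Because setExpiration takes the TTL as a string of milliseconds, callers may find it convenient to build that string from a Duration. The following is a minimal sketch of such a conversion; the Expirations class and its toExpiration method are hypothetical names introduced here for illustration and are not part of Spring AMQP.

```java
import java.time.Duration;

// Hypothetical helper (not part of Spring AMQP): converts a Duration into the
// millisecond string that the sender above passes to setExpiration(String).
final class Expirations {

    private Expirations() {
    }

    static String toExpiration(Duration delay) {
        if (delay == null || delay.isNegative()) {
            throw new IllegalArgumentException("delay must be non-negative");
        }
        // The expiration property carries the TTL as a decimal string of milliseconds.
        return Long.toString(delay.toMillis());
    }
}
```

With the sender above, a 30-second delay could then be requested as send("test_exchange", "delay.queue.routing.key", "hello", Expirations.toExpiration(Duration.ofSeconds(30))).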
2. Implementation with a plugin

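A message with a TTL is not necessarily discarded the moment its TTL elapses. The broker checks for expiry only when the message reaches the head of the queue, and only then moves it to the dead-letter queue, so a message with a short TTL can be held up behind one with a longer TTL. To have every message released on time, a plugin is needed.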
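
The head-of-queue behavior described above can be illustrated with a small simulation. This is only a sketch in plain Java with simulated time, not broker code: the TtlHeadOfLineDemo class and its names are made up for illustration, and it assumes every message is published at time 0 and that expiry is checked only for the message at the head.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java simulation (no broker involved) of per-message TTL on a FIFO queue
// in which only the message at the head is checked for expiry.
final class TtlHeadOfLineDemo {

    // A queued message: its name and the simulated time (ms) at which its TTL elapses.
    static final class Pending {
        final String name;
        final long expiresAtMillis;

        Pending(String name, long expiresAtMillis) {
            this.name = name;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    // Returns "name@time" entries: the simulated time at which each message is dead-lettered.
    static List<String> deadLetterTimes(List<Pending> publishedInOrder) {
        Deque<Pending> queue = new ArrayDeque<>(publishedInOrder);
        List<String> result = new ArrayList<>();
        long now = 0;
        while (!queue.isEmpty()) {
            Pending head = queue.poll();
            // A message leaves no earlier than its own TTL, and no earlier than
            // the message that was ahead of it in the queue.
            now = Math.max(now, head.expiresAtMillis);
            result.add(head.name + "@" + now);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Pending> published = List.of(
                new Pending("slow", 10_000),
                new Pending("fast", 1_000));
        System.out.println(deadLetterTimes(published)); // prints [slow@10000, fast@10000]
    }
}
```

In this simulation the "fast" message has a TTL of one second, yet it is dead-lettered at ten seconds because it sits behind "slow".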

https://www.rabbitmq.com/community-plugins.html

Installing the rabbitmq_delayed_message_exchange plugin provides this.

(1) Download the plugin

Change into the /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/plugins directory and download the plugin file:

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/plugins
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

Then run

rabbitmq-plugins list

and the newly downloaded plugin appears in the list.

(2) Enable the plugin
[root@mq-test plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@mq-test:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_tracing
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq-test...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange
(3) Restart RabbitMQ
rabbitmqctl stop
rabbitmq-server -detached
(4) Configuration
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfiguration {

    
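    // Exchange of the plugin's "x-delayed-message" type; "x-delayed-type" selects
    // the routing behavior (here: direct) applied once the delay has elapsed.
    @Bean
    CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>(1);
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args);
    }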

    @Bean
    Queue delayQueue() {
        return new Queue("delay_queue");
    }

    @Bean
    Binding delayBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with("delay.queue.routing.key")
                .noargs();
    }

}
(5) Sender
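import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // delayMilli is sent as the x-delay header, which the delayed-message exchange reads.
    public void send(String exchange, String routingKey, String message, int delayMilli) {
        MessagePostProcessor postProcessor = (msg) -> {
            msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            msg.getMessageProperties().setDelay(delayMilli);
            return msg;
        };
        rabbitTemplate.convertAndSend(exchange, routingKey, message, postProcessor);
    }

}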
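
For contrast with the TTL approach in section 1, the effect of a per-message delay can be sketched in plain Java. This is a simulation with made-up names (DelayedDeliveryDemo, Scheduled), not the plugin's implementation: it only assumes that each message is held until its own delay has elapsed and is then routed, independently of the messages published before it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java simulation (no broker involved): each message is held until its own
// delay elapses, so delivery order follows due time rather than publish order.
final class DelayedDeliveryDemo {

    // A published message: its name and the simulated time (ms) at which it becomes due.
    static final class Scheduled {
        final String name;
        final long dueAtMillis;

        Scheduled(String name, long dueAtMillis) {
            this.name = name;
            this.dueAtMillis = dueAtMillis;
        }
    }

    // Returns "name@time" entries in the order the messages would be delivered.
    static List<String> deliveryTimes(List<Scheduled> publishedInOrder) {
        PriorityQueue<Scheduled> byDueTime =
                new PriorityQueue<>(Comparator.comparingLong((Scheduled s) -> s.dueAtMillis));
        byDueTime.addAll(publishedInOrder);
        List<String> result = new ArrayList<>();
        while (!byDueTime.isEmpty()) {
            // The message with the earliest due time is always released first.
            Scheduled next = byDueTime.poll();
            result.add(next.name + "@" + next.dueAtMillis);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Scheduled> published = List.of(
                new Scheduled("slow", 10_000),
                new Scheduled("fast", 1_000));
        System.out.println(deliveryTimes(published)); // prints [fast@1000, slow@10000]
    }
}
```

Here "fast" is delivered after one second even though it was published after "slow", which is the behavior the TTL plus dead-letter setup cannot guarantee.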