前言:基于死信的延迟队列存在漏洞问题,所以要使用基于插件的延迟队列。前者延迟的时机发生在队列,后者则在交换机。
插件在官网上下载:https://www.rabbitmq.com/commuinty-plugins.html
插件安装成功后,可以在管理员界面看到这个下拉选项:
流程图:
配置类:
@Configuration
public class DelayedQueueConfig {
// 队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
// 交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
// RoutingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 声明交换机(基于插件)
@Bean
public CustomExchange delayedExchange() {
Map arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true,false,arguments);
}
// 绑定
@Bean
public Binding delayedQueueBindingDelayedChange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange
) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者:
// 开始发消息,基于插件的 消息 以及 延迟的时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
log.info("当前时间:{}, 发送一条时长{}毫秒TTL信息给延迟队列delayed.queue:{}", new Date().toString(), delayTime, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
// 消息的延迟时长,单位ms
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
消费者:
@Slf4j
@Component
public class DelayQueueConsumer {
// 监听消息
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("当前时间:{}, 收到延迟队列:{}", new Date().toString(), msg);
}
}



