1. 延迟队列2. 插件实现延迟队列
2.1 插件安装2.2 代码实现 3. DLX 实现延迟队列
3.1 DLX + TTL3.2 代码示例3.3 小结
1. 延迟队列延迟队列,顾名思义,就是让队列中的消息不要立刻被消费,而是要延迟一定的时间。
延迟?是不是想到了定时任务,在日常开发中,定时任务肯定并不陌生,SpringBoot也提供很好的支持,比如定时统计任务、定时日志备份……
这样看来,延迟队列作用似乎和定时任务相同,二者可以相互替代,但仔细想想二者区别,还是适用于不同场景
对于任务开始时间确定的需求,用定时任务没有问题,但如果任务开始时间不确定呢?比如:
在购买商品时,下完订单后30分钟内要付款,要不然订单会取消
在抢购商品时,可以设置商品开始抢购提醒
送外卖时,如果没有按照指定时间送达,临近超时,会提醒外卖小哥
上面这些情况,用定时任务似乎很难办到,因为任务开始的时间并不确定,而用延迟队列的话,很容易实现
在RabbitMQ上实现定时任务有两种方式:
使用 RabbitMQ插件rabbitmq_delayed_message_exchange 插件来实现定时任务利用 RabbitMQ自带的消息过期和死信队列机制,实现定时任务
2. 插件实现延迟队列定时任务不适合开始时间不确定的情况
rabbitmq_delayed_message_exchange插件是RabbitMQ提供的开源项目,可以用来实现延迟消费消息,从名字是就能看出其意思,下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
注意要选择与RabbitMQ相同或者相近的版本
2.1 插件安装由于RabbitMQ是按装在Docker上,所以要将插件拷贝到Docker容器中
docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez myrabbitmq:/plugins
第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置
接下来再执行如下命令进入到 RabbitMQ容器中:
docker exec -it myrabbitmq /bin/bash
进入到容器之后,查看所有的插件,发现并没有启用:
rabbitmq-plugins list
启用插件命令,再查看:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
这样插件就按照好了
2.2 代码实现配置文件:
server.port=8889 spring.rabbitmq.host=192.168.43.86 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
配置类:
@Configuration
public class PluginRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";
// 队列名称
public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue";
// 交换机类型,固定值x-delayed-message
public static final String SCORPIOS_EXCHANGE_TYPE = "x-delayed-message";
@Bean
CustomExchange customExchange(){
Map setting = new HashMap<>();
setting.put("x-delayed-type", "direct");
return new CustomExchange(PluginRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,PluginRabbitMQConfig.SCORPIOS_EXCHANGE_TYPE,true,false, setting);
}
@Bean
Queue queue() {
return new Queue(PluginRabbitMQConfig.SCORPIOS_MSG_QUEUE,true,false,false);
}
@Bean
Binding bindingMsg(){
return BindingBuilder.bind(queue()).to(customExchange()).with(PluginRabbitMQConfig.SCORPIOS_MSG_QUEUE).noargs();
}
}
上面创建了一个CustomExchange交换机 ,这是一个 Spring提供的交换机,创建需要参数如下:
交换机名称交换机类型,固定值交换机是否持久化如果没有队列绑定到交换机,交换机是否删除其他参数
最后一个setting参数中,指定交换机消息分发的类型,就是交换机的那几种类型:direct、fanout、topic以及 header,选择哪种类型,交换机分发消息方式就按哪种方式。
启动项目后,打开RabbitMQ Web页面,可以看到:
消息消费者:
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = PluginRabbitMQConfig.SCORPIOS_MSG_QUEUE)
public void consume(String msg) {
log.info("队列收到的消息为:{}", msg);
}
}
此处创建CustomExchange时,需要指定一个交换机类型,此值为固定值:x-delayed-message
否则控制台会报connection reset错误!
消息发送:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String send() {
log.info("发送消息....");
// 创建消息对象,在消息头中延迟10秒
Message msg = MessageBuilder.withBody(("delay message plugin..." + new Date()).getBytes(StandardCharsets.UTF_8)).setHeader("x-delay", 10000).build();
rabbitTemplate.convertAndSend(PluginRabbitMQConfig.SCORPIOS_EXCHANGE_NAME, PluginRabbitMQConfig.SCORPIOS_MSG_QUEUE, msg);
return "success";
}
}
打开浏览器,输入地址:http://localhost:8889/send/message,控制台输入如下:
3. DLX 实现延迟队列上面使用了RabbitMQ提供的插件rabbitmq_delayed_message_exchange实现了延迟队列,细细想来,延迟队列不就是让消息延迟指定的时间再去被消费么?而之前已经了解过死信交换机和死信队列,再来回忆下。
3.1 DLX + TTL死信交换机用来接收死信消息(Dead Message)的,一般消息变成死信消息有如下几种情况:
消息被拒绝(Basic.Reject/Basic.Nack) ,并且设置requeue参数为false消息过期队列达到最大长度
死信队列:绑定死信交换机的消息队列是死信队列。当消息在队列中变成了死信消息后,此时就会被发送到死信交换机。
换句话说,没人消费的消息,最终会进入死信队列。DLX+ TTL是不是可以实现延迟队列
假设一条消息需要延迟30分钟执行,那么就设置这条消息的有效期为30分钟,同时为这条消息配置死信交换机和死信routing_key,并且不为这个消息队列设置消费者,那么30分钟后,这条消息由于没有被消费者消费而进入死信队列,此时死信队列的消费者就会消费这条过期的消息
3.2 代码示例配置文件:
server.port=8889 spring.rabbitmq.host=192.168.43.86 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
配置文件:
@Configuration
public class DLXRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_DLX_EXCHANGE = "scorpios_dlx_exchange";
// 发送队列名称
public static final String SCORPIOS_DLX_QUEUE = "scorpios_dlx_queue";
public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue";
// 死信交换机
@Bean
DirectExchange dlxDirectExchange(){
return new DirectExchange(DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE,true,false);
}
// 死信队列
@Bean
Queue dlxQueue() {
return new Queue(DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE,true,false,false);
}
// 将死信队列和死信交换机绑定
@Bean
Binding dlxBinding(){
return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE);
}
// 创建一个普通队列,并把它与死信交换机
@Bean
Queue msgQueue() {
Map setting = new HashMap<>();
// 设置死信交换机
setting.put("x-dead-letter-exchange", DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE);
// 设置死信 routing_key 与队列名称相同
setting.put("x-dead-letter-routing-key", DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE);
return new Queue(DLXRabbitMQConfig.SCORPIOS_MSG_QUEUE, true, false, false, setting);
}
}
上面的代码复用了介绍死信交换机和死信队列时的代码,不多说
死信队列配置一个消费者:
@Slf4j
@Component
public class Consumer {
// 死信队列的消费者
@RabbitListener(queues = DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE)
public void dlxConsume(String msg) {
log.info("死信队列收到的消息为:{}", msg);
}
}
消息发送者:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String send() {
log.info("客户端发送消息");
// 创建消息对象
Message message = MessageBuilder.withBody("delay message dlx and ttl ...".getBytes(StandardCharsets.UTF_8))
.setExpiration("10000")
.build();
rabbitTemplate.convertAndSend(DLXRabbitMQConfig.SCORPIOS_MSG_QUEUE,message);
return "success";
}
}
打开浏览器,输入地址:http://localhost:8889/send/message,控制台输入如下:
3.3 小结使用DLX Exchange + TTL实现延迟队列,核心思想就是:给消息设置指定的过期时间,而消息队列并没有消费者,当过期时间到了以后,就会进入到死信交换机,最终被死信队列的消费者消费。



