- 前言
- 概念及区别
- 死信队列
- 流程
- 延时队列和TTL模式
- 延时插件模式
- 正文
- 公用配置
- 延时队列方式
- TTL方式
- 延迟队列插件方式
- 下载
- 安装
- 使用
看了一些讲延迟队列的文章,发现很多文章说的都很乱,要么概念没搞清楚,要么讲混了,三种方式的内容都讲串了,这里正好实践一下,并且清晰的记录一下。
正文开始之前,先理解概念及各种实现方式的区别。
简单叙述一下三种方式的概念。
- 延时队列
rabbitmq自带功能,针对队列,直接创建延时队列,指定队列中的所有消息过期时间,比如下边这个:
arguments.put("x-message-ttl", 60000);
这就代表所有入队消息的过期时间都是60000ms。
-
TTL
rabbitmq自带功能,针对每个消息,发送消息的时候给每个消息指定过期时间,
会存在一个问题:rabbitmq默认对过期消息采用懒加载的方式扫描,如果第一条消息设置的是20s过期,第二条10s过期,那么第二条必须要等第一条过期了才能触发,也就是也需要等20s。
所以个人觉得基于这种情况就出现了延迟队列插件。 -
延迟队列插件rabbitmq_delayed_message_exchange
这是rabbitmq插件市场的一个延时插件,通过自定义交换机,然后绑定队列,发送消息时指定消息过期时间实现延迟,跟延迟队列的区别是:
1、过期中的消息这边是暂存在交换机,那边暂存到队列中;
2、消息过期不会像TTL那样等待执行。
死信队列是个概念性的东西,说白了就是普通的队列,只不过它用来接收所有的过期信息,无人消费的信息被称为死信,专门接收死信的队列就是死信队列,要是交换机,就叫死信交换机。
所以可以看出来,死信和延时结合使用才能实现真正的延时策略。
切记不要对延时队列进行监听,否则消息直接被消费,就没有任何意义了,永远也到不了死信队列。
延时队列和TTL模式 延时插件模式
各自的区别在下文中会讲到。
开始延时队列、TTL、延迟队列插件的实践、记录
公用配置-
普通交换机和队列:
order_event_exchange、order_release_queue -
死信交换机和队列:
error_event_exchange、error_queue
@Bean("order_event_exchange") //直接注入名字,后边绑定队列和交换机直接使用注解更方便
public Exchange oderEventExchange(){
//durable:是否持久化 autoDelete:是否自动删除
DirectExchange directExchange = new DirectExchange("order_event_exchange",true, false);
return directExchange;
}
@Bean("order_release_queue")
public Queue orderReleaseQueue() {
Queue queue = new Queue("order_release_queue", true, false, false);
return queue;
}
@Bean
public Binding orderReleaseOrderBinging(@Qualifier("order_release_queue") Queue queue,
@Qualifier("order_event_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.release.order").noargs();
}
@Bean("error_event_exchange")
public DirectExchange errorMessageExchange(){
DirectExchange directExchange = new DirectExchange("error_event_exchange",true, false);
return directExchange;
}
@Bean("error_queue")
public Queue errorQueue(){
Queue queue = new Queue("error_queue", true, false, false);
return queue;
}
@Bean
public Binding errorBinding(@Qualifier("error_queue") Queue queue,
@Qualifier("error_event_exchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("error").noargs();
}
延时队列方式
- 创建
在公用基础上创建延迟队列,指定死信交换机(error_event_exchange)和路由键(error),并且绑定公用交换机order_event_exchange。
@Bean("order_delay_queue")
public Queue oderDelayQueue(){
Map arguments = new HashMap<>();
//配置队列的死信应该发送给哪个交换机,过期消息发给谁
arguments.put("x-dead-letter-exchange", "order_event_exchange");
//发送给交换机使用的路由key
arguments.put("x-dead-letter-routing-key", "order.release.order");
//队列消息变成死信的时间 10s
arguments.put("x-message-ttl", 10000);
//exclusive:是否排他 arguments:扩展参数
Queue queue = new Queue("order_delay_queue", true, false, false, arguments);
return queue;
}
@Bean
public Binding orderCreateOrderBinging(@Qualifier("order_delay_queue") Queue queue,
@Qualifier("order_event_exchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.delay.order").noargs();
}
- 测试
死信队列监听器
@Service
@RabbitListener(queues = {"error_queue"})
public class OrderErrorListener {
@RabbitHandler
public void listener(Order order) throws IOException {
System.out.println("收到error-order信息");
System.out.println(order);
}
}
测试接口
@Autowired
RabbitTemplate rabbitTemplate;
@PostMapping("/sendOrder")
public String sentOrder(@RequestBody Order order){
rabbitTemplate.convertAndSend("order_event_exchange","order.delay.order", order,new CorrelationData(order.getId()));
return "ok";
}
- 测试结果
10s之后死信队列收到消息,并被消费。
- 优缺点
优点:直接使用rabbit内置机制,而且还保证队列里的消息到期就自动投递到死信,不会受mq懒加载的影响,区别于TTL机制。
缺点:如果存在很多不同过期时间的情况,比如,a消息需要10s过期,b消息需要50s过期等等,如果用这种方式实现,就需要创建很多延时队列,不划算,性能也受影响。
- 单纯使用消息过期机制
删除mq中的延时队列,注释消息过期时间设置
修改接口发送消息代码:
@PostMapping("/sendOrder")
public String sentOrder(@RequestBody Order order) {
rabbitTemplate.convertAndSend("order_event_exchange", "order.delay.order", order, message -> {
//消息有效期5秒
long time = 1000 * 5;
message.getMessageProperties().setExpiration(String.valueOf(time));
return message;
},new CorrelationData(order.getId()));
System.out.println("当前时间1:"+System.currentTimeMillis());
return "ok";
}
测试结果:
可以看到时间差正好5s。
再测试一下,发两条消息,过期时间分别是10s和5s。
稍微改下代码:
测试结果:
可以看到,即使我们设置了5s的消息:id=111,还是需要等10s才能被消费。
- 延时队列+消息过期机制
打开延时队列对于消息过期时间的设置,并且删除延时队列,重启项目:
先发送id=222过期时间为10s的消息,在发送id=111过期时间为5s的消息。
测试结果:
可以看到,都设置过期时间的情况下,会以消息的过期时间为准,队列过期时间失效,同时消息过期检测还是得等前一条消息过期才能检测到。 - 优缺点
优点:在消息过期需求很多,且过期时间不同的情况下,无需创建大量队列,比较灵活。
缺点:同一个队列中的消息过期时间由队列的第一条信息的过期时间为准,依次向后推进,也就是上边测试的结果。
官网下载
根据自己的rabbit版本选择对应的插件版本下载,我这里用的3.9.x。
找到安装rabbitmq的路径,由于之前用rpm包,所以需要使用命令:
rpm -ql rabbitmq-server-3.9.16-1.el7.noarch
找到这个路径即可
将插件上传到plugins中
启用插件即可
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在管理端页面看见这个代表成功
- 创建延迟交换机和普通队列
@Bean("lazy_exchange")
public Exchange lazyExchange(){
Map pros = new HashMap<>();
//设置交换机支持延迟消息推送,可取值:direc、topic、fanout
pros.put("x-delayed-type", "direct");
//type类型必须设置为 x-delayed-message
CustomExchange exchange = new CustomExchange("lazy_exchange", "x-delayed-message",true,false,pros);
return exchange;
}
@Bean("lazy_queue")
public Queue pluginDelayQueue(){
return new Queue("lazy_queue", true, false, false);
}
@Bean
public Binding pluginDelayBinding(@Qualifier("lazy_queue") Queue queue,@Qualifier("lazy_exchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
}
- 接口测试代码
@Autowired
RabbitTemplate rabbitTemplate;
@PostMapping("/sendOrder")
public String sentOrder(@RequestBody Order order) {
rabbitTemplate.convertAndSend("lazy_exchange", "delay", order, message -> {
//消息有效期5秒
message.getMessageProperties().setHeader("x-delay", 15000);
return message;
},new CorrelationData(order.getId()));
System.out.println("发送消息:"+order.getId());
System.out.println("当前发送时间:"+System.currentTimeMillis());
return "ok";
}
- 监听器
@Service
@RabbitListener(queues = {"lazy_queue"})
public class OrderDelayListener {
@RabbitHandler
public void listener(Order order) {
System.out.println("收到lazy_queue信息:"+order.getId());
System.out.println("当前接收时间:"+System.currentTimeMillis());
System.out.println(order);
}
}
- 测试结果
算一下时间差,正好是15s. - 测试多消息设置不同延时参数的效果
准备参数:
接口改造:
测试结果:
可以看到,每个消息都是到期就被消费,并没有像TTL模式那样,还需要等待前一个消息到期,其实原因很简单,因为之前的两个模式,都是将消息存储到队列queue中,而延迟插件是将消息存储在交换机(exchange)中。



