延时队列(延迟队列):队列内部是 有序 的,最重要的特性就体现在它的 延时 属性上
延时队列中的元素是希望在指定时间到了以后或之前取出和处理
延时队列就是用来存放需要在指定时间被处理的元素的队列
2、使用场景订单在十分钟之内未支付则自动取消新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天内没有登陆则进行短信提醒用户发起退款,如果三天内没有得到处理则通知相关运营人员预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务。
如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于”如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
3、整合Springbootpom.xml
4.0.0 org.springframework.boot spring-boot-starter-parent 2.6.5 com.tuwer springboot-rabbitmq 0.0.1-SNAPSHOT springboot-rabbitmq Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.amqp spring-rabbit-test test com.alibaba fastjson 1.2.79 io.springfox springfox-boot-starter 3.0.0 org.springframework.boot spring-boot-maven-plugin org.projectlombok lombok
application.yml
spring:
rabbitmq:
host: 192.168.19.101
port: 5672
username: admin
password: admin
添加Swagger配置类
package com.tuwer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.documentationType;
import springfox.documentation.spring.web.plugins.Docket;
import java.util.ArrayList;
@Configuration
@EnableWebMvc
public class SwaggerConfig {
@Bean
public Docket docket(){
return new Docket(documentationType.OAS_30)
.groupName("webApi")
.apiInfo(apiInfo())
.select()
.build();
}
private ApiInfo apiInfo(){
// 作者信息
Contact contact = new Contact("土味儿", "http://localhost:8080/", "2141421414@qq.com");
return new ApiInfo(
"Hello Swagger API 文档",
"大道无垠 行者无疆",
"v1.0",
"http://localhost:8080/",
contact,
"Apache 2.0",
"http://www.apache.org/licenses/LICENSE-2.0",
new ArrayList());
}
}
4、队列实现 1)代码架构【SpringBoot】18、整合Swagger 3.0【狂神篇】_土味儿~的博客-CSDN博客
创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后再创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:
2)代码实现
配置类 RabbitMqConfig.java
声明交换机、队列及绑定关系
package com.tuwer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqConfig {
public static final String EXCHANGE_X = "X";
public static final String DEAD_LETTER_EXCHANGE_Y = "Y";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String DEAD_LETTER_QUEUE_D = "QD";
@Bean("exchangeX")
public DirectExchange getExchangeX(){
return new DirectExchange(EXCHANGE_X);
}
@Bean("deadLetterExchangeY")
public DirectExchange getDeadLetterExchangeY(){
return new DirectExchange(DEAD_LETTER_EXCHANGE_Y);
}
@Bean("queueA")
public Queue getQueueA(){
Map arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
// 设置死信交换机绑定的RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间TTL:单位毫秒
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
@Bean("queueB")
public Queue getQueueB(){
Map arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
// 设置死信交换机绑定的RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间TTL:单位毫秒
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
@Bean("queueD")
public Queue getQueueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
}
@Bean
public Binding queueABindExchangeX(
@Qualifier("queueA") Queue queue,
@Qualifier("exchangeX") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("XA");
}
@Bean
public Binding queueBBindExchangeX(
@Qualifier("queueB") Queue queue,
@Qualifier("exchangeX") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("XB");
}
@Bean
public Binding queueDBindExchangeY(
@Qualifier("queueD") Queue queue,
@Qualifier("deadLetterExchangeY") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("YD");
}
}
生产者 SendMsgController.java
发送消息;由 RabbitTemplate 来发送消息
package com.tuwer.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.time.LocalTime;
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/sendMsg/{message}")
public String sendMsg(@PathVariable("message") String message) {
rabbitTemplate.convertAndSend("X","XA",message);
rabbitTemplate.convertAndSend("X","XB",message);
log.info("当前时间:{},发送给两个TTL队列:{}", LocalTime.now(),message);
return "消息【" + message + "】已发送!";
}
}
消费者 DeadLetterQueueConsumer.java
一直监听队列;@RabbitListener(queues = "QD")官方教程中加入了Channel,但方法中没有使用,实测去掉Channel,也不用抛异常,可以运行;public void receiveD(Message message, Channel channel) throws Exception {
package com.tuwer.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
//public void receiveD(Message message, Channel channel) throws Exception {
public void receiveQueueD(Message message){
String msg = new String(message.getBody());
log.info("时间:{},接收到的消息:{}", LocalTime.now(),msg);
}
}
3)运行测试
5、队列优化
1)问题
第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
队列的过期时间固定,不够灵活。可以建立一个通用队列,过期时间灵活设置。
2)代码架构图 3)实现
修改配置类 RabbitMqConfig.java
增加QC队列,并绑定交换机;QC中不设置过期时间
@Configuration
public class RabbitMqConfig {
// 省略...
public static final String QUEUE_C = "QC";
// 省略...
@Bean("queueC")
public Queue getQueueC(){
Map arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_Y);
// 设置死信交换机绑定的RoutingKey
arguments.put("x-dead-letter-routing-key","YD");
// 设置过期时间TTL:单位毫秒
//arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
// 省略...
@Bean
public Binding queueCBindExchangeX(
@Qualifier("queueC") Queue queue,
@Qualifier("exchangeX") DirectExchange exchange
){
return BindingBuilder.bind(queue).to(exchange).with("XC");
}
// 省略...
}
修改生产者 SendMsgController.java
增加可以发送过期时间消息的方法
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 省略...
@RequestMapping("/sendExpirationMsg/{message}/{ttlTime}")
public String sendMsg(
@PathVariable("message") String message,
@PathVariable("ttlTime") String ttlTime
) {
// 消息处理器:设置过期时间
MessagePostProcessor messagePostProcessor = msg ->{
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
};
rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);
log.info("当前时间:{},发送给QC队列:{},过期时间:{}ms", LocalTime.now(),message,ttlTime);
return "消息【" + message + "】已发送!";
}
}
4)运行测试
因为RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行!
6、插件实现延迟队列 1)docker中安装插件下载插件
插件下载地址:https://www.rabbitmq.com/community-plugins.html 找到rabbitmq_delayed_message_exchange下载 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
将插件复制到容器内,进行安装
# 拷贝至docker容器内 docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq容器ID:/plugins # 进入docker容器内 docker exec -it rabbitmq bash # 赋予权限 chmod 777 /plugins/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez # 启动延时插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange2)代码架构图 3)实现
配置类 DelayedQueueConfig.java
package com.tuwer.config;
import com.rabbitmq.client.BuiltinExchangeType;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean("delayedExchange")
public CustomExchange getDelayedExchange() {
// 参数
Map arguments = new HashMap<>(1);
// 交换机类型:直接
arguments.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());
//arguments.put("x-delayed-type","direct");
// 自定义交换机
return new CustomExchange(
DELAYED_EXCHANGE_NAME,
"x-delayed-message",
true,
false,
arguments);
}
@Bean("delayedQueue")
public Queue getDelayedQueue() {
//return new Queue(DELAYED_QUEUE_NAME);
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
@Bean
public Binding delayedQueueBindDelayedExchange(
@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange exchange
) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with(DELAYED_ROUTING_KEY)
.noargs();
}
}
生产者
因为 setDelay() 接收 Integer 类型参数,所以 delayTime 类型为 Integer;
@RequestMapping("/sendDelayMsg/{message}/{delayTime}")
public String sendMsg(
@PathVariable("message") String message,
@PathVariable("delayTime") Integer delayTime
) {
System.out.println(message + delayTime);
// 消息处理器:设置延迟时间
MessagePostProcessor messagePostProcessor = msg ->{
msg.getMessageProperties().setDelay(delayTime);
return msg;
};
rabbitTemplate.convertAndSend(
DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DelayedQueueConfig.DELAYED_ROUTING_KEY,
message,
messagePostProcessor
);
log.info("当前时间:{},发送给延迟队列:{},延迟时间:{}ms", LocalTime.now(),message,delayTime);
return "消息【" + message + "】已发送!";
}
消费者(监听器)DelayedLetterCustomer.java
package com.tuwer.consumer;
import com.tuwer.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Slf4j
@Component
public class DelayedLetterCustomer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("时间:{},接收到的消息:{}", LocalTime.now(),msg);
}
}
4)测试
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次,以及未被正确处理的消息不会被丢弃。
通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
延时队列还有很多其它选择,比如:利用Java的DelayQueue,利用Redis的Zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,看需要适用的场景



