1.2 使用场景延迟队列内部是有序的,其中的元素是希望在指定时间到了之后取出或者处理,总之就是用来存放需要在指定时间被处理的元素的队列。
1.3 整合SpringBoot用来在一定时间间隔内对相应的事件做出提醒或者操作。
正常新建SpringBoot项目即可。SpringBoot版本为2.5.6,如果版本过高与Swagger2版本相差过大就会抛出异常。
添加依赖:
org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web 2.5.6 com.alibaba fastjson 1.2.78 org.projectlombok lombok 1.18.20 io.springfox springfox-swagger2 2.9.2 io.springfox springfox-swagger-ui 2.9.2 org.springframework.amqp spring-rabbit-test test org.springframework.boot spring-boot-starter-test test
新增Swagger配置类:
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig(){
return new Docket(documentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo(){
return new ApiInfoBuilder()
.title("RabbitMQ接口文档")
.description("RabbitMQ文档接口定义")
.version("1.0")
.contact(new Contact("RabbitMQ","https://www.baidu.com"
,"baidu@qq.com"))
.build();
}
}
配置文件:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest1.4 延迟队列实战 1.4.1 图示
- P:生产者;X:普通交换机;Y:延迟交换机;QA:延迟10s普通队列;QB:延迟40s普通队列;QD:死信队列;C:消费者。
主要是基于SpringBoot方式将队列交换机RotingKey等绑定起来。
@Configuration
public class TtlQueueConfig {
// 普通交换机
private static final String X_EXCHANGE = "X";
// 死信交换机
private static final String DEAD_LETTER_Y_EXCHANGE = "Y";
// 普通队列
private static final String QUEUE_A = "QA";
private static final String QUEUE_B = "QB";
// 死信队列
private static final String DEAD_LETTER_QUEUE_D = "QD";
// 设置普通交换机
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 设置死信交换机
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(DEAD_LETTER_Y_EXCHANGE);
}
// 设置普通队列
@Bean("queueA")
public Queue aQueue(){
Map argument = new HashMap<>();
// 设置死信交换机
argument.put("x-dead-letter-exchange",DEAD_LETTER_Y_EXCHANGE);
// 设置死信RoutingKey
argument.put("x-dead-letter-routing-key","YD");
// 设置过期时间
argument.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(argument).build();
}
@Bean("queueB")
public Queue bQueue(){
Map argument = new HashMap<>();
// 设置死信交换机
argument.put("x-dead-letter-exchange",DEAD_LETTER_Y_EXCHANGE);
// 设置死信RoutingKey
argument.put("x-dead-letter-routing-key","YD");
// 设置过期时间
argument.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(argument).build();
}
// 设置死信队列
@Bean("queueD")
public Queue dQueue(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueD).to(xExchange).with("YD");
}
}
1.4.3 控制器模拟生产者发送消息
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给TTL队列:{}",new Date().toString(),message);
// 执行发送消息到X交换机RoutingKey为XA,XB
rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列" + message);
rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列" + message);
}
}
1.4.4 消费者
@Slf4j
@Component
public class DeadLetterConsumer {
// 声明队列监听器,监听死信队列QD
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到来自死信队列消息:{}",new Date().toString(),msg);
}
}
1.4.5 测试和结果
测试方式:访问http://localhost:8088/ttl/sendMsg/测试,访问了这个界面之后控制器会发送一条消息到正常的交换机中,等待10/40秒后会进入死信队列由消费者接收。
1.5.1 图示上方设计中,每新增一个时间需求就需要新增一个队列,这样会占有较多的资源,并且不便于管理。
优化思路:在原先项目的基础上新增一个先不对时间进行限制队列QC,QC的时间限制在生产者发送消息的时候指定。
在原先的配置基础上进行修改,新增队列QC,并且绑定它和交换机X,Y的关系。
声明队列:
private static final String QUEUE_C = "QC";
配置队列:
public Queue cQueue(){
Map argument = new HashMap<>();
// 设置死信交换机
argument.put("x-dead-letter-exchange",DEAD_LETTER_Y_EXCHANGE);
// 设置死信RoutingKey
argument.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(argument).build();
}
绑定交换机:
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
1.5.3 新增控制器方法
@GetMapping("/sendTimeAndMsg/{time}/{message}")
public void sendTimeAndMsg(@PathVariable String time,@PathVariable String message){
log.info("当前时间:{},发送一条延迟{}毫秒的信息给TTL队列:{}",new Date().toString(),time,message);
rabbitTemplate.convertAndSend("X",
"XC",
"来自交换机X延迟为" + time +"毫秒的消息:" + message,
msg -> {
// 设置时间
msg.getMessageProperties().setExpiration(time);
return msg;
}) ;
}
1.5.4 测试
依次访问:
http://localhost:8088/ttl/sendTimeAndMsg/20000/20sMessage,
http://localhost:8088/ttl/sendTimeAndMsg/2000/2sMessage
按照上方的延迟时间,如果相隔的时间不超过20秒,应该是先收到延迟为2秒的消息。但是显然并不是按照两秒的时间来进行接收的。这就引出了一个新的问题:RabbitMQ只会检测第一个消息是否过期,只要第一条没有到期后面的也不会优先得到执行。
1.6 基于插件延迟队列 1.6.1 优势在使用插件之前:
使用了插件之后:
也就是说消息停留在生产者将消息发送给交换机之后,等停留的时间过了之后,交换机才会将消息发给队列,并且只需要一个队列和一个交换机即可完成。
下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载之后的ez文件复制到RabbitMQ安装目录下的plugins文件夹下,然后使用cmd进入到RabbitMQ安装目录下的sbin文件夹下运行:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如图:
然后进入到web端,随便新增一个交换机,可以看到下拉框中多了一个x-delayed-message选项:
这里就表示安装完成了。
直接将消息发送给交换机,交换机等待相应时间之后再发给队列。
@Configuration
public class DelayedQueueConf {
// 交换机
private static final String DELAYED_EXCHANGE = "delayed.exchange";
// 队列
private static final String DELAYED_QUEUE = "delayed.queue";
// rotingKey
private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
@Bean("delayQueue")
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE);
}
@Bean
public CustomExchange delayedExchange(){
Map args = new HashMap<>();
// 延迟类型
args.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",
false,false,args);
}
@Bean
public Binding delayQueueBindingDelayExchange(@Qualifier("delayQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange customExchange){
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
1.6.3.4 生产者控制器
@GetMapping("/sendTimeAndMsgByPlugins/{time}/{message}")
public void sendTimeAndMsgByPlugins(@PathVariable Integer time,@PathVariable String message){
log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}",new Date().toString(),time,message);
rabbitTemplate.convertAndSend("delayed.exchange",
"delayed.routingKey",message,
msg -> {
msg.getMessageProperties().setDelay(time);
return msg;
});
}
1.6.3.5 消费者监听
@Slf4j
@Component
public class DelayQueueConsumer {
// 声明队列监听器,监听死信队列QD
@RabbitListener(queues = "delayed.queue")
public void receiveD(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到来自死信队列消息:{}",new Date().toString(),msg);
}
}
1.6.4 测试
依次访问:
http://localhost:8088/ttl/sendTimeAndMsgByPlugins/20000/20sMessage,
http://localhost:8088/ttl/sendTimeAndMsgByPlugins/2000/2sMessage
出现如下结果,之前出现死信的情况消失了。



