- 延时队列
- 概念
- 使用场景
- 延时队列插件
- 插件安装
- 插件原理
- 插件实现延时队列
- 消息发送方
- 主要依赖
- 配置文件
- 配置类(`1`)
- 配置类(`2`)
- 发送方 `service`
- 发送方 `controller`
- 消息接收方
- 配置文件
- 消息的消费
- 测试
延时队列是存储延时消息的队列,延时消息就是生产者发送了一条消息,但是不希望该消息不要被立即消费,而是设置一个延时时间,等过了这个时间再消费消息
使用场景- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 账单在一周内未支付,则自动结算
- 用户注册成功后,如果三天内没有登陆则进行短信提醒
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员
这时候,延时队列就可以闪亮登场了,以上场景,正是延时队列的用武之地
延时队列插件 插件安装在 RabbitMQ 3.6.x 之前我们一般采用 死信队列+ TTL 过期时间来实现延迟队列,我们这里不做过多介绍,从 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件。插件下载地址:https://www.rabbitmq.com/community-plugins.html
本人使用的 RabbitMQ 3.8.3 版本,所以下载 rabbitmq_delayed_message_exchange-3.8.0.ez 这个插件放到 RabbitMQ 安装目录的 plugins 文件中。在 RabbitMQ 安装 sbin 文件中用 cmd 执行命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
开启插件后,启动 RabbitMQ,访问登录后访问 http://localhost:15672,在交换机 exchanges 的 tab下,底部新增将看到如下图设置,则表示插件已启动,以后直接就可以使用了
插件原理延迟插件底层简单原理图
- 原始的 死信队列+ TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用)
- 这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了
配置文件org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web
server.port=8080 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #确认消息已发送到交换机 spring.rabbitmq.publisher-/confirm/is=true #确认消息已发送到队列 spring.rabbitmq.publisher-returns=true配置类(1)
主要是队列,交换机的配置绑定
@Configuration
public class RabbitConfig {
// 延时交换机
public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
// 延时队列名称
public static final String DELAY_QUEUE_NAME = "delay_queue";
// 普通交换机
public static final String ORDER_PAY_Exchange_Name = "order_pay_exchange";
// 普通队列名称
public static final String ORDER_PAY_QUEUE_NAME = "order_pay_queue";
// 普通交换机路由键
public static final String ORDER_PAY_ROUTING_KEY = "order_pay_routing_key";
// ------------------------延时队列------------------------
// 延时队列
@Bean
public Queue delayPayQueue() {
return new Queue(RabbitConfig.DELAY_QUEUE_NAME, true);
}
// 延时交换机
public FanoutExchange delayExchange() {
Map args = new HashMap<>();
args.put("x-delayed-type", "direct");
FanoutExchange fanoutExchange = new FanoutExchange(RabbitConfig.DELAY_EXCHANGE_NAME, true, false, args);
fanoutExchange.setDelayed(true);
return fanoutExchange;
}
// 绑定延时队列与延时交换机
@Bean
public Binding delayPayBind() {
return BindingBuilder.bind(delayPayQueue()).to(delayExchange());
}
// ------------------------普通队列------------------------
// 普通队列
@Bean
public Queue orderPayQueue() {
return new Queue(RabbitConfig.ORDER_PAY_QUEUE_NAME, true);
}
// 普通交换机
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(RabbitConfig.ORDER_PAY_Exchange_Name, true, false);
}
// 绑定普通消息队列
@Bean
public Binding orderPayBind() {
return BindingBuilder.bind(orderPayQueue()).to(orderExchange()).with(RabbitConfig.ORDER_PAY_ROUTING_KEY);
}
// 定义消息转换器
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
// 定义消息模板用于发布消息,并且设置其消息转换器
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
// --------------------------使用RabbitAdmin启动服务便创建交换机和队列--------------------------
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
// 创建延时交换机和对列
rabbitAdmin.declareExchange(delayExchange());
rabbitAdmin.declareQueue(delayPayQueue());
// 创建普通交换机和对列
rabbitAdmin.declareExchange(orderExchange());
rabbitAdmin.declareQueue(orderPayQueue());
return new RabbitAdmin(connectionFactory);
}
}
配置类(2)
消息发送到 exchange,queue 的回调函数
@Slf4j
@Configuration
public class Rabbit/confirm/iConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.set/confirm/iCallback((correlationData, ack, cause) -> {
log.info("/confirm/iCallback:" + "相关数据:" + correlationData);
log.info("/confirm/iCallback:" + "确认情况:" + ack);
log.info("/confirm/iCallback:" + "原因:" + cause);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("ReturnCallback:" + "消息:" + message);
log.info("ReturnCallback:" + "回应码:" + replyCode);
log.info("ReturnCallback:" + "回应信息:" + replyText);
log.info("ReturnCallback:" + "交换机:" + exchange);
log.info("ReturnCallback:" + "路由键:" + routingKey);
});
return rabbitTemplate;
}
}
发送方 service
@Service
public class MsgProductionService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送延时信息
public void sendTimeoutMsg(String content, String routingKey, int delay) {
// 通过广播模式发布延时消息,会广播至每个绑定此交换机的队列,这里的路由键没有实质性作用
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, routingKey, content, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 毫秒为单位,指定此消息的延时时长
message.getMessageProperties().setDelay(delay * 1000);
return message;
});
}
// 发送普通消息
public void sendMsg(String routingKey, String content) {
// DirectExchange类型的交换机,必须指定对应的路由键
rabbitTemplate.convertAndSend(RabbitConfig.ORDER_PAY_Exchange_Name, routingKey, content);
}
}
发送方 controller
@Controller
public class MsgSendController {
@Autowired
private MsgProductionService msgProductionService;
@GetMapping(path = "/sendMsg")
@ResponseBody
public String sendMsg() {
// 发送多个延时消息
msgProductionService.sendTimeoutMsg("hello1", "routingKey1", 40);
msgProductionService.sendTimeoutMsg("hello2", "routingKey2", 20);
msgProductionService.sendTimeoutMsg("hello3", "routingKey3", 60);
// 发送普通消息
msgProductionService.sendMsg(RabbitConfig.ORDER_PAY_ROUTING_KEY, "weixin");
msgProductionService.sendMsg(RabbitConfig.ORDER_PAY_ROUTING_KEY, "alipay");
return "success";
}
}
消息接收方
配置文件
server.port=8081 #配置rabbitmq服务器 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.listener.type=simple #消费方消息确认:手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.default-requeue-rejected=false消息的消费
@Slf4j
@Component
public class MsgComsumerService {
// 监听消费延时消息
@RabbitListener(queues = {"delay_queue"})
@RabbitHandler
public void process(String content, Message message, Channel channel) throws IOException {
try {
log.info("延迟队列的内容[{}]", content);
// 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("超时信息处理完毕");
} catch (Exception e) {
log.error("处理失败:{}", e.getMessage());
// 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
// 消费普通消息
@RabbitListener(queues = {"order_pay_queue"})
@RabbitHandler
public void process1(String content, Message message, Channel channel) throws IOException {
try {
log.info("普通队列的内容[{}]", content);
// 消息的可定确认,第二个参数如果为true将一次性确认所有小于deliveryTag的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("普通信息处理完毕");
} catch (Exception e) {
log.error("处理失败:{}", e.getMessage());
// 直接拒绝消费该消息,后面的参数一定要是false,否则会重新进入业务队列,不会进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
测试
启动 RabbitMQ,及两个 springboot 项目,RabbitMQ 管理页面如下
队列
交换机
调用接口 http://localhost:8080/sendMsg,查看控制台结果
消息消费方日志
- hello2 的延时时间是 20s
- hello1 的延时时间是 40s
- hello3 的延时时间是 60s
消息发送方日志
源码:https://gitee.com/chaojiangcj/springboot-rabbitmq-delay-queue.git



