需求:后台管理添加消息支持定时发送;
看到这个需求,我第一个想到的是查询用户消息时,只查询发送时间小于当前时间的消息;
这确实是一种解决方案,不过我这边的需求复杂一些,在手机通知中心中也能定时收到消息;【就是从手机顶部滑下来的消息】
这就要用到延迟队列了,延迟队列的实现有好几种,这里主要讲 rabbitMQ 的实现方式;
二、rabbitMQ 实现消息定时发送
具体流程:添加消息时如果发送时间大于当前时间,调用添加延迟队列的方法;
我们会先计算消息发送时间和当前时间的时间差,把这个时间差和这条消息封装成放入死信队列中,如果时间一到死信队列会根据路由键把消息发送到消息推送队列,我们有个监听消息推送队列的方法,当监听到有消息时处理相应的业务逻辑;
注:死信队列可以理解为放一些异常或无法立即被消费的消息;
三、代码
引入依赖;
org.springframework.boot spring-boot-starter-amqp
配置文件;
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 虚拟主机,可以用项目名
virtual-host: demo
# 开启发送方消息抵达broker确认回调
publisher-/confirm/i-type: correlated
# 开启发送方消息抵达队列确认回调
publisher-returns: true
# 只要抵达队列,以异步发送优先回调return/confirm/i
template:
mandatory: true
# 手动ack消息,不使用默认消费端确认
listener:
simple:
acknowledge-mode: manual
配置类:配置交换机,队列和路由键的关联关系;
@Configuration
public class MyMQConfig {
public static final String NOTICE_PUSH_EXCHANGE = "notice_push_exchange";
public static final String NOTICE_DL_QUEUE = "notice_dl_queue";
public static final String NOTICE_DL_KEY = "notice_dl_key";
public static final String NOTICE_PUSH_QUEUE = "notice_push_queue";
public static final String NOTICE_PUSH_KEY = "notice_push_key";
@Bean
public Exchange noticePushExchange() {
// 参数说明:交换机名字,是否持久化,是否自动删除,交换机参数
return new DirectExchange(NOTICE_PUSH_EXCHANGE, true,false, null);
}
@Bean
public Queue deadLetterQueue() {
Map args = new HashMap<>(2);
// 声明死信交换机
args.put("x-dead-letter-exchange", NOTICE_PUSH_EXCHANGE);
// 转发路由键
args.put("x-dead-letter-routing-key", NOTICE_PUSH_KEY);
// 参数说明:队列名字,是否持久化,是否排它,是否自动删除,队列参数
// 排它指的是只能被一个消费者连接使用
return new Queue(NOTICE_DL_QUEUE, true, false, false, args);
}
@Bean
public Queue noticePushQueue() {
return new Queue(NOTICE_PUSH_QUEUE, true, false, false, null);
}
@Bean
public Binding deadLetterBinding() {
// 参数说明:绑定名称【队列名称/】,类型【队列/交换机】,绑定交换机名称,绑定路由键,绑定参数
return new Binding(NOTICE_DL_QUEUE, Binding.DestinationType.QUEUE, NOTICE_PUSH_EXCHANGE, NOTICE_DL_KEY, null);
}
@Bean
public Binding noticePushBinding() {
return new Binding(NOTICE_PUSH_QUEUE, Binding.DestinationType.QUEUE, NOTICE_PUSH_EXCHANGE, NOTICE_PUSH_KEY, null);
}
}
添加延迟消息的方法;
public void addDelayNotice(Notice notice){
// 发送时间和当前时间相差的毫秒数,DateUtil.betweenMs()是hutool工具包的方法
long betweenMs = DateUtil.betweenMs(notice.getSendTime(), new Date());
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置编码
messageProperties.setContentEncoding("utf-8");
// 设置过期时间,单位毫秒
messageProperties.setExpiration(Long.toString(betweenMs));
return message;
};
// 加入到死信队列
rabbitTemplate.convertAndSend(MyMQConfig.NOTICE_PUSH_EXCHANGE, MyMQConfig.NOTICE_DL_KEY, notice, messagePostProcessor);
}
@Data
public class Notice {
// 主键id
private Integer id;
// 发送时间
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date sendTime;
// 其他属性
}
监听消息推送队列;
@Service
@RabbitListener(queues = {MyMQConfig.NOTICE_PUSH_QUEUE})
public class NoticePushListener {
@SneakyThrows
@RabbitHandler
public void noticePush(Notice notice, Channel channel, Message message){
try {
// TODO 监听到消息推送队列传来的notice对象,调用推送的方法
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
// 拒签,重新放回队列(可能自身服务问题报错等原因)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
当我们修改还没发送的消息时也会将消息加到死信队列,这时有个消息重复的问题;
在项目中我是这样处理的,在发送消息的方法中比较数据库的发送时间和消息推送队列中Notice对象的发送时间是否一样,如果不一样就不发消息;也可以加个版本号字段判断,每次修改时版本号都加1;



