延迟队列的实现方式有很多种,通过程序的方式实现,例如 JDK 自带的延迟队列 DelayQueue,通过 MQ 框架来实现,例如 RocketMQ、RabbitMQ等,本文通过 Redis 的方式来实现延迟队列 。
Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。
优点:
灵活方便,Redis 是互联网公司的标配,无序额外搭建相关环境;
可进行消息持久化,大大提高了延迟队列的可靠性;
分布式支持,不像 JDK 自身的 DelayQueue;
高可用性,利用 Redis 本身高可用方案,增加了系统健壮性。
缺点:
需要使用无限循环的方式来执行任务检查,会消耗少量的系统资源。
redis的失效监听事件会存在一定的时间差,并且当数据量越大时,误差会越大。
分布式业务的场景下,会出现重复消费的问题。(可以增加分布式锁的实现,但是redisson分布式锁提供了另一种延迟队列的实现方式)
@Data
public class DelayMessage implements Serializable {
private String groupName;
private String id;
private Object body;
private long expireTime;
}
延迟消息工具类
@Component
public class DelayMessageUtils {
@Resource
private RedisTemplate redisTemplate;
public void addMsgPool(DelayMessage delayMessage) {
if (null != delayMessage) {
ValueOperations valueOperations = redisTemplate.opsForValue();
String key = RedisKey.MESSAGE_POOL + delayMessage.getGroupName() + delayMessage.getId();
valueOperations.set(key, JSON.toJSONString(delayMessage), delayMessage.getExpireTime(), TimeUnit.SECONDS);
}
}
public void delMsgPool(String groupName, String id) {
String key = RedisKey.MESSAGE_POOL + groupName + id;
redisTemplate.delete(key);
}
public void addMsgQueue(String key, String val, long score) {
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
zSetOperations.add(key, val, score);
}
public void delMsgQueue(String key, String id) {
ZSetOperations zSetOperations = redisTemplate.opsForZSet();
zSetOperations.remove(key, id);
}
}
延迟消息生产者
@Slf4j
@Component
public class DelayMessageProvider {
@Resource
private DelayMessageUtils delayMessageUtils;
@Value("${delayMsgExpireTime:900}")
private Long delayMsgExpireTime;
public boolean sendMessage(DelayMessage delayMessage) {
// 消息体为空 不予执行
if (ObjectUtil.isNull(delayMessage.getBody())) {
return false;
}
if (delayMsgExpireTime <= 0) {
delayMsgExpireTime = 1L;
}
// 计算过期时间
long delayTime = System.currentTimeMillis() + Convert.convertTime(delayMsgExpireTime - 1, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
delayMessage.setExpireTime(delayMsgExpireTime);
try {
// 向队列中添加消息
String key = RedisKey.MESSAGE_QUEUE + delayMessage.getGroupName();
delayMessageUtils.addMsgQueue(key, delayMessage.getId(), delayTime);
// 存入消息池
delayMessageUtils.addMsgPool(delayMessage);
log.debug("DelayMessageProvider 发送信息:{}", delayMessage);
} catch (Exception e) {
e.printStackTrace();
log.error("DelayMessageProvider 消息发送失败!");
return false;
}
return true;
}
public boolean delMessage(DelayMessage delayMessage) {
// 消息标识为空,不予执行
if (ObjectUtil.isNull(delayMessage.getGroupName()) || ObjectUtil.isNull(delayMessage.getId())) {
return false;
}
// 消息队列名称
String msgQueue = RedisKey.MESSAGE_QUEUE + delayMessage.getGroupName();
String id = delayMessage.getId();
try {
// 删除队列中消息
delayMessageUtils.delMsgQueue(msgQueue, String.valueOf(id));
// 删除消息池
delayMessageUtils.delMsgPool(delayMessage.getGroupName(), String.valueOf(id));
} catch (Exception e) {
e.printStackTrace();
log.error("DelayMessageProvider 删除消息失败!");
return false;
}
return true;
}
}
延迟消息消费者
@Slf4j
@Component
public class DelayMessageConsumer {
@Resource
private DelayMessageUtils delayMessageUtils;
@Resource
private RedisTemplate redisTemplate;
@Resource
private DelayMessageProvider delayMessageProvider;
@Resource
private List delayMessageExecuteList;
@Scheduled(cron = "* * * * * ?")
public void baseMonitor() {
long current = System.currentTimeMillis();
for (DelayMessageExecute delayMessageExecute : delayMessageExecuteList) {
// 消息队列名称
String msgQueue = RedisKey.MESSAGE_QUEUE + delayMessageExecute.getGroupName();
// 消息池前缀,以此前缀加上传递的消息id作为key
String msgPool = RedisKey.MESSAGE_POOL + delayMessageExecute.getGroupName();
// 查询当前已过期的消息队列
Set
延迟消息主体类型
public interface DelayMessageExecute {
String getGroupName();
boolean execute(DelayMessage delayMessage);
}
延迟消息主体消费实现
@Slf4j
@Component
public class PlayerKeepAliveExecuteImpl implements DelayMessageExecute {
@Override
public String getGroupName() {
return MsgTypeEnum.PLAYER_KEEP_ALIVE.name();
}
@Override
public boolean execute(DelayMessage delayMessage) {
log.debug("PlayerKeepAliveExecuteImpl 执行了 心跳消息处理:{}", delayMessage.getBody());
// 没有消息体,不予执行
if (ObjectUtil.isNull(delayMessage.getBody())) {
return false;
}
return true;
}
}
Redis 缓存 Key
public class RedisKey {
public static final String MESSAGE_POOL = "Message:Pool:";
public static final String MESSAGE_QUEUE = "Message:Queue:";
}



