常见延迟任务常见解决方案
主动形式被动形式 基于Redis实现ZSet的方式、键空间通知的方式
ZSet的方式键空间通知的方式 RocketMQ延迟消息
延迟消息级别配置客户端发送延时消息 总结
常见延迟任务
订单下单30 分钟后,一直未付款的订单,系统需要自动取消订单并返还库存。
商家发货后,7天还未确认收货的订单,系统自动确认收货并进行分账处理。
用户发起的拼团购买,24小时后未拼团成功的订单需要关闭。
待处理售后信息超时1天,使用站内信通知店铺管理员,超时2天则使用短信通知店铺管理员。
红包 24 小时未被查收,过期后需要执退还业务;
常见解决方案 主动形式
基于JDK原生DelayQueue、ScheduledExecutorService、TimerTask等
缺点:
大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失延迟任务。
定时轮询数据库(数据量大或者轮询间隔短,扫描数据库是个**“重”**操作)
不支持集群模式
基于Spring Task
缺点:
定时轮询数据库(数据量大或者轮询间隔短,扫描数据库是个**“重”**操作)
不支持集群模式
基于分布式定时任务quartz、elastic-job-lite、xxl-job
缺点:
定时轮询数据库(数据量大或者轮询间隔短,扫描数据库是个**“重”**操作)
基于Redis实现ZSet的方式、键空间通知的方式
相比于上面优化:
把任务放入到轻量级别的内存中,把扫描数据库的**“重”**操作,改用性能更高的Redis做为代替;
缺点:
持久化也无法保证不丢数据。
ZSet的方式:存在集群问题。
键空间通知:Redis的Pub/Sub不可靠,没有ACK机制等。
基于MQ延迟消息
RabbitMQ:需要安装一个rabbitmq_delayed_message_exchange插件。RocketMQ延迟消息。 被动形式
给延迟任务实体设置个过期时间,被动等待用户触发。
例如:用户发起的拼团购买,24小时后未拼团成功的订单需要关闭。
主动形式:系统定时check拼单是否到了24小时,到了之后系统更新拼单状态成功与否。但是可能很多用户都不看这个拼单,我们还主动去更新,比较浪费性能 。(系统触发更新状态)
被动形式:发起拼单时就计算好超时时间,然后只有当用户主动查看这个拼单时,我们才去根据时间判断更新拼单状态。(用户触发更新状态)
基于Redis实现ZSet的方式、键空间通知的方式被动形式看需求场景哈。比如30分钟订单未支付,需要实时的返还库存,这种场景用被动形式就不行。用户一直不触发,别人没法买了GG。
ZSet的方式简单介绍下原理,不是很严谨的业务场景可以尝试用下。
将定时任务存放到ZSet集合中,并且将过期时间存储到ZSet的Score字段中,然后通过一个循环来判断当前时间内是否有需要执行的定时任务,如果有则进行执行。
zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值
添加元素:ZADD key score member [[score member] [score member] …]按顺序查询元素:ZRANGE key start stop [WITHSCORES]查询元素:score:ZSCORE key member移除元素:ZREM key member [member …]
一、添加任务
redis.zadd(Constants.DELAY_TASK_QUEUE,timeStamp,bizId);
二、定时轮询Zset消费
// 每秒轮询
scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(),1,1, TimeUnit.SECONDS);
DelayTaskHandler.java // 单独线程
Set ids = redis.zrangeByScore(Constants.DELAY_TASK_QUEUE, 0, System.currentTimeMillis());
if(ids==null||ids.isEmpty()){
return;
}
for(String id : ids){
// 防止多实例并发导致的线程安全问题 返回值等于1说明移除成功才执行业务
Long count = redis.zrem(Constants.DELAY_TASK_QUEUE, id);
if(count!=null && count == 1){
// handle business
}
}
键空间通知的方式
实现思路是给所有的定时任务设置一个过期时间,等到了过期之后,我们通过订阅过期消息就能感知到定时任务需要被执行了,此时我们执行定时任务即可。
RocketMQ延迟消息上面比比了一堆,实际主要是用到了这个哈。
Broker端内置延迟消息处理能力,核心实现思路:将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中。
开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。
延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level。
例如:你想支持2天的延迟,修改最后一个level的值为2d,这个时候依然是18个level;也可以增加一个2d,这个时候总共就有19个level。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 2d
延迟消息级别配置建议在最后面添加,最好不要修改其默认的前18个level。RocketMQ消息重试,重试默认会进行重试16次,每次间隔也用要了这18个level。
在服务器端(rocketmq-broker端)的属性配置/conf/broker.conf文件中加入:
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH ## 加入这行,默认值如下 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
默认18级,如level1表示延时1s,level2表示延时5s,level14表示延时10m,可以修改这个指定级别的延时时间;时间单位支持:s、m、h、d,分别表示秒、分、时、天;RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等修改完成后重启RocketMQ。nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf & 客户端发送延时消息
需要在客户端待发送的消息( com.alibaba.rocketmq.common.message.Message )中设置延时级别delayLevel即可。
private RocketMQTemplate rocketMQTemplatet;
public void sendDelayMessage(String topic,String message,int delayLevel){
SendResult sendResult = rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);
...
总结
采用MQ延迟消息 + 定时检查数据库双层保险。
单独设计一个专门用于存储延迟消息表。
有延迟任务需要处理时,把任务insert到这个表,同时发送延迟消息到MQ。
延迟任务处理完成后,在延迟消息表delete掉这个任务。(存储延迟消息表数据量就不会大了)
定时检查延迟消息表中那些长期未处理完成的延迟任务,用于补偿业务。
任务的间隔不要太短,防止频繁扫描数据库。
参考:
https://blog.csdn.net/tianshouzhi/article/details/103676104https://mp.weixin.qq.com/s/8MM2-_3KifMFS6SxV3zslw



