实现方案:
相当于是使用了三个队列,消息队列、延迟队列、业务队列。如果只使用一个延时队列可以吗?也可以,如果只有一种消息需要延时,则一个队列就可以了。如果有多种消息,还使用一个队列的话会导致消费者逻辑非常复杂,耦合严重,使用三个队列,可以对消息按type分类,新增消息类型时,新增一个消费者就行,消费者职责单一解耦合。
存储消息的id和消息实体到消息队列
利用zset存储消息的id,延时时间戳到延时队列,延时时间戳作为score。
开启一个定时任务,从延时队列中扫描score小于等于当前时间戳(即已到期)的id集合,将其从延时队列中移除并添加到业务队列。
消费者扫描业务队列进行消费
定时任务加锁是为了防止多节点部署时任务被重复执行。如果不加锁,同一时间有多个节点的定时任务同时执行,可能导致业务队列数据重复。
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Redis-backed delay queue built from three structures:
 * <ul>
 *   <li>a hash per message type holding the message bodies (field = message id),</li>
 *   <li>a sorted set holding message ids scored by their due timestamp (the delay queue),</li>
 *   <li>a list per message type holding the ids that are due (the business queue).</li>
 * </ul>
 * A scheduled job moves due ids from the delay queue to the business queue and a
 * background consumer pops them from there.
 *
 * <p>The three structures must live under different Redis keys; the key helpers and
 * constants below keep them apart.
 */
@Component
public class RedisDelayQueueComponent {

    /** Redis key of the sorted set scanned by {@link #scanDelayQueue()}. */
    private static final String DELAY_QUEUE_KEY = "key";

    /** Lock key for the scan job; deliberately different from {@link #DELAY_QUEUE_KEY}. */
    private static final String SCAN_LOCK_KEY = DELAY_QUEUE_KEY + ":lock";

    /**
     * Value passed to the lock as its owner token.
     * NOTE(review): a fixed literal is shared by every node; if RedisLock compares this
     * value on unlock, a per-instance random token would be safer - confirm against RedisLock.
     */
    private static final String LOCK_OWNER = "uuid";

    /** Lock expiry handed to RedisLock (unit is defined by RedisLock, presumably seconds). */
    private static final int SCAN_LOCK_EXPIRE = 3;

    /** Message type served by the built-in consumer; also the name of its message hash. */
    private static final String MESSAGE_TYPE = "type";

    /** Suffix that keeps a type's business list apart from the type's message hash. */
    private static final String BUSINESS_QUEUE_SUFFIX = ":business";

    /** Delay applied when the consumer puts a message back into the delay queue. */
    private static final long RETRY_DELAY_MILLIS = 60 * 1000L;

    /** Pause used when the consumer is switched off or has just failed. */
    private static final long PAUSE_SECONDS = 60;

    @Resource
    private RedisTemplate redisTemplate;
    @Resource
    private ExecutorService executorService;
    @Resource
    private RedisLock redisLock;
    @Resource
    private DelayTimeParam delayTimeParam;
    @Resource
    private Logger logger;

    /** Builds the Redis key of the business list for the given message type. */
    private static String businessQueueKey(String type) {
        return type + BUSINESS_QUEUE_SUFFIX;
    }

    /**
     * Stores a message body in the hash named after the message's type, keyed by its id.
     *
     * @param messege the message to store
     */
    public void addMessageQueue(Message messege) {
        redisTemplate.opsForHash().put(messege.getType(), messege.getId(), messege);
    }

    /**
     * Removes a message body from the hash of the given type.
     *
     * @param type hash name (message type)
     * @param key  message id
     */
    public void delMessageQueue(String type, String key) {
        redisTemplate.opsForHash().delete(type, key);
    }

    /**
     * Looks up a message body in the hash of the default message type.
     * Kept for existing callers; prefer {@link #getMessageQueue(String, String)}.
     *
     * @param key message id
     * @return the stored message, or {@code null} if absent
     */
    public Message getMessageQueue(String key) {
        return getMessageQueue(MESSAGE_TYPE, key);
    }

    /**
     * Looks up a message body in the hash of the given type.
     *
     * @param type hash name (message type)
     * @param key  message id
     * @return the stored message, or {@code null} if absent
     */
    public Message getMessageQueue(String type, String key) {
        return (Message) redisTemplate.opsForHash().get(type, key);
    }

    /**
     * Adds a member to a delay queue (sorted set).
     *
     * @param key   Redis key of the sorted set
     * @param value member, i.e. the message id
     * @param score due timestamp used as the score
     */
    public void addDelayQueue(String key, String value, double score) {
        redisTemplate.opsForZSet().add(key, value, score);
    }

    /**
     * Reads all members whose score is in {@code [0, timestamp]} and removes them.
     *
     * @param key       Redis key of the sorted set
     * @param timestamp upper score bound (inclusive)
     * @return the members that were read; may be empty or {@code null}
     */
    public Set getAndRemoveDelayQueue(String key, Long timestamp) {
        Set due = redisTemplate.opsForZSet().rangeByScore(key, 0, timestamp);
        if (!CollectionUtils.isEmpty(due)) {
            // Remove exactly what was read: a second range-by-score could also delete
            // members added in between that were never returned to the caller.
            redisTemplate.opsForZSet().remove(key, due.toArray());
        }
        return due;
    }

    /**
     * Pushes a value onto the left end of a business queue (list).
     *
     * @param key   Redis key of the list
     * @param value value to push, i.e. the message id
     */
    public void pushBusinessQueue(String key, String value) {
        redisTemplate.opsForList().leftPush(key, value);
    }

    /**
     * Pops a value from the right end of a business queue, waiting up to one second.
     *
     * @param key Redis key of the list
     * @return the popped value, or {@code null} if nothing arrived in time
     */
    public String popBusinessQueue(String key) {
        // The template is raw, so the pop yields Object and needs an explicit cast.
        return (String) redisTemplate.opsForList().rightPop(key, 1, TimeUnit.SECONDS);
    }

    /**
     * Message body stored in the message hash.
     * NOTE(review): how instances are serialized depends on the RedisTemplate's
     * serializers, which are configured elsewhere.
     */
    class Message {
        /** Message id; used as hash field and as delay/business queue member. */
        private String id;
        /** Number of times the consumer has re-queued this message. */
        private int count;
        /** Message type; selects the message hash and the business queue. */
        private String type;

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }
    }

    /**
     * Background consumer of the business queue for {@link #MESSAGE_TYPE}.
     * NOTE(review): this inner class carries no stereotype annotation here; confirm
     * how it is registered so that {@link #run(ApplicationArguments)} is invoked.
     */
    class Consumer implements ApplicationRunner {

        /** Starts the consume loop on the shared executor. */
        @Override
        public void run(ApplicationArguments arguments) {
            executorService.execute(this::consume);
        }

        /**
         * Pops due message ids and re-queues each message with an incremented
         * counter until the counter exceeds the configured maximum. Ends when the
         * thread is interrupted.
         */
        void consume() {
            while (!Thread.currentThread().isInterrupted()) {
                // Null-safe: an unset switch is treated as "off" instead of throwing.
                if (!Boolean.TRUE.equals(delayTimeParam.getSwitchButtrn())) {
                    if (!pause()) {
                        return;
                    }
                    continue;
                }
                try {
                    String id = popBusinessQueue(businessQueueKey(MESSAGE_TYPE));
                    if (StringUtils.isBlank(id)) {
                        continue;
                    }
                    Message message = getMessageQueue(MESSAGE_TYPE, id);
                    if (message == null) {
                        continue;
                    }
                    // Delete the entry that was just read (same hash, same id).
                    delMessageQueue(MESSAGE_TYPE, id);
                    int count = message.getCount();
                    if (count > delayTimeParam.getCount()) {
                        // Retry budget exhausted: the message stays deleted.
                        continue;
                    }
                    message.setCount(count + 1);
                    long dueAt = System.currentTimeMillis() + RETRY_DELAY_MILLIS;
                    addMessageQueue(message);
                    // Re-queue into the sorted set that scanDelayQueue() actually scans.
                    addDelayQueue(DELAY_QUEUE_KEY, message.getId(), dueAt);
                } catch (Exception ex) {
                    logger.error("consume error", ex);
                    if (!pause()) {
                        return;
                    }
                }
            }
        }

        /**
         * Sleeps for the pause interval.
         *
         * @return {@code false} if the thread was interrupted and the loop should stop
         */
        private boolean pause() {
            try {
                TimeUnit.SECONDS.sleep(PAUSE_SECONDS);
                return true;
            } catch (InterruptedException e) {
                // Restore the flag so the executor can observe the interruption.
                Thread.currentThread().interrupt();
                logger.error("sleep error", e);
                return false;
            }
        }
    }

    /**
     * Every five seconds, moves due message ids from the delay queue to the business
     * queue of their message type. The scan-and-remove step runs under a lock so that
     * several nodes do not move the same ids twice.
     */
    @Scheduled(cron = "0/5 * * * * ?")
    public void scanDelayQueue() {
        if (!Boolean.TRUE.equals(redisLock.lock(SCAN_LOCK_KEY, LOCK_OWNER, SCAN_LOCK_EXPIRE))) {
            // Lock not acquired: do not scan and, importantly, do not unlock.
            return;
        }
        Set due;
        try {
            due = getAndRemoveDelayQueue(DELAY_QUEUE_KEY, System.currentTimeMillis());
        } finally {
            redisLock.unlock(SCAN_LOCK_KEY, LOCK_OWNER);
        }
        if (CollectionUtils.isEmpty(due)) {
            return;
        }
        for (Object member : due) {
            String id = (String) member;
            Message message = getMessageQueue(MESSAGE_TYPE, id);
            if (message == null) {
                continue;
            }
            pushBusinessQueue(businessQueueKey(message.getType()), id);
        }
    }

    /**
     * Settings bound from properties with the {@code delay} prefix.
     * NOTE(review): as a non-static inner class this may not be picked up by
     * component scanning - confirm how the bean is registered.
     */
    @Configuration
    @ConfigurationProperties(prefix = "delay")
    class DelayTimeParam {
        /** Consumer on/off switch; {@code null} when the property is not set. */
        private Boolean switchButtrn;
        /** Maximum re-queue count compared against {@code Message#getCount()}. */
        private int count;

        public Boolean getSwitchButtrn() {
            return switchButtrn;
        }

        /** Setter required for JavaBean property binding. */
        public void setSwitchButtrn(Boolean switchButtrn) {
            this.switchButtrn = switchButtrn;
        }

        public int getCount() {
            return count;
        }

        /** Setter required for JavaBean property binding. */
        public void setCount(int count) {
            this.count = count;
        }
    }
}



