栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Redis实现延时队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Redis实现延时队列

实现方案:
相当于是使用了三个队列,消息队列、延迟队列、业务队列。如果只使用一个延时队列可以吗?也可以,如果只有一种消息需要延时,则一个队列就可以了。如果有多种消息,还使用一个队列的话会导致消费者逻辑非常复杂,耦合严重,使用三个队列,可以对消息按type分类,新增消息类型时,新增一个消费者就行,消费者职责单一解耦合。
存储消息的id和消息实体到消息队列
利用zset存储消息的id,延时时间戳到延时队列,延时时间戳作为score。
开启一个定时任务,从该队列中扫描大于当前时间戳的id集合,将其添加到业务队列。
消费者扫描业务队列进行消费
定时任务加锁是为了防止多节点部署。未加锁,如果同一时间有多个定时任务执行,可能导致业务队列数据重复。

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

@Component
public class RedisDelayQueueComponent {

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private ExecutorService executorService;

    @Resource
    private RedisLock redisLock;

    @Resource
    private DelayTimeParam delayTimeParam;

    @Resource
    private Logger logger;

    public void addMessageQueue(Message messege) {
        redisTemplate.opsForHash().put(messege.getType(), messege.getId(), messege);
    }

    public void delMessageQueue(String type, String key) {
        redisTemplate.opsForHash().delete(type, key);
    }

    public Message getMessageQueue(String key) {
        return (Message) redisTemplate.opsForHash().get("type", key);
    }

    public void addDelayQueue(String key, String value, double score) {
        redisTemplate.opsForZSet().add(key, value, score);
    }

    public Set getAndRemoveDelayQueue(String key, Long timestamp) {
        Set set = redisTemplate.opsForZSet().rangeByScore(key, 0, timestamp);
        if (!CollectionUtils.isEmpty(set)) {
            redisTemplate.opsForZSet().removeRangeByScore(key, 0, timestamp);
        }
        return set;
    }

    public void pushBusinessQueue(String key, String value) {
        redisTemplate.opsForList().leftPush(key, value);
    }

    public String popBusinessQueue(String key) {
        return redisTemplate.opsForList().rightPop(key, 1, TimeUnit.SECONDS);
    }

    class Message {
        private String id;
        private int count;
        private String type;

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }
    }

    class Consumer implements ApplicationRunner {
        @Override
        public void run(ApplicationArguments arguments) {
            executorService.execute(this::consume);
        }

        void consume() {
            while (true) {
                if (!delayTimeParam.getSwitchButtrn()) {
                    try {
                        TimeUnit.SECONDS.sleep(60);
                    } catch (InterruptedException e) {
                        logger.error("sleep error:{}", e);
                    }
                    continue;
                }
                try {
                    String key = popBusinessQueue("key");
                    if (StringUtils.isBlank(key)) {
                        continue;
                    }
                    Message messageQueue = getMessageQueue(key);
                    if (messageQueue == null) {
                        continue;
                    }
                    delMessageQueue("type", "key");
                    int count = messageQueue.getCount();
                    if (count > delayTimeParam.getCount()) {
                        continue;
                    }
                    messageQueue.setCount(++count);
                    long timestamp = System.currentTimeMillis() + 60 * 1000;
                    addMessageQueue(messageQueue);
                    addDelayQueue(messageQueue.getType(), messageQueue.getId(), timestamp);
                } catch (Exception ex) {
                    try {
                        TimeUnit.SECONDS.sleep(60);
                    } catch (InterruptedException e) {
                        logger.error("sleep error:{}", e);
                    }
                }

            }
        }
    }

    @Scheduled(cron = "0/5 * * * * ?")
    public void scanDelayQueue() {
        Set set = null;
        try {
            Boolean lock = redisLock.lock("key", "uuid", 3);
            if (lock) {
                set = getAndRemoveDelayQueue("key", System.currentTimeMillis());
            }
        } finally {
            redisLock.unlock("key", "uuid");
        }
        if (CollectionUtils.isEmpty(set)) {
            return;
        }
        for (String s : set) {
            Message messegeQueue = getMessageQueue(s);
            if (messegeQueue == null) {
                continue;
            }
            pushBusinessQueue(messegeQueue.getType(), s);
        }
    }

    @Configuration
    @ConfigurationProperties(prefix = "delay")
    class DelayTimeParam {

        private Boolean switchButtrn;

        private int count;

        public Boolean getSwitchButtrn() {
            return switchButtrn;
        }

        public int getCount() {
            return count;
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/328180.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号