栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot Redis 延时队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot Redis 延时队列

常见延迟队列实现方式

  延迟队列的实现方式有很多种,通过程序的方式实现,例如 JDK 自带的延迟队列 DelayQueue,通过 MQ 框架来实现,例如 RocketMQ、RabbitMQ等,本文通过 Redis 的方式来实现延迟队列 。
  Redis 是通过有序集合(ZSet)的方式来实现延迟消息队列的,ZSet 有一个 Score 属性可以用来存储延迟执行的时间。
  优点:
    灵活方便,Redis 是互联网公司的标配,无序额外搭建相关环境;
    可进行消息持久化,大大提高了延迟队列的可靠性;
    分布式支持,不像 JDK 自身的 DelayQueue;
    高可用性,利用 Redis 本身高可用方案,增加了系统健壮性。
  缺点:
    需要使用无限循环的方式来执行任务检查,会消耗少量的系统资源。
    redis的失效监听事件会存在一定的时间差,并且当数据量越大时,误差会越大。
    分布式业务的场景下,会出现重复消费的问题。(可以增加分布式锁的实现,但是redisson分布式锁提供了另一种延迟队列的实现方式)

延迟消息主体
@Data
public class DelayMessage implements Serializable {


    
    private String groupName;

    
    private String id;

    
    private Object body;

    
    private long expireTime;


}
延迟消息工具类
@Component
public class DelayMessageUtils {

    @Resource
    private RedisTemplate redisTemplate;

    
    public void addMsgPool(DelayMessage delayMessage) {
        if (null != delayMessage) {
            ValueOperations valueOperations = redisTemplate.opsForValue();
            String key = RedisKey.MESSAGE_POOL + delayMessage.getGroupName() + delayMessage.getId();
            valueOperations.set(key, JSON.toJSONString(delayMessage), delayMessage.getExpireTime(), TimeUnit.SECONDS);
        }
    }

    
    public void delMsgPool(String groupName, String id) {
        String key = RedisKey.MESSAGE_POOL + groupName + id;
        redisTemplate.delete(key);
    }

    
    public void addMsgQueue(String key, String val, long score) {
        ZSetOperations zSetOperations = redisTemplate.opsForZSet();
        zSetOperations.add(key, val, score);
    }

    
    public void delMsgQueue(String key, String id) {
        ZSetOperations zSetOperations = redisTemplate.opsForZSet();
        zSetOperations.remove(key, id);
    }


}
延迟消息生产者
@Slf4j
@Component
public class DelayMessageProvider {

    @Resource
    private DelayMessageUtils delayMessageUtils;
    
    @Value("${delayMsgExpireTime:900}")
    private Long delayMsgExpireTime;

    
    public boolean sendMessage(DelayMessage delayMessage) {
        // 消息体为空 不予执行
        if (ObjectUtil.isNull(delayMessage.getBody())) {
            return false;
        }
        if (delayMsgExpireTime <= 0) {
            delayMsgExpireTime = 1L;
        }
        // 计算过期时间
        long delayTime = System.currentTimeMillis() + Convert.convertTime(delayMsgExpireTime - 1, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
        delayMessage.setExpireTime(delayMsgExpireTime);
        try {
            // 向队列中添加消息
            String key = RedisKey.MESSAGE_QUEUE + delayMessage.getGroupName();
            delayMessageUtils.addMsgQueue(key, delayMessage.getId(), delayTime);
            // 存入消息池
            delayMessageUtils.addMsgPool(delayMessage);
            log.debug("DelayMessageProvider 发送信息:{}", delayMessage);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("DelayMessageProvider 消息发送失败!");
            return false;
        }
        return true;
    }

    
    public boolean delMessage(DelayMessage delayMessage) {
        // 消息标识为空,不予执行
        if (ObjectUtil.isNull(delayMessage.getGroupName()) || ObjectUtil.isNull(delayMessage.getId())) {
            return false;
        }
        // 消息队列名称
        String msgQueue = RedisKey.MESSAGE_QUEUE + delayMessage.getGroupName();
        String id = delayMessage.getId();
        try {
            // 删除队列中消息
            delayMessageUtils.delMsgQueue(msgQueue, String.valueOf(id));
            // 删除消息池
            delayMessageUtils.delMsgPool(delayMessage.getGroupName(), String.valueOf(id));
        } catch (Exception e) {
            e.printStackTrace();
            log.error("DelayMessageProvider 删除消息失败!");
            return false;
        }
        return true;
    }

}
延迟消息消费者
@Slf4j
@Component
public class DelayMessageConsumer {


    @Resource
    private DelayMessageUtils delayMessageUtils;

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private DelayMessageProvider delayMessageProvider;

    
    @Resource
    private List delayMessageExecuteList;


    
    @Scheduled(cron = "* * * * * ?")
    public void baseMonitor() {
        long current = System.currentTimeMillis();

        for (DelayMessageExecute delayMessageExecute : delayMessageExecuteList) {
            // 消息队列名称
            String msgQueue = RedisKey.MESSAGE_QUEUE + delayMessageExecute.getGroupName();
            // 消息池前缀,以此前缀加上传递的消息id作为key
            String msgPool = RedisKey.MESSAGE_POOL + delayMessageExecute.getGroupName();


            // 查询当前已过期的消息队列
            Set set = redisTemplate.opsForZSet().rangeByScore(msgQueue, 0, current);
            if (CollectionUtils.isNotEmpty(set)) {
                for (Object id : set) {
                    long score = Objects.requireNonNull(redisTemplate.opsForZSet().score(msgQueue, String.valueOf(id))).longValue();
                    // 确认到期,然后执行消费
                    if (current >= score) {
                        String message;
                        DelayMessage delayMessage = null;
                        try {
                            message = (String) redisTemplate.opsForValue().get(msgPool + id);
                            if (ObjectUtil.isNull(message)) {
                                return;
                            }
                            delayMessage = JSONUtil.toBean(message, DelayMessage.class);
                            log.debug("DelayMessageConsumer 成功处理消息!");
                            delayMessageExecute.execute(delayMessage);
                        } catch (Exception e) {
                            e.printStackTrace();
                            // 如果发生异常,将其放回队列
                            assert delayMessage != null;
                            boolean bol = delayMessageProvider.sendMessage(delayMessage);
                            if (bol) {
                                log.error("DelayMessageConsumer 发生错误 ,该条消息回滚, 消息Id:{}", Objects.requireNonNull(delayMessage).getId());
                            } else {
                                log.error("DelayMessageConsumer 发生错误 ,该条消息回滚失败!,消息Id:{}", Objects.requireNonNull(delayMessage).getId());
                            }
                        } finally {
                            // 无论是否执行成功,都删除
                            delayMessageUtils.delMsgQueue(msgQueue, String.valueOf(id));
                            if (delayMessage != null) {
                                delayMessageUtils.delMsgPool(delayMessage.getGroupName(), String.valueOf(id));
                            }
                        }
                    }
                }
            }
        }
    }

}
 
延迟消息主体类型 
public interface DelayMessageExecute {

    
    String getGroupName();

    
    boolean execute(DelayMessage delayMessage);

}
延迟消息主体消费实现
@Slf4j
@Component
public class PlayerKeepAliveExecuteImpl implements DelayMessageExecute {

    
    @Override
    public String getGroupName() {
        return MsgTypeEnum.PLAYER_KEEP_ALIVE.name();
    }

    
    @Override
    public boolean execute(DelayMessage delayMessage) {
        log.debug("PlayerKeepAliveExecuteImpl 执行了 心跳消息处理:{}", delayMessage.getBody());
        // 没有消息体,不予执行
        if (ObjectUtil.isNull(delayMessage.getBody())) {
            return false;
        }
        return true;
    }
}
Redis 缓存 Key
public class RedisKey {

    
    public static final String MESSAGE_POOL = "Message:Pool:";
    
    public static final String MESSAGE_QUEUE = "Message:Queue:";

}
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号