栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot集成Redisson做延迟队列、重试队列(解决项目重新启动并不会消费之前队列里的消息的问题)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot集成Redisson做延迟队列、重试队列(解决项目重新启动并不会消费之前队列里的消息的问题)

maven配置:
        
            org.redisson
            redisson-spring-boot-starter
            3.16.6
        
springboot配置文件:
spring:
    # redis 配置
    redis:
        # 地址
        host: localhost
        # 端口,默认为6379
        port: 6379
        # 数据库索引
        database: 0
        # 密码
        password:
延迟队列service:

import com.ruoyi.unionpay.service.DelayQueueListenerService;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;


@Service
public class DelayQueueListenerServiceImpl implements DelayQueueListenerService, CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(DelayQueueListenerServiceImpl.class);

    //线程池
    @Resource
    private ThreadPoolTaskExecutor taskExecutor;
    //延迟参数
    @Resource
    private DelayQueueProperties delayQueueProperties;
    @Resource
    private RedissonClient redissonClient;

    @Override
    public void run(String... args) throws Exception {
        //启动监控管控状态线程
        taskExecutor.execute(this::RetryThread);
    }

    
    private void RetryThread() {
        RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue("test");
        while (true) {
            //解决项目重新启动并不会消费之前队列里的消息
            RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
            try {
                String message= blockingFairQueue.take();
                log.debug("触发延迟队列:{}", message);
                String result = "ERROR";
                //返回状态为错误,进入重试延迟队列
                if (result.equals("ERROR")) {
                    log.debug("失败{},继续进入延迟队列", result.toString());
                    delayedQueue.offer(message, delayQueueProperties.getDelay(), delayQueueProperties.getTimeUnit());
                }else {
                    log.debug("其他状态{},不进入延迟队列", result.toString());
                }
                delayedQueue.destroy();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    
    @Override
    public boolean addDelayQueue(String message) {
        log.debug("添加延迟队列:{}",message);
        RBlockingQueue blockingQueue = redissonClient.getBlockingQueue("test");
        RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
        delayedQueue.offer(message, delayQueueProperties.getDelay(), delayQueueProperties.getTimeUnit());
        delayedQueue.destroy();
        return true;
    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/679100.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号