栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Springboot基于redisson实现延时队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Springboot基于redisson实现延时队列

总监:咳咳咳…

我:

总监:那个,最近很多微服务里面需要重试机制啊,你看看怎么搞一下?

我:我gzip压缩还没搞完呢!(Gateway网关和Feign调用开启gzip压缩)

总监:你加把劲,我是相信你的,搞完了告诉我一声。

我心里:我的怎么办?


思路:

    既然是需要重试机制,那么一定需要一个队列去的形式去存储该部分需要重试的数据。重试就需要有时间的概念,比如重试几次,多长时间重试一次。由此就想到了可不可以使用redis的延时队列呢,我们只需要设置时间,记录一下失败的次数,就可以通过回调来自动执行这个任务。

说干就干.

一:编写一个生产者类为QueueProducer
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class QueueProducer {

    @Resource
    private RedissonClient redissonClient;

    
    public  void addQueue(String queueName, long delay, TimeUnit timeUnit, T t) {
        log.info("添加到队列:{}, 延迟时间数:{}, 延迟时间单位:{}, 实际参数:{}", queueName, delay, timeUnit, t);
        RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
    }

    
    public  Boolean delValue(String queueName,Object value){
        RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        return delayedQueue.remove(value);
    }

}

二:编写一个消费者的接口
public interface QueueConsumer {

    
    void execute(T t);
}
三:编写一个核心的监听类
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;


@Slf4j
@SuppressWarnings("all")
@Component
public class CommonQueueComponent implements ApplicationContextAware {

    
    @Value("${spring.application.name}")
    private String queueName;

    @Resource
    private RedissonClient redissonClient;

    
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map map = applicationContext.getBeansOfType(QueueConsumer.class);
        map.values().forEach(this::startThread);
    }

    
    private  void startThread(QueueConsumer queueConsumer) {
        RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(queueName);
        //由于此线程需要常驻,所以可以直接新建线程,不需要交给线程池管理
        Thread thread = new Thread(() -> {
            log.info("启动队列名为:{}的监听线程", queueName);
            while (true) {
                try {
                    T t = blockingFairQueue.take();
                    //此处不提供多线程处理,自己决定是否开启多线程(业务中需要开启多线程话,建议使用线程池!!!)
                    queueConsumer.execute(t);
                } catch (Exception e) {
                    log.error("队列监听线程错误,", e);
                }
            }
        });
        thread.setName(queueName);
        thread.start();
    }
}

因为是微服务项目,一个服务只用一个队列,如果不是微服务项目,可自行修改队列名称位置。

四:测试添加延时数据
public class Test{
  
    @Resource
    CommonQueueProducer commonQueueProducer;
  
  public void test(){
  	//"test-service"对应queueName
    commonDelayedQueueProducer.addQueue("test-service", 1, MINUTES, "test");
  }
} 
五:实现消费者接口QueueConsumer
@Component
public class RedisCustomer implements QueueConsumer {

    @Override
    public void execute(Object object) {
      	//该类监听的为CommonQueueComponent类中的queueName名称的redis中的key
        //实现延时返回后处理数据的业务

    }
}

嘿嘿嘿,貌似搞定了。。。

我:看着外面的黑黑的天空,眼泪不自觉的流了下来。

溜了溜了。。。。等等,点个再走也不迟!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781085.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号