单机版延迟队列,适用业务中,X秒/分钟/小时后执行的业务场景,此方案的弊端完全在内存,机器异常重启后数据丢失,优势轻量
1,目前支持到秒级别,若有毫秒级,需要自己修改timeLoop()方法中的睡眠时间;
2,线程池参数threadPoolInit(),根据自己的业务场景进行调整;
import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @Slf4j public class DelayCycleBuffer{ private final List > cycleDataList; private final AtomicInteger clock = new AtomicInteger(0); private ThreadPoolExecutor threadPoolExecutor; private void threadPoolInit() { threadPoolExecutor = new ThreadPoolExecutor(32, 64, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100000), run -> new Thread(run, "CONSUME-THREAD-" + run.hashCode())); } private final int capacity; private final AtomicInteger size = new AtomicInteger(0); public DelayCycleBuffer(int capacity, Consumer > callBack) { if (capacity < 0) { throw new IllegalArgumentException("capacity must >=0"); } cycleDataList = new CopyOnWriteArrayList<>(); //初始化 for (int i = 0; i < capacity; i++) { cycleDataList.add(new CycleData<>()); } this.capacity = capacity; //线程池初始化 this.threadPoolInit(); //开启时间轮询1s一次 this.timeLoop(callBack); } public void put(T t, Integer second) { if (t == null || second == null || second < 0) { throw new IllegalArgumentException("param is valid~!"); } int currentClock = clock.get(); int nextClock = currentClock + 1 + second; DataInfo data = new DataInfo<>(); data.setCircle(nextClock / capacity); data.setData(t); int index = nextClock % capacity; CycleData tCycleData = this.cycleDataList.get(index); tCycleData.dataInfoList.add(data); size.incrementAndGet(); } public Collection takeData(int clockTime) { log.info("------exec ---{}s", clockTime); Collection result = new ArrayList<>(); CycleData tCycleData = cycleDataList.get(clockTime); //过滤出,不是当前圈的数据 ,并且将符合条件的数据,放入result中 List > dataInfoList = tCycleData.getDataInfoList().stream().filter(m -> { if (m.getCircle() > 0) { m.setCircle(m.getCircle() - 1); return true; } result.add(m.getData()); size.decrementAndGet(); return false; }).collect(Collectors.toList()); tCycleData.setDataInfoList(dataInfoList); return result; } private void timeLoop(Consumer > callBack) { Thread thread = new Thread(() -> { log.info("-------------time loop is started~!----"); while (true) { try { int clockTime = clock.getAndIncrement(); threadPoolExecutor.submit(() -> { Collection ts = takeData(clockTime); callBack.accept(ts); }); int currClock = clock.get(); if (currClock == capacity) { clock.compareAndSet(currClock, 0); } TimeUnit.MILLISECONDS.sleep(1000L); } catch (Exception e) { log.error("exception~!", e); } } }, "TimeLoop-Thread"); thread.setDaemon(true); thread.start(); } public int getSize() { return size.get(); } public static class CycleData { private List > dataInfoList = new LinkedList<>(); public List > getDataInfoList() { return dataInfoList; } public void setDataInfoList(List > dataInfoList) { this.dataInfoList = dataInfoList; } } public static class DataInfo { private Integer circle = 0; private T data; public Integer getCircle() { return circle; } public void setCircle(Integer circle) { this.circle = circle; } public T getData() { return data; } public void setData(T data) { this.data = data; } } }
单元测试:
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Slf4j
public class DelayCycleBufferTest {
public static void main(String[] args) throws InterruptedException {
//初始化,并申明消费方法
DelayCycleBuffer delayCycleBuffer = new DelayCycleBuffer<>(3600, dataList -> {
if (CollectionUtils.isEmpty(dataList)) {
return;
}
//直接做打印
log.info("get data from rang buff size :{}", dataList.size());
});
//模拟存放数据
Random random = new Random();
new Thread(()->{
for (int i = 0; i < 20000; i++) {
delayCycleBuffer.put("数据"+i, random.nextInt(30));
}
}).start();
//检查数量变化
new Thread(()->{
while (true){
try{
log.info("剩余{}个", delayCycleBuffer.getSize());
TimeUnit.MILLISECONDS.sleep(1000L);
}catch (Exception ignore){}
}
}).start();
Thread.sleep(Integer.MAX_VALUE);
}
}



