1、基本功能
把多次操作合并成一次操作批量执行,类似于Hystrix的请求合并。
2、基本api及使用方式// 构建一个缓冲队列 private BufferTrigger3、基本原理bufferTrigger = BufferTrigger. batchBlocking() // 队列最大容量 .batchSize(5) // 每隔n秒消费一次 .linger(20, TimeUnit.SECONDS) // 消费逻辑 .setConsumerEx((ids) -> { for (String id : ids) { log.info("id:{}", id); } }) .build(); @PostConstruct protected void init() { // 程序结束时把所有积攒的数据一次性消费干净 TermHelper.addTerm(bufferTrigger::manuallyDoTrigger); } // service方法 public String commitByBatch(String id) { bufferTrigger.enqueue(id); return "success"; }
内部主要由一个阻塞队列和一个延迟线程池组成。调用enqueue方法时,将任务参数放入BlockingQueue中,同时判断阻塞队列容量是否到达最大值,若已达到且定时消费任务并未执行,则提前消费队列中的任务。ScheduledThreadPoolExecutor中的任务每隔n秒检查队列中是否有未完成任务,若有则执行。
// 每次调用enqueue时触发, 判断阻塞队列容量是否到达最大值, 是否需要提前触发任务
private void tryTrigBatchConsume() {
if (queue.size() >= batchSize) {
runWithTryLock(lock, () -> {
if (queue.size() >= batchSize) {
if (!running.get()) { // prevent repeat enqueue
this.scheduledExecutorService.execute(() -> doBatchConsumer(TriggerType.ENQUEUE));
running.set(true);
}
}
});
}
}
// 定时任务执行逻辑, 遍历阻塞队列, 查看是否还有未完成任务
private void doBatchConsumer(TriggerType type) {
runWithLock(lock, () -> {
try {
running.set(true);
int queueSizeBeforeConsumer = queue.size();
int consumedSize = 0;
while (!queue.isEmpty()) {
if (queue.size() < batchSize) {
if (type == TriggerType.ENQUEUE) {
return;
} else if (type == TriggerType.LINGER && consumedSize >= queueSizeBeforeConsumer) {
return;
}
}
List toConsumeData = new ArrayList<>(min(batchSize, queue.size()));
queue.drainTo(toConsumeData, batchSize);
if (!toConsumeData.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("do batch consumer:{}, size:{}", type, toConsumeData.size());
}
consumedSize += toConsumeData.size();
// 执行具体动作, 最终调用.setConsumerEx传入的匿名类中方法
doConsume(toConsumeData);
}
}
} finally {
running.set(false);
}
});
}



