- java多线程执行任务(工具再升级版)
- DelayPlan
- DelayPlanQueue
- Scheder
- TaskInfo
- Plan
- MyPlan
- Test
之前写的工具
java多线程执行任务(工具升级版)
少考虑了两种情况:
1:计划任务列表中如果有睡眠计划怎么办,线程就会等待了这样效率不高
2:计划之间没有上下文对象
针对这两点,在原来基础上我们再升级一下吧!
解决方法是:
1、针对第一种情况,新增一个延迟队列,将要休眠的计划对应的任务id放入延迟队列中,每次取计划时跳过延迟队列中存在的任务中的计划
2、在计划中添加上下文对象,方便后面计划获取前面计划放入的数据
实现原理如图:
这里面线程池取数据时较为复杂,没有画在图中,下面详细介绍存放数据的细节:
在介绍之前我们先看一下延迟队列中包含的属性
延迟队列的结构如下:
1、放数据
放数据比较简单,就在每次获取任务时判断原来的任务是否在延迟队列中,在的话就跳过获取下个数据,不在就将该任务的数据放入对应的执行计划队列中
2、取数据
取数据的逻辑就比较复杂了,这其中不同的模式取数据的方式还不太一样,但是实现逻辑是一样的,取数据时线程池获取对应执行列表中的计划,如果计划不存在,可以从延迟队列中对应任务中还未执行的计划中获取计划执行。如果计划存在,还需要判断该计划对应的任务是否在延迟队列中,如果在需要将该计划放入在延迟队列中对应任务的未执行计划列表中,等待后面执行;如果该计划对应的任务不在延迟队列中,那么可以直接执行计划
后面接着上代码
DelayPlanimport lombok.Data;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
public class DelayPlan implements Delayed {
private int taskInfoId;
private long time;
public DelayPlan(int taskInfoId, long time, TimeUnit unit) {
this.taskInfoId = taskInfoId;
this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
DelayPlan DelayPlan = (DelayPlan) o;
long diff = this.time - DelayPlan.time;
if (diff <= 0) {// 改成>=会造成问题
return -1;
}else {
return 1;
}
}
}
DelayPlanQueue
import cn.hutool.core.collection.ConcurrentHashSet;
import lombok.Data;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@Data
public class DelayPlanQueue {
public static DelayQueue delayPlanQueue = new DelayQueue<>();
public static Set taskInfoIdSet = new ConcurrentHashSet<>();
public static Map> delayPlanMap = new ConcurrentHashMap<>();
public static Thread thread;
public static void add(int taskInfoId, long time, TimeUnit unit) {
if (!contains(taskInfoId)) {
DelayPlan delayPlan = new DelayPlan(taskInfoId, time, unit);
delayPlanQueue.put(delayPlan);
taskInfoIdSet.add(taskInfoId);
}
}
public static void clearDelayPlanStart() {
thread = new Thread(() -> {
for (; ; ) {
try {
DelayPlan take = delayPlanQueue.take();
int taskInfoId = take.getTaskInfoId();
taskInfoIdSet.removeIf(item -> item == taskInfoId);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
});
thread.start();
}
public static void clearDelayPlanEnd() {
if (thread != null) {
thread.interrupt();
}
}
public static boolean contains(int taskInfoId) {
return taskInfoIdSet.contains(taskInfoId);
}
public static void main(String[] args) throws InterruptedException {
add(0, 5, TimeUnit.SECONDS);
add(1, 6, TimeUnit.SECONDS);
add(2, 7, TimeUnit.SECONDS);
add(3, 8, TimeUnit.SECONDS);
add(4, 10, TimeUnit.SECONDS);
add(5, 1, TimeUnit.MINUTES);
add(4, 30, TimeUnit.SECONDS);
clearDelayPlanStart();
for (int i = 1; i < 70; i++) {
Thread.sleep(1000);
for (int j = 0; j < 6; j++) {
boolean contains = contains(j);
System.out.println("第" + i + "秒," + j + "是否存在" + contains);
}
System.out.println("-----------------------------------------------");
}
}
}
Scheder
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@Slf4j
public class Scheder {
private TaskInfo[] taskInfos;
private LinkedBlockingQueue[] planQueueArray;
int queueNum;
int queueSize;
private java.util.concurrent.ExecutorService loopExecutor;
private int nThrends;
private int model;
private int modelSize;
private ArrayList indexList = new ArrayList<>();
private AtomicInteger count;
private volatile boolean status = false;
public Scheder(TaskInfo[] taskInfos, int nThrends, int queueNum, int queueSize, int model, Integer modelSize) {
this.taskInfos = taskInfos;
this.nThrends = nThrends;
this.queueNum = queueNum;
this.queueSize = queueSize;
this.loopExecutor = Executors.newFixedThreadPool(this.nThrends);
this.model = model;
if (this.model < 3) {
this.planQueueArray = new LinkedBlockingQueue[1];
this.planQueueArray[0] = new LinkedBlockingQueue<>(this.queueSize);
} else {
// 初始化队列数组
this.planQueueArray = new LinkedBlockingQueue[this.queueNum];
IntStream.range(0, this.queueNum).forEach(i -> this.planQueueArray[i] = new LinkedBlockingQueue<>(this.queueSize));
}
// modelSize只有在等于2,4有效
if (this.model == 2 || this.model == 4) {
this.modelSize = modelSize > taskInfos.length ? taskInfos.length : modelSize;
}
count = countPlan();
}
private AtomicInteger countPlan() {
int sum = 0;
for (int i = 0; i < this.taskInfos.length; i++) {
sum += this.taskInfos[i].getPlanQueue().size();
}
return new AtomicInteger(sum);
}
public Scheder(TaskInfo[] taskInfos, int nThrends, int model, Integer modelSize) {
this(taskInfos, nThrends, nThrends, 100, model, modelSize);
}
public Scheder(TaskInfo[] taskInfos, int nThrends) {
this(taskInfos, nThrends, nThrends, 100, 1, null);
}
public Scheder(TaskInfo[] taskInfos) {
this(taskInfos, 10, 10, 100, 1, null);
}
public void setModel(int model) {
this.model = model;
}
public int getModel() {
return model;
}
public void setModelSize(int modelSize) {
this.modelSize = modelSize;
}
public int getModelSize() {
return modelSize;
}
public ArrayList getIndexList() {
return indexList;
}
public AtomicInteger getCount() {
return count;
}
public boolean isStatus() {
return status;
}
public void run() {
if (this.status) {
log.warn("任务处于启动状态");
return;
}
this.status = true;
// 开启向队列中添加执行计划线程
init();
// 循环执行执行计划
while (this.status) {
// 所有执行计划执行完后,退出
if (this.taskInfos.length <= 0) {
if (this.model < 3) {
if (this.planQueueArray[0].size() == 0 && CollUtil.isEmpty(DelayPlanQueue.delayPlanMap)) {
this.status = false;
break;
}
} else {
ArrayList notEmptyIndex = getNotEmptyIndex(this.planQueueArray);
if (CollUtil.isEmpty(notEmptyIndex) && CollUtil.isEmpty(DelayPlanQueue.delayPlanMap)) {
this.status = false;
break;
}
}
}
// 执行计划
execute();
}
int size;
// 所有线程执行完毕出循环
for (; ; ) {
size = this.count.get();
if (size == 0) {
break;
}
}
//停止线程池
this.loopExecutor.shutdownNow();
for (; ; ) {
//只有当线程池中所有线程完成任务时才会返回true,并且需要先调用线程池的shutdown方法或者shutdownNow方法。
if (this.loopExecutor.isTerminated()) {
System.out.println("执行结束!");
break;
}
}
DelayPlanQueue.clearDelayPlanEnd();
}
private void execute() {
if (this.model < 3) {
try {
// 获取一个执行计划
Plan peek = this.planQueueArray[0].peek();
if(peek == null){
if (CollUtil.isEmpty(DelayPlanQueue.delayPlanMap)) {
return;
}
// 计划队列为空的时候,如果休眠计划队列有数据可以执行休眠计划队列中的数据,这里的any一定会取到值
Optional>> any = DelayPlanQueue.delayPlanMap.entrySet().stream().findAny();
Integer taskInfoId = any.get().getKey();
LinkedList planLinkedList = any.get().getValue();
Plan planOfDelayPlan = planLinkedList.removeFirst();
if(CollUtil.isEmpty(planLinkedList)){
DelayPlanQueue.delayPlanMap.entrySet().removeIf(item->item.getKey().equals(taskInfoId));
}
this.loopExecutor.execute(() -> planOfDelayPlan.run0(this.count, Plan.context.get(planOfDelayPlan.taskInfoId)));
return;
}
// 判断这个任务是否在休眠
if (DelayPlanQueue.contains(peek.taskInfoId)) {
// 处于休眠,取出该任务计划
Plan take = this.planQueueArray[0].take();
// 将该任务放入休眠计划队列中
LinkedList planLinkedList = DelayPlanQueue.delayPlanMap.computeIfAbsent(take.taskInfoId, k -> new LinkedList<>());
planLinkedList.push(take);
return;
}else{// 这个任务不在休眠
// 获取这个任务的休眠计划队列
LinkedList planLinkedList = DelayPlanQueue.delayPlanMap.get(peek.taskInfoId);
// 判断是否休眠计划队列中还存在该任务的计划
if(!CollUtil.isEmpty(planLinkedList)){
// 存在,优先执行休眠计划队列的计划
Plan planOfDelayPlan = planLinkedList.removeFirst();
if(CollUtil.isEmpty(planLinkedList)){
DelayPlanQueue.delayPlanMap.entrySet().removeIf(item->item.getKey() == peek.taskInfoId);
}
this.loopExecutor.execute(() -> planOfDelayPlan.run0(this.count, Plan.context.get(planOfDelayPlan.taskInfoId)));
}else{
// 不存在
Plan take = this.planQueueArray[0].take();
// 执行计划
this.loopExecutor.execute(() -> take.run0(this.count, Plan.context.get(take.taskInfoId)));
}
}
} catch (InterruptedException e) {
log.error("任务执行中发生异常", e);
}
} else {
this.loopExecutor.execute(() -> {
try {
// 获取一个执行计划
Plan plan = null;
// 获取线程id
String name = Thread.currentThread().getName();
int lastIndexOf = name.lastIndexOf("-");
int id = Integer.parseInt(name.substring(lastIndexOf + 1));
ArrayList notEmptyIndex2 = getNotEmptyIndex(this.planQueueArray);
Integer index = notEmptyIndex2.stream().filter(item -> item % this.nThrends == (id - 1)).findAny().orElse(null);
if (index == null) {
return;
}
LinkedBlockingQueue plans = this.planQueueArray[index];
if (plans.size() > 0) {
Plan peek = plans.peek();
if(peek == null){
if (CollUtil.isEmpty(DelayPlanQueue.delayPlanMap)) {
return;
}
// 计划队列为空的时候,如果休眠计划队列有数据可以执行休眠计划队列中的数据
Optional>> any = DelayPlanQueue.delayPlanMap.entrySet().stream().findAny();
Integer taskInfoId = any.get().getKey();
LinkedList planLinkedList = any.get().getValue();
Plan planOfDelayPlan = planLinkedList.removeFirst();
if(CollUtil.isEmpty(planLinkedList)){
DelayPlanQueue.delayPlanMap.entrySet().removeIf(item->item.getKey().equals(taskInfoId));
}
this.loopExecutor.execute(() -> planOfDelayPlan.run0(this.count, Plan.context.get(planOfDelayPlan.taskInfoId)));
return;
}
// 如果这个任务在睡眠
if (DelayPlanQueue.contains(peek.taskInfoId)) {
Plan take = plans.take();
LinkedList planLinkedList = DelayPlanQueue.delayPlanMap.computeIfAbsent(take.taskInfoId, k -> new LinkedList<>());
planLinkedList.push(take);
return;
}else{
LinkedList planLinkedList = DelayPlanQueue.delayPlanMap.get(peek.taskInfoId);
if(!CollUtil.isEmpty(planLinkedList)){
Plan planOfDelayPlan = planLinkedList.removeFirst();
if(CollUtil.isEmpty(planLinkedList)){
DelayPlanQueue.delayPlanMap.entrySet().removeIf(item->item.getKey() == peek.taskInfoId);
}
planOfDelayPlan.run0(this.count, Plan.context.get(planOfDelayPlan.taskInfoId));
}else{
plan = plans.take();
plan.run0(this.count, Plan.context.get(plan.taskInfoId));
}
}
}
} catch (InterruptedException e) {
log.error("任务执行中发生异常", e);
}
});
}
}
private ArrayList getNotEmptyIndex(LinkedBlockingQueue[] planQueueArray) {
ArrayList indexArray = new ArrayList<>();
for (int i = 0; i < planQueueArray.length; i++) {
if (!CollUtil.isEmpty(planQueueArray[i])) {
indexArray.add(i);
}
}
return indexArray;
}
private void init() {
DelayPlanQueue.clearDelayPlanStart();
new Thread(() -> {
while (this.status) {
// 任务信息数组数量
int length = this.taskInfos.length;
// 执行完结束线程
if (length <= 0) {
break;
}
// 获取添加执行计划的的任务索引值
int index = getIndexOfModel(this.model, length);
TaskInfo taskInfo = null;
try {
taskInfo = this.taskInfos[index];
} catch (Exception e) {
e.printStackTrace();
}
// 延迟队列中存在该任务id直接获取下一个任务
if (!DelayPlanQueue.contains(taskInfo.getId())) {
LinkedList plans = taskInfo.getPlanQueue();
if (plans.size() > 0) {
try {
if (this.model >= 3) {
int index2 = taskInfo.getId() % this.planQueueArray.length;
this.planQueueArray[index2].put(plans.removeFirst());
} else {
this.planQueueArray[0].put(plans.removeFirst());
}
} catch (InterruptedException e) {
log.error("向执行计划队列放入计划异常", e);
}
} else {
this.taskInfos = reBuildTaskInfos(this.taskInfos, index);
}
}
}
}).start();
}
private int getIndexOfModel(int model, int length) {
if (model == 1 || model == 3) {
return RandomUtil.randomInt(0, length) % length;
} else {
this.indexList.removeIf(item -> item >= length);
if (this.indexList.size() < this.modelSize) {
int index = RandomUtil.randomInt(0, length) % length;
this.indexList.add(index);
return index;
} else {
return this.indexList.get(RandomUtil.randomInt(0, length) % this.indexList.size());
}
}
}
private TaskInfo[] reBuildTaskInfos(TaskInfo[] taskInfos, int index) {
TaskInfo[] newTaskINfo = new TaskInfo[taskInfos.length - 1];
for (int j = 0, i = 0; i < taskInfos.length; i++) {
if (i != index) {
newTaskINfo[j] = taskInfos[i];
j++;
}
}
return newTaskINfo;
}
}
TaskInfo
import lombok.Data;
import java.util.LinkedList;
@Data
public class TaskInfo {
private int id;
private String name;
private LinkedList planQueue;
public TaskInfo(int id, String name, LinkedList planQueue) {
this.id = id;
this.name = name;
this.planQueue = planQueue;
}
}
Plan
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class Plan {
static Map> context = new ConcurrentHashMap<>();
int taskInfoId;
void before() {
}
abstract void run(Map context);
void after() {
}
void run0(AtomicInteger atomicInteger, Map context) {
try {
before();
run(context);
} finally {
after();
atomicInteger.decrementAndGet();
}
}
}
这里加入context上下对象
实现自己的计划
MyPlanimport lombok.Data;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Data
public class MyPlan extends Plan {
private String name;
public void setTaskInfoId(int taskInfoId) {
this.taskInfoId = taskInfoId;
}
@Override
public void run(Map context) {
if (this.taskInfoId == 33 && name.indexOf("333")>0) {
DelayPlanQueue.add(this.taskInfoId, 30, TimeUnit.SECONDS);
}
System.out.println(Thread.currentThread().getName() + ":" + "用户:" + this.taskInfoId + name);
}
}
这里第33个任务的333个计划睡眠30s
Testimport java.util.LinkedList;
import java.util.stream.IntStream;
public class Test {
public static void main(String[] args) {
int userSize = 100;
int jobSize = 1000;
TaskInfo[] taskInfos = new TaskInfo[userSize];
IntStream.range(0, userSize).parallel().forEach(i -> {
LinkedList plans = new LinkedList<>();
for (int j = 0; j < jobSize; j++) {
MyPlan myPlan = new MyPlan();
myPlan.setTaskInfoId(i);
myPlan.setName("执行计划" + j);
plans.add(myPlan);
}
taskInfos[i] = new TaskInfo(i, "用户" + i, plans);
});
Scheder scheder = new Scheder(taskInfos, 10,10,100,3,2);
scheder.run();
}
}
测试选择的是模式3,即所有任务都在执行,并且每个任务中的计划顺序实现,由于我们在计划中第33个任务的第333个计划睡眠过所以第33个任务会最后顺序执行完



