昨天写的《java多线程执行任务(工具)》无法满足按顺序执行计划的场景,下面对原工具做一次升级。
java多线程执行任务(工具)
之前只支持两种模式,新工具支持四种模式
执行模式:
1:所有任务信息都执行;2:先执行部分任务,执行完后再执行其他任务;3:所有任务信息都执行,但是按顺序执行每个任务中的计划;4:先执行部分任务(按顺序执行每个任务中的计划),执行完后再按顺序执行其他任务。
模式3、4分别在模式1、2的基础上,保证每个任务中的计划按顺序执行。
实现原理如图:
接着上代码
Scheder
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/**
 * Multi-threaded scheduler that feeds the plans of a set of {@link TaskInfo}s through
 * bounded queues into a fixed thread pool.
 *
 * <p>Execution models:
 * <ul>
 *   <li>1 - plans of all tasks are interleaved at random;</li>
 *   <li>2 - a subset of {@code modelSize} tasks is worked on first, further tasks join as
 *       earlier ones are exhausted;</li>
 *   <li>3 - like 1, but the plans of one task run strictly in their list order;</li>
 *   <li>4 - like 2, but the plans of one task run strictly in their list order.</li>
 * </ul>
 *
 * <p>In models 1/2 a single queue is drained by the thread calling {@link #run()}, which hands
 * every plan to the pool. In models 3/4 a task's plans always go to queue
 * {@code id % queueCount}, and every queue is drained by exactly one pool worker, which is what
 * keeps the per-task order.
 *
 * <p>An instance is meant to be run once: {@link #run()} consumes the plan lists and shuts the
 * pool down when it is done.
 */
@Slf4j
public class Scheder {
    /** How long the dispatcher waits on an empty queue before re-checking for completion. */
    private static final long POLL_TIMEOUT_MILLIS = 50L;
    /** How long an ordered worker waits when all of its queues are empty. */
    private static final long IDLE_WAIT_MILLIS = 10L;

    /** Tasks that still have plans; replaced by the producer thread, hence volatile. */
    private volatile TaskInfo[] taskInfos;
    private final LinkedBlockingQueue<Plan>[] planQueueArray;
    int queueNum;
    int queueSize;
    private final ExecutorService loopExecutor;
    private final int nThrends;
    private int model;
    private int modelSize;
    /** Indexes of the tasks currently being worked on in models 2/4 (producer thread only). */
    private final ArrayList<Integer> indexList = new ArrayList<>();
    /** Number of plans that have not finished yet; decremented by {@link Plan#run0}. */
    private final AtomicInteger count;
    private volatile boolean status = false;
    /** Set by the producer thread once it will not enqueue any further plan. */
    private volatile boolean producerFinished = false;

    /**
     * Creates a scheduler.
     *
     * @param taskInfos tasks whose plans are to be executed; must not be {@code null}
     * @param nThrends  size of the worker pool; must be positive
     * @param queueNum  number of plan queues used by models 3/4 (models 1/2 always use one)
     * @param queueSize capacity of each plan queue; must be positive
     * @param model     execution model, 1 to 4
     * @param modelSize number of tasks worked on at the same time; only used by models 2 and 4,
     *                  where {@code null} means "all tasks"
     * @throws IllegalArgumentException if an argument is out of range
     */
    public Scheder(TaskInfo[] taskInfos, int nThrends, int queueNum, int queueSize, int model, Integer modelSize) {
        if (taskInfos == null) {
            throw new IllegalArgumentException("taskInfos must not be null");
        }
        if (model >= 3 && queueNum <= 0) {
            throw new IllegalArgumentException("queueNum must be positive for model " + model + ": " + queueNum);
        }
        this.taskInfos = taskInfos;
        this.nThrends = nThrends;
        this.queueNum = queueNum;
        this.queueSize = queueSize;
        this.model = model;
        // Unordered models share a single queue; ordered models spread the tasks over queueNum queues.
        this.planQueueArray = newQueues(this.model < 3 ? 1 : this.queueNum, this.queueSize);
        this.loopExecutor = Executors.newFixedThreadPool(this.nThrends);
        // modelSize is only meaningful for models 2 and 4.
        if (this.model == 2 || this.model == 4) {
            int requested = modelSize == null ? taskInfos.length : modelSize;
            this.modelSize = Math.min(requested, taskInfos.length);
        }
        this.count = countPlan();
    }

    public Scheder(TaskInfo[] taskInfos, int nThrends, int model, Integer modelSize) {
        this(taskInfos, nThrends, nThrends, 100, model, modelSize);
    }

    public Scheder(TaskInfo[] taskInfos, int nThrends) {
        this(taskInfos, nThrends, nThrends, 100, 1, null);
    }

    public Scheder(TaskInfo[] taskInfos) {
        this(taskInfos, 10, 10, 100, 1, null);
    }

    /** Allocates {@code number} bounded queues of the given capacity. */
    @SuppressWarnings("unchecked") // generic array creation; the array only ever holds LinkedBlockingQueue<Plan>
    private static LinkedBlockingQueue<Plan>[] newQueues(int number, int capacity) {
        LinkedBlockingQueue<Plan>[] queues = new LinkedBlockingQueue[number];
        for (int i = 0; i < number; i++) {
            queues[i] = new LinkedBlockingQueue<>(capacity);
        }
        return queues;
    }

    /** Counts the plans of all tasks; missing tasks or plan lists count as empty. */
    private AtomicInteger countPlan() {
        int sum = 0;
        for (TaskInfo taskInfo : this.taskInfos) {
            if (taskInfo != null && taskInfo.getPlanQueue() != null) {
                sum += taskInfo.getPlanQueue().size();
            }
        }
        return new AtomicInteger(sum);
    }

    public void setModel(int model) {
        this.model = model;
    }

    public int getModel() {
        return model;
    }

    public void setModelSize(int modelSize) {
        this.modelSize = modelSize;
    }

    public int getModelSize() {
        return modelSize;
    }

    public ArrayList<Integer> getIndexList() {
        return indexList;
    }

    public AtomicInteger getCount() {
        return count;
    }

    public boolean isStatus() {
        return status;
    }

    /**
     * Executes every plan and blocks until all of them have finished, then shuts the pool down.
     * Returns immediately (with a warning) when the scheduler is already running.
     */
    public void run() {
        if (this.status) {
            log.warn("任务处于启动状态");
            return;
        }
        this.status = true;
        this.producerFinished = false;
        // Snapshot the model so producer and consumers agree even if setModel is called meanwhile.
        final int runModel = this.model;
        // Start the thread that moves plans from the tasks into the queues.
        Thread producer = init(runModel);
        try {
            if (runModel < 3) {
                dispatchUnordered();
            } else {
                startOrderedWorkers();
            }
            // No further work is submitted; block (instead of spinning) until the pool has drained.
            this.loopExecutor.shutdown();
            while (!this.loopExecutor.awaitTermination(1L, TimeUnit.SECONDS)) {
                // still running, keep waiting
            }
            System.out.println("执行结束!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("任务执行中发生异常", e);
            this.loopExecutor.shutdownNow();
        } finally {
            this.status = false;
            // Releases the producer if it is still blocked on a full queue.
            producer.interrupt();
        }
    }

    /**
     * Models 1/2: takes plans from the single queue and hands each to the pool, until the
     * producer has finished and the queue is empty. A timed poll is used so that the loop can
     * notice completion instead of blocking forever on an empty queue.
     */
    private void dispatchUnordered() throws InterruptedException {
        LinkedBlockingQueue<Plan> queue = this.planQueueArray[0];
        while (this.status) {
            Plan plan = queue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (plan != null) {
                this.loopExecutor.execute(() -> runPlan(plan));
                continue;
            }
            // Read the flag first: once it is true, every enqueued plan is visible to isEmpty().
            if (this.producerFinished && queue.isEmpty()) {
                return;
            }
        }
    }

    /** Models 3/4: starts one long-running worker per pool thread. */
    private void startOrderedWorkers() {
        if (this.loopExecutor.isShutdown()) {
            // A scheduler that already ran has nothing left to execute.
            return;
        }
        IntStream.range(0, this.nThrends)
                .forEach(worker -> this.loopExecutor.execute(() -> drainOwnedQueues(worker)));
    }

    /**
     * Worker loop for models 3/4. Worker {@code w} exclusively owns the queues
     * {@code w, w + nThrends, w + 2 * nThrends, ...}; since a queue has a single consumer that
     * runs plans synchronously, the plans of one queue execute in insertion order.
     */
    private void drainOwnedQueues(int worker) {
        LinkedBlockingQueue<Plan>[] queues = this.planQueueArray;
        if (worker >= queues.length) {
            return; // more workers than queues: this one owns nothing
        }
        try {
            while (this.status) {
                // Sample the flag before scanning: an empty scan that started after the producer
                // finished proves that nothing is left for this worker.
                boolean finishedBeforeScan = this.producerFinished;
                boolean ranAny = false;
                for (int i = worker; i < queues.length; i += this.nThrends) {
                    Plan plan = queues[i].poll();
                    if (plan != null) {
                        runPlan(plan);
                        ranAny = true;
                    }
                }
                if (ranAny) {
                    continue;
                }
                if (finishedBeforeScan) {
                    return;
                }
                // Nothing available right now: wait briefly on the first owned queue.
                Plan plan = queues[worker].poll(IDLE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                if (plan != null) {
                    runPlan(plan);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("任务执行中发生异常", e);
        }
    }

    /** Runs one plan; a failing plan is logged so that it cannot take its worker down. */
    private void runPlan(Plan plan) {
        try {
            plan.run0(this.count);
        } catch (RuntimeException e) {
            log.error("任务执行中发生异常", e);
        }
    }

    /**
     * Starts the producer thread that moves plans from the tasks into the plan queues.
     *
     * @param runModel execution model of the current run
     * @return the started producer thread
     */
    private Thread init(int runModel) {
        Thread producer = new Thread(() -> feedQueues(runModel));
        producer.start();
        return producer;
    }

    /** Producer loop: repeatedly picks a task and enqueues its next plan until no task is left. */
    private void feedQueues(int runModel) {
        try {
            while (this.status) {
                TaskInfo[] current = this.taskInfos;
                int length = current.length;
                // All tasks are exhausted: the producer is done.
                if (length <= 0) {
                    break;
                }
                // Index of the task whose next plan is enqueued.
                int index = getIndexOfModel(runModel, length);
                TaskInfo taskInfo = current[index];
                if (taskInfo == null || CollUtil.isEmpty(taskInfo.getPlanQueue())) {
                    // Nothing left in this task: drop it from the candidates.
                    this.taskInfos = reBuildTaskInfos(current, index);
                    continue;
                }
                LinkedList<Plan> plans = taskInfo.getPlanQueue();
                // Ordered models pin a task to one queue; floorMod keeps negative ids in range.
                int target = runModel >= 3 ? Math.floorMod(taskInfo.getId(), this.planQueueArray.length) : 0;
                // Remove the plan only after it was enqueued, so an interrupted put does not lose it.
                this.planQueueArray[target].put(plans.getFirst());
                plans.removeFirst();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("向执行计划队列放入计划异常", e);
        } catch (RuntimeException e) {
            log.error("向执行计划队列放入计划异常", e);
        } finally {
            // Always signal completion, otherwise the consumers would wait forever.
            this.producerFinished = true;
        }
    }

    /**
     * Picks the index of the task that supplies the next plan.
     *
     * @param model  execution model
     * @param length number of tasks that still have plans; must be positive
     * @return an index in {@code [0, length)}
     */
    private int getIndexOfModel(int model, int length) {
        if (model == 1 || model == 3) {
            return RandomUtil.randomInt(0, length);
        }
        // Indexes beyond the shrunken task array are no longer valid.
        this.indexList.removeIf(item -> item >= length);
        // At least one and at most 'length' tasks are active; also guards against a zero modelSize.
        int limit = Math.max(1, Math.min(this.modelSize, length));
        if (this.indexList.size() < limit) {
            int index = RandomUtil.randomInt(0, length);
            if (!this.indexList.contains(index)) {
                this.indexList.add(index);
            }
            return index;
        }
        return this.indexList.get(RandomUtil.randomInt(0, this.indexList.size()));
    }

    /** Returns a copy of {@code taskInfos} without the element at {@code index}. */
    private TaskInfo[] reBuildTaskInfos(TaskInfo[] taskInfos, int index) {
        TaskInfo[] remaining = new TaskInfo[taskInfos.length - 1];
        System.arraycopy(taskInfos, 0, remaining, 0, index);
        System.arraycopy(taskInfos, index + 1, remaining, index, taskInfos.length - index - 1);
        return remaining;
    }
}
TaskInfo
import lombok.Data;
import java.util.LinkedList;
/**
 * A task together with the plans that belong to it.
 *
 * <p>{@code Scheder} removes plans from the head of {@link #planQueue} as it schedules them and,
 * in the ordered models, uses {@link #id} modulo the number of queues to choose the queue the
 * task's plans are placed in.
 */
@Data
public class TaskInfo {
    /** Identifier of the task. */
    private int id;
    /** Display name of the task. */
    private String name;
    /** Plans of the task, in the order in which they are to be executed. */
    private LinkedList<Plan> planQueue;

    /**
     * @param id        identifier of the task
     * @param name      display name of the task
     * @param planQueue plans of the task in execution order; {@code null} is treated as "no plans"
     */
    public TaskInfo(int id, String name, LinkedList<Plan> planQueue) {
        this.id = id;
        this.name = name;
        // An absent list becomes an empty one so that callers can always ask for its size.
        this.planQueue = planQueue == null ? new LinkedList<>() : planQueue;
    }
}
相比上一版,这里新增了id属性:Scheder用它对队列数取模,来决定该任务的计划放入哪个队列。
Plan
import java.util.concurrent.atomic.AtomicInteger;
/**
 * A unit of work with optional hooks that run before and after it.
 */
public interface Plan {
    /** Hook invoked before {@link #run()}; does nothing by default. */
    default void before() {
    }

    /** The actual work of the plan. */
    void run();

    /** Hook invoked after {@link #run()}, even when it failed; does nothing by default. */
    default void after() {
    }

    /**
     * Runs {@link #before()}, {@link #run()} and {@link #after()} and then decrements the given
     * counter of outstanding plans.
     *
     * <p>The counter is decremented exactly once no matter which of the three steps throws, so a
     * failing {@code after()} hook cannot leave the counter stuck above zero. Any exception is
     * still propagated to the caller.
     *
     * @param atomicInteger counter of plans that have not finished yet
     */
    default void run0(AtomicInteger atomicInteger) {
        try {
            try {
                before();
                run();
            } finally {
                after();
            }
        } finally {
            atomicInteger.decrementAndGet();
        }
    }
}
修改完成
实现自己的计划
MyPlan
import lombok.Data;
/**
 * Sample {@link Plan} that reports which thread executed it.
 */
@Data
public class MyPlan implements Plan {
    /** Label of the plan; printed when the plan runs. */
    private String name;

    /** Prints the name of the executing thread and the plan's label, separated by a colon. */
    @Override
    public void run() {
        final String worker = Thread.currentThread().getName();
        final StringBuilder line = new StringBuilder(worker);
        line.append(":").append(name);
        System.out.println(line.toString());
    }
}
Test
import java.util.LinkedList;
import java.util.stream.IntStream;
/**
 * Demo: builds 100 tasks with 1000 plans each and runs them through a {@code Scheder}.
 */
public class Test {
    public static void main(String[] args) {
        int userSize = 100;
        int jobSize = 1000;
        TaskInfo[] taskInfos = new TaskInfo[userSize];
        // Building the fixtures is trivial work, so a plain loop is used instead of a parallel stream.
        for (int i = 0; i < userSize; i++) {
            LinkedList<Plan> plans = new LinkedList<>();
            for (int j = 0; j < jobSize; j++) {
                MyPlan myPlan = new MyPlan();
                myPlan.setName("用户" + i + ",执行计划" + j);
                plans.add(myPlan);
            }
            taskInfos[i] = new TaskInfo(i, "用户" + i, plans);
        }
        // 3 worker threads, 10 queues of capacity 100, model 3; the trailing modelSize (2) is
        // only used by models 2 and 4.
        Scheder scheder = new Scheder(taskInfos, 3, 10, 100, 3, 2);
        scheder.run();
    }
}
测试结果:



