public class AbstractAccountService {
@Autowired
protected ParallelExecutor parallelExecutor;
public void createBatch(QueryModel queryModel, int total, String methodName){
//多线程并发执行
int totalPage = total % 1500 == 0 ? total / 1500 : total / 1500 + 1;
Batch batch = new CounterBatch(totalPage-1);//多线程减一次
for (int pageNum = 2; pageNum <= totalPage; pageNum++) {
int beginNum = (pageNum - 1) * 1500 + 1;
int endNum = pageNum * 1500 > total ? total : pageNum * 1500;
QueryModel newQueryModel = new QueryModel();
//newQueryModel参数拼装,省略
batch.getTaskList().add(createTask(batch, beginNum, endNum, newQueryModel, methodName));
}
long startTime = System.currentTimeMillis();
//执行多线程
try {
parallelExecutor.synParallelExecute(batch);
} catch (Exception e) {
//打印输出异常
e.printStackTrace();;
}
log.info("查询流水耗时:" + (System.currentTimeMillis() - startTime) + "ms");
//查询所有子任务是否全部成功,失败则直接返回
for (Task task : batch.getTaskList()) {
if (!((QueryFlowTask) task).isSuccess()) {
log.error("流水查询失败");
}
}
}
public QueryFlowTask createTask(Batch batch, int beginNum, int endNum, QueryModel queryModel, String methodName) {
return new QueryFlowTask("AbstractAccountService",batch,beginNum,endNum,queryModel,this);
}
//通过反射,执行下面的代码逻辑
public List example(QueryModel queryModel, int beginNum, int endNum) {
List list = new ArrayList();
.
.
.
return list;
}
}
Batch
public abstract interface Batch {
public abstract int total();
public abstract List getTaskList();
public abstract void doneTask();
public abstract boolean isAllFinished();
public abstract void await() throws Exception;
}
CounterBatch
public class CounterBatch implements Batch{
private final int total;
//AtomicInteger可保证多线程中保持原子操作,适用于在多线程中的计算
private AtomicInteger finished = new AtomicInteger(0);
private List teskList;
public CounterBatch(int total) {
this.teskList = new ArrayList<>(total);
this.total = total;
}
@Override
public int total() {
return this.total;
}
@Override
public List getTaskList() {
return this.teskList;
}
@Override
public void doneTask() {
//原子性的+1
this.finished.incrementAndGet();
}
@Override
public boolean isAllFinished() {
return (this.total == this.finished.intValue());
}
@Override
public void await() throws Exception {
while (!(isAllFinished())) {
//线程休眠100毫秒
Thread.sleep(100L);
}
}
}
Task
public abstract interface Task {
public abstract void runTask();
}
TemplateTask
public abstract class TemplateTask implements Task{
private static final Logger logger = LoggerFactory.getLogger(TemplateTask.class);
protected Batch batch;
protected String taskId;
public TemplateTask(String taskId,Batch batch) {
this.taskId = taskId;
this.batch = batch;
}
protected abstract void invoke() throws Exception;
@Override
public void runTask() {
logger.info("Started Task: " + this.taskId);
try {
invoke();
} catch (Exception e) {
logger.error("Error at Task : " + this.taskId,e);
} finally {
logger.error("Finished Task : " + this.taskId);
this.batch.doneTask();
}
}
}
QueryFlowTask
public class QueryFlowTaskParallelExecutorextends TemplateTask{ private int beginNum; private int endNum; private boolean isSuccess; protected QueryModel queryModel; private AbstractAccountService abstractAccountService; private String methodName; private static final Logger log = LoggerFactory.getLogger(QueryFlowTask.class); public QueryFlowTask(String taskId, Batch batch, int beginNum, int endNum, QueryModel queryModel,String methodName,AbstractAccountService abstractAccountService) { super(taskId + "-" + (endNum % 1500 == 0 ? endNum / 1500 : endNum / 1500 + 1), batch); this.beginNum = beginNum; this.endNum = endNum; this.queryModel = queryModel; this.abstractAccountService = abstractAccountService; this.methodName = methodName; isSuccess = false; } @Override protected void invoke() throws Exception { long startTime = System.currentTimeMillis(); try { Method method = abstractAccountService.getClass().getMethod(methodName,queryModel.getClass(),int.class,int.class); method.invoke(abstractAccountService,new Object[] {queryModel,beginNum,endNum}); } catch (Exception e) { throw e; } log.info("获取数据耗时:" + (System.currentTimeMillis() - startTime) + "ms"); isSuccess = true; } public boolean isSuccess() { return isSuccess; } }
public abstract interface ParallelExecutor {
public abstract void synParallelExecute(Batch batch) throws Exception;
}
ParallerExecutorImpl
public class ParallerExecutorImpl implements ParallelExecutor{
private static final Logger logger = LoggerFactory.getLogger(ParallerExecutorImpl.class);
private ExecutorService executorService;
@Override
public void synParallelExecute(Batch batch) throws Exception {
for (Task task : batch.getTaskList()) {
this.executorService.execute(new Runnable(task) {
@Override
public void run() {
task.runTask();
}
});
}
batch.await();
if (logger.isInfoEnabled()){
logger.info("batch " + batch + " execute successfully!");
}
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
}
逻辑整理
1、首先执行AbstractAccountService.createBatch()方法,组装参数
2、然后执行多线程,执行ParallerExecutorImpl.synParallelExecute()方法,遍历参数,使用实现Runnable接口重写run方法的方式实现多线程
3、再然后执行TemplateTask.runTask()方法,在try里面执行QueryFlowTask.invoke()方法
4、通过反射,根据传入的方法名称执行对应的方法,方法里面有具体的逻辑
5、总结一下,就是组装参数,使用多线程通过反射调用方法执行
@Configuration
public class ParallelExecutorConfiguration {
private static final Logger logger = LoggerFactory.getLogger(ParallelExecutorConfiguration.class);
private final JHipsterProperties jHipsterProperties;
public ParallelExecutorConfiguration(JHipsterProperties jHipsterProperties) {
this.jHipsterProperties = jHipsterProperties;
}
@Bean
public ParallelExecutor getParallelExecutor(){
log.debug("Creating getParallelExecutor");
//有界队列
ArrayBlockingQueue queue = new ArrayBlockingQueue(jHipsterProperties.getAsync().getQueueCapacity());
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,jHipsterProperties.getAsync().getMaxPoolSize(),60, TimeUnit.SECONDS,queue);
ParallerExecutorImpl parallerExecutor = new ParallerExecutorImpl();
parallerExecutor.setExecutorService(executor);
return parallerExecutor;
}
}
总所周知,线程池有两种方法创建,一种是Executor工厂方法创建,另一种,也是最常用的一种,就是我们使用的方法,new ThreadPoolExecutor()方法自定义创建线程池。
new ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BloackingQueue workQueue)
看得出来这种创建有五个参数,分别对应:
① corePoolSize:核心线程数
② maximumPoolSize:最大线程数
③ keepAliveTime:活跃时间
④ unit:时间单位(分、秒)
⑤ workQueue:阻塞队列
由此可以看出,我们创建了一个核心线程数为10,最大线程数为,活跃时间60分钟,阻塞队列为有边界的基于数组的并发阻塞队列的线程池
补充 1、常用并发队列的介绍 2、如果你提交任务时,线程池队列已满,这时会发生什么


