栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

项目中怎么使用线程池、多线程和反射(查询流水为例)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

项目中怎么使用线程池、多线程和反射(查询流水为例)

AbstractAccountService
public class AbstractAccountService {

    @Autowired
    protected ParallelExecutor parallelExecutor;

    public void createBatch(QueryModel queryModel, int total, String methodName){
        //多线程并发执行
        int totalPage = total % 1500 == 0 ? total / 1500 : total / 1500 + 1;
        Batch batch = new CounterBatch(totalPage-1);//多线程减一次
        for (int pageNum = 2; pageNum <= totalPage; pageNum++) {
            int beginNum = (pageNum - 1) * 1500 + 1;
            int endNum = pageNum * 1500 > total ? total : pageNum * 1500;
            QueryModel newQueryModel = new QueryModel();
            //newQueryModel参数拼装,省略
            batch.getTaskList().add(createTask(batch, beginNum, endNum, newQueryModel, methodName));
        }
        long startTime = System.currentTimeMillis();
        //执行多线程
        try {
            parallelExecutor.synParallelExecute(batch);
        } catch (Exception e) {
            //打印输出异常
            e.printStackTrace();;
        }
        log.info("查询流水耗时:" + (System.currentTimeMillis() - startTime) + "ms");
        //查询所有子任务是否全部成功,失败则直接返回
        for (Task task : batch.getTaskList()) {
            if (!((QueryFlowTask) task).isSuccess()) {
                log.error("流水查询失败");
            }
        }
    }

    public QueryFlowTask createTask(Batch batch, int beginNum, int endNum, QueryModel queryModel, String methodName) {
        return new QueryFlowTask("AbstractAccountService",batch,beginNum,endNum,queryModel,this);
    }

    //通过反射,执行下面的代码逻辑
    public List example(QueryModel queryModel, int beginNum, int endNum) {
        List list = new ArrayList();
        .
        .
        .
        return list;
    }

}
Batch
public abstract interface Batch {
    public abstract int total();

    public abstract List getTaskList();

    public abstract void doneTask();

    public abstract boolean isAllFinished();

    public abstract void await() throws Exception;
}
CounterBatch
public class CounterBatch implements Batch{

    private final int total;
    //AtomicInteger可保证多线程中保持原子操作,适用于在多线程中的计算
    private AtomicInteger finished = new AtomicInteger(0);
    private List teskList;

    public CounterBatch(int total) {
        this.teskList = new ArrayList<>(total);
        this.total = total;
    }

    @Override
    public int total() {
        return this.total;
    }

    @Override
    public List getTaskList() {
        return this.teskList;
    }

    @Override
    public void doneTask() {
        //原子性的+1
        this.finished.incrementAndGet();
    }

    @Override
    public boolean isAllFinished() {
        return (this.total == this.finished.intValue());
    }

    @Override
    public void await() throws Exception {
        while (!(isAllFinished())) {
            //线程休眠100毫秒
            Thread.sleep(100L);
        }
    }
}
Task
public abstract interface Task {
    public abstract void runTask();
}
TemplateTask
public abstract class TemplateTask implements Task{

    private static final Logger logger = LoggerFactory.getLogger(TemplateTask.class);
    protected Batch batch;
    protected String taskId;

    public TemplateTask(String taskId,Batch batch) {
        this.taskId = taskId;
        this.batch = batch;
    }

    protected abstract void invoke() throws Exception;

    @Override
    public void runTask() {
        logger.info("Started Task: " + this.taskId);
        try {
            invoke();
        } catch (Exception e) {
            logger.error("Error at Task : " + this.taskId,e);
        } finally {
            logger.error("Finished Task : " + this.taskId);
            this.batch.doneTask();
        }

    }
}
QueryFlowTask
public class QueryFlowTask extends TemplateTask{

    private int beginNum;

    private int endNum;

    private boolean isSuccess;

    protected QueryModel queryModel;

    private AbstractAccountService abstractAccountService;

    private String methodName;

    private static final Logger log = LoggerFactory.getLogger(QueryFlowTask.class);


    public QueryFlowTask(String taskId, Batch batch, int beginNum, int endNum, QueryModel queryModel,String methodName,AbstractAccountService abstractAccountService) {
        super(taskId + "-" + (endNum % 1500 == 0 ? endNum / 1500 : endNum / 1500 + 1), batch);
        this.beginNum = beginNum;
        this.endNum = endNum;
        this.queryModel = queryModel;
        this.abstractAccountService = abstractAccountService;
        this.methodName = methodName;
        isSuccess = false;
    }

    @Override
    protected void invoke() throws Exception {
        long startTime = System.currentTimeMillis();
        try {
            Method method = abstractAccountService.getClass().getMethod(methodName,queryModel.getClass(),int.class,int.class);
            method.invoke(abstractAccountService,new Object[] {queryModel,beginNum,endNum});
        } catch (Exception e) {
            throw e;
        }
        log.info("获取数据耗时:" + (System.currentTimeMillis() - startTime) + "ms");
        isSuccess = true;
    }

    public boolean isSuccess() {
        return isSuccess;
    }
}
ParallelExecutor
public abstract interface ParallelExecutor {
    public abstract void synParallelExecute(Batch batch) throws Exception;
}

ParallerExecutorImpl
public class ParallerExecutorImpl implements ParallelExecutor{

    private static final Logger logger = LoggerFactory.getLogger(ParallerExecutorImpl.class);
    private ExecutorService executorService;

    @Override
    public void synParallelExecute(Batch batch) throws Exception {
        for (Task task : batch.getTaskList()) {
            this.executorService.execute(new Runnable(task) {
                @Override
                public void run() {
                    task.runTask();
                }
            });
        }
        batch.await();

        if (logger.isInfoEnabled()){
            logger.info("batch " + batch + " execute successfully!");
        }
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }
}
逻辑整理

1、首先执行AbstractAccountService.createBatch()方法,组装参数
2、然后执行多线程,执行ParallerExecutorImpl.synParallelExecute()方法,遍历参数,使用实现Runnable接口重写run方法的方式实现多线程
3、再然后执行TemplateTask.runTask()方法,在try里面执行QueryFlowTask.invoke()方法
4、通过反射,根据传入的方法名称执行对应的方法,方法里面有具体的逻辑
5、总结一下,就是组装参数,使用多线程通过反射调用方法执行

线程池 ParallelExecutorConfiguration
@Configuration
public class ParallelExecutorConfiguration {

    private static final Logger logger = LoggerFactory.getLogger(ParallelExecutorConfiguration.class);

    private final JHipsterProperties jHipsterProperties;

    public ParallelExecutorConfiguration(JHipsterProperties jHipsterProperties) {
        this.jHipsterProperties = jHipsterProperties;
    }

    @Bean
    public ParallelExecutor getParallelExecutor(){
        log.debug("Creating getParallelExecutor");
        //有界队列
        ArrayBlockingQueue queue = new ArrayBlockingQueue(jHipsterProperties.getAsync().getQueueCapacity());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,jHipsterProperties.getAsync().getMaxPoolSize(),60, TimeUnit.SECONDS,queue);
        ParallerExecutorImpl parallerExecutor = new ParallerExecutorImpl();
        parallerExecutor.setExecutorService(executor);
        return parallerExecutor;
    }
}

总所周知,线程池有两种方法创建,一种是Executor工厂方法创建,另一种,也是最常用的一种,就是我们使用的方法,new ThreadPoolExecutor()方法自定义创建线程池。

new ThreadPoolExecutor(
	int corePoolSize, 
	int maximumPoolSize, 
	long keepAliveTime, 
	TimeUnit unit, 
	BloackingQueue workQueue)

看得出来这种创建有五个参数,分别对应:
① corePoolSize:核心线程数
② maximumPoolSize:最大线程数
③ keepAliveTime:活跃时间
④ unit:时间单位(分、秒)
⑤ workQueue:阻塞队列

由此可以看出,我们创建了一个核心线程数为10,最大线程数为,活跃时间60分钟,阻塞队列为有边界的基于数组的并发阻塞队列的线程池

补充 1、常用并发队列的介绍 2、如果你提交任务时,线程池队列已满,这时会发生什么
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/704910.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号