栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java线程池执行流程(源码解读)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java线程池执行流程(源码解读)

Java线程池执行流程(源码解读) 一.背景介绍

java中,如果每个请求都创建一个新线程,开销是相当大的,服务器在创建和销毁线程的过程中花费的时间和消耗的系统资源都相当大。况且除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个jvm里创建太多的线程,可能会导致系统资源不足。为了防止资源不足,服务器应用程序需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务,这就是“池化资源”产生的原因。

二.线程池的类型

在jdk1.8之前有newSingleThreadPool、newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool四种,jdk1.8又增加了newWorkStealingPool,所以一共是5种类型。

//创建单核心的线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
//创建固定核心数的线程池,这里核心数 = 3
ExecutorService executorService = Executors.newFixedThreadPool(3);
//创建一个按照计划规定执行的线程池,这里核心数 = 2
ExecutorService executorService = Executors.newScheduledThreadPool(2);
//创建一个自动增长的线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//创建一个具有抢占式操作的线程池
ExecutorService executorService = Executors.newWorkStealingPool();

注意】在创建线程池时,推荐使用ThreadPoolExecutor的方式创建线程池,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

三.线程池参数

ThreadPoolExecutor构造方法:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

1.corePoolSize:线程池的核心线程数,即便是线程池里没有任何任务,也会有corePoolSize个线程在候着等任务。

2.maximumPoolSize:最大线程数,不管你提交多少任务,线程池里最多工作线程数就是maximumPoolSize。

3.keepAliveTime:线程的存活时间。当线程池里的线程数大于corePoolSize时,如果等了keepAliveTime时长还没有任务可执行,则线程退出。

4.unit:这个用来指定keepAliveTime的单位,比如秒:TimeUnit.SECONDS。

5.workQueue:新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:ArrayBlockingQueue、linkedBlockingQuene、SynchronousQuene、PriorityBlockingQueue。

6.threadFactory:线程工厂,用来创建线程,主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。

7.handler:拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用。jdk中提供了4中拒绝策略:CallerRunsPolicy(该任务被线程池拒绝,由调用execute方法的线程执行该任务)、
AbortPolicy(直接丢弃任务,并抛出RejectedExecutionException异常)、DiscardPolicy(直接丢弃任务,什么都不做)、DiscardOldestPolicy(抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列)

四.执行流程 1.类关系图:


Executor:根接口,负责线程的使用和调度;
ExecutorService:Executor的扩展接口,增加了返回Future 对象;
AbstractExecutorService:ExecutorService的默认抽象实现类,对ExecutorService进行了简单实现,开发可以参考并重写这些方法;
ThreadPoolExecutor:线程池的实现类。

2.执行流程图:


任务提交到线程池,首先判断当前线程数量是否小于核心线程数,如果小于则创建线程来执行提交的任务,否则将任务放入工作队列,如果工作满了,则判断当前线程数量是否小于最大线程数,如果小于则创建线程执行任务,否则就会调用拒绝策略,以表示线程池拒绝接收任务。

3.execute源码解析
    public void execute(Runnable command) {
        //如果任务为null,则抛出nullPointerException异常
        if (command == null)
            throw new NullPointerException();
        //ctl中保存的线程池当前的一些状态信息
        int c = ctl.get();
        
        if (workerCountOf(c) < corePoolSize) {
            //如果addWorker方法创建新的核心线程成功,return,方法结束
            if (addWorker(command, true))
                return;
            //如果addWorker方法创建新的核心线程失败,重新获取ctl的int值(线程池状态可能已被修改)
            c = ctl.get();
        }
       
        if (isRunning(c) && workQueue.offer(command)) {
            //再次获取ctl的int值(任务入队的过程中线程池状态可能已被修改)
            int recheck = ctl.get();
            // 再次通过isRunning方法获取线程池状态,如果线程池状态不是RUNNING状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果当前线程池为空就新则创建新的非核心线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       
        else if (!addWorker(command, false))
            reject(command);
    }
4.addWorker源码解析

addWorker(Runnable firstTask, boolean core) 方法就是向线程池添加一个带有任务的工作线程:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //每次for循环都需要获取最新的ctl值
            int c = ctl.get();
            //获取当前线程池状态
            int rs = runStateOf(c);
            //首先是检查线程池状态,检查状态是否是running,shutdown是队列是否有数据
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //获取工作线程
                int wc = workerCountOf(c);
                
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //使用CAS的方法给ctl的worker的数量加1,成功则跳出最外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //CAS不成功则重新获取ctl的值
                c = ctl.get(); 
                //如果CAS不成功的原因是状态变了则重新进行外层循环
                if (runStateOf(c) != rs)
                    continue retry;

            }
        }

        //workerStarted表示woker是否被执行
        boolean workerStarted = false;
        //workerAdded表示worker是否成功添加到workers
        boolean workerAdded = false;
        Worker w = null;
        try {
           //创建一个新的工作线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //先持有锁,再创建线程
                final ReentrantLock mainLock = this.mainLock;
                //可重入锁加锁
                mainLock.lock();
                try {
                    //获取线程池的工作状态
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                         //检查线程是否已经已启动
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //把worker添加到workers里面
                        workers.add(w);
                        //工作线程数
                        int s = workers.size();
                        //如果工作线程数s大于largestPoolSize,设置largestPoolSize为s
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //设置workerAdded为true   
                        workerAdded = true;
                    }
                } finally {
                    //可重入锁解锁
                    mainLock.unlock();
                }
                //如果workerAdded为true,启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //如果工作线程启动失败,则删除此工作线程
            if (! workerStarted)
                addWorkerFailed(w);
        }
        //返回工作线程启动结果
        return workerStarted;
    }

以上两个方法是线程池执行过程中涉及到的两个重要方法的源码解析,线程池提交任务的方式是有两种,一种是通过execute,另一种就是通过submit;通过submit源码中可以看出也是通过execute。

    public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

submit方法的不同之处是对runnable的封装。二者的不同可总结为:对返回值的处理不同、对异常的处理不同,具体可自行去了解,不在此详述。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/357708.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号