栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JDK8 ThreadPoolExecutor源码理解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JDK8 ThreadPoolExecutor源码理解

目录
  • 一、概述
  • 二、线程池流程图
    • 2.1 状态转移图
    • 2.2 线程池提交任务流程图
    • 2.3 线程池关闭流程图
    • 三、关键方法与逻辑
    • 四、 问题
    • 4.1 基础问题
      • 4.1.1 线程池构造方法七个参数是什么与含义
      • 4.1.2 阻塞队列有哪些,都有什么特点,对应的Executors提供的线程池有哪些
      • 4.1.3 拒绝策略有哪些,特点
      • 4.1.4 线程池如何关闭,各种关闭方式的区别
      • 4.1.5 线程池有哪些提交方法,区别是什么
      • 4.1.6 线程池提交任务的流程
      • 4.1.7 配置线程池时考虑哪些配置因素
      • 4.1.8 什么是、为什么要有线程池,
    • 4.2 代码层面的问题
      • 4.2.1 线程池状态含义、如何转移,在代码中如何体现
      • 4.2.2 线程池创建后如何不提交任务就预先建立线程
      • 4.2.3 线程池启动后能否更改核心线程数、空闲线程存活时间、最大线程数
      • 4.2.4 是否允许核心线程超时回收
      • 4.2.5 线程池调用shutdown/shutdownNow后阻塞队列不再接收新线程,它是如何做到的
      • 4.2.6 线程池如何做到线程重复利用
      • 4.2.7 非核心线程为何可以超时回收,依赖了什么东西
      • 4.2.8 线程池如何存储状态和线程数量
      • 4.2.9 Worker如何实现的AQS,它有什么特点
      • 4.2.10 调用awaitTermination的线程会被阻塞,如何实现
      • 4.2.11 线程中断在线程池中的作用
      • 4.2.12 如何监控线程池的状态
      • 4.2.13 线程池如何判断线程是否空闲
    • 4.3 与ThreadPoolExecutor类有关的类,组合问题
      • 4.3.1 Semaphore和ThreadPoolExecutor都可以限制最大线程数量,他俩有哪些区别
      • 4.3.2 与FutureTask如何组合使用
    • 五、 源码分析

一、概述 二、线程池流程图 2.1 状态转移图

2.2 线程池提交任务流程图

2.3 线程池关闭流程图

三、关键方法与逻辑
  • execute 提交任务,不带返回值
  • submit 提交任务,带返回值
  • addWorker 添加工作线程
  • interruptIdleWorkers 中断所有空闲线程
  • processWorkerExit 准备当前worker退出的逻辑
  • tryTerminate 尝试停止线程池
  • Worker类 继承了AQS,实现了独占机制,执行任务时会锁
四、 问题 4.1 基础问题 4.1.1 线程池构造方法七个参数是什么与含义 4.1.2 阻塞队列有哪些,都有什么特点,对应的Executors提供的线程池有哪些 4.1.3 拒绝策略有哪些,特点 4.1.4 线程池如何关闭,各种关闭方式的区别 4.1.5 线程池有哪些提交方法,区别是什么 4.1.6 线程池提交任务的流程 4.1.7 配置线程池时考虑哪些配置因素 4.1.8 什么是、为什么要有线程池, 4.2 代码层面的问题 4.2.1 线程池状态含义、如何转移,在代码中如何体现 4.2.2 线程池创建后如何不提交任务就预先建立线程 4.2.3 线程池启动后能否更改核心线程数、空闲线程存活时间、最大线程数 4.2.4 是否允许核心线程超时回收 4.2.5 线程池调用shutdown/shutdownNow后阻塞队列不再接收新线程,它是如何做到的 4.2.6 线程池如何做到线程重复利用 4.2.7 非核心线程为何可以超时回收,依赖了什么东西 4.2.8 线程池如何存储状态和线程数量 4.2.9 Worker如何实现的AQS,它有什么特点 4.2.10 调用awaitTermination的线程会被阻塞,如何实现 4.2.11 线程中断在线程池中的作用 4.2.12 如何监控线程池的状态 4.2.13 线程池如何判断线程是否空闲 4.3 与ThreadPoolExecutor类有关的类,组合问题 4.3.1 Semaphore和ThreadPoolExecutor都可以限制最大线程数量,他俩有哪些区别 4.3.2 与FutureTask如何组合使用 五、 源码分析
package java.util.concurrent;

import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // INTEGER位数-3,用于状态左移对应位数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 和ctl求与,可以得到线程池中线程个数,也是线程池中最大线程数
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // 高三位为111,代表线程池会接收新任务,并处理阻塞队列中的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 高三位为000,代表线程池不接受新任务,会处理阻塞队列中的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 高三位为001,代表线程池不接受新任务,不处理阻塞队列中的任务,同时中断正在运行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 高三位为010,代表所有的任务已经终止
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 高三位为011,代表terminated()方法已经执行完成
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 获取高三位的值,得到运行状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取低29位的值,得到线程池中线程的个数
    private static int workerCountOf(int c)  { return c & CAPACITY; }

    
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    
    private final BlockingQueue workQueue;

    
    private final ReentrantLock mainLock = new ReentrantLock();

    
    private final HashSet workers = new HashSet();

    
    private final Condition termination = mainLock.newCondition();


    
    private int largestPoolSize;

    
    private long completedTaskCount;

    
    private volatile ThreadFactory threadFactory;

    
    private volatile RejectedExecutionHandler handler;

    
    private volatile long keepAliveTime;

    
    private volatile boolean allowCoreThreadTimeOut;

    
    private volatile int corePoolSize;
    
    private volatile int maximumPoolSize;

    
    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    private static final RuntimePermission shutdownPerm =
            new RuntimePermission("modifyThread");
    
    private final AccessControlContext acc;

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {
        
        final Thread thread;
        
        Runnable firstTask;
        
        volatile long completedTasks;

        
        Worker(Runnable firstTask) {
            // 设置state值为-1
            setState(-1);
            this.firstTask = firstTask;
            // 相当于new Thread传的Runnable参数为当前Worker对象,因此线程start的时候,调用的是Worker里的run方法
            this.thread = getThreadFactory().newThread(this);
        }

        
        public void run() {
            runWorker(this);
        }

        
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        
        void interruptIfStarted() {
            Thread t;
            
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }


    
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 以下三种情况不进行终止
            if (isRunning(c) || // 处在RUNNING状态
                    runStateAtLeast(c, TIDYING) || // 处在TIDYING或TERMINATED说明已经终止过了,不需要再终止
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 处于SHUTDOWN状态并且工作队列不为空(调用shutdown方法可能处于这种状态)
                return;
            // 状态为SHUTDOWN并且工作队列为空 或者 状态为STOP  才会进到这块代码
            // 如果此时线程池还有线程(正在运行的和正在等待任务的)
            // 调用shutdownNow后,再getTask里还没调整workerCount可能会进入此方法
            if (workerCountOf(c) != 0) {
                // 中断workers集合中的空闲任务,只中断一个。在getTask中线程被唤醒后会判断中断状态,如果中断会返回null,进入销毁worker的流程
                // 这里我理解只中断一个是为了提前销毁一个worker
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            // (状态为SHUTDOWN并且工作队列为空 或者 状态为STOP ) 并且 正在运行的worker也没有了 开始terminated
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // cas尝试将 状态变为TIDYING,workerCount变为0 。如果失败了,会循环重新进入此代码块
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 执行线程池terminated后的动作,需要子类实现
                        terminated();
                    } finally {
                        // 最终变为TERMINATED和workerCount为0
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 唤醒调用等待线程终止的线程,也就是因调用awaitTermination()而被阻塞的线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

    
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }

    
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 如果线程没被中断,并且worker能获取到锁(说明当前worker是空闲的--runWorker方法)
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    
    private static final boolean ONLY_ONE = true;

    
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    
    void onShutdown() {
    }

    
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

    // 清空阻塞队列,返回阻塞队列的任务列表
    private List drainQueue() {
        BlockingQueue q = workQueue;
        ArrayList taskList = new ArrayList();
        // 清空阻塞队列,返回阻塞队列的任务列表
        q.drainTo(taskList);
        // 要是清完还有,循环清(什么情况下发生?)
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }


    
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 获取线程池信息
            int c = ctl.get();
            // 获取线程池运行状态
            int rs = runStateOf(c);
            // 如果线程池状态为STOP/TIDYING/TERMINATED 或者 线程池状态为SHUTDOWN&&(新来的Runnable不为NULL 或者 阻塞队列为空)
//            boolean judge =  rs > SHUTDOWN || rs == SHUTDOWN && (firstTask != null || workQueue.isEmpty());
            // 这种情况直接返回添加失败
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 获取线程池中线程数量
                int wc = workerCountOf(c);
                // 如果线程池中线程数量已经到达最大值 或者 已经到达对应模式下线程的最大值,直接返回false
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // cas设置线程数+1成功,则退出两层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果cas设置失败了,重新获取线程池信息
                c = ctl.get();
                // 如果运行状态变了 todo
                if (runStateOf(c) != rs)
                    // 进入外层循环,重新获取线程池状态
                    continue retry;
                // 失败是因为 线程数变更了,不是运行状态变,所以继续内层循环
            }
        }
        // 进到这块,线程池信息中线程数已经+1

        // 标记 worker还没开始
        boolean workerStarted = false;
        // 标记 worker还没加进去
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 当获取锁的时候,重新检查。线程工厂创建失败或者在获取锁前就关闭线程池时退出
                    // 获取线程池运行状态
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||  // 如果是RUNNING
                            (rs == SHUTDOWN && firstTask == null)) {  // 如果是SHUTDOWN 并且 firstTask为null (我估计在某处处理的)
                        // 判断t是否存活--目前我理解是否已经启动。这个线程正常情况下不应该启动
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // workers集合添加当前线程
                        workers.add(w);
                        int s = workers.size();
                        // 如果现在已经比最大线程数大了,就更新
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 标记worker已经添加到集合里了
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果worker已经添加到集合里了,则启动worker里的线程,并标记worker已启动
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果worker启动失败,则将其移除worker集合
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回worker是否启动成功
        return workerStarted;
    }

    
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            // workerCount减一
            decrementWorkerCount();
            // 尝试停止线程池(SHUTDOWN和STOP状态才肯能停止)
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果不是超时回收而是异常导致的,则将worker数减一
        // 如果是超时导致的,则在getTask里面已经减一过了,在这里不需要再减一
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 更新线程池已经完成的线程数量
            completedTaskCount += w.completedTasks;
            // 移除Workers里的当前worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 尝试停止线程池
        tryTerminate();

        int c = ctl.get();
        // 如果当前线程池的状态小于STOPPING
        if (runStateLessThan(c, STOP)) {
            // 如果是超时退出的worker
            if (!completedAbruptly) {
                // 计算线程池允许worker数的最小值
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果最小值是0并且阻塞队列中有worker
                if (min == 0 && ! workQueue.isEmpty())
                    // 设置最小值为1
                    min = 1;
                // 如果当前工作worker数大于等于最小值,直接退出
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 添加一个无任务、非核心的worker          ----------跟添加核心的有啥区别吗
            addWorker(null, false);
        }
    }

    
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // 运行时状态
            int rs = runStateOf(c);

            // 如果线程池状态为STOP及之后  或者 状态为SHUTDOWN且等待队列为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // ctl的Workers数为0
                decrementWorkerCount();
                return null;
            }
            // 获取Workers的数量
            int wc = workerCountOf(c);

            // 该worker是否受到超时时间影响。 如果允许核心线程超时 或者 当前线程数大于核心线程数,则当前Worker超时获取不到任务则会死亡
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果(当前worker的数量大于最大线程数(怎么可能) 或者 超时了)// 并且 (worker数量大于1 或者 阻塞队列为空)
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                // workCount减一,返回null
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            // wc<=maximumPoolSize && (!timed || ! timedOut ) || (wc<=1 && !workQueue.isEmpty())
            try {
                // 如果受超时时间影响,则poll指定之间,否则一直take阻塞
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                // 如果得到了任务,不为null,则直接返回
                if (r != null)
                    return r;
                // 设置已超时,在下一轮返回null
                timedOut = true;
            } catch (InterruptedException retry) {
                // 如果从阻塞队列中获取任务的过程中遇到了中断异常,则设置超时为false重新尝试(setKeepAliveTime后发现设置的值比之前小,就要触发中断)
                timedOut = false;
            }
        }
    }

    
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 先释放锁资源,没锁也可以调用,可中断             -----这样处理的原因
        w.unlock();
        // 判断当前worker是否是正常超时回收的
        boolean completedAbruptly = true;
        try {
            // 线程复用的原因:无限循环中从阻塞队列获取任务,没有则阻塞。 如果获取到的任务为null,则当前线程退出循环并死亡
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 如果池正在停止,确保线程被中断;如果不是,确保线程不被中断。 这需要在第二种情况下重新检查以在清除中断时处理 shutdownNow 竞争
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted()) {
                    // 当前状态在STOPPING之后,并且线程中断了,将中断标记清空后 才会进入此代码块
                    wt.interrupt();
                }

                try {
                    // 任务执行的前置操作
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 清空当前任务、当前worker完成的任务数加一,当前Worker解锁
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 标记是正常结束的Worker
            completedAbruptly = false;
        } finally {

            processWorkerExit(w, completedAbruptly);
        }
    }


    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }


    
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 工作线程数小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 创建核心线程,并启动线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        // 工作线程数等于核心线程数

        // 处在RUNNING状态 并且 添加到队列成功(不会阻塞)          ----工作线程数等于核心线程数,且阻塞队列没满
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果不再RUNNING状态 并且 移除队列成功
            if (! isRunning(recheck) && remove(command))
                // 执行拒绝策略
                reject(command);
            // 如果 (是RUNNING状态 或者 移除队列失败) 并且   工作线程数为0
            else if (workerCountOf(recheck) == 0)
                // 添加一个非核心但任务为null的Worker
                addWorker(null, false);
        }
        // 如果 (不在RUNNING状态 或者 添加队列失败)并且 添加非核心线程失败      ----工作线程数大于等于核心线程数,且阻塞队列满了
        else if (!addWorker(command, false))
            // 执行拒绝策略   ----工作线程数等于最大线程数,且阻塞队列满了
            reject(command);
    }

    
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 将状态变为SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲线程
            interruptIdleWorkers();
            // 关闭后动作
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试关闭线程池
        tryTerminate();
    }


    
    public List shutdownNow() {
        List tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查关闭权限
            checkShutdownAccess();
            // 将当前状态转变为STOP
            advanceRunState(STOP);
            // 中断所有已经启动的线程
            interruptWorkers();
            // 清空阻塞队列的任务,得到任务列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 尝试关闭
        tryTerminate();
        return tasks;
    }

    
    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }

    
    public boolean isTerminating() {
        int c = ctl.get();
        return ! isRunning(c) && runStateLessThan(c, TERMINATED);
    }

    
    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }

    
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    
    protected void finalize() {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null || acc == null) {
            shutdown();
        } else {
            PrivilegedAction pa = () -> { shutdown(); return null; };
            AccessController.doPrivileged(pa, acc);
        }
    }


    
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        // 如果当前线程数比核心线程数大,则中断所有空闲线程
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            // 如果新的核心线程数比原来设置的核心线程数大
            // 获取差值和工作队列大小的较小值
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }


    
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
                addWorker(null, true);
    }

    
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

    
    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

    
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            // 如果允许空闲后自动回收,则中断所有空闲线程
            if (value)
                interruptIdleWorkers();
        }
    }

    
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        // 如果新的值比之前的值小,则中断所有空闲线程
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

    
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        // 如果更改后的空闲线程存活时间比之前小,则中断所有空闲线程(worker在获取task时就设置好了从阻塞队列的等待时间,所以要取消获取重新设置)
        if (delta < 0)
            interruptIdleWorkers();
    }

    
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    
    public BlockingQueue getQueue() {
        return workQueue;
    }

    
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        // 只有SHUTDOWN且阻塞队列为空和STOP状态才会结束线程池
        tryTerminate();
        return removed;
    }

    
    public void purge() {
        final BlockingQueue q = workQueue;
        try {
            Iterator it = q.iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future && ((Future)r).isCancelled())
                    it.remove();
            }
        } catch (ConcurrentModificationException fallThrough) {
            // Take slow path if we encounter interference during traversal.
            // Make copy for traversal and call remove for cancelled entries.
            // The slow path is more likely to be O(N*N).
            for (Object r : q.toArray())
                if (r instanceof Future && ((Future)r).isCancelled())
                    q.remove(r);
        }

        tryTerminate(); // In case SHUTDOWN and now empty
    }


    
    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                    : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    public String toString() {
        long ncompleted;
        int nworkers, nactive;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            ncompleted = completedTaskCount;
            nactive = 0;
            nworkers = workers.size();
            for (Worker w : workers) {
                ncompleted += w.completedTasks;
                if (w.isLocked())
                    ++nactive;
            }
        } finally {
            mainLock.unlock();
        }
        int c = ctl.get();
        String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                        "Shutting down"));
        return super.toString() +
                "[" + rs +
                ", pool size = " + nworkers +
                ", active threads = " + nactive +
                ", queued tasks = " + workQueue.size() +
                ", completed tasks = " + ncompleted +
                "]";
    }

    
    protected void beforeExecute(Thread t, Runnable r) { }

    
    protected void afterExecute(Runnable r, Throwable t) { }

    
    protected void terminated() { }

    
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    
    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }

    
    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/888617.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号