栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java 线程池体系 - ThreadPoolExecutor

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java 线程池体系 - ThreadPoolExecutor

ThreadPoolExecutor 线程池分析 一 - 核心原理图解

形象图

此处引用马士兵教育图解

设计原理图

二 - 原理推测

    首先应该拥有一个创建线程的工厂 ThreadFactory

    其次应该拥有一个存放线程的线程"容器"

    然后应该拥有一个存放任务的容器,有任务就放入等待执行的任务容器中(任务队列taskQueue)

    最后应该定义长期存在线程池中处理任务的"长期工(甲方-长期占有资源)“与任务量大,处理不完添加的"临时工(外包-退场退还资源)”

    考虑在多余任务执行完毕之后"临时工"退回资源,而又增加大量任务再次创建"临时工"的问题,增加keepalive (保持)时间,时间单位为TimeUnit

    如果任务太多了,任务队列放不下了怎么办----> ThreadPoclicy

三 - 部分源码分析 1. ThreadPoolExecutor()→→→→构造函数
public class ThreadPoolExecutor extends AbstractExecutorService {
   
    public ThreadPoolExecutor(int corePoolSize, //核心线程数(长期工)
                              int maximumPoolSize, //最大线程数(长期工+临时工)
                              long keepAliveTime, //临时附加线程任务执行完待销毁时间/保留时间
                              TimeUnit unit, //保留时间的时间单位
                              BlockingQueue workQueue //阻塞队列存放待执行任务 
                             ) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(),//默认线程工厂
             defaultHandler//默认拒绝策略(当任务最大线程数满载,无线程空闲,且待执行任务队列满载时,拒绝策略)
            );
    }

   
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime,TimeUnit unit,
                              BlockingQueue workQueue,ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }

   
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                              BlockingQueue workQueue,RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
    }

    
    public ThreadPoolExecutor(int corePoolSize,//核心线程数(长期工)
                              int maximumPoolSize,//最大线程数(长期工+临时工)
                              long keepAliveTime,//临时附加线程任务执行完待销毁时间/保留时间
                              TimeUnit unit,//保留时间的时间单位
                              BlockingQueue workQueue,//阻塞队列存放待执行任务
                              ThreadFactory threadFactory,//线程工厂
                              RejectedExecutionHandler handler //拒绝策略) {
        //常规入参参数校验                      
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        //判断是否存在安全策略                      
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        //常规参数赋值
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}
    
1.2 DefaultThreadFactory() →→→→默认线程工厂
public class ThreadPoolExecutor extends AbstractExecutorService { 
	static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            //获取系统当前安全管理器
            SecurityManager s = System.getSecurityManager();
            //组别 = 安全管理器 != null ? 则取安全管理器的组 : 当前线程的线程组
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            //定义名字前缀
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        
        public Thread newThread(Runnable r) {
            //创建线程
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            //测试线程 t 是否为守护线程
            if (t.isDaemon()){
                //如果是,则设置其为否(将守护线程变为普通线程)
                t.setDaemon(false);
            }
            //如果线程的优先级不是默认优先级"5",则设置为默认优先级"5"
            if (t.getPriority() != Thread.NORM_PRIORITY){
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}
3. RejectedExecutionHandler→→→→拒绝策略
public interface RejectedExecutionHandler {
	
    
    
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
策略一 : 抛出异常 RejectedExecutionException (线程池默认处理方式)
public class ThreadPoolExecutor extends AbstractExecutorService {
    public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }

    	
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
        }
    }
}
策略二 : 交由主线程(调用方)执行,如果线程池还在运行状态下,则将本次"任务"请求交由主线程执行
public class ThreadPoolExecutor extends AbstractExecutorService {
	public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //如果当前线程池没有关闭
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    
    
    public boolean isShutdown() {
        //判断是否是执行状态并取反,即判断是否是非执行状态
        return ! isRunning(ctl.get());
    }
    
    
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
}
策略三 : 丢弃策略 , 丢弃待执行任务队列第一个(最老的),重新执行 execute() 方法,将新请求来的"任务"放入待执行任务队列中,
public class ThreadPoolExecutor extends AbstractExecutorService {
	public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //如果当前线程池没有关闭
            if (!e.isShutdown()) {
                //任务队列中丢弃第一个
                e.getQueue().poll();
                //放入待执行任务队列中
                e.execute(r);
            }
        }
    }
}
策略四 : 忽略策略,直接丢弃对应任务,对此次请求忽略不计
public class ThreadPoolExecutor extends AbstractExecutorService {
 	public static class DiscardPolicy implements RejectedExecutionHandler {
   
        public DiscardPolicy() { }

        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
}

4. 线程池状态分析

RUNNING ---------------- 运行态 : 接收新任务,和进程队列任务
SHUTDOWN ------------- 关闭态 : 不接受新任务,但是接收进程队列中的任务
STOP ----------------------- 停止态 : 不接受新任务,也不接受进程队列任务,并且打断正在执行的任务
TIDYING ------------------- 整理态 : 所有任务执行结束,待处理任务为 0 ,线程转换为TIDYING态,将会执行terminated 钩子函数
TERMINATED ------------ 结束态 : terminated() 执行完成



public class ThreadPoolExecutor extends AbstractExecutorService {
    
    
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    private static final int COUNT_BITS = Integer.SIZE - 3;
    
    // 1 左移29位  0010 0000 0000 0000 0000 0000 0000 0000 (减一) 0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池使用 32位高3位作为 状态标志 (最高位为符号位)
    // -1左移29位  1110 0000 0000 0000 0000 0000 0000 0000  运行态   ----> 只有一个负数,表示运行态
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 0 左移29位  0000 0000 0000 0000 0000 0000 0000 0000  关闭态
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 1 左移29位  0010 0000 0000 0000 0000 0000 0000 0000  暂停态
    private static final int STOP       =  1 << COUNT_BITS;
    // 2 左移29位  0100 0000 0000 0000 0000 0000 0000 0000  休整态   ----> 过度状态
    private static final int TIDYING    =  2 << COUNT_BITS; 
    // 3 左移29位  0110 0000 0000 0000 0000 0000 0000 0000  结束态
    private static final int TERMINATED =  3 << COUNT_BITS;

    //求运行状态 二进制运算 ~ 取反 
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //求"工人"人数 二进制 与运算 C& 0001 1111 1111 1111 1111 1111 1111 1111
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //二进制 或运算 
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
}
5. Worker() →→→→线程池 - 线程类 (“工人”)
 public class ThreadPoolExecutor extends AbstractExecutorService {
	private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

        private static final long serialVersionUID = 6138294804551838833L;

        final Thread thread; //声明默认的线程---工具
      
        Runnable firstTask; //工人将要完成,进行的任务
      
      	volatile long completedTasks; //

        Worker(Runnable firstTask) { //利用构造方法完成工人初始化
            setState(-1); // 代表 新工人刚刚创建,还没有干活
            this.firstTask = firstTask; //将任务给予工人
            this.thread = getThreadFactory().newThread(this); //给工人创建工具
        }

        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 状态0代表无锁,状态1代表有锁
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        //打断工作的线程
        void interruptIfStarted() {
            Thread t;
            //如果线程状态为正在运行,并且线程存在,且不是打断状态,那么打断正在工作的线程
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
 }
6. addWorker()→→→→添加新的工作线程
public class ThreadPoolExecutor extends AbstractExecutorService {
    
    
	private boolean addWorker(Runnable firstTask, boolean core) {
        //============================第一步 (1):检查状态,如果没问题,工人计数器+1 ===========================================
        //标记(跳出内循环时使用)
        retry:
        for (;;) {
            //检查线程池状态(快照的形式)
            int c = ctl.get();
            int rs = runStateOf(c);

            // 仅在必要时检查队列是否为空 -------如果不满足添加新线程要求,则直接退出
            if (rs >= SHUTDOWN && //线程池不是运行状态
                ! (rs == SHUTDOWN //线程池不再接受外部任务(下1),可以处理内部任务(下2)
                   && firstTask == null  //(1)是否是新提交的任务, (入参)null为不是新提交的任务 ,则该函数是为了补充线程
                   && ! workQueue.isEmpty() //(2)任务队列是否存在任务 , 如果是补充线程,那么任务队列应该有任务,否则方法结束
                  )
               ){
                return false;
            }

            //判断状态,如果满足并且对线程数变量C做加1
            for (;;) {
                //获取当前工作线程数
                int wc = workerCountOf(c);
                
                //如果工作线程数大于等于可容纳的最大线程数
                if (wc >= CAPACITY ||
                    //如果当前工作线程数 >= (是否是核心线程?线程池核心线程数:线程池最大线程数)
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    //如果当前线程数已经达到无法补充线程的情况,则方法结束
                    return false;
                
                //如果满足上面条件
                if (compareAndIncrementWorkerCount(c)){//CAS (乐观锁似的比较并且增加)的增加工作线程数
                    //如果线程添加成功,则跳出内循环到程序retry处继续开始执行
                    break retry;
                }
                c = ctl.get();  // 由于进行了上一个if操作compareAndIncrementWorker,值发生变化,所以重新获取
                if (runStateOf(c) != rs)
                    continue retry;
                //再次获取线程池状态,防止其他操作,CAS判断,否则进行无用功,跳到标记点继续循环"等待"添加
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        
         //============================第二步 (2):添加工人,并使工人执行任务===========================================
        try {
            w = new Worker(firstTask); //-------------------------(2.1)创建工人
            final Thread t = w.thread;
            if (t != null) {
                //添加"主"锁,防止多线程问题
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get()); //查看线程池状态

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) //如果 工人的线程t已经被启动了,则抛出异常,线程异常
                            throw new IllegalThreadStateException();
                        
                        workers.add(w);  //-------------------------(2.2)添加工人
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); //-----------------------------------(3)工人干活
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // ============================= 第三步(3) 方法结束,返回状态  ======================================
        return workerStarted;
    }
}
7. execute() →→→→ 执行"任务"方法

执行总共分为三步 :

    (核心线程数未满载) 判断当前线程数量是否超过核心线程数,如果没有则为新"任务"创建新的核心线程,并且开始执行传入的任务(核心线程数满载,工作任务队列未满载) 如果核心线程数已经满载, 则放入任务队列中再次判断线程池状态(防止多线程并发或外部等其他情况使线程池暂停或关闭线程池等操作)如果线程池已经不在运行状态,则 remove() 当前任务 ,并执行拒绝策略再者如果允许核心线程释放资源等操作造成某一时刻核心线程数为0(最大线程数未满载)如果没有到达最大线程数,则创建临时线程来完成工作都不符合,则直接执行拒绝策略
public class ThreadPoolExecutor extends AbstractExecutorService {
	public void execute(Runnable command) {
        if (command == null) { throw new NullPointerException(); }
     	
        int c = ctl.get(); //获取线程池当前状态
     	
     	//================第一步(1) 核心线程数未满载 ==============================================
        if (workerCountOf(c) < corePoolSize) { //如果当前线程数 < 核心线程数
            
            if (addWorker(command, true)){ //添加新的工作线程,并且标记为核心线程
                return;
            }
            
            c = ctl.get(); //获取线程池状态 (考虑多线程问题,如果多线程操作,添加核心线程存在失败)
        }
     	
     	//================第一步(2) 核心线程数满载,工作任务队列未满载 =================================
        if (isRunning(c) && workQueue.offer(command)) { //判断当前状态是否是运行状态 , 并且可以将新任务放入任务队列中
            
            int recheck = ctl.get(); //检查线程池状态
            
            if (! isRunning(recheck) && remove(command)){ //如果当前线程池不是运行状态,并且可以成功的删除掉新放入队列中的任务
                reject(command); //那么执行拒绝策略 (由于线程池关闭)
            }else if (workerCountOf(recheck) == 0){//检查"工人"的数量,如果没有"工人"了 
	                addWorker(null, false); //则添加非核心线程来完成当前任务
            }
        }
     	//===============第三步(3) 拒绝策略========================================================
        else if (!addWorker(command, false)) //如果核心线程数已经满载,最大线程数未满,则创建临时线程完成任务
            reject(command); //如果都不满足,则执行拒绝策略 (由于队列满载)
    }
8. runWorker()→→→→ 任务的真正执行,“工人”,执行任务
final void runWorker(Worker w) {
    
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        try {
            //如果没有传入任务,那么就去取队列中的任务执行,否则进行异常操作
            while (task != null || (task = getTask()) != null) { //while 是防止 getTask()方法阻塞
                w.lock();

                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted()){
                    //(1)如果线程池状态停止,则中断当前线程
                    //(2)调用interrupted()方法清楚中断状态用以判断当前线程是否已经中断,如果返回false已经中断,并且线程池停止,则中断线程(防止其他原因造成线程中断)
                    //(3)isInterrupted()判断之前是否被中断,如果之前被中断了,则不需要再次中断
                    wt.interrupt();
                }
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException |Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); //如果afterExecute()存在异常,不会被捕获,没有进行处理,当前线程不会结束
                    }
                } finally {
                    task = null; //循环退出条件
                    w.completedTasks++; //当前已完成任务加一
                    w.unlock(); //此块代码执行完毕,解除锁定
                }
            }
            completedAbruptly = false; //beforeExecute()与afterExecute()发生异常时此代码无法被执行
        } finally {
            processWorkerExit(w, completedAbruptly);  //进行"工人的清理"
        }
    }
}
9. processWorkerExit()→→→→工人的清理
 public class ThreadPoolExecutor extends AbstractExecutorService {
	private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //==================================步骤一(1)"辞退"当前工人====================================
        if (completedAbruptly) //如果当前工人工作突然完成,则工作工人数减一
            decrementWorkerCount();

        mainLock.lock(); //保证原子性
        try {
            completedTaskCount += w.completedTasks; //已完成的任务加一
            workers.remove(w); //将当前工人,从工作工人HashSet中移除
        } finally {
            mainLock.unlock();
        }

        tryTerminate();
		// =================================步骤二(2) 保证线程池最少要有一个线程===========================
        int c = ctl.get();  //判断当前线程池状态
        if (runStateLessThan(c, STOP)) { //如果线程池在RUNNING与SHUTDOWN状态(可以消化任务队列中的任务)
            if (!completedAbruptly) { //如果不是突然完成
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //线程池有没有设置超时,设置了则最小工作线程数为0,否则即为核心线程数
                if (min == 0 && ! workQueue.isEmpty()) //如果设置了超时时间,并且任务队列还有任务
                    min = 1; //那么最少保留一个工作线程,处理未完成的工作
                if (workerCountOf(c) >= min) //如果工作线程数 >= 预留工作线程最小数,则不需要进行补充线程
                    return;
            }
            addWorker(null, false);  //补充线程
        }
    }
 }
10. shutdown() →→→→ 线程池状态转变为SHOUTDOWN,线程池停止接受外部处理任务请求
 public class ThreadPoolExecutor extends AbstractExecutorService {
    public void shutdown() {
        mainLock.lock();
        try {
            checkShutdownAccess(); //检查权限(调用方是否有权停止线程池)
            advanceRunState(SHUTDOWN); //更改线程池状态
            interruptIdleWorkers(); //打断闲置的线程
            onShutdown(); //shutdown时执行运行状态转换后的进一步清理 ScheduledThreadPoolExecutor 的钩子函数
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
 }
void onShutdown() {} //在ThreadPoolExecutor中无操作,在ScheduledThreadPoolExecutor中复写,用于处理延时的任务
11. shutdownNow() →→→→ 线程池状态转变为STOP,线程池停止接受外部处理任务请求
 public class ThreadPoolExecutor extends AbstractExecutorService {
	public List shutdownNow() {
        List tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//检查权限(调用方是否有权停止线程池)
            advanceRunState(STOP);//更改线程池状态
            interruptWorkers();//打断全部线程
            tasks = drainQueue(); //将尚未执行的任务,从待执行任务队列中取出,返回给用户
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
 }
12. interruptIdleWorkers() →→→→ 根据入参不同,选择打断全部闲置线程或者仅仅打断一个闲置线程
 public class ThreadPoolExecutor extends AbstractExecutorService {
	private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) { //如果当前工人的线程没有被打断,并且持有锁(由于worker的锁是不可重入锁,即只能单一获取,如果可以获取到锁,就意味着当前线程没有处于处理任务的运行状态,也就是闲置状态,也就可以被打断)
                    try {
                        t.interrupt(); //打断当前线程
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne) //如果为true,则仅需打断一个满足条件的线程,即可退出
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
 }
13. tryTerminate() →→→→ 线程池终止方法
 public class ThreadPoolExecutor extends AbstractExecutorService {
	final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||                                         //如果线程池处于运行状态,则退出
                runStateAtLeast(c, TIDYING) ||							//如果线程池已经将要终结,则退出
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))   //如果线程池在SHUTDOWN状态下还有任务没有进行处理,则退出
                return;
            if (workerCountOf(c) != 0) { 								//如果工人数不是0
                interruptIdleWorkers(ONLY_ONE);							//中断(结束)一个空闲工人,然后返回
                return;
            }

            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {    		//CAS的改变线程池状态
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll(); //唤醒等待线程池终止的线程
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            //compareAndSet()如果线程池改变状态失败,则继续while请求
        }
    }
 }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781917.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号