栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

jdk1.8 线程池ThreadPoolExecutor源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

jdk1.8 线程池ThreadPoolExecutor源码解析

目录

前言

一、ThreadPoolExecutor基本使用

1.1 参数介绍

1.2 创建线程

二、线程池工作原理

2.1 前提条件

2.2 文字描述

2.3 流程图展示

三、源码解析

3.1 execute源码中的位运算

3.2 线程池状态

3.3 execute源码解析

总结


前言

        ThreadPoolExecutor是java的JUC提供的线程池化技术,是一种享元模式(主要用于减少创建对象的数量,以减少内存占用和提高性能),它的作用是管理线程。
        优点:帮助我们去创建、管理和销毁线程,减少内存占用和提高性能。
        缺点:好像没有啥大的缺点,使用不好是我们自己的问题,不是线程池的。

        学习线程池可以解答我们的很多疑惑,如:我们传入的任务,线程池如何调用、线程池的线程如何复用、线程池与队列的联系、线程池的核心线程是什么和线程池的非核心线程是什么。


一、ThreadPoolExecutor基本使用

1.1 参数介绍
public ThreadPoolExecutor(int corePoolSize,
						  int maximumPoolSize,
						  long keepAliveTime,
						  TimeUnit unit,
						  BlockingQueue workQueue,
						  ThreadFactory threadFactory,
						  RejectedExecutionHandler handler) {
	if (corePoolSize < 0 ||
		maximumPoolSize <= 0 ||
		maximumPoolSize < corePoolSize ||
		keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.acc = System.getSecurityManager() == null ?
			null :
			AccessController.getContext();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.tonanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}

        上面的是线程池参数最全的一个构造方法,接下来讲解每个参数的作用

        int corePoolSize:核心线程数

        int maximumPoolSize:最大的线程数=核心+临时

        long keepAliveTime:线程超时等待时间

        TimeUnit unit:超时等待单位

        BlockingQueue workQueue:阻塞队列,常用的有三个:

        ArrayBlockingQueue:数组队列,必须指定初始容量

        linkedBlockingDeque:链表队列,不用指定

        SynchronousQueue:没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者

        ThreadFactory threadFactory:线程工厂,使用默认就行,不想用,可以实现ThreadFactory接口自定义

        RejectedExecutionHandler handler:决绝策略,也可以自定义,实现RejectedExecutionHandler接口,JUC提供的有四个:

        AbortPolicy:执行不了抛RejectedExecutionException异常

        CallerRunsPolicy:交给main线程执行,就是直接调用任务的run方法

        DiscardOldestPolicy:把队列最先进来的任务扔掉

        DiscardPolicy:啥也不干,就是把最后进来的任务扔掉

1.2 创建线程

        创建线程的方式很多,JUC提供了工具类可以创建不同类型的线程池,但是一般公司不建议使用,还有就是调用线程池构造方法创建。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExcutorTest {

    public static void main(String[] args) {
        // 最大创建int.MaxValue这么多个线程,来一个任务new一个线程,使用SynchronousQueue队列
        ExecutorService es1 = Executors.newCachedThreadPool();
        // 创建5个线程的线程池,可以指定线程池线程的数量
        ExecutorService es2 = Executors.newFixedThreadPool(5);
        // 创建1个线程的线程池,其他任务都放队列linkedBlockingQueue
        ExecutorService es3 = Executors.newSingleThreadExecutor();
        // 有定时功能的线程池
        ExecutorService ess1 = Executors.newScheduledThreadPool(100);
    }

}
import java.util.concurrent.*;

public class ThreadPoolExcutorTest{

    public static void main(String[] args) {
                
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 1, TimeUnit.SEConDS
                , new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        // allowCoreThreadTimeOut设置为true,线程池执行完会销毁所有线程
        // threadPoolExecutor.allowCoreThreadTimeOut(true);
        for (int i = 0; i < 5; i++) {
            threadPoolExecutor.execute(()->{
                System.out.println(Thread.currentThread().getName());
            });
        }

    }

}

二、线程池工作原理

2.1 前提条件

        学习线程池,必须了解阻塞队列的几个方法。下发图片来自个人学习笔记。。。 

2.2 文字描述

        下面的场景,以所有的任务都是死循环为前提,就是一直执行,不会结束,线程池参数如下: 

       


        1.每次调用threadPoolExecutor.execute()方法,传入一个任务Runnable,这个Runnable是一个函数式接口,我们可以给execute方法传入一个匿名内部类、实现了Runnable接口的实现类或者lambda表达式。

        2.当第1个任务进来的时候,判断工作线程数是否小于核心线程数,小于的话就创建一个核心线程,执行任务。

        3.每一个核心线程或者临时线程都会被包装成为一个Worker对象,传入的任务也会被包装在Worker对象里面,最后调用线程的start方法执行任务,具体的源码解释。

        4.当第2个任务进来的时候,判断工作线程数是否小于核心线程数,小于的话就创建一个核心线程,执行任务。

        5.当第3个任务进来的时候,判断工作线程数是否小于核心线程数,发现不小于了,就加入阻塞队列了。

        6.当第4个任务进来的时候,判断工作线程数是否小于核心线程数,发现不小于了,就加入阻塞队列了。

        7.当第5个任务进来的时候,判断工作线程数是否小于核心线程数,发现不小于了,就加入阻塞队列,但是发现阻塞队列满了,就创建一个临时线程,执行任务。

        8.当第6个任务进来的时候,判断工作线程数是否小于核心线程数,发现不小于了,就加入阻塞队列,但是发现阻塞队列满了,就创建一个临时线程,但是发现工作线程数大于等于最大线程数了,所以按照构造方法传入的拒绝策略,拒绝了,抛出RejectedExecutionException异常。

2.3 流程图展示

 

         当其他线程死循环的时候,第六个进来的就抛异常了。

        Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task ConcurrentHashMapTest$$Lambda$1/1096979270@214c265e rejected from java.util.concurrent.ThreadPoolExecutor@448139f0[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]


三、源码解析

3.1 execute源码中的位运算

        线程池常用的位运算常量介绍,32位int类型,二进制的高3位表示线程状态,低29位表示线程数。后续的线程池状态和线程数量的统计就是对这些值进行位运算,这个就不讲解。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
@Native public static final int SIZE = 32;
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

3.2 线程池状态

        这个是从其他优秀文章中复制的,因为这个基本知识,也不是讲解的内容。

状态
解释
RUNNING能接受新提交的任务,并且也能处理阻塞队列中的任务
SHUTDOW关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务,在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态
STOP不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程,在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态
TIDYING如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态
TERMINATED在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。

3.3 execute源码解析

        execute方法解析

public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	// 得到一个int值,具体的看3.1标题的解释
	int c = ctl.get();
    // 判断工作线程是否小于核心线程
	if (workerCountOf(c) < corePoolSize) {
        // 小于就创建核心线程处理任务,这个方法后面解释,true代表创建核心线程
		if (addWorker(command, true))
			return;
		c = ctl.get();
	}
    // 如果工作线程数大于等于核心线程数
    // 判断是不是运行状态,运行状态就加入阻塞队列
	if (isRunning(c) && workQueue.offer(command)) {
		int recheck = ctl.get();
        // 加入队列成功,再次校验状态,如果不是运行时,把刚刚加入阻塞队列的任务移除
		if (! isRunning(recheck) && remove(command))
            // 按照不同拒绝策略处理
			reject(command);
        // 如果线程工作线程为0了,就生成一个临时线程处理队列中的任务
		else if (workerCountOf(recheck) == 0)
            // 生成一个临时线程,false代表创建临时线程
			addWorker(null, false);
	}
    // 不是运行状态或者加入阻塞队列失败,就会创建临时线程处理了,失败就拒绝
    // 为什么不是运行状态也会调用addWorker方法哪,因为addWorker方法里面是会校验状态的
	else if (!addWorker(command, false))
        // 按照不同拒绝策略处理
		reject(command);
}

         addWorker方法解析

// 核心或者临时线程被包装为Worker对象,线程和任务是Worker对象的一个属性
// 不重要的代码没有赋值出来
private final class Worker
	extends AbstractQueuedSynchronizer
	implements Runnable
{
	
	private static final long serialVersionUID = 6138294804551838833L;

	
	final Thread thread;
	
	Runnable firstTask;
	
	volatile long completedTasks;
	
	Worker(Runnable firstTask) {
		setState(-1); // inhibit interrupts until runWorker
        // 把传入的任务赋值给类变量
		this.firstTask = firstTask;
        //###这里很重要,new的线程传入的this对象,因为Worker也实现了Runnable
		this.thread = getThreadFactory().newThread(this);
	}

	
	public void run() {
		runWorker(this);
	}

	public void lock()        { acquire(1); }
	public boolean tryLock()  { return tryAcquire(1); }
	public void unlock()      { release(1); }
	public boolean isLocked() { return isHeldExclusively(); }

}
private boolean addWorker(Runnable firstTask, boolean core) {
	retry:
	for (;;) {
		int c = ctl.get();
		int rs = runStateOf(c);

		// Check if queue empty only if necessary.
        // 检查状态,返回false,调用者就是return或者调用拒绝处理方法
        // 状态大于等于SHUTDOWN ,而且不等于SHUTDOWN,传入的任务不为空,队列为空,就返回false
		if (rs >= SHUTDOWN &&
			! (rs == SHUTDOWN &&
			   firstTask == null &&
			   ! workQueue.isEmpty()))
			return false;
        // 自旋校验
		for (;;) {
            // 工作线程数量
			int wc = workerCountOf(c);
            // 工作线程大于2^29-1,大于核心线程或者临时线程,返回flase,
            //调用者就是return或者调用拒绝处理方法
			if (wc >= CAPACITY ||
				wc >= (core ? corePoolSize : maximumPoolSize))
				return false;
            // 这个就是对线程数量+1的,是原子操作,失败就跳到上面的retry
			if (compareAndIncrementWorkerCount(c))
				break retry;
            // 如果此时和上面int rs = runStateOf(c);处,线程池状态不一样,也会跳到上面的retry
			c = ctl.get();  // Re-read ctl
			if (runStateOf(c) != rs)
				continue retry;
			// else CAS failed due to workerCount change; retry inner loop
		}
	}

	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
        // new一个线程,其实就是new一个Worker对象,详细看上面Worker对象解析
		w = new Worker(firstTask);
        // 拿到Worker对象的线程属性
		final Thread t = w.thread;
		if (t != null) {
            // 排它锁
			final ReentrantLock mainLock = this.mainLock;
            // 抢锁,高并发要保证线程安全
			mainLock.lock();
			try {
				// 线程状态
				int rs = runStateOf(ctl.get());
                // 运行状态或者
                // 是SHUTDOWN但是任务为空(因为这种情况创建线程去处理队列中的任务)
                // 这个就是execute方法,else if (workerCountOf(recheck) == 0)
                // addWorker(null, false);这种情况
				if (rs < SHUTDOWN ||
					(rs == SHUTDOWN && firstTask == null)) {
					if (t.isAlive()) // precheck that t is startable
						throw new IllegalThreadStateException();
                    // 把Worker对象放入workers集合
					workers.add(w);
					int s = workers.size();
                    // 记录workers大小
					if (s > largestPoolSize)
						largestPoolSize = s;
					workerAdded = true;
				}
			} finally {
                // 释放锁
				mainLock.unlock();
			}
			if (workerAdded) { 
				t.start(); // 启动线程,接下来在介绍
				workerStarted = true;
			}
		}
	} finally {
        // 没有放入成功,或者放入成功了,发生其他问题,就要减去增加的线程数量
		if (! workerStarted) 
			addWorkerFailed(w); // 接下来在介绍
	}
	return workerStarted;
}

         addWorkerFailed解析

private void addWorkerFailed(Worker w) {
	final ReentrantLock mainLock = this.mainLock;
    // 加锁
	mainLock.lock();
	try {
        // 如果放入成功就取出
		if (w != null)
			workers.remove(w);
        // 原子操作,ctl低29位-1
		decrementWorkerCount();
        // 尝试终止线程,这个不讲解
		tryTerminate();
	} finally {
        // 释放锁
		mainLock.unlock();
	}
}

        如果放入成功了,而且线程启动了,线程启动时会调用传入Runnable的run方法,此时传入的Runnable是Worker对象。

        那么线程启动后就会调用Worker对象的run方法。

private final class Worker
	extends AbstractQueuedSynchronizer
	implements Runnable
{
	
	private static final long serialVersionUID = 6138294804551838833L;

	
	final Thread thread;
	
	Runnable firstTask;
	
	volatile long completedTasks;
	
	Worker(Runnable firstTask) {
		setState(-1); // inhibit interrupts until runWorker
        // 把传入的任务赋值给类变量
		this.firstTask = firstTask;
        //###这里很重要,new的线程传入的this对象,因为Worker也实现了Runnable
		this.thread = getThreadFactory().newThread(this);
	}

	
	public void run() {
		runWorker(this);
	}

	public void lock()        { acquire(1); }
	public boolean tryLock()  { return tryAcquire(1); }
	public void unlock()      { release(1); }
	public boolean isLocked() { return isHeldExclusively(); }

}

final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
    // 得到任务
	Runnable task = w.firstTask;
    // 帮助GC
	w.firstTask = null;
	w.unlock(); // allow interrupts
	boolean completedAbruptly = true;
	try {
        // 传入的任务不为空就执行任务,为空就去队列中取任务
		while (task != null || (task = getTask()) != null) {
			w.lock();
		
			if ((runStateAtLeast(ctl.get(), STOP) ||
				 (Thread.interrupted() &&
				  runStateAtLeast(ctl.get(), STOP))) &&
				!wt.isInterrupted())
				wt.interrupt();
			try {
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
                    // 执行任务
					task.run();
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
					afterExecute(task, thrown);
				}
			} finally {
				task = null;
				w.completedTasks++;
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {
		processWorkerExit(w, completedAbruptly);
	}
}

        进入getTask方法查看如何取任务 

// 当前类:ThreadPoolExecutor
private Runnable getTask() {
	boolean timedOut = false; // Did the last poll() time out?

	for (;;) {
		int c = ctl.get();
        // 线程状态
		int rs = runStateOf(c);

		// 状态大于等于SHUTDOWN,而且队列状态大于等于STOP或者阻塞队列为空,就返回
		if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 线程数-1
			decrementWorkerCount();
			return null;
		}

        // 工作线程的数量
		int wc = workerCountOf(c);

		// allowCoreThreadTimeOut为是否允许核心线程过期,
        // 如果为true,所有线程等待设置的超时时间,队列没有任务,就销毁
        // 默认是false,当工作线程大于核心线程时,大于的线程会被销毁,
        // 销毁时不会判断它是不是核心线程,详细看下面解析
		boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 一些异常处理,可以不用关注
		if ((wc > maximumPoolSize || (timed && timedOut))
			&& (wc > 1 || workQueue.isEmpty())) {
			if (compareAndDecrementWorkerCount(c))
				return null;
			continue;
		}

		try {
            // timed为true按照超时时间等待取任务,取不到就线程结束了,相当于销毁了
            // 为false阻塞取任务,队列具体的方法看2.1标题,取到就继续执行runWorker
			Runnable r = timed ?
				workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
				workQueue.take();
			if (r != null)
				return r;
			timedOut = true;
		} catch (InterruptedException retry) {
			timedOut = false;
		}
	}
}

         看完上面的解析,就会知道为什么线程池的线程会一直存活,不断的消费任务了。


总结

        线程的基本流程就讲完了,其中的一些细节,加锁、释放锁、停止线程和中断啊,这些都没有讲,这篇文章只讲线程池的核心原理。核心原理是比较简单的,复杂的是线程状态的的改变对于队列中的任务进行如何的处理,Worker对于AbstractQueuedSynchronizer的使用和它对线程中断进行的一系列处理等等。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753748.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号