- 降低资源消耗:通过重复使用已经创建的线程,降低线程创建和销毁的系统开销。提高执行效率:当任务到达时,任务不需要等待线程创建就可以执行。提高线程的管理性:线程是稀缺资源,如果不加限制的创建,将会极大的消耗系统资源,严重时还会导致系统宕机。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
// 核心线程数不能小于0,最大线程数不能小于等于0,最小线程数不能小于核心线程数,空闲线程存活时间不能小于0,否则抛出参数异常。
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
// 工作队列或线程工厂或拒绝策略不能为空,否则抛出空指针异常
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 参数赋值
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.tonanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池的五种状态描述
线程池中有五种状态
五种状态分别是RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
RUNNING: 在RUNNING状态下,线程池可以接收新的任务和执行已添加的任务。
SHUTDOWN: 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
STOP: 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在执行的任务。
TIDYING: 当所有线程已终止,记录的线程数为0时,线程池会变成TIDYING状态。当线程池变成TIDYING状态时,会执行钩子函数terminated()。
TERMINATED: 当钩子函数terminated()被执行完成之后,线程池彻底终止,就会变成TERMINATED。
五种状态之间的相互转换// 位数,32 - 3 = 29 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程池容量大小 capacity = 1 * 2^29 - 1,转成二进制为 00011111 11111111 11111111 11111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 线程的运行状态 RUNNING = -1 * 2^29 转成二进制为 11100000 00000000 00000000 00000000 private static final int RUNNING = -1 << COUNT_BITS; // 线程的关闭状态 SHUTDOWN = 0 * 2^29 转成二进制为 00000000 00000000 00000000 00000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 线程的停止状态 STOP = 1 * 2^29 转成二进制为 00100000 00000000 00000000 00000000 private static final int STOP = 1 << COUNT_BITS; // 线程的运行状态 TIDYING = 2 * 2^29 转成二进制为 01000000 00000000 00000000 00000000 private static final int TIDYING = 2 << COUNT_BITS; // 线程的结束状态 TERMINATED = 3 * 2^29 转成二进制为 01100000 00000000 00000000 00000000 private static final int TERMINATED = 3 << COUNT_BITS;
说明:线程池中的线程状态由32位中高3位表示。
ThreadPoolExecutor源码分析 executor()方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个
// 新的线程,如果能完成新线程创建exexute方法结束,成功提交任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,
// 再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要reject;
// 否则判断当前线程数是否为0(有可能这个时候线程数变为了0),如果是则新增一个null线程,始终保证线程池中有一个线程处于运行状态;
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经
// shutdown或者线程池已经达到饱和状态,所以reject;
else if (!addWorker(command, false))
reject(command);
}
说明:拒绝策略不仅仅是在饱和状态下使用,在线程池进入到关闭阶段同样需要使用到;
addWorker()方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 重试机制,类似于goto
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 线程池状态为非运行时状态,并且是非SHUTDOWN状态且任务为空,且阻塞队列不为空时,则不再新增线程。
return false;
for (;;) {
// 获取工作线程数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 当前工作线程数大于最大值
// 或者 判断当前线程是否是核心线程,若是核心线程则判断当前线程数是否大于等于corePoolSize;若非核心线程则判断当前线程数是否大于等于maximumPoolSize
// 若是则不再新增线程
return false;
if (compareAndIncrementWorkerCount(c))
// 工作线程数量 ctl+1,如果成功,则跳出循环。compareAndIncrementWorkerCount()方法中使用了CAS原子操作。
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// 刚进来的状态和此时的状态发生改变 重头开始 再次重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 内部类Worker,继承了AQS,封装了线程和任务的管理方法,通过threadfactory创建线程
w = new Worker(firstTask);
final Thread t = w.thread;
// 线程创建成功
if (t != null) {
// 获取独占锁ReentrantLock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获得独占锁之后,再次获取当前线程的状态
int rs = runStateOf(ctl.get());
// 若是当前线程正在运行中,或者当前线程处于SHUTDOWN状态,且当前任务为空时
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判断但前线程是否存活,若是存活则直接抛出异常,因为此时线程尚未start
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 否则将线程添加到线程集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 修改状态,表示线程添加成功
workerAdded = true;
}
} finally {
// 释放mainLock独占锁
mainLock.unlock();
}
if (workerAdded) {
// 线程添加成功之后,启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 失败回退 从wokers移除w 线程数减1 尝试结束线程池
addWorkerFailed(w);
}
return workerStarted;
}
线程池中的拒绝策略
线程池中有4种拒绝策略,分别是AbortPolicy,DiscardPolicy,DiscardOldestPolicy,CallerRunsPolicy
AbortPolicy策略public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy策略
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy策略
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
CallerRunsPolicy策略
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}



