调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模 型,Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有 一个对应的线程。详见前面的文章。
Java线程的生命状态 :
NEW,新建
RUNNABLE,运行
BLOCKED,阻塞
WAITING,等待
TIMED_WAITING,超时等待
TERMINATED,终结
协程 (纤程,用户级线程),目的是为了追求最大力度的发挥硬件性能和提升软件的速度,协程基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任务;等完成或达到某个条件时,再还原原来的栈信息并继续执行(整个过程线程不需要上下文切换)。
三、线程池Java原生不支持协程,在纯java代码里需要使用协程的话需要引入第三方包,如:quasar
线程是珍稀资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性, Java 中提供了线程池对线程进行统一分配、调优和监控。
如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。(例如一个任务只需要执行10ms,可能创建销毁线程的时间比这更长)
项目中可能常见的 Executors.newCachedThreadPool() 等在阿里规范中是不被允许的(线程资源相当宝贵,违背了初衷),建议在使用时自己 new 一个线程池。且一个线程池只执行一种职责,多个需求创建多个线程池。
线程池使用场景
1.单个任务处理时间比较短
2.需要处理的任务数量很大
为什么要用线程池
1.重用存在的线程,减少线程创建销毁的开销,提高性能
2.提高响应速度。当任务到达时,不需要等待线程创建就能就能立即执行。
3.提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池的使用
// 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5000, TimeUnit.MILLISECONDS, new ArrayBlockingQueueExecutor框架(5)); // 模拟提交任务 for (int i=0;i<6;i++){ System.out.println(i); threadPoolExecutor.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"执行了任务"+); } },i); } // 结束线程池任务 threadPoolExecutor.shutdown(); // running => shutdown threadPoolExecutor.shutdownNow(); // running=> stop
Executor接口是线程池框架中最基础的部分,里面仅定义了一个用于执 Runnable 的 execute() 方法提供给子类重写。
ExecutorService 是 Excutor 的一个重要的子接口。里面定义了线程的具体行为:
| 方法 | 说明 |
|---|---|
| execute(Runnable command) | 执行Ruannable类型的任务 |
| submit(Runnable task) | 用来提交Callable或Runnable任务,并返回代表此任务的Future 对象 |
| shutdown() | 完成已提交的任务后封闭线程池,不再接受新任务 |
| shutdownNow() | 停止所有正在履行的任务并封闭线程池 |
| isTerminated() | 测试是否所有任务都执行完毕了 |
| isShutdown() | 测试是否该ExecutorService已被关闭 |
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 使用高三位
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 使用低29位
// 线程池存在的五种状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 高三位111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位000
private static final int STOP = 1 << COUNT_BITS; // 高三位001
private static final int TIDYING = 2 << COUNT_BITS; // 高三位010
private static final int TERMINATED = 3 << COUNT_BITS; // 高三位011
// Packing and unpacking ctl ctl相关方法
// 获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取活动线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池的五种状态说明
- RUNNING
- 状态说明: 能够接收新任务,以及对已添加的任务进行处理
- 状态切换: 线程池的初始化状态,一旦被创建就处于该状态,且线程池中任务数量为0
- SHUTDOWN
- 状态说明: 不接收新任务,但能处理已添加的任务
- 状态切换: 调用线程池的 shutdown() 接口时,线程池由RUNNING => SHUTDOWN
- STOP
- 状态说明: 不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
- 状态切换: 调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) => STOP
- TIDYING
- 状态说明: 当所有的任务已终止,ctl 记录的”任务数量“为0,线程池会变为 TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated() 在ThreadPoolExecutor类中是空的,若用户想在线程池变为 TIDYING 时,进行相应的处理,可以通过重载 terminated() 函数来实现
- 状态切换: 当线程池在 SHUTDOWN 状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN => TIDYING。 当线程池在 STOP 状态下,线程池中执行的任务为空时,就会由STOP => TIDYING
- TERMINATED
- 状态说明: 线程池彻底已终止
- 状态切换: 线程池处在 TIDYING 状态时,执行完 terminated() 之后,就会由 TIDYING => TERMINAT
线程池的创建及任务提交
// 创建 new 构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 任务提交
// 1.提交任务无返回值
public void execute(){ ... }
// 2. 任务执行完成后有返回值
public Future> submit(){ ... }
核心参数说明
int corePoolSize 核心线程数
当提交一个任务时,线程池创建一个新线程执行任务,直到当
前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到
阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会
提前创建并启动所有核心线程
int maximumPoolSize 最大线程数
- 如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
long keepAliveTime 线程最大空闲时间
- 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
Timeunit unit 最大空闲等待的时间单位
- keepAliveTime 的时间单位
BlockingQueue
workQueue 等待(阻塞)队列(存放未来得及执行的任务)
- 用来保存等待被执行的任务的阻塞队列,且任务必须实现 Runable 接口。
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务
- linkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene
- SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 linkedBlockingQuene
- PriorityBlockingQuene:具有优先级的无界阻塞队列
ThreadFactory threadFactory 创建线程的工厂
- 用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程,同时也设置了线程的名称
RejectedExecutionHandler handler 拒绝策略
- 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务
拒绝策略
4 种拒绝默认的策略:
- AbortPolicy 默认策略,直接抛异常
- CallerRunsPolicy 由当前提交任务的线程执行任务(调用者线程),而不给线程池中线程执行
- DiscardOldestPolicy 丢弃掉旧的任务(队头任务),并执行当前任务
- DiscardPolicy 直接丢弃任务
自定义拒绝策略:
不希望任务被丢弃无界队列,如果还要考虑JVM容量,可以重写拒绝策略。比如将丢到中间件中(Redis等),比如监控线程池中队列少于50%时,从Redis中读之前容纳不下的任务。重写拒绝策略只需要实现 RejectedExecutionHandler 接口并重写 rejectedExecution() 方法即可。
线程数量设置(仅做参考)
CPU 密集型: CPU核数 + 1 不会怎么停顿
I/O密集型: 两倍 CPU核数 + 1 会停顿,读文件等
最佳线程数: CPU 核数 * [1 + (I/O耗时 / CPU耗时)]
线程池监控方法public long getTaskCount() // 获取线程池已执行与未执行的任务总数 public long getCompletedTaskCount()// 获取已完成的任务数 public int getPoolSize() // 线程池当前的线程数 public int getActiveCount() // 线程池中正在执行任务的线程数量线程池原理
思考:线程池中阻塞队列的作用?
- 一般队列只能保证一个有限长度的缓冲区,如果超过缓冲长度,就无法保留当前任务,通过阻塞队列可以通过阻塞保留主当前想要继续入队的任务
- 阻塞队列自带阻塞和唤醒功能,不需要额外处理,无任务执行时线程池利用阻塞队列 take 方法挂起,从而维持核心线程存活,不至于一直占用 CPU 资源
思考:为什么先添加阻塞队列而不是先创建最大线程?
在创建新线程的时候,需要获取全局锁,这时就会阻塞其他线程,影响整体效率。最大线程如果后续销毁也会影响效率。
源码分析public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// clt 记录着runState 和 workerCount
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果添加失败,则重新获取ctl值
c = ctl.get();
}
// 如果当前线程池是运行状态 并且 任务添加到队列成功
if (isRunning(c) && workQueue.offer(command)) {
// 重新获取ctl值
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
用一张图来表示就是如下流程:
源码解读: 首先当一个任务来后,先检查核心线程数是否满,未满则创建核心线程,此任务完成后,即使再来新的任务也不会再使用刚刚创建的核心线程,而是继续创建核心线程调用后续任务。然后当核心线程数满后,不管核心线程是否都被占用都会将任务放置到等待队列,以后线程池执行任务都从等待队列中去取,核心线程都被占用后会检查是否达到最大线程数,未达到创建最大线程,最大线程数满后会执行拒绝策略。值得注意的是线程池并未标识核心非核心线程,仅依靠线程的数量控制。
addWorker方法用于在线程池中创建一个新的线程并执行,它有两个参数,firstTask 参数用于指定新增的线程执行的第一个任务,core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize.
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker类
ThreadPoolExecutor 中定义了一个内部类 Worker,线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组 Worker 对象。
Worker源码,部分逻辑已省略:
// Worker类继承了AQS,并实现了Runnable接口
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程
final Thread thread;
// 保存传入的任务
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean tryAcquire(int unused) {
//cas修改state,不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
RunWorker方法
使用 循环 + 阻塞队列 实现线程的可重用
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环 + 条件 不断从队列拿任务,任务执行完毕task会被置空,跳出循环
// 如果task为空,则通过getTask来获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask方法
作用:从阻塞队列中取任务
private Runnable getTask() {
// 上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
// 阻塞队列,当队列为空,条件不满足,阻塞,等待多长时间还为空,返回null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,说明已经超时,timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
timedOut = false;
}
}
}
processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
processWorkerExit 执行完之后,工作线程被销毁,工作线程生命周期结束,从 execute 方法开始,Worker 使用 ThreadFactory 创建新的工作线程,runWorker 通过 getTask 获取任务,然后执行任务,如果 getTask 返回 null,进入 processWorkerExit 方法,整个线程结束。



