降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
程序的运行,本质:占用系统的资源! (优化资源的使用 => 池化技术)
线程池、连接池、内存池、对象池///… 创建、销毁。十分浪费资源
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我。
什么时候使用线程池?- 单个任务处理时间比较短
- 需要处理的任务数量很多
阻塞队列:平衡消费者线程和生产者线程的桥梁
public class CustomThreadPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{
//queue.put(task); //死等
//queue.offer(task,2500,TimeUnit.MILLISECONDS);//超时等待
//放弃任务
//System.out.println("放弃任务");
//throw new RuntimeException("任务执行失败"+task);//抛出异常
//自己执行任务
task.run();
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(()->{
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
class ThreadPool{
private final ThreadBlockingQueue tasksQueue;
private final HashSet workers = new HashSet<>();
private final ReentrantLock lock = new ReentrantLock();
private int coreSize;
private long timeout;
private TimeUnit timeUnit;
private final RejectPolicy rejectPolicy;
public ThreadPool( int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectPolicy = rejectPolicy;
this.tasksQueue = new ThreadBlockingQueue<>(queueCapacity);
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// while (task != null ||(task = tasksQueue.take()) != null)
// 任务不为空执行,否则从任务队列获取任务执行
while (task != null ||(task = tasksQueue.poll(timeout,timeUnit)) != null){
try {
System.out.println("正在执行"+task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
lock.lock();
try {
System.out.println("移除"+this);
workers.remove(this);
} finally {
lock.unlock();
}
}
}
public void execute(Runnable task){
lock.lock();
//当任务数没有超过 coreSize 时,直接交给 worker 对象执行
//如果任务数超过 coreSize 时,加入任务队列暂存
try {
if(workers.size() < coreSize){
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增"+workers);
worker.start();
}
else{
//1 死等 2 超时等待,3 放弃执行 4 抛出异常 5 调用者自己执行任务
//tasksQueue.put(task);
tasksQueue.tryPut(rejectPolicy,task);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
interface RejectPolicy{
void reject(ThreadBlockingQueue queue,T task);
}
class ThreadBlockingQueue{
private final Deque queue = new ArrayDeque<>();
// 锁保护队列头和尾
private final ReentrantLock lock = new ReentrantLock();
// 消费者等待
private final Condition fullWait = lock.newCondition();
// 消费者等待
private final Condition emptyWait = lock.newCondition();
private final int capacity;
public ThreadBlockingQueue(int capacity) {
this.capacity = capacity;
}
public T take(){
lock.lock();
try {
while (queue.isEmpty()){
emptyWait.await();
}
T t = queue.removeFirst();
fullWait.signalAll();
return t;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
long nano = unit.toNanos(timeout);
while (queue.isEmpty()){
if (nano <= 0) {
return null;
}
// awaitNanos 解决了虚假唤醒,返回的是剩余等待时间
nano = emptyWait.awaitNanos(nano);
}
T t = queue.removeFirst();
fullWait.signalAll();
return t;
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if(nanos <= 0) {
return false;
}
nanos = fullWait.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWait.signal();
return true;
} finally {
lock.unlock();
}
}
public void put(T element){
lock.lock();
try {
while (queue.size() == capacity){
System.out.println("等待加入任务队列"+element);
fullWait.await();
}
System.out.println("加入队列"+element);
queue.addLast(element);
emptyWait.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity){
System.out.println("等待加入队列"+task);
rejectPolicy.reject(this,task);
}else {
System.out.println("加入队列"+task);
queue.addLast(task);
emptyWait.signal();
}
} finally {
lock.unlock();
}
}
}
Executors工具类
-
ExecutorService executor=Executors.newCachedThreadPool();创建一个可缓存线程池,随着任务如果需要执行时间长的则根据任务的数量创建线程,如果任务执行时间短则会有线程的复用。如果线程数量超过处理需要,可灵活回收空闲线程(默认1分钟),若无可回收,则新建线程,高并发下可能出现cpu负载过高的情况
适用于服务器负载较轻,任务比较密集,执行很多短期异步任务
-
ExecutorService executor=Executors.newFixedThreadPool(n);创建一个可重用的固定长度线程池,cpu直接分配n个核心处理,后面不会在分配其他的核心来处理任务,以无界队列的方式运行这些线程,可控制线程最大并发数,超出的线程会在队列中等待。
因为采用无界的阻塞队列,所以实际线程数量永远不会变化适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。高并发可能会出现内存溢出
-
ExecutorService executor=Executors.newScheduledThreadPool();创建一个定长线程池,定时及周期性任务执行。空线程也会保留
适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。 -
ExecutorService executor=Executors.newSingleThreadPool();创建一个单线程的线程池。它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。可以延时启动,定时启动的线程池,
适用于需要多个后台线程执行周期任务的场景
-
ExecutorService executor=Executors.newWorkStealingPool():创建一个拥有多个任务队列的并行级别的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行。
此线程是ForkJoinPool的扩展,是守护线程。默认根据当前CPU的核数创建几核线程;创建的每一个线程都有一个双端任务队列,当一个线程的任务完成时,可以从其他线程的任务队列获得任务继续执行。提高了执行效率 -
ExecutorService executor=Executors.newSingleThreadScheduleExecutor()创建一个单线程执行任务,可以安排在给定的延迟后执行或者定期执行
Callable 和 Runable 对比:
Callable 是 java.util 包下 concurrent 下的接口,有返回值,可以抛出被检查的异常
Runable 是 java.lang 包下的接口,没有返回值,不可以抛出被检查的异常
二者调用的方法不同,run()/ call()
public class CallableTest {
//使用thread线程启用callable
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();// 启动Runnable
// new Thread(new FutureTask()).start();
// new Thread(new FutureTask(Callable )).start();
new Thread().start(); //怎么启动Callable?
// new 一个MyThread实例
MyThread thread = new MyThread();
// MyThread实例放入FutureTask
FutureTask futureTask = new FutureTask(thread); // 适配类
//futureTask.run();//也可以因为FutureTask 实现了runnable接口,thread也是调用futureTask.run()执行任务
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start(); // call()方法结果会被缓存,提高效率,因此只打印1个call
// 这个get 方法可能会产生阻塞!
Integer o = (Integer) futureTask.get();
// 或者使用异步通信来处理!
System.out.println(o);// 1024
}
}
细节:
1、有缓存
2、结果可能需要等待,会阻塞!
ThreadPoolExecutor 操作线程池状态的方法// 操作ctl变量,主要是进行分解或组合线程数量和线程池状态。
// 获取高3位,获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取低29位,获取线程池中线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 组合ctl变量,rs=runStatue代表的是线程池的状态,wc=workCount代表的是线程池线程的数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
//指定的线程池状态c小于状态s
private static boolean runStateLessThan(int c,int s){
return c=s;
}
// 判断线程池是否运行状态
private static boolean isRunning(int c){
return c
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize:核心线程数
-
maximumPoolSize:最大线程数–当workQueue使用的是无界限队列时,maximumPoolSize参数就变的无意义了,比如new linkedBlockingQueue(),或者new ArrayBlockingQueue(Integer.MAX_VALUE);
maximumPoolSize - corePoolSize = 救急线程数(非核心线程数)
-
keepAliveTime:救急线程空闲时的最大生存时间–有多出核心线程数的线程存在时,keepAliveTime开始发挥作用但核心线程和最大线程数量相等时keepAliveTime无作用.
-
unit:时间单位
-
workQueue:阻塞队列(存放任务)
有界阻塞队列 ArrayBlockingQueue
无界阻塞队列 linkedBlockingQueue, linkedTransferQueue
最多只有一个同步元素的队列 SynchronousQueue
优先队列 PriorityBlockingQueue
-
threadFactory:线程工厂(给线程取名字)
-
handler:拒绝策略
workQueue任务队列
- 直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
- 有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现
- 无界任务队列可以使用linkedBlockingQueue实现
- 优先任务队列:优先任务队列通过PriorityBlockingQueue实现
核心线程预启动
//在默认情况下,只有当新任务到达时,才开始创建和启动核心线程,但是我们可以使用 preStartCoreThread() 和 preStartAllCoreThreads() 方法动态调整。
//如果使用非空队列构建池,则可能需要预先启动线程。
preStartCoreThread() //创一个空闲任务线程等待任务的到达
preStartAllCoreThreads() // 创建核心线程池数量的空闲任务线程等待任务的到达
拒绝任务策略
线程池自带:
-
AbortPolicy(系统默认)策略:该策略会丢弃任务并抛出异常,阻止系统正常工作;可以根据业务需要重试或者放弃提交策略等
-
CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会调用当前线程处理任务队列中的任务;
-
DiscardOldestPolicy策略:该策略会丢弃任务队列中最老即最前面的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交任务,存在数据丢失的风险;
-
DiscardPolicy策略:不予任何处理。当然使用此策略,业务场景中需允许任务的丢失;
其他框架:
-
Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
-
Netty 的实现,是创建一个新线程来执行任务
-
ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
-
PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
线程池执行的流程
有界队列时
- 当线程数(poolSize)小于核心线程数(corePoolSize)时,创建线程。
2. 当线程数(poolSize)大于等于核心线程数(corePoolSize),且任务队列未满时,将任务放入任务队列。
3. 当线程数(poolSize)大于等于核心线程数(corePoolSize),且任务队列已满
- 若线程数(poolSize)小于最大线程数(MaximumPoolSize),启用非核心线程创建线程
- 若线程数(poolSize)等于最大线程数(MaximumPoolSize),抛出异常,会根据饱和策略RejectedExecutionHandler拒绝新的任务
概括:提交任务,线程池中的线程数可以增长至corePoolSize,之后继续提交任务将暂存至队列中,如果队列满,则看是否能继续增长线程数至maximumPoolSize,超出后将进行拒绝策略处理
总任务数:maximumPoolSize+队列的大小
无界队列时
线程池中运行的线程数 当线程池的线程数量>corePoolSize时,则将提交的任务放入到队列中;,若后续仍有新的任务提交,而没有空闲的线程时,它会不断往队列中入队提交的任务,直到资源耗尽
如果一个任务执行完成了此线程池有keepAliveTime且超过其设置的时间则空余线程会被销毁,没有则从队列中获取任务继续执行
同步队列时
SynchronousQueue没有容量,corePoolSize为0 队列永远不满,对应的插入需要另一个线程的删除;只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程来执行任务。
如果一个任务执行完成了此线程池有keepAliveTime且超过其设置的时间则空余线程会被销毁,没有则从队列中获取任务继续执行
线程池的提交优先级和执行优先级
提交优先级:任务首先提交给核心线程处理,当超过核心线程数的提交到队列,超过队列的则提交到非核心线程(maximumPoolSize-corePoolSize)处理
执行优先级:首先执行提交到核心线程的任务—>处理提交到非核心线程的任务---->处理队列中任务
提交任务
void execute(适用于Runnable runnable)执行一个不需要返回值得线程。
Future submit(适用于Callable ,也可以用于Runnable) 可执行有返回值的线程。返回Future对象.
List> invokeAll(List set);执行一个集合的有返回值的线程。等集合中所有线程任务执行完毕后,取得全部任务的结果值
List> invokeAll(List set,Long time,TimeUnit t);在指定的时间内执行集合的方法,如果指定时间内还没有获取结果,那么终止该线程执行
返回的Future对象可通过isDone()方法和isCancel()来判断是执行成功还是被终止了。
T invokeAny(Collection extends Callable> tasks)-取得第一个方法的返回值,当第一个任务结束后,会调用interrupt方法中断其它任务,然后立刻终结所有的线程
shutdown 与 shutdownNow
shutdown():将线程池的状态设置为shutDown,不会立即终止线程池,而是中断所有没有执行的线程;要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():将线程池的状态设置为stop,立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
shutdown()会判断是否能够抢到AQS锁,如果抢到则执行中断,shutDownNow()会判断锁的的状态state>=0,只有state>=0会被中断
shutdown()会对线程设置中断标志,当任务线程数和队列为0时才逐步清理线程
shutdownNow()会强制中断正在执行的线程(前提AQS锁的状态state>=0),并将未处理的任务返回
submit() 与 execute()区别
1 提交方式
execute提交的方式只能提交一个Runnable的对象;submit()可以提交callable对象或者Runnable对象
2 返回值
execute()方法的返回值是void;submit()方法可以提供Future < T > 类型的返回值。
3 异常处理
execute()方法会抛出异常;submit()方法不会抛出异常。除非调用Future.get()方法。
Future对象代表线程的执行结果 如果在线程的执行过程中发生了异常,get会获取到异常的信息。
submit()此种提交会将callable或者runnable封装成futureTask对象
execute()此种提交不会将runnable封装成futureTask对象,但execute()方法里可以传入futureTask对象,因为futureTask本身实现了runnable接口
线程池的状态转换
一般来说线程池只有两种状态:Running 和 Terminated,其他状态都是过渡状态
Running:运行状态:能够接受提交的任务,并且能处理阻塞队列中的任务
ShutDown:关闭状态:不在接受新的提交任务,但可以继续处理阻塞队列中的以保存的任务
Stop:停止状态:不能接受新任务,不处理队列中的任务,会强制中断正在处理的任务,如果队列中存在没有处理完的任务则返回其任务,当线程池的工作线程数量为0则进入TidYing状态
TidYing: 正在结束状态,是一个中间状态,并不是结束状态,前提是所有的任务都已经终止了,有效线程数为0了,会调用terminated()进入Terminated结束状态
Terminated:结束状态:在terminated()方法执行后进入此状态,代表线程池的结束
shutDown() 阻塞队列为空且线程池中的工作线程数量为0
--------> ShutDown状态------------------------------------>
terminated()方法
Running TidYing状态-----------------> Terminated状态
shutDownNow() 线程池中的工作线程数量为0
------------>Stop状态----------------------------------->
线程池风险
1.死锁–任何多线程应用程序都有死锁风险。当一组进程或线程中的每一个都在等待一个只有该组中另一个进程才能引起的事件时,我们就说这组进程或线程 死锁了。
2.资源不足—如果线程池太大,那么被那些线程消耗的资源可能严重地影响系统性能。在线程之间进行切换将会浪费时间,而且使用超出比您实际需要的线程可能会引起资源匮乏问题
3.并发错误—线程池和其它排队机制依靠使用wait() 和 notify()方法,这两个方法都难于使用。如果编码不正确,那么可能丢失通知,导致线程保持空闲状态,尽管队列中有工作要处理。
4.线程泄漏—各种类型的线程池中一个严重的风险是线程泄漏,当从池中除去一个线程以执行一项任务,而在任务完成后该线程却没有返回时,会发生这种情况。
一般建议使用自定义线程池,根据任务的类型进行调配,如果cpu密集型如视频等对cpu要求比较高的设置核心线程数大于cpu核心数1-2个,如果是io读取密集型,对cpu要求不高的,对内存即磁盘高的则可以多设置核心线程来处理任务
合理创建线程池
过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存
- CPU密集型:
CPU密集型的意思就是该任务需要大量运算,而没有阻塞,CPU一直全速运行。
CPU密集型任务只有在真正的多核CPU上才可能得到加速(通过多线程)。
CPU密集型任务配置尽可能少的线程数。
通常采用cpu核数+1能够实现最优的CPU利用率,-1是保证当线程由于页缺失故障(操作系统)或其它
保证CPU时钟周期不被浪费
CPU密集型线程数配置公式:(CPU核数+1)个线程的线程池
- IO密集型:
IO密集型,即该任务需要大量的IO,即大量的阻塞。
在单线程上运行IO密集型任务会导致浪费大量的CPU运算能力浪费在等待。
所以IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要利用了被浪费掉的阻塞时间。
第一种配置方式:
由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程。
配置公式:CPU核数 * 2。
第二种配置方式:
IO密集型时,大部分线程都阻塞,故需要多配置线程数。
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
比如:例如4核CPU计算时间是50%,其它等待时间是50%,期望cpu被100%利用 4* 100% *100%/50%=8
重要方法解析
execute方法–线程池执行方法
public void execute(Runnable command) {
// 判断提交的任务是不是为空,如果为空则抛出NullPointException异常
if (command == null)
throw new NullPointerException();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的数量小于corePoolSize,则进行添加线程执行任务
if (workerCountOf(c) < corePoolSize) {
//添加线程修改线程数量并且将command作为第一个任务进行处理
if (addWorker(command, true))
return;
// 获取最新的状态
c = ctl.get();
}
// 超过核心线程数 如果线程池的状态是RUNNING,将任务添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
//二次检查线程池状态和线程数量
int recheck = ctl.get();
//线程不是RUNNING状态,从队列中移除当前任务,并且执行拒绝策略。
//只有RUNNING状态的线程池才会接受新的任务,其余状态全部拒绝。
if (!isRunning(recheck) && remove(command))
reject(command);
//如果线程池的线程数量为空时,代表线程池是空的,创建一个空任务的worker。主要用来从队列中获取任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 两种情况拒绝:
1.非RUNNING状态拒绝新的任务
2.队列满了启动新的线程失败(workCount > maximumPoolSize)
//如果队列是满的,或者是SynchronousQueue队列时,则直接添加新的线程执行任务,如果添加失败则进行拒绝
//可能线程池的线程数量大于maximumPoolSize则采取拒绝策略。
//如果没有超过maximumPoolSize,这里则使用非核心线程处理任务
else if (!addWorker(command, false))
reject(command);
}
execute方法总结以下几点:
1当线程池中线程的数量小于corePoolSize时且是运行状态时,直接添加线程到线程池并且将当前任务做为第一个任务执行。
2如果线程池的状态的是RUNNING,则可以接受任务,将任务放入到阻塞队列中,内部进行二次检查,有可能在运行下面内容时线程池状态已经发生了变化,在这个时候如果线程池状态变成不是RUNNING,则将当前任务从队列中移除,并且进行拒绝策略。
3如果阻塞队列已经满了或者SynchronousQueue这种特殊队列无空间的时候,直接添加新的线程执行任务,当线程池的线程数量大于maximumPoolSize时相应拒绝策略。
4入队操作用的是offer方法,该方法不会阻塞队列,如果队列已经满时或超时导致入队失败,返回false,如果入队成功返回true。
核心方法addWork()–创建任务线程
private boolean addWorker(Runnable firstTask, boolean core) {//该工作线程需要执行的任务;core是否为核心线程的标识
retry:
//外层自循环实时获取线程状态数量和是否需要添加工作任务线程
for (;;) {
//获取线程池的状态和线程池线程的数量
int c = ctl.get();
//单独获取线程池的状态
int rs = runStateOf(c);
//检查队列是否只在必要时为空
if (rs >= SHUTDOWN && //线程池的状态是SHUTDOWN、STOP、TIDYING、TERMINATED
! (rs == SHUTDOWN && // 可以看做是rs!=SHUTDOWN,线程池状态为STOP、TIDYING、TERMINATED
firstTask == null && //可以看做firstTask!=null,并且rs=SHUTDOWN
!workQueue.isEmpty())) //可以看做rs=SHUTDOWN,并且workQueue.isEmpty()队列空
return false;
//自循环CAS增加线程池中线程的个数
for (;;) {
//获取线程池中线程个数
int wc = workerCountOf(c);
//如果线程池线程数量超过最大线程池数量,则直接返回
if (wc >= CAPACITY ||
//如果指定使用corePoolSize作为限制则使用corePoolSize,反之使用maximumPoolSize,最为工作线程最大线程线程数量,如果工作线程大于相应的线程数量则直接返回。
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加线程池中工作线程的数量到线程池的控制变量ctl
if (compareAndIncrementWorkerCount(c))
//跳出增加线程池数量。
break retry;
//如果修改失败,则重新获取线程池的状态和线程数量
c = ctl.get(); // Re-read ctl
//如果最新的线程池状态和原有线程池状态不一样时,则跳转到外层retry中,否则在内层循环重新进行CAS
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//工作线程是否开始启动标志
boolean workerStarted = false;
//工作线程添加到线程池成功与否标志
boolean workerAdded = false;
Worker w = null;
try {
//创建一个Worker对象
w = new Worker(firstTask);
//获取worker中的线程,这里线程是通过ThreadFactory线程工厂创建出来的。用来执行任务的线程
final Thread t = w.thread;
//判断线程是否为空,避免空指针
if (t != null) {
//添加独占锁,为添加worker进行同步操作,防止其他线程同时进行execute方法。
//此锁是基于AQS实现的轻量级锁。具有可重入排他的特点
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//上锁,保证对线程共享变量的操作是线程安全的
try {
//获取线程池的状态
int rs = runStateOf(ctl.get());
//如果线程池状态为RUNNING或者是线程池状态为SHUTDOWN并且第一个任务为空时,当线程池状态为SHUTDOWN时,是不允许添加新任务的,所以他会从队列中获取任务。
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 当前执行任务线程已经启动抛出异常
throw new IllegalThreadStateException();
//添加worker到set集合中
workers.add(w);
int s = workers.size();
//跟踪线程池出现的最大的线程池数量[0,maximumPoolSize]是一个状态变量
if (s > largestPoolSize)
largestPoolSize = s;//更新当前线程池出现的最大线程数
//设置添加worker成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加worker成功就启动任务
if (workerAdded) {
t.start();//启动worker的工作线程
workerStarted = true;//设置线程是否已经启动的标识为true
}
}
} finally {
//如果没有启动,w不为空就已出worker,并且线程池数量进行减少。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
小结:线程池的状态是SHUTDOWN、STOP、TIDYING、TERMINATED
当线程池状态是STOP、TIDYING、TERMINATED时,这些状态的时候不需要进行线程的添加和启动操作,因为如果是上面的状态,其实线程池的线程正在进行销毁操作,意味着线程调用了shutdownNow等方法。
如果线程池状态为SHUTDOWN并且第一个任务不为空时,不接受新的任务,直接返回false,也就是说SHUTDOWN的状态,不会接受新任务,只会针对队列中未完成的任务进行操作。
当线线程池状态为SHUTDOWN并且队列为空时,直接返回不进行任务添加。
上半部分分为内外两个循环,外循环对线程池状态的判断,用于判断是否需要添加工作任务线程,后面内循环则是通过CAS操作增加线程数,增加依靠的标准:如果指定了core参数为true,
代表线程池中线程的数量没有超过corePoolSize,当指定为false时,代表线程池中线程数量达到了corePoolSize,并且队列已经满了,或者是SynchronousQueue这种无空间的队列,但是还没有达到最大的线程池maximumPoolSize,所以它内部会根据指定的core参数来判断是否已经超过了最大的限制,
如果超过了就不能进行添加线程了,并且进行拒绝策略,如果没有超过就增加线程数量
work内部类解析
Worker是一个实现了AQS的锁,它是一个不可重入的独占非公平锁,并且他也实现了Runnable接口,实现了run方法,在构造函数中将AQS的state设置为-1表示不会被中断
在线程池中shutdown方法会判断能否争抢到锁,如果可以获得锁则对线程进行中断操作,如果调用了shutdownNow它会判断锁的状态state>=0才会被中断:interruptIfStarted()
为了避免线程还没有进入runWorker方法前,就调用了shutdown或shutdownNow方法被中断,设置为-1则不会被中断。
每次都是0->1,保证了锁的不可重入性
后面我们看到run方法,它调用的是ThreadPoolExecutor的runWorker方法,
在addWorker方法中,添加worker到HashSet中后,他会将workerAdded设置为true,代表添加worker成功
worker启动成功执行顺序:
创建时传递worker作为Runnable—>调用worker对象线程的run方法–>runWorker()–>调用FirstTask的run方法执行fistTask任务即用户自己的任务代码
worker中AQS作用:
Worker继承了AbstractQueuedSynchronizer,主要目的有两个:
-
将锁的粒度细化到每个工Worker。 如果多个Worker使用同一个锁,那么一个Worker
Running持有锁的时候,其他Worker就无法执行,这显然是不合理的。 2 直接使用CAS获取,避免阻塞。
-
如果这个锁使用阻塞获取,那么在多Worker的情况下执行shutDown。如果这个Worker此时正在Running无法获取到锁,那么执行shutDown()线程就会阻塞住了,显然是不合理的。
防止去中断正在运行中的任务,只会中断在等待从任务队列中获取任务的线程。
worker中runnable的作用:为了复用线程,减少创建线程带来的性能消耗
worker在线程池的具体作用
worker出现在以下方法:
//checkShutdownAccess:检查线程池是否关闭
//interruptWorkers:中断工作线程
//interruptIdleWorkers:中断闲着的线程
//addWorker:添加工作线程
//addWorkerFailed:添加线程失败的方法
//progressWorkerExit:工作线程退出的时候清理方法,钩子函数
//getPoolSize:获取当前线程的大小
//getActiveCount:
//getTaskCount:
//getCompletedTaskCount
举例: 现有3个线程T1(addWorker:添加线程) t2 (addWorker:添加线程) t3(getTaskCount)
Worker 位于主内存即共享内存
t1 t2 t3 启动后会将Worker的副本从共享内存拷贝的自己的线程内存进行操作 t1 addWorker后修改了worker值之后会同步到主内存
t1 addWorker后修改了worker值之后也会同步到主内存 但是t2并没有拿worker进行其他处理,比如判断worker是否为空,打印woker里的值的操作逻辑
换句话说t2不依赖t1,同样t3不依赖t2 t1 所以以上三个线程的操作worker没有依赖性,
如果t1添加了任务,t2拿worker进行判断,比如判断worker是否为空,打印worker等这就导致t1 t2具有依赖关系,因为t1的添加影响了t2为空的判断,但t2拷贝的副本值仍然没有改变,所以我们就需要保证worker对t2具有可见性,让其感知到主内存的值已经改变,让t2放弃
自己工作内存的值,从共享内存获取新值进行逻辑判断等操作
小结:两个线程之间相互依赖共享变量,一个线程改变了共享变量的值,另一个需要拿共享变量进行判断等需要感知共享变量的改变,就需要保证共享变量的在不同线程内存下的可见性。否则即使是共享变量也可以不需要保证其可见性
runWorker()方法–执行worker任务线程即执行用户代码逻辑
final void runWorker(Worker w) {
//调用者也就是Worker中的线程
Thread wt = Thread.currentThread();
//获取Worker中的第一个任务
Runnable task = w.firstTask;
//将Worker中的任务清除代表执行了第一个任务了,后面如果再有任务就从队列中获取。
w.firstTask = null;
// 在new Worker的时候将AQS的state状态设置为-1,这里先进行解锁操作,将state设置为0,表示允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环进行获取任务,如果第一任务(即提交到核心线程任务和非核心线程的任务)不为空,否则,从任务队列中获取任务,如果有任务则返回获取的任务信息,如果没有任务可以获取则进行阻塞,阻塞也分两种第一种是阻塞直到任务队列中有内容,第二种是阻塞队列一定时间之后还是没有任务就直接返回null。
while (task != null || (task = getTask()) != null) {
//先获取worker的独占锁,防止其他线程调用了shutdown方法。
w.lock();
// 条件1:线程池状态>=STOP,即STOP或TIDYING,TERMINATED
//条件2:如果条件一失效则意味着判断线程池状态=STOP(以消除该瞬间shutdown方法生效,使线程池处于STOP或TERMINATED),
//条件1与条件2任意满意一个,且wt不是中断状态,则中断wt,否则进入下一步
//确保只有在线程处于stop状态且wt未中断时,wt才会被设置中断标识
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//当前线程调用interrupt()中断
try {
//执行任务之前做一些操作,可进行自定义
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行任务在这里喽。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务之后做一些操作,可进行自定义
afterExecute(task, thrown);
}
} finally {
//将任务清空为了下次任务获取
task = null;
//统计当前Worker完成了多少任务
w.completedTasks++;
//独占锁释放
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理Worker的退出操作,执行清理工作。
processWorkerExit(w, completedAbruptly);
}
}
小结:从Worker中获取firstTask任务来执行,然后执行成功后,它会getTask()来从队列中获取任务
getTask()方法–从任务队列获取任务
private Runnable getTask() {
boolean timedOut = false; //poll获取超时
for (;;) {
//获取线程池的状态和线程数量
int c = ctl.get();
//获取线程池的状态
int rs = runStateOf(c);
//线程池状态大于等于SHUTDOWN
//1.线程池如果是大于STOP的话减少工作线程池数量
//2.如果线程池状态为SHUTDOW并且队列为空时,代表队列任务已经执行完,返回null,线程数量减少1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取线程池数量。
int wc = workerCountOf(c);
//如果allowCoreThreadTimeOut为true,则空闲线程在一定时间未获得任务会清除
//或者如果线程数量大于corePoolSize(非核心线程)的时候会进行清除空闲线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 两个条件全部为true,则通过CAS使工作线程数-1,即剔除工作线程
// 条件1:工作线程数大于maximumPoolSize,或(工作线程阻塞时间受限且上次在任务队列拉取任务超时)
// 条件2:wc > 1或任务队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果timed代表的是清除空闲线程的意思
// 如果工作线程阻塞时间受限,则使用poll(),否则使用take()
// poll()设定阻塞时间,而take()无时间限制,直到拿到结果为止
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //等待一段时间如果没有获取到返回null。
workQueue.take(); //阻塞当前线程
//如果队列中获取到内容则返回
if (r != null)
return r;
// 没能获取到任务,则将最近获取任务是否超时设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 响应中断,进入下一次循环前将最近获取任务超时状态置为false
timedOut = false;
}
}
}
小结:
工作线程调用getTask从队列中进行获取任务。
如果指定了allowCoreThreadTimeOut或线程池线程数量大于corePoolSize则进行清除空闲多余的线程,调用阻塞队列的poll方法,在指定时间内如果没有获取到任务直接返回false。
如果线程池中线程池数量小于corePoolSize或者allowCoreThreadTimeOut为false默认值,则进行阻塞线程从队列中获取任务,直到队列有任务唤醒线程。
processWorkerExit()–任务完成的清理工作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果突然完成则调整线程数量
decrementWorkerCount(); // 减少线程数量1
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //获取锁,同时只有一个线程获得锁
try {
completedTaskCount += w.completedTasks; //统计整个线程池完成的数量
workers.remove(w); //将完成任务的worker从HashSet中移除
} finally {
mainLock.unlock();
}
//尝试设置线程池状态为TERMINATED
//1.如果线程池状态为SHUTDOWN并且线程池线程数量与工作队列为空时,修改状态。
//2.如果线程池状态为STOP并且线程池线程数量为空时,修改状态。
tryTerminate();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的状态小于STOP,也就是SHUTDOWN或RUNNING状态
if (runStateLessThan(c, STOP)) {
//如果不是突然完成,也就是正常结束
if (!completedAbruptly) {
//如果指定allowCoreThreadTimeOut=true(默认false)则代表线程池中有空余线程时需要进行清理操作,否则线程池中的线程应该保持corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//这里判断如果线程池中队列为空并且线程数量最小为0时,将最小值调整为1,因为队列中还有任务没有完成需要增加队列,所以这里增加了一个线程。
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//如果当前线程数小于核心个数,就增加一个Worker,实现线程的复用
addWorker(null, false);
}
小结:
如果线程数超过核心线程数后,在runWorker中就不会等待队列中的消息,而是会进行清除操作,上面的清除是先对线程池的数量进行较少操作,其次是统计整个线程池中完成任务的数量
从任务worker的hashSet移除完成的任务,尝试修改线程池的状态
如果线程状态小于stop则增加worker实现线程的复用。如果线程是正常结束则维持线程池中最小线程为核心线程数
tryTerminate()方法–尝试将线程池状态改为terminate
final void tryTerminate() {
for (;;) {
// 获取线程池的状态和线程池的数量组合状态
int c = ctl.get();
//1.如果线程池状态STOP则不进入if语句
//2.如果线程池状态为SHUTDOWN并且工作队列为空时,不进入if语句
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果线程池数量不为空时,进行中断操作。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//修改状态为TIDYING,并且将线程池的数量进行清空
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//执行一些逻辑,默认是空的
terminated();
} finally {
//修改状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
小结:
修改线程池的状态由SHUTDOWN->TIDYING->TERMINATED或者是由STOP->TIDYING->TERMINATED,修改线程池状态为TERMINATED,
需要有两个条件:
1当线程池线程数量和工作队列为空,并且线程池的状态为SHUTDOWN时,才会将状态进行修改,修改的过程是SHUTDOWN->TIDYING->TERMINATED
2 当线程池的状态为STOP并且线程池数量为空时,才会尝试修改状态,修改过程是STOP->TIDYING->TERMINATED
TERMINATED状态,还需要调用条件变量termination的signalAll()方法来唤醒所有因为调用awaitTermination方法而被阻塞的线程,
换句话说当调用awaitTermination后,只有线程池状态变成TERMINATED才会被唤醒。
shoutDown()方法–尝试关闭线程池方法–running–>shutDown
public void shutdown() {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回
advanceRunState(SHUTDOWN);
//如果线程没有设置中断标识并且线程没有运行则设置中断标识
interruptIdleWorkers();
//空的可以实现的内容
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试修改线程池状态为TERMINATED
tryTerminate();
}
小结:
1首先对当前线程进行权限检测,查看是否设置了安全管理器,如果设置了则要看当前调用shutdown的线程有没有权限都关闭线程的权限,如果有权限还要看是否有中断工作现成的权限,如果没有权限则抛出SecurityException或NullPointException异常。
2设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回
3尝试获取锁如果获得锁成功则设置中断标识,活跃正在执行任务的线程并没有设置中断标识,直到将任务全部执行完后才会逐步清理线程操作
4尝试修改线程池状态为TERMINATED
5:调用shutDown()方法不是结束线程池,而是将线程的状态从Running–>shutDown状态;然后调用tryTerminate()
尝试修改线程池状态为TERMINATED,但必须满足当线程池线程数量和工作队列为空才会修改为SHUTDOWN->TIDYING->TERMINATED
我们使用shutdown方法关闭线程池时,一定要确保任务里不会有永久阻塞等待的逻辑,否则线程池就关闭不了
⼀定要记得shutdownNow和shutdown调用完,线程池并不是⽴刻就关闭了,要想等待线程池关闭,还需调用awaitTermination⽅法来阻塞等待线程池结束.
shoutDownNow()方法----强制关闭线程池方法–running–>stop或者shutDown–>stop
public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//设置线程池状态为STOP,如果状态已经是大于等于STOP则直接返回
advanceRunState(STOP);
//这里是和SHUTDOWN区别的地方,这里是强制进行中断操作
interruptWorkers();
//将为完成任务复制到list集合中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试修改线程池状态为TERMINATED
tryTerminate();
return tasks;
}
小结: 1 设置线程池的状态为stop
2 强制中断任务–中断的前提条件是AQS的state状态必须大于等于0,如果状态为-1的则不会被中断,但是如果任务运行起来的时候在runWorker中则不会执行任务
3 尝试修改线程池状态为TERMINATED–并且线程池数量为空时,才会尝试修改状态,
4 返回未完成的任务列表
线程池原理小结
- 主线程进行线程池的调用,线程池执行execute方法
- 线程池通过addWorker进行创建线程,并将线程放入到线程池中,将任务提交到核心线程中,其实线程池内部不分核心线程和非核心线程,只是根据corePoolSize和maximumPoolSize设置的大小来进行区分,因为超过corePoolSize的线程会被回收,至于回收那些线程,是根据线程获取任务的时候进行判断,当前线程池数量大于corePoolSize,或者指定了allowCoreThreadTimeOut为true,则他等待一定时间后会返回,不会一直等待
- 当线程池的数量达到corePoolSize时,线程池首先会将任务添加到队列中
- 当队列中任务也达到了队列设置的最大值时,它会创建新的线程,注意的是此时的线程数量已经超过了corePoolSize,但是没有达到maximumPoolSize最大值。
- 当线程池的线程数量达到了maximumPoolSize,则会相应拒绝策略。
异步回调 Future
异步回调 Future
任务调度线程池
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但
由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个
任务的延迟或异常都将会影响到之后的任务。
TImer类-出现异常会停止运行,一个任务过长会影响另一个任务的执行
public class TimerTest {
public static void main(String[] args) {
Timer t = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
System.out.println("task1");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.schedule(task,500,1000);//延迟0.5s,每1s执行一次
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task2");
}
};
t.schedule(task1,500,1000);//延迟0.5s,每1s执行一次
}
}
定时任务
newScheduledThreadPool 中的 scheduleAtFixedRate 这个方法可以执行定时任务
scheduleAtFixedRate --任务的执行时间长短会影响延时执行
scheduleWithFixedDelay – 上一次任务结束后还是延时执行
不会受异常影响
public class ScheduleExecutorTest {
public static void main(String[] args) {
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
System.out.println(now);
// 获取每周四晚时间
LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
if(now.compareTo(time) > 0) {
time = time.plusWeeks(1);
}
long initialDelay = Duration.between(now, time).toMillis();
// 一周的时间
long period = 1000 * 60 * 60 * 24 * 7;
// initalDelay 表示当前时间与周四的时间差, period 一周的间隔时间。
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 创建一个定时任务, 每周四 18:00:00 执行。
executorService.scheduleAtFixedRate(() -> {
System.out.println("running");
}, initialDelay, period, TimeUnit.MILLISECONDS);
}
}



