衔接上一章 【编发编程四:Java中的线程池(1)】
- 1.6一些开源项目实现的拒绝策略
- 1.7线程池的实现原理
- 1.8线程池的底层源码实现
- 1.8.1十进制、二进制、八进制、十六进制
- 1.8.2线程池源码-构造方法
- 1.8.4线程池源码-线程池状态值
- (1) RUNNING = -1 << 29
- (2) SHUTDOWN = 0 << 29
- (3) STOP = 1 << 29
- (4) TIDYING = 2 << 29
- (5) TERMINATED = 3 << 29
- 1.8.5线程池源码-拆解线程状态和线程池工作线程数
- 1.8.6线程池源码-拆解线程状态和线程池工作线程数-ctlOf
- 1.8.7线程池源码-拆解线程状态和线程池工作线程数-runStateOf
- 1.8.8线程池源码-拆解线程状态和线程池工作线程数-workerCountOf
- 1.8.9线程池源码-线程池状态和工作线程数为什么是一个变量?
- ctl 是线程池中的非常重要的成员变量,它代表两个含义;
- 它是如何解决共享变量线程安全问题的?
- 1.8.10线程池源码-核心源码解读-execute方法
- 1.8.111.8.11线程池源码-核心源码解读-addWorker方法
- 1.8.12线程池源码-核心源码解读-runWorker方法
- 1.8.13线程池源码-核心源码解读-getTask方法
- 1.8.14线程池源码-核心源码解读-线程池复用
- 1.8.15线程池源码-核心源码解读-线程池大小变化
- 1.8.16线程池源码-核心源码解读-画图总结
org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport
当dubbo的工作线程触发了线程拒绝策略后,为了能尽量让使用者清楚触发线程拒绝策略的真实原因,在拒绝策略中它做了4件事情:
- 1、输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息,日志打印非常详细,让开发和运维更容易容易定位到问题所在;
- 2、输出当前线程堆栈详情,将发生拒绝策略时的现场情况dump线程上下文信息到一个文件中,助你发现问题所在;
- 3、发送事件;
- 4、抛出拒绝执行异常,使本次任务失败,这个使用的是JDK默认拒绝策略的异常;
org.elasticsearch.common.util.concurrent.EsAbortPolicy
org.elasticsearch.common.util.concurrent.EsExecutors.ForceQueuePolicy
io.netty.util.concurrent.RejectedExecutionHandlers
稍微总结下:
AbortPolicy异常中止策略:异常中止,无特殊场景;
DiscardPolicy丢弃策略:无关紧要的任务(文章点击量、商品浏览量等);
DiscardOldestPolicy弃老策略:允许丢掉老数据的场景;
CallerRunsPolicy调用者运行策略:不允许失败场景(对性能要求不高、并发量较小);
进制是为了计数的需要而产生的;10 11
日常中我们用的最多的是十进制(0 - 9),人有10个指头;(系绳法)
计算机使用的是二进制(0,1),计算机只认识0和1,其它的都不认识,不管是十进制、八进制、还是十六进制,计算机在处理的时候都会转化为二进制,不仅如此,即使是色彩斑斓的图片、视频、声音等,想让计算机识别,都需要转化为二进制,所以在计算机中,只有0和1;
虽然计算机只认识0和1,但是在编写程序的时候如果使用二进制来表示数字书写就太长了,比如十进制数字100,用二进制表示为:01100100?,这样不利于程序员阅读,所以又延伸出了八进制和十六进制,比如十进制数值100,用八进制表示为0144,用十六进制表示为0x64;
八进制和十六进制缩短了二进制数字的长度,又容易识别,也容易在各进制之间相互转化,所以八进制、十六进制也会在计算机中采用;
十六进制以0x或0X开头
十进制直接写数字
八进制以0(零)开头
二进制以0b或0B开头
1.8.3线程池源码-控制变量
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; COUNT_BITS = 29 CAPACITY = (1 << COUNT_BITS) - 1 int 类型的数是占用4字节,32位,所以前面填了一堆0; 原码:00000000 00000000 00000000 00000001 左移:00100000 00000000 00000000 00000000 减一:00011111 11111111 11111111 11111111 (536870911 = 5亿多)1.8.4线程池源码-线程池状态值
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; RUNNING = -1 << COUNT_BITS;(1) RUNNING = -1 << 29
在计算机中,负数以其正数的补码形式表示; 原码:一个整数,按照绝对值大小转换成的二进制数,称为原码; 1的原码:00000000 00000000 00000000 00000001; 反码:将二进制数按位取反,所得的新二进制数称为原二进制数的反码; 1的反码:11111111 11111111 11111111 11111110 称:11111111 11111111 11111111 11111110是00000000 00000000 00000000 00000001的反码,反码是相互的,所以也可称这两个数互为反码; 补码:反码加1称为补码; 1的补码:11111111 11111111 11111111 11111110 + 1 = 11111111 11111111 11111111 11111111 所以计算 RUNNING = -1 << 29 原码:00000000 00000000 00000000 00000001 反码:11111111 11111111 11111111 11111110(按位取反) 补码:11111111 11111111 11111111 11111111(反码+1) 左移:11100000 00000000 00000000 00000000
SHUTDOWN = 0 << COUNT_BITS;
(2) SHUTDOWN = 0 << 29初始值:00000000 00000000 00000000 00000000 左移后:00000000 00000000 00000000 00000000
STOP = 1 << COUNT_BITS;
(3) STOP = 1 << 29初始值:00000000 00000000 00000000 00000001 左移后:00100000 00000000 00000000 00000000
TIDYING = 2 << COUNT_BITS;
(4) TIDYING = 2 << 29初始值:00000000 00000000 00000000 00000010 左移后:01000000 00000000 00000000 00000000
TERMINATED = 3 << COUNT_BITS;
(5) TERMINATED = 3 << 29初始值:00000000 00000000 00000000 00000011 左移后:01100000 00000000 00000000 000000001.8.5线程池源码-拆解线程状态和线程池工作线程数
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
1.8.6线程池源码-拆解线程状态和线程池工作线程数-ctlOf
先看private static int ctlOf(int rs, int wc) { return rs | wc; }
比如 ctlOf(RUNNING, 0)
RUNNING | 0
11100000 00000000 00000000 00000000
|
00000000 00000000 00000000 00000000
=
11100000 00000000 00000000 00000000
注:
&同为1时为1,否则为0;
|只要一个为1就为1,否则为0
------------------------------------------------------------------------
比如 ctlOf(RUNNING, 1)
RUNNING | 1
11100000 00000000 00000000 00000000
|
00000000 00000000 00000000 00000001
=
11100000 00000000 00000000 00000001
1.8.7线程池源码-拆解线程状态和线程池工作线程数-runStateOf
计算线程状态的方法
计算线程状态的方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
runStateOf(ctl.get())
ctl.get() & ~CAPACITY
ctlOf(RUNNING, 0) & ~CAPACITY
11100000 00000000 00000000 00000000
&
(
原始00011111 11111111 11111111 11111111
取反11100000 00000000 00000000 00000000
)
=11100000 00000000 00000000 00000000 (RUNNING)
------------------------------------------------------------------------------
如果是1:
runStateOf(ctl.get())
ctl.get() & ~CAPACITY
ctlOf(TIDYING, 1) & ~CAPACITY
ctlOf(TIDYING, 1)
TIDYING | 1
01000000 00000000 00000000 00000000
|
00000000 00000000 00000000 00000001
=
01000000 00000000 00000000 00000001
&
11100000 00000000 00000000 00000000
=
01000000 00000000 00000000 00000000 (TIDYING)
1.8.8线程池源码-拆解线程状态和线程池工作线程数-workerCountOf
计算线程工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
workerCountOf(ctl.get())
ctl.get() & CAPACITY
ctlOf(RUNNING, 0) & CAPACITY
11100000 00000000 00000000 00000000
&
00011111 11111111 11111111 11111111
=
00000000 00000000 00000000 00000000
-------------------------------------------------------------------------------
如果是1:
ctlOf(RUNNING, 1) & CAPACITY
11100000 00000000 00000000 00000001
&
00011111 11111111 11111111 11111111
=
00000000 00000000 00000000 00000001
整个推算过程如上所分析(如果没有看太懂,可以多看几遍)
1.8.9线程池源码-线程池状态和工作线程数为什么是一个变量?
ctl 是线程池中的非常重要的成员变量,它代表两个含义;
1、表示线程池的运行状态;
2、表示线程池的工作线程数;
为什么线程池要用一个ctl变量代表两个含义呢?
考虑共享变量,在多线程条件下存在线程安全问题;
一个变量代表两个含义就能解决线程安全问题吗?ctl变量是一个成员变量;
(1)synchronize 写多读少
(2)cas+volatile 写少读多(cas只能保证一个变量的原子性);
在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个修改而另一个没有修改的情况,如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的;
1.//Worker类
2.private?final?class?Worker
3. extends?AbstractQueuedSynchronizer
4. implements?Runnable?{
5.
6.
7. final?Thread?thread;
8.
9.
10. Runnable?firstTask;
11.
12. Worker(Runnable?firstTask)?{
13. setState(-1);?//?inhibit?interrupts?until?runWorker
14. this.firstTask?=?firstTask;
15. this.thread?=?getThreadFactory().newThread(this);
16.
17. //Thread?t?=?new?Thread(worker);
18. //t.start();?==>?worker.run()
19. }
20.
21. @Override
22. public?void?run()?{
23. runWorker(this);
24. }
25.
26. //.........................?省略
27.}
1.8.111.8.11线程池源码-核心源码解读-addWorker方法
1.private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{
2. retry:
3. //?外层for循环
4. for?(;;)?{
5. //?获取线程池控制变量的值
6. int?c?=?ctl.get();
7. //?获取线程池运行状态
8. int?rs?=?runStateOf(c);
9.
10. //?if判断,如果rs?>=?SHUTDOWN?并且?(判断3个条件,只要有1个不满足),返回false;
11. //?Check?if?queue?empty?only?if?necessary.
12. if?(rs?>=?SHUTDOWN?&&
13. !?(rs?==?SHUTDOWN?&&
14. ?firstTask?==?null?&&
15. ?!?workQueue.isEmpty()))
16. return?false;
17.
18. //?内存for循环
19. for?(;;)?{
20. //?获取线程池线程数
21. int?wc?=?workerCountOf(c);
22. //?如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
23. //?core为addWorker方法的第二个参数,若为true根据corePoolSize比较,若为false根据maximumPoolSize比较;
24. if?(wc?>=?CAPACITY?||
25. wc?>=?(core ?corePoolSize?:?maximumPoolSize))
26. return?false;
27. //?尝试增加workerCount,如果成功,则跳出外层for循环
28. if?(compareAndIncrementWorkerCount(c))
29. break?retry;
30. //?如果增加workerCount失败,则重新获取控制变量ctl的值
31. c?=?ctl.get(); //?Re-read?ctl
32. //?如果当前线程池的运行状态不等于rs,说明线程池运行状态已被改变,返回外层for循环继续执行
33. if?(runStateOf(c)?!=?rs)
34. continue?retry;
35. //?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop
36. }
37. }
38.
39. //?Worker线程是否启动
40. boolean?workerStarted?=?false;
41. //?Worker线程是否添加
42. boolean?workerAdded?=?false;
43. Worker?w?=?null;
44. try?{
45. //?根据firstTask来创建Worker对象
46. w?=?new?Worker(firstTask);
47. //?每一个Worker对象都会创建一个线程
48. final?Thread?t?=?w.thread;
49. if?(t?!=?null)?{
50. final?ReentrantLock?mainLock?=?this.mainLock;
51. mainLock.lock();
52. try?{
53. //?检查线程池运行状态
54. //?Recheck?while?holding?lock.
55. //?Back?out?on?ThreadFactory?failure?or?if
56. //?shut?down?before?lock?acquired.
57. int?rs?=?runStateOf(ctl.get());
58. //?rs?
63. //?检查线程已经是运行状态,抛出非法线程状态异常
64. if?(t.isAlive())?//?precheck?that?t?is?startable
65. throw?new?IllegalThreadStateException();
66. //?workers是一个HashSet
67. workers.add(w);
68. int?s?=?workers.size();
69. //?largestPoolSize记录着线程池中出现过的最大线程数量
70. if?(s?>?largestPoolSize)
71. //?把历史上出现过的最大线程数的值更新一下
72. largestPoolSize?=?s;
73. //?Worker线程添加成功
74. workerAdded?=?true;
75. }
76. }?finally?{
77. //?释放ReentrantLock锁
78. mainLock.unlock();
79. }
80. if?(workerAdded)?{
81. //?启动线程
82. t.start();
83. //?Worker线程已经启动
84. workerStarted?=?true;
85. }
86. }
87. }?finally?{
88. //?Worker线程没有启动成功
89. if?(!?workerStarted)
90. addWorkerFailed(w);
91. }
92. //?返回Worker线程是否启动成功
93. return?workerStarted;
94.}
1.//Worker类
2.private?final?class?Worker
3. extends?AbstractQueuedSynchronizer
4. implements?Runnable?{
5.
6.
7. final?Thread?thread;
8.
9.
10. Runnable?firstTask;
11.
12. Worker(Runnable?firstTask)?{
13. setState(-1);?//?inhibit?interrupts?until?runWorker
14. this.firstTask?=?firstTask;
15. this.thread?=?getThreadFactory().newThread(this);
16.
17. //Thread?t?=?new?Thread(worker);
18. //t.start();?==>?worker.run()
19. }
20.
21. @Override
22. public?void?run()?{
23. runWorker(this);
24. }
25.
26. //.........................?省略
27.}
1.8.12线程池源码-核心源码解读-runWorker方法
1.final?void?runWorker(Worker?w){
2.
3. Thread?wt?=?Thread.currentThread();
4. Runnable?task?=?w.firstTask;
5. w.firstTask?=?null;
6.
7. //允许响应中断
8. w.unlock();
9. //?线程退出的原因,true是任务导致,false是线程正常退出
10. boolean?completedAbruptly?=?true;
11.
12. try{
13. //?当前任务为空,且当前任务队列为空,停止循环
14. while?(task?!=?null?||?(task?=?getTask())?!=?null)?{
15.
16. //?上锁处理并发问题,防止在shutdown()时终止正在运行的worker
17. w.lock();
18.
19. //?如果线程池是stop状态,并且线程没有被中断,就要确保线程被中断,如果线程池不是,确保线程池没有被中断;
20. //?清除当前线程的中断标志,做一个recheck来应对shutdownNow方法;
21. if?((runStateAtLeast(ctl.get(),STOP)?||?(Thread.interrupted()?&&?runStateAtLeast(ctl.get(),STOP)))?&&?!wt.isInterrupted())
22. wt.interrupt();
23. try?{
24. //?执行前(空方法,由子类重写实现)
25. beforeExecute(wt,?task);
26. Throwable?thrown?=?null;
27. try?{
28. //?执行Runnable类的run()方法
29. task.run();
30. }?catch?(RuntimeException?x)?{
31. thrown?=?x;?throw?x;
32. }?catch?(Error?x)?{
33. thrown?=?x;?throw?x;
34. }?catch?(Throwable?x)?{
35. thrown?=?x;?throw?new?Error(x);
36. }?finally?{
37. //?执行后(空方法,由子类重写实现)
38. afterExecute(task,?thrown);
39. }
40. }?finally?{
41. task?=?null;
42. //?完成的任务数+1
43. w.completedTasks++;
44. //?释放锁
45. w.unlock();
46. }
47. }
48. //?到此,线程是正常退出
49. completedAbruptly?=?false;
50. }?finally?{
51. //?处理worker的退出
52. processWorkerExit(w,completedAbruptly);
53. }
54.}
1.8.13线程池源码-核心源码解读-getTask方法
1.private?Runnable?getTask()?{
2.
3. //?表示上一次从任务队列中取任务时是否超时
4. boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out ?
5. for?(;;)?{
6. int?c?=?ctl.get();
7. int?rs?=?runStateOf(c);
8.
9. //?Check?if?queue?empty?only?if?necessary.
10.
16. if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{
17. decrementWorkerCount();
18. return?null;
19. }
20.
21. //线程数
22. int?wc?=?workerCountOf(c);
23. //?Are?workers?subject?to?culling ?
24.
30. boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;
31.
32.
38. if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))?&&?(wc?>?1?||?workQueue.isEmpty()))?{
39. if?(compareAndDecrementWorkerCount(c))
40. return?null;
41. continue;
42. }
43. try?{
44.
51. Runnable?r?=?timed ?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:?workQueue.take();
52. //?r不为空,则返回该Runnable
53. if?(r?!=?null) ?
54. return?r;
55. //?如果?r?==?null,说明已经超时得不到任务,timedOut设置为true
56. timedOut?=?true;
57. }?catch?(InterruptedException?retry)?{
58. //?如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
59. timedOut?=?false;
60. }
61. }
}?
1.8.14线程池源码-核心源码解读-线程池复用
1、threadPoolExecutor.execute(runnable) 2、addWorker(command, boolean) 3、Worker w = new Worker(firstTask); //已经创建了Thread 4、HashSet workers.add(w); 5、t.start(); //w.thread.start(); 6、worker.run(); 7、runWorker(this) 8、task = w.firstTask 或者 task = getTask() 9、task.run();
线程的复用是通过while循环实现的,worker会首先获取当前的firstTask进行run,然后不停地循环从等待队列中获取新的任务task,如果有新任务则直接调用task的run方法,不会再去新建一个线程,从而实现线程复用;
1.8.15线程池源码-核心源码解读-线程池大小变化1.private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{
2.
3. //completedAbruptly为true表示线程异常执行结束
4. //completedAbruptly为false表示线程正常执行结束
5. if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted
6. decrementWorkerCount();
7.
8. //从线程set集合中移除工作线程,该过程需要加锁,因为HashSet是线程不安全的集合
9. final?ReentrantLock?mainLock?=?this.mainLock;
10. mainLock.lock();
11. try?{
12. //统计完成的任务数:将该worker已完成的任务数追加到线程池已完成的任务数
13. completedTaskCount?+=?w.completedTasks;
14. //从HashSet中移除该worker
15. workers.remove(w);
16. }?finally?{
17. //释放锁
18. mainLock.unlock();
19. }
20. //根据线程池状态进行判断是否结束线程池
21. tryTerminate();
22.
23. int?c?=?ctl.get();
24. //当线程池是RUNNING或SHUTDOWN状态时
25. if?(runStateLessThan(c,?STOP))?{
26. //如果worker不是异常结束:
27. if?(!completedAbruptly)?{
28. //如果allowCoreThreadTimeOut=true,最小线程个数就可以变为0;
29. int?min?=?allowCoreThreadTimeOut ?0?:?corePoolSize;
30. //但是,如果等待队列有任务,至少保留一个worker来处理任务;
31. if?(min?==?0?&&?!?workQueue.isEmpty())
32. min?=?1;
33. //如果工作线程大于等于核心线程,直接return就行了,否则就需要添加一个线程;
34. if?(workerCountOf(c)?>=?min)
35. return;?//?replacement?not?needed
36. }
37. //是异常执行结束的,添加一个线程去执行任务
38. addWorker(null,?false);
39. }
40.}
1.8.16线程池源码-核心源码解读-画图总结
【衔接下一章【并发编程六:Java中的线程池(3)-线程池的应用】】



