栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【并发编程五:Java中的线程池(2)-线程池的实现原理】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【并发编程五:Java中的线程池(2)-线程池的实现原理】

衔接上一章 【编发编程四:Java中的线程池(1)】

学习路线
    • 1.6一些开源项目实现的拒绝策略
    • 1.7线程池的实现原理
    • 1.8线程池的底层源码实现
      • 1.8.1十进制、二进制、八进制、十六进制
      • 1.8.2线程池源码-构造方法![在这里插入图片描述](https://img-blog.csdnimg.cn/ce3ee63d509e4b07981d7994d9f8be77.png)
      • 1.8.4线程池源码-线程池状态值
        • (1) RUNNING = -1 << 29
        • (2) SHUTDOWN = 0 << 29
        • (3) STOP = 1 << 29
        • (4) TIDYING = 2 << 29
        • (5) TERMINATED = 3 << 29
      • 1.8.5线程池源码-拆解线程状态和线程池工作线程数
      • 1.8.6线程池源码-拆解线程状态和线程池工作线程数-ctlOf
      • 1.8.7线程池源码-拆解线程状态和线程池工作线程数-runStateOf
      • 1.8.8线程池源码-拆解线程状态和线程池工作线程数-workerCountOf
      • 1.8.9线程池源码-线程池状态和工作线程数为什么是一个变量?
        • ctl 是线程池中的非常重要的成员变量,它代表两个含义;
        • 它是如何解决共享变量线程安全问题的?
      • 1.8.10线程池源码-核心源码解读-execute方法
      • 1.8.111.8.11线程池源码-核心源码解读-addWorker方法
      • 1.8.12线程池源码-核心源码解读-runWorker方法
      • 1.8.13线程池源码-核心源码解读-getTask方法
      • 1.8.14线程池源码-核心源码解读-线程池复用
      • 1.8.15线程池源码-核心源码解读-线程池大小变化
    • 1.8.16线程池源码-核心源码解读-画图总结

1.6一些开源项目实现的拒绝策略

org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport
当dubbo的工作线程触发了线程拒绝策略后,为了能尽量让使用者清楚触发线程拒绝策略的真实原因,在拒绝策略中它做了4件事情:

  • 1、输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息,日志打印非常详细,让开发和运维更容易容易定位到问题所在;
  • 2、输出当前线程堆栈详情,将发生拒绝策略时的现场情况dump线程上下文信息到一个文件中,助你发现问题所在;
  • 3、发送事件;
  • 4、抛出拒绝执行异常,使本次任务失败,这个使用的是JDK默认拒绝策略的异常;
    org.elasticsearch.common.util.concurrent.EsAbortPolicy
    org.elasticsearch.common.util.concurrent.EsExecutors.ForceQueuePolicy
    io.netty.util.concurrent.RejectedExecutionHandlers
    稍微总结下:
    AbortPolicy异常中止策略:异常中止,无特殊场景;
    DiscardPolicy丢弃策略:无关紧要的任务(文章点击量、商品浏览量等);
    DiscardOldestPolicy弃老策略:允许丢掉老数据的场景;
    CallerRunsPolicy调用者运行策略:不允许失败场景(对性能要求不高、并发量较小);
1.7线程池的实现原理

1.8线程池的底层源码实现 1.8.1十进制、二进制、八进制、十六进制

进制是为了计数的需要而产生的;10 11
日常中我们用的最多的是十进制(0 - 9),人有10个指头;(系绳法)
计算机使用的是二进制(0,1),计算机只认识0和1,其它的都不认识,不管是十进制、八进制、还是十六进制,计算机在处理的时候都会转化为二进制,不仅如此,即使是色彩斑斓的图片、视频、声音等,想让计算机识别,都需要转化为二进制,所以在计算机中,只有0和1;

虽然计算机只认识0和1,但是在编写程序的时候如果使用二进制来表示数字书写就太长了,比如十进制数字100,用二进制表示为:01100100?,这样不利于程序员阅读,所以又延伸出了八进制和十六进制,比如十进制数值100,用八进制表示为0144,用十六进制表示为0x64;
八进制和十六进制缩短了二进制数字的长度,又容易识别,也容易在各进制之间相互转化,所以八进制、十六进制也会在计算机中采用;

十六进制以0x或0X开头
十进制直接写数字
八进制以0(零)开头
二进制以0b或0B开头

1.8.2线程池源码-构造方法

1.8.3线程池源码-控制变量

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
COUNT_BITS = 29
CAPACITY = (1 << COUNT_BITS) - 1
int 类型的数是占用4字节,32位,所以前面填了一堆0;
原码:00000000 00000000 00000000 00000001
左移:00100000 00000000 00000000 00000000
减一:00011111 11111111 11111111 11111111 (536870911 = 5亿多)
1.8.4线程池源码-线程池状态值
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
RUNNING    = -1 << COUNT_BITS; 
(1) RUNNING = -1 << 29
在计算机中,负数以其正数的补码形式表示;
原码:一个整数,按照绝对值大小转换成的二进制数,称为原码;
1的原码:00000000 00000000 00000000 00000001;
反码:将二进制数按位取反,所得的新二进制数称为原二进制数的反码;
1的反码:11111111 11111111 11111111 11111110
称:11111111 11111111 11111111 11111110是00000000 00000000 00000000 00000001的反码,反码是相互的,所以也可称这两个数互为反码;
补码:反码加1称为补码;
1的补码:11111111 11111111 11111111 11111110 + 1 = 11111111 11111111 11111111 11111111

所以计算  RUNNING  =  -1 << 29
原码:00000000 00000000 00000000 00000001
反码:11111111 11111111 11111111 11111110(按位取反)
补码:11111111 11111111 11111111 11111111(反码+1)
左移:11100000 00000000 00000000 00000000

SHUTDOWN = 0 << COUNT_BITS;

(2) SHUTDOWN = 0 << 29
初始值:00000000 00000000 00000000 00000000
左移后:00000000 00000000 00000000 00000000

STOP = 1 << COUNT_BITS;

(3) STOP = 1 << 29
初始值:00000000 00000000 00000000 00000001
左移后:00100000 00000000 00000000 00000000

TIDYING = 2 << COUNT_BITS;

(4) TIDYING = 2 << 29
初始值:00000000 00000000 00000000 00000010
左移后:01000000 00000000 00000000 00000000

TERMINATED = 3 << COUNT_BITS;

(5) TERMINATED = 3 << 29
初始值:00000000 00000000 00000000 00000011
左移后:01100000 00000000 00000000 00000000
1.8.5线程池源码-拆解线程状态和线程池工作线程数
// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
1.8.6线程池源码-拆解线程状态和线程池工作线程数-ctlOf
先看private static int ctlOf(int rs, int wc) { return rs | wc; }
比如 ctlOf(RUNNING, 0)
RUNNING | 0
11100000 00000000 00000000 00000000 
|
00000000 00000000 00000000 00000000
=
11100000 00000000 00000000 00000000
注:
&同为1时为1,否则为0;
|只要一个为1就为1,否则为0
------------------------------------------------------------------------
比如 ctlOf(RUNNING, 1)
RUNNING | 1
11100000 00000000 00000000 00000000 
|
00000000 00000000 00000000 00000001
=
11100000 00000000 00000000 00000001
1.8.7线程池源码-拆解线程状态和线程池工作线程数-runStateOf

计算线程状态的方法

计算线程状态的方法
private static int runStateOf(int c)     { return c & ~CAPACITY; }

runStateOf(ctl.get())
ctl.get() & ~CAPACITY
ctlOf(RUNNING, 0) & ~CAPACITY

11100000 00000000 00000000 00000000
&
(
原始00011111 11111111 11111111 11111111
取反11100000 00000000 00000000 00000000
)
=11100000 00000000 00000000 00000000 (RUNNING)
------------------------------------------------------------------------------
如果是1:
runStateOf(ctl.get())
ctl.get() & ~CAPACITY
ctlOf(TIDYING, 1) & ~CAPACITY

ctlOf(TIDYING, 1)
TIDYING | 1
01000000 00000000 00000000 00000000 
|
00000000 00000000 00000000 00000001
=
01000000 00000000 00000000 00000001
&
11100000 00000000 00000000 00000000
=
01000000 00000000 00000000 00000000 (TIDYING)
1.8.8线程池源码-拆解线程状态和线程池工作线程数-workerCountOf

计算线程工作线程数

private static int workerCountOf(int c)  { return c & CAPACITY; }
workerCountOf(ctl.get())
ctl.get() & CAPACITY
ctlOf(RUNNING, 0) & CAPACITY
11100000 00000000 00000000 00000000
&
00011111 11111111 11111111 11111111
=
00000000 00000000 00000000 00000000
-------------------------------------------------------------------------------
如果是1:
ctlOf(RUNNING, 1) & CAPACITY
11100000 00000000 00000000 00000001
&
00011111 11111111 11111111 11111111
=
00000000 00000000 00000000 00000001
整个推算过程如上所分析(如果没有看太懂,可以多看几遍)
1.8.9线程池源码-线程池状态和工作线程数为什么是一个变量? ctl 是线程池中的非常重要的成员变量,它代表两个含义;

1、表示线程池的运行状态;
2、表示线程池的工作线程数;
为什么线程池要用一个ctl变量代表两个含义呢?
考虑共享变量,在多线程条件下存在线程安全问题;
一个变量代表两个含义就能解决线程安全问题吗?ctl变量是一个成员变量;

它是如何解决共享变量线程安全问题的?

(1)synchronize 写多读少
(2)cas+volatile 写少读多(cas只能保证一个变量的原子性);
在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个修改而另一个没有修改的情况,如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的;

1.8.10线程池源码-核心源码解读-execute方法
1.//Worker类 
2.private?final?class?Worker 
3.    extends?AbstractQueuedSynchronizer 
4.    implements?Runnable?{ 
5. 
6.   
7.  final?Thread?thread; 
8.   
9.   
10.  Runnable?firstTask; 
11.  
12.  Worker(Runnable?firstTask)?{ 
13.    setState(-1);?//?inhibit?interrupts?until?runWorker 
14.    this.firstTask?=?firstTask; 
15.    this.thread?=?getThreadFactory().newThread(this); 
16.     
17.    //Thread?t?=?new?Thread(worker); 
18.    //t.start();?==>?worker.run() 
19.  } 
20. 
21.  @Override 
22.  public?void?run()?{ 
23.    runWorker(this); 
24.  } 
25.   
26.  //.........................?省略 
27.} 

1.8.111.8.11线程池源码-核心源码解读-addWorker方法
1.private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{ 
2.  retry: 
3.  //?外层for循环 
4.  for?(;;)?{ 
5.    //?获取线程池控制变量的值 
6.    int?c?=?ctl.get(); 
7.    //?获取线程池运行状态 
8.    int?rs?=?runStateOf(c); 
9.     
10.    //?if判断,如果rs?>=?SHUTDOWN?并且?(判断3个条件,只要有1个不满足),返回false; 
11.    //?Check?if?queue?empty?only?if?necessary. 
12.    if?(rs?>=?SHUTDOWN?&& 
13.      !?(rs?==?SHUTDOWN?&& 
14.       ?firstTask?==?null?&& 
15.       ?!?workQueue.isEmpty())) 
16.      return?false; 
17.       
18.    //?内存for循环 
19.    for?(;;)?{ 
20.      //?获取线程池线程数 
21.      int?wc?=?workerCountOf(c); 
22.      //?如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; 
23.           //?core为addWorker方法的第二个参数,若为true根据corePoolSize比较,若为false根据maximumPoolSize比较; 
24.      if?(wc?>=?CAPACITY?|| 
25.        wc?>=?(core ?corePoolSize?:?maximumPoolSize)) 
26.        return?false; 
27.      //?尝试增加workerCount,如果成功,则跳出外层for循环 
28.      if?(compareAndIncrementWorkerCount(c)) 
29.        break?retry; 
30.      //?如果增加workerCount失败,则重新获取控制变量ctl的值 
31.      c?=?ctl.get(); //?Re-read?ctl 
32.      //?如果当前线程池的运行状态不等于rs,说明线程池运行状态已被改变,返回外层for循环继续执行 
33.      if?(runStateOf(c)?!=?rs) 
34.        continue?retry; 
35.      //?else?CAS?failed?due?to?workerCount?change;?retry?inner?loop 
36.    } 
37.  } 
38.   
39.  //?Worker线程是否启动 
40.  boolean?workerStarted?=?false; 
41.  //?Worker线程是否添加 
42.  boolean?workerAdded?=?false; 
43.  Worker?w?=?null; 
44.  try?{ 
45.    //?根据firstTask来创建Worker对象 
46.    w?=?new?Worker(firstTask); 
47.    //?每一个Worker对象都会创建一个线程 
48.    final?Thread?t?=?w.thread; 
49.    if?(t?!=?null)?{ 
50.      final?ReentrantLock?mainLock?=?this.mainLock; 
51.      mainLock.lock(); 
52.      try?{ 
53.        //?检查线程池运行状态 
54.        //?Recheck?while?holding?lock. 
55.        //?Back?out?on?ThreadFactory?failure?or?if 
56.        //?shut?down?before?lock?acquired. 
57.        int?rs?=?runStateOf(ctl.get()); 
58.        //?rs? 
63.          //?检查线程已经是运行状态,抛出非法线程状态异常 
64.          if?(t.isAlive())?//?precheck?that?t?is?startable 
65.            throw?new?IllegalThreadStateException(); 
66.          //?workers是一个HashSet 
67.          workers.add(w); 
68.          int?s?=?workers.size(); 
69.          //?largestPoolSize记录着线程池中出现过的最大线程数量 
70.          if?(s?>?largestPoolSize) 
71.            //?把历史上出现过的最大线程数的值更新一下 
72.            largestPoolSize?=?s; 
73.          //?Worker线程添加成功 
74.          workerAdded?=?true; 
75.        } 
76.      }?finally?{ 
77.        //?释放ReentrantLock锁 
78.        mainLock.unlock(); 
79.      } 
80.      if?(workerAdded)?{ 
81.        //?启动线程 
82.        t.start(); 
83.        //?Worker线程已经启动 
84.        workerStarted?=?true; 
85.      } 
86.    } 
87.  }?finally?{ 
88.    //?Worker线程没有启动成功 
89.    if?(!?workerStarted) 
90.      addWorkerFailed(w); 
91.  } 
92.  //?返回Worker线程是否启动成功 
93.  return?workerStarted; 
94.} 

1.//Worker类 
2.private?final?class?Worker 
3.    extends?AbstractQueuedSynchronizer 
4.    implements?Runnable?{ 
5. 
6.   
7.  final?Thread?thread; 
8.   
9.   
10.  Runnable?firstTask; 
11.  
12.  Worker(Runnable?firstTask)?{ 
13.    setState(-1);?//?inhibit?interrupts?until?runWorker 
14.    this.firstTask?=?firstTask; 
15.    this.thread?=?getThreadFactory().newThread(this); 
16.     
17.    //Thread?t?=?new?Thread(worker); 
18.    //t.start();?==>?worker.run() 
19.  } 
20. 
21.  @Override 
22.  public?void?run()?{ 
23.    runWorker(this); 
24.  } 
25.   
26.  //.........................?省略 
27.} 
1.8.12线程池源码-核心源码解读-runWorker方法
1.final?void?runWorker(Worker?w){ 
2. 
3.  Thread?wt?=?Thread.currentThread(); 
4.  Runnable?task?=?w.firstTask; 
5.  w.firstTask?=?null; 
6. 
7.  //允许响应中断 
8.  w.unlock(); 
9.  //?线程退出的原因,true是任务导致,false是线程正常退出 
10.  boolean?completedAbruptly?=?true; 
11.   
12.  try{ 
13.    //?当前任务为空,且当前任务队列为空,停止循环 
14.    while?(task?!=?null?||?(task?=?getTask())?!=?null)?{ 
15.       
16.      //?上锁处理并发问题,防止在shutdown()时终止正在运行的worker 
17.      w.lock(); 
18.       
19.      //?如果线程池是stop状态,并且线程没有被中断,就要确保线程被中断,如果线程池不是,确保线程池没有被中断; 
20.      //?清除当前线程的中断标志,做一个recheck来应对shutdownNow方法; 
21.      if?((runStateAtLeast(ctl.get(),STOP)?||?(Thread.interrupted()?&&?runStateAtLeast(ctl.get(),STOP)))?&&?!wt.isInterrupted()) 
22.        wt.interrupt(); 
23.      try?{ 
24.        //?执行前(空方法,由子类重写实现) 
25.        beforeExecute(wt,?task); 
26.        Throwable?thrown?=?null; 
27.        try?{ 
28.          //?执行Runnable类的run()方法 
29.          task.run(); 
30.        }?catch?(RuntimeException?x)?{ 
31.          thrown?=?x;?throw?x; 
32.        }?catch?(Error?x)?{ 
33.          thrown?=?x;?throw?x; 
34.        }?catch?(Throwable?x)?{ 
35.          thrown?=?x;?throw?new?Error(x); 
36.        }?finally?{ 
37.          //?执行后(空方法,由子类重写实现) 
38.          afterExecute(task,?thrown); 
39.        } 
40.      }?finally?{ 
41.        task?=?null; 
42.        //?完成的任务数+1 
43.        w.completedTasks++; 
44.        //?释放锁 
45.        w.unlock(); 
46.      } 
47.    } 
48.    //?到此,线程是正常退出 
49.    completedAbruptly?=?false; 
50.  }?finally?{ 
51.    //?处理worker的退出 
52.    processWorkerExit(w,completedAbruptly); 
53.  } 
54.} 

1.8.13线程池源码-核心源码解读-getTask方法
1.private?Runnable?getTask()?{ 
2. 
3.  //?表示上一次从任务队列中取任务时是否超时 
4.  boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out ?
5.  for?(;;)?{ 
6.    int?c?=?ctl.get(); 
7.    int?rs?=?runStateOf(c); 
8.     
9.    //?Check?if?queue?empty?only?if?necessary. 
10.     
16.    if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{ 
17.      decrementWorkerCount(); 
18.      return?null; 
19.    } 
20.     
21.    //线程数 
22.    int?wc?=?workerCountOf(c); 
23.    //?Are?workers?subject?to?culling ?
24.     
30.    boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize; 
31. 
32.     
38.    if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))?&&?(wc?>?1?||?workQueue.isEmpty()))?{ 
39.      if?(compareAndDecrementWorkerCount(c)) 
40.        return?null; 
41.      continue; 
42.    } 
43.    try?{ 
44.       
51.      Runnable?r?=?timed ?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:?workQueue.take(); 
52.      //?r不为空,则返回该Runnable 
53.      if?(r?!=?null)  ?
54.        return?r; 
55.      //?如果?r?==?null,说明已经超时得不到任务,timedOut设置为true 
56.      timedOut?=?true; 
57.    }?catch?(InterruptedException?retry)?{ 
58.      //?如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 
59.      timedOut?=?false; 
60.    } 
61.  } 
}?
1.8.14线程池源码-核心源码解读-线程池复用
1、threadPoolExecutor.execute(runnable)
2、addWorker(command, boolean)
3、Worker w = new Worker(firstTask); //已经创建了Thread
4、HashSet workers.add(w);
5、t.start(); //w.thread.start();
6、worker.run();
7、runWorker(this)
8、task = w.firstTask 或者 task = getTask()
9、task.run();

线程的复用是通过while循环实现的,worker会首先获取当前的firstTask进行run,然后不停地循环从等待队列中获取新的任务task,如果有新任务则直接调用task的run方法,不会再去新建一个线程,从而实现线程复用;

1.8.15线程池源码-核心源码解读-线程池大小变化
1.private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{ 
2.   
3.  //completedAbruptly为true表示线程异常执行结束 
4.  //completedAbruptly为false表示线程正常执行结束 
5.  if?(completedAbruptly)?//?If?abrupt,?then?workerCount?wasn't?adjusted 
6.    decrementWorkerCount(); 
7.     
8.  //从线程set集合中移除工作线程,该过程需要加锁,因为HashSet是线程不安全的集合 
9.  final?ReentrantLock?mainLock?=?this.mainLock; 
10.  mainLock.lock(); 
11.  try?{ 
12.    //统计完成的任务数:将该worker已完成的任务数追加到线程池已完成的任务数 
13.    completedTaskCount?+=?w.completedTasks; 
14.    //从HashSet中移除该worker 
15.    workers.remove(w); 
16.  }?finally?{ 
17.    //释放锁 
18.    mainLock.unlock(); 
19.  } 
20.  //根据线程池状态进行判断是否结束线程池 
21.  tryTerminate(); 
22.   
23.  int?c?=?ctl.get(); 
24.  //当线程池是RUNNING或SHUTDOWN状态时 
25.  if?(runStateLessThan(c,?STOP))?{ 
26.    //如果worker不是异常结束: 
27.    if?(!completedAbruptly)?{ 
28.      //如果allowCoreThreadTimeOut=true,最小线程个数就可以变为0; 
29.      int?min?=?allowCoreThreadTimeOut ?0?:?corePoolSize; 
30.      //但是,如果等待队列有任务,至少保留一个worker来处理任务; 
31.      if?(min?==?0?&&?!?workQueue.isEmpty()) 
32.        min?=?1; 
33.      //如果工作线程大于等于核心线程,直接return就行了,否则就需要添加一个线程; 
34.      if?(workerCountOf(c)?>=?min) 
35.        return;?//?replacement?not?needed 
36.    } 
37.    //是异常执行结束的,添加一个线程去执行任务 
38.    addWorker(null,?false); 
39.  } 
40.} 
1.8.16线程池源码-核心源码解读-画图总结


【衔接下一章【并发编程六:Java中的线程池(3)-线程池的应用】】

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/854047.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号