基本概念 名词解释写在前面:
Java并发这块内容无论是在工作中还是面试中都是常见而且实用的重点,推荐将本文一些关键代码自己运行一遍,以查看实际效果,这里推荐一个在线运行Java代码的网站
- CPU线程数
是同一时刻设备能并行执行的程序个数,线程数 = cpu个数 * 核数; - Java多线程
Java线程数可以大于CPU线程数,操作系统使用时间片机制,采用线程调度算法,频繁的进行线程切换。在同一时刻,JAVA进程中不同的线程可能会在不同的核上并行运行。 - 进程调度机制
时间片轮转法(Round-Robin,RR):根据先进先出原则,排成队列(就绪队列),调度时,将CPU分配给队首进程,让其执行一个时间段(称为:时间片),时间片通常为10-100ms数量级,当执行的时间片用完时,会由计时器发出时钟中断请求,调度程序便据此来停止该进程的执行,并将它排到队列末尾,然后再把CPU重新分配给当前队列的队首进程,同理如此往复。
时间片大小取决于:1)系统对响应时间的要求;2)就绪队列中进程的数目;3)系统的处理能力。 - 线程与进程
线程是操作系统最小的调度单位,进程是资源(比如:内存)分配的最小单位 - IO阻塞
当线程处于IO操作时,线程是阻塞的,线程由运行状态切换到等待状态。此时CPU会做上下文切换,以便处理其他程序;当IO操作完成后,CPU会收到一个来自硬盘的中断信号,CPU正在执行的线程因此会被打断,回到ready队列。而先前因I/O而waiting的线程随着I/O的完成也再次回到就绪队列,此时CPU可能会选择他执行。 - 并发和并行
并行:指两个或多个事件在同一时刻点发生,CPU同时执行;
并发:指两个或多个事件在同一时间段内发生,CPU交替执行; - Java线程调度机制
Java虚拟机都有一个线程调度器,用来确定哪个时刻运行哪个线程。每个线程可能会有自己的优先级,但是优先及并不意味着高优先级的线程一定会被调度,而是由CPU随机的选择:
抢占式线程调度:一个线程在执行自己的任务时,虽然任务还没有执行完,但是CPU会迫使它暂停,让其它线程占有CPU的使用权。
协作式线程调度:一个线程在执行自己的任务时,不允许被中途打断,一定等当前线程将任务执行完毕后才会释放对cpu的占有,其它线程才可以抢占该cpu。
Java在调度机制上采用的是抢占式的线程调度机制。Java线程在运行的过程中多个线程之间是协作式的。线程优先级决定了线程是分配多一些还是少一些处理器的资源(时间片);
使用Java编写线程安全的程序关键在于正确的使用共享对象,以及安全的对其进行访问管理。Java的内置锁可以保障线程安全,对于其他的应用来说并发的安全性是使用内置锁保障了线程变量使用的边界。谈到线程的边界问题,随之而来的是Java内存模型另外的一个重要的含义,可见性。Java对可见性提供的原生支持是volatile关键字。 - 线程不安全
在JMM(Java Memory Model,Java内存模型)中,线程可以把变量保存在本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另一个线程还在继续使用它在寄存器中的变量值的拷贝,造成数据的不一致,这样就会导致线程不安全 - 可见性
在一个线程的工作内存中修改了该变量的值,该变量的值立即能回显到主内存中,从而保证所有的线程看到这个变量的值是一致的 - CLH
CLH队列(Craig, Landin, and Hagersten),公平锁的AbstractQueuedSynchronizer中“等待锁”的线程队列就属于CLH队列,在线程并发的过程中没有获得锁的线程都会进入这个队列。 - CAS
比较并交换函数(compare_and_swap),它是基于乐观锁的原子操作函数。当写入的时候,如果寄存器旧值已经不等于现值,说明有其他CPU在修改,那就继续尝试。所以这就保证了操作的原子性。CAS在Java的Atomic和Locks均有应用。
// 下面的代码是Java Atomic核心方法
int compare_and_swap(int reg, int oldval, int newval) {
ATOMIC();
int old_reg_val = reg;
if (old_reg_val == oldval)
reg = newval;
END_ATOMIC();
return old_reg_val;
}
- 悲观锁
Pessimistic Lock, 每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。比如synchronized独占锁。 - 乐观锁
Optimistic Lock, 每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。 - 死锁
相互持有对方的锁导致各自阻塞无法继续执行。示例:
public class DeadLock implements Runnable {
public int flag = 1;
//静态对象是类的所有对象共享的
private static Object o1 = new Object(), o2 = new Object();
@Override
public void run() {
System.out.println("flag=" + flag);
if (flag == 1) {
synchronized (o1) {
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (o2) {
System.out.println("1");
}
}
}
if (flag == 0) {
synchronized (o2) {
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (o1) {
System.out.println("0");
}
}
}
}
public static void main(String[] args) {
DeadLock td1 = new DeadLock();
DeadLock td2 = new DeadLock();
td1.flag = 1;
td2.flag = 0;
//td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。
//td2的run()可能在td1的run()之前运行
new Thread(td1).start();
new Thread(td2).start();
}
}
- AQS
AbstractQueuedSynchronizer抽象队列同步器,在ReentrantLock/Semaphore/CountDownLatch均有使用,详见:Java并发之AQS详解 - JUC
指Java的并发编程工具包java.util.concurrent,相关类的层次结构如下图所示:
在某个线程执行完时间片之后,就会进行CPU切换。切换过程:
- 清空寄存器和缓存数据;
- 重新从主存加载新的线程所需要的数据(线程私有的:JVM运行时所需的数据:程序计数器、虚拟机栈和本地方法栈);
假设有int count=0,我们开1000个线程对其进行count++操作,并打印最终结果,可以发现不是我们想要的1000(大于1000或小于1000),这是怎么发生的呢?
- 每个线程都要从主存中读取count;
- load变量到工作内存,后续本线程的操作对象都将是这个工作内存中的副本;
- 每个线程操作完数据都要把结果回写到主存;
- 从read到最终write这一系列操作,并不是原子操作,线程随时可能失去CPU时间片,转而执行其他线程;
从上述过程结合CPU切换过程,可知如果在数据操作之后,write之前CPU失去时间片,那么下次进来将重新从主存读取count及后续操作,这就解释了count出现大于1000的情况。
CAS机制核心就是在write之前先比较工作内存副本与主存数据是否相等(是否有其他线程已经修改了主存数据值),不相等表示此时的主存值已经被其他线程改变,需要重新执行操作过程;相等才继续回写。
//CAS原子操作伪代码:A为副本,V为主存
if (A == V) {
V = B;
return B;
} else {
return V;
}
线程不安全解决之道
从上面的概念可知:JMM为了加快线程变量读写速度,在主内存的基础上,进一步引入了工作内存,多个线程访问共享资源时实际是访问副本,造成数据不一致问题,解决这个问题的方法有三种:
- 锁:synchronized和Lock,当有线程 操作该资源的时候锁定该资源,禁止别的线程访问,保证操作的原子性
- 可见性:volatile和atomic,保证每个线程访问资源的时候获取到的都是资源的最新值,能确保读安全,若无原子性保证,则写是不安全的
- 变量私有化:ThreadLocal、私有变量,我的变量只有我能读写,所以安全
Java原子操作类存在于java.util.concurrent.atomic包中,常用的有:
- AtomicInteger、AtomicLong、AtomicBoolean
- AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
- AtomicReference
、AtomicMarkableReference、AtomicStampedReference - AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
对于原子操作类,最大的特点是在多线程并发操作同一个资源的情况下,使用Lock-Free算法来替代锁,采用原子操作指令保证操作的原子性,这样开销小、速度快。
例如:++i或者--i这种操作不是原子操作,其底层可以分拆为三个操作:1)当去变量当前值;2)将当前值+1/-1;3)回写变量。因此不是线程安全的,
AtomicInteger提供了与之对应的原子操作:getAndIncrement()/getAndDecrement()。看下相关代码:
class AtomicInteger{
private volatile int value;
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
public final int get() {
return value;
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
...无关部分省略...
}
sun.misc.Unsafe是java用来在CPU级别的操作CAS指令的类,对于程序员来说,此类是不可用。由于是cpu级别的指令,其开销比需要操作系统参与的锁的开销小。
对于多个线程进入时,会先比较现在的value是否与expect相等,如果不相等,则进入下一个循环。如果相等,则会更新成update值。之后再进入的线程则会死循环。这样就保证了操作的原子性。这样一个方法中既包含了原子性,又包含了可见性。
示例:演示AtomicIntegerFieldUpdater的基本使用:
public class AtomicIntegerFieldUpdaterTest {
// 创建原子更新器,并设置需要更新的对象类和对象的属性
private static AtomicIntegerFieldUpdater a = AtomicIntegerFieldUpdater.newUpdater(User.class, "old");
public static void main(String[] args) {
// 设置tim的年龄是10岁
User conan = new User("tim", 10);
// tim长了一岁,但是仍然会输出旧的年龄
System.out.println(a.getAndIncrement(conan));
// 输出tim现在的年龄
System.out.println(a.get(conan));
}
public static class User {
private String name;
// 更新类的字段(属性)必须使用public volatile修饰符
public volatile int old;
public User(String name, int old) {
this.name = name;
this.old = old;
}
public String getName() {
return name;
}
public int getOld() {
return old;
}
}
}
Lock-Free算法
Lock-Free算法是一种新的策略替代锁来保证资源在并发时的完整性的,Lock-Free的实现有三步:
- 循环(for(;、while)
- CAS(CompareAndSet)
- 回退(return、break)
演示AtomicInteger存在的ABA问题及对应的解决办法:使用AtomicStampedReference。
public class ABA {
// 普通的原子类,存在ABA问题
AtomicInteger a1 = new AtomicInteger(10);
// 带有时间戳的原子类,不存在ABA问题,第二个参数就是默认时间戳,这里指定为0
AtomicStampedReference a2 = new AtomicStampedReference(10, 0);
public static void main(String[] args) {
ABA a = new ABA();
a.test();
}
public void test() {
new Thread1().start();
new Thread2().start();
new Thread3().start();
new Thread4().start();
}
class Thread1 extends Thread {
@Override
public void run() {
a1.compareAndSet(10, 11);
a1.compareAndSet(11, 10);
}
}
class Thread2 extends Thread {
@Override
public void run() {
try {
Thread.sleep(200); // 睡0.2秒,给线程1时间做ABA操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("AtomicInteger原子操作:" + a1.compareAndSet(10, 11));
}
}
class Thread3 extends Thread {
@Override
public void run() {
try {
Thread.sleep(500); // 睡0.5秒,保证线程4先执行
} catch (InterruptedException e) {
e.printStackTrace();
}
int stamp = a2.getStamp();
a2.compareAndSet(10, 11, stamp, stamp + 1);
stamp = a2.getStamp();
a2.compareAndSet(11, 10, stamp, stamp + 1);
}
}
class Thread4 extends Thread {
@Override
public void run() {
int stamp = a2.getStamp();
try {
Thread.sleep(1000); // 睡一秒,给线程3时间做ABA操作
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("AtomicStampedReference原子操作:" + a2.compareAndSet(10, 11, stamp, stamp + 1));
}
}
}
volatile与synchronized volatile运行结果:
AtomicInteger原子操作:true
AtomicStampedReference原子操作:false
volatile相比synchronized是一个轻量级的同步机制,其内部通过汇编指令实现。在访问volatile变量时不会执行加锁操作,也就不会执行线程阻塞,因而其并不具备原子性。
主要有两个作用:1)在多线程并发的情况下保证变量的可见性;2)禁止了指令重排。
在使用volatile关键字的时候,会多出一个lock前缀指令,lock前缀指令实际上相当于一个内存屏障(也称内存栅栏),内存屏障会提供3个功能:
- 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;
- 它会强制将对缓存的修改操作立即写入主存;
- 如果是写操作,它会导致其他CPU中对应的缓存行无效。
示例:使用volatile+DCL方式实现的懒汉单例
public class Singleton {
private static volatile Singleton instance;
private Singleton() {
}
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
虽然 volatile 变量具有可见性和禁止指令重排序,但是并不能说 volatile 变量能确保并发安全,因为它不具有操作的原子性,也就是它不适合在对该变量的写操作依赖于变量本身自己。
//改造前:线程不安全
public class Counter {
private volatile int count;
public int getCount(){
return count;
}
public void increment(){
// 不安全,写变量时依赖了自己:count=count+1;
// 安全:count=10;
count++;
}
}
//改造后:加锁保证原子性
public class Counter {
private volatile int count;
public int getCount(){ //利用volatile的可见性,保证了读安全
return count;
}
public synchronized void increment(){ //加锁实现写操作的原子性,保证写安全
count++;
}
}
所以volatile一般用于声明简单类型变量,使得这些变量具有原子性,即一些简单的赋值与返回操作将被确保不中断。但是当该变量的值由自身的上一个决定时,volatile的作用就将失效,这是由volatile关键字的性质所决定的。
synchronizedsynchronized关键字是Java利用锁的机制自动实现的,一般有同步方法和同步代码块两种使用方式。
- 对象锁:每个对象都内置单一的锁标记(也称为监视器monitor)。
- 类锁:每个类有且仅有一个锁作为类的Class对象的一部分。
采用synchronized修饰符实现的同步机制叫做互斥锁机制,它所获得的锁叫做互斥锁。当在对象上调用其任意的synchronized方法时,此对象被加锁(一个任务可以多次获得对象的锁,计数会递增),同时在线程从该方法返回之前,该对象内其他所有要调用类中被标记为synchronized的方法的线程都会被阻塞。
正因为它基于这种阻塞的策略,所以它的性能不太好,但是由于操作上的优势,只需要简单的声明一下即可,而且被它声明的代码块也是具有操作的原子性。
最后需要注意的是synchronized是同步机制中最安全的一种方式,其他的任何方式都是有风险的,当然付出的代价也是最大的。通常来说锁的范围越小性能越好。
示例:方发锁和代码块锁
public synchronized void increment(){
count++;
}
public void increment(){
synchronized (Counter.class){
count++;
}
}
private final Object mLock = new Object();
public void increment() {
synchronized (mLock){
count++;
}
}
Thread相关
相关类在java.lang包下。
Thread API Thread状态线程也是一个状态机,Thread提供了wait、notify、sleep等方法就是实现线程状态的切换。通过Thread#getState获取当前线程状态。
| 状态名称 | 说明 |
|---|---|
| NEW | 初始状态,线程被创建,但是还没有调用start()方法,线程还未被启动 |
| RUNNABLE | 运行状态,一个线程开始在java虚拟机中被执行 |
| BLOCKED | 阻塞状态,线程被锁住等待获得对象的monitor lock,换言之就是被锁(Synchronize)阻塞了 |
| WAITING | 等待状态,无限期等待另一个线程执行特定操作的线程处于此状态。wait/join |
| TIMED_WAITING | 超时等待状态,在指定的等待时间内等待另一个线程执行操作的线程处于此状态。wait/sleep/join |
| TERMINATED | 终止状态,线程执行完毕已经退出 |
wait/notify操作的都是对象的同步锁(monitor),所以前提是对象持有同步锁,比如在synchronized块中,否则会抛出:java.lang.IllegalMonitorStateException。monitor是所有对象的对象头里都拥有的,所以这三个方法定义在Object类中,而不是Thread类中。
- wait
将当前运行的线程挂起,进入阻塞状态并释放它持有的同步锁(monitor),进入WaitSet队列,通知其他线程来获取执行,直到notify和notifyAll方法来唤醒。 - notify 和 notifyAll
notify方法只唤醒一个等待线程并使该线程开始执行,如果有多个线程等待一个对象,那么只会随机唤醒其中一个线程,后者则会唤醒所有等待线程,哪个线程第一个被唤醒也是取决于操作系统。负责调用方法去唤醒线程的线程也被称为唤醒线程,唤醒线程后不能被立刻执行,因为唤醒线程还持有该对象的同步锁,必须等待唤醒线程执行完毕后释放了对象的同步锁后,等待线程才能获取到对象的同步锁进而继续执行。
示例:双线程打印奇偶数来展示wait和notify的用法
public class Main {
final Object odd = new Object(); // 奇数条件锁
final Object even = new Object(); // 偶数条件锁
private int max=200;
private final AtomicInteger status = new AtomicInteger(0); // AtomicInteger保证可见性,也可以用volatile
public Main() {
}
public static void main(String[] args) {
Main main = new Main();
Thread printer1 = new Thread(main.new MyPrinter("线程1", 0));
Thread printer2 = new Thread(main.new MyPrinter("线程2", 1));
printer1.start();
printer2.start();
}
public class MyPrinter implements Runnable {
private String name;
private int type; // 打印的类型,0:代表打印奇数,1:代表打印偶数
public MyPrinter(String name, int type) {
this.name = name;
this.type = type;
}
@Override
public void run() {
if (type == 0){
while(status.get()<20){
if(status.get()%2==0){
synchronized (even){
try {
even.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}else{
synchronized (odd){
System.out.println("当前是"+name+"输出"+status.get());
status.set(status.get()+1);
odd.notify();
}
}
}
}else{
while(status.get()<20){
if(status.get()%2==1){
synchronized (odd){
try {
odd.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}else{
synchronized (even){
System.out.println("当前是"+name+"输出"+status.get());
status.set(status.get()+1);
even.notify();
}
}
}
}
}
}
}
yield
yield是一个静态的原生native方法,他的作用是让出当前线程的CPU分配的时间片,将其分配给和当前线程同优先级的线程,然后当前线程状态由运行中(RUNNING)转换为可运行(RUNNABLE)状态,但这个并不是等待或者阻塞状态,也不会释放对象锁,如果在下一次竞争中,又获得了CPU时间片当前线程依然会继续运行。
线程优先级决定了线程是分配多一些还是少一些处理器的资源。Java中,通过一个整型变量Priority来控制线程的优先级,范围为1~10,通过调用setPriority(int Priority)可以设置,默认值为5。
同yield一样,sleep也调用时也会交出当前线程的处理器资源,但是不同的是sleep交出的资源所有线程都可以去竞争,yield交出的时间片资源只有和当前线程同优先级的线程才可以获取到。
joinjoin方法的作用是父线程(一般是main主线程)等待子线程执行完成后再执行,换言之就是将异步执行的线程合并为同步的主线程。同wait一样,join方法也有多个参数的方法,也可以设定超时时间,join()方法调用的也是join(0L)。
示例:
public class JoinDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("主线程开始"+"时间:"+System.currentTimeMillis());
JoinDemo main = new JoinDemo();
Thread printer1 = new Thread(main.new MyPrinter("线程1"));
Thread printer2 = new Thread(main.new MyPrinter("线程2"));
Thread printer3 = new Thread(main.new MyPrinter("线程3"));
printer1.start();
printer1.join();
printer2.start();
printer2.join();
printer3.start();
System.out.println("主线程结束"+"时间:"+System.currentTimeMillis());
}
public class MyPrinter implements Runnable {
String content;
public MyPrinter(String content) {
this.content = content;
}
@Override
public void run() {
System.out.println("当前线程"+content+"时间:"+System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
interrupt
interrupt的目的是为了中断线程,原来Thread.stop, Thread.suspend, Thread.resume 都有这个功能,但由于都太暴力了而被废弃了,暴力中断线程是一种不安全的操作,相对而言interrupt通过设置标志位的方式就比较合适。
interrupt基于一个线程不应该由其他线程来强制中断或停止,而是应该由线程内部来自行停止的思想来实现的。从API文档的中的介绍来看interrupt()的作用是中断本线程。除非当前线程正在中断自身(始终允许),否则将调用此线程的checkAccess方法,但这可能导致抛出SecurityException。
如果在调用Object类的wait()、join()、sleep(long)阻塞了这个线程,那么它的中断状态将被清除并收到InterruptedException。
如果在InterruptibleChannel上的I/O操作中阻塞了该线程,则该通道将被关闭,线程的中断状态将被设置,并且线程将收到ClosedByInterruptException。
如果一个线程被阻塞,就无法检测中断状态,线程在检查中断标识位时如果发现中断标识位为true,则会在阻塞方法调用处抛出InterruptedException异常,并且在抛出异常前将线程的中断标识位复位,即重新设置为false。
示例:将中断的捕获放在while(true)之外,退出while循环
@Override
public void run() {
try {
while (true) {
// 执行任务...
}
} catch (InterruptedException ie) {
// 由于产生InterruptedException异常,退出while(true)循环,线程终止!
}
}
示例:try-catch在while循环内,通过break退出循环
@Override
public void run() {
while (true) {
try {
// 执行任务...
} catch (InterruptedException ie) {
// InterruptedException在while(true)循环体内。
// 当线程产生了InterruptedException异常时,while(true)仍能继续运行!需要手动退出
break;
}
}
}
终止运行线程
通常我们通过“标记”方式终止处于“运行状态”的线程。其中,包括“中断标记”和“额外添加标记”。通过设立一个标志来在线程运行的时候判断是否执行下去。
示例:通过中断标记判断是否退出循环
@Override
public void run() {
//isInterrupted是Thread的内部方法,可以获取当前线程是否中断的标志,
//当线程处于运行状态时,我们通过interrupt()修改线程的中断标志,来达到退出while循环的作用
while (!isInterrupted()) {
}
}
示例:通过自定义额外标记判断是否退出循环
private volatile boolean isExit= false;
protected void exitThread() {
isExit= true;
}
@Override
public void run() {
while (isExit) {
}
}
通用终止线程的方式
结合上述两种方式,可以得到如下更加通用的退出循环的方式:
@Override
public void run() {
try {
// 1. isInterrupted()保证,只要中断标记为true就终止线程。
while (!isInterrupted()) {
}
} catch (InterruptedException ie) {
// 2. InterruptedException异常保证,当InterruptedException异常产生时,线程被终止。
}
}
sleepinterrupted()和 isInterrupted():
- 二者都能够用于检测对象的“中断标记”
- interrupted()除了返回中断标记之外,它还会清除中断标记(即将中断标记设为false)
- isInterrupted()仅仅返回中断标记
sleep是Thread的静态native方法,它的作用是让当前线程按照指定的时间休眠,休眠时期线程不会释放锁,但是会让出执行当前线程的cpu资源给其他线程使用。
ThreadLocalThreadLocal不是解决资源共享的问题,而是用来存取线程内的局部变量,每个线程各自维护互不影响。这样做其实就是以空间换时间的方式(与synchronized相反),以耗费内存为代价,单大大减少了线程同步(如synchronized)所带来性能消耗以及减少了线程并发控制的复杂度。
ThreadLocal用法比较简单,有三个暴露的方法:set、get、remove。
它的典型使用场景比如:
- 保存错误信息,在同一个线程的任何地方读取,而不必担心被其他线程覆盖。
- 每个线程创建一个独立数据库连接等;
示例:ThreadLocal典型用法,官方建议我们定义为private static
public class TestThreadLocal {
private static final ThreadLocal value = new ThreadLocal() {
@Override
protected Integer initialValue() {
return 0;
}
};
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new MyThread(i)).start();
}
}
static class MyThread implements Runnable {
private int index;
public MyThread(int index) {
this.index = index;
}
public void run() {
System.out.println("线程" + index + "的初始value:" + value.get());
for (int i = 0; i < 10; i++) {
value.set(value.get() + i);
}
System.out.println("线程" + index + "的累加value:" + value.get());
}
}
}
内部实现概述
ThreadLocal内部有一个静态类ThreadLocalMap,使用到ThreadLocal的线程会与ThreadLocalMap绑定,维护着这个Map对象,而这个ThreadLocalMap的作用是映射当前ThreadLocal对应的值,它key为当前ThreadLocal的弱引用:WeakReference。
看下关键部分代码就一目了然了:
public class ThreadLocal{ ... public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } public void set(T value) { Thread t = Thread.currentThread(); // 获取当前线程 ThreadLocalMap map = getMap(t); // 拿到当前线程的 ThreadLocalMap if (map != null) // 判断 ThreadLocalMap 是否存在 map.set(this, value); // 调用 ThreadLocalMap 的 set 方法 else createMap(t, value); // 创建 ThreadLocalMap } public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); // 调用 ThreadLocalMap 的 remove方法 } ThreadLocalMap getMap(Thread t) { return t.threadLocals; } ... }
ThreadLocal更像是一个客户端或者说门面,真正的实现在ThreadLocalMap
class ThreadLocalMap {
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
//魔法数字1640531527:这是一个神奇的数字,能够让hash槽位分布相当均匀
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
= private static final int INITIAL_CAPACITY = 16;//初始容量,2的幂
private Entry[] table;//用来存放entry的数组
private int size = 0;//数组长度
private int threshold; // 阈值
private void set(ThreadLocal> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1); // *:用key的hashCode计算槽位
// *:hash冲突时,使用开放地址法,因为独特和hash算法,导致hash冲突很少,一般不会走进这个for循环
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal> k = e.get();
if (k == key) { // key 相同,则覆盖value
e.value = value;
return;
}
if (k == null) { // key = null,说明 key 已经被回收了,进入替换方法
replaceStaleEntry(key, value, i);
return;
}
}
// 新增 Entry
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold) // 清除一些过期的值,并判断是否需要扩容
rehash(); // 扩容
}
//getEntry() 主要是在 ThreadLocal 的 get() 方法里被调用
private Entry getEntry(ThreadLocal> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key) // 无hash冲突情况
return e;
else
return getEntryAfterMiss(key, i, e); // 有hash冲突情况
}
private void rehash() {
expungeStaleEntries();
// 在上面的清除过程中,size会减小,在此处重新计算是否需要扩容
// 并没有直接使用threshold,而是用较低的threshold (约 threshold 的 3/4)提前触发resize
if (size >= threshold - threshold / 4)
resize();
}
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
//Entry继承了WeakReference,对key进行弱引用,实现对 Reference 的 key 的自动回收,而对 value 的回收需要手动解决
static class Entry extends WeakReference> {
Object value;
Entry(ThreadLocal> k, Object v) {
super(k);
value = v;
}
}
}
Java里大部分Map都是用链表发解决hash冲突的,而ThreadLocalMap采用的是开发地址法。
-
开放地址法:
容易产生堆积问题;不适于大规模的数据存储;散列函数的设计对冲突会有很大的影响;插入时可能会出现多次冲突的现象,删除的元素是多个冲突元素中的一个,需要对后面的元素作处理,实现较复杂;结点规模很大时会浪费很多空间; -
链地址法:
处理冲突简单,且无堆积现象,平均查找长度短;链表中的结点是动态申请的,适合构造表不能确定长度的情况;相对而言,拉链法的指针域可以忽略不计,因此较开放地址法更加节省空间。插入结点应该在链首,删除结点比较方便,只需调整指针而不需要对其他冲突元素作调整。
public class Thread implements Runnable {
...
ThreadLocal.ThreadLocalMap threadLocals = null;
...
}
内存泄露问题
一个Thread维持着一个ThreadLocalMap对象,而该Map对象的key又由提供该value的ThreadLocal对象弱引用提供,所以这就有这种情况:
如果ThreadLocal不设为static的,由于Thread的生命周期不可预知,这就导致了当系统gc时将会回收它,而ThreadLocal对象被回收了,此时它对应key必定为null,这就导致了该key对应得value拿不出来了,而value之前被Thread所引用,所以就存在key为null、value存在强引用导致这个Entry回收不了,从而导致内存泄露。
避免内存泄露的方法,是对于ThreadLocal要设为static静态的。这样的话ThreadLocal的生命周期就更长,由于一直存在ThreadLocal的强引用,所以ThreadLocal也就不会被回收,也就能保证任何时候都能根据ThreadLocal的弱引用访问到Entry的value值,然后remove它,防止内存泄露。除了这个,还必须在线程不使用它的值时手动remove掉该ThreadLocal的值,这样Entry就能够在系统gc的时候正常回收,而关于ThreadLocalMap的回收,会在当前Thread销毁之后进行回收。
InheritableThreadLocalThreadLocal类固然很好,但是子线程并不能取到父线程的ThreadLocal类的变量,InheritableThreadLocal类就是解决这个问题的。
示例:演示子线程访问父线程私有变量
public class Test3 {
public static InheritableThreadLocalExt tl = new InheritableThreadLocalExt();
public static void main(String[] args) {
try {
ThreadA a = new ThreadA();
a.start();
for (int i = 0; i < 10; i++) {
if(i == 3){
tl.set("111");
}
System.out.println(" 在Main线程中取值=" + tl.get());
Thread.sleep(100);
}
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static public class InheritableThreadLocalExt extends InheritableThreadLocal {
@Override
protected String initialValue() {
return "000";
}
@Override
protected String childValue(String parentValue) {
return parentValue + " 我在子线程加的~!";
}
}
static public class ThreadA extends Thread {
@Override
public void run() {
try {
Thread.sleep(200);
for (int i = 0; i < 10; i++) {
System.out.println("在ThreadA线程中取值=" + tl.get());
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ExecutorService主线程将InheritableThreadLocal中的值进行更改,子线程取到的还是旧值。
创建线程是需要内存和CPU开销的,创建一个无限循环空线程的内存开销大约为24KB,而实际上线程是需要做任务的,会带来额外的内存开销,通常平均每个线程内存开销在1M左右。因此应用不能无限创建线程,最好的方式是利用线程池对线程进行复用和统一管理。
示例:创建线程池并提交任务
@Test
public void testCallable() {
ExecutorService service = Executors.newSingleThreadExecutor();
//提交并执行Callable,由于这是异步操作,无法直接拿到返回的结果
Future future = service.submit(new ThreadCallable());
try {
//调用Future#get()获取结果时,当前线程就会阻塞,直到call()方法返回结果
//Future算是一个比较实用的类,还可以cancel取消任务
String s = future.get();
System.out.println("res ="+s);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
public static class ThreadCallable implements Callable {
@Override
public String call() throws Exception {
Thread.sleep(1000);
System.out.print("do returnn");
return "hello world";
}
}
示例:需要查出一百个用户的信息,并且给他们的邮箱发送邮件,打印出最终结果
public class SingleVSConcurrent {
public static void main(String[] args) {
//我们模拟一百个用户,我们查出来这一百个用户然后再给他们发邮件
long singleStart = System.currentTimeMillis();
for (int i=0;i<100;i++){
User user = null;
try {
user = DoSomethingService.queryUser(i);
String s = DoSomethingService.sendUserEmail(user.getEmail());
System.out.println(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long singleEnd = System.currentTimeMillis();
System.out.println("单线程共用了"+(singleEnd-singleStart)+"ms");
System.out.println("-------分割线-----------------分割线-----------------分割线-----------------分割线-----------------分割线----------");
long concurrentStart = System.currentTimeMillis();
//构建要做的任务列表,查询出用户来并且发送邮件
List tasks = new ArrayList<>();
for (int i=0;i<100;i++){
//传id进去构造不同的任务,业务中有可能是给你个list列表
Task task = new Task(i);
tasks.add(task);
}
//返回任务执行结果
List> futures = null;
//用线程池查询用户发送邮件
ExecutorService executorService = Executors.newFixedThreadPool(100);
try {
//是线程池执行提交的批量任务
futures = executorService.invokeAll(tasks);
} catch (InterruptedException e) {
e.printStackTrace();
}
//关闭线程池
executorService.shutdown();
//存放任务结果的集合
List results = new ArrayList<>();
//遍历这个任务执行结果
for (Future result:futures) {
//如果这个任务结束了
if (result.isDone()){
String s = null;
try {
//得到这个任务的处理结果,得不到会一直阻塞
s = result.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//将任务结果放进任务结果集合里面
results.add(s);
}
}
//遍历任务结果的集合
for (String s:results) {
System.out.println(s);
}
long concurrentEnd = System.currentTimeMillis();
System.out.println("多线程共用了"+(concurrentEnd-concurrentStart)+"ms");
}
}
//业务提供的服务
public class DoSomethingService {
//查询用户100ms
public static User queryUser(Integer id) throws InterruptedException {
//这里可以调用查询user的sql语句
Thread.sleep(100);
User u= new User(id,id+"xhJaver.com");
return u;
}
//发送邮件50ms
public static String sendUserEmail(String email) throws InterruptedException {
if (email!=null){
//这里可以调用发送email的语句
Thread.sleep(50);
return "发送成功"+email;
}else {
return "发送失败";
}
}
}
//任务类
public class Task implements Callable {
private Integer id;
public Task(Integer id) {
this.id = id;
}
@Override
public String call() throws Exception {
//调用业务方提供的查user的服务,id不同,创建任务的时候就传过来id
User user = DoSomethingService.queryUser(this.id);
//调用业务方提供发送邮件的服务,email不同
String result = DoSomethingService.sendUserEmail(user.getEmail());
return result;
}
}
public class User {
private Integer id;
private String email;
public User(Integer id, String email) {
this.id =id;
this.email =email;
}
public String getEmail() {
return email;
}
}
API
Executors
可以通过Executors快速创建4种常见类型的线程池:
| 线程池 | 创建方法 | 说明 | 使用场景 |
|---|---|---|---|
| FixedThreadPool | Executors.newFixedThreadPool(int nThreads) | 固定核心线程和最大线程个数,阻塞队列无界 | 由于固定了线程数,所以可以控制最大并发数, |
| CachedThreadPool | Executors.newCachedThreadPool(ThreadFactory) | 没有核心线程 最大线程个数不限制,阻塞队列不存储 | 任务不排队,投递即执行,适用于很多短期异步任务的环境,以提高程序性能 |
| SingleThreadExecutor | Executors.newSingleThreadExecutor() | 核心线程和最大线程个数 只有一个,阻塞队列无界 | 由于仅一条线程,所以可以保证任务执行顺序 |
| ScheduledThreadPool | Executors.newScheduledThreadPool(int corePoolSize) | 定时任务的线程池 |
//固定核心线程和最大线程个数,阻塞队列无界
fixThreadPool = Executors.newFixedThreadPool(5)
//没有核心线程 最大线程个数不限制,阻塞队列不存储
cacheThreadPool = Executors.newCachedThreadPool(object : ThreadFactory {
var count = 0
override fun newThread(r: Runnable?): Thread {
Log.d("hh", "新开的线程: newThreadPool_$count")
val thread = Thread(r, "newThreadPool_${count++}")
thread.setUncaughtExceptionHandler { t, e ->
//处理非正常的线程中止,多线程中通过trycatch试图捕获线程的异常是不可取的
Log.d("hh", t.name)
e.printStackTrace()
}
return thread
}
})
//核心线程和最大线程个数 只有一个,阻塞队列无界
singleThreadPool = Executors.newSingleThreadExecutor()
//定时任务的线程池
scheduledThreadPool = Executors.newScheduledThreadPool(5)
ThreadPoolExecutor通过常看Executors源码可以发现其仅是一个简单的工厂类,真正的实现全都是ThreadPoolExecutor或其子类
虽然Executors提供了一些列便捷的接口方便我们创建线程池,但是其线程池要么是线程数量不可控、要么是任务队列长度不可控,都有溢出风险,此时可以通过ThreadPoolExecutor来创建一个符合我们要求的线程池。
- 继承关系
AbstractExecutorService实现了ExecutorService接口,实现了其中大部分的方法;
ThreadPoolExecutor,继承了AbstractExecutorService,是ExecutorService的默认实现; - 构造方法
ThreadPoolExecutor提供了多个重载的构造方法,其参数含义如下(本节非常重要,理论上必须掌握):
| 参数 | 说明 |
|---|---|
| int corePoolSize | 核心线程数。默认情况下线程池是空的,只有任务提交时才会创建线程。如果当前运 行的线程数少于corePoolSize,则创建新线程来处理任务;如果等于或者多于corePoolSize,则不再创建。如 果调用线程池的prestartAllcoreThread方法,线程池会提前创建并启动所有的核心线程来等待任务。 |
| int maximumPoolSize | 线程池允许创建的最大线程数。如果任务队列满了并且线程数小于 maximumPoolSize时,则线程池仍旧会创建新的线程来处理任务。 |
| TimeUnit keepAliveTime | 非核心线程闲置的超时时间。超过这个时间则回收。如果任务很多,并且每个任务 的执行事件很短,则可以调大keepAliveTime来提高线程的利用率。另外,如果设置 allowCoreThreadTimeOut属性为true时,keepAliveTime也会应用到核心线程上, keepAliveTime参数的时间单位。可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)等。 |
| BlockingQueue workQueue | 任务队列。如果当前线程数大于corePoolSize,则将任务添加到此任务队列中。该任务 队列是BlockingQueue类型的,也就是阻塞队列。 |
| ThreadFactory factory | 线程工厂。可以用线程工厂给每个创建出来的线程设置名字。一般情况下无须设置 该参数。 |
| RejectedExecutionHandler handler | 饱和策略。这是当任务队列和线程池都满了时所采取的应对策略,默认 是AbordPolicy,表示无法处理新任务,并抛出RejectedExecutionException异常。此外还有3种策略,它们分 别如下。(1)CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓 新任务的提交速度。 (2)DiscardPolicy:不能执行的任务,并将该任务删除。(3)DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。 |
- 线程池执行流程
(1)如果线程池中的线程数未达到核心线程数,则创建核心线程处理任务。
(2)如果线程数大于或者等于核心线程数,则将任务加入任务队列,线程池中的空闲线程会不断地从 任务队列中取出任务进行处理。
(3)如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理任务。
(4)如果线程数超过了最大线程数,则执行饱和策略
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列两个常见的阻塞场景分别是:
1)当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
2)当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的 位置,线程被自动唤醒。
支持以上两种阻塞场景的队列被称为阻塞队列。
- offer(anObject)
表示如果可能的话,将anObject加到BlockingQueue里。即如果BlockingQueue可以容纳,则返回true,否则返回false。(本方法不阻塞当前执行方法的线程。) - offer(E o,long timeout,TimeUnit unit)
可以设定等待的时间。如果在指定的时间内还不能往队列中加入BlockingQueue,则返回失败。 - put(anObject)
将anObject加到BlockingQueue里。如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。 - poll(time)
取走 BlockingQueue 里排在首位的对象。若不能立即取出,则可以等time参数规定的时间,取不到时返回null。 - poll(long timeout,TimeUnit unit)
从BlockingQueue中取出一个队首的对象。如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据;否则直到时间超时还没有数据可取,返回失败。 - take()
取走BlockingQueue里排在首位的对象。若BlockingQueue为空,则阻断进入等待状态,直到BlockingQueue有新的数据被加入。 - drainTo()
一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数)。通过该方法,可以提升获取数据的效率;无须多次分批加锁或释放锁。
由数组结构组成的 有界 阻塞队列。有界队列(如ArrayBlockingQueue)有助于防止资源耗尽当最大线程数有限时,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷。
示例:使用阻塞队列实现 生产者和消费者模式,无需考虑同步和线程间同步的问题
import java.util.concurrent.ArrayBlockingQueue;
public class BlockingQueueTest {
private int queueSize = 10;
private ArrayBlockingQueue mArrayBlockingQueue = new ArrayBlockingQueue<>(queueSize);
public static void main(String[] args) {
BlockingQueueTest mBlockingQueue = new BlockingQueueTest();
Consumer consumer = mBlockingQueue.new Consumer();
Producer producer = mBlockingQueue.new Producer();
consumer.start();
producer.start();
}
class Consumer extends Thread {
@Override
public void run() {
while (true) {
try {
mArrayBlockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
while (true) {
try {
mArrayBlockingQueue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
linkedBlockingQueue排队且可以指定队伍长度
由链表结构组成的 有界 阻塞队列。linkedBlockingQueue将导致当所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize(因此,maximumPoolSize 的值也就无效了。)。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。
PriorityBlockingQueue支持优先级排序的 无界 阻塞队列。
DelayQueue使用优先级队列实现的 无界 阻塞队列。DelayedWorkQueue是ScheduledThreadPoolExecutor的静态内部类。Executors.newScheduledThreadPool(3)使用DelayedWorkQueue创建线程池。
SynchronousQueue不用等不存储元素的阻塞队列。直接提交策略表示线程池不对任务进行缓存,新进任务直接提交给线程池,当线程池中没有空闲线程时,创建一个新的线程处理此任务。这种策略需要线程池具有无限增长的可能性。
linkedTransferQueue由链表结构组成的 无界 阻塞队列。
linkedBlockingDeque由链表结构组成的 双向 阻塞队列
Locks位于java.util.concurrent.locks包下。常用的如ReentrantLock。
ReentrantLock公平锁和非公平锁是ReentrantLock的两种锁策略:
- 公平锁(FairSync):是指多个线程在等待同一个锁时,必须按照申请的时间顺序来依次获得锁;
- 非公平锁(NonfairSync):在锁被释放时,任何一个等待锁的线程都有机会获得锁;
在实现层面上,公平锁中要通过hasQueuedPredecessors()来判断该线程是否位于CLH队列头部,是则获取锁;而非公平锁则不管你在哪个位置都直接获取锁。
Locks与synchronized比较示例:使用ReentrantLock后线程等待的状态是什么?
class Untitled {
public static void main(String[] args) throws InterruptedException {
TestLock testLock = new TestLock();
Thread thread2 = new Thread(() -> {
testLock.myTestLock();
}, "thread2");
Thread thread1 = new Thread(() -> {
testLock.myTestLock();
}, "thread1");
thread1.start();
Thread.sleep(1000);
thread2.start();
Thread.sleep(1000);
System.out.println("****" + (thread2.getState()));
Thread.sleep(20000);
}
static class TestLock{
private final Lock lock = new ReentrantLock();
public void myTestLock(){
lock.lock();
try{
Thread.sleep(10000);
System.out.println("testLock status");
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} finally {
lock.unlock();
}
}
}
}
运行结果:
****WAITING
这个结果说明,Lock底层没有使用同步锁(锁住后线程状态是BLOCKING),而是使用了类似wait的方式让线程挂起
| 维度 | synchronized | Locks |
|---|---|---|
| 等待可中断 | 线程A跟线程B同时竞争同一把锁,如果线程A获得锁之后不释放,那么线程B会一直等待下去,并不会释放 | 可以在线程等待了很长时间之后进行中断,不需要一直等待 |
| 锁的公平性 | 非公平锁 | 可以是非公平锁也可以是公平锁 |
| 绑定条件 | 默认隐含条件 | 可以绑定多个条件 |
并发集合注意一点,迭代器是线程不安全的,需要自己另行控制,其他接口已做线程安全处理。
推荐工具 jstackjstack是命令行工具,不仅能查看线程当前状态,还能看调用栈,锁等线程栈信息。
Arthasarthas是功能强大JVM命令行监控工具,查看线程状态只是其中一个功能,详见:https://alibaba.github.io/arthas/thread.html



