- 什么是AQS
- AQS有哪些子类
- 源码解读前需了解
- 1、什么是CAS?
- 2、什么是公平锁?什么是非公平锁?
- 3、关于线程的唤醒。
- ReentrantLock 源码解读
- ReentrantLock 类的结构
- AbstractQueuedSynchronizer结构
- ReentrantLock 对象的创建
- 加锁操作
- 释放锁和唤醒下一个线程
AQS是在Java中AbstractQueuedSynchronizer的缩写。他的含义是抽象的队列式的同步器。
在JUC(java.util.concurrent)中,很多和锁有关的操作类,都会依赖他。比如下面博客需要说道的ReentrantLock。
针对ReentrantLock的简单使用,可以参考之前博客 java.util.concurrent.locks.Lock锁 。
AQS有哪些子类在java.util.concurrent.locks.AbstractQueuedSynchronizer这个类中,根据IDEA可以更为直观的生成其子类信息等。如下所示:
【扩展:】关于如何生成?
源码解读前需了解 1、什么是CAS?IDEA——如何查看某个类的其他子类
2、什么是公平锁?什么是非公平锁?内容较多,单独独立出来写了篇文章,可以参考下列地址
CAS的原理理解
公平和非公平,取决于当锁被上一个线程释放后,新线程能否立即可以拿锁。
1、假设最开始 T0、T1、T2 需要获取锁,进行加锁操作,但此时T0获取锁成功,那么T1和T2则需要进行等待,等待T0释放锁后进行锁的获取。
2、如果此时来了另外一个线程T3,并且此时正好T0释放锁成功;
3、关于线程的唤醒。公平:T3向后排队等待。
非公平:可以不考虑排队,直接参与锁的争抢操作。
关于线程的睡眠和唤醒,有多重实现方式。
-
sleep(xx)
sleep(long miilis)方法是Thread类的静态方法,在代码中调用Thread.sleep(timeout)方法,会使操作系统将当前线程挂起,调用Thread.sleep(timeout)方法后会产生下面的变化:- 线程的状态变为TIMED_WAITING
- 调用了sleep方法后,timeout耗尽线程才会重新进入可执行状态
- 如果在syncronized代码块中执行Thread.sleep(timeout),Monitor锁的Owner并不会发生切换,也就是说线程调用了sleep方法并不会释放锁
-
wait
与sleep方法不同,wait是属于Object类的方法,JDK中常用的两种wait方法,即wait()和wait(long timeout)。从Monitor锁的原理我们不难得出以下结论:- wait方法必须在synchronized代码块中执行(或者使用了syncronized关键字修饰的方法)
- 线程调用wait方法后会释放锁,进入Monitor锁对象的WaitSet
- 线程调用wait方法后会变成Wating状态
- 调用了wait()方法的线程会一直处于Waiting状态,直到Monitor对象的Owner调用了notify或者notifyAll方法。notify方法会随机唤醒WaitSet中的一个线程,而notifyAll会唤醒WaitSet中的所有线程
- wait(long timeout)在等待时间耗尽的情况下也会自动唤醒
-
park/unpark
park/unpark的功能有点类似于wait/notify,都是暂停/唤醒线程。他是java.util.concurrent.locks.LockSupport下的相关静态方法。LockSupport.park(); LockSupport.unpark(线程指向); // 唤醒指定的线程
-
notify、notifyAll
notify/notifyAll() 方法是Object的本地final方法,无法被重写。notify 的唤醒操作,具有随机性。
【标注:】
ReentrantLock 源码解读 ReentrantLock 类的结构上述线程的 睡眠和唤醒参考资料:
Java多线程学习之wait、notify/notifyAll 详解
java并发学习-线程的睡眠与唤醒
在java.util.concurrent.locks.ReentrantLock源码中,ReentrantLock的定义如下所示:
public class ReentrantLock implements Lock, java.io.Serializable
从类的继承关系,并未发现其是否与AbstractQueuedSynchronizer(AQS)有关联。其实ReentrantLock的核心部分在于其sync属性信息。
private final Sync sync;
ReentrantLock内部抽象类。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
其中有关公平和非公平在其内部子类中有相关的定义。
// 非公平
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁方式
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
AbstractQueuedSynchronizer结构
由于在java.util.concurrent.locks.ReentrantLock中定义一个抽象内部类 java.util.concurrent.locks.ReentrantLock.Sync,其中该内部类实现 AbstractQueuedSynchronizer这个抽象类。
java.util.concurrent.locks.AbstractQueuedSynchronizer类的继承关系如下所示:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
从继承关系可以看出,java.util.concurrent.locks.AbstractQueuedSynchronizer是AbstractOwnableSynchronizer的子类,在AbstractOwnableSynchronizer 父类中,只有一个java.util.concurrent.locks.AbstractOwnableSynchronizer#exclusiveOwnerThread属性的申明,和对其提供有get/set方法。
用于保存具体的哪个线程对象引用。
在AbstractQueuedSynchronizer 子类中,定义有Node 内部类,用于保存具体的数据信息,其结构如下所示:
【标注:】
AQS主要是为了保存哪个线程持有锁这个基本信息。
其次保留其他未获取锁的线程所在队列队首和队尾的指向。
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
【疑问:】为什么要设计成双向链表?
双向具有不同方向都能快速查找的优点。
【疑问:】双向链表数据中的Node类下thread属性指向的是什么?
ReentrantLock 对象的创建未获取到锁,需要排队等待的线程对象信息。
ReentrantLock 对象的创建分为以下两种方式:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从构造方法中可以看出:
加锁操作直接采取 new ReentrantLock()默认创建的是一个非公平 NonfairSync 对象。
以公平锁为例,创建公平锁采取以下方式进行创建:
Lock lock = new ReentrantLock(true);
源码如下:
其中加锁操作采取:
lock.lock();
查看公平锁的加锁方法,其逻辑有以下定义:
final void lock() {
acquire(1);
}
调用acquire()并传递一个常量1。
继续向下探索,得知其具体加锁操作如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这段代码采取整合方式,将其拆分为大家便于识别的代码,如下所示:
if (!tryAcquire(arg){
if(acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
接下来分别解析其逻辑实现:
-
tryAcquire:尝试获取锁
在具体的java.util.concurrent.locks.ReentrantLock.FairSync子类中,存在以下方式有对tryAcquire的复写,其代码如下所示:protected final boolean tryAcquire(int acquires) { // 1、获取当前执行线程的对象 final Thread current = Thread.currentThread(); // 2、获取 AbstractQueuedSynchronizer 中 state 的属性值 int c = getState(); // 3、如果当前持有状态器的状态信息为0 if (c == 0) { // 4、为0的时候并非表示一定就是无其他线程加锁了 // 还需要考虑,t0线程刚释放锁,在AbstractQueuedSynchronizer 队列中是否还存在其他线程排队的问题 // !hasQueuedPredecessors() 表示队列中无其他线程进行排队操作 // 当判断 AQS 队列中无其他线程排队时,此时则尝试去进行锁的获取操作 // compareAndSetState 采取 CAS 算法(具有原子性),去进行锁的获取操作,并将AQS中的state进行修改 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 5、如果队列中无其他线程排队等待,且采取CAS算法获取锁成功,则将AQS中的 exclusiveOwnerThread 设置为当前线程的引用 // 表示当前线程持有锁 setExclusiveOwnerThread(current); return true; } } // 6、考虑到非0的情况 // 判断当前的线程和AQS中保存的 exclusiveOwnerThread 信息是否吻合 // 如果两者匹配,则说明这个锁就是当前线程所持有的,所以可以直接将AQS中的 state 进行 +1操作。 // 判断当前线程是否重复加锁 (可重入性) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 修改AQS中的state状态值信息,多次调用lock()进行加锁,将其进行累加操作 // 注意:state只有为 0 的情况,才能允许其他线程进行锁的获取! setState(nextc); return true; } // 7、如果当前锁的持有者不是当前线程,则返回false,表示获取锁失败,进入下一个操作 return false; }if (c == 0)的情况时,处理操作如下:
if (c != 0) 且 current == getExclusiveOwnerThread()时,处理操作如下所示:
【疑问:】为什么会出现 state 不为 0 的情况?
在方法中可能会出现各个引用方法中同样包含lock.lock()和lock.unlock()的方法。但方法引用别的方法整体上来看属于同一个线程对其进行反复加锁情况。也就是锁的重入性。
如下所示:
此时在业务代码之前执行,针对同一个线程进行了两次(或多次)加锁操作。其中判断队列中是否存在其他排队队列的代码如下所示:
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); // 队列的AQS数据结构中,头head和尾tail的指向都是null时, // 表示无其他Node节点,即队列中无任何的其他等待线程 } -
acquireQueued:尝试将线程对象入队列
当程序执行tryAcquire(arg)返回为false时,表示当前线程获取锁失败。此时!tryAcquire(arg)为真,进行将当前线程对象保存队列操作,其中源码逻辑如下所示:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
当前的尝试加入队列分为两部实现。
第一步addWaiter(Node.EXCLUSIVE);
第二步acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。其中addWaiter(Node.EXCLUSIVE)的操作逻辑如下所示:
private Node addWaiter(Node mode) { // 创建一个Node节点,保存当前线程的引用和状态信息 // mode分为两种:Node.EXCLUSIVE 互斥(ReenTrantLock特性);Node.SHARED共享(Semaphore特性) Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 将队列尾部获取 Node pred = tail; if (pred != null) { // 如果CAS中存在其他Node对象,则将新的node类的prev节点指向这个Node node.prev = pred; // 同时采取CAS算法设置原队列,将tail属性值设置为新的node对象 if (compareAndSetTail(pred, node)) { // 原队列最后个node对象的next属性指向新的最后node对象 pred.next = node; return node; } } // 当队列中,tail属性上并无任何指向时(空队列) // 这个方法有两个用处: // 1、构建队列头部,预防空指针问题 // 2、将新的node节点插入队列,直至插入成功 enq(node); return node; } private Node enq(final Node node) { // 死循环,无限自旋 // 保证凡是没有拿到锁的线程都能成功入队列 for (;;) { // 获取未节点 Node t = tail; // AbstractQueuedSynchronizer 中的head和tail属性,分别指向双向链表,也就是CLH队列的队首和队尾 // 当队列存在数据时,head和tail属性不会是null if (t == null) { // Must initialize // 当为null,表示队列中不存在数据信息,即队列无 // 此时则需要使用CAS算法创建一个“空的Node”节点,并设置到AQS中的head属性上去 if (compareAndSetHead(new Node())) // 把 AbstractQueuedSynchronizer 的 tail属性 也设置成这个“空的Node”节点 tail = head; // 队列中只有一个“空的Node”节点时,head和tail属性 都指向 这个“空的Node”节点 // 进入下一次循环 } else { // 获取目前队列中的尾部Node节点对象 // 这里是需要添加至队列中的那个新的node对象。 // 将新的node对象的prev属性指向之前的队列中的最后那个node node.prev = t; // CAS算法,重新设置队列的tail属性,为需要新加入的node对象 if (compareAndSetTail(t, node)) { // 将原来旧队列中的最后那个node的next属性指向新的node对象,实现入队操作 t.next = node; return t; } } } }enq(final Node node) 方法执行逻辑如下所示:
addWaiter(Node mode)中,一开始就判断Node pred = tail存在!= null的情况时,其处理如下所示:
假设此时线程 T2 进入。
当未获取到锁的线程,成功进入队列(双向链表)后,此处则将进入下一个处理方法:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
这个方法的处理操作逻辑如下所示:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋操作,必须保证线程能阻塞住 for (;;) { // 获取当前node对象的prev属性值,也就是队列中该node的上一个node对象 final Node p = node.predecessor(); // 如果该node对象 p 与AQS中的head匹配(队列头),则再尝试地去获取锁 // 因为:双向链表的结构,队首为一个 “空的Node对象” // 尽可能不让你的线程被阻塞(内核态和用户态的切换,耗时且效率低下) if (p == head && tryAcquire(arg)) { // 如果获取到了锁,则上个节点出队列 // 将AQS的head头部节点设置成当前的node // 将队列中的上一个Node对象中的thread属性设置为null(因为这个Node中的线程拿到了锁) // 总而言之: // 其中的线程对象拿到了锁,不需要继续保留在队列中; // 将其Node对象中的thread属性置为null,成为一个”空Node节点“,成为新的队首 setHead(node); // 将队列中的旧队首next清理(因为已经将新的node设置成为了队首,之前的队首需要gc) p.next = null; // help GC failed = false; return interrupted; } // 如果没拿到锁,则需要进行下面的操作----阻塞住线程 // shouldParkAfterFailedAcquire 设置Node节点中的waitStatus属性值 // parkAndCheckInterrupt 调用 LockSupport.park(线程),阻塞当前线程 // 由于创建的Node对象,waitStatus 默认值为0; // 1、首先第一轮循环,如果 waitStatus 的值是默认的0; // 修改head状态,将 waitStatus 修改为 sinal(-1),使其可以被唤醒,但是其返回的是false // 2、接下来的循环,判断到 waitStatus 状态为 sinal(-1),返回true,表示可以开始执行 parkAndCheckInterrupt() 阻塞线程 // 同时判断线程是否是有中断信号唤醒的! //注意: // 如果一开始 waitStatus 值就 > 0 (被取消状态),则将该驱动节点废除 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 根据上一个Node节点,判断队列中下一个Node节点是否能够被唤醒 // 参数一:当前node对象中prev属性值,即上一个Node对象节点 // 参数二:当前node对象节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取上一个node节点中的 waitStatus 值 int ws = pred.waitStatus; // 如果满足“可被唤醒”的状态 if (ws == Node.SIGNAL) // 返回true return true; if (ws > 0) { // 前驱节点状态如果被取消状态,将被移除出队列 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 当前驱节点waitStatus为 0 or PROPAGATE状态时 // 将其设置为SIGNAL(-1)状态,然后当前结点才可以可以被安全地park // 外面是一个自旋操作 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { // 使用 LockSupport 让当前Node中的线程thread被阻塞 LockSupport.park(this); return Thread.interrupted();//返回线程是否已经被中断 }【注意:】shouldParkAfterFailedAcquire(Node pred, Node node) 表示的意思:
队列的队首(也就是“空的Node对象”)中的waitStatus保存的是真正有效的下一个Node节点的状态信息。
if (p == head && tryAcquire(arg))当条件满足时,其执行逻辑如下所示:
【扩展:】java.util.concurrent.locks.AbstractQueuedSynchronizer.Node#waitStatus 几种值的含义:
- SIGNAL:-1可被唤醒
继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,
将会通知后继节点,使后继节点的线程得以运行。 - CANCELLED:1代表出现异常,中断引起的,需要废弃结束
在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
- CONDITION:-2 条件等待
节点在等待队列中,节点的线程等待在Condition上,
当其他线程对Condition调用了signal()方法后,该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中 - PROPAGATE:-3传播
表示下一次共享式同步状态获取将会被无条件地传播下去
- 0:初始化状态值
- SIGNAL:-1可被唤醒
在Java代码中,当一个线程执行完成之后,需要开发者在finally代码块中进行锁的释放操作。使用如下代码即可实现:
lock.unlock();
其中,该方法的执行逻辑如下所示:
public void unlock() {
sync.release(1);
}
继续向下深入探索,得知具体实现为:
public final boolean release(int arg)
// 将 AQS 中的 state 属性值进行修改操作
// 如果 AQS 中的state属性值成了0,则还需要将 exclusiveOwnerThread 属性值进行置空操作
// 如果满足 state 成为了0,且 exclusiveOwnerThread 也得到了置空,(即锁的成功释放)
if (tryRelease(arg)) {
// 那么,就从双向链表(队列)中获取head属性指向的Node节点
Node h = head;
// 根据队首的空Node节点上的 waitStatus 状态值判断下一个有效的Node节点是否能被唤醒
// 在Node节点创建的时候,waitStatus 数据默认为0
// 在加锁操作中的 shouldParkAfterFailedAcquire(Node pred, Node node) 操作,会将其更改为 sinal(-1)
// 当满足可被唤醒状态时,当然这里还有其他的状态信息
if (h != null && h.waitStatus != 0)
// 进行 唤醒操作
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 获取AQS 中的 state 状态值信息,并将其进行减法操作
// 这里需要注意一点:
// 当存在加锁操作,锁的重入时,此时AQS中的state值信息不完全就是1!
int c = getState() - releases;
// 校验当前线程和AQS中保存的 exclusiveOwnerThread 数据是否一致
if (Thread.currentThread() != getExclusiveOwnerThread())
// 不一致则抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
// 当 AQS中的state属性值为0时,才能保证这个线程“完全”释放了锁
// 考虑到锁的重入
if (c == 0) {
free = true;
// 将 AQS中exclusiveOwnerThread属性的值进行置空操作
setExclusiveOwnerThread(null);
}
// 将AQS中的state值设置为新的值
setState(c);
return free;
}
// 设置 AQS 中的 exclusiveOwnerThread 属性
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 唤醒操作
private void unparkSuccessor(Node node) {
// 获取队首的空Node节点,根据其 waitStatus 判断下一个Node是否支持被唤醒操作
int ws = node.waitStatus;
// 如果 其数据信息 <0 ,则使用CAS算法修改其waitStatus值,改为0
// 这里为什么会改回0呢?参考下列疑问说明!
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取队首空Node节点的下一个有效的Node节点(存在线程的引用保存)
Node s = node.next;
// 如果下一个节点不存在,或者其 waitStatus 状态信息 >0 (大于0只有 CANCELLED)表示出现异常,需要废弃
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 双向链表从后往前找 <= 0的Node节点
// 即从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点
// 进行唤醒
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 获取可以被唤醒的Node节点中保存的 thread
// 即:唤醒一个可以被唤醒的线程(说的有点绕口,知道意思就行了QAQ)
LockSupport.unpark(s.thread);
}
【疑问:】为什么在 unparkSuccessor(Node node) 执行逻辑中,会出现 当 node.waitStatus < 0 时,进行compareAndSetWaitStatus(node, ws, 0) 操作?
1、在加锁操作中,当某个线程执行到parkAndCheckInterrupt()时,此时该线程会被中断。(暂停在那!)
2、如果是公平锁,当释放锁的 unparkSuccessor(Node node)执行后,对应队列中指定Node节点的thread某个线程将会被唤醒!
由于加锁的parkAndCheckInterrupt()存在于死循环(自旋锁)中,此时指定线程被唤醒后,就会去进行加锁逻辑的 if (p == head && tryAcquire(arg))判断。
如果拿到锁,就会变更队列和AQS中的信息。如果没拿到锁,则一样会将 waitstatus 值修改为 -1。
公平锁中,前面的线程释放锁后,队列中符合要求的Node中的thread肯定可以获取到锁的!
但如果是非公平锁呢?
1、在非公平锁中,每个Node对象创建后,其waitStatus属性值都是默认为0。
2、此时持有锁的线程释放了锁,队列中符合要求的Node节点中的线程,并不一定能够立即获取到锁!可能被外面新来的一个线程将锁抢走了!
所以,此处源码中,将其进行compareAndSetWaitStatus(node, ws, 0)操作!



