在Lock 接口,它在 JDK 中的实现类主要是 ReentrantLock (可译为“重入锁”)。ReentrantLock 的实现主要依赖于其内部的一个嵌套类 Sync,而 Sync 又继承自 AbstractQueuedSynchronizer (简称 AQS)。而且,不仅 ReentrantLock,其他一些并发工具类如 CountdownLatch、CyclicBarrier 等,其实现也都是基于 AQS 类。AQS 可以理解为并发包中许多类实现的基石。因此,在分析并发包中常用类的实现原理前,有必要先理解一下 AQS,之后再分析的时候就会简单不少。
AQS 内部有一个核心变量 state;此外,以 Node 类为节点维护了两种队列:主队列(main queue)和条件队列(condition queue),简单起见,分别可以将二者理解为双链表和单链表。
AQS 就像是提供了一套基础设施的设备,其它常用类如 ReentrantLock、CountdownLatch 等的内部嵌套类 Sync,都是在 AQS 提供的基础设施之上制定了自己的“游戏规则”,进而生产出了不同的产品。而它们的游戏规则都是围绕 state 变量和这两种队列进行操作的。
可重入锁(递归锁)是指的是在同一个线程外层方法获取锁的时候,在进入线程内的方法会自动获取锁(前提是同一个对象),不会因为之前已经获取过还没有释放而阻塞。java中的ReentrantLock和Synchrogazed都是可重入锁,可重入锁的一个优点是可以一定程度的避免死锁。
package com.example.jdk;
import sun.misc.Lock;
public class JDK_Test {
static Object objectt1=new Object();
public static void m1(){
new Thread(()->{
synchronized (objectt1){
System.out.println(Thread.currentThread().getName()+"t"+"----------外层调用");
synchronized (objectt1){
System.out.println(Thread.currentThread().getName()+"t"+"----------中层调用");
synchronized (objectt1){
System.out.println(Thread.currentThread().getName()+"t"+"----------内层调用");
}
}
}
},"t1").start();
}
public static void main(String[] args) {
v1();
}
public static synchronized void v1(){
System.out.println("=======外层锁");
v2();
}
public static synchronized void v2(){
System.out.println("=======中层锁");
v3();
}
public static synchronized void v3(){
System.out.println("=======内层锁");
}
}
实现结果
t1 ----------外层调用
t1 ----------中层调用
t1 ----------内层调用
为什么是两个呢?:主要是为了防止异常的情况下,需要彻底释放锁资源。
AQS 类public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {}
可以看到它是一个抽象类,不能直接被实例化。它的父类 AbstractOwnableSynchronizer 的主要代码如下:
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
// 其他代码
}
其内部主要维护了一个变量 exclusiveOwnerThread,作用是标记独占模式下的 Owner 线程,后面涉及到的时候再进行分析。
AQS成员变量其中,head 和 tail 为主队列的头尾节点,state 为 AQS 维护的核心变量,ReentrantLock 等类中的 Sync 类实现,都是通过操作 state 来实现各自功能的。
// 主队列头节点 private transient volatile Node head; // 主队列尾结点 private transient volatile Node tail; // 状态,AQS 维护的一个核心变量 private volatile int state;嵌套类
AQS 内部有两个嵌套类,分别为 Node 和 ConditionObject。
Node类static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// waitStatus的几种状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int ConDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 前驱节点(主队列)
volatile Node prev;
// 后继节点(主队列)
volatile Node next;
// 节点的线程
volatile Thread thread;
// 后继节点(条件队列)
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
添加到主队列用的是第二个构造器,Node 类可以理解为对线程 Thread 的封装。因此,在主队列中排队的一个个节点可以理解为一个个有模式(mode)、有状态(waitStatus)的线程。
ConditionObject 类public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
// ...
}
ConditionObject 实现了 Condition 接口,它主要操作的是条件队列,这里只贴了其类签名和头尾节点,后面用到的时候再具体分析。
CAS 操作AQS 内部通过 Unsafe 类实现了一系列 CAS (Compare And Swap) 操作。AQS 内部的许多操作是通过 CAS 来实现线程安全的。
// 获取 Unsafe 实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state、head、tail 等变量的内存偏移地址
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
// 一些 CAS 操作
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
Node 节点的「独占模式」和「共享模式」,其实 AQS 也主要是围绕对这两种模式的操作进行的。Node 节点是对线程 Thread 类的封装,因此两种模式可以理解如下:
- 独占模式(exclusive):线程对资源的访问是排他的,即某个时间只能一个线程单独访问资源;
- 共享模式(shared):与独占模式不同,多个线程可以同时访问资源。
独占模式下的操作主要有以下几个方法(可与前面分析的 Lock 接口的方法类比):
- acquire(int arg):以独占模式获取资源,忽略中断;可以类比 Lock 接口的 lock 方法;
- acquireInterruptibly(int arg):以独占模式获取资源,响应中断;可以类比 Lock 接口的 lockInterruptibly 方法;
- tryAcquireNanos(int arg, long nanosTimeout):以独占模式获取资源,响应中断,且有超时等待;可以类比 Lock 接口的 tryLock(long, TimeUnit) 方法;
- release(int arg):释放资源,可以类比 Lock 接口的 unlock 方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法看似很短,其实是内部做了封装。这几行代码包含了如下四个操作步骤:
- tryAcquire
- addWaiter(Node.EXECUSIVE)
- acquireQueued(final Node node, arg))
- selfInterrupt
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
该方法的作用是尝试以独占模式获取资源,若成功则返回 true。可以看到该方法是一个 protected 方法,而且 AQS 中该方法直接抛出了异常,其实是它把实现委托给了子类。这也是 ReentrantLock、CountdownLatch 等类(严格来说是其内部类 Sync)的实现功能不同的地方,这些类正是通过对该方法的不同实现来制定了自己的“游戏规则”。
若 step 1 中的 tryAcquire 方法返回 true,则表示当前线程获取资源成功,方法直接返回,该线程接下来就可以“为所欲为”了;否则表示获取失败,接下来会依次执行 step 2 和 step 3。
private Node addWaiter(Node mode) {
// 将当前线程封装为一个 Node 节点,指定 mode
// PS: 独占模式 Node.EXECUSIVE, 共享模式 Node.SHARED
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 通过 CAS 操作设置主队列的尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点 tail 为 null,表示主队列未初始化
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾节点为空,表明当前队列未初始化
if (t == null) { // Must initialize
// 将队列的头尾节点都设置为一个新的节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将 node 节点插入主队列末尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到 addWaiter(Node.EXECUSIVE) 方法的作用是:把当前线程封装成一个独占模式的 Node 节点,并插入到主队列末尾(若主队列未初始化,则将其初始化后再插入)。
step 3: acquireQueued(final Node node, arg))
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标志位
boolean interrupted = false;
for (;;) {
// 获取该节点的前驱节点
final Node p = node.predecessor();
// 若前驱节点为头节点,则尝试获取资源
if (p == head && tryAcquire(arg)) {
// 若获取成功,则将该节点设置为头节点并返回
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 若上面条件不满足,即前驱节点不是头节点,或尝试获取失败
// 判断当前线程是否可以休眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
若当前节点的前驱节点为头节点,则会再次尝试获取资源(tryAcuqire),若获取成功,则将当前节点设置为头节点并返回;否则若前驱节点不是头节点,或者获取资源失败,执行如下两个方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的等待状态
int ws = pred.waitStatus;
// 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠
if (ws == Node.SIGNAL)
return true;
// 若前驱节点的状态大于 0,表示前驱节点处于取消(CANCELLED)状态
// 则将前驱节点跳过(相当于踢出队列)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 此时 waitStatus 只能为 0 或 PROPAGATE 状态,将前驱节点的等着状态设置为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法的流程:
- 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠(park);
- 若前驱节点是取消状态 (ws > 0),则将其清理出队列,以此类推;
- 若前驱节点为 0 或 PROPAGATE,则将其设置为 SIGNAL 状态。
正如其名,该方法(shouldParkAfterFailedAcquire)的作用就是判断当前线程在获取资源失败后,是否可以休眠(park)。
private final boolean parkAndCheckInterrupt() {
// 将当前线程休眠
LockSupport.park(this);
return Thread.interrupted();
}
该方法的作用:
- 使当前线程休眠(park);
- 返回该线程是否被中断(其他线程对其发过中断信号)
上面就是 acquireQueued(final Node node, arg)) 方法的执行过程,为了便于理解,可参考下面的流程图:
若此期间被其他线程中断过,则此时再去执行 selfInterrupt 方法去响应中断请求:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
以上就是 acquire 方法执行的整体流程。
以独占模式获取资源(响应中断)该操作其实与前面的过程类似,因此分析相对简单些,代码如下:
public final void acquireInterruptibly(int arg)
throws InterruptedException
// 若线程被中断过,则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取资源
if (!tryAcquire(arg))
// 尝试获取资源失败
doAcquireInterruptibly(arg);
}
tryAcquire 与前面的操作一样,若尝试获取资源成功则直接返回;否则,执行doAcquireInterruptibly:
private void doAcquireInterruptibly(int arg)
throws InterruptedException
// 将当前线程封装成 Node 节点插入主队列末尾
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 抛出中断异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通过与前面的 acquire 方法对比可以发现,二者代码几乎一样,区别在于 acquire 方法检测到中断(parkAndCheckInterrupt)时只是记录了标志位,并未响应;而此处直接抛出了异常。这也是二者仅有的区别。
以独占模式获取资源(响应中断,且有超时)public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException
// 若被中断,则响应
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
static final long spinForTimeoutThreshold = 1000L;
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 若超时时间小于等于 0,直接获取失败
if (nanosTimeout <= 0L)
return false;
// 计算截止时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 已经超时了,获取失败
if (nanosTimeout <= 0L)
return false;
// 若大于自旋时间,则线程休眠;否则自旋
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 若被中断,则响应
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里有个变量 spinForTimeoutThreshold,表示自旋时间,若大于该值则将线程休眠,否则继续自旋。个人理解这里增加该时间是为了提高效率,即,只有在等待时间较长的时候才让线程休眠。该方法与 acquireInterruptibly 也是类似的,在前者的基础上增加了 timeout。
释放资源前面分析了三种获取资源的方式,自然也有释放资源。下面分析释放资源的 release 操作:
public final boolean release(int arg) {
// 尝试释放资源,若成功则返回 true
if (tryRelease(arg)) {
Node h = head;
// 若头节点不为空,且等待状态不为 0(此时为 SIGNAL)
// 则唤醒其后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
与 tryAcquire 方法类似,tryRelease 方法在 AQS 中也是抛出异常,同样交由子类实现:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
unparkSuccessor 的主要作用是唤醒 node 的后继节点,代码如下:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 若后继节点是取消状态,则从尾节点向前遍历,找到 node 节点后面一个未取消状态的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒node节点的后继节点
if (s != null)
LockSupport.unpark(s.thread);
}
若 node 节点的后继节点是取消状态(ws > 0),则从主队列中取其后面一个非取消状态的线程唤醒。前面三个获取资源的方法中,finally 代码块中都用到了 cancelAcquire 方法,都是获取失败时的操作,这里也分析一下:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
// 跳过取消状态的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 前驱节点的后继节点引用
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 将当前节点设置为取消状态
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 若该节点为尾节点(后面没其他节点了),将 predNext 指向 null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 前驱节点为头节点,表明当前节点为第一个,取消时唤醒它的下一个节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
该方法的主要操作:
- 将 node 节点设置为取消(CANCELLED)状态;
- 找到它在队列中非取消状态的前驱节点 pred:
- 若 node 节点是尾节点,则前驱节点的后继设为空,
- 若 pred 不是头节点,且状态为 SIGNAL,则后继节点设为 node 的后继节点;
- 若 pred 是头节点,则唤醒 node 的后继节点。
该过程可以跟双链表删除一个节点的过程进行对比分析
AQS的Node的共享模式与独占模式类似,共享模式下也有与之类似的相应操作,分别如下:
- acquireShared(int arg): 以共享模式获取资源,忽略中断;
- acquireSharedInterruptibly(int arg): 以共享模式获取资源,响应中断;
- tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式获取资源,响应中断,且有超时等待;
- releaseShared(int arg): 释放资源,唤醒后继节点,并确保传播。
public final void acquireShared(int arg) {
// 返回值小于 0,表示获取失败
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 尝试以共享模式获取资源(返回值为 int 类型)
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
与独占模式的 tryAcquire 方法类似,tryAcquireShared 方法在 AQS 中也抛出异常,由子类实现其逻辑。不同的地方在于,tryAcquire 方法的返回结果是 boolean 类型,表示获取成功与否;而 tryAcquireShared 的返回结果是 int 类型,分别为:
- 负数:表示获取失败;
- 零:表示获取成功,但后续共享模式的获取会失败;
- 正数:表示获取成功,后续共享模式的获取可能会成功(需要进行检测)
若 tryAcquireShared 获取成功,则直接返回;否则执行 doAcquireShared 方法:
private void doAcquireShared(int arg) {
// 把当前线程封装成共享模式的 Node 节点,插入主队列末尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 中断标志位
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 若前驱节点为头节点,则尝试获取资源
if (p == head) {
int r = tryAcquireShared(arg);
// 这里表示当前线程成功获取到了资源
if (r >= 0) {
// 设置头节点,并传播状态(注意这里与独占模式不同)
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 是否应该休眠(与独占模式相同,不再赘述)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 取消操作(与独占模式相同)
cancelAcquire(node);
}
}
doAcquireShared 方法会把当前线程封装成一个共享模式(SHARED)的节点,并插入主队列末尾。addWaiter(Node mode) 方法前文已经分析过。该方法与 acquireQueued 方法的区别在于 setHeadAndPropagate 方法,把当前节点设置为头节点之后,还会有传播(propagate)行为。
private void setHeadAndPropagate(Node node, int propagate) {
// 记录旧的头节点
Node h = head; // Record old head for check below
// 将 node 设置为头节点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 后继节点为空或共享模式唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
// 这里的头节点已经是上面设置后的头节点了
Node h = head;
// 由于该方法有两个入口(setHeadAndPropagate 和 releaseShared),需考虑并发控制
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 若头节点不变,则跳出循环;否则继续循环
if (h == head) // loop if head changed
break;
}
}
该方法与独占模式下的获取方法 acquire 大体相似,不同在于该方法中,节点获取资源后会传播状态,即,有可能会继续唤醒后继节点。值得注意的是:该方法有两个入口 setHeadAndPropagate 和 releaseShared,可能有多个线程操作,需考虑并发控制。
以共享模式获取资源(响应中断)public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared 方法前面已分析,若获取资源失败,会执行 doAcquireSharedInterruptly 方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 把当前线程封装成共享模式节点,并插入主队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 与 doAcquireShared 相比,区别在于这里抛出了异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
从代码可以看到,acquireSharedInterruptibly 方法与 acquireShared 方法几乎完全一样,不同之处仅在于前者会抛出 InterruptedException 异常响应中断;而后者仅记录标志位,获取结束后才响应。
以共享模式获取资源(响应中断,且有超时)public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
doAcquireSharedNanos:该方法可与独占模式下的超时等待方法 tryAcquireNanos(int arg, long nanosTimeout) 进行对比,二者操作基本一致
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放资源,唤醒节点,传播状态
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
AQS中的Node两种模式下的场景分析
场景如下:有 T0~T4 共 5 个线程按先后顺序获取资源,其中 T2 和 T3 为共享模式,其他均为独占模式。就此场景分析:T0 先获取到资源(假设占用时间较长),而后 T1~T4 再获取则失败,会依次进入主队列。此时主队列中各个节点的状态示意图如下:
之后,T0 操作完毕并释放资源,会将 T1 唤醒。T1(独占模式) 会从 acquireQueued(final Node node, int arg) 方法的循环中继续获取资源,这时会获取成功,并将 T1 设置为头节点(T 被移除)。此时主队列节点示意图如下:
此时,T1 获取到资源并进行相关操作。而后,T1 操作完释放资源,并唤醒下一个节点 T2,T2(共享模式) 继续从 doAcquireShared(int) 方法的循环中执行。此时 T2 获取资源成功,将自身设为头节点(T1 被移除),由于后继节点 T3 也是共享模式,因此 T1 会继续唤醒T3;T3 唤醒后的操作与 T2 相同,但后继节点 T4 不是共享模式,因此不再继续唤醒。此时队列节点状态示意图如下:
此时,T2 和 T3 同时获取到资源。之后,当二者都释放资源后会唤醒 T4:
T4 获取资源的与 T1 类似。
AQS源码总结- AQS 是一个抽象类,无法直接进行实例化;
- AQS 内部维护了一个核心变量 state,以及两种队列:主队列(main queue)和条件队列(condition queue);
- AQS 提供了一套基础设施,ReentrantLock 等类通常用一个内部嵌套类 Sync 继承 AQS,并在 Sync 类中制定自己的“游戏规则”。
- acquire: 独占模式获取资源,忽略中断;
- acquireInterruptibly: 独占模式获取资源,响应中断;
- tryAcquireNanos: 独占模式获取资源,响应中断,有超时;
- release: 释放资源,唤醒主队列中的下一个线程。
-
本文分析了以共享模式获取资源的三种方式,以及释放资源的操作。分别为:
- acquireShared: 共享模式获取资源,忽略中断;
- acquireSharedInterruptibly: 共享模式获取资源,响应中断;
- tryAcquireSharedNanos: 共享模式获取资源,响应中断,有超时;
- releaseShared: 释放资源,唤醒后继节点,并确保传播。
JDK源码分析-AbstractQueuedSynchronizer(1) - 知乎
JDK源码分析-AbstractQueuedSynchronizer(2) - 知乎
JDK源码分析-AbstractQueuedSynchronizer(3) - 知乎



