详细讨论java中AQS底层加锁原理和应用。
文章目录
- java锁-AQS
- 前言
- 1. 原理
- 1.1 核心变量
- 1.2 核心方法
- 1.3 加锁、释放锁流程
- 1.3.1 独占锁加锁流程
- 1.3.2 独占锁释放流程
- 1.3.3 共享锁加锁流程
- 1.3.4 共享锁释放流程
- 2. 源码分析
- 2.1 独占锁加锁源码分析
- 2.2 独占锁释放源码分析
- 2.3 共享锁加锁源码分析
- 2.4 共享锁释放源码分析
- 总结
前言
java中AQS是java.util.concurrent.locks.AbstractQueuedSynchronizer抽象类的缩写,大部分的锁都是基于这个类来实现的,如:ReentrantLock,ReentrantReadWriteLock和CountDownLatch等。这是一个基于队列实现的同步器,AbstractQueuedSynchronizer内部维护一张以双向链表的队列和一个全局持有锁状态state变量。本文详细讨论AQS的加锁和释放锁原理。
1. 原理 1.1 核心变量内部维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:
- getState()
- setState()
- compareAndSetState()
AQS定义两种资源类型:Exclusive(独占,同一时刻只有一个线程能执行,如ReentrantLock)和Share(共享,同一时刻能有多个共享线程同时执行,如Semaphore/CountDownLatch)。
1.2 核心方法不同锁的实现,同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等)由AQS实现。自定义同步器实现时主要实现以下几种方法:
-
isHeldExclusively()
该线程是否正在独占资源。只有用到condition才需要去实现它。 -
tryAcquire(int)
独占方式。尝试获取资源,成功则返回true,失败则返回false。 -
tryRelease(int)
独占方式。尝试释放资源,成功则返回true,失败则返回false。 -
tryAcquireShared(int)
共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 -
tryReleaseShared(int)
共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。上面5个方法都需要具体的实现类去实现获取锁和释放锁的逻辑,阻塞和唤醒由AQS来处理,这也是证明了它为什么是其他锁实现的基础类,想知道java中锁的实现原理,则理解AQS原理就 非常重要,非常重要,非常重要。
这里详细说明AbstractQueuedSynchronizer加锁和释放锁的实现逻辑。
1.3.1 独占锁加锁流程 1.3.2 独占锁释放流程 1.3.3 共享锁加锁流程 1.3.4 共享锁释放流程
看完这些流程图是不是有点头晕,别急,下面我们跟着java8中AbstractQueuedSynchronizer源码来分析,在代码中是如何实现。
这一节我们跟着源码来分析,博主使用java8中AbstractQueuedSynchronizer进行源码分析,不同版本源码可能存在一点差异。
2.1 独占锁加锁源码分析独占锁的加锁入口方法是**acquire(1)**源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 步骤1:执行tryAcquire(arg)方法尝试获取锁,返回true说明获取锁成功直接结束,否则执行步骤2
- 步骤2:执行addWaiter(Node.EXCLUSIVE)方法,创新的独占模式等待节点,加入到队列尾部,返回这个新节点node(新尾节点)。
- 步骤3:执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法重新尝试获取锁,获取失败将当前线程挂起,返回当前线程的中断状态,返回ture时执行步骤4
- 步骤4:执行selfInterrupt()方法将当前线程中断。
addWaiter(Node.EXCLUSIVE) 源码如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 申明一个变量pred 将队列的尾节点赋值给它(老尾节点)
Node pred = tail;
// 如果尾节点不为null,说明队列已被初始化
if (pred != null) {
// 将当前节点的前一个节点指针指向队列尾结点
node.prev = pred;
// 采用CAS方式将node节点修改为尾节点(新尾节点)
if (compareAndSetTail(pred, node)) {
// 修改成功后将老尾节点的下一个节点指针指向新尾节点
pred.next = node;
// 返回新尾节点
return node;
}
}
// 如果尾节点为null,说明队列未被初始化
enq(node);
// 返回新尾节点
return node;
}
private Node enq(final Node node) {
// 采用自旋方式操作
for (;;) {
Node t = tail;
// 如果尾节点为null,说明未初始化,必须先初始化
if (t == null) {
// 将头节点通过CAS操作赋值初始值 new Node()
if (compareAndSetHead(new Node()))
// 将头节点赋值给尾节点
tail = head;
} else {
// 申明一个变量pred 将队列的尾节点赋值给它(老尾节点)
node.prev = t;
// 采用CAS方式将node节点修改为尾节点(新尾节点)
if (compareAndSetTail(t, node)) {
// 修改成功后将老尾节点的下一个节点指针指向新尾节点
t.next = node;
// 返回新尾节点
return t;
}
}
}
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法源码如下:
final boolean acquireQueued(final Node node, int arg) {
// 定义一个变量记录操作 failed == true 操作失败, failed == false 操作成功
boolean failed = true;
try {
// 定义一个变量记录当前线程的中断状态
boolean interrupted = false;
// 自旋方式操作
for (;;) {
// 获取上一个线程
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 获取锁成功将当前节点设置为头节点
setHead(node);
// 将原来头节点的下一个节点指针设置为null,使该对象变成垃圾对象,有利于垃圾回收
p.next = null;
// 将操作记录设置为成功
failed = false;
// 返回当前线程的中断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 检查或修改前一个节点的waitStatus状态
parkAndCheckInterrupt())// 挂起线程
// 将线程中断状态设置为true
interrupted = true;
}
} finally {
// 操作出现异常
if (failed)
// 如果当前节点是唤醒节点则唤醒下一个节点,否则删除
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前一个节点的waitStatus值
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// ws > 0 表示上一个节点的线程已被取消
if (ws > 0) {
do {
// 寻找前一个节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 将当前节点插入到未被取消的节点后面
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 表示当前线程还不能挂起
return false;
}
private final boolean parkAndCheckInterrupt() {
// 挂起线程 底层是使用Unsafe工具类实现,Unsafe直接操作虚拟机底层的方法,不了解请自行百度
LockSupport.park(this);
// 返回当前线程中断状态
return Thread.interrupted();
}
private void cancelAcquire(Node node) {
// 节点为null 直接返回
if (node == null)
return;
// 设置节点线程为null
node.thread = null;
// 过滤掉所有已取消的前置节点
Node pred = node.prev;
while (pred.waitStatus > 0)
// 找到未被取消的前节点
node.prev = pred = pred.prev;
// 记录前节点的下一个节点的指针
Node predNext = pred.next;
// 将当前的节点状态标记为取消
node.waitStatus = Node.CANCELLED;
// 当前节点是尾节点直接移除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 前节点是中间节点则修改下一个节点的指针(相当于删除中间被取消的节点)
compareAndSetNext(pred, predNext, next);
} else {
// 当前节点为头节点直接唤醒下个节点
unparkSuccessor(node);
}
// 释放引用,有利于GC回收
node.next = node; // help GC
}
}
到这里独占锁的加锁方式就分析完成了,根据分析源码我们发现:
2.2 独占锁释放源码分析1、当只有一个线程竞争资源时会立刻得到锁
2、当第二个线程参与竞争资源时先立刻尝试获取锁,获取失败后又自旋两次获取锁,两次获取失败后才会挂起线程。
3、当第三个线程参与竞争资源时立刻尝试获取锁,获取失败后直接挂起线程
4、知道synchronized底层逻辑的不难发现,它们的加锁原理几乎一致
独占锁释放的入口方法是release(int arg) 源码如下:
public final boolean release(int arg) {
// 尝试释放锁(由具体实现类完成)
if (tryRelease(arg)) {
Node h = head;
// 如果存在等待节点,则唤醒后续节点
if (h != null && h.waitStatus != 0)
// 唤醒下一个等待的节点
unparkSuccessor(h);
// 释放成功
return true;
}
// 释放失败
return false;
}
- 步骤1:执行tryRelease(arg)方法尝试释放锁,释放逻辑由实现类完成,返回true说明释放锁成功执行步骤2,否则释放失败。
- 步骤2:执unparkSuccessor(h)方法,唤醒下一个等待的节点。
unparkSuccessor(h) 源码分析如下:
private void unparkSuccessor(Node node) {
// 获取当前节点的等待状态
int ws = node.waitStatus;
if (ws < 0)
// waitStatus 值大于0设置为0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 如果下一个节点存在且已经取消
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
// 用递归方式向后找到未被取消的节点
s = t;
}
if (s != null)
// 如果等待唤醒的节点存在,则唤醒(使用Unsafe来唤醒,如不知道Unsafe什么请自行百度)
LockSupport.unpark(s.thread);
}
到这里释放独占锁的代码分析就完成了,其实很简单,就两步骤:
2.3 共享锁加锁源码分析1、释放锁(将state设置为0,设置独占线程为null)
2、唤醒下一个等待线程
AQS中共享锁加锁的入口方法为acquireShared(int arg) 下面源码分析
public final void acquireShared(int arg) {
// 尝试获取共享锁
if (tryAcquireShared(arg) < 0)
// 获取失败处理逻辑
doAcquireShared(arg);
}
- 步骤1:执行tryAcquireShared(arg)方法尝试获取锁,获取逻辑由实现类完成,返回值负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。当返回负数时执行步骤2
- 步骤2:执doAcquireShared(arg)方法,尝试获取锁和等待操作
doAcquireShared(arg) 源码分析如下
private void doAcquireShared(int arg) {
// 创建和添加一个共享类型的节点(和独占锁一样,只是节点类型为共享模型,这里不再分析)
final Node node = addWaiter(Node.SHARED);
// 定义一个变量记录操作 failed == true 操作失败, failed == false 操作成功
boolean failed = true;
try {
// 定义一个变量记录当前线程的中断状态
boolean interrupted = false;
// 自旋方式操作
for (;;) {
// 获取前一个节点
final Node p = node.predecessor();
if (p == head) {
// 如果上一个节点上头节点,则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取到锁设置头节点,且唤醒下一个节点(这个需要满足条件,在setHeadAndPropagate方法中详细说明)
// 这个方法也是和独占模式处理不同之处,需要特别注意
setHeadAndPropagate(node, r);
// 释放引用,有利于垃圾回收
p.next = null; // help GC
if (interrupted)
// 如果线程中断过,这设置当前线程中断
selfInterrupt();
failed = false;
return;
}
}
// 这里和独占模式一模一样,不再分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 操作出现异常
if (failed)
// 如果当前节点是唤醒节点则唤醒下一个节点,否则删除 (和独占模式一样,这里不再分析)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
// 记录老头节点
Node h = head;
// 设置当前节点为头节点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 下一个节点是共享节点或者是未知节点是,尝试唤醒,可能存在不必要的唤醒
if (s == null || s.isShared())
// 释放后续节点,下一个章节详解
doReleaseShared();
}
}
到这里共享锁加锁就讲完了,大部分逻辑和独占锁的加锁逻辑一致,不同点主要体现在setHeadAndPropagate(Node node, int propagate)这个方法,详细请看代码注释。
2.4 共享锁释放源码分析1、 尝试获取锁,获取不到锁一般是至少有一个独占线程在阻塞,只要加入到阻塞队列说明队列已被创建
2、只要尝试获取到锁,会尝试去释放后续共享节点(递归执行会把后面连续的共享节点全部释放)
共享锁释放方法入口releaseShared(int arg) 源码如下:
public final boolean releaseShared(int arg) {
// 尝试释放锁,如果释放后允许唤醒后续等待结点返回true,否则返回false。
if (tryReleaseShared(arg)) {
// 存在等待节点,则唤醒
doReleaseShared();
return true;
}
return false;
}
- 步骤1:执行tryReleaseShared(arg)方法尝试释放共享锁,释放逻辑由实现类完成,如果释放后允许唤醒后续等待结点返回true,否则返回false。当返回true时执行步骤2
- 步骤2:执doReleaseShared()方法,唤醒后续节点
doReleaseShared() 源码分析
private void doReleaseShared() {
for (;;) {
// 获取头节点
Node h = head;
// 如果存在等待唤醒的节点
if (h != null && h != tail) {
// 获取当前头节点的等待状态
int ws = h.waitStatus;
// 第三次尝试获取到锁或被挂起的线程
//(已执行过shouldParkAfterFailedAcquire(p, node)方法,前节点的waitStatus就必须是 Node.SIGNAL)
// 用来唤醒下一个被阻塞的节点
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 修改失败则,从新检查修改
// 唤醒后续等待的节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 修改失败继续自旋修改
}
// 在自旋期间,可能有另一个线程执行setHead(node),导致head发生变化,则一直自旋,直到head是同一个对象为止。
if (h == head)
break;
}
}
共享锁释放到这就分析完了,和独占锁有点像,但是更复杂一些。
1、尝试释放锁释放成功判断是否有等待的后续节点
2、判断头节点的waitStatus值是否为Node.SIGNAL,如果是,则修改waitStatus=0且唤醒后续节点
3、判断头节点的waitStatus值是否为0,如果是,则修改waitStatus = Node.PROPAGATE
AQS的方法和参数有很多,本文主要分析加锁和释放锁逻辑。如果上文都能看懂我相信AQS的其他源码你也一定能看懂。
总结AQS是java锁里重要的基础类,全局维护一个state资源,通过一个队列来同步线程的顺序。原子操作部分大量采用CAS方式提高同步性能,也保证了数据的原子性,线程唤醒和挂起使用了Unsafe工具类实现。希望读者读完本文后不仅知道AQS的原理和使用还知道CAS的使用。AQS都能搞定,其他锁你还怕吗?还想了解java其他锁欢迎阅读博主的《java锁系列》
如果对您有帮助,请支持博主点一个鼓励的赞
原创不易,转载请标明来源



