大纲:
什么是AQS:
AQS( AbstractQueuedSynchronizer )是一个用来构建多线程访问共享资源的同步器抽象框架,只需要继承 AQS 就可以很方便的实现我们自定义的多线程同步器、锁,是典型的模板设计模式,父类定义好骨架和部分内部操作细节,具体规则由子类去实现。
是整个JUC包的基石,ReentrantLock、ReadWriteLock、CountDowndLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor等都是在AQS的基础上实现的。
AQS的原理概述:
如果被请求的共享资源未被占用,将当前请求资源的线程设置为独占线程,并将共享资源设置为锁定状态,AQS 使用一个 Volatile 修饰的 int 类型的成员变量 State 来表示同步状态,修改同步状态成功即为获得锁,修改 State 值时通过 CAS 机制来保证修改的原子性, 如果共享资源被占用,需要一定的阻塞等待唤醒机制来保证锁的分配,AQS 中会将竞争共享资源失败的线程添加到一个FIFO(First input first)队列(变体的CLH队列)中。
AQS队列模型:
CLH队列:Craig、Landin and Hagersten 队列,是以人名命名的,是 单向链表实现的队列。申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现 前驱节点释放了锁就结束自旋
1. 是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。
2. CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。
3. AQS就是基于CLH队列,AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配。
AQS中使用了CLH队列,但是实现方式略有变动,Head 指向节点为已获得锁的节点,是一个虚拟节点,节点本身不持有具体线程; 获取不到同步状态,会将节点进行自旋获取锁,自旋一定次数失败后会将线程阻塞,相对于 CLH 队列性能较好
AQS同步器整体结构:
AQS代码,head, tail记录了队列的头尾节点(Node中记录了prev和next,中间节点可通过头尾节点获取到):
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
// CLH 变体队列头、尾节点,双向链表,记录头尾指针就维护了一个队列
private transient volatile Node head;
private transient volatile Node tail;
// AQS 同步状态
private volatile int state;
// CAS 方式更新 state
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
Node节点代码, 记录了当前节点的状态waitStatus,当前节点的前后节(双向链表结构),当前节点的线程对象:
static final class Node {
static final Node SHARED = new Node();//标识等待节点处于共享模式
static final Node EXCLUSIVE = null;//标识等待节点处于独占模式
static final int CANCELLED = 1;//由于超时或中断,节点已被取消,不再执行了
static final int SIGNAL = -1;//表示下一个节点是通过park堵塞的,需要通过unpark唤醒,也就是说这个节点有义务去唤醒它后继节点
static final int CONDITION = -2;//表示线程在等待条件变量(先获取锁,加入到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之后才能返回)
static final int PROPAGATE = -3;//表示后续结点会传播唤醒的操作,共享模式下起作用
//等待状态:对于condition节点,初始化为CONDITION;其它情况,默认为0,通过CAS操作原子更新
volatile int waitStatus;
//前节点
volatile Node prev;
//后节点
volatile Node next;
//线程对象
volatile Thread thread;
//对于Condtion表示下一个等待条件变量的节点;其它情况下用于区分共享模式和独占模式;
Node nextWaiter;
}
AOS:
AQS继承自AOS,用于记录独占锁(互斥锁)获取锁的线程。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 独占线程(不参与序列化)
private transient Thread exclusiveOwnerThread;
// 设置当前独占的线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 返回当前独占的线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
ReentrantLock实现:
类结构图:
(1)lock.lock() 方法后,第一步试图获取锁,主要为tryAcquire()方法
(2)获取锁成功,则执行业务代码,如果失败,则需要初始化队列,如果队列已初始化则加入队列尾部进入阻塞 执行addWaiter(),enq(),acquireQueued方法。
1. 获取锁 tryAcquire():非公平锁在执行 nofaireTryAcquire之前会先cas修改state尝试获取锁一次,在获取失败后执行nofaireTryAcquire,判断 state ==0 之后,非公平锁会直接cas修改state尝试获取锁,公平锁则需要先执行hasQueuedPredecessors(),如果当前线程前面有一个排队的线程则返回true,如果当前线程位于队列的头部或队列为空则返回false, 返回false 才会去cas修改state获取锁。
公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非公平锁大。
非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU 不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。
2. 创建当前线程node 并加入等待队列
第一步尝试获得锁失败,则接下来会将线程组装成为 Node 进行入队流程,Node 是 AQS 中最基本的数据结构,也是 CLH 队列中的节点,Node 有分为SHARED(共享)、EXCLUSIVE(独占) 两种模式。
addWaiter() : 创建当前线程的Node, 队列为空则初始化队列,不为空则加入到队列尾部,核心方法 enq(node) 中会循环cas去加入队列尾部.
// 为当前线程和给定模式 并创建排队节点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 这里的逻辑和 enq(node)方法是一幕一样的,只是为了在指定条件下快速enq
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 将节点插入队列,必要时进行初始化
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acqureQueued: 实现同步的核心方法,是一个自旋的过程, 如果当前Node的prev为头节点,则会cas获取锁,获取成功则将当前Node设置为head, 将原来的head节点设置为null 方便GC回收, 退出自旋,执行业务代码。其他情况如果Node的prev为不为头节点或者为头节点但是获取锁失败,说明仍需排队,会调用shouldParkAfterFailedAcquire() 方法, 设置前驱节点状态为-1,然后调用parkAndCheckInterrupt()阻塞线程直到unlock()的时候被唤醒
shouldParkAfterFailedAcquire(): 该方法的主要目的在于
1. 排除被取消的前驱节点
2. 告诉前驱节点,当前节点被parkAndCheckInterrupt()阻塞
3. realse()释放锁的时候会判断head.waitStatus != 0,才会去LockSupport.unpark(head.next.thread)。
会有下面几种情况:
1. 前驱节点waitStatus =0 则将其设置为-1
2. 前驱节点waitStatus = -1 则返回true,且仅有这种情况才会返回true, 执行后续的parkAndCheckInterrupt()方法阻塞线程, 就是只有某个节点的前驱节点 为 -1,该节点才会通过parkAndCheckInterrupt阻塞, 同样的,一个节点要执行后续的parkAndCheckInterrupt方法,就需要想将其前驱节点设置为-1,然后下次自旋执行shouldParkAfterFailedAcquire的时候才会返回true,这个有点绕,可以多看几遍源码理解下。
3. 前驱节点 waitStatus > 0 被取消了, 则一直往链表头取,直到找到没有被取消的节点为止,将当前节点的前驱节点修改为取到的节点。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// SIGNAL = -1;//表示下一个节点是通过park堵塞的,需要通过unpark唤醒,也就是说这个节点有义务去唤醒它后继节点
return true;
if (ws > 0) {
// CANCELLED = 1;//由于超时或中断,节点已被取消,不再执行了
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 初始化的状态 0
// CONDITION = -2;//表示线程在等待条件变量(先获取锁,加入到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之后才能返回)
// int PROPAGATE = -3;//表示后续结点会传播唤醒的操作,共享模式下起作用
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
执行流程:
队列变化逻辑图:
API:
lock.lock();
lock.unlock();
公平锁&非公平锁
实现代码流程:
实现自己的自旋锁代码
彻底理解其思想和api的原理
记住所有的细节,记录在博客中,
问题:enq(final Node node) 总自旋的作用,就是为防止并发的时候,两个线程竞争加入队列尾部,则会有失败的需要自旋去加入队列尾部,所以后面执行acquireQueued()各个想成互不干扰
结合 shouldParkAfterFailedAcquire 理解节点取消操作
shouldParkAfterFailedAcquire 这个方法这个涉及的目的是什么?
cancelAcquire 何时执行
tryAcquire 是 AQS 中抽象模版方法,但是内部会有默认实现,虽然默认的方法内部抛出异常,为什么不直接定义为抽象方法呢?
因为 AQS 不只是对独占锁实现了抽象,同时还包括共享锁;不同锁定义了不同类别的方法,共享锁就不需要 tryAcquire,如果定义为抽象方法,继承 AQS 子类都需要实现该方法
为何tryAcquire() 公平锁的在 FairSync中,nofairTryAcqurie在 Sync中,Sync中级
为何称为自旋锁
修改state为何需要 cas修改
直接继承 AQS就好了,为什么要定义一个内部类来实现AQS
CONDITION 的作用
PROPAGATE 的作用
参考:
聊一聊ReentrantLock和AQS那点事,不刷别后悔! - 知乎



