什么是共享锁?AQS原理概述共享锁逻辑分析(Semaphore角度分析AQS共享逻辑)
共享锁获取共享锁释放共用方法
什么是共享锁?能被多个线程同时获得的锁,为共享锁。也称之为读锁。与互斥锁(写锁)互斥。
AQS原理概述其核心是一个volatile关键字修饰的int类型的state变量,以及一个由双向指针组成的链表队列。队列再初始化的时候,会CAS生成一个head空节点,后继被阻塞的节点会添加到这个空节点的后面,并相互建立pre和next指针。
而state变量在不同的锁中,使用方式也有锁不同,比如
ReentrantLock 重入锁中,state变量用于记录锁的重入次数,即state>0表示有锁,state=0表示无锁。ReentrantReadWriteLock读写锁,它把int类型的state看作是一个32位的位图,高16位和低16位分别代表了读锁和写锁,高16位表示所有线程读锁的总次数。低16位表示写锁的重入次数。CountDownLatch门闩,state用于记录初始门闩数量,await方法用于把线程加入队列,countDown方法用于减少门闩,当门闩等于0的时候,释放所有阻塞在队列中的线程。Semaphore用来记录信号数量,当state=0时,即新进入的线程会被阻塞到队列。
AQS再代码中使用了模版方法模式,AQS只负责把线程封装成node节点加入列表进行阻塞以及唤醒的工作,至于ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore这些子类如何实现的具体上锁逻辑,AQS并不关心,它只是一个抽象类,把一些公用逻辑进行提炼。
共享锁逻辑分析(Semaphore角度分析AQS共享逻辑) 共享锁获取
//AQS添加锁入口
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 小于0表示获取共享锁失败
//获取共享锁失败,会再次尝试获取锁,如果失败添加队列并阻塞线程
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 封装当前线程以及类型
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点是head节点(获取了锁的节点),此时head节点可能已经完成了锁释放,抢锁
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
// 如果获取锁成功,更新head节点,尝试唤醒队列中的后继节点(因为共享锁是可以多线程同时获取,参考:读写锁)
if (r >= 0) {
// 将当前获取锁的节点更新头部,然后唤醒后继节点。
// 四个线程:A B(有锁) {(head初始化的空节点) -> C -> D }
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//确保pre节点能够通知当前节点,并阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//如果中途发生异常,则取消线程
cancelAcquire(node);
}
}
检查并矫正当前节点的前驱节点,确保当前节点可被通知:
//确保pre节点能够通知当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) { // 只有取消状态是大于1(但是肯定不是head节点,head节点是已经获得锁的)
do {
// 拆分代码,即:如果前驱节点是取消状态,则找前驱节点的前驱节点,一直往前找,直到找到活着的节点,
// 然后相互建立pre以及next指针
// Node predprev = pred.prev;
// pred = predprev;
// node.prev = pred;
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
唤醒阻塞线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // this 指明当前线程是阻塞在哪个对象上,后去方便使用jstack命令排查问题
// 判断是否是中断的方式来唤醒线程的
// 唤醒线程的两种方式 1.unpark 2. interrupt
return Thread.interrupted();
}
更新头节点,并唤醒后继线程:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 更新头节点
setHead(node);
if (propagate > 0 ||
h == null || // 假设C线程在获取锁失败,准备进入队列前尝试获取锁的时候A线程正好释放了锁。head节点还没有初始化
h.waitStatus < 0 || // 头节点SIGNAL/PROPAGATE状态,表示活跃 可通知 或 传播
(h = head) == null ||
h.waitStatus < 0) {
Node s = node.next;
// 队列中下一个节点为 读锁节点
if (s == null || s.isShared())
//此方法见目录 加锁、解锁的共用方法。同时解释node节点中PROPAGATE状态的含义
doReleaseShared();
}
}
共享锁释放此方法在获取共享锁逻辑中被调用,共享锁是可以被多个线程同时持有的。
public final boolean releaseShared(int arg) {
// 模版方法,子类实现释放锁逻辑,如果成功,唤醒后继节点
if (tryReleaseShared(arg)) {
//此方法见目录 加锁、解锁的共用方法。同时解释node节点中PROPAGATE状态的含义
doReleaseShared();
return true;
}
return false;
}
共用方法
private void doReleaseShared() {
for (;;) {
Node h = head;
// h != tail 表明后续还有节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 当前节点状态为活跃节点
if (ws == Node.SIGNAL) {
//CAS修改头节点状态为0,成功则唤醒线程,失败则跳过当前循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue; // loop to recheck cases
}
//唤醒线程
unparkSuccessor(h);
} else if (ws == 0 && //ws=0 表示有线程已经唤醒了后继节点,当前线程会跳过唤醒再次循环走到这里
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
// 如果head没有改变,但是CAS失败,则跳过此次循环,从新刷新head节点
continue;
}
}
// 从始至终,head节点未发生改变 ,表明head没有被更新
if (h == head) // loop if head changed
break;
}
}
以上就是我对AQS共享机制的分析,如果有理解错误的地方,欢迎大家留言指正,一起学习共同进步



