我们都知道在Java并发场景下JUC提供很多工具类,但是AbstrctQueuedSynchronizer这个抽象同步队列绝对是核心和基石。在理解这个框架运行机制中,很多抽象的逻辑让我们理解起来很困难,所以想分享一些自己在理解过程的心得,跟大家交流探讨,有不对的地方望各位大神指正,谢谢!
实现独占模式的工具类有ReentrantLock,无非就是重写了tryAcquire() 和tryRelease()方法,以及公平与不公平获得锁机制的分别实现,ReentrantLock默认实现非公平获得锁机制。关于管理队列的任务还是由AQS内部来完成。我们一步一步来分析。
首先获取尝试同步状态state,AQS同步队列 通过CAS方式来维护state这个字段,来保证原子性。默认初始化为0,当state=1时表示该node要被删除,state=-1表示该node对应的线程release时,要唤醒后继节点,state=-2时,说明该node在条件队列中,state=-3,表示向后继节点传播,在共享锁中才会使用到。
//acquire方式就是获取锁的方法调入口,所有获取独占锁的实现机制都是从该方法进入
public final void acquire(int arg) {
//tryAcquire()方法会被其它类根据业务需要被重写,它仅仅只是尝试获取同步状态state,不涉及任何关于队
//列的操作,
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
以上就是线程获取同步状态,以及获取失败后进入同步队列等待被唤醒的整个过程。
关于独占模式下还有另外一个实现方式就是条件队列。这个队列由ConditionObject对象来维护。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//生成当前线程对应的ConditionWaiter,并进入条件队列,
//因为condition作用在独占模式下,只有获得同步状态的前提下才能执行该方法,所以是线程安全的
//故在插入条件队列时不需要使用CAS方法操作
Node node = addConditionWaiter();
//完全释放同步状态:因为有可能重入,所以需要一次全释放干净,释放后会唤醒同步队列中的等待节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//这一步是维护整个条件等待队列,清理被删除的node节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//该方法是通知还在条件队列中的node,将其转移到同步队列中并唤醒对应的线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
以上就是条件队列的执行机制。



