AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
State对State的操作有三种:
- getState()
- setState()
- compareAndSetState()
Node结点是对每一个等待获取资源的线程的封装,变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED(1,取消调度)、SIGNAL(-1,后继结点在等待当前结点唤醒)、ConDITION(-2,结点等待在Condition上)、PROPAGATE(-3,共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点)、0
需要注意的是,正值代表结点已被取消,负值代表结点处于有效等待状态,所以源码中很多地方用>0、<0来判断结点的状态是否正常。
acquire(int)独占模式下线程获取共享资源的顶层入口,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
对acquire的核心流程可表述如下:
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队。
enq(node);
return node;
}
enq(Node)
private Node enq(final Node node) {
//CAS"自旋",直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
release(int)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了。
参考Java并发之AQS详解



