- (1)基础结构
- (2)构造函数
- (3)countDown
- (4)await
上面展示了CountDownLatch 的类结构,其中有一个静态内部类Sync。
这个类正是继承自 AQS,并且重写了tryAcquireShared、tryReleaseShared方法。在 Sync 类里,根据是否是独占,来重写对应的方法。如果是独占,则重写 tryAcquire 或 tryRelease 等方法;如果是非独占,则重写 tryAcquireShared 和 tryReleaseShared 等方法”。那么从这里可以看出CountDownLatch 属于非独占的类型。
首先我们需要知道CountDownLatch的count在使用上有什么意义,它是 JDK 提供的并发流程控制的工具类,它是在 java.util.concurrent 包下,在 JDK1.5 以后加入的。 CountDownLatch 的核心思想,等到一个设定的数值达到之后,才能出发。CountDownLatch允许一个或多个线程等待其他线程完成操作。
继续往下看,其中调用了父类也就是AbstractQueuedSynchronizer的setState方法,所以我们通过 CountDownLatch 构造函数将传入的 count 最终传递到 AQS 内部的 state 变量,给 state 赋值,state 就代表还需要倒数的次数。
在 countDown 方法中调用的是 sync 的 releaseShared 方法:
public final boolean releaseShared(int arg) {
//尝试以共享模式去释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
接下来在看下tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
//不断循环检查操作状态
for (;;) {
int c = getState();
//如果为0则说明没有未持有资源,则不能释放
if (c == 0)
return false;
// 执行自减操作,直到state为0
int nextc = c-1;
//CAS更新state值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
【1】方法体内是一个for的死循环,在循环体中,最开始是通过 getState 拿到当前 state 的值并赋值给变量 c,如果为0则说明没有未持有资源,则不能释放,直接返回 false,相当于 releaseShared 方法不产生效果,也就意味着 countDown 方法不产生效果。
【2】如果 c 不等于 0,在这里会先把 c-1 的值赋给 nextc,然后再利用 CAS 尝试把 nextc 赋值到 state 上。如果赋值成功就代表本次 countDown 方法操作成功,也就意味着把 AQS 内部的 state 值减了 1。最后,是 return nextc == 0,如果 nextc 为 0,意味着本次倒数后恰好达到了规定的倒数次数,门闩应当在此时打开,所以 tryReleaseShared 方法会返回 true,那么再回到之前的 releaseShared 方法中,可以看到,接下来会调用 doReleaseShared 方法,效果是对之前阻塞的线程进行唤醒,让它们继续执行。
【3】注意最后一行代码return nextc == 0;这意味着只有在当前CountDownLatch 的state的计数器为0时才会返回true,一旦 tryReleaseShared 方法 return true,则 releaseShared 方法会调用 doReleaseShared 方法,把所有之前阻塞的线程都唤醒。这就是意味着在在执行countDown的时候如果此时state不为0的话,那么线程都会阻塞
(4)await该方法是 CountDownLatch 的“获取”方法,调用 await 方法会把线程阻塞,直到倒数为 0 才能继续执行。await 方法和 countDown 是配对的
1:可响应中断
2:非独占模式
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
以上代码一方面响应了对于中断的处理,另一方面调用了 tryAcquireShared 方法。这个方法很简单,它会直接判断 getState 的值是不是等于 0,如果等于 0 就返回 1,不等于 0 则返回 -1。
getState 方法获取到的值是剩余需要倒数的次数,如果此时剩余倒数的次数大于 0,那么 getState 的返回值自然不等于 0,因此 tryAcquireShared 方法会返回 -1,一旦返回 -1,再看到 if (tryAcquireShared(arg) < 0) 语句中,就会符合 if 的判断条件,并且去执行 doAcquireSharedInterruptibly 方法,然后会让线程进入阻塞状态。
private void doAcquireSharedInterruptibly(int arg)
//doAcquireSharedInterruptibly会将线程阻塞放入链表队列中
throws InterruptedException {
//将当前线程放入到等待队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//自旋
for (;;) {
//获取当前节点的上一个节点
final Node p = node.predecessor();
//如果该节点为头结点,则尝试再次获取锁
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//获取锁失败后判断是否应该挂起该线程
if (shouldParkAfterFailedAcquire(p, node) &&
//挂起线程,即处于等待状态
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}



