CyclicBarrier 循环屏障,类似于起跑线阻塞运动员,所有线程到达屏障后才可以解除阻塞。并且可以重复使用(不像CountDownLatch减到0就无法使用了)。
二、源码分析
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
private void nextGeneration() {
trip.signalAll();
count = parties;
// 为什么要新建一个对象呢
generation = new Generation();
}
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 保存当前代
final Generation g = generation;
// 如果当前线程可以dowait,那么当前代不应该被破坏
if (g.broken)
throw new BrokenBarrierException();
// 线程被中断的话,应该破坏当前屏障并唤醒所有线程
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 剩余可阻塞线程数
int index = --count;
// 如果所有线程已经到达屏障
if (index == 0) { // tripped
// 初始化标识
boolean ranAction = false;
try {
// 保存运行动作
final Runnable command = barrierCommand;
// 如果动作不为null
if (command != null)
// 动作运行
command.run();
// 设置标识为运行成功
ranAction = true;
// 唤醒所有等待线程,屏障修复,进入下一代
nextGeneration();
return 0;
} finally {
// 假如动作抛出异常,则破坏当前屏障
if (!ranAction)
breakBarrier();
}
}
// 循环直到通过屏障、被打破,中断、超时
for (;;) {
try {
// 如果没设置超时等待
if (!timed)
// 释放锁,进入条件等待队列
trip.await();
// 如果设置了等待时间并且nanos设置等待时间
else if (nanos > 0L)
// 等待指定时间
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果等于当前代(上面index为0并且进入nextGeneration时才会不等于当前代) 并且 屏障没被破坏
if (g == generation && ! g.broken) {
// 破坏当前屏障
breakBarrier();
throw ie;
} else { // 不等于当前代或者屏障被破坏了
// 即使没被中断也将要完成等待 todo
// 中断当前线程
Thread.currentThread().interrupt();
}
}
// 进入下一代的时候,屏障就已经修复了,所以这里不应该被打破
// 如果被打破,抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 如果进入了下一代(调用了nextGeneration),则说明已经通过屏障,返回当前线程到达屏障时剩余的可阻塞数
if (g != generation)
return index;
// 如果设置了超时等待时间,并且nanos设置的不正确,则打破屏障并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
public int getParties() {
return parties;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 打破屏障(必须先打破屏障再进入下一代,否则导致并非所有线程到达屏障就通过)
// 先打破会让在屏障等待的线程抛出异常
breakBarrier();
// 进入下一代
nextGeneration();
} finally {
lock.unlock();
}
}
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}



