闭锁是一个同步工具类-它可以延迟线程的进度直到其到达终止状态。闭锁的作用相当于一扇门:在到达结束状态之前,这扇门是关闭的,并且不允许任何进程通过。当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门会一直打开。闭锁可以用来确保某些活动直到其它活动都完成后才继续执行。
CountDownLatch是一种灵活的闭锁实现。它可以使一个或多个线程等待一组事件的发生。闭锁状态包含一个计数器,它被初始化为正整数,表示需要等待的事件数量。countDown()方法递减计数器:表示一个事件已经发生。而await方法等待计数器达到零,这表示所有的事件都已经发生。如果计数器非零,那么await会一直阻塞知道计数器为零、或者中断、或者等待超时。
CountDownLatch提供了几个方法:
| 方法 | 说明 |
|---|---|
| public CountDownLatch(int count) | 初始化计数器为一个非负数 |
| void await() throws InterruptedException | 阻塞直到计数器为0 |
| boolean await(long timeout, TimeUnit unit) throws InterruptedException | 限时阻塞,超时则继续执行 |
| void countDown() | 计数器递减1 |
| long getCount() | 计数器数量 |
CountDownLatch(int count) 如果初始化计数器为0,那么所有线程都可以通过,也就起不到等待事件发生的作用了
await和await(timeout,unit)都是可中断的,当线程被中断时,这两个方法会抛出InterruptedException异常,并清除中断标志。
public class TestHarness {
public long timeTask(int nThread, Runnable runnable) throws InterruptedException {
final CountDownLatch beginGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThread);
for(int i=0;i
new Thread(new Runnable() {
@Override
public void run() {
try {
beginGate.await();
runnable.run();
} catch (Exception e) {
} finally{
endGate.countDown();
}
}
}).start();
}
long begin = System.currentTimeMillis();
beginGate.countDown();
endGate.await();
long end = System.currentTimeMillis();
return (end - begin);
}
}
TestHarness创建一定数量的线程,利用他们执行指定的任务。它包含两个闭锁,分别表示”起始门“(beginGate)和”结束门“(endGate)。起始门的初始计数器为1,结束门的初始值为工作线程的数量。每个工作线程的首先要做的就是在起始门上等待,从而确保所有的线程都就绪后才执行。每个工作线程做的最后一件事就是调用结束门的countDown方法将计数器减一,这能使主线程能够等待所有的工作线程都执行完成,于是可以统计任务消耗的时间。
为什么要在TestHarness中使用闭锁beginGate,而不是创建线程后直接启动?或许我们想要测试n个线程并发执行任务时需要的时间,如果创建后直接启动,那先启动的线程会领先于后启动的线程。而beginGate能够同时释放所有工作线程,endGate能够让主线程等待最后一个工作线程执行完成。
public class MainTest {
static class CountDownLatchThread extends Thread{
CountDownLatch countDownLatch = null;
CountDownLatchThread(String name,CountDownLatch countDownLatch) {
super.setName(name);
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
countDownLatch.await();
System.out.println(super.getName()+" await finish");
} catch (Exception e) {
System.out.println(super.getName()+" interrupt flag : "+Thread.currentThread().isInterrupted());
e.printStackTrace();
}
}
}
//测试
public static void main(String[] args) throws Exception {
// 创建初始值为1的闭锁
CountDownLatch countDownLatch = new CountDownLatch(1);
List list = new ArrayList<>();
for(int i=0;i<5;i++) {
CountDownLatchThread countDownLatchThread = new CountDownLatchThread("thread"+String.valueOf(i), countDownLatch);
countDownLatchThread.start();
list.add(countDownLatchThread);
}
ThreadUtil.sleep(2000);
// 中断thread2
list.get(2).interrupt();
ThreadUtil.sleep(2000);
countDownLatch.countDown();
ThreadUtil.sleep(3000);
CountDownLatchThread countDownLatchThread = new CountDownLatchThread("last", countDownLatch);
countDownLatchThread.start();
}
}
结果:
thread2 interrupt flag : false // 2秒后才输出
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at com.hz.MainTest$CountDownLatchThread.run(MainTest.java:48)
// 又2秒后输出
thread3 await finish
thread1 await finish
thread4 await finish
thread0 await finish
// 又3秒后输出
last await finish
5个工作线程启动之后,都在闭锁上等待。2秒钟后thread2被中断,抛出了InterruptedException异常,并且线程的中断标志也被重置:Thread.currentThread().isInterrupted()返回false;再2秒后,闭锁被打开,计数器为0:所有线程都顺利通过,这时候闭锁已经是打开状态,CountDownLatch只提供了countDown方法来递减计数器,没有提供countUp方法来递增计数器,所以闭锁的状态不可再改变。3秒后,即使再次调用await方法,也不会再阻塞了。



