非常感谢您的所有建议!
最后,我选择了一些我认为相当简单的东西。我发现CountDownLatch几乎是我所需要的。它一直阻塞直到计数器达到0。唯一的问题是它只能递减计数,不能递减计数,因此不能在我有任务可以提交新任务的动态设置中使用。因此,我实现了一个
CountLatch提供附加功能的新类。(请参阅下文)然后,按如下方式使用此类。
主线程调用
latch.awaitZero(),直到锁存器到达0为止。
任何线程,在调用
executor.execute(..)calls 之前
latch.increment()。
在完成之前,任何任务都会调用
latch.decrement()。
当最后一个任务终止时,计数器将达到0,从而释放主线程。
欢迎进一步的建议和反馈!
public class CountLatch {@SuppressWarnings("serial")private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected int acquireNonBlocking(int acquires) { // increment count for (;;) { int c = getState(); int nextc = c + 1; if (compareAndSetState(c, nextc)) return 1; } } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } }}private final Sync sync;public CountLatch(int count) { this.sync = new Sync(count);}public void awaitZero() throws InterruptedException { sync.acquireSharedInterruptibly(1);}public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.tonanos(timeout));}public void increment() { sync.acquireNonBlocking(1);}public void decrement() { sync.releaseShared(1);}public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]";}}请注意,可以将
increment()/
decrement()调用封装到自定义的
Executor子类中,如Sami Korhonen
所建议的那样,或者与
beforeExecute和
afterExecute一起由impl所建议的那样。看这里:
public class CountingThreadPoolExecutor extends ThreadPoolExecutor {protected final CountLatch numRunningTasks = new CountLatch(0);public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}@Overridepublic void execute(Runnable command) { numRunningTasks.increment(); super.execute(command);}@Overrideprotected void afterExecute(Runnable r, Throwable t) { numRunningTasks.decrement(); super.afterExecute(r, t);}public void awaitCompletion() throws InterruptedException { numRunningTasks.awaitZero();}public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException { numRunningTasks.awaitZero(timeout, unit);}}


