在 Spark 中有异步多线程的需求:需要阻塞主线程,等所有子线程都执行完成后,主线程再继续执行。
如果直接使用 Thread,不太好实现;用 Callable + FutureTask 结合线程池,可以快速实现:
final AtomicInteger messagesReceived = new AtomicInteger(0); // ThreadedListenerAdapter is the class that I'm testing // It's not germane to the question other than as a target for a thread pool. final ThreadedListenerAdapteradapter = new ThreadedListenerAdapter (listener); int taskCount = 10; List > taskList = new ArrayList >(); for (int whichTask = 0; whichTask < taskCount; whichTask++) { FutureTask futureTask = new FutureTask (new Callable () { @Override public Integer call() throws Exception { // Does useful work that affects messagesSent return messagesSent; } }); taskList.add(futureTask); } for (FutureTask task : taskList) { LocalExecutorService.getExecutorService().submit(task); } for (FutureTask task : taskList) { int result = 0; try { // 关键是这里 result = task.get(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (ExecutionException ex) { throw new RuntimeException("ExecutionException in task " + task, ex); } assertEquals(maxMessages, result); } int messagesSent = taskCount * maxMessages; assertEquals(messagesSent, messagesReceived.intValue());



