线程池工具类
public class ThreadPoolUtil {
public static final long DEFAULT_WAIT_SECONDS = 5000;
private static class ThreadPool {
private static final ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("thread-pool-%d").build();
private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(20, 40, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue(200), namedThreadFactory, new ThreadPoolExecutor.DiscardPolicy()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
printException(r, t);
}
};
}
public static ExecutorService getThreadPool() {
log.info("ThreadPool task num now: {}", ThreadPool.pool.getActiveCount());
return ThreadPool.pool;
}
private static void printException(Runnable r, Throwable t) {
if (t == null && r instanceof Future>) {
try {
Future> future = (Future>)r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
log.error(t.getMessage(), t);
}
}
public static List executeTasks(List> callables, long waitSeconds) {
return doExecute(callables, waitSeconds, ThreadPool.pool);
}
private static List doExecute(List> callables, long waitSeconds, ThreadPoolExecutor executor) {
try {
List result = Lists.newArrayList();
if (callables == null || callables.isEmpty()) {
return result;
}
List> futures = executor.invokeAll(callables);
try {
for (Future future : futures) {
result.add(future.get(waitSeconds, TimeUnit.SECONDS));
}
} catch (Exception e) {
throw e;
} finally {
for (Future future : futures) {
try {
future.cancel(true);
} catch (Exception e2) {
}
}
}
return result;
} catch (Exception e) {
log.error("doExecute error", e);
throw new RuntimeException(e.getMessage());
}
}
}
使用Callable
public String getTableOwnersByTableGuid(ListtableGuids) { String ret = "["; List > callables = Lists.newArrayList(); tableGuids.stream().forEach(tableGuid -> { Callable callable = () -> Optional.ofNullable(odpsTableManager.queryOdpsTableBaseInfo(tableGuid)).map( GetMetaTableBasicInfoResponse.Data::getOwnerId).orElse(null); callables.add(callable); }); List ownerIdList = ThreadPoolUtil.executeTasks(callables, ThreadPoolUtil.DEFAULT_WAIT_SECONDS).stream().filter( Objects::nonNull).collect(Collectors.toList()); for (String ownerId : ownerIdList) { ret += "'" + ownerId + "',"; } return StringUtils.stripEnd(ret, ",") + "]"; }
使用CompletableFuture
public String getTableOwnersByTableGuid1(ListtableGuids) { String ret = "["; List > completableFutureList = Lists.newArrayList(); tableGuids.stream().forEach(tableGuid -> { try { CompletableFuture future = CompletableFuture.supplyAsync(() -> odpsTableManager.queryOdpsTableBaseInfo(tableGuid).getOwnerId()); completableFutureList.add(future); } catch (Exception e) { log.info("getTableOwnersByWorkOrderId queryOdpsTableBaseInfo fail"); } }); List ownerIdList = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList()); for (String ownerId : ownerIdList) { ret += "'" + ownerId + "',"; } return StringUtils.stripEnd(ret, ",") + "]"; }



