Java8 新增一个类 CompletableFuture,用来支持流式异步编程。
创建任务CompletableFuture 提供一下API用来执行异步任务。
static CompletableFuture supplyAsync(Supplier supplier); static CompletableFuture supplyAsync(Supplier supplier, Executor executor); static CompletableFuturerunAsync(Supplier supplier); static CompletableFuture runAsync(Supplier supplier, Executor executor);
这些API都是用来创建任务,并且异步计算任务。supplyAsync 方法是有返回值的。例如:
CompletableFuturethenAccept()futureA = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture futureB = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuturethenAcceptAsync(Consumer super T> action); CompletableFuture thenAcceptAsync(Consumer super T> action, Executor executor);
表示当前这个阶段计算正常完成(没有异常发送),计算完成后的结果会作为 thenAcceptAsync 方法的入参,然后执行 action。如果当前阶段计算发送异常,thenAcceptAsync 方法不会被执行。
// 打印 hello
CompletableFuture.supplyAsync(() -> "hello")
.thenAcceptAsync(result -> System.out.println(result));
// 不会打印. 因为不会执行thenAcceptAsync
CompletableFuture.supplyAsync(() -> {
throw new NullPointerException();
})
.thenAcceptAsync(rs -> System.out.println("1234"));
thenApply()
CompletableFuture thenApply(Function super T,? extends U> fn); CompletableFuture thenApplyAsync(Function super T,? extends U> fn); CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor);
这个方法和 thenAcceptAsync 方法功能一样,就是返回值不同。
thenCombine()CompletableFuturethenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn); CompletableFuture thenCombineAsync(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn);
这里表示当前阶段计算完成并且 other 计算阶段也完成(两个阶段都没有发生异常),然后把这两个阶段计算的结果,作为 thenCombineAsync 方法的入参,执行 fn。如果其中一个计算阶段发生异常,都不会执行该方法。
CompletableFuturewhenComplete()taskA = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture taskB = CompletableFuture.supplyAsync(() -> 12L); taskB.thenCombine(taskA, (resultB, resultA) -> { return resultA + resultB; });
CompletableFuturewhenComplete(BiConsumer super T,? super Throwable> action); CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action);
这里表示无论当前阶段是计算完成还是发生异常,都将执行 whenComplete 方法。
CompletableFutureexceptionally()taskA = CompletableFuture.supplyAsync(() -> { return "hello"; }); taskA.whenCompleteAsync((rs, ex) -> { if (ex != null) { ex.printStackTrace(); } if (rs != null) { System.out.println(rs); } });
CompletableFutureexceptionally(Function fn);
这个方法也是用来处理异常的。只有在发生异常时才会执行该方法。
CompletableFutureexceptionally() vs whenCompleteAsync()区别taskA = CompletableFuture.supplyAsync(() -> { throw new IllegalArgumentException(); }); taskA.exceptionally(ex -> { // 这里可以处理异常,然后返回补偿值, 而在 whenComplete() 方法只能知道有异常发送,不能去补偿。 System.out.println(ex.getMessage()); return "hello"; })
调用顺序:先 exceptionally() 再 whenCompleteAsync() 。
CompletableFuturetaskA = CompletableFuture.supplyAsync(() -> { throw new IllegalArgumentException(); }); taskA.exceptionally(ex -> { System.out.println(ex.getMessage()); return "hello"; }).whenCompleteAsync((rs, ex) -> { if (ex != null) { ex.printStackTrace(); } if (rs != null) { // IllegalArgumentException异常已经被exceptionally处理,并返回正常值 hello. // 这里没有异常,将打印 hello System.out.println(rs); } });
在执行任务发送了异常 IllegalArgumentException,将执行方法 exceptionally ,然后执行方法 whenCompleteAsync。
调用顺序:先 whenCompleteAsync() 再 exceptionally() 。
CompletableFuture参考taskA = CompletableFuture.supplyAsync(() -> { throw new IllegalArgumentException(); }); taskA.whenCompleteAsync((rs, ex) -> { // ex 不为空,有 IllegalArgumentException 异常发送 if (ex != null) { // 打印异常栈 ex.printStackTrace(); } if (rs != null) { System.out.println(rs); } }).exceptionally(ex -> { // 捕获异常 IllegalArgumentException 并处理 System.out.println(ex.getMessage()); return "hello"; });
JDK8 CompletableFuture API使用CompletableFuture优化你的代码执行效率 拓展 ForkJoinPool
计算一些并行任务,可以使用 ForkJoinPool 来提供并行的方式提供资源利用率和吞吐量。这个类的大致原理:把大任务拆分成小任务,再去执行。每个空线程,也会去窃取其他繁忙线程的任务,帮忙处理。使用例子:
定义一个任务,继承 RecursiveAction 无返回值。需要有返回值的,继承RecursiveTask
private static class FileUploadTask extends RecursiveAction{
private int[] files;
private static final int THRESHOLD = 20;
public FileUploadTask(int[] files) {
this.files = files;
}
@Override
protected void compute() {
if (files.length <= THRESHOLD) {
this.upload();
} else {
System.out.println(Thread.currentThread().getName());
FileUploadTask subTaskOne = new FileUploadTask(Arrays.copyOfRange(files, 0, files.length / 2));
FileUploadTask subTaskTwo = new FileUploadTask(Arrays.copyOfRange(files, files.length / 2, files.length));
subTaskOne.fork();
subTaskTwo.fork();
// 如果不关注返回值,可以考虑不需要这里
subTaskTwo.join(); // 阻塞等待子任务计算完成,异常也会释放
subTaskOne.join(); // 阻塞等待子任务计算完成
}
}
private void upload() {
List items = new ArrayList<>();
HashSet set = new HashSet<>();
for (int item : files) {
items.add(String.valueOf(item));
set.add(item);
}
if (set.contains(1000)) {
throw new IllegalArgumentException();
}
System.out.println("开始上传文件: " + String.join(",", items));
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
提交任务
public static void main(String[] args) throws Exception {
int length = 100;
int[] arr = new int[length];
for (int i = 0; i < length; i++) {
arr[i] = i + 1;
}
int parallelism = ForkJoinPool.commonPool().getParallelism();
System.out.println("parallelism:" + parallelism);
ForkJoinPool.commonPool().invoke(new FileUploadTask(arr));
Thread.sleep(30000);
}
参考
ForkJoinPool使用与原理



