completableFuture是JDK1.8版本新引入的类
public class CompletableFutureimplements Future , CompletionStage
实现了 Future 和 CompletionStage 接口
CompletionStage接口去支持完成时触发的函数和操作
2. 方法说明 2.1 创建异步操作(runAsync、supplyAsync)completableFuture提供四个静态方法来创建异步操作
public static CompletableFuturerunAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); } public static CompletableFuture supplyAsync(Supplier supplier) { return asyncSupplyStage(asyncPool, supplier); } public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); }
runAsync:没有返回结果
supplyAsync:可以获取返回结果
可以传入自定义线程池,否则就使用默认线程池
例子
2.2 完成时回调与异常感知(whenComplete、exceptionally)// 线程池 public static ExecutorService executor = Executors.newFixedThreadPool(10);runAsync:开启一个异步任务,无返回结果
CompletableFuturefuture = CompletableFuture.runAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); },executor); supplyAsync:开启一个异步任务,有返回结果
CompletableFuturefuture1 = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; }, executor); Integer res = future1.get(); System.out.println("*主线程End" + "---" + res);
当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:
public CompletableFuturewhenComplete( BiConsumer super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); } public CompletableFuture whenCompleteAsync( BiConsumer super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); } public CompletableFuture whenCompleteAsync( BiConsumer super T, ? super Throwable> action, Executor executor) { return uniWhenCompleteStage(screenExecutor(executor), action); } public CompletableFuture exceptionally( Function fn) { return uniExceptionallyStage(fn); }
whenComplete 和 whenCompleteAsync 的区别:
1、whenComplete;是执行当前任务的线程继续执行whenComplete的任务
2、whenCompleteAsync:是执行把whenCompleteAsync这个任务继续交给线程池来进行执行
3、不以Async结尾,意味着whenComplete中执行的任务使用的是相同的线程,
4、以Async结尾,意味着whenCompleteAsync可能会使用其他线程执行,如果使用相同的线程池,也可能在同一个线程中执行
例子:
2.3 handle/handleAsync 方法(最终处理)whenComplete:可以处理正常和异常的计算结果
CompletableFuturefuture = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); int i = 10 / 2; System.out.println("运行结果:" + i); return i; }, executor).whenComplete((res,exeception)->{ System.out.println("异步任务完成了,结果是:" + res + ",异常是:" + exeception); }); CompletableFuturefuture1 = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).whenComplete((res,exeception)->{ System.out.println("异步任务完成了,结果是:" + res + ",异常是:" + exeception); }); CompletableFuturefuture2 = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); int i = 10 / 0; System.out.println("运行结果:" + i); return i; }, executor).whenComplete((res,exeception)->{ // 虽然能够得到异常,但是无法修改返回的数据 System.out.println("异步任务完成了,结果是:" + res + ",异常是:" + exeception); }).exceptionally(e -> { // 可以感知到异常,同时返回一个值 e.printStackTrace(); System.out.println(e.getMessage()); return -1; }); Integer res = future2.get(); System.out.println("*主线程End" + " " + res);
在我们的代码业务当中,有这样一种情况:我们调用service业务返回结果后,还需要对这个结果做进一步处理,这样就需要这样的方法了。特别是调用的service业务不是我们自己写的,不能对其修改的状况下。
handle 是执行任务完成时对结果的处理。
handle 方法和 handleAsync 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。
public CompletableFuture handle(
BiFunction super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
public CompletableFuture handleAsync(
BiFunction super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}
public CompletableFuture handleAsync(
BiFunction super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
例子:
CompletableFuture2.4 线程串行化(thenRun、thenApply、thenAccept)future = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); // int i = 10 / 2; int i = 10 / 0; System.out.println("运行结果:" + i); return i; },executor).handle((res,exception)->{ if (res != null) { return res*2; } if (exception!=null) { System.out.println(exception); return 0; } return null; }); Integer res = future.get(); System.out.println("*主线程End" + " " + res);
thenRun:当上一个任务完成,就可以开始执行当前任务
public CompletableFuturethenRun(Runnable action) { return uniRunStage(null, action); } public CompletableFuture thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); } public CompletableFuture thenRunAsync(Runnable action, Executor executor) { return uniRunStage(screenExecutor(executor), action); }
thenApply:当一个线程依赖于另一个线程时,获取上一个任务的返回结果,并返回当前任务的返回值。
public CompletableFuture thenApply(
Function super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public CompletableFuture thenApplyAsync(
Function super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public CompletableFuture thenApplyAsync(
Function super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
thenAccept:接收上一任务的处理结果,并消费处理结果,无返回结果
public CompletableFuturethenAccept(Consumer super T> action) { return uniAcceptStage(null, action); } public CompletableFuture thenAcceptAsync(Consumer super T> action) { return uniAcceptStage(asyncPool, action); } public CompletableFuture thenAcceptAsync(Consumer super T> action, Executor executor) { return uniAcceptStage(screenExecutor(executor), action); }
例子:
CompletableFuturefuture1 = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } int i = 10 / 2; System.out.println("运行结果:" + i); return i; },executor).thenRunAsync(()->{ System.out.println("任务2启动了"); }, executor); System.out.println("*主线程End");
CompletableFuturefuture2 = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } int i = 10 / 2; System.out.println("运行结果:" + i); return i; },executor).thenAcceptAsync((res)->{ System.out.println("任务2启动了"); System.out.println("任务1的返回结果:" + res); }, executor); System.out.println("*主线程End");
CompletableFuture2.5 两个任务组合,都完成后执行新任务(thenCombine、thenAcceptBoth、runAfterBoth)future3 = CompletableFuture.supplyAsync(()->{ System.out.println("当前线程ID:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } int i = 10 / 2; System.out.println("运行结果:" + i); return i; },executor).thenApplyAsync((res)->{ System.out.println("任务2启动了"); System.out.println("任务1的返回结果:" + res); return res*2; }, executor); Integer result = future3.get(); System.out.println("*主线程End " + result);
两个任务都完成,触发该任务
thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
public CompletableFuturethenCombine( CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn); } public CompletableFuture thenCombineAsync( CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn) { return biApplyStage(asyncPool, other, fn); } public CompletableFuture thenCombineAsync( CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn, Executor executor) { return biApplyStage(screenExecutor(executor), other, fn); }
thenAcceptBoth:组合两个future,获取两个future的返回结果,然后处理自己的任务,无返回值
public CompletableFuturethenAcceptBoth( CompletionStage extends U> other, BiConsumer super T, ? super U> action) { return biAcceptStage(null, other, action); } public CompletableFuture thenAcceptBothAsync( CompletionStage extends U> other, BiConsumer super T, ? super U> action) { return biAcceptStage(asyncPool, other, action); } public CompletableFuture thenAcceptBothAsync( CompletionStage extends U> other, BiConsumer super T, ? super U> action, Executor executor) { return biAcceptStage(screenExecutor(executor), other, action); }
runAfterBoth:组合两个future,不需要两个future的返回结果,只需要两个future处理完任务后,处理自己的任务,无返回值
public CompletableFuturerunAfterBoth(CompletionStage> other, Runnable action) { return biRunStage(null, other, action); } public CompletableFuture runAfterBothAsync(CompletionStage> other, Runnable action) { return biRunStage(asyncPool, other, action); } public CompletableFuture runAfterBothAsync(CompletionStage> other, Runnable action, Executor executor) { return biRunStage(screenExecutor(executor), other, action); }
例子:
CompletableFuturefuture1 = CompletableFuture.supplyAsync(()->{ System.out.println("任务1线程ID:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } int i = 10 / 2; System.out.println("任务1运行结果:" + i); return i; }, executor); CompletableFuture future2 = CompletableFuture.supplyAsync(()->{ System.out.println("任务2线程ID:" + Thread.currentThread().getId()); int i = 12 / 2; System.out.println("任务2运行结果:" + i); return i; }, executor); future1.thenAcceptBothAsync(future2, (f1, f2)->{ System.out.println("任务3线程ID:" + Thread.currentThread().getId()); System.out.println("1:" + f1); System.out.println("2:" + f2); }, executor);
CompletableFuture2.6 两个任务,完成其中一个,第三个任务就可以开始执行future3 = future1.thenCombineAsync(future2, (f1, f2)->{ System.out.println("任务3线程ID:" + Thread.currentThread().getId()); System.out.println("1:" + f1); System.out.println("2:" + f2); Integer r = f1 + f2; return r; },executor); Integer result = future3.get(); System.out.println("*主线程End " + result);
当两个任务中,任意一个future完成后,执行任务
applyToEither:两个任务中有一个执行完成,获取它的返回值,处理自己的任务,并返回返回值
public CompletableFuture applyToEither(
CompletionStage extends T> other, Function super T, U> fn) {
return orApplyStage(null, other, fn);
}
public CompletableFuture applyToEitherAsync(
CompletionStage extends T> other, Function super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}
public CompletableFuture applyToEitherAsync(
CompletionStage extends T> other, Function super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
acceptEither:两个任务中有一个执行完成,获取它的返回值,处理自己的任务,无返回值
public CompletableFutureacceptEither( CompletionStage extends T> other, Consumer super T> action) { return orAcceptStage(null, other, action); } public CompletableFuture acceptEitherAsync( CompletionStage extends T> other, Consumer super T> action) { return orAcceptStage(asyncPool, other, action); } public CompletableFuture acceptEitherAsync( CompletionStage extends T> other, Consumer super T> action, Executor executor) { return orAcceptStage(screenExecutor(executor), other, action); }
runAfterEither:两个任务中有一个执行完成,不需要获取它的返回值,处理自己的任务,无返回值
public CompletableFuturerunAfterEither(CompletionStage> other, Runnable action) { return orRunStage(null, other, action); } public CompletableFuture runAfterEitherAsync(CompletionStage> other, Runnable action) { return orRunStage(asyncPool, other, action); } public CompletableFuture runAfterEitherAsync(CompletionStage> other, Runnable action, Executor executor) { return orRunStage(screenExecutor(executor), other, action); }
例子
CompletableFuturefuture1 = CompletableFuture.supplyAsync(()->{ System.out.println("任务1线程ID:" + Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } int i = 10 / 2; System.out.println("任务1运行结果:" + i); return i; }, executor); CompletableFuture future2 = CompletableFuture.supplyAsync(()->{ System.out.println("任务2线程ID:" + Thread.currentThread().getId()); int i = 12 / 2; System.out.println("任务2运行结果:" + i); return i; }, executor); future1.runAfterEitherAsync(future2,()->{ System.out.println("任务3开始"); }, executor); System.out.println("*主线程End");
future1.acceptEitherAsync(future2,(res)->{
System.out.println("任务3开始 + 之前任务的返回值:" + res);
}, executor);
System.out.println("*主线程End");
CompletableFuture2.7 多任务组合(allOf、anyOf)future3 = future1.applyToEitherAsync(future2, (res) -> { System.out.println("任务3开始 + 之前任务的返回值:" + res); return res * 10; }, executor); System.out.println("*主线程End" + " " + future3.get());
allOf:等待所有任务都完成,无返回结果
public static CompletableFutureallOf(CompletableFuture>... cfs) { return andTree(cfs, 0, cfs.length - 1); }
anyOf:等待上面任意一个任务完成即可,有返回结果
public static CompletableFuture
例子:
CompletableFuturefuture1 = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(3000); System.out.println("任务1:查询商品图片信息"); } catch (InterruptedException e) { e.printStackTrace(); } return "hello.jpg"; }, executor); CompletableFuture future2 = CompletableFuture.supplyAsync(()->{ System.out.println("任务2:查询商品属性"); return "黑色 +256G"; }, executor); CompletableFuture future3 = CompletableFuture.supplyAsync(()->{ System.out.println("任务3:查询商品介绍"); return "华为手机"; }, executor); CompletableFuture allOf = CompletableFuture.allOf(future1, future2, future3); allOf.get();//等待所有任务都完成 System.out.println("*主线程End" + " 任务1:"+ future1.get() + " 任务2:"+ future2.get() + " 任务3:"+ future3.get());
CompletableFuture方法总结
- 方法名称开头
当方法名是**runXXX表示改方法不接收上一任务的结果**,无返回值;
当方法名是**acceptXXX表示改方法接收上一任务的结果**,无返回值;
当方法名是**applyXXX表示改方法接收上一任务的结果**,有返回值;
- 方法名称结尾
不以Async结尾:意味着方法中执行的任务使用的是相同的线程,
以Async结尾:意味着方法可能会使用其他线程执行,如果使用相同的线程池,也可能在同一个线程中执行
https://blog.csdn.net/hong10086/article/details/103651338/



