栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

CompletableFuture总结

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

CompletableFuture总结

java8 CompletableFuture总结 1. 简介

completableFuture是JDK1.8版本新引入的类

public class CompletableFuture implements Future, CompletionStage 

实现了 Future 和 CompletionStage 接口

CompletionStage接口去支持完成时触发的函数和操作

2. 方法说明 2.1 创建异步操作(runAsync、supplyAsync)

completableFuture提供四个静态方法来创建异步操作

public static CompletableFuture runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}


public static CompletableFuture runAsync(Runnable runnable,
                                               Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}


public static  CompletableFuture supplyAsync(Supplier supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}


public static  CompletableFuture supplyAsync(Supplier supplier,
                                                   Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

runAsync:没有返回结果

supplyAsync:可以获取返回结果

可以传入自定义线程池,否则就使用默认线程池

例子

// 线程池
public static ExecutorService executor = Executors.newFixedThreadPool(10);

runAsync:开启一个异步任务,无返回结果

CompletableFuture future = CompletableFuture.runAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
},executor);

supplyAsync:开启一个异步任务,有返回结果

CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
}, executor);
Integer res = future1.get();
System.out.println("*主线程End" + "---" + res);
2.2 完成时回调与异常感知(whenComplete、exceptionally)

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture whenComplete(
    BiConsumer action) {
    return uniWhenCompleteStage(null, action);
}

public CompletableFuture whenCompleteAsync(
    BiConsumer action) {
    return uniWhenCompleteStage(asyncPool, action);
}

public CompletableFuture whenCompleteAsync(
    BiConsumer action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}

public CompletableFuture exceptionally(
    Function fn) {
    return uniExceptionallyStage(fn);
}

whenComplete 和 whenCompleteAsync 的区别:

1、whenComplete;是执行当前任务的线程继续执行whenComplete的任务
2、whenCompleteAsync:是执行把whenCompleteAsync这个任务继续交给线程池来进行执行
3、不以Async结尾,意味着whenComplete中执行的任务使用的是相同的线程,
4、以Async结尾,意味着whenCompleteAsync可能会使用其他线程执行,如果使用相同的线程池,也可能在同一个线程中执行

例子:

whenComplete:可以处理正常和异常的计算结果

CompletableFuture future = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
}, executor).whenComplete((res,exeception)->{
    System.out.println("异步任务完成了,结果是:" + res + ",异常是:" + exeception);
});

CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
}, executor).whenComplete((res,exeception)->{
    System.out.println("异步任务完成了,结果是:" + res + ",异常是:" + exeception);
});

CompletableFuture future2 = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
}, executor).whenComplete((res,exeception)->{
    // 虽然能够得到异常,但是无法修改返回的数据
    System.out.println("异步任务完成了,结果是:" + res + ",异常是:" + exeception);
}).exceptionally(e -> {
    // 可以感知到异常,同时返回一个值
    e.printStackTrace();
    System.out.println(e.getMessage());
    return -1;
});
Integer res = future2.get();
System.out.println("*主线程End" + "   " + res);

2.3 handle/handleAsync 方法(最终处理)

在我们的代码业务当中,有这样一种情况:我们调用service业务返回结果后,还需要对这个结果做进一步处理,这样就需要这样的方法了。特别是调用的service业务不是我们自己写的,不能对其修改的状况下。

handle 是执行任务完成时对结果的处理。

handle 方法和 handleAsync 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

public  CompletableFuture handle(
    BiFunction fn) {
    return uniHandleStage(null, fn);
}

public  CompletableFuture handleAsync(
    BiFunction fn) {
    return uniHandleStage(asyncPool, fn);
}

public  CompletableFuture handleAsync(
    BiFunction fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}

例子:

CompletableFuture future = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    //            int i = 10 / 2;
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
},executor).handle((res,exception)->{
    if (res != null) {
        return res*2;
    }
    if (exception!=null) {
        System.out.println(exception);
        return 0;
    }
    return null;
});
Integer res = future.get();
System.out.println("*主线程End" + "   " + res);
2.4 线程串行化(thenRun、thenApply、thenAccept)

thenRun:当上一个任务完成,就可以开始执行当前任务

public CompletableFuture thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

public CompletableFuture thenRunAsync(Runnable action,
                                            Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}

thenApply:当一个线程依赖于另一个线程时,获取上一个任务的返回结果,并返回当前任务的返回值。

public  CompletableFuture thenApply(
    Function fn) {
    return uniApplyStage(null, fn);
}

public  CompletableFuture thenApplyAsync(
    Function fn) {
    return uniApplyStage(asyncPool, fn);
}

public  CompletableFuture thenApplyAsync(
    Function fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}

thenAccept:接收上一任务的处理结果,并消费处理结果,无返回结果

public CompletableFuture thenAccept(Consumer action) {
    return uniAcceptStage(null, action);
}

public CompletableFuture thenAcceptAsync(Consumer action) {
    return uniAcceptStage(asyncPool, action);
}

public CompletableFuture thenAcceptAsync(Consumer action,
                                               Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}

例子:

CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
},executor).thenRunAsync(()->{
    System.out.println("任务2启动了");
}, executor);
System.out.println("*主线程End");
CompletableFuture future2 = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
},executor).thenAcceptAsync((res)->{
    System.out.println("任务2启动了");
    System.out.println("任务1的返回结果:" + res);
}, executor);
System.out.println("*主线程End");
CompletableFuture future3 = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程ID:" + Thread.currentThread().getId());
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
},executor).thenApplyAsync((res)->{
    System.out.println("任务2启动了");
    System.out.println("任务1的返回结果:" + res);
    return res*2;
}, executor);
Integer result = future3.get();
System.out.println("*主线程End         " + result);
2.5 两个任务组合,都完成后执行新任务(thenCombine、thenAcceptBoth、runAfterBoth)

两个任务都完成,触发该任务

thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值

public  CompletableFuture thenCombine(
    CompletionStage other,
    BiFunction fn) {
    return biApplyStage(null, other, fn);
}

public  CompletableFuture thenCombineAsync(
    CompletionStage other,
    BiFunction fn) {
    return biApplyStage(asyncPool, other, fn);
}

public  CompletableFuture thenCombineAsync(
    CompletionStage other,
    BiFunction fn, Executor executor) {
    return biApplyStage(screenExecutor(executor), other, fn);
}

thenAcceptBoth:组合两个future,获取两个future的返回结果,然后处理自己的任务,无返回值

public  CompletableFuture thenAcceptBoth(
    CompletionStage other,
    BiConsumer action) {
    return biAcceptStage(null, other, action);
}

public  CompletableFuture thenAcceptBothAsync(
    CompletionStage other,
    BiConsumer action) {
    return biAcceptStage(asyncPool, other, action);
}

public  CompletableFuture thenAcceptBothAsync(
    CompletionStage other,
    BiConsumer action, Executor executor) {
    return biAcceptStage(screenExecutor(executor), other, action);
}

runAfterBoth:组合两个future,不需要两个future的返回结果,只需要两个future处理完任务后,处理自己的任务,无返回值

public CompletableFuture runAfterBoth(CompletionStage other,
                                            Runnable action) {
    return biRunStage(null, other, action);
}

public CompletableFuture runAfterBothAsync(CompletionStage other,
                                                 Runnable action) {
    return biRunStage(asyncPool, other, action);
}

public CompletableFuture runAfterBothAsync(CompletionStage other,
                                                 Runnable action,
                                                 Executor executor) {
    return biRunStage(screenExecutor(executor), other, action);
}

例子:

CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    System.out.println("任务1线程ID:" + Thread.currentThread().getId());
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    int i = 10 / 2;
    System.out.println("任务1运行结果:" + i);
    return i;
}, executor);

CompletableFuture future2 = CompletableFuture.supplyAsync(()->{
    System.out.println("任务2线程ID:" + Thread.currentThread().getId());
    int i = 12 / 2;
    System.out.println("任务2运行结果:" + i);
    return i;
}, executor);

future1.thenAcceptBothAsync(future2, (f1, f2)->{
    System.out.println("任务3线程ID:" + Thread.currentThread().getId());
    System.out.println("1:" + f1);
    System.out.println("2:" + f2);
}, executor);
CompletableFuture future3 = future1.thenCombineAsync(future2, (f1, f2)->{
    System.out.println("任务3线程ID:" + Thread.currentThread().getId());
    System.out.println("1:" + f1);
    System.out.println("2:" + f2);
    Integer r = f1 + f2;
    return r;
},executor);
Integer result = future3.get();
System.out.println("*主线程End         " + result);
2.6 两个任务,完成其中一个,第三个任务就可以开始执行

当两个任务中,任意一个future完成后,执行任务

applyToEither:两个任务中有一个执行完成,获取它的返回值,处理自己的任务,并返回返回值

public  CompletableFuture applyToEither(
    CompletionStage other, Function fn) {
    return orApplyStage(null, other, fn);
}

public  CompletableFuture applyToEitherAsync(
    CompletionStage other, Function fn) {
    return orApplyStage(asyncPool, other, fn);
}

public  CompletableFuture applyToEitherAsync(
    CompletionStage other, Function fn,
    Executor executor) {
    return orApplyStage(screenExecutor(executor), other, fn);
}

acceptEither:两个任务中有一个执行完成,获取它的返回值,处理自己的任务,无返回值

public CompletableFuture acceptEither(
    CompletionStage other, Consumer action) {
    return orAcceptStage(null, other, action);
}

public CompletableFuture acceptEitherAsync(
    CompletionStage other, Consumer action) {
    return orAcceptStage(asyncPool, other, action);
}

public CompletableFuture acceptEitherAsync(
    CompletionStage other, Consumer action,
    Executor executor) {
    return orAcceptStage(screenExecutor(executor), other, action);
}

runAfterEither:两个任务中有一个执行完成,不需要获取它的返回值,处理自己的任务,无返回值

public CompletableFuture runAfterEither(CompletionStage other,
                                              Runnable action) {
    return orRunStage(null, other, action);
}

public CompletableFuture runAfterEitherAsync(CompletionStage other,
                                                   Runnable action) {
    return orRunStage(asyncPool, other, action);
}

public CompletableFuture runAfterEitherAsync(CompletionStage other,
                                                   Runnable action,
                                                   Executor executor) {
    return orRunStage(screenExecutor(executor), other, action);
}

例子

CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    System.out.println("任务1线程ID:" + Thread.currentThread().getId());
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    int i = 10 / 2;
    System.out.println("任务1运行结果:" + i);
    return i;
}, executor);
CompletableFuture future2 = CompletableFuture.supplyAsync(()->{
    System.out.println("任务2线程ID:" + Thread.currentThread().getId());
    int i = 12 / 2;
    System.out.println("任务2运行结果:" + i);
    return i;
}, executor);
future1.runAfterEitherAsync(future2,()->{
    System.out.println("任务3开始");
}, executor);
System.out.println("*主线程End");
future1.acceptEitherAsync(future2,(res)->{
    System.out.println("任务3开始 + 之前任务的返回值:" + res);
}, executor);
System.out.println("*主线程End");
CompletableFuture future3 = future1.applyToEitherAsync(future2, (res) -> {
    System.out.println("任务3开始 + 之前任务的返回值:" + res);
    return res * 10;
}, executor);
System.out.println("*主线程End" + "           " + future3.get());
2.7 多任务组合(allOf、anyOf)

allOf:等待所有任务都完成,无返回结果

public static CompletableFuture allOf(CompletableFuture... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

anyOf:等待上面任意一个任务完成即可,有返回结果

public static CompletableFuture anyOf(CompletableFuture... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}
 

例子:

CompletableFuture future1 = CompletableFuture.supplyAsync(()->{
    try {
        Thread.sleep(3000);
        System.out.println("任务1:查询商品图片信息");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello.jpg";
}, executor);
CompletableFuture future2 = CompletableFuture.supplyAsync(()->{
    System.out.println("任务2:查询商品属性");
    return "黑色 +256G";
}, executor);

CompletableFuture future3 = CompletableFuture.supplyAsync(()->{
    System.out.println("任务3:查询商品介绍");
    return "华为手机";
}, executor);
CompletableFuture allOf = CompletableFuture.allOf(future1, future2, future3);
        allOf.get();//等待所有任务都完成
        System.out.println("*主线程End" + "   任务1:"+ future1.get() + "   任务2:"+ future2.get() + "   任务3:"+ future3.get());

CompletableFuture anyOf = CompletableFuture.anyOf(future1, future2, future3);
anyOf.get();//等待上面任意一个任务完成即可
System.out.println("*主线程End" + "   "+ anyOf.get());
 
方法总结 
  • 方法名称开头

当方法名是**runXXX表示改方法不接收上一任务的结果**,无返回值;

当方法名是**acceptXXX表示改方法接收上一任务的结果**,无返回值;

当方法名是**applyXXX表示改方法接收上一任务的结果**,有返回值;

  • 方法名称结尾

不以Async结尾:意味着方法中执行的任务使用的是相同的线程,
以Async结尾:意味着方法可能会使用其他线程执行,如果使用相同的线程池,也可能在同一个线程中执行

参考

https://blog.csdn.net/hong10086/article/details/103651338/

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号