栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

CompletableFuture 异步编程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

CompletableFuture 异步编程

CompletableFuture 异步编程

Java8 新增一个类 CompletableFuture,用来支持流式异步编程。

创建任务

CompletableFuture 提供一下API用来执行异步任务。

static  CompletableFuture supplyAsync(Supplier supplier);

static  CompletableFuture supplyAsync(Supplier supplier, Executor executor);

static CompletableFuture runAsync(Supplier supplier);

static CompletableFuture runAsync(Supplier supplier, Executor executor);

这些API都是用来创建任务,并且异步计算任务。supplyAsync 方法是有返回值的。例如:

CompletableFuture futureA = CompletableFuture.supplyAsync(() -> "hello");

CompletableFuture futureB = CompletableFuture.supplyAsync(() -> "hello");
thenAccept()
CompletableFuture thenAcceptAsync(Consumer action);

CompletableFuture thenAcceptAsync(Consumer action, Executor executor);

表示当前这个阶段计算正常完成(没有异常发送),计算完成后的结果会作为 thenAcceptAsync 方法的入参,然后执行 action。如果当前阶段计算发送异常,thenAcceptAsync 方法不会被执行。

// 打印 hello
CompletableFuture.supplyAsync(() -> "hello")
               .thenAcceptAsync(result -> System.out.println(result));

// 不会打印. 因为不会执行thenAcceptAsync
CompletableFuture.supplyAsync(() -> {
                    throw new NullPointerException();
               })
               .thenAcceptAsync(rs -> System.out.println("1234"));
thenApply()
 CompletableFuture thenApply(Function fn);

 CompletableFuture thenApplyAsync(Function fn);

 CompletableFuture thenApplyAsync(Function fn, Executor executor);

这个方法和 thenAcceptAsync 方法功能一样,就是返回值不同。

thenCombine()
 CompletableFuture	thenCombine(CompletionStage other, BiFunction fn);

 CompletableFuture	thenCombineAsync(CompletionStage other, BiFunction fn);

这里表示当前阶段计算完成并且 other 计算阶段也完成(两个阶段都没有发生异常),然后把这两个阶段计算的结果,作为 thenCombineAsync 方法的入参,执行 fn。如果其中一个计算阶段发生异常,都不会执行该方法。

CompletableFuture taskA = CompletableFuture.supplyAsync(() -> "hello");

CompletableFuture taskB = CompletableFuture.supplyAsync(() -> 12L);
taskB.thenCombine(taskA, (resultB, resultA) -> {
    return resultA + resultB;
});
whenComplete()
CompletableFuture whenComplete(BiConsumer action);

CompletableFuture whenCompleteAsync(BiConsumer action);

这里表示无论当前阶段是计算完成还是发生异常,都将执行 whenComplete 方法。

CompletableFuture taskA = CompletableFuture.supplyAsync(() -> {
            return "hello";
        });

taskA.whenCompleteAsync((rs, ex) -> {
    if (ex != null) {
        ex.printStackTrace();
    }
    if (rs != null) {
        System.out.println(rs);
    }
});
exceptionally()
CompletableFuture exceptionally(Function fn);

这个方法也是用来处理异常的。只有在发生异常时才会执行该方法。

CompletableFuture taskA = CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException();
        });

taskA.exceptionally(ex -> {
    // 这里可以处理异常,然后返回补偿值, 而在 whenComplete() 方法只能知道有异常发送,不能去补偿。
    System.out.println(ex.getMessage());
    return "hello";
})
exceptionally() vs whenCompleteAsync()区别

调用顺序:先 exceptionally() 再 whenCompleteAsync() 。

CompletableFuture taskA = CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException();
        });

taskA.exceptionally(ex -> {
    System.out.println(ex.getMessage());
    return "hello";
}).whenCompleteAsync((rs, ex) -> {
    if (ex != null) {
        ex.printStackTrace();
    }
    if (rs != null) {
        // IllegalArgumentException异常已经被exceptionally处理,并返回正常值 hello.
        // 这里没有异常,将打印 hello
        System.out.println(rs);
    }
});

在执行任务发送了异常 IllegalArgumentException,将执行方法 exceptionally ,然后执行方法 whenCompleteAsync。

调用顺序:先 whenCompleteAsync() 再 exceptionally() 。

CompletableFuture taskA = CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException();
        });

taskA.whenCompleteAsync((rs, ex) -> {
    // ex 不为空,有 IllegalArgumentException 异常发送
    if (ex != null) {
        // 打印异常栈
        ex.printStackTrace();
    }
    if (rs != null) {
        System.out.println(rs);
    }
}).exceptionally(ex -> {
    // 捕获异常 IllegalArgumentException 并处理
    System.out.println(ex.getMessage());
    return "hello";
});
参考

JDK8 CompletableFuture API使用CompletableFuture优化你的代码执行效率 拓展 ForkJoinPool

计算一些并行任务,可以使用 ForkJoinPool 来提供并行的方式提供资源利用率和吞吐量。这个类的大致原理:把大任务拆分成小任务,再去执行。每个空线程,也会去窃取其他繁忙线程的任务,帮忙处理。使用例子:

定义一个任务,继承 RecursiveAction 无返回值。需要有返回值的,继承RecursiveTask

private static class FileUploadTask extends RecursiveAction{
        private int[] files;
        private static final int THRESHOLD = 20;

        public FileUploadTask(int[] files) {
            this.files = files;
        }

        @Override
        protected void compute() {
            if (files.length <= THRESHOLD) {
                this.upload();
            } else {
                System.out.println(Thread.currentThread().getName());
                FileUploadTask subTaskOne = new FileUploadTask(Arrays.copyOfRange(files, 0, files.length / 2));
                FileUploadTask subTaskTwo = new FileUploadTask(Arrays.copyOfRange(files, files.length / 2, files.length));
                subTaskOne.fork();
                subTaskTwo.fork();

                // 如果不关注返回值,可以考虑不需要这里
                subTaskTwo.join();  // 阻塞等待子任务计算完成,异常也会释放
                subTaskOne.join(); // 阻塞等待子任务计算完成
            }
        }

        private void upload()  {
            List items = new ArrayList<>();
            HashSet set = new HashSet<>();

            for (int item : files) {
                items.add(String.valueOf(item));
                set.add(item);
            }
            if (set.contains(1000)) {
                throw new IllegalArgumentException();
            }
            System.out.println("开始上传文件: " + String.join(",", items));
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

提交任务

public static void main(String[] args) throws Exception {
    int length = 100;
    int[] arr = new int[length];
    for (int i = 0; i < length; i++) {
        arr[i] = i + 1;
    }
    int parallelism = ForkJoinPool.commonPool().getParallelism();
    System.out.println("parallelism:" + parallelism);
    ForkJoinPool.commonPool().invoke(new FileUploadTask(arr));
    Thread.sleep(30000);
}
参考

ForkJoinPool使用与原理

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号