栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

011Java并发包014异步调用

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

011Java并发包014异步调用

1 Future 1.1 说明

最开始学习多线程的时候,除了通过继承Thread类创建线程外,还可以通过实现Runnable接口创建线程,并且Thread类本身就是Runnable接口的实现类。

但是Runnable接口有个问题,那就是没有返回值,因此又学习了可以通过实现Callable接口创建线程,使用FutureTask类的构造方法传入Callable接口的实现类,创建可以获取返回值的线程。

Future接口是JDK1.5之后新增的,目的是用于获取异步执行的结果,FutureTask类是RunnableFuture接口的实现类,RunnableFuture接口同时继承了Runnable接口和Future接口,所以使用FutureTask类和Callable接口创建线程,其底层还是通过Runnable接口实现的。

在Executor中提供的execute方法之外,在Executor的子接口ExecutorService中还提供了submit方法,submit方法的返回值是Future类型的对象。

1.2 在线程池中的使用 1.2.1 通过Callable获得返回值

使用Callable作为入参,获取Future结果:

 Future submit(Callable task);

这种方式是最常用的一种,通过Callable对象存储结果,通过Future对象返回获取结果。

举例如下:

public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future future = executor.submit(
            new Callable() {
                @Override
                public String call() throws Exception {
                    return "Hello World";
                }
            }
    );
    try {
        System.out.print(future.get());
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }
}
1.2.2 通过传入指定对象获取返回值

使用Runnable和返回对象作为入参,获取Future结果:

 Future submit(Runnable task, T result);

通过传入的对象存储返回值,通过Future对象返回获取结果。

举例如下:

public static void main(String[] args) {
    final String[] result = new String[1];
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future future = executor.submit(
            new Runnable() {
                @Override
                public void run() {
                    result[0] = "Hello World";
                }
            }, result
    );
    try {
        System.out.print(future.get()[0]);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }
}
1.2.3 无需返回值

使用Runnable作为入参:

Future submit(Runnable task);

这种方式无需获取返回值。

举例如下:

public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future future = executor.submit(
            new Runnable() {
                @Override
                public void run() {
                }
            }
    );
    try {
        System.out.print(future.get());
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }
}
2 CompletableFuture 2.1 说明

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果。

在JDK1.8中,CompletableFuture提供了非常强大的Future的扩展功能,简化了异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。

CompletableFuture实现了CompletionStage接口和Future接口,CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

2.2 使用 2.2.1 新建

CompletableFuture提供了四个静态方法来创建一个异步操作:

public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static  CompletableFuture supplyAsync(Supplier supplier)
public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)

runAsync表示创建无返回值的异步任务,相当于Runnable作为参数的submit方法:

public static void main(String[] args) {
    CompletableFuture cf = CompletableFuture.runAsync(()->{
        System.out.println(Thread.currentThread().getName() + "-runAsync");
    });
    try {
        System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

结果如下:

ForkJoinPool.commonPool-worker-9-runAsync
main-result-null

supplyAsync表示创建带返回值的异步任务的,相当于Callable作为参数的submit方法:

public static void main(String[] args) {
    CompletableFuture cf = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName() + "-supplyAsync");
        return "test";
    });
    try {
        System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

结果如下:

ForkJoinPool.commonPool-worker-9-supplyAsync
main-result-test
2.2.2 完成时回调

当CompletableFuture的计算结果完成,可以执行特定的方法:

public CompletableFuture whenComplete(BiConsumer action)
public CompletableFuture whenCompleteAsync(BiConsumer action)
public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor)

whenComplete会在当前执行的线程继续执行CompletableFuture定义的任务:

public static void main(String[] args) {
    CompletableFuture future = new CompletableFuture();
    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + "-启动");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-完成");
        future.complete(100);
        System.out.println(Thread.currentThread().getName() + "-结束");
    }).start();
    future.whenComplete((v, ex) -> {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-v-" + v);
        System.out.println(Thread.currentThread().getName() + "-ex-" + ex);
    });
    try {
        System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

执行结果:

Thread-0-启动
//等待1s
Thread-0-完成
main-return-100
//等待1s
Thread-0-v-100
Thread-0-ex-null
Thread-0-结束

whenCompleteAsync会在线程池起一个新的线程执行CompletableFuture定义的任务:

public static void main(String[] args) {
    CompletableFuture future = new CompletableFuture();
    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + "-启动");
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-完成");
        future.complete(100);
        System.out.println(Thread.currentThread().getName() + "-结束");
    }).start();
    future.whenCompleteAsync((v, ex) -> {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "-v-" + v);
        System.out.println(Thread.currentThread().getName() + "-ex-" + ex);
    });
    try {
        System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
        Thread.sleep(2000);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

执行结果:

Thread-0-启动
//等待1s
Thread-0-完成
main-return-100
Thread-0-结束
//等待1s
ForkJoinPool.commonPool-worker-9-v-100
ForkJoinPool.commonPool-worker-9-ex-null
//等待1s
2.2.3 异常处理

当CompletableFuture的计算抛出异常的时候,可以执行特定的方法:

public CompletableFuture exceptionally(Function fn)

exceptionally会在抛出异常的时候执行:

public static void main(String[] args) {
    CompletableFuture future = new CompletableFuture();
    new Thread(() -> {
        future.completeExceptionally(new NullPointerException());
    }).start();
    future.exceptionally(e -> {
        e.printStackTrace();
        return 100;
    });
}

结果如下:

java.lang.NullPointerException
2.2.4 线程依赖

当一个线程依赖另一个线程时,可以使用方法来把这两个线程串行化:

public  CompletableFuture thenApply(Function fn)
public  CompletableFuture thenApplyAsync(Function fn)
public  CompletableFuture thenApplyAsync(Function fn, Executor executor)

串行化执行:

public static void main(String[] args) {
    CompletableFuture future = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName() + "-supplyAsync");
        return "test";
    }).thenApply(t->{
        System.out.println(Thread.currentThread().getName() + "-t-" + t);
        return t + "-apply";
    });
    try {
        System.out.println(Thread.currentThread().getName() + "-" + future.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

结果如下:

ForkJoinPool.commonPool-worker-9-supplyAsync
ForkJoinPool.commonPool-worker-9-t-test
main-test-apply
2.2.5 消费处理

接收任务的处理结果,并消费处理,无返回结果:

public CompletionStage thenAccept(Consumer action)
public CompletionStage thenAcceptAsync(Consumer action)
public CompletionStage thenAcceptAsync(Consumer action,Executor executor)

消费处理结果:

public static void main(String[] args) {
    CompletableFuture future = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName() + "-supplyAsync");
        return "test";
    }).thenAccept(t->{
        System.out.println(Thread.currentThread().getName() + "-t-" + t);
    });
    try {
        System.out.println(Thread.currentThread().getName() + "-" + future.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

结果如下:

ForkJoinPool.commonPool-worker-9-supplyAsync
main-t-test
main-null
2.2.6 取消返回

不接收任务的处理结果,取消返回:

public CompletionStage thenRun(Runnable action)
public CompletionStage thenRunAsync(Runnable action)
public CompletionStage thenRunAsync(Runnable action,Executor executor)

取消返回:

public static void main(String[] args) {
    CompletableFuture future = CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName() + "-supplyAsync");
        return "test";
    }).thenRun(()->{
        System.out.println(Thread.currentThread().getName() + "-t-null");
    });
    try {
        System.out.println(Thread.currentThread().getName() + "-" + future.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

结果如下:

ForkJoinPool.commonPool-worker-9-supplyAsync
main-t-null
main-null

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号