最开始学习多线程的时候,除了通过继承Thread类创建线程外,还可以通过实现Runnable接口创建线程,并且Thread类本身就是Runnable接口的实现类。
但是Runnable接口有个问题,那就是没有返回值,因此又学习了可以通过实现Callable接口创建线程,使用FutureTask类的构造方法传入Callable接口的实现类,创建可以获取返回值的线程。
Future接口是JDK1.5之后新增的,目的是用于获取异步执行的结果,FutureTask类是RunnableFuture接口的实现类,RunnableFuture接口同时继承了Runnable接口和Future接口,所以使用FutureTask类和Callable接口创建线程,其底层还是通过Runnable接口实现的。
在Executor中提供的execute方法之外,在Executor的子接口ExecutorService中还提供了submit方法,submit方法的返回值是Future类型的对象。
1.2 在线程池中的使用 1.2.1 通过Callable获得返回值使用Callable作为入参,获取Future结果:
Future submit(Callable task);
这种方式是最常用的一种,通过Callable对象存储结果,通过Future对象返回获取结果。
举例如下:
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(
new Callable() {
@Override
public String call() throws Exception {
return "Hello World";
}
}
);
try {
System.out.print(future.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
1.2.2 通过传入指定对象获取返回值
使用Runnable和返回对象作为入参,获取Future结果:
Future submit(Runnable task, T result);
通过传入的对象存储返回值,通过Future对象返回获取结果。
举例如下:
public static void main(String[] args) {
final String[] result = new String[1];
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(
new Runnable() {
@Override
public void run() {
result[0] = "Hello World";
}
}, result
);
try {
System.out.print(future.get()[0]);
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
1.2.3 无需返回值
使用Runnable作为入参:
Future> submit(Runnable task);
这种方式无需获取返回值。
举例如下:
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(
new Runnable() {
@Override
public void run() {
}
}
);
try {
System.out.print(future.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
2 CompletableFuture
2.1 说明
虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果。
在JDK1.8中,CompletableFuture提供了非常强大的Future的扩展功能,简化了异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。
CompletableFuture实现了CompletionStage接口和Future接口,CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
2.2 使用 2.2.1 新建CompletableFuture提供了四个静态方法来创建一个异步操作:
public static CompletableFuturerunAsync(Runnable runnable) public static CompletableFuture runAsync(Runnable runnable, Executor executor) public static CompletableFuture supplyAsync(Supplier supplier) public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
runAsync表示创建无返回值的异步任务,相当于Runnable作为参数的submit方法:
public static void main(String[] args) {
CompletableFuture cf = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName() + "-runAsync");
});
try {
System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
} catch (Exception e) {
e.printStackTrace();
}
}
结果如下:
ForkJoinPool.commonPool-worker-9-runAsync main-result-null
supplyAsync表示创建带返回值的异步任务的,相当于Callable作为参数的submit方法:
public static void main(String[] args) {
CompletableFuture cf = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "test";
});
try {
System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
} catch (Exception e) {
e.printStackTrace();
}
}
结果如下:
ForkJoinPool.commonPool-worker-9-supplyAsync main-result-test2.2.2 完成时回调
当CompletableFuture的计算结果完成,可以执行特定的方法:
public CompletableFuturewhenComplete(BiConsumer super T,? super Throwable> action) public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action) public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
whenComplete会在当前执行的线程继续执行CompletableFuture定义的任务:
public static void main(String[] args) {
CompletableFuture future = new CompletableFuture();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "-启动");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "-完成");
future.complete(100);
System.out.println(Thread.currentThread().getName() + "-结束");
}).start();
future.whenComplete((v, ex) -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "-v-" + v);
System.out.println(Thread.currentThread().getName() + "-ex-" + ex);
});
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
执行结果:
Thread-0-启动 //等待1s Thread-0-完成 main-return-100 //等待1s Thread-0-v-100 Thread-0-ex-null Thread-0-结束
whenCompleteAsync会在线程池起一个新的线程执行CompletableFuture定义的任务:
public static void main(String[] args) {
CompletableFuture future = new CompletableFuture();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "-启动");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "-完成");
future.complete(100);
System.out.println(Thread.currentThread().getName() + "-结束");
}).start();
future.whenCompleteAsync((v, ex) -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "-v-" + v);
System.out.println(Thread.currentThread().getName() + "-ex-" + ex);
});
try {
System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
执行结果:
Thread-0-启动 //等待1s Thread-0-完成 main-return-100 Thread-0-结束 //等待1s ForkJoinPool.commonPool-worker-9-v-100 ForkJoinPool.commonPool-worker-9-ex-null //等待1s2.2.3 异常处理
当CompletableFuture的计算抛出异常的时候,可以执行特定的方法:
public CompletableFutureexceptionally(Function fn)
exceptionally会在抛出异常的时候执行:
public static void main(String[] args) {
CompletableFuture future = new CompletableFuture();
new Thread(() -> {
future.completeExceptionally(new NullPointerException());
}).start();
future.exceptionally(e -> {
e.printStackTrace();
return 100;
});
}
结果如下:
java.lang.NullPointerException2.2.4 线程依赖
当一个线程依赖另一个线程时,可以使用方法来把这两个线程串行化:
public CompletableFuture thenApply(Function super T,? extends U> fn) public CompletableFuture thenApplyAsync(Function super T,? extends U> fn) public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)
串行化执行:
public static void main(String[] args) {
CompletableFuture future = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "test";
}).thenApply(t->{
System.out.println(Thread.currentThread().getName() + "-t-" + t);
return t + "-apply";
});
try {
System.out.println(Thread.currentThread().getName() + "-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
结果如下:
ForkJoinPool.commonPool-worker-9-supplyAsync ForkJoinPool.commonPool-worker-9-t-test main-test-apply2.2.5 消费处理
接收任务的处理结果,并消费处理,无返回结果:
public CompletionStagethenAccept(Consumer super T> action) public CompletionStage thenAcceptAsync(Consumer super T> action) public CompletionStage thenAcceptAsync(Consumer super T> action,Executor executor)
消费处理结果:
public static void main(String[] args) {
CompletableFuture future = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "test";
}).thenAccept(t->{
System.out.println(Thread.currentThread().getName() + "-t-" + t);
});
try {
System.out.println(Thread.currentThread().getName() + "-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
结果如下:
ForkJoinPool.commonPool-worker-9-supplyAsync main-t-test main-null2.2.6 取消返回
不接收任务的处理结果,取消返回:
public CompletionStagethenRun(Runnable action) public CompletionStage thenRunAsync(Runnable action) public CompletionStage thenRunAsync(Runnable action,Executor executor)
取消返回:
public static void main(String[] args) {
CompletableFuture future = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName() + "-supplyAsync");
return "test";
}).thenRun(()->{
System.out.println(Thread.currentThread().getName() + "-t-null");
});
try {
System.out.println(Thread.currentThread().getName() + "-" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
结果如下:
ForkJoinPool.commonPool-worker-9-supplyAsync main-t-null main-null



