- 前言
- 一、线程池
- 1.线程池的实现
- 2.ThreadPoolExecutor
- 成员介绍:
- 3.Future
- 1.FutureTask 使用
- 二、异步编程
- 4.CompletableFuture
- CompletionStage接口
- 1.串行关系
- 2.描述 AND 汇聚关系
- 3. 描述 OR 汇聚关系
- 4. 异常处理
- 5.CompletionService 批量
前言
一、线程池 1.线程池的实现
// 简化的线程池,仅用来说明工作原理
class MyThreadPool{
// 利用阻塞队列实现生产者 - 消费者模式
BlockingQueue workQueue;
// 保存内部工作线程
List threads
= new ArrayList<>();
// 构造方法
MyThreadPool(int poolSize,
BlockingQueue workQueue){
this.workQueue = workQueue;
// 创建工作线程
for(int idx=0; idx
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任务
void execute(Runnable command){
workQueue.put(command);
}
// 工作线程负责消费任务,并执行任务
class WorkerThread extends Thread{
public void run() {
// 循环取任务并执行
while(true){ ①
Runnable task = workQueue.take();
task.run();
}
}
}
}
// 创建有界阻塞队列
BlockingQueue workQueue =
new LinkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(
10, workQueue);
// 提交任务
pool.execute(()->{
System.out.println("hello");
});
在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码①处的 while 循环。线程池主要的工作原理就这些
示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。
2.ThreadPoolExecutorThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue成员介绍:workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
- maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
- keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
- workQueue:工作队列,和上面示例代码的工作队列同义。
- threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
- handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略:
CallerRunsPolicy:提交任务的线程自己去执行该任务。
AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
DiscardPolicy:直接丢弃任务,没有任何异常抛出。
DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走。
强烈建议使用有界队列,默认拒绝策略要慎重使用
3.Future// 提交 Runnable 任务 Future> submit(Runnable task); // 提交 Callable 任务Future submit(Callable task); // 提交 Runnable 任务及结果引用 Future submit(Runnable task, T result);
获得任务执行结果
这 3 个 submit() 方法之间的区别在于方法参数不同,下面我们简要介绍一下。
- 提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。
- 提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。
- 提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。这个方法该怎么用呢?下面这段示例代码展示了它的经典用法。需要你注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。示例如下:
ExecutorService executor = Executors.newFixedThreadPool(1); // 创建 Result 对象 r Result r = new Result(); r.setAAA(a); // 提交任务 Future1.FutureTask 使用future = executor.submit(new Task(r), r); Result fr = future.get(); // 下面等式成立 fr === r; fr.getAAA() === a; fr.getXXX() === x class Task implements Runnable{ Result r; // 通过构造函数传入 result Task(Result r){ this.r = r; } void run() { // 可以操作 result a = r.getAAA(); r.setXXX(x); } }
其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
// 创建 FutureTask FutureTaskfutureTask = new FutureTask<>(()-> 1+2); // 创建线程池 ExecutorService es = Executors.newCachedThreadPool(); // 提交 FutureTask es.submit(futureTask); // 获取计算结果 Integer result = futureTask.get();
烧水泡茶业务实现示例:
// 创建任务 T2 的 FutureTask FutureTask二、异步编程 4.CompletableFutureft2 = new FutureTask<>(new T2Task()); // 创建任务 T1 的 FutureTask FutureTask ft1 = new FutureTask<>(new T1Task(ft2)); // 线程 T1 执行任务 ft1 Thread T1 = new Thread(ft1); T1.start(); // 线程 T2 执行任务 ft2 Thread T2 = new Thread(ft2); T2.start(); // 等待线程 T1 执行结果 System.out.println(ft1.get()); // T1Task 需要执行的任务: // 洗水壶、烧开水、泡茶 class T1Task implements Callable { FutureTask ft2; // T1 任务需要 T2 任务的 FutureTask T1Task(FutureTask ft2){ this.ft2 = ft2; } @Override String call() throws Exception { System.out.println("T1: 洗水壶..."); TimeUnit.SECONDS.sleep(1); System.out.println("T1: 烧开水..."); TimeUnit.SECONDS.sleep(15); // 获取 T2 线程的茶叶 String tf = ft2.get(); System.out.println("T1: 拿到茶叶:"+tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; } } // T2Task 需要执行的任务: // 洗茶壶、洗茶杯、拿茶叶 class T2Task implements Callable { @Override String call() throws Exception { System.out.println("T2: 洗茶壶..."); TimeUnit.SECONDS.sleep(1); System.out.println("T2: 洗茶杯..."); TimeUnit.SECONDS.sleep(2); System.out.println("T2: 拿茶叶..."); TimeUnit.SECONDS.sleep(1); return " 龙井 "; } } // 一次执行结果: T1: 洗水壶... T2: 洗茶壶... T1: 烧开水... T2: 洗茶杯... T2: 拿茶叶... T1: 拿到茶叶: 龙井 T1: 泡茶... 上茶: 龙井
上例泡茶的代码可以使用CompletableFuture优化为:
// 任务 1:洗水壶 -> 烧开水 CompletableFutureCompletionStage接口f1 = CompletableFuture.runAsync(()->{ System.out.println("T1: 洗水壶..."); sleep(1, TimeUnit.SECONDS); System.out.println("T1: 烧开水..."); sleep(15, TimeUnit.SECONDS); }); // 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶 CompletableFuture f2 = CompletableFuture.supplyAsync(()->{ System.out.println("T2: 洗茶壶..."); sleep(1, TimeUnit.SECONDS); System.out.println("T2: 洗茶杯..."); sleep(2, TimeUnit.SECONDS); System.out.println("T2: 拿茶叶..."); sleep(1, TimeUnit.SECONDS); return " 龙井 "; }); // 任务 3:任务 1 和任务 2 完成后执行:泡茶 CompletableFuture f3 = f1.thenCombine(f2, (__, tf)->{ System.out.println("T1: 拿到茶叶:" + tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; }); // 等待任务 3 执行结果 System.out.println(f3.join()); void sleep(int t, TimeUnit u) { try { u.sleep(t); }catch(InterruptedException e){} } // 一次执行结果: T1: 洗水壶... T2: 洗茶壶... T1: 烧开水... T2: 洗茶杯... T2: 拿茶叶... T1: 拿到茶叶: 龙井 T1: 泡茶... 上茶: 龙井
CompletableFuture 类还实现了 CompletionStage 接口,任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等
CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。
主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。
2.描述 AND 汇聚关系CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口
3. 描述 OR 汇聚关系CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口
4. 异常处理CompletionStage exceptionally(fn); CompletionStagewhenComplete(consumer); CompletionStage whenCompleteAsync(consumer); CompletionStage handle(fn); CompletionStage handleAsync(fn);
下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
CompletableFuture5.CompletionService 批量f0 = CompletableFuture .supplyAsync(()->7/0)) .thenApply(r->r*10) .exceptionally(e->0); System.out.println(f0.join());
用三个线程异步执行询价,通过三次调用 Future 的 get() 方法获取询价结果,之后将询价结果保存在数据库中。
// 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 异步向电商 S1 询价 Futuref1 = executor.submit( ()->getPriceByS1()); // 异步向电商 S2 询价 Future f2 = executor.submit( ()->getPriceByS2()); // 异步向电商 S3 询价 Future f3 = executor.submit( ()->getPriceByS3()); // 获取电商 S1 报价并保存 r=f1.get(); executor.execute(()->save(r)); // 获取电商 S2 报价并保存 r=f2.get(); executor.execute(()->save(r)); // 获取电商 S3 报价并保存 r=f3.get(); executor.execute(()->save(r));
简化为利用 CompletionService 实现询价系统
CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中.
CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:
ExecutorCompletionService(Executor executor);
ExecutorCompletionService(Executor executor, BlockingQueue
这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。
// 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 创建 CompletionService CompletionServicecs = new ExecutorCompletionService<>(executor); // 异步向电商 S1 询价 cs.submit(()->getPriceByS1()); // 异步向电商 S2 询价 cs.submit(()->getPriceByS2()); // 异步向电商 S3 询价 cs.submit(()->getPriceByS3()); // 将询价结果异步保存到数据库 for (int i=0; i<3; i++) { Integer r = cs.take().get(); executor.execute(()->save(r)); }
CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列



