目录
1.什么是线程池
2.线程池的创建
3.线程池任务调度流程
4.线程池的拒绝策略
5.线程池的状态
6. 线程池的关闭
7.ForkJoinPool
工作窃取算法
代码示例
8.CompletionService
9.CompletableFuture
1.什么是线程池
线程池就是管理线程的一个容器。
2.线程池的创建
JAVA中创建线程池主要有两类方法:
(1)通过Executors工厂类提供的方法,该类提供了4种不同的线程池可供使用。
ExecutorService pool = Executors.newSingleThreadExecutor(); ExecutorService pool2 = Executors.newFixedThreadPool(3); ExecutorService pool3 = Executors.newCachedThreadPool(); ExecutorService pool4 = Executors.newScheduledThreadPool(3);
(2)通过ThreadPoolExecutor类进行自定义创建。
BlockingQueueblockingQueue = new linkedBlockingDeque (10); ExecutorService pool = new ThreadPoolExecutor(3, // corePoolSize 12, // maximumPoolSize 5, TimeUnit.SECONDS, // max idle time blockingQueue // blocking queue ) { // three hook functions @Override protected void terminated() { } @Override protected void beforeExecute(Thread t, Runnable target) { } @Override protected void afterExecute(Runnable target, Throwable e) { } };
提交任务的方式
pool.execute(new TargetTask()); pool.submit(new TargetTask());
3.线程池任务调度流程
4.线程池的拒绝策略
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,就会执行拒绝策略。
- CallerRunsPolicy
在调用者线程中直接执行被拒绝任务的 run 方法,除非线程池已经 shutdown,则直接抛弃任务。
2. AbortPolicy
直接丢弃任务,并抛出 RejectedExecutionException 异常。
3. DiscardPolicy
直接丢弃任务,什么都不做。
4. DiscardOldestPolicy
抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列。
5.实现 RejectedExecutionHandler 接口自定义策略。(上面4个拒绝策略也是实现的RejectedExecutionHandler接口)
5.线程池的状态
- RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务。
- SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown() 方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用 shutdown() 方法进入该状态)。
- STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
- TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入 TERMINATED 状态。
- TERMINATED:在 terminated() 方法执行完后进入该状态,默认 terminated() 方法中什么也没有做。
6. 线程池的关闭
// close the thread pool gracefully
if (!pool.isTerminated()) {
try {
//最多关闭1000次,每次等10毫秒判断是否关闭,如果关闭成功,则跳出循环。
for (int i = 0; i < 1000; i++) {
if (pool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
pool.shutdownNow();
}
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (Throwable e) {
System.out.println(e.getMessage());
}
}
7.ForkJoinPool
ForkJoin框架是从jdk1.7中引入的新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入指定的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
工作窃取算法
充分利用线程进行并行计算.
把一个比较大的任务分割为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,减少线程间的竞争,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。如果有的线程先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程就去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。这时它们会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,会使用双端队列存放任务,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
代码示例
// create a ForkJoinPool
ForkJoinPool forkjoinPool = new ForkJoinPool();
forkjoinPool.submit(new RecursiveTask() {
public static final int threshold = 2;
private int start;
private int end;
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
}
);
8.CompletionService
CompletionService的实现目标是任务先完成可优先获取到,即结果按照完成先后顺序排序。
Java8的CompletionService使用与原理
9.CompletableFuture
简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture详解



