栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java 线程池详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java 线程池详解

线程池详解

文章目录
  • 线程池详解
    • 1. 什么是线程池
    • 2. 线程池的实现原理
    • 3. 线程池创建
      • 3.1 Executor 框架
        • 3.1.1 两级调度框架
        • 3.1.2 框架结构
      • 3.2 ThreadPoolExecutor 类介绍
        • 3.2.1 newFixedThreadPool(固定数量的线程池)
        • 3.2.2 newSingleThreadExecutor(单线程化的线程池)
        • 3.2.3 newCachedThreadPool(可缓存的线程池)
        • 3.2.4 newScheduledThreadPool(可调度的线程池)
          • 1. scheduleAtFixedRate
          • 2. scheduleWithFixedDelay
        • 3.2.5 newSingleThreadScheduledExecutor(定时任务单线程的线程池)
        • 3.2.6 newWorkStealingPool(据当前CPU生成线程池)
        • 3.2.7 ThreadPoolExecutor(手动方式)
          • 1. ThreadPoolExecutor 参数说明
        • 3.2.8 FutureTask
          • 1. 简介
          • 2. 使用
          • 3. 实现
    • 4. 线程池状态
    • 5. 关闭线程池


1. 什么是线程池

线程池(ThreadPool)是⼀种基于池化思想管理和使用线程的机制。它是将多个线程预先存储在⼀个“池子”内,当有任务出现时可以避免重新创建和销毁线程所带来性能开销,只需要从“池子”内取出相应的线程执行对应的任务即可。

线程我们可以使用 new 的方式去创建,但如果并发的线程很多,每个线程执行的时间又不长,这样频繁的创建线程会大大的降低系统处理的效率,因为创建和销毁进程都需要消耗资源,线程池就是用来解决类似问题。

线程池实现了一个线程在执行完一段任务后,不销毁,继续执行下一段任务。用《Java并发编程艺术》提到线程池的优点:

  • 降低资源的消耗:使得线程可以重复使用,不需要在创建线程和销毁线程上浪费资源
  • 提高响应速度:任务到达时,线程可以不需要创建即可以执行
  • 线程的可管理性:线程是稀缺资源,如果无限制的创建会严重影响系统效率,线程池可以对线程进行管理、监控、调优。

回到顶部


2. 线程池的实现原理

从图中可以看到,当提交一个新任务到线程池时,线程池的处理流程如下:

  • 线程池判断核心线程池里的线程是否都在执行任务,如果不是,创建一个新的工作线程执行任务,否则进入下一流程
  • 线程池判断工作队列是否已满,如果工作队列没有满,将新提交的任务存储在工作队列中,否则进入下一流程
  • 线程池判断线程池里的线程是否都处于工作状态,如果没有,创建一个新的工作线程执行任务,否则交给饱和策略来处理这个任务

回到顶部


3. 线程池创建

线程池的创建方法总共有 7 种,总体来说可分为 2 类:

  • 通过 ThreadPoolExecutor 创建的线程池;
  • 通过 Executors 创建的线程池。

线程池的创建方式总共包含以下 7 种(其中前 6 种是通过 Executors 创建的,最后 1 种是通过 ThreadPoolExecutor 创建的):

  • Executors.newFixedThreadPool: 创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;
  • Executors.newCachedThreadPool: 创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后回收,若线程数不够,则新建线程;
  • Executors.newSingleThreadExecutor: 创建单个线程数的线程池,它可以保证先进先出的执行顺序;
  • Executors.newScheduledThreadPool: 创建⼀个可以执行延迟任务的线程池;
  • Executors.newSingleThreadScheduledExecutor: 创建一个单线程的可以执行延迟任务的线程池;
  • Executors.newWorkStealingPool: 创建一个抢占式执行的线程池(任务执行顺序不确定),根据当前CPU⽣成线程池【JDK1.8添加】;
  • ThreadPoolExecutor: 手动创建线程池的方式,它包含了 7 个参数可供设置。

线程池四大流程

1.创建线程池后,开始等待请求
2.当调用execute()方法添加一个请求任务时,线程池会做以下判断:

  • 如果正在运行的线程数量小于corePoolSize,马上创建线程执行任务
  • 如果正在运行的线程数量大于等于corePoolSize,将该任务放入等待队列
  • 如果等待队列已满,但正在运行线程数量小于max,创建非核心线程执行任务
  • 如果队列满了且正在运行的线程数量大于max,线程池会启动饱和拒绝策略

3.当一个线程完成任务时,会从等待队列中取下一个任务来执行

4.当空闲线程超过keepAliveTime定义时间,会判断:

  • 如果当前运行线程大于corePoolSize,该线程销毁
  • 所有线程执行完任务后,线程个数恢复到corePoolSize大小
3.1 Executor 框架

Excutor框架是线程池处理线程的核心,包括创建任务,传递任务,任务的执行三个方面

3.1.1 两级调度框架

在 HotSpot VM 的线程模型中,Java 线程被一对一映射为本地操作系统线程。在上层,Java 多线程程序通常应用分解成若干个任务,然后使用用户级的调度器(Executor)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器。这种两级调度模型的示意图如图所示:


从图中可以看出,应用程序通过 Executor 框架控制上层调度,下层的调度则由操作系统内核控制

3.1.2 框架结构

1.创建任务

执行的任务需要实现 Runnable 或者 Callable 接口,然后重写里面的 run 方法,这两个接口区别下面会写

2.传递任务

以前执行线程都是直接创建线程然后调用 start() 方法去执行线程,现在我们需要把任务传递到线程池里面去,传递任务的核心接口就是 Excutor 接口,而它下面有几个实现的方法,如图所示:

可以看到真正实现了功能的其实就是两个类 ThreadPoolExecutor 和 ScheduledTreadPollExecutor,而其中用的最多的就是这个 ThreadPoolExecutor ,下面会详细介绍,我们把任务通过这个方法的对象进行传递。

3.任务的执行及返回的结果

任务传递给线程池执行完毕后,对于不同的传递方式,会有不同的返回策略,对于利用 excutor 方法传递的任务,不管执行的怎么样都不会有值传递回来,而对于 submit 方法传递的任务会返回一个 FutureTask 对象,返回用户希望接受的值。

回到顶部


3.2 ThreadPoolExecutor 类介绍

Executor 框架最核心的类是 ThreadPoolExecutor,它是线程池的实现类

Executors返回的线程池对象的弊端:

1)FixedThreadPool和SingleThreadPool:
允许请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求导致OOM

2)CachedThreadPool和ScheduledThreadPool:
允许创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程导致OOM

3.2.1 newFixedThreadPool(固定数量的线程池)

称为可重用固定线程池,其源码:

	
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
   return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue(),
                                  threadFactory);
}

从源码可以看出,这个这个线程池的核心线程数 和 最大线程数量都设置为 nThreads ,这个参数是我们在创建线程池的时候传递的

其运行示意图,来源自《Java并发编程的艺术》:

上图说明:

  • 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
  • 当前运行的线程数等于corePoolSize后,如果再来新任务的话,会将任务加入LinkedBlockingQueue;
  • 线程池中的线程执行完手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行;

该方法用于创建一个“固定数量的线程池”,其唯一的参数用于设置池中线程的“固定数量”。

该线程池有以下特定:

1)如果线程池没有达到“固定数量”,每次提交一个任务线程池内就创建一个新线程,直到线程达到线程池固定的数量。

2)线程池的大小一旦达到“固定数量”就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

3)在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)。

适用的场景是:需要任务长时间的场景。

内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无限增大,使服务器资源迅速耗尽。

例子:

public class ThreadPoolDemo1 {
    public static void main(String[] args) {
        // 1.创建一个包含5个线程的线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        // 2.使用线程池执行任务
        for (int i = 0; i < 5; i++) {
            // 给线程池添加任务
            threadPool.submit(() -> System.out.println("线程名称:" + Thread.currentThread().getName()));
        }
 
        // 2.使用线程池执行任务2
        for (int i = 0; i < 10; i++) {
            // 给线程池添加任务
            threadPool.execute(() -> System.out.println("线程名称:" + Thread.currentThread().getName()));
        }
    }
}

结果:


线程池返回结果

示例代码:

public class ThreadPoolDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
 
        Future future = threadPool.submit(() -> {
            int num = new Random().nextInt(100);
            System.out.println("生成随机数:" + num);
            return num;
        });
        System.out.println("得到线程池返回结果:" + future.get());
    }
}

3.2.2 newSingleThreadExecutor(单线程化的线程池)

SingleThreadExecutor 是使用单个 worker 线程的 Executor,下面是 SingleThreadExecutor 的源代码实现

	
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue(),
                                    threadFactory));
    }

SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 被设置为 1,其他参数与 FixedThreadPool 相同。SingleThreadExecutor 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列


对上图说明如下:

  • 如果当前运行的线程数少于 corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务
  • 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入 LinkedBlockingQueue
  • 线程执行完 1 中的任务后,会在一个无限循环中反复从 LinkedBlockingQueue 获取任务来执行

示例代码:

public class ThreadPoolDemo7 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            service.submit(() -> System.out.println("任务:" + finalI + ", 线程名:" + Thread.currentThread().getName()));
        }
    }
}

结果:

单线程的线程池有什么意义呢?

  • 自定义拒绝策略;
  • 提供了任务队列和任务管理的功能。
3.2.3 newCachedThreadPool(可缓存的线程池)

CachedThreadPool 是一个会根据需要创建新线程的线程池,下面是创建 CachedThreadPool 的源代码

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}

CachedThreadPool 的 corePoolSize 被设置为 0,即 corePool 为空。maximumPoolSize 被设置为 Integer.MAX_VALUE,即 maximumPool 是无界的。这里把 keepAliveTime 设置为 60L,意味着 CachedThreadPool 中的空闲线程等待新任务的最长时间为 60 秒,空闲线程超过 60 秒后将会被终止

CachedThreadPool 使用没有容量的 SynchronousQueue 作为线程池的工作队列,但 CachedThreadPool 的 maximumPool 是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度,CachedThreadPool 会不断创建新线程。极端情况下,CachedThreadPool 会因为创建过多线程而耗尽 CPU 和内存资源


示例代码:

public class ThreadPoolDemo4 {
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            service.submit(() -> System.out.println("i:" + finalI + " 线程名称:" + Thread.currentThread().getName()));
        }
    }
}

运行结果:

该方式适用于短时间有且有大量任务的场景,它的缺点是可能占用很多资源。

3.2.4 newScheduledThreadPool(可调度的线程池)

该方法用于创建一个“可调度线程池”,即一个提供“延时”和“周期性”任务调度功能的ScheduledExecutorService类型的线程池。

适用的场景是:周期性地执行任务地场景。

ScheduledThreadPoolExecutor 会把待调度的任务(ScheduledFutureTask)放到一个 DelayQueue 中。ScheduledFutureTask 主要包含三个成员变量

  • long 型成员变量 time,表示这个任务将要被执行的具体时间
  • long 型成员变量 sequenceNumber,表示这个任务被添加到 ScheduledThreadPoolExecutor 中的序号
  • long 型成员变量 period,表示任务执行的间隔周期

DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 ScheduledFutureTask 进行排序。排序时,time 小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask 的 time 相同,就比较 sequenceNumber,sequenceNumber 小的排在前面(如果两个任务的执行时间相同,先提交的任务先执行)

下图是 ScheduledThreadPoolExecutor 中的线程执行周期任务的过程

  • 线程 1 从 DelayQueue 获取已到期的 ScheduledFutureTask,到期任务是指
    ScheduledFutureTask 的 time 大于等于当前时间
  • 线程 1 执行这个 ScheduledFutureTask
  • 线程 1 修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间
  • 线程 1 把修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中

接下来我们看一下上图中线程获取任务的过程,源代码如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                available.await();
            } else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {
                    long tl = available.awaitNanos(delay);
                } else {
                    E x = q.poll();
                    assert x != null;
                    if (q.size() != 0)
                        available.signalAll();
                    return x;
                }
            }
        }
    } finally {
        lock.unlock();
    }
}

获取任务分为三大步骤:

  • 获取 Lock
  • 获取周期任务
  • 如果 PriorityQueue 为空,当前线程到等待队列中等待,否则执行下面的步骤
  • 如果 PriorityQueue 的头元素的 time 时间比当前时间大,到等待队列等待 time 时间,否则执行下面的步骤
  • 获取 PriorityQueue 的头元素,如果 PriorityQueue 不为空,则唤醒在等待队列中等待的所有线程
  • 释放 Lock

ScheduledThreadPoolExecutor 在一个循环中执行步骤二,直到线程从 PriorityQueue 获取到一个元素之后才会退出无限循环

最后我们再看把任务放入 DelayQueue 的过程,下面是源码实现

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        q.offer(e);
        if (first == null || e.compareTo(first) < 0) {
            available.signalAll();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

添加任务分为三大步骤:

  • 获取 Lock
  • 添加任务
  • 向 PriorityQueue 添加任务
  • 如果添加的任务是 PriorityQueue 的头元素,唤醒在等待队列中等待的所有线程
  • 释放 Lock

示例代码:

public class ThreadPoolDemo13 {
    public static void main(String[] args) {
        ScheduledExecutorService threadPool =
                Executors.newScheduledThreadPool(10);
        // 定时任务
        System.out.println("设置定时任务:" + new Date());
        threadPool.scheduleAtFixedRate(() -> 
                System.out.println("scheduleAtFixedRate:" + new Date()), 3, 2, TimeUnit.SECONDS);
    }
}

运行结果:

延迟3s后执行,之后每2s执行一次。

参数解释:

  • 参数1:执行任务;
  • 参数2:延迟 n 秒后执行;
  • 参数3:执行定时任务的频率;
  • 参数4:配合参数3使用的时间单位。
1. scheduleAtFixedRate

scheduleAtFixedRate 是以上⼀次任务的开始时间,作为下次定时任务的参考时间的(参考时间+延迟任务=任务执行)。

示例代码:

public class ThreadPoolDemo5 {
    public static void main(String[] args) {
        // 创建线程池
        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
        System.out.println("添加任务执行时间:" + LocalDateTime.now());
 
        // 2s之后开始执行定时任务,定时任务每隔4s执行一次
        service.scheduleAtFixedRate(() -> {
            System.out.println("执行了任务:" + LocalDateTime.now());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 2, 4, TimeUnit.SECONDS);
    }
}


2s后开始执行定时任务,每隔5s执行一次。

设置的是每隔4秒执行一次定时任务,为什么实际上是5s执行一次呢?

注意,如果执行任务时间大于设置的定时任务执行时间,那么此方法会以执行任务的时间为准,简而言之,就是哪个时间长就以哪个时间作为定时任务执行的周期。

2. scheduleWithFixedDelay

scheduleWithFixedDelay 是以上⼀次任务的结束时间,作为下次定时任务的参考时间的。

示例代码:

public class ThreadPoolDemo5 {
    public static void main(String[] args) {
        // 创建线程池
        ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
        System.out.println("添加任务执行时间:" + LocalDateTime.now());
 
        // 2s之后开始执行定时任务,每次执行间隔4秒
        service.scheduleWithFixedDelay(() -> {
            System.out.println("执行了任务:" + LocalDateTime.now());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 2, 4, TimeUnit.SECONDS);
    }
}

运行结果:

2s后开始执行任务,每隔9秒执行一次定时任务。

为什么这个也不是每隔4s执行一次,而是9s呢???

因为 scheduleWithFixedDelay 是以上⼀次任务的结束时间,作为下次定时任务的参考时间的,上个任务执行5s后,再延时4s执行延时任务。

回到顶部


3.2.5 newSingleThreadScheduledExecutor(定时任务单线程的线程池)

示例代码:

public class ThreadPoolDemo6 {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        System.out.println("添加任务时间:" + LocalDateTime.now());
        service.schedule(() -> System.out.println("执行任务:" + LocalDateTime.now()), 2, TimeUnit.SECONDS);
    }
}

运行结果:

3.2.6 newWorkStealingPool(据当前CPU生成线程池)

示例代码:

public class ThreadPoolDemo8 {
    public static void main(String[] args) {
        ExecutorService service = Executors.newWorkStealingPool();
        for (int i = 0; i < 100; i++) {
            service.submit(() -> System.out.println("线程名:" + Thread.currentThread().getName()));
        }
        while (!service.isTerminated()){
        }
    }
}

3.2.7 ThreadPoolExecutor(手动方式)

源码分析:

	
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
1. ThreadPoolExecutor 参数说明

1、corePoolSize:核心线程数,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即时其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。

可以大致理解为长期驻留的线程数目(除非设置了allowCoreThreadTimeOut)。对于不同的线程池,这个值可能会有很大区别,比如newFixedThreadPool 会将其设置为 nThreads,而对于 newCachedThreadPool 则是为 0。

2、maximumPoolSize:线程池能创建的最大线程的数量,在核心线程都被占用的时候,继续申请的任务会被搁置在等待队列里面,而当等待队列满了的时候,线程池就会把线程数量创建至maximumPoolSize 个。

对于newFixedThreadPool,是 nThreads,因为其要求是固定大小,而 newCachedThreadPool 则是 Integer.MAX_VALUE。

如何设置maximumPoolSize大小
Runtime.getRuntime().availableProcessors()方法获取核数

CPU密集型
maximumPoolSize设为核数+1

IO密集型
maximumPoolSize设为核数/阻塞系数

3、keepAliveTime:空闲线程的保活时间,如果线程的空闲时间超过这个值,那么将会被关闭。注意此值生效条件必须满足:空闲时间超过这个值,并且线程池中的线程数少于等于核⼼线程数corePoolSize。当然核心线程默认是不会关闭的,除非设置了allowCoreThreadTimeOut(true)那么核心线程也可以被回收。

如果任务很多,并且每个任务的执行时间都比较短,可以调大时间,提高线程利用率。

4、TimeUnit:参数keepAliveTime的时间单位

可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS)和纳秒(NANOSECONDS)

5、BlockingQueue:任务队列,用于存储线程池的待执行任务的。

用于保存等到执行的任务的阻塞队列,可以选择以下几个阻塞队列:
ArrayBlockingQueue
是一个数组实现的有界阻塞队列,队列中的元素按FIFO排序。ArrayBlockingQueue在创建时必须设置大小,接收的任务超出corePoolSize数量时,任务被缓存到该任务阻塞队列中。

LinkedBlockingQueue
是一个基于链表实现的阻塞队列,按FIFO排序任务,可以设置容量(有界队列),不设置容量则默认使用Integer.Max_VALUE作为容量(无界队列)。该队列的吞吐量高于ArrayBlockingQueue。快捷工厂方法Executors.newSingleThreadExecutor和newSingleThreadExecutor所创建的线程池使用此队列,并且都没有设置容量。

SynchronousQueue
(同步队列)是一个不存储元素的任务阻塞队列,每个插入操作必须等到另一个线程的调用移除操作,否则插入操作一直处于阻塞状态,其吞吐量通常高于LinkedBlockingQueue。快捷工厂方法Executors.newCachedThreadPool所创建的线程池使用此队列。

DelayQueue
是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,队列头部的元素是过期最快的元素。快捷工厂方法Executors.newScheduledThreadPool所创建的线程池使用此队列。

PriorityBlockingQueue
一个具有优先级的无界阻塞队列

6、ThreadFactory:线程工厂:主要用来创建线程,一般默认即可

7、handler:饱和策略,即当线程池和等待队列都达到最大负荷量时,下一个任务来临时采取的策略。

当任务和线程池都满了,说明线程池处于饱和状态,必须采取一种策略处理提交的新任务。在 JDK5 中线程池框架提供了以下四种策略:

1.AbortPolicy:拒绝策略。使用该策略时,如果线程池队列满了,新任务就会被拒绝,并且抛出RejectedExecutionException异常。该策略是线程池默认的拒绝策略。

2.CallerRunsPolicy:调用者执行策略。在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。

"调用者运行"的调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量

3.DiscardOldestPolicy:抛弃最老任务策略。抛弃最老任务策略,也就是说如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除队头元素后再尝试入队。

4.DiscardPolicy:抛弃策略。使用该策略时,如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出。

5.自定义策略:如果以上拒绝策略都不符合需求,那么可自定义一个拒绝策略,实现RejectedExecutionHandler接口的rejectedExecution方法即可。

自定义拒绝策略:
示例代码:

public class ThreadPoolDemo10 {
    public static void main(String[] args) {
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            return thread;
        };
 
        // 手动方式创建线程池
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS,
                        new LinkedBlockingDeque<>(2), factory,
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                // 自定义拒绝策略
                                System.out.println("自定义拒绝策略");
                            }
                        });
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(finalI * 100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行任务" + finalI);
            });
        }
        // 终止线程池
        executor.shutdown();
    }
}

运行结果:

回到顶部


3.2.8 FutureTask 1. 简介

Future 接口和实现 Future 接口的 FutureTask 类,代表异步计算的结果。FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行 FutureTask.run()。根据 FutureTask.run() 方法被执行的时机,FutureTask可以处于下面三种状态:

1.未启动

FutureTask.run() 方法还没有被执行之前,FutureTask 处于未启动状态,当创建一个 FutureTask,且没有执行 FutureTask.run() 方法之前,这个 FutureTask 处于未启动状态

2.已启动

FutureTask.run() 方法被执行的过程中,FutureTask 处于已启动状态

3.已完成

FutureTask.run() 方法执行完后正常结束,或被取消 FutureTask.cancel(…),或执行 FutureTask.run() 方法时抛出异常而结束,FutureTask 处于已完成状态

下图是 FutureTask 的状态迁移图

下图是 get 方法和 cancel 方法的执行示意图

  • 当 FutureTask 处于未启动或已启动状态时,执行 FutureTask.get() 方法将导致调用线程阻塞
  • 当 FutureTask 处于已完成状态时,执行 FutureTask.get() 方法将导致调用线程立即返回结果或抛出异常
  • 当 FutureTask 处于未启动状态时,执行 FutureTask.cancel() 方法将导致此任务永远不会被执行
  • 当 FutureTask 处于已启动状态时,执行 FutureTask.cancel(true)
    方法将以中断执行此任务线程的方式来试图停止任务
  • 当 FutureTask 处于已启动状态时,执行 FutureTask.cancel(false)
    方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成)
  • 当 FutureTask 处于已完成状态时,执行 FutureTask.cancel(…) 方法将返回 false
2. 使用

可以把 FutureTask 交给 Executor 执行,也可以通过 ExecutorService.submit(…) 方法返回一个 FutureTask,然后执行 FutureTask.get() 方法或 FutureTask.cancel(…) 方法,还可以单独使用 FutureTask

当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用 FutureTask。假设有多个线程执行若干任务,每个任务最多只能被执行一次。当多个线程试图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行

private final ConcurrentMap> taskCache = new ConcurrentHashMap<>();

private String executionTask(final String taskName)
        throws ExecutionException, InterruptedException {
    while (true) {
        Future future = taskCache.get(taskName); // 1.1, 2.1
        if (future == null) {
            Callable task = new Callable() {
                @Override
                public String call() throws InterruptedException {
                    return taskName;
                }
            };
            FutureTask futureTask = new FutureTask(task);
            future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
            if (future == null) {
                future = futureTask;
                futureTask.run(); // 1.4 执行任务
            }
        }
        try {
            return future.get(); // 1.5, 2.2
        } catch (CancellationException e) {
            taskCache.remove(taskName, future);
        }
    }
}

  • 两个线程试图同时执行同一个任务,这里使用了线程安全的 ConcurrentHashMap 作为任务缓存可能到了注释
  • 两个线程都执行到 // 1.1, 2.1 这行时,假设线程一首先得到
    future,根据接下来的代码可得知,线程一创建任务放入缓存,并执行,而线程二获取线程一创建的任务,不需创建
  • 两个线程都在 // 1.5, 2.2 处等待结果,只有线程一执行完任务后,线程二才能从 future.get() 返回
3. 实现

FutureTask 的实现基于 AbstractQueuedSynchronizer(AQS)

FutureTask 声明了一个内部私有的继承 AQS 的子类 Sync,对 FutureTask 所有公有方法的调用都会委托给这个内部子类,FutureTask 的设计示意图如下所示

FutureTask.get() 方法会调用 AQS.acquireSharedInterruptibly(int arg) 方法,这个方法的执行过程如下:

  • 调用 AQS.acquireSharedInterruptibly(int arg) 方法,该方法会回调在子类 Sync 中实现的
    tryAcquireShared() 方法来判断 acquire 操作是否可以成功。acquire 操作可以成功的条件为:state
    为执行完成状态 RAN 或已取消状态 CANCELLED,且 runner 不为 null
  • 如果成功,get() 方法立即返回,否则线程等待队列中去等待其他线程执行 release 操作
  • 当其他线程执行 release 操作(FutureTask.run() 或FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行 tryAcquireShared() 将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程 最后返回计算的结果或抛出异常

FutureTask.run() 的执行过程如下:

  • 执行在构造函数中指定的任务
  • 以原子方式来更新同步状态(调用 AQS.compareAndSetState(int expect,int update),设置state 为执行完成状态 RAN)。如果这个原子操作成功,就设置代表计算结果的变量 result 的值为Callable.call()的返回值,然后调用 AQS.releaseShared(int arg)
  • AQS.releaseShared(int arg) 首先会回调在子类 Sync 中实现的 tryReleaseShared(arg)来执行 release 操作(设置运行任务的线程 runner 为 null,然会返回 true),然后唤醒线程等待队列中的第一个线程
  • 调用 FutureTask.done()

当执行 FutureTask.get() 方法时,如果 FutureTask 不是处于执行完成状态 RAN 或已取消状态CANCELLED,当前执行线程将到 AQS 的线程等待队列中等待(见下图的线程 A、B、C、D)。当某个线程执行 FutureTask.run() 方法或 FutureTask.cancel(…) 方法时,会唤醒线程等待队列的第一个线程

假设开始时 FutureTask 处于未启动状态或已启动状态,等待队列中已经有3个线程(A、B、C)在等待。此时,线程 D 执行 get() 方法将导致线程 D 也到等待队列中去等待

当线程 E 执行 run() 方法时,会唤醒队列中的第一个线程 A,线程 A 被唤醒后,首先把自己从队列中删除,然后唤醒它的后继线程 B,最后线程 A 从 get() 方法返回。线程 B、C、D 重复 A 线程的处理流程。最终,在队列中等待的所有线程都被级联唤醒并从 get() 方法返回

回到顶部


4. 线程池状态

线程池的5种状态具体如下:

1)RUNNING(运行状态):线程池创建之后的初始状态,这种状态下可以执行任务。

这是最正常的状态:接受新的任务,处理等待队列中的任务;

2)SHUTDOWN(关闭状态):该状态下线程池不接受新的任务提交,但是会继续处理等待队列中的任务;

3)STOP(停止状态):该状态下线程池不再接受新任务,也不会处理任务队列中的剩余任务,并且将会中断所有工作线程。

4)TIDYING(整理状态):该状态下所有任务都已终止或者处理完成,将会执行terminated()钩子方法。

5)TERMINATED(结束状态):执行完terminated()钩子方法之后的状态。

线程池的状态转换规则为:

1)线程池创建之后状态为RUNNING。

2)执行线程池的shutdown()实例方法,会使线程池状态从RUNNING转变为SHUTDOWN。

3)执行线程池的shutdownNow()实例方法,会使线程池状态从RUNNING转变为STOP。

4)当线程池处于SHUTDOWN状态时,执行其shutdownNow()方法会将其状态转变为STOP。

5)等待线程池的所有工作线程停止,工作队列清空之后,线程池状态会从STOP转变为TIDYING。

6)执行完terminated()钩子方法之后,线程池状态从TIDYING转变为TERMINATED。

线程池的状态转换规则如下图所示:

shutdown VS shutdownNow :

  • shutdown 执行时线程池终止接收新任务,并且会将任务队列中的任务处理完;
  • shutdownNow 执行时线程池终止接收新任务,并且会给终止执行任务队列中的任务。

回到顶部


5. 关闭线程池

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池,它们的原理是遍历线程池中的工作线程,逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止

shutdown 方法和 shutdownNow 方法存在一定的区别:

  • shutdownNow 方法首先将线程池状态设置成 STOP,然后尝试停止所有正在执行或暂停任务的线程,并返回等待执行任务的列表
  • shutdown 方法只是将线程池状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true,当所有任务都已关闭,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该采用哪种方法关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法关闭线程池,如果任务不一定要执行完成,可以调用 shutdownNow 方法

回到顶部


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1036478.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号