栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java线程池执行流程、execute源码阅读

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java线程池执行流程、execute源码阅读

线程池

参考文章:Java线程池实现原理及其在美团业务中的实践

文章目录
    • 线程池
      • 线程池状态含义
      • 线程池状态转换
      • 线程池参数
      • ThreadExecutorPool
      • 线程池类型
      • 拒绝策略
      • 好处
      • 线程池执行流程

线程池状态含义
  1. RUNNING:接收新任务并处理阻塞队列里面的任务。
  2. SHUTDOWN:拒绝新任务但是处理阻塞队列里面的任务。
  3. STOP:拒接新任务并且会抛弃阻塞队列里面的任务,同时还会中断当前正在处理的任务。
  4. TIDYING:所有任务都执行完(包括阻塞队列里面的任务)后当前线程次活动线程为0,将要调用terminated`方法。
  5. TERMINATED:终止状态,terminated方法调用完成以后的状态。
线程池状态转换
  1. RUNNING->SHUTDOWN: 显式调用shutdown()方法或者隐式调用了finalize()方法里面的shutdown()方法。
  2. RUNNING或SHUTDOWN->STOP: 显式调用shutdownNow()。
  3. SHUTDOWN->TIDYING: 当线程池和任务队列都为空时。
  4. STOP->TIDYING: 当线程池为空时。
  5. TIDYING->TERMINATED: 当terminated() hook方法执行完成时。
线程池参数
  1. corePoolSize : 线程池核心线程个数。
  2. workQueue : 用于保存等待执行的任务的阻塞队列。比如基于数组的有界ArrayBlockingQueue、基于链表的无界linkedBlockingQueue、最多一个元素的同步队列SynchronousQueue及优先级队列PriorityBlockingQueue等。
  3. maximumPoolSize : 线程池最大线程个数。
  4. ThreadFactory : 创建线程的工厂。
  5. RejectedExecutionHandler : 饱和策略,当队列满并且线程个数达到maximumPoolSize后采取的策略,比如AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用者所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)及DiscardPolicy(默认丢弃,不抛出异常)。
  6. keepAliveTime : 存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。
  7. TimeUnit : 存活时间的时间单位。
ThreadExecutorPool
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
线程池类型
  1. newFixedThreadPool : 创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keepAliveTime = 0 说明只要线程个数比核心个数多并且当前空闲则回收。默认使用AbortPolicy(抛出异常)拒绝策略。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new linkedBlockingQueue());
}	

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new linkedBlockingQueue(),
                                  threadFactory);
}
  1. newSingleThreadExecutor: 创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keepAliveTime` = 0 说明只要线程个数比核心个数多并且当前空闲则回收。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new linkedBlockingQueue()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new linkedBlockingQueue(),
                                threadFactory));
}
  1. newCachedThreadPool : 创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列。keepAliveTime = 60 说明只要线程在60s内空闲则回收。加入同步队列的任务会被马上执行,同步队列最多只有一个任务。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue(),
                                  threadFactory);
}
拒绝策略
  1. AbortPolicy:默认策略。抛出RejectedExecutionExcepiton拒绝新任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}
  1. DiscardPolicy:不处理新任务,直接丢弃(一般不会用这个)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
  1. DiscardOldestPolicy:丢弃最早未处理的线程。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        // 弹出队列头部(先进先出,头部就是最早进入的)
        e.getQueue().poll();
        e.execute(r);
    }
}
  1. CallerRunsPolicy:调用执行自己的线程处理任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}
好处
  1. 降低资源消耗。通过重复利用已创建的线程降低和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,不需要等到线程创建完毕就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。
线程池执行流程
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

 private static int workerCountOf(int c) {
 	return c & CAPACITY;
 }

 private final BlockingQueue workQueue;

 public void execute(Runnable command) {
     // 如果任务为null,则抛出异常。
     if (command == null)
     throw new NullPointerException();
     // ctl 中保存的线程池当前的一些状态信息
     int c = ctl.get();

     // 下面会涉及到 3 步 操作
     // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
     // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
            return;
         c = ctl.get();
 	}
 	// 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
 	// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
 	if (isRunning(c) && workQueue.offer(command)) {
 		int recheck = ctl.get();
 		// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
		 if (!isRunning(recheck) && remove(command))
			 reject(command);
	 // 如果当前线程池为空就新创建一个线程并执行。
		 else if (workerCountOf(recheck) == 0)
 			addWorker(null, false);
 	}
 	//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
 	//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
     // 如果workQueue.offer(command)返回false,就会跑到这里执行一下!addWorker(command, false),也就是尝试新建线程
    else if (!addWorker(command, false))
 		reject(command);
 }

流程图如下:

注意:只有在等待队列满了后才会考虑创建新的非核心线程。

这里就暂时打住。之后有时间可以深入。

有兴趣的朋友可以关注一下我的公众号,不定期更新。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336632.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号