栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java 中ThreadPoolExecutor原理分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java 中ThreadPoolExecutor原理分析

java 中ThreadPoolExecutor原理分析

线程池简介

Java线程池是开发中常用的工具,当我们有异步、并行的任务要处理时,经常会用到线程池,或者在实现一个服务器时,也需要使用线程池来接收连接处理请求。

线程池使用

JDK中提供的线程池实现位于java.util.concurrent.ThreadPoolExecutor。在使用时,通常使用ExecutorService接口,它提供了submit,invokeAll,shutdown等通用的方法。

在线程池配置方面,Executors类中提供了一些静态方法能够提供一些常用场景的线程池,如newFixedThreadPool,newCachedThreadPool,newSingleThreadExecutor等,这些方法最终都是调用到了ThreadPoolExecutor的构造函数。

ThreadPoolExecutor的包含所有参数的构造函数是


    private static final long serialVersionUID = 6138294804551838833L;
    
    final Thread thread;
    
    Runnable firstTask;
    
    volatile long completedTasks;
    
    Worker(Runnable firstTask) {
      setState(-1); // inhibit interrupts until runWorker
      this.firstTask = firstTask;
      this.thread = getThreadFactory().newThread(this);
    }
    
    public void run() {
      runWorker(this);
    }
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    protected boolean isHeldExclusively() {
      return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
      if (compareAndSetState(0, 1)) {
 setExclusiveOwnerThread(Thread.currentThread());
 return true;
      }
      return false;
    }
    protected boolean tryRelease(int unused) {
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }
    public void lock()    { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock()   { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    void interruptIfStarted() {
      Thread t;
      if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
 try {
   t.interrupt();
 } catch (SecurityException ignore) {
 }
      }
    }

runWorker(Worker)是Worker的轮询执行逻辑,不断地从工作队列中获取任务并执行它们。Worker每次执行任务前需要进行lock,防止在执行任务时被interrupt。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      while (task != null || (task = getTask()) != null) {
 w.lock();
 // If pool is stopping, ensure thread is interrupted;
 // if not, ensure thread is not interrupted. This
 // requires a recheck in second case to deal with
 // shutdownNow race while clearing interrupt
 if ((runStateAtLeast(ctl.get(), STOP) ||
    (Thread.interrupted() &&
    runStateAtLeast(ctl.get(), STOP))) &&
   !wt.isInterrupted())
   wt.interrupt();
 try {
   beforeExecute(wt, task);
   Throwable thrown = null;
   try {
     task.run();
   } catch (RuntimeException x) {
     thrown = x; throw x;
   } catch (Error x) {
     thrown = x; throw x;
   } catch (Throwable x) {
     thrown = x; throw new Error(x);
   } finally {
     afterExecute(task, thrown);
   }
 } finally {
   task = null;
   w.completedTasks++;
   w.unlock();
 }
      }
      completedAbruptly = false;
    } finally {
      processWorkerExit(w, completedAbruptly);
    }
  }

ThreadPoolExecutor的submit方法中将Callable包装成FutureTask后交给execute方法。

FutureTask

FutureTask继承于Runnable和Future,FutureTask定义的几个状态为
NEW, 尚未执行
COMPLETING, 正在执行
NORMAL, 正常执行完成得到结果
EXCEPTIONAL, 执行抛出异常
CANCELLED, 执行被取消
INTERRUPTING,执行正在被中断
INTERRUPTED, 已经中断。

其中关键的get方法

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }

先获取当前状态,如果还未执行完成并且正常,则进入等待结果流程。在awaitDone不断循环获取当前状态,如果没有结果,则将自己通过CAS的方式添加到等待链表的头部,如果设置了超时,则LockSupport.parkNanos到指定的时间。

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
  }
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      if (Thread.interrupted()) {
 removeWaiter(q);
 throw new InterruptedException();
      }
      int s = state;
      if (s > COMPLETING) {
 if (q != null)
   q.thread = null;
 return s;
      }
      else if (s == COMPLETING) // cannot time out yet
 Thread.yield();
      else if (q == null)
 q = new WaitNode();
      else if (!queued)
 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
      q.next = waiters, q);
      else if (timed) {
 nanos = deadline - System.nanoTime();
 if (nanos <= 0L) {
   removeWaiter(q);
   return state;
 }
 LockSupport.parkNanos(this, nanos);
      }
      else
 LockSupport.park(this);
    }
  }

FutureTask的run方法是执行任务并设置结果的位置,首先判断当前状态是否为NEW并且将当前线程设置为执行线程,然后调用Callable的call获取结果后设置结果修改FutureTask状态。

public void run() {
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
      return;
    try {
      Callable c = callable;
      if (c != null && state == NEW) {
 V result;
 boolean ran;
 try {
   result = c.call();
   ran = true;
 } catch (Throwable ex) {
   result = null;
   ran = false;
   setException(ex);
 }
 if (ran)
   set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
 handlePossibleCancellationInterrupt(s);
    }
  }

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/147044.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号