栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

线程池ThreadPoolExecutor中线程的创建与销毁

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

线程池ThreadPoolExecutor中线程的创建与销毁

我们知道线程池是通过复用线程来实现的,那么在线程池中,线程是怎么创建与销毁的呢?我们通过源码来一探究竟。

给线程池添加一个任务是通过execute方法来实现的,那我们就从这个方法入手:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当前线程数小于核心线程数——新建核心线程
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//当前线程数大于或等于核心线程数,且线程池处于Running状态——任务加入队列
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//队列添加失败(队列满了)——新增非核心线程
else if (!addWorker(command, false))
reject(command);
}
参考代码及注释,在execute方法中,针对欲执行的任务command会有3种处理方法:(按照顺序对应代码中标红部分)

1、新增核心线程执行

2、添加到任务队列

3、新增非核心线程执行

线程池中线程的创建就在addWorker方法中:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; {
int c = ctl.get();
int rs = runStateOf©;

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
            firstTask == null &&
            ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;

}
在线程池中,线程的表现形式为一个Worker类(实现了Runnable接口,并在构造函数中实例化了一个任务为自己this的线程),当成功创建一个Worker对象之后,会启动运行自己的线程,也就是说新创建的线程进入了工作状态。

在addWorker方法中有一个core参数用来区分是核心线程还是非核心线程,但是仅在第一处标红的地方使用到,作用就是用来判断当前线程池是否满载(核心线程满载依据为最大核心线程是;非核心线程满载依据为最大线程数)。也就是说,在线程池中,核心线程和非核心线程本质上没有任何区别。那么核心线程是如何做到完成任务后不被销毁,而非核心线程完成任务后就会被销毁呢?继续往下看。

经过上述操作。我们已经创建了一个线程并启动了他,而具体执行业务逻辑的代码则在Worker对象中的run方法,run方法最终调用runWorker方法:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
通过代码可以看到,在runWorker方法中,线程第一次会将创建时的构造参数firstTask赋值给task,并调用task的run方法。接下来会循环从getTask方法中继续获取task并调用task的run方法直到task为null停止。

所以线程池创建的线程会经历如下状态:创建——工作——工作——n次工作——无工作可做——结束(销毁)

也就是说线程池中的线程不是被动销毁的,而是线程自己本身发现无任务可做之后,整个生命周期自然而然的结束。到这里已经解释了非核心线程执行完任务之后的销毁操作,那么核心线程执行完任务之后是如何保持不销毁的呢?秘密就在于getTask方法中:

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

    int wc = workerCountOf(c);

    // Are workers subject to culling?
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        if (compareAndDecrementWorkerCount(c))
            return null;
        continue;
    }

    try {
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}

}
在getTask方法中,通过allowCoreThreadTimeOut与当前线程数及核心线程数等配置获取一个布尔值timed,这个值就是用来控制线程超时时间的标记。

当timed为true时,会通过阻塞队列来实现线程的阻塞从而达到线程不销毁的目的;当timed为false时,则不会阻塞,此时如果获取到task为null,则线程自然结束销毁。

总结:

1、线程池是由一堆线程+阻塞队列组成。可以类比成MQ系统,线程就是消费者、阻塞队列就是MQ、execute方法就是生产者。

2、线程池中虽然有核心线程和非核心线程之分,但是本质上是没有区别的。线程最终是阻塞等待任务还是销毁取决于当前线程在getTask方法时刻线程池整体的状态。也就是说一个线程是核心线程还是非核心线程是不确定的。

3、线程池中线程的销毁不是线程池来销毁的,而是线程本身的行为,线程从创建开始就一直工作,直到无任务可工作之后会由getTask方法的阻塞状态来决定销毁(非核心线程)还是阻塞(核心线程)。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/733088.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号