跳转链接:Java 线程池的使用【一】【Executor】
跳转链接:Java 线程池的使用【三】【Executors】
一、ThreadPoolExecutor类的介绍:
线程池实现类ThreadPoolExecutor是Executor框架最核心的类,继承自Executor接口,里面有一个execute方法,用来执行线程,线程池主要提供一个队列,队列中保存着所有等待状态的线程。
ThreadPoolExecutor类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法基础上产生的。
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//空闲存活时间
TimeUnit unit,//时间单位
BlockingQueue workQueue,//线程池任务队列
ThreadFactory threadFactory,//创建线程的工厂
RejectedExecutionHandler handler) {//拒绝策略
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
参数介绍:
1、corePoolSize 核心线程数: 是指线程池中长期存活的线程数;
2、maximumPoolSize 最大线程数: 线程池允许创建的最大线程数量,当线程数池的任务队列满了之后,可以创建的最大线程数;
注意:最大线程数 maximumPoolSize 的值不能小于核心线程数 corePoolSize,否则在程序运行时会报 IllegalArgumentException 非法参数异常。
3、keepAliveTime,空闲线程存活时间: 当线程池中没有任务时,会销毁一些线程,销毁的线程数=maximumPoolSize(最大线程数)-corePoolSize(核心线程数);
4、TimeUtil,时间单位: 空闲线程存活时间的描述单位,此参数配合参数3使用;
TimeUtil有7个参数:
- TimeUnit.DAYS:天
- TimeUnit.HOURS:小时
- TimeUnit.MINUTES:分
- TimeUnit.SECONDS:秒
- TimeUnit.MILLISECONDS:毫秒
- TimeUnit.MICROSECONDS:微妙
- TimeUnit.NANOSECONDS:纳秒
5、BlockingQueue 阻塞队列: 线程池存放任务的队列,用来存储线程池的所有待执行任务。
它可以设置以下几个值:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
比较常用的是LinkedBlockingQueue,线程池的排队策略和BlockingQueue息息相关。
6、ThreadFactory 线程工厂: 线程池创建线程调用的工厂方法,通过此方法可以设置线程的优先级、线程命名规则以及线程类型(用户线程还是守护线程)等。
线程工厂的使用实例如下:
public static void main(String[] args) {
//创建线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
//创建线程池中的线程
Thread thread = new Thread(r);
//设置线程名称
thread.setName("Thread-" + r.hashCode());
//设置线程优先级(最大值:10)
thread.setPriority(Thread.MAX_PRIORITY);
//......
return thread;
}
};
//创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10,
0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
//使用自定义的线程工厂
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
Thread thread = Thread.currentThread();
System.out.println(String.format("线程:%s,线程优先级:%d", thread.getName(), thread.getPriority()));
}
});
}
以上程序运行结果:
线程:Thread-495623552,线程优先级:10 Process finished with exit code -1
从上述执行结果可以看出,自定义线程工厂起作用了,线程的名称和线程优先级都是通过线程工厂设置的。
7、RejectedExecutionHandler 拒绝策略: 当线程池任务超出线程池队列可以存储的最大值后,执行的策略。
默认有以下四种策略:
- AbortPolicy:拒绝并抛出异常;
- CallerRunsPolicy:使用当前调用的线程来执行此任务;
- DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务;
- DiscardPolicy:忽略并抛弃当前任务。
线程池的默认策略是:AbortPolicy(拒绝并抛出异常)。
二、工作过程:
1、通过execute方法添加任务时,如果工作线程数小于corePoolSize,则创建一个新线程并执行该任务。注:新创建的线程会通过while循环不断从阻塞队列获取任务执行。
2、如果工作线程数大于等于corePoolSize,则将任务添加到阻塞队列。
3、如果阻塞队列满了,则判断工作线程是否小于maximumPoolSize,如果小于则创建新线程并执行该任务,否则调用handler拒绝策略。
三、执行任务逻辑:(原码分析)
线程调用runWork方法,通过while循环从getTask方法获取任务并执行。只要getTask方法不返回null,线程就不会退出。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
四、获取任务过程:
getTask方法实际上是从workQueue阻塞队列取任务。如果工作线程数大于corePoolSize,则会调用workQueue的poll方法获取任务,超时时间是keepAliveTime,如果超过了keepAliveTime时长,poll返回了null,上面提到的while循环就会退出,线程也就退出了。如果工作线程数小于等于corePoolSize,则会调用workQueue的take方法阻塞在当前直到有新的任务进来。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
五、使用方法:
1、直接new一个对象:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
8, 8, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue(30),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
2、通过Spring配置:
六、总结:
线程池大小最好是corePoolSize和maximumPoolSize保持一致,不要超过CPU核心数的3倍,如果纯计算任务(包括同机房的redis服务),只有很少的网络或者磁盘IO,那么线程池的大小设置为CPU的2倍比较合理。



