最近在学JUC,学到线程池,感觉简简单单调用API对线程池的理解不够透彻,于是我决定,手写一个线程池。
首先,要手写一个线程池,就需要了解线程池具体的概念,它实现了什么功能,具体的组件是什么。以大哥李写的Executors为参考,下面先对线程池做一个简单的介绍。
线程池顾名思义,是一个装线程的一个池子。它可以创建,销毁线程,并且给这些线程安排任务。线程池被设计出来肯定是为了解决一些技术问题。技术问题就是比如创建一个线程需要T1的时间,销毁一个线程需要T2的时间,执行这个线程的任务需要T3的时间。一般来说,创建和销毁线程所需要的时间都很多,为了节省T1和T2的时间,使用线程池这个技术,可以避免多次创建,多次销毁线程,实现线程的复用。
线程池的具体组件:
- 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
- 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
- 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
先写任务队列
//使用泛型 class BlockingQueue{ //1.任务队列 private Deque queue = new ArrayDeque<>(); //2.锁 private ReentrantLock lock = new ReentrantLock(); //3.生产者条件变量 private Condition fullWaitSet = lock.newCondition(); //4.消费者条件变量 private Condition emptyWaitSet = lock.newCondition(); //5.容量 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } public T pull(long timeout, TimeUnit unit){ lock.lock(); try{ //转换为纳秒 long nanos =unit.tonanos(timeout); while(queue.isEmpty()){ try{ if(nanos<=0){ return null; } //返回的是剩余的等待时间 nanos = emptyWaitSet.awaitNanos(nanos); }catch (InterruptedException e){ e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; }finally { lock.unlock(); } } //阻塞获取 public T take(){ lock.lock(); try{ while(queue.isEmpty()){ try{ emptyWaitSet.await(); }catch (InterruptedException e){ e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; }finally { lock.unlock(); } } //阻塞添加 public void put(T element){ lock.lock(); try{ while(queue.size() == capcity) { try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } queue.addLast(element); emptyWaitSet.signal(); } }finally { lock.unlock(); } } //获取大小 public int size(){ lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } }
然后写工作线程
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run(){
//执行任务
//1)当task不为空,则执行任务
//2)当task执行完毕,接着从任务队列获取任务并执行
while(task != null || (task = taskQueue.pull(timeout,timeUnit)) != null){
try{
log.debug("正在执行。。。{}",task);
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers){
log.debug("worker被移除{}",this);
workers.remove(this);
}
}
}
}
最后写线程池
class ThreadPool{
//任务队列
private BlockingQueue taskQueue;
//线程集合
private HashSet workers = new HashSet<>();
//线程核心数
private int coreSize;
//获取任务超时时间
private long timeout;
//时间单位
private TimeUnit timeUnit;
public ThreadPool(int queueCapcity,int coreSize, long timeout, TimeUnit timeUnit) {
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
//执行任务
public void execute(Runnable task){
//当任务数没有超过coreSize时,直接交给worker对象执行
//如果任务超过coreSize,加入任务队列暂存
synchronized (workers){
if(coreSize>workers.size()){
Worker worker = new Worker(task);
log.debug("新增worker{}",worker);
workers.add(worker);
worker.start();
}else{
log.debug("加入任务队列{}",task);
taskQueue.put(task);
}
}
}
测试代码:
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(10,3,1000,TimeUnit.MILLISECONDS);
for(int i = 0; i < 10;i++){
int j = i;
log.debug("增加任务{}",j);
threadPool.execute(()->{
log.debug("{}",j);
});
}
}
}
测试结果:
出现问题,线程池中的线程执行完一个任务后,不会再接着去执行任务,而是等到时间结束之后自动被移出,代码问题不知道出在那里,有会的大佬希望能指点一下。



