对于服务端,在高并发场景下,如果每来一个客户端的请求都创建一个线程然后去执行客户端发来的请求,在原型阶段是不错的选择,但是面对成千上万的任务递交给服务器时,如果还是采用一个任务一个线程的方式,那将会创建成千上万的线程,这不是一个好的选择。
一方面系统会频繁的发生上下文切换,产生额外的开销,且性能也没用实质性的提高;另一方面线程的创建和销毁都是十分耗费系统资源的,且java线程是内核级线程,每次线程的创建和销毁都需要系统调用,无疑加剧系统的负担。
线程池很好的解决了该问题,线程池即预先创建固定多的线程(可以动态增长或减少线程),客服端请求的任务提交到线程池中,由线程池中空闲的线程执行,让这些线程得到重复的利用 ,如果没有空闲的线程则让任务延迟执行。一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面面的过量的任务提交能够平缓的劣化。
自定义线程池我们先来简单的实现一个简单版的线程池,后面再讲解jdk提供的线程池实现,由浅入深。
如上图:BlockingQueue存放的是任务对象,服务端这边接收到客户端的任务后,把任务提交到阻塞队列里,线程池源源不断的从阻塞队列中拿任务执行,差不多就是生产者和消费者的工作方式,阻塞队列是平衡主线程和线程池速度差异的一个中间容器。
一种情况是任务太少,线程池中的有些线程就空闲了,需要在阻塞队列里阻塞等待新任务的到来,另一种情况是任务数太多,那么线程较少的情况下,任务对象也需要到阻塞队列里进行等待,等待空闲线程空出手取走任务并执行它。
由此可以看出,Blocking Queue是一个核心。
Blocking Queue代码:
@Slf4j public class BlockingQueue{ //任务队列 private Deque queue = new linkedList<>(); //锁,用于同步生产者和消费者 private ReentrantLock lock = new ReentrantLock(); //条件变量,emptyWaitSet装消费者,fullWaitSet装生产者 private Condition emptyWaitSet = lock.newCondition(); private Condition fullWaitSet = lock.newCondition(); //任务队列的最大容量 private int capacity; public BlockingQueue(int capacity) { this.capacity = capacity; } public Job poll() { Job job = null; lock.lock(); try { while (queue.isEmpty()) { log.debug("{}进入等待状态,原因:任务队列为空", Thread.currentThread().getName()); emptyWaitSet.await(); } job = queue.removeFirst(); fullWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } //需要返回任务对象 return job; } public Job poll(long timeout, TimeUnit timeUnit) { Job job = null; lock.lock(); try { //统一转换为毫秒 timeout = timeUnit.toMillis(timeout); long start = System.currentTimeMillis(); long remaining = timeout; //如果队列为空或者是等待未超时就继续等待 while (queue.isEmpty() && remaining > 0) { log.debug("{}进入等待状态,原因:任务队列为空", Thread.currentThread().getName()); emptyWaitSet.await(remaining, TimeUnit.MILLISECONDS); remaining = timeout - (System.currentTimeMillis() - start); } //如果还是为空则证明是超时了,返回一个null if (queue.isEmpty()) { log.debug("{}获取任务超时", Thread.currentThread().getName()); return null; } job = queue.removeFirst(); fullWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } //需要返回任务对象 return job; } public void put(Job job) { lock.lock(); try { while (queue.size() == capacity) { log.debug("{}进入等待状态,原因:任务队列满", Thread.currentThread().getName()); fullWaitSet.await(); } log.debug("{}提交了一个任务{}", Thread.currentThread().getName(), job); queue.addLast(job); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public boolean put(Job job, long timeout, TimeUnit timeUnit) { timeout = timeUnit.toMillis(timeout);//转为统一格式 lock.lock(); try { long start = System.currentTimeMillis(); long remaining = timeout; while (remaining > 0 && queue.size() == capacity) { fullWaitSet.await(remaining, TimeUnit.MILLISECONDS); remaining = timeout - (System.currentTimeMillis() - start); } //如果因为超时而结束,则提交任务失败 if (queue.size() == capacity) { log.debug("{}提交{}任务失败,原因:超时", Thread.currentThread().getName(), job); return false; } log.debug("{}提交了一个任务{}", Thread.currentThread().getName(), job); queue.addLast(job); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return true; } public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public void putWithPolicy(RejectPolicy policy, Job task) { lock.lock(); try { //队列满的情况 if (queue.size() == capacity) { policy.put(this, task); } else {//如果任务队列没满直接加入即可 log.debug("{}提交了一个任务{}", Thread.currentThread().getName(), task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } }
ThreadPool:
@Slf4j public class ThreadPool{ private static final int MAX_WORKER_NUMBERS = 10; private static final int MIN_WORKER_NUMBERS = 1; private static final int DEFAULT_WORKER_NUMBERS = 5; //阻塞队列 private BlockingQueue blockingQueue; //线程池中的线程数 private AtomicInteger workerNum; //worker计数器 private AtomicInteger threadNum = new AtomicInteger(0); //线程池中的所有线程集合 private Set workers = new HashSet<>(); //拒绝策略 private RejectPolicy policy; public ThreadPool(int taskCount, RejectPolicy policy) { blockingQueue = new BlockingQueue (taskCount); workerNum = new AtomicInteger(DEFAULT_WORKER_NUMBERS); this.policy = policy; initializeWorkers(DEFAULT_WORKER_NUMBERS); } public ThreadPool(int threadSize, int taskCount, RejectPolicy policy) { threadSize = threadSize > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : (threadSize < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : threadSize); workerNum = new AtomicInteger(threadSize); blockingQueue = new BlockingQueue<>(taskCount); this.policy = policy; initializeWorkers(threadSize); } private void initializeWorkers(int numbers) { synchronized (workers) { for (int i = 0; i < numbers; i++) { Worker worker = new Worker(); workers.add(worker); //开始工作 Thread t = new Thread(worker, "worker" + threadNum.getAndIncrement()); log.debug("{}开始工作", t.getName()); t.start(); } } } public void addWorkers(int num) { while (true) { //如果当前线程数已经是最大值了那就退出即可 if (workerNum.get() == MAX_WORKER_NUMBERS) { return; } else { //先取到当前的数量 int nowNum = workerNum.get(); //如果当前的数量+增长的数量大于最大线程数就增加到最大值为止 if (nowNum + num >= MAX_WORKER_NUMBERS) { if (workerNum.compareAndSet(nowNum, MAX_WORKER_NUMBERS)) { initializeWorkers(MAX_WORKER_NUMBERS - nowNum); return; } } else { //如果没到最大值,就CAS增加即可,谁先改掉线程数,谁就可以开始增加了 if (workerNum.compareAndSet(nowNum, nowNum + num)) { initializeWorkers(num); return; } } } } } public void shutdown() { for (Worker worker : workers) { worker.shutdown(); } } public void execute(Job task) { //直接提交到BlockingQueue的任务队列中,BlockingQueue是线程安全的 //还有一个问题就是主线程在提交任务时,如果任务队列满了,那么主线程也阻塞等待的话不是很友好 //所以可以运用策略模式,让用户自己选择提交任务时的策略,有以下几种: //1)死等,阻塞等待任务队列空出位置 //2)带超时的等待 ,阻塞等待一段时间 //3)让提交任务的线程放弃任务的执行 //4)抛出异常 //5)让提交任务的线程自己执行任务 // blockingQueue.put(task); blockingQueue.putWithPolicy(policy, task); } class Worker implements Runnable { //任务对象 private Runnable task; //该线程是否正在运行,如果为false则结束此线程的运行 private volatile boolean running = true; @Override public void run() { //1.如果task为null则退出循环 while (running) { if (task != null) { log.debug("{}开始执行任务{}", Thread.currentThread().getName(), task); task.run();//执行任务 task = null; } else { //当前线程会在BlockingQueue中阻塞等待或者是超时返回null task = blockingQueue.poll(); } } } //关闭当前线程 public void shutdown() { this.running = false; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } Worker worker = (Worker) o; return running == worker.running && Objects.equals(task, worker.task); } @Override public int hashCode() { return Objects.hash(task, running); } } } @FunctionalInterface interface RejectPolicy { void put(BlockingQueue queue, Job task); }
测试类:
@Slf4j
public class Demo {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool<>(3, 3, (queue, task) -> {
//1)死等,阻塞等待任务队列空出位置
queue.put(task);
//2)带超时的等待 ,阻塞等待一段时间
// queue.put(task,3000, TimeUnit.MILLISECONDS);
//3)让提交任务的线程放弃任务的执行
// 随便写,不做任何事就行
//4)抛出异常
// throw new RuntimeException("任务执行失败");
//5)让提交任务的线程自己执行任务
// task.run();
});
for (int i = 0; i < 20; i++) {
int j = i;
pool.execute(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("任务{}被执行完了", j);
});
}
}
}



