一、简介
1. 为什么引入
- 一个任务过来,一个线程去负责做。如果每次过来都去创建新的线程,性能低且比较耗费内存
- 线程并不是越多越好,如果线程数多于CPU的核心数,由于每次线程切换,要保存原来线程饿的状态,开启现在的线程,势必会更加耗费资源
- 充分利用已有线程,去处理原来的任务
2. 自定义线程池
- 线程池(消费者):用来保存一定数量的线程来处理任务
- 生产者:用来不断的产生任务
- 阻塞队列:用来平衡消费者和生产者之间任务处理的一个等待队列
1. 生产者生产任务的速度较快,多余的任务需要存储在blocking queue中
2. 生产者生产任务速度慢,那么线程池中也需要在blocking queue中等
3. blocking queue:用来平衡生产者和消费者之间的一个桥梁
二、自定义线程池
1. 阻塞队列
package com.dreamer.multithread.day03;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingQueue {
private Deque queue = new ArrayDeque();
private ReentrantLock lock = new ReentrantLock();
private Condition producerRoom = lock.newCondition();
private Condition consumerRoom = lock.newCondition();
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public T getTaskWithTimes(long timeout, TimeUnit timeUnit) {
lock.lock();
try {
// 统一时间管理为nano,将s,h,d转换为纳秒
long nanos = timeUnit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
nanos = consumerRoom.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
producerRoom.signalAll();
return t;
} finally {
lock.unlock();
}
}
public T getTask() {
lock.lock();
try {
// while 防止虚假唤醒,不能用if
while (queue.isEmpty()) {
try {
System.out.println(" no task,consumer waiting....");
consumerRoom.await();
// 被唤醒后,再次进入while循环,先判断是否为空,因为可能刚添加的任务已经被消费了
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 唤醒后执行返回,并唤醒等待添加的线程
T t = queue.removeFirst();
producerRoom.signalAll();
return t;
} finally {
lock.unlock();
}
}
public void addTask(T t) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
System.out.println("full task, producer waiting");
producerRoom.await();
// 唤醒后先再次检查,不为空,执行添加, 并唤醒在等着消费的线程
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(t);
consumerRoom.signalAll();
} finally {
lock.unlock();
}
}
public int getSize() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
2. 线程池
package com.dreamer.multithread.day03;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
private BlockingQueue blockingQueue;
private Set workers = new HashSet();
private int threadNumber;
private long timeout;
private TimeUnit timeUnit;
public ThreadPool(int blockingQueueCapacity, int threadNumber, long timeout, TimeUnit timeUnit) {
blockingQueue = new BlockingQueue<>(100);
this.threadNumber = threadNumber;
this.timeout = timeout;
this.timeUnit = timeUnit;
}
public synchronized void execute(Runnable task) {
if (workers.size() < threadNumber) {
Worker workerThread = new Worker(task);
System.out.println("新增worker对象:" + workerThread);
workerThread.start(); // 开启线程执行任务
workers.add(workerThread);
} else {
System.out.println("线程池已满,加入等待队列中");
blockingQueue.addTask(task);
}
}
private class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 检查当前的任务,检查阻塞队列中是否有任务
// 如果没有任务,线程会尝试一直去阻塞等待:程序不会结束,只需要换一个超时方法即可
// task = blockingQueue.getTaskWithTimes(2, TimeUnit.SECONDS)) != null) {
while (task != null || (task = blockingQueue.getTask()) != null) {
// 执行任务时候可能有异常
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;//本次执行完毕,置为null,继续进入下一个循环
}
}
// 一旦执行完当前任务,并且阻塞队列中没其他任务,则将该线程关闭,并挪出线程池
synchronized (workers) {
System.out.println("任务执行完毕,线程已经被移除池子");
workers.remove(this);
}
}
}
}
3. 测试
package com.dreamer.multithread.day03;
import java.util.concurrent.TimeUnit;
public class TestPool {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(100, 3, 5, TimeUnit.SECONDS);
for (int i = 0; i < 5; i++) {
int round = i;
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("本次任务是打印:" + round);
}
});
}
}
}
三、拒绝策略
- 如果多个任务都在等待线程池中的线程来执行,但是当前所有线程都很忙
- 多个任务就会被加入到blocking queue中,但是如果超过了blocking queue的最大值
- 除非blocking queue中的任务被其他干完活的线程开始处理了,否则blocking queue就一直添加不进去任务
# 对于添加不进阻塞队列的任务,这个任务是由调用方来发起的
1. 一直等待
2. 超时等待,过时不加
3. 让调用者自己放弃添加
4. 让调用者抛出异常
5. 让调用者自己执行任务