可以分成以上的三部分,分别是用于执行任务的线程池,一个基于生产者消费者模式的、用于平衡生产消费速率关系、存放任务的阻塞队列、以及给出任务的线程,下面对每个部分的代码进行编写
首先思考阻塞队列中的成员变量
队列的数据结构队列的最大大小用于解决并发问题的锁分别用于读写线程阻塞的条件变量
所应该具有的方法
获取队列大小存放任务(并发环境下,核心方法)获取任务(并发环境下,核心方法)
下面给出代码,该段代码的核心是阻塞添加和阻塞获取的两个方法
class BlockingQueue{ //队列 private Deque queue = new ArrayDeque<>(); //队列最大大小 private int capacity; //锁 private ReentrantLock lock = new ReentrantLock(); //读写条件变量 private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); //阻塞获取 public T take() { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); System.out.println("队列已空、获取线程等待"); } catch (InterruptedException e) { e.printStackTrace(); } } T result = queue.pollFirst(); emptyWaitSet.signal(); return result; } finally { lock.unlock(); } } //阻塞添加 public void add(T task) { lock.lock(); try { while (queue.size() == capacity) { try { fullWaitSet.await(); System.out.println("队列已满,添加线程等待"); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(task); System.out.println(task + "放入队列"); emptyWaitSet.signal(); } finally { lock.unlock(); } } }
在上述的代码当中,没有设置超时时间,因此会导致线程一致处在等待过程当中,浪费线程池中有限的线程资源,因此需要设置超时时间,当获取超时的时候返回空的结果,这里使用的是awaitNacos方法,该方法返回的是总的等待时间与已经等待的时间的差值,也就是剩余等待时间,当剩余等待时间小于0的时候证明已经等待了足够长的时间,返回结果即可,下面给出带超时时间的poll拉取方法
public T poll(long timeOut, TimeUnit timeUnit) {
lock.lock();
try{
long leftWaitTime = timeUnit.toNanos(timeOut);
while (queue.isEmpty()) {
if (leftWaitTime <= 0) return null;
try {
leftWaitTime = emptyWaitSet.awaitNanos(leftWaitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T result = queue.pollFirst();
fullWaitSet.signal();
return result;
} finally {
lock.unlock();
}
}
带超时时间的添加方法也是采用同样的思路,返回一个布尔值,标注是否添加成功
public boolean offer(T task, long timeOut, TimeUnit timeUnit) {
lock.lock();
try {
long leftWaitTime = timeUnit.toNanos(timeOut);
while (queue.size() == capacity) {
if (leftWaitTime <= 0) return false;
try {
leftWaitTime = fullWaitSet.awaitNanos(leftWaitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
线程池
线程池中需要有这样几个参数
阻塞队列线程的集合核心线程的最大数量超时时间超时时间单位
所以可以写出成员变量与构造方法
//阻塞队列
private BlockingQueue queue;
//核心线程集合
private Set workerSet = new HashSet<>();
//核心线程数量
private int capacity;
//线程超时时间
private long timeOut;
private TimeUnit timeUnit;
public ThreadPoll(int capacity, long timeOut, TimeUnit timeUnit, int queueCapacity) {
this.queue = new BlockingQueue<>(queueCapacity);
this.capacity = capacity;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
}
其次是任务的执行方法,传入的参数为一个任务,核心的逻辑是,线程数没有超过核心线程的时候,就新建一个继承了Thread的worker对象并启动线程,来对任务进行执行,既然启动了线程,那就把线程放入线程的集合当中
如果线程数已经太多,就将任务放入阻塞队列
public void execute(Runnable task) {
//Set并非线程安全的集合,因此上锁
synchronized (workerSet) {
//在线程数允许的前提下创建新的线程来完成任务,否则任务进入阻塞队列
if (workerSet.size() < capacity) {
Worker worker = new Worker(task);
worker.start();
workerSet.add(worker);
} else {
queue.add(task);
}
}
}
注意这里的worker是线程池的内部类,它拥有一个task成员变量,重写了Thread的run方法,也就是线程的执行逻辑写在了run方法当中,核心逻辑是,执行当前任务,并不断从阻塞队列中获取任务执行,达成线程的复用,当阻塞队列中没有任务了,该线程结束前更新线程的集合
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//如果任务不为空,执行
//如果为空,则从队列中获取新的任务
while (task != null || (task = queue.take()) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
//任务执行完毕,则task置为空
task = null;
}
}
//更新workerSet
synchronized (workerSet) {
workerSet.remove(this);
}
}
}
编写测试方法
public class ThreadPoolDIY {
public static void main(String[] args) {
ThreadPoll threadPoll = new ThreadPoll(2, 1000, TimeUnit.MICROSECONDS, 10);
for (int i = 0; i < 5; i++) {
int count = i;
threadPoll.execute(() -> {
System.out.println(count);
});
}
}
}
得到的结果如下
可以看到,针对最开始的两个任务创建了两个线程,其他任务进入阻塞队列,而后其他任务相继由最初的线程执行
但是程序是卡住的,因为最后两个线程都停在了队列的await方法,这是就需要加入超时机制来解决这个问题,让无用的线程释放,只要把调用队列的方法改一下就行
@Override
public void run() {
//如果任务不为空,执行
//如果为空,则从队列中获取新的任务
while (task != null || (task = queue.poll(timeOut, timeUnit)) != null) {
try {
System.out.println(task + "任务由线程" + this + "执行");
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
//任务执行完毕,则task置为空
task = null;
}
}
//更新workerSet
synchronized (workerSet) {
System.out.println(this + "worker被移除");
workerSet.remove(this);
}
}
再次测试线程正常退出
上面代码的问题在于,如果任务数量超过了线程最大数量和阻塞队列的大小,多出来的任务就无法执行,比如改写测试代码
public static void main(String[] args) {
ThreadPoll threadPoll = new ThreadPoll(2, 1000, TimeUnit.MICROSECONDS, 10);
for (int i = 0; i < 15; i++) {
int j = i;
threadPoll.execute(() -> {
System.out.println(j);
try {
Thread.sleep(150000000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
得到的结果
任务会一直等待加入阻塞队列,这主要是由于我们在线程池的执行代码中,线程不够的时候将任务进入阻塞队列,并且没有设置超时时间,调用的put方法,一直等下去,直到fullCondition的条件变量唤醒。但是我们希望这些多出来的线程能够交由调用者判断如何进行处理,因此采用了策略模式这一设计模式
首先编写接口
@FunctionalInterface interface RejectPolicy{ void reject(BlockingQueue queue, T task); }
改造线程池,加入拒绝策略的成员变量
//拒绝策略
private RejectPolicy rejectPolicy;
public ThreadPoll(int capacity, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy rejectPolicy) {
this.queue = new BlockingQueue<>(queueCapacity);
this.capacity = capacity;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
this.rejectPolicy = rejectPolicy;
}
而后改造线程池的加入阻塞队列的方法,put改为tryPut
public void execute(Runnable task) {
//Set并非线程安全的集合,因此上锁
synchronized (workerSet) {
//在线程数允许的前提下创建新的线程来完成任务,否则任务进入阻塞队列
if (workerSet.size() < capacity) {
Worker worker = new Worker(task);
System.out.println("针对" + task + "新增了worker" + worker);
worker.start();
workerSet.add(worker);
} else {
System.out.println("线程数过多" + task + "任务尝试进入阻塞队列");
queue.tryPut(rejectPolicy, task);
}
}
}
编写队列的tryPut方法,核心逻辑是,如果队列没满,就执行正常的入队逻辑,加入队列,emptyCondition条件变量的唤醒,信息的打印,否则的话就执行接口中的抽象方法,抽象方法的具体拒绝逻辑由调用者确定
public void tryPut(RejectPolicy rejectPolicy, T task) {
lock.lock();
try {
//队列已满,执行拒绝策略
if (capacity == queue.size()) {
rejectPolicy.reject(this, task);
} else {
//队列没满,执行对应的逻辑
System.out.println(task + "加入队列成功");
queue.offerFirst(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
编写测试方法,自定义拒绝策略
public static void main(String[] args) {
ThreadPoll threadPoll = new ThreadPoll(2, 1000, TimeUnit.MICROSECONDS, 10,
(queue, task) -> {
System.out.println(task + "任务被放弃");
});
for (int i = 0; i < 15; i++) {
int j = i;
threadPoll.execute(() -> {
System.out.println(j);
try {
Thread.sleep(150000000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
结果如下
多出来的三个任务被放弃
当然还有死等、超时返回false、抛出异常等等拒绝策略可以采用
package com.cui.juc;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadPoolDIY {
public static void main(String[] args) {
ThreadPoll threadPoll = new ThreadPoll(2, 1000, TimeUnit.MICROSECONDS, 10,
(queue, task) -> {
System.out.println(task + "任务被放弃");
});
for (int i = 0; i < 15; i++) {
int j = i;
threadPoll.execute(() -> {
System.out.println(j);
try {
Thread.sleep(150000000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
@FunctionalInterface
interface RejectPolicy {
void reject(BlockingQueue queue, T task);
}
class ThreadPoll {
//阻塞队列
private BlockingQueue queue;
//核心线程集合
private Set workerSet = new HashSet<>();
//核心线程数量
private int capacity;
//线程超时时间
private long timeOut;
private TimeUnit timeUnit;
//拒绝策略
private RejectPolicy rejectPolicy;
public ThreadPoll(int capacity, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy rejectPolicy) {
this.queue = new BlockingQueue<>(queueCapacity);
this.capacity = capacity;
this.timeOut = timeOut;
this.timeUnit = timeUnit;
this.rejectPolicy = rejectPolicy;
}
public void execute(Runnable task) {
//Set并非线程安全的集合,因此上锁
synchronized (workerSet) {
//在线程数允许的前提下创建新的线程来完成任务,否则任务进入阻塞队列
if (workerSet.size() < capacity) {
Worker worker = new Worker(task);
System.out.println("针对" + task + "新增了worker" + worker);
worker.start();
workerSet.add(worker);
} else {
System.out.println("线程数过多" + task + "任务尝试进入阻塞队列");
queue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
//如果任务不为空,执行
//如果为空,则从队列中获取新的任务
while (task != null || (task = queue.poll(timeOut, timeUnit)) != null) {
try {
System.out.println(task + "任务由线程" + this + "执行");
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
//任务执行完毕,则task置为空
task = null;
}
}
//更新workerSet
synchronized (workerSet) {
System.out.println(this + "worker被移除");
workerSet.remove(this);
}
}
}
}
class BlockingQueue {
//队列
private Deque queue = new ArrayDeque<>();
//队列最大大小
private int capacity;
//锁
private ReentrantLock lock = new ReentrantLock();
//读写条件变量
private Condition fullWaitSet = lock.newCondition();
private Condition emptyWaitSet = lock.newCondition();
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
System.out.println("队列已空、获取线程等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T result = queue.pollFirst();
emptyWaitSet.signal();
return result;
} finally {
lock.unlock();
}
}
public T poll(long timeOut, TimeUnit timeUnit) {
lock.lock();
try {
long leftWaitTime = timeUnit.toNanos(timeOut);
while (queue.isEmpty()) {
if (leftWaitTime <= 0) return null;
try {
leftWaitTime = emptyWaitSet.awaitNanos(leftWaitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T result = queue.pollFirst();
fullWaitSet.signal();
return result;
} finally {
lock.unlock();
}
}
public void add(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
fullWaitSet.await();
System.out.println("队列已满,添加线程等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
System.out.println(task + "加入任务队列成功");
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public boolean offer(T task, long timeOut, TimeUnit timeUnit) {
lock.lock();
try {
long leftWaitTime = timeUnit.toNanos(timeOut);
while (queue.size() == capacity) {
if (leftWaitTime <= 0) return false;
try {
leftWaitTime = fullWaitSet.awaitNanos(leftWaitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(task + "加入阻塞队列成功");
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy rejectPolicy, T task) {
lock.lock();
try {
//队列已满,执行拒绝策略
if (capacity == queue.size()) {
rejectPolicy.reject(this, task);
} else {
//队列没满,执行对应的逻辑
System.out.println(task + "加入队列成功");
queue.offerFirst(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}



