目录
一、介绍
二、七种阻塞队列
三、阻塞队列的实现原理
1. 初始化对象时
2. put()添加时
3. take()获取时
四、DelayQueue实例
五、参考资料
一、介绍
阻塞队列BlockingQueue是对队列的操作进行阻塞,主要有两个功能:
- 阻塞插入:队列满时,会阻塞插入元素的线程,直到队列不满。
- 阻塞移除:队列空时,会阻塞获取元素的线程,直到队列非空。
如下表所示,对阻塞插入/移除有4中处理方式,且对应的方法有所不同。
| 方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
| 插入 | add(e) | offer(e) | put(e) | offer(e, ltimeout, unit) |
| 移除 | remove() | poll() | take() | poll(timeout, unit) |
| 检查 | element() | peek() | 不可用 | 不可用 |
| 注意 | 1. 抛出异常:队列已满,抛出IllegalStateException; 队列为空,抛出NoSuchElementException 2. 返回特殊值:插入成功,则true;否则返回null; 获取成功,则为元素,否则返回null 3. 一直阻塞:直到队列可用或响应中断退出 4. 超时退出:超过指定时间则退出 | |||
二、七种阻塞队列
如下图所示,是BlockingQueue的实现类图。
有界队列是指队列的元素数量有限,即队列元素个数。公平访问队列是指阻塞的线程按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性访问有可能先阻塞的线程最后才访问队列。一般默认非公平访问,提高了并发量和吞吐量。如下表所示,是七种阻塞队列的对比。
| 类型 | 特点 |
| ArrayBlockingQueue | 1. 数组结构的有界阻塞队列; 2. FIFO原则对元素排序; 3. 支持公平访问队列,默认非公平; 4. 初始化this.items = new Object[capacity]。 |
| LinkedBlockingQueue | 1. 链表结构的有界阻塞队列; 2. FIFO原则对元素排序; 3. 默认最大长度Integer.MAX_VALUE; 4. 初始化last = head = new Node 5. 线程池SingleThreadExecutor、FixedThreadPool使用该队列。 |
| PriorityBlockingQueue | 1. 元素排序的无界阻塞队列; 2. 默认是自然升序;自定义Comparator进行排序; 3. 不能保证同顺序元素的顺序。 |
| DelayQueue | 1. 延时无界阻塞队列; 2. 只有延时到期时才能从队列获取元素; 3. 队列元素必须实现Delayed接口; 4. 应用场景:缓存系统的设计、定时任务调度等。 |
| SynchronousQueue | 1. 不存储元素的同步阻塞队列; 2. 每次添加元素时,必须等待移除(队列不存储元素),反之依然; 3. 适用场景:传递性的数据; 4. 线程池CachedThreadPool使用该队列。 |
| LinkedTransferQueue | 1. 链表结构的无界阻塞队列; 2. 消费者空闲时,添加元素时直接传输给消费者,无需添加到队列尾部; 消费者非空时,添加元素时添加到队列尾部,且等到消费者消费才返回; 3. transfer()见2;tryTransfer():判定是否能直接传给消费者。 |
| LinkedBlockingDeque | 1. 链表结构的双向阻塞队列; 2. FIFO双向队列:队列两端都可插入/移除元素; 3. 优点:两端操作,多线程减少竞争。 |
三、阻塞队列的实现原理
如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?查看java.util.concurrent.ArrayBlockingQueue使用了Condition的等待/通知机制实现。
1. 初始化对象时
如下代码所示,初始化ArrayBlockingQueue对象时,会初始化notEmpty、notFull属性。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 获取重入锁
lock = new ReentrantLock(fair);
// 等待take()操作的线程
notEmpty = lock.newCondition();
// 等待put()操作的线程
notFull = lock.newCondition();
}
2. put()添加时
添加元素时,队列已满则进入while循环,调用Condition的await()来阻塞当前线程,进入WAITING状态;退出while循环,说明队列不满,唤醒当前线程。
// 添加元素
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列已满时,Condition调用await()来阻塞当前线程
while (count == items.length)
notFull.await();
// 退出while循环,即:队列不满时,唤醒当前线程
enqueue(e);
} finally {
lock.unlock();
}
}
// 队列不满时,唤醒当前线程
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒当前线程
notEmpty.signal();
}
3. take()获取时
获取元素时,队列已空则进入while循环,调用Condition的await()来阻塞当前线程,进入WAITING状态;退出while循环,说明队列不空,唤醒当前线程。
// 移除元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列已空时,Condition调用await()来阻塞当前线程
while (count == 0)
notEmpty.await();
// 退出while循环,即:队列不空时,唤醒当前线程
return dequeue();
} finally {
lock.unlock();
}
}
// 队列不空时,唤醒当前线程
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒当前线程
notFull.signal();
return x;
}
四、DelayQueue实例
DelayQueue是支持延时获取元素的无界阻塞队列。其元素必须实现Delayed接口,在创建元素时指定过期时间。只有在延迟期满时才能从队列中提取元素。
元素实现Delayed接口,必须覆写getDelay(TimeUnit unit)、compareTo(Delayed o)方法。注意getDelay()方法时可以指定任意时间单位,一旦以秒或分作为单位,而延时时间又精确不到就麻烦了。使用时请注意当scheduledTime小于当前时间时,getDelay会返回负数。
可以参考ScheduledThreadPoolExecutor.ScheduledFutureTask类的实现。
DelayQueuedelayQueue = new DelayQueue<>(); public static SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); @Test public void delayQueueTest() throws InterruptedException { // 添加元素 new Thread(new Runnable() { @SneakyThrows @Override public void run() { for (int i = 0; i < 3; i++) { // 生成随机的过期时间,10~20s int random = (int)(Math.random()*10 + 10); // 创建元素 MyDelayElement myDelayElement = new MyDelayElement("myDelayElement" + i, random * 1000); // 添加到延时队列 delayQueue.add(myDelayElement); } } }).start(); Thread.sleep(1000); while (true) { // 延时队列为元素为0时,退出循环 if (delayQueue.isEmpty()) { break; } else { // 弹出一个元素 MyDelayElement myDelayElement = delayQueue.poll(); if (Objects.isNull(myDelayElement)) { continue; } else { System.out.println(myDelayElement.getName() + ", currentTime: " + dateFormat.format(new Date(myDelayElement.getCurrentTime())) + ", scheduledTime: " + dateFormat.format(new Date(myDelayElement.getScheduledTime()))); System.out.println("============================================="); } } } System.out.println("currentThread end"); } // 定义队列元素 public class MyDelayElement implements Delayed { // 当前时间 private final long currentTime = System.currentTimeMillis(); private String name; // 到期的绝对时间 private long scheduledTime; public MyDelayElement(String name, long delayTime) { this.name = name; // 计算到期的绝对时间 scheduledTime = currentTime + delayTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(scheduledTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int) (this.scheduledTime - ((MyDelayElement) o).scheduledTime); } public String getName() { return name; } public long getScheduledTime() { return scheduledTime; } public long getCurrentTime() { return currentTime; } }
// 代码执行结果 myDelayElement1, currentTime: 15:44:59, scheduledTime: 15:45:09 ============================================= myDelayElement0, currentTime: 15:44:59, scheduledTime: 15:45:13 ============================================= myDelayElement2, currentTime: 15:44:59, scheduledTime: 15:45:18 ============================================= currentThread end
五、参考资料
Lock锁<一> _ 基础_爱我所爱0505的博客-CSDN博客
DelayQueue实现原理及应用场景分析_五星上炕的博客-CSDN博客_delayqueue



