关于队列的概念,我们不在赘述,可以参考:队列以及队列的应用
阻塞队列就是在队列的基础上对队列的读写操作加了阻塞的功能,并且提供了线程安全的机制
- 阻塞读,当从阻塞队列中获取元素的时候,如果队列不为空,正常获取,如果队列为空,线程会阻塞,等到队列中有元素在尝试获取
- 阻塞写,当队列不满时,可以正常插入元素,但如果队列满,并不会放弃,而是会阻塞,直到队列中的元素被消费导致队列中有空位置,在尝试插入元素
BlockingQueue是阻塞队列的顶级接口,他规定了阻塞队列必须要实现的方法,下面我们列举出关于读写的几个方法:
| 方法 | 功能 |
|---|---|
| boolean add(E e) | 插入元素,成功返回true,失败会抛出一个异常IllegalStateException |
| boolean offer(E e) | 插入元素,成功返回true,失败返回false |
| void put(E e) | 插入元素,如果队列已经满了,会阻塞等待 |
| boolean offer(E e, long timeout, TimeUnit unit) | 插入元素,如果队列已经满了,会阻塞等到有空间插入,但是不会一直等下去,timeout和unit决定了等待多久 |
| boolean remove(Object o) | |
| public E poll() | |
| E take() | 检索并将队头元素出队,如果队列为空,会阻塞等待队列有元素 |
| E poll(long timeout, TimeUnit unit) | 检索并将队头元素出队,如果队列为空,会阻塞等待队列有元素,但是不会一直等下去,timeout和unit决定了等待多久 |
BlockingQueue有很多实现类,提供了各种功能强大的阻塞队列为我们所用,下面我们通过他的类图看一下具体提供了那些实现类。
- DelayedWorkQueue:优先阻塞队列,通过二叉堆保证优先级
- LinkedBlockingQueue:通过链表实现的阻塞队列
- ArrayBlockingQueue:通过数组实现的阻塞队列
- LinkedTransferQueue:当消费者来取元素的时候,如果没有元素,会创建一个空的节点,并阻塞在这里,下次生产者如果添加元素,会直接将元素添加到这个节点,唤醒生产者去消费。
- DelayQueue:支持延时获取元素的阻塞队列
- SynchronousQueue:同步队列,不存储元素,每一个put操作都会等待一个take操作完成
- PriorityBlockingQueue:优先阻塞队列
- LinkedBlockingDeque:通过双向链表实现的双向阻塞队列
static class Node{ E item; Node next; Node(E x) { item = x; } } transient Node head; private transient Node last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
以上是组成LinkedBlockingQueue的基本数据结构,通过代码可以得到以下几点关键信息:
- 首先他是通过一个单向链表维护的队列,并且提供了两个成员变量head和last,分别指向队头和队尾,队头节点不存储元素,只作为一个虚拟的头结点,方便链表的操作。
- 提供了两把由ReentrantLock实现的可重入锁takeLock和putLock,用来实现从队列中取元素和写元素时的线程阻塞和线程安全的功能。我们知道ReentrantLock是基于AQS实现的,所以自然会有用于排队的CLH队列。用来存储没有拿到锁的线程
- 两个条件队列notEmpty和notFull。这又是干什么的呢,我们知道阻塞队列取元素,如果队列为空,是需要去阻塞,那直接阻塞到CLH队列可以吗?当然不行,我们需要将他们先暂存到条件队列,等生产者给阻塞队列里面放入了元素,才可以将这个条件队列唤醒,让他们转移到CLH队列,在去竞争takeLock这把锁,否则不管有没有元素,都放到CLH队列去竞争锁,那显然是不合理的。
画一个简单的图来表示呢就是这个样子
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node(null);
}
public LinkedBlockingQueue(Collection extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
入队相关API
add
// add方法是继承自父类AbstractQueue的方法,只是调用了offer方法,
// 如果offer方法返回true,也就返回true,如果offer返回false,则抛出一个异常
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}



