ArrayBlockingQueue是阻塞队列中的一个有界队列,当队列满时,再有数据入队列,将会抛出异常。本篇主要介绍该类的相关方法。
该阻塞队列使用可重入锁ReentrantLock实现同步。
ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, Collection extends E> c)
这三个方法都是阻塞队列的初始化方法,
capacity:阻塞队列的初始容量
fair:该队列是否是公平队列,默认为非公平,吞吐量高
c:可以将已经存在的列表初始化到阻塞队列中。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection extends E> c) {
//this(capacity, fair);
//下面这段代码是把this(capacity, fair);方法中的代码拷贝过来了
if (capacity <= 0)
throw new IllegalArgumentException();
//实际保存数据的也是使用数组保存
this.items = new Object[capacity];
//初始化可重入锁
lock = new ReentrantLock(fair);
//绑定两个condition
notEmpty = lock.newCondition();
notFull = lock.newCondition();
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
//对当前入列的元素判空处理
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
final int dec(int i)
final E itemAt(int i)
该方法直接获取数组中指定下标的元素,不保证同步
private static void checkNotNull(Object v)
对元素判空,为空抛空指针异常,说明阻塞队列不允许null入列
private void enqueue(E x)
添加元素,并且通过notEmpty唤醒所有等待的线程,相当于唤醒消费者
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
//实际存储元素自增
count++;
//唤醒等待的线程
notEmpty.signal();
}
private E dequeue()
删除元素,从数组位置的第一个位置先删除,并且通过notFull唤醒生产者线程将数据入队列
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
void removeAt(final int removeIndex)
移除指定下标的元素,这里要注意,如果removeIndex和队列本身维护的takeIndex一样,那么可以直接执行移除即可,否则会直接将下一个元素覆盖当前元素,一直当removeIndex到了阻塞队列本身维护的putIndex位置
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//如果要移除的下标正好等于takeIndex
if (removeIndex == takeIndex) {
//直接移除,并将takeIndex置为+1
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
//这里做覆盖式的操作
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
//唤醒操作
notFull.signal();
}
public boolean add(E e)
可通过实例调用的添加元素方法,直接调用父类实现的方法,但是本质上还是调用自己实现的offer(E e)方法
public boolean offer(E e)
真实的添加队列的实现,ArrayBlockingQueue维护了一个putIndex指针,该指针直接设置了下一个元素需要插入的位置,节省了遍历查找的时间。该方法也是比较简单,当前插入的线程拿到锁之后就执行插入操作即可,如果队列已满,就会抛出异常
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException
put(E e)方法也是向阻塞队列中添加数据,但是与add方法不同的是,add方法发现队列满了之后会立即返回,但是put方法会阻塞,直接被唤醒继续执行,调用put方法的线程相当于一个生产者,这是需要区分的一个地方。并且在获取锁的过程中,响应中断优先于获取锁。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
//等待被唤醒
notFull.await();
//执行添加元素方法
enqueue(e);
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
设置等待时间的往队列中添加元素的方法。该方法重点分析是怎么实现等待时间控制的。
通过Condition.awaitNanos(int nanos)来实现。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
借助Condition维护的等待队列,在调用该方法时,会将当前线程加入到该Condition的等待队列中,并且释放出自己获取的锁(能调用Condition.await方法的线程一定是同步队列的首结点),并且唤醒同步队列中的下一个节点。之后循环的判断是否被唤醒并且重新进入到同步队列(在这中途,会响应中断并抛异常),并且在同步队列中直到获取到同步状态,返回剩余时间。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//为当前线程新创建一个Node节点
Node node = addConditionWaiter();
//当前线程释放同步状态,就是释放锁
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
//循环判断是否重新回到了同步队列上,即是否被唤醒
while (!isOnSyncQueue(node)) {
//如果超时了,取消等待状态,退出循环
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
//重新等待在同步队列中获取同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
总结该方法就是,当队列满时,会有一个等待时间,借助AQS的Condition实现,将当前线程加入到等待队列中并且循环等待被消费者线程唤醒(期间会记录时间,超时会取消等待状态并且返回小于0的时间),阻塞队列中的返回也会返回。
public E poll()
对外提供的获取数据,从第一个数据开始获取(不阻塞,立即返回)
实际调用的是dequeue方法,该方法只是做一个加解锁的操作
public E take() throws InterruptedException
与put()方法相反,put方法是阻塞式的添加数据,take方法则是等待式的获取数据,通过notEmpty(Condition类型)等待队列,实际获取数据也是调用dequque方法。
public E poll(long timeout, TimeUnit unit) throws InterruptedException
设置等待超时时间的获取数据的方法,具体超时监听看上面的offer方法。
public E peek()
获取当前队列的队头元素,直接通过维护的takeIndex获取
**public int size() **
获取当前队列的元素个数,也是同步方法,需要加解锁操作
public int remainingCapacity()
返回队列剩余的空间,加解锁,初始化的长度-count
**public boolean remove(Object o) **
移除元素,对于移除我们要注意的点是:阻塞队列自己维护了当前的插入下标和移除下标,移除元素有可能会造成维护的混乱。让我们来看一下ArrayBlockingQueue是怎么处理的?
public boolean remove(Object o) {
if (o == null) return false;
//老规矩,加锁操作
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//从takeIndex位置开始比较,是否是需要移除的元素
if (o.equals(items[i])) {
removeAt(i);
return true;
}
//为什么需要重新回到0?因为有可能不是从0位置开始插入的
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
总结:移除一个指定的元素,首先需要找到当前元素的下标。通过takeIndex循环遍历队列,直到找到元素或者与putIndex重叠还没有找到(说明不存在于队列),找到之后调用removeAt(i)去移除具体的元素
public boolean contains(Object o)
查询队列是否包含某个元素,除开remove(Object o)需要移除元素之外,其他操作一致。
二、相关参数final Object[] items; 实际保存数据的数组
int takeIndex; 指向入队列最早的一个元素的下标
int putIndex; 下一次添加元素时的下标
int count; 队列中的元素个数
final ReentrantLock lock; 队列实现同步机制的同步锁
private final Condition notEmpty; 队列中唤醒等待获取数据的消费者线程的同步条件
三、总结private final Condition notFull; 队列中唤醒等会添加数据的生产者线程的同步条件
ArrayBlockingQueue使用ReentrantLock以及与之相对应的Condition实现队列的同步以及超时机制,并且通过维护takeIndex和putIndex指针,可以保证数据的顺序存储以及获取,并且大大的提高了性能。
有了AQS的相关知识,理解阻塞队列的实现是非常简单的
ps:ArrayBlockingQueue还应该看的是怎么实现迭代器的,主要通过Itr和Itrs两个内部类来实现,将会在下一篇接着讲解(不定时)
路漫漫其修远兮,本菜鸟还将上下而求索!!!
同学们,冲冲冲



