阻塞队列介绍
Queue接口 是阻塞队列的规范 所有阻塞队列都会实现它
public interface Queueextends Collection { //添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常 boolean add(E e); //添加一个元素,添加成功返回true, 如果队列满了,返回false boolean offer(E e); //返回并删除队首元素,队列为空则抛出异常 E remove(); //返回并删除队首元素,队列为空则返回null E poll(); //返回队首元素,但不移除,队列为空则抛出异常 E element(); //获取队首元素,但不移除,队列为空则返回null E peek();
BlockingQueue接口
BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。两个附加操作:
支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空
入队:
(1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)
(2)offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false
(3)put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置
出队:
(1)poll():如果有数据,出队,如果没有数据,返回null (不阻塞)
(2)poll(long timeout, TimeUnit unit):可以设置阻塞时间,如果没有数据,则阻塞,超过阻塞时间,则返回null
(3)take():队列里有数据会正常取出数据并删除;但是如果队列里无数据,则阻塞,直到队列里有数据
BlockingQueue常用方法示例
当队列满了无法添加元素,或者是队列空了无法移除元素时:
抛出异常:add、remove、element
返回结果但不抛出异常:offer、poll、peek
阻塞:put、take
public class BlockingQueueTest {
public static void main(String[] args) {
addTest();
}
private static void addTest() {
BlockingQueue blockingQueue = new ArrayBlockingQueue(2);
System.out.println(blockingQueue.add(1));
System.out.println(blockingQueue.add(2));
System.out.println(blockingQueue.add(3));
}
private static void removeTest() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);
blockingQueue.add(1);
blockingQueue.add(2);
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}
private static void elementTest() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);
blockingQueue.element();
}
private static void offerTest(){
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
}
private static void pollTest() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
blockingQueue.offer(1);
blockingQueue.offer(2);
blockingQueue.offer(3);
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
private static void peekTest() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(2);
System.out.println(blockingQueue.peek());
}
private static void putTest(){
BlockingQueue blockingQueue = new ArrayBlockingQueue(2);
try {
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void takeTest(){
BlockingQueue blockingQueue = new ArrayBlockingQueue(2);
try {
blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ArrayBlockingQueue
ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。
ArrayBlockingQueue使用
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put(“1”); //向队列中添加元素
ArrayBlockingQueue的原理
数据结构
利用了Lock锁的Condition通知机制进行阻塞控制。
核心:一把锁,两个条件
//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者
private final Condition notEmpty;
//生产者
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
...
lock = new ReentrantLock(fair); //公平,非公平
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
入队put方法
public void put(E e) throws InterruptedException {
//检查是否为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁,如果线程中断抛出异常
lock.lockInterruptibly();
try {
//阻塞队列已满,则将生产者挂起,等待消费者唤醒
//设计注意点: 用while不用if是为了防止虚假唤醒
while (count == items.length)
notFull.await(); //队列满了,使用notFull等待(生产者阻塞)
// 入队
enqueue(e);
} finally {
lock.unlock(); // 唤醒消费者线程
}
}
private void enqueue(E x) {
final Object[] items = this.items;
//入队 使用的putIndex
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0; //设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部
count++;
//notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了
notEmpty.signal();
}
出队take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁,如果线程中断抛出异常
lock.lockInterruptibly();
try {
//如果队列为空,则消费者挂起
while (count == 0)
notEmpty.await();
//出队
return dequeue();
} finally {
lock.unlock();// 唤醒生产者线程
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //取出takeIndex位置的元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0; //设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部
count--;
if (itrs != null)
itrs.elementDequeued();
//notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
notFull.signal();
return x;
}
它的设计精髓 环形数组很好的提升了效率
linkedBlockingQueue
linkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 linkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
linkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。linkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说linkedBlockingQueue是读写分离的,读写操作可以并行执行。
linkedBlockingQueue使用
指定队列的大小创建有界队列
BlockingQueueboundedQueue = new linkedBlockingQueue<>(100);
无界队列
BlockingQueueunboundedQueue = new linkedBlockingQueue<>();
linkedBlockingQueue的原理
数据结构
// 容量,指定容量就是有界队列
private final int capacity; // 元素数量 private final AtomicInteger count = new AtomicInteger(); // 链表头 本身是不存储任何元素的,初始化时item指向null transient Nodehead; // 链表尾 private transient Node last; // take锁 锁分离,提高效率 private final ReentrantLock takeLock = new ReentrantLock(); // notEmpty条件 // 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒 private final Condition notEmpty = takeLock.newCondition(); // put锁 private final ReentrantLock putLock = new ReentrantLock(); // notFull条件 // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒 private final Condition notFull = putLock.newCondition(); //典型的单链表结构 static class Node { E item; //存储元素 Node next; //后继节点 单链表结构 Node(E x) { item = x; } } 构造器 public linkedBlockingQueue() { // 如果没传容量,就使用最大int值初始化其容量 this(Integer.MAX_VALUE); } public linkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 初始化head和last指针为空值节点 last = head = new Node (null); } 入队put方法 public void put(E e) throws InterruptedException { // 不允许null元素 if (e == null) throw new NullPointerException(); int c = -1; // 新建一个节点 Node node = new Node (e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 使用put锁加锁 putLock.lockInterruptibly(); try { // 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程) while (count.get() == capacity) { notFull.await(); } // 队列不满,就入队 enqueue(node); c = count.getAndIncrement();// 队列长度加1,返回原值 // 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队) // 这里为啥要唤醒一下呢? // 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); // 真正唤醒生产者线程 } // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程 if (c == 0) signalNotEmpty(); } private void enqueue(Node node) { // 直接加到last后面,last指向入队元素 last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock();// 加take锁 try { notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程 } finally { takeLock.unlock(); // 真正唤醒消费者线程 } 出队take方法 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 使用takeLock加锁 takeLock.lockInterruptibly(); try { // 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞) while (count.get() == 0) { notEmpty.await(); } // 否则,出队 x = dequeue(); c = count.getAndDecrement();//长度-1,返回原值 if (c > 1)// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理 notEmpty.signal(); } finally { takeLock.unlock(); // 真正唤醒消费者线程 } // 为什么队列是满的才唤醒阻塞在notFull上的线程呢? // 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程, // 这也是锁分离带来的代价 // 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程 if (c == capacity) signalNotFull(); return x; } private E dequeue() { // head节点本身是不存储任何元素的 // 这里把head删除,并把head下一个节点作为新的值 // 并把其值置空,返回原来的值 Node h = head; Node first = h.next; h.next = h; // 方便GC head = first; E x = first.item; first.item = null; return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程 } finally { putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程 } }



