栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

阻塞队列之 LinkedBlockingQueue

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

阻塞队列之 LinkedBlockingQueue

阻塞队列之LinkedBlockingQueue 阻塞队列的概念

关于队列的概念,我们不在赘述,可以参考:队列以及队列的应用

阻塞队列就是在队列的基础上对队列的读写操作加了阻塞的功能,并且提供了线程安全的机制

  1. 阻塞读,当从阻塞队列中获取元素的时候,如果队列不为空,正常获取,如果队列为空,线程会阻塞,等到队列中有元素在尝试获取
  2. 阻塞写,当队列不满时,可以正常插入元素,但如果队列满,并不会放弃,而是会阻塞,直到队列中的元素被消费导致队列中有空位置,在尝试插入元素
阻塞队列的核心API

BlockingQueue是阻塞队列的顶级接口,他规定了阻塞队列必须要实现的方法,下面我们列举出关于读写的几个方法:

方法功能
boolean add(E e)插入元素,成功返回true,失败会抛出一个异常IllegalStateException
boolean offer(E e)插入元素,成功返回true,失败返回false
void put(E e)插入元素,如果队列已经满了,会阻塞等待
boolean offer(E e, long timeout, TimeUnit unit)插入元素,如果队列已经满了,会阻塞等到有空间插入,但是不会一直等下去,timeout和unit决定了等待多久
boolean remove(Object o)
public E poll()
E take()检索并将队头元素出队,如果队列为空,会阻塞等待队列有元素
E poll(long timeout, TimeUnit unit)检索并将队头元素出队,如果队列为空,会阻塞等待队列有元素,但是不会一直等下去,timeout和unit决定了等待多久
常用的阻塞队列

BlockingQueue有很多实现类,提供了各种功能强大的阻塞队列为我们所用,下面我们通过他的类图看一下具体提供了那些实现类。

  • DelayedWorkQueue:优先阻塞队列,通过二叉堆保证优先级
  • LinkedBlockingQueue:通过链表实现的阻塞队列
  • ArrayBlockingQueue:通过数组实现的阻塞队列
  • LinkedTransferQueue:当消费者来取元素的时候,如果没有元素,会创建一个空的节点,并阻塞在这里,下次生产者如果添加元素,会直接将元素添加到这个节点,唤醒生产者去消费。
  • DelayQueue:支持延时获取元素的阻塞队列
  • SynchronousQueue:同步队列,不存储元素,每一个put操作都会等待一个take操作完成
  • PriorityBlockingQueue:优先阻塞队列
  • LinkedBlockingDeque:通过双向链表实现的双向阻塞队列
LinkedBlockingQueue源码探究 基本数据结构
static class Node {
    E item;

    
    Node next;

    Node(E x) { item = x; }
}


transient Node head;


private transient Node last;


private final ReentrantLock takeLock = new ReentrantLock();


private final Condition notEmpty = takeLock.newCondition();


private final ReentrantLock putLock = new ReentrantLock();


private final Condition notFull = putLock.newCondition();

以上是组成LinkedBlockingQueue的基本数据结构,通过代码可以得到以下几点关键信息:

  1. 首先他是通过一个单向链表维护的队列,并且提供了两个成员变量head和last,分别指向队头和队尾,队头节点不存储元素,只作为一个虚拟的头结点,方便链表的操作。
  2. 提供了两把由ReentrantLock实现的可重入锁takeLock和putLock,用来实现从队列中取元素和写元素时的线程阻塞和线程安全的功能。我们知道ReentrantLock是基于AQS实现的,所以自然会有用于排队的CLH队列。用来存储没有拿到锁的线程
  3. 两个条件队列notEmpty和notFull。这又是干什么的呢,我们知道阻塞队列取元素,如果队列为空,是需要去阻塞,那直接阻塞到CLH队列可以吗?当然不行,我们需要将他们先暂存到条件队列,等生产者给阻塞队列里面放入了元素,才可以将这个条件队列唤醒,让他们转移到CLH队列,在去竞争takeLock这把锁,否则不管有没有元素,都放到CLH队列去竞争锁,那显然是不合理的。

画一个简单的图来表示呢就是这个样子

构造方法
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}


public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node(null);
}


public LinkedBlockingQueue(Collection c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}
入队相关API

add

// add方法是继承自父类AbstractQueue的方法,只是调用了offer方法,
// 如果offer方法返回true,也就返回true,如果offer返回false,则抛出一个异常
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

offer

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/830913.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号