栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ArrayBlockingQueue

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ArrayBlockingQueue

ArrayBlockingQueue是阻塞队列中的一个有界队列,当队列满时,再有数据入队列,将会抛出异常。本篇主要介绍该类的相关方法。
该阻塞队列使用可重入锁ReentrantLock实现同步。

一、相关方法

ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, Collection c)

这三个方法都是阻塞队列的初始化方法,
capacity:阻塞队列的初始容量
fair:该队列是否是公平队列,默认为非公平,吞吐量高
c:可以将已经存在的列表初始化到阻塞队列中。

public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection c) {
        //this(capacity, fair);
        //下面这段代码是把this(capacity, fair);方法中的代码拷贝过来了
        if (capacity <= 0)
            throw new IllegalArgumentException();
        //实际保存数据的也是使用数组保存
        this.items = new Object[capacity];
        //初始化可重入锁
        lock = new ReentrantLock(fair);
        //绑定两个condition
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
		
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                	//对当前入列的元素判空处理
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

final int dec(int i)

final E itemAt(int i)

该方法直接获取数组中指定下标的元素,不保证同步

private static void checkNotNull(Object v)

对元素判空,为空抛空指针异常,说明阻塞队列不允许null入列

private void enqueue(E x)

添加元素,并且通过notEmpty唤醒所有等待的线程,相当于唤醒消费者

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        //实际存储元素自增
        count++;
        //唤醒等待的线程
        notEmpty.signal();
    }

private E dequeue()

删除元素,从数组位置的第一个位置先删除,并且通过notFull唤醒生产者线程将数据入队列

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

void removeAt(final int removeIndex)

移除指定下标的元素,这里要注意,如果removeIndex和队列本身维护的takeIndex一样,那么可以直接执行移除即可,否则会直接将下一个元素覆盖当前元素,一直当removeIndex到了阻塞队列本身维护的putIndex位置

void removeAt(final int removeIndex) {
        final Object[] items = this.items;
        //如果要移除的下标正好等于takeIndex
        if (removeIndex == takeIndex) {
            //直接移除,并将takeIndex置为+1
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                
                if (next != putIndex) {
                	//这里做覆盖式的操作
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        //唤醒操作
        notFull.signal();
    }

public boolean add(E e)

可通过实例调用的添加元素方法,直接调用父类实现的方法,但是本质上还是调用自己实现的offer(E e)方法

public boolean offer(E e)

真实的添加队列的实现,ArrayBlockingQueue维护了一个putIndex指针,该指针直接设置了下一个元素需要插入的位置,节省了遍历查找的时间。该方法也是比较简单,当前插入的线程拿到锁之后就执行插入操作即可,如果队列已满,就会抛出异常

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

public void put(E e) throws InterruptedException

put(E e)方法也是向阻塞队列中添加数据,但是与add方法不同的是,add方法发现队列满了之后会立即返回,但是put方法会阻塞,直接被唤醒继续执行,调用put方法的线程相当于一个生产者,这是需要区分的一个地方。并且在获取锁的过程中,响应中断优先于获取锁。

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
            	//等待被唤醒
                notFull.await();
            //执行添加元素方法
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

设置等待时间的往队列中添加元素的方法。该方法重点分析是怎么实现等待时间控制的。
通过Condition.awaitNanos(int nanos)来实现。

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

借助Condition维护的等待队列,在调用该方法时,会将当前线程加入到该Condition的等待队列中,并且释放出自己获取的锁(能调用Condition.await方法的线程一定是同步队列的首结点),并且唤醒同步队列中的下一个节点。之后循环的判断是否被唤醒并且重新进入到同步队列(在这中途,会响应中断并抛异常),并且在同步队列中直到获取到同步状态,返回剩余时间。

public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //为当前线程新创建一个Node节点
            Node node = addConditionWaiter();
            //当前线程释放同步状态,就是释放锁
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            //循环判断是否重新回到了同步队列上,即是否被唤醒
            while (!isOnSyncQueue(node)) {
            	//如果超时了,取消等待状态,退出循环
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                //
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            //重新等待在同步队列中获取同步状态
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }

总结该方法就是,当队列满时,会有一个等待时间,借助AQS的Condition实现,将当前线程加入到等待队列中并且循环等待被消费者线程唤醒(期间会记录时间,超时会取消等待状态并且返回小于0的时间),阻塞队列中的返回也会返回。

public E poll()

对外提供的获取数据,从第一个数据开始获取(不阻塞,立即返回)
实际调用的是dequeue方法,该方法只是做一个加解锁的操作

public E take() throws InterruptedException

与put()方法相反,put方法是阻塞式的添加数据,take方法则是等待式的获取数据,通过notEmpty(Condition类型)等待队列,实际获取数据也是调用dequque方法。

public E poll(long timeout, TimeUnit unit) throws InterruptedException

设置等待超时时间的获取数据的方法,具体超时监听看上面的offer方法。

public E peek()

获取当前队列的队头元素,直接通过维护的takeIndex获取

**public int size() **

获取当前队列的元素个数,也是同步方法,需要加解锁操作

public int remainingCapacity()

返回队列剩余的空间,加解锁,初始化的长度-count

**public boolean remove(Object o) **

移除元素,对于移除我们要注意的点是:阻塞队列自己维护了当前的插入下标和移除下标,移除元素有可能会造成维护的混乱。让我们来看一下ArrayBlockingQueue是怎么处理的?

public boolean remove(Object o) {
        if (o == null) return false;
        //老规矩,加锁操作
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                	//从takeIndex位置开始比较,是否是需要移除的元素
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    //为什么需要重新回到0?因为有可能不是从0位置开始插入的
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

总结:移除一个指定的元素,首先需要找到当前元素的下标。通过takeIndex循环遍历队列,直到找到元素或者与putIndex重叠还没有找到(说明不存在于队列),找到之后调用removeAt(i)去移除具体的元素

public boolean contains(Object o)

查询队列是否包含某个元素,除开remove(Object o)需要移除元素之外,其他操作一致。

二、相关参数

final Object[] items; 实际保存数据的数组

int takeIndex; 指向入队列最早的一个元素的下标

int putIndex; 下一次添加元素时的下标

int count; 队列中的元素个数

final ReentrantLock lock; 队列实现同步机制的同步锁

private final Condition notEmpty; 队列中唤醒等待获取数据的消费者线程的同步条件

private final Condition notFull; 队列中唤醒等会添加数据的生产者线程的同步条件

三、总结

ArrayBlockingQueue使用ReentrantLock以及与之相对应的Condition实现队列的同步以及超时机制,并且通过维护takeIndex和putIndex指针,可以保证数据的顺序存储以及获取,并且大大的提高了性能。

有了AQS的相关知识,理解阻塞队列的实现是非常简单的

ps:ArrayBlockingQueue还应该看的是怎么实现迭代器的,主要通过Itr和Itrs两个内部类来实现,将会在下一篇接着讲解(不定时)



路漫漫其修远兮,本菜鸟还将上下而求索!!!
同学们,冲冲冲

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/666006.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号