栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Java数据结构及算法实战】系列012:Java队列06——数组实现的优先级阻塞队列PriorityBlockingQueue

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Java数据结构及算法实战】系列012:Java队列06——数组实现的优先级阻塞队列PriorityBlockingQueue

PriorityBlockingQueue是基于数组实现的无界优先级阻塞队列。PriorityBlockingQueue与PriorityQueue类似,其中的元素按其自然顺序排序,或由队列构造时提供的比较器根据所使用的构造函数排序。优先级队列不允许空元素,依赖自然顺序的优先级队列也不允许插入不可比较的对象。相比于PriorityQueue而言,PriorityBlockingQueue一个最大的优势是线程安全的。

PriorityBlockingQueue是Java Collections Framework的一个成员。

1.   PriorityBlockingQueue的声明

PriorityBlockingQueue的接口和继承关系如下

public class PriorityBlockingQueue extends AbstractQueue

    implements BlockingQueue, java.io.Serializable {   …

}

完整的接口继承关系如下图所示。

从上述代码可以看出,PriorityBlockingQueue既实现了BlockingQueue和java.io.Serializable接口,又继承了java.util.AbstractQueue。其中,AbstractQueue是Queue接口的抽象类,核心代码如下。

2.   PriorityBlockingQueue的成员变量和构造函数

以下是PriorityBlockingQueue的构造函数和成员变量。

// 默认数组容量

private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 最大数组容量

    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 元素数组

    private transient Object[] queue;

// 队列中的元素个数

    private transient int size;

    // 比较器

    private transient Comparator comparator;

// 操作数组确保原子性的锁

    private final ReentrantLock lock = new ReentrantLock();

// 数组非空的条件判断

    private final Condition notEmpty = lock.newCondition();

// 分配用Spinlock,通过CAS获取

    private transient volatile int allocationSpinLock;

    public PriorityBlockingQueue() {

        this(DEFAULT_INITIAL_CAPACITY, null);

    }

    public PriorityBlockingQueue(int initialCapacity) {

        this(initialCapacity, null);

    }

    public PriorityBlockingQueue(int initialCapacity,

                                 Comparator comparator) {

        if (initialCapacity < 1)

            throw new IllegalArgumentException();

        this.comparator = comparator;

        this.queue = new Object[Math.max(1, initialCapacity)];

    }

    public PriorityBlockingQueue(Collection c) {

        boolean heapify = true; // true if not known to be in heap order

        boolean screen = true;  // true if must screen for nulls

        if (c instanceof SortedSet) {

            SortedSet ss = (SortedSet) c;

            this.comparator = (Comparator) ss.comparator();

            heapify = false;

        }

        else if (c instanceof PriorityBlockingQueue) {

            PriorityBlockingQueue pq =

                (PriorityBlockingQueue) c;

            this.comparator = (Comparator) pq.comparator();

            screen = false;

            if (pq.getClass() == PriorityBlockingQueue.class) // exact match

                heapify = false;

        }

        Object[] es = c.toArray();

        int n = es.length;

        // If c.toArray incorrectly doesn't return Object[], copy it.

        if (es.getClass() != Object[].class)

            es = Arrays.copyOf(es, n, Object[].class);

        if (screen && (n == 1 || this.comparator != null)) {

            for (Object e : es)

                if (e == null)

                    throw new NullPointerException();

        }

        this.queue = ensureNonEmpty(es);

        this.size = n;

        if (heapify)

            heapify();

    }

从上述代码可以看出,构造函数有4种。构造函数中的参数含义如下

l  initialCapacity用于设置队列中内部数组的容量。如果没有指定,则会使用默认数组容量DEFAULT_INITIAL_CAPACITY的值。

l  comparator为比较器

l  c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加

类成员queue是一个数组,用于存储队列中的元素。size用于记录队列中的元素个数。

通过ReentrantLock和加锁条件notEmpty来实现并发控制。

3.   PriorityBlockingQueue的核心方法

以下对PriorityBlockingQueue常用核心方法的实现原理进行解释。

3.1.     offer(e)

执行offer(e)方法后有两种结果

l  队列未达到容量时,返回 true

l  队列达到容量时,先扩容,再返回 true

PriorityBlockingQueue的offer (e)方法源码如下:

public boolean offer(E e) {

        if (e == null)

            throw new NullPointerException();

        final ReentrantLock lock = this.lock;

        lock.lock();  // 加锁

        int n, cap;

        Object[] es;

        while ((n = size) >= (cap = (es = queue).length))

            tryGrow(es, cap);  // 扩容

        try {

            final Comparator cmp;

            if ((cmp = comparator) == null)

                siftUpComparable(n, e, es);

            else

                siftUpUsingComparator(n, e, es, cmp);

            size = n + 1;

            notEmpty.signal();  // 唤醒等待中的线程

        } finally {

            lock.unlock();  // 解锁

        }

        return true;

    }

从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤:

l  为了确保并发操作的安全先做了加锁处理。

l  判断待入队的元素e是否为null。为null则抛出NullPointerException异常。

l  判断当前队列中的元素是否已经大于等于队列的容量,如果是则证明队列已经满了,需要先通过tryGrow()方法扩容。

l  通过siftUpComparable ()或者siftUpUsingComparator()方法插入数据元素。

l  通过执行notEmpty.signal()方法来唤醒等待中的线程。

l  最后解锁。

tryGrow()方法源码如下:

private void tryGrow(Object[] array, int oldCap) {

        lock.unlock(); // 必须释放并重新获取锁

        Object[] newArray = null;

        if (allocationSpinLock == 0 &&

            ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {

            try {

                int newCap = oldCap + ((oldCap < 64) ?

                                       (oldCap + 2) :

                                       (oldCap >> 1));

                if (newCap - MAX_ARRAY_SIZE > 0) {

                    int minCap = oldCap + 1;

                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

                        throw new OutOfMemoryError();

                    newCap = MAX_ARRAY_SIZE;

                }

                if (newCap > oldCap && queue == array)

                    newArray = new Object[newCap];

            } finally {

                allocationSpinLock = 0;

            }

        }

        if (newArray == null)

            Thread.yield();

        lock.lock();

        if (newArray != null && queue == array) {

            queue = newArray;

            System.arraycopy(array, 0, newArray, 0, oldCap);

        }

}

siftUpComparable()方法和siftUpUsingComparator()方法源码如下:

private static  void siftUpComparable(int k, T x, Object[] es) {

        Comparable key = (Comparable) x;

        while (k > 0) {

            int parent = (k - 1) >>> 1;

            Object e = es[parent];

            if (key.compareTo((T) e) >= 0)

                break;

            es[k] = e;

            k = parent;

        }

        es[k] = key;

    }

    private static  void siftUpUsingComparator(

        int k, T x, Object[] es, Comparator cmp) {

        while (k > 0) {

            int parent = (k - 1) >>> 1;

            Object e = es[parent];

            if (cmp.compare(x, (T) e) >= 0)

                break;

            es[k] = e;

            k = parent;

        }

        es[k] = x;

    }

在上述代码中,在位置k处插入项x,通过向上提升x到树形结构中来维护堆的不变性,直到x大于或等于它的父节点或根节点。

3.2.     put(e)

执行put(e)方法后有两种结果:

•      

l  队列未满时,直接插入没有返回值

l  队列满时,会扩容后再插入

PriorityBlockingQueue的put (e)方法源码如下:

public void put(E e) {

        offer(e); // 不会阻塞

    }

从上面代码可以看出,put(e)方法的实现等同于offer(e),因此队列满时会自动扩容,再插入元素,不会阻塞队列。

3.3.     offer(e,time,unit)

offer(e,time,unit)方法与offer(e)方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false。执行offer(e,time,unit)方法有两种结果:

•      

l  队列未满时,返回 true

l  队列满时,先扩容,再返回 true

PriorityBlockingQueue的put (e)方法源码如下:

public boolean offer(E e, long timeout, TimeUnit unit) {

        return offer(e); // 不会阻塞

}

从上面代码可以看出,offer(e,time,unit)方法的实现等同于offer(e),因此队列满时会自动扩容,再插入元素,不会阻塞队列。

3.4.     add(e)

执行add(e)方法后有有两种结果

l  队列未达到容量时,返回 true

l  队列达到容量时,先扩容,再返回 true

PriorityBlockingQueue的add(e)方法源码如下:

    public boolean add(E e) {

        return offer(e);

}

从上面代码可以看出,add(e)方法等同于offer(e)方法的实现。

3.5.     poll ()

执行poll()方法后有两种结果:

l  队列不为空时,返回队首值并移除

l  队列为空时,返回 null

PriorityBlockingQueue的poll()方法源码如下:

public E poll() {

        final ReentrantLock lock = this.lock;

        lock.lock();  // 加锁

        try {

            return dequeue(); // 出队

        } finally {

            lock.unlock();  // 解锁

        }

}

从上面代码可以看出,执行poll()方法时,分为以下几个步骤:

l  为了确保并发操作的安全先做了加锁处理。

l  执行dequeue()方法做元素的出队。

l  最后解锁。

dequeue()方法源码如下:

 

private E dequeue() {

        final Object[] es;

        final E result;

        if ((result = (E) ((es = queue)[0])) != null) {

            final int n;

            final E x = (E) es[(n = --size)];

            es[n] = null;

            if (n > 0) {

                final Comparator cmp;

                if ((cmp = comparator) == null)

                    siftDownComparable(0, x, es, n);

                else

                    siftDownUsingComparator(0, x, es, n, cmp);

            }

        }

        return result;

    }

private static  void siftDownComparable(int k, T x, Object[] es, int n) {

        Comparable key = (Comparable)x;

        int half = n >>> 1;

        while (k < half) {

            int child = (k << 1) + 1;

            Object c = es[child];

            int right = child + 1;

            if (right < n &&

                ((Comparable) c).compareTo((T) es[right]) > 0)

                c = es[child = right];

            if (key.compareTo((T) c) <= 0)

                break;

            es[k] = c;

            k = child;

        }

        es[k] = key;

    }

    private static  void siftDownUsingComparator(

        int k, T x, Object[] es, int n, Comparator cmp) {

        int half = n >>> 1;

        while (k < half) {

            int child = (k << 1) + 1;

            Object c = es[child];

            int right = child + 1;

            if (right < n && cmp.compare((T) c, (T) es[right]) > 0)

                c = es[child = right];

            if (cmp.compare(x, (T) c) <= 0)

                break;

            es[k] = c;

            k = child;

        }

        es[k] = x;

}

出队的原理是是这样的,在位置k处插入项x,通过反复将x降级到树中来维护堆的不变性,直到它小于或等于其子项或是一个叶子。

3.6.     take()

执行take()方法后有两种结果:

l  队列不为空时,返回队首值并移除

l  队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值

PriorityBlockingQueue的take ()方法源码如下:

public E take() throws InterruptedException {

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();  // 获取锁

        E result;

        try {

            while ( (result = dequeue()) == null)  // 出队

                notEmpty.await();  // 使线程等待

        } finally {

            lock.unlock();  // 解锁

        }

        return result;

    }

从上面代码可以看出,执行take()方法时,分为以下几个步骤:

l  先是要获取锁。

l  执行dequeue()方法做元素的出队。如果出队元素是null,则线程等待。

l  最后解锁。

dequeue()方法此处不再赘述。

3.7.     poll(time,unit)

poll(time,unit)方法与poll()方法不同之处在于,前者加入了等待机制。设定等待的时间,如果在指定时间内队列还为空,则返回null。执行poll(time,unit)方法后有两种结果:

l  队列不为空时,返回队首值并移除

l  队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null

PriorityBlockingQueue的poll(time,unit)方法源码如下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {

        long nanos = unit.toNanos(timeout);

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();  // 获取锁

        E result;

        try {

            while ( (result = dequeue()) == null && nanos > 0) // 出队

                nanos = notEmpty.awaitNanos(nanos);  // 使线程等待指定的时间

        } finally {

            lock.unlock();  // 解锁

        }

        return result;

}

从上面代码可以看出,执行poll(time,unit)方法时,分为以下几个步骤:

l  先是要获取锁。

l  执行dequeue()方法做元素的出队。如果出队元素是null,则线程等待。

l  最后解锁。

dequeue()方法此处不再赘述。

3.8.     remove()

执行remove()方法后有两种结果:

l  队列不为空时,返回队首值并移除

l  队列为空时,抛出异常

PriorityBlockingQueue的remove()方法其实是调用了父类AbstractQueue的remove ()方法,源码如下:

public E remove() {

        E x = poll();

        if (x != null)

            return x;

        else

            throw new NoSuchElementException();

}

从上面代码可以看出,remove()直接调用了poll()方法。如果poll()方法返回结果为null,则抛出NoSuchElementException异常。

poll()方法此处不再赘述。

3.9.     peek()

执行peek()方法后有两种结果:

l  队列不为空时,返回队首值但不移除

l  队列为空时,返回null

peek()方法源码如下:

public E peek() {

        final ReentrantLock lock = this.lock;

        lock.lock();  // 加锁

        try {

            return (E) queue[0];

        } finally {

            lock.unlock();  // 解锁

        }

}

从上面代码可以看出,peek()方法比较简单,直接就是获取了数组里面的索引为0的元素。

3.10.            element()

执行element()方法后有两种结果:

l  队列不为空时,返回队首值但不移除

l  队列为空时,抛出异常

element()方法其实是调用了父类AbstractQueue的element()方法,源码如下:

public E element() {

        E x = peek();

        if (x != null)

            return x;

        else

            throw new NoSuchElementException();

}

从上面代码可以看出,执行element()方法时,先是获取peek()方法的结果,如果结果是null,则抛出NoSuchElementException异常。

4.   PriorityBlockingQueue的单元测试

PriorityBlockingQueue的单元测试如下:

package com.waylau.java.demo.datastructure;

import static org.junit.jupiter.api.Assertions.assertEquals;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import static org.junit.jupiter.api.Assertions.assertNull;

import static org.junit.jupiter.api.Assertions.assertThrows;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.NoSuchElementException;

import java.util.Queue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.PriorityBlockingQueue;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

class PriorityBlockingQueueTests {

    @Test

    void testOffer() {

        // 初始化队列

        Queue queue = new PriorityBlockingQueue(3);

        // 测试队列未满时,返回 true

        boolean resultNotFull = queue.offer("Java");

        assertTrue(resultNotFull);

        // 测试队列达到容量时,会自动扩容

        queue.offer("C");

        queue.offer("Python");

        boolean resultFull = queue.offer("C++"); // 扩容

        assertTrue(resultFull);

    }

    @Test

    void testPut() throws InterruptedException {

        // 初始化队列

        BlockingQueue queue = new PriorityBlockingQueue(3);

        // 测试队列未满时,直接插入没有返回值;

        queue.put("Java");

        // 测试队列满则扩容。

        queue.put("C");

        queue.put("Python");

        queue.put("C++");

    }

    @Test

    void testOfferTime() throws InterruptedException {

        // 初始化队列

        BlockingQueue queue = new PriorityBlockingQueue(3);

        // 测试队列未满时,返回 true

        boolean resultNotFull = queue.offer("Java", 5, TimeUnit.SECONDS);

        assertTrue(resultNotFull);

        // 测试队列满则扩容,返回true

        queue.offer("C");

        queue.offer("Python");

        boolean resultFull = queue.offer("C++", 5, TimeUnit.SECONDS); // 不会阻塞

        assertTrue(resultFull);

    }

   

    @Test

    void testAdd() {

        // 初始化队列

        Queue queue = new PriorityBlockingQueue(3);

        // 测试队列未满时,返回 true

        boolean resultNotFull = queue.add("Java");

        assertTrue(resultNotFull);

        // 测试队列满则扩容,返回 true

        queue.add("C");

        queue.add("Python");

        boolean resultFull = queue.add("C++"); // 扩容

        assertTrue(resultFull);

    }

    @Test

    void testPoll() throws InterruptedException {

        // 初始化队列

        Queue queue = new PriorityBlockingQueue(3);

        // 测试队列为空时,返回 null

        String resultEmpty = queue.poll();

        assertNull(resultEmpty);

        // 测试队列不为空时,返回队首值并移除

        queue.add("Java");

        queue.add("C");

        queue.add("Python");

        String resultNotEmpty = queue.poll();

        assertEquals("C", resultNotEmpty);

    }

    @Test

    void testTake() throws InterruptedException {

        // 初始化队列

        BlockingQueue queue = new PriorityBlockingQueue(3);

        // 测试队列不为空时,返回队首值并移除

        queue.put("Java");

        queue.put("C");

        queue.put("Python");

        String resultNotEmpty = queue.take();

        assertEquals("C", resultNotEmpty);

        // 测试队列为空时,会阻塞等待,一直等到队列不为空时再返回队首值

        queue.clear();

        String resultEmpty = queue.take(); // 阻塞等待

        assertNotNull(resultEmpty);

    }

    @Test

    void testPollTime() throws InterruptedException {

        // 初始化队列

        BlockingQueue queue = new PriorityBlockingQueue(3);

        // 测试队列不为空时,返回队首值并移除

        queue.put("Java");

        queue.put("C");

        queue.put("Python");

        String resultNotEmpty = queue.poll(5, TimeUnit.SECONDS);

        assertEquals("C", resultNotEmpty);

        // 测试队列为空时,会阻塞等待,如果在指定时间内队列还为空则返回 null

        queue.clear();

        String resultEmpty = queue.poll(5, TimeUnit.SECONDS); // 等待5秒

        assertNull(resultEmpty);

    }

   

    @Test

    void testRemove() throws InterruptedException {

        // 初始化队列

        Queue queue = new PriorityBlockingQueue(3);

        // 测试队列为空时,抛出异常

        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {

            queue.remove();// 抛异常

        });

        assertEquals(null, excpetion.getMessage());

        // 测试队列不为空时,返回队首值并移除

        queue.add("Java");

        queue.add("C");

        queue.add("Python");

        String resultNotEmpty = queue.remove();

        assertEquals("C", resultNotEmpty);

    }

    @Test

    void testPeek() throws InterruptedException {

        // 初始化队列

        Queue queue = new PriorityBlockingQueue(3);

        // 测试队列不为空时,返回队首值并但不移除

        queue.add("Java");

        queue.add("C");

        queue.add("Python");

        String resultNotEmpty = queue.peek();

        assertEquals("C", resultNotEmpty);

        resultNotEmpty = queue.peek();

        assertEquals("C", resultNotEmpty);

        resultNotEmpty = queue.peek();

        assertEquals("C", resultNotEmpty);

        // 测试队列为空时,返回null

        queue.clear();

        String resultEmpty = queue.peek();

        assertNull(resultEmpty);

    }

    @Test

    void testElement() throws InterruptedException {

        // 初始化队列

        Queue queue = new PriorityBlockingQueue(3);

        // 测试队列不为空时,返回队首值并但不移除

        queue.add("Java");

        queue.add("C");

        queue.add("Python");

        String resultNotEmpty = queue.element();

        assertEquals("C", resultNotEmpty);

        resultNotEmpty = queue.element();

        assertEquals("C", resultNotEmpty);

        resultNotEmpty = queue.element();

        assertEquals("C", resultNotEmpty);

        // 测试队列为空时,抛出异常

        queue.clear();

        Throwable excpetion = assertThrows(NoSuchElementException.class, () -> {

            queue.element();// 抛异常

        });

        assertEquals(null, excpetion.getMessage());

    }

}

5.   PriorityBlockingQueue的应用案例:英雄战力排行榜

以下是一个英雄战力排行榜的示例。该示例模拟了6个英雄,可以根据英雄的战力由高至低排序。

以下是Hero类,用来代表英雄:

package com.waylau.java.demo.datastructure;

public class Hero {

    private String name;

   

    private Integer power; // 战力

   

    public Hero(String name, Integer power) {

        this.name = name;

        this.power = power;

    }

   

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

    public Integer getPower() {

        return power;

    }

    public void setPower(Integer power) {

        this.power = power;

    }

    @Override

    public String toString() {

        return "Hero [name=" + name + ", power=" + power + "]";

    }

}

以下是应用主程序:

package com.waylau.java.demo.datastructure;

import java.util.Comparator;

import java.util.Queue;

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueDemo {

    public static void main(String[] args) {

        int n = 6;

       

        Queue queue = new PriorityBlockingQueue(n, new Comparator() {

            // 战力由大到小排序

            @Override

            public int compare(Hero hero0, Hero hero1) {

                return hero1.getPower().compareTo(hero0.getPower());

            }

        });

        queue.add(new Hero("Nemesis", 95));

        queue.add(new Hero("Edifice Rex", 88));

        queue.add(new Hero("Marquis of Death", 91));

        queue.add(new Hero("Magneto", 96));

        queue.add(new Hero("Hulk", 85));

        queue.add(new Hero("Doctor Strange", 94));

       

        for (int i = 0; i

            System.out.println(queue.poll());

        }

    }

}

运行上述程序,输出内容如下:

Hero [name=Magneto, power=96]

Hero [name=Nemesis, power=95]

Hero [name=Doctor Strange, power=94]

Hero [name=Marquis of Death, power=91]

Hero [name=Edifice Rex, power=88]

Hero [name=Hulk, power=85]

6.   参考引用

本系列归档至《Java数据结构及算法实战》:https://github.com/waylau/java-data-structures-and-algorithms-in-action
《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版):https://item.jd.com/13014179.html

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/863056.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号