前言数据结构:二叉堆
1.什么是二叉堆2.堆的基本操作(上浮和下沉)
2.1.上浮(siftup)2.2.下沉(siftdown) 3.完整实现
3.1.大顶堆3.2.小顶堆 一.JDK提供的并发容器二.特点三.继承关系四.主要属性五.排序规则六.构造方法
源码建堆/堆化-heapify() 七.入队
非阻塞入队-boolean offer(E e)扩容-void tryGrow(Object[] array, int oldCap)非阻塞式入队-boolean offer(E,long,TimeUnit)非阻塞式入队-void put(E e) 八.出队
出队核心方法-E dequeue()
阻塞式出队-E take()非阻塞式出队-E poll()阻塞式超时出队-E poll(timeout, unit)阻塞式出队-E peek()移除元素-boolean remove(Object o)
removeAtremoveEQ 九.源码中堆上浮和下沉调整实现
入队排序
堆的上浮-siftUpComparable堆的上浮图解堆的上浮- siftUpUsingComparator 出队排序
堆的下沉-siftDownComparable堆的下沉图解堆的下沉-siftDownUsingComparator 十.迭代器十一.总结
前言并发编程最佳学习路线
【Java多线程】高并发修炼基础之高并发必须了解的概念
【Java多线程】了解线程的锁池和等待池概念
【Java多线程】了解Java锁机制
【Java多线程】线程通信
【Java基础】多线程从入门到掌握-第十五节.使用Concurrent集合
【Java多线程】JUC之线程池(一)与线程池的初识第四节.线程池的工作队列
这边安利一个数据结构的可视化网站:数据结构可视化网站
堆排序
二叉堆是完全二叉树,除了最后一层,其他节点都是满的,且最后一层节点从左到右排列,如下:
二叉堆分为大根堆(大顶堆)和小根堆(小顶堆),一般来说都是小根堆,任意一个节点都小于它的左右子节点的值,根节点就是堆中的最小的值。
二叉堆分类
大顶堆:父节点值总是>=任何一个子节点值小顶堆:父节点值总是<=任何一个子节点值
具体表现为:添加操作则是不断“上浮”,而删除操作则是不断“下沉”
堆可以使用数组存储,数组的下标可以从0开始,也可以从1开始,各有好处,当然JDK中堆的实现是从0开始的哦。
如果从索引为1的位置开始存储元素,第k个节点的左右子节点的下标:(2k, 2k + 1),父节点的坐标可以很容易求:floor(k / 2),floor表示向下取整。如果从索引为0的位置开始存储元素,第k个节点的左右子节点的下标:(2k + 1, 2k + 2),父节点的坐标也可以很容易求:floor((k - 1) / 2),floor表示向下取整。
之后我们会详细分析JDK中的实现,也就是第二种。PriorityBlockingQueue会依据元素的比较方式选择构建大顶堆或小顶堆。
如:如果元素是Integer这种引用类型,那么默认就是“小顶堆”,也就是每次出队都会是当前队列最小的元素。 2.堆的基本操作(上浮和下沉)
堆中最重要核心的两个操作便是如何将元素向上调整 or 向下调整。
堆的操作中,最重要的就是堆元素的上浮和下沉操作: 2.1.上浮(siftup)
以插入操作为例,二话不说,直接在数组末尾插上元素,接着再一一向上层比较,比较的原则的就是:我们只需要比较当前这个数是不是比它的父节点小,如果比它小,就进行交换,否则则停止交换。
思路非常简单,你可以思考一下其合理性:我们想,如果我们每次插入数据的时候,都做一次向上调整的操作,我们一定能够保证,每次都是在一个符合条件的二叉堆上插入数,对吧。那这样的话,本身就满足任何一个父节点必定比其子节点小的条件,如果待调整节点更小,那他必然也小于另一个子节点,由于我们一直迭代做,最后一定会满足条件。
插入元素: 新增元素添加到完全二叉树的底层最右侧(最右下方),需要进行上浮操作,重新使得堆有序。
// 向上调整 u 是当前的索引
private void siftUp(int k) {
// 如果发现当前的节点比父节点小
while (k/2 > 0 && a[k/ 2] > a[k]) {// while (k > 1 && (a[k]>a[k/2]))
// 就和父节点交换一下
heap_swap(k/2, k);
k /= 2;//k = k/2;
}
}
这边也给出插入一个元素x的伪代码:
void insert(int x){
size ++; // 最后一个元素指针
heap[size] = x; // 赋值
siftUp(size); // 向上调整
}
2.2.下沉(siftdown)
为什么需要向下调整呢,以删除操作为例,我们知道,要在数组头部删除一个元素且保证后面元素的顺序是比较麻烦的,我们通常在遇到删除堆顶的时候,直接将数组的最后一个元素heap[size–]将heap[0]覆盖,接着执行down(0),自上而下地执行调整操作。
调整的规则也比较简单,其实就是判断当前元素和左右子节点的大小关系,和最小的那个交换,递归地去调整,直到无法交换为止。
当删除一个堆元素(堆顶) 时,首先将堆顶(第一个元素)元素与最右下方元素(最后一个元素)交换,然后删除最右下方元素(最后一个元素)。此时堆顶元素需要进行下沉操作,重新使得堆有序。
// 向下调整
private void siftDown(int k) {
int t = k;
if (k * 2 <= size && a[k * 2] < a[t]) t = k * 2; // 判断左子节点是否存在, 且如果左左子节点比它小,就更新坐标
if (k * 2 + 1 <= size && a[k * 2 + 1] < a[t]) t = k * 2 + 1; // 同理
if (k != t) { // 如果需要交换
heap_swap(k, t);// 交换一下
siftDown(t); // 继续做这个操作
}
}
这边给出删除小根堆中的最小值的伪代码:
int poll(){
int res = heap[1]; // 堆顶是最小值
heap[1] = heap[size--]; // 直接将最后一个元素覆盖堆顶,并size-1
siftDown(1); // 执行向下调整
return res;
}
我们希望删除第k个元素或者更新第k个元素都是比较简便的:
// 删除位置为k的元素
void removeAt(int k){
heap[k] = heap[size --];
// 分别做一次向下操作和向上操作,其中一个判断必定只会执行一次
down(k);
up(k);
}
// 更新位置为k的元素为x
void updateAt(int k, int x){
heap[k] = x;
down(k);
up(k);
}
3.完整实现
3.1.大顶堆
public class MaxPQimplements Iterable { private E[] pq; // 存储索引从1到n的元素 private int n; // 优先级队列上的元素个数 private Comparator comparator; // 比较器 public MaxPQ(int initCapacity) { pq = (E[]) new Object[initCapacity + 1]; n = 0; } public MaxPQ() { this(1); } public MaxPQ(int initCapacity, Comparator comparator) { this.comparator = comparator; pq = (E[]) new Object[initCapacity + 1]; n = 0; } public MaxPQ(Comparator comparator) { this(1, comparator); } public MaxPQ(E[] keys) { n = keys.length; pq = (E[]) new Object[keys.length + 1]; for (int i = 0; i < n; i++) { pq[i + 1] = keys[i]; } for (int k = n / 2; k >= 1; k--) { sink(k); } assert isMaxHeap(); } public boolean isEmpty() { return n == 0; } public int size() { return n; } public E max() { if (isEmpty()) { throw new NoSuchElementException("Priority queue underflow"); } return pq[1]; } private void resize(int capacity) { assert capacity > n; E[] temp = (E[]) new Object[capacity]; for (int i = 1; i <= n; i++) { temp[i] = pq[i]; } pq = temp; } public void insert(E x) { // 如果需要,将数组大小增加一倍 if (n == pq.length - 1) { resize(2 * pq.length); } // 添加x,并过滤它以保持堆不变 pq[++n] = x; swim(n); assert isMaxHeap(); } public E delMax() { if (isEmpty()) { throw new NoSuchElementException("Priority queue underflow"); } E max = pq[1]; exch(1, n--); sink(1); pq[n + 1] = null; // to avoid loiterig and help with garbage collection if ((n > 0) && (n == (pq.length - 1) / 4)) { resize(pq.length / 2); } assert isMaxHeap(); return max; } private void swim(int k) { while (k > 1 && less(k / 2, k)) { exch(k, k / 2); k = k / 2; } } private void sink(int k) { while (2 * k <= n) { //j指向k的较小的子节点 int j = 2 * k; if (j < n && less(j, j + 1)) { j++; } if (!less(k, j)) { break; } exch(k, j); k = j; } } private boolean less(int i, int j) { if (comparator == null) { return ((Comparable ) pq[i]).compareTo(pq[j]) < 0; } else { return comparator.compare(pq[i], pq[j]) < 0; } } private void exch(int i, int j) { E swap = pq[i]; pq[i] = pq[j]; pq[j] = swap; } private boolean isMaxHeap() { return isMaxHeap(1); } private boolean isMaxHeap(int k) { if (k > n) { return true; } int left = 2 * k; int right = 2 * k + 1; if (left <= n && less(k, left)) { return false; } if (right <= n && less(k, right)) { return false; } return isMaxHeap(left) && isMaxHeap(right); } @Override public Iterator iterator() { return new HeapIterator(); } private class HeapIterator implements Iterator { // 创建一个新的优先级队列 private MaxPQ copy; //将所有元素添加到堆的副本中 //需要线性时间,因为已经在堆顺序,所以没有元素移动 public HeapIterator() { if (comparator == null) { copy = new MaxPQ (size()); } else { copy = new MaxPQ (size(), comparator); } for (int i = 1; i <= n; i++) { copy.insert(pq[i]); } } @Override public boolean hasNext() { return !copy.isEmpty(); } @Override public void remove() { throw new UnsupportedOperationException(); } @Override public E next() { if (!hasNext()) { throw new NoSuchElementException(); } return copy.delMax(); } } }
测试结果
3.2.小顶堆public class MinPQimplements Iterable { private E[] pq; // 存储索引从1到n的元素 private int n; // 优先级队列上的元素个数 private Comparator comparator; // 比较器 public MinPQ(int initCapacity) { pq = (E[]) new Object[initCapacity + 1]; n = 0; } public MinPQ() { this(1); } public MinPQ(int initCapacity, Comparator comparator) { this.comparator = comparator; pq = (E[]) new Object[initCapacity + 1]; n = 0; } public MinPQ(Comparator comparator) { this(1, comparator); } public MinPQ(E[] keys) { n = keys.length; pq = (E[]) new Object[keys.length + 1]; for (int i = 0; i < n; i++) { pq[i + 1] = keys[i]; } for (int k = n / 2; k >= 1; k--) { sink(k); } assert isMinHeap(); } public boolean isEmpty() { return n == 0; } public int size() { return n; } public E min() { if (isEmpty()) { throw new NoSuchElementException("Priority queue underflow"); } return pq[1]; } private void resize(int capacity) { assert capacity > n; E[] temp = (E[]) new Object[capacity]; for (int i = 1; i <= n; i++) { temp[i] = pq[i]; } pq = temp; } public void insert(E x) { // double size of array if necessary if (n == pq.length - 1) { resize(2 * pq.length); } // add x, and percolate it up to maintain heap invariant pq[++n] = x; swim(n); assert isMinHeap(); } public E delMin() { if (isEmpty()) throw new NoSuchElementException("Priority queue underflow"); E min = pq[1]; exch(1, n--); sink(1); pq[n + 1] = null; // to avoid loiterig and help with garbage collection if ((n > 0) && (n == (pq.length - 1) / 4)) { resize(pq.length / 2); } assert isMinHeap(); return min; } private void swim(int k) { while (k > 1 && greater(k / 2, k)) { exch(k, k / 2); k = k / 2; } } private void sink(int k) { while (2 * k <= n) { //j指向k的较小的子节点 int j = 2 * k; if (j < n && greater(j, j + 1)) { j++; } if (!greater(k, j)) { break; } exch(k, j); k = j; } } private boolean greater(int i, int j) { if (comparator == null) { return ((Comparable ) pq[i]).compareTo(pq[j]) > 0; } else { return comparator.compare(pq[i], pq[j]) > 0; } } private void exch(int i, int j) { E swap = pq[i]; pq[i] = pq[j]; pq[j] = swap; } private boolean isMinHeap() { return isMinHeap(1); } private boolean isMinHeap(int k) { if (k > n) { return true; } int left = 2 * k; int right = 2 * k + 1; if (left <= n && greater(k, left)) { return false; } if (right <= n && greater(k, right)) { return false; } return isMinHeap(left) && isMinHeap(right); } @Override public Iterator iterator() { return new HeapIterator(); } private class HeapIterator implements Iterator { // 创建一个新的优先级队列 private MinPQ copy; //将所有元素添加到堆的副本中 //需要线性时间,因为已经在堆顺序,所以没有元素移动 public HeapIterator() { if (comparator == null) { copy = new MinPQ (size()); } else { copy = new MinPQ (size(), comparator); } for (int i = 1; i <= n; i++) { copy.insert(pq[i]); } } @Override public boolean hasNext() { return !copy.isEmpty(); } @Override public void remove() { throw new UnsupportedOperationException(); } @Override public E next() { if (!hasNext()) { throw new NoSuchElementException(); } return copy.delMin(); } } }
测试方法
@Test
public void minTest() {
MinPQ minPQ = new MinPQ<>();
minPQ.insert(10);
minPQ.insert(50);
minPQ.insert(1);
minPQ.insert(80);
minPQ.insert(0);
minPQ.insert(6);
minPQ.forEach(System.out::println);
}
测试结果
一.JDK提供的并发容器 二.特点
PriorityBlockingQueue: 使用基于数组的二叉堆实现的,支持按优先级排序的无界阻塞队列(其实就是线程安全的PriorityQueue),写入的对象必须实现Comparable接口,或在队列构造方法传入比较器Comparator。默认按照自然顺序升序排序,缺点是不能保证同优先级元素的顺序。内部使用ReentrantLock和Condition实现
底层是一种基于数组实现的堆结构,是真正的无界队列(仅受内存大小限制)与其他阻塞队列不同之处在于:它是一种优先级队列,按照优先级权重大小的顺序出队;但不能保证两个优先级相同元素的顺序。使用CAS自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行(插入元素永远不会阻塞线程)
允许写入null元素 三.继承关系
public class PriorityBlockingQueue四.主要属性extends AbstractQueue implements BlockingQueue , java.io.Serializable {
//默认的数组容量; 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//数组最大容量, Integer.MAX_VALUE - 8,减8是因为有的虚拟机实现的数组的前8个字节用来存储别的东西
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private transient Object[] queue;
//元素个数,小于等于queue.length
private transient int size;
//比较器, 如果为null, 表示以元素自身的自然顺序进行比较(元素必须实现Comparable接口).
private transient Comparator super E> comparator;
//全局独占锁,用来保证并发安全和可见性
private final ReentrantLock lock;
//队列空时,用于阻塞出队线程, 没有notFull,因为这是一个无界队列
private final Condition notEmpty;
//用于分配的自旋锁,通过CAS获取,相当于AQS的state,持有这个state才可以准备新数组以扩容
//allocationSpinLock=1表示正在扩容
private transient volatile int allocationSpinLock;
private PriorityQueue q;
内部会构造为一颗平衡的二叉小顶堆,根据构造方法中传入的Comparator进行排序或者没有传的情况下使用自然排序,数组的第一个元素为最小的元素。 五.排序规则
指定排序规则有2种方式:
- 传入PriorityBlockingQueue中的元素实现Comparable接口,重写compareTo方法。初始化PriorityBlockingQueue时,指定构造参数Comparator,重写compare方法。
Comparable和Comparator的返回值的含义:
Comparable:a.compareTo(b) < 0, 表示a < b。由于PriorityBlockingQueue是最小堆,所以a会放到数组的前面去。
如果为-2表示,a在b前两位,得到2说明a在n后两位 Comparator: cmp.compare(a, b) < 0,表示a < b。由于PriorityBlockingQueue是最小堆,所以a会放到数组的前面去。
六.构造方法 源码不了解 Comparator 和 Comparable 可以看这篇
【Java基础】Comparable和Comparator两种比【Java源码】理解Java中Comparator接口中的返回值 1,-1,0
//默认构造器,使用 DEFAULT_INITIAL_CAPACITY 作为容量
// 默认初始容量11, 以元素自然顺序比较(元素必须实现Comparable接口)
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
//可指定容量的构造器
//以元素自然顺序比较(元素必须实现Comparable接口)
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator super E> comparator) {
//初始容量检查
if (initialCapacity < 1)
throw new IllegalArgumentException();
//主要属性的初始化
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();//没有notFull,因为这是一个无界队列
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
//将指定集合添加到队列中的构造器
public PriorityBlockingQueue(Collection extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // 为true代表数组需要重新进行堆排序
boolean screen = true; // 为true代表需要扫描一遍数组,看里面有没有null元素(此类不支持null元素)
//如果传入的是有序集合SortedSet
if (c instanceof SortedSet>) {
SortedSet extends E> ss = (SortedSet extends E>) c;
this.comparator = (Comparator super E>) ss.comparator();
heapify = false;//SortedSet的内部数据已经按照升序排序好了,自然也是堆结构的
}
//如果传入的是优先级阻塞队列PriorityBlockingQueue
else if (c instanceof PriorityBlockingQueue>) {
PriorityBlockingQueue extends E> pq =(PriorityBlockingQueue extends E>) c;
this.comparator = (Comparator super E>) pq.comparator();
screen = false;//PriorityBlockingQueue不可能包含null元素
//如果传参的集合类型不是PriorityBlockingQueue,表示不需要重新建堆
if (pq.getClass() == PriorityBlockingQueue.class)
heapify = false;
}
//集合转数组
Object[] a = c.toArray();
//数组大小
int n = a.length;
// a数组类型不是Object[].class就复制一个新数组出来
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
//需要检查数组是否有null元素标记为true && (数组不为空 || 比较器不为空),遍历数组
// 如果有null元素,直接抛出NullPointerException异常
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
//更新容器为传入集合
this.queue = a;
//更新元素个数
this.size = n;
if (heapify)
heapify();//堆排序
}
最终调用的都是同一个构造器,按照initialCapacity构建数组,因为它是int型的,所以最大是Integer.MAX_VALUE。
注意,initialCapacity并没有和MAX_ARRAY_SIZE进行比较,所以会可能构建超出出Integer.MAX_VALUE大小的数组。 建堆/堆化-heapify()
heapify()可以使节点任意放置的二叉树,在O(N)的时间复杂度内转变为二叉堆,具体做法是,从最后一层非叶子节点自底向上执行down操作。
private void heapify() {
//获取当前容器
Object[] array = queue;
//获取当前元素个数
int n = size;
//最后一层非叶子层:无符号右移1位:half= (n/2)-1 = (元素个数/2)-1
int half = (n >>> 1) - 1;
// 两种排序规则下, 自底向上 地执行 siftdown操作
Comparator super E> cmp = comparator;
//比较器为Comparable
if (cmp == null) {
//从最后一个非叶子节点,反向层次遍历,以遍历节点为root的子树将被构建成最小堆
for (int i = half; i >= 0; i--)
//使用比较器筛选:Comparable冒泡下移
siftDownComparable(i, (E) array[i], array, n);
}
//比较器为Comparator,逻辑同上
else {
for (int i = half; i >= 0; i--)
//使用比较器筛选:Comparator冒泡下移
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
上面2个方法实际就是最小堆构建过程中的冒泡下移操作。
private static void siftDownComparable(int k, T x, Object[] array,int n) {
if (n > 0) {
Comparable super T> key = (Comparable super T>)x;
// 这是第一个叶子节点的索引,也就是说当k到达一个叶子节点时,它就不能再下移了
//无符号右移1位:half= n/2 = 元素个数 / 2 (取整)
while (k < half) {
//child = (k * 2) + 1
int child = (k << 1) + 1; // 左子节点的索引
Object c = array[child]; // 获得左子节点
int right = child + 1;
if (right < n &&
((Comparable super T>) c).compareTo((T) array[right]) > 0)
//如果右子节点存在,而右子节点更小的话
c = array[child = right];//更新child和c
//如果key小于等于两个子节点的较小值,说明key停留在k索引处,刚好构成了最小堆
if (key.compareTo((T) c) <= 0)
break;
//如果key大于两个子节点的较小值
array[k] = c;//子节点较小值上移
k = child;//遍历索引k下移
}
//退出循环说明key的停留位置已经确定,就是现在的k的值
array[k] = key;
}
}
七.入队
由于PriorityBlockingQueue是无界的,所以入队是不可能因为队列满而阻塞的,但有可能因为内存耗尽而抛出OutOfMemoryError。
无界队列,一定可以添加成功,无需阻塞。容量不够则扩容,put完会重新构建小顶堆。
跟前面的各种阻塞队列实现思路基本一致,这里比较有意思的是数组的扩容和往小顶堆插入元素的处理逻辑 非阻塞入队-boolean offer(E e)
如果队列未满,则插入成功并返回true, 如果队列已满则返回false。
public boolean offer(E e) {
//不允许null元素
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();//加锁
//n=元素个数计数、cap=当前数组大小
int n, cap;
//当前数组
Object[] array;
// 当前队列中的元素个数 >= 数组的容量,说明队列已满,调用tryGrow扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);// 动态扩容
//元素个数计数小于当前数组大小时
try {
Comparator super E> cmp = comparator;
//如果是Comparable,就调用siftUpComparable进行冒泡上移排序
//比较器为空, 则按照元素的自然顺序进行堆
if (cmp == null)
siftUpComparable(n, e, array);//n=元素个数、e=入队元素、array=当前数组
//如果是Comparator,就调用siftUpUsingComparator进行冒泡上移排序
// 比较器非空, 则按照比较器Comparator进行堆排序
else
siftUpUsingComparator(n, e, array, cmp);//n=元素个数、e=入队元素、array=当前数组、cmp比较器
//元素计数+1
size = n + 1;
//唤醒一个出队线程(只有出队线程可能阻塞在AQS条件队列里)
notEmpty.signal();
} finally {
lock.unlock();//解锁
}
return true;//入队成功
}
几种入队方法最终都是调用了offer()方法,在添加元素时 当数组中元素大于等于容量时,调用 tryGrow() 扩容
上面最关键的是siftUpComparable和siftUpUsingComparator方法,这2个方法内部几乎一样,只不过前者是一个根据元素的自然顺序比较,后者则根据外部比较器Comparator比较,我们重点看下siftUpComparable方法:跳转
扩容-void tryGrow(Object[] array, int oldCap)在入队过程中,如果队列内部的queue数组已经满了,就需要进行扩容:
如果当前队列元素个数小于 64 个,数组容量就变成旧容量 乘 2 加 2否则变成原来的1.5 倍(原来容量越大,扩容成本越高,所以容量设置的小一点)。
从offer()的代码来看,完全有可能多个线程同时进入tryGrow()尝试扩容。
private void tryGrow(Object[] array, int oldCap) {
//入队线程在准备新数组期间,释放锁以允许出队等操作并行(在方法外层的已经获取到锁了)
lock.unlock();
//1.如果线程获取到同步状态,计算新容量,基于新容量创建新容器
//准备的新数组
Object[] newArray = null;
//如果同步状态为0,并且CAS更新allocationSpinLock为1成功,说明持有独占锁成功,准备新数组以扩容
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {//相当于AQS的独占锁,同时只能有一个线程持有这个“锁”
try {
//新容量的计算公式:
//1. 如果oldCap < 64, 即旧容量+(旧容量+2),new=old+(old + 2)
//2. 如果oldCap >=64, 即旧容量+(旧容量/2),new=old+(old / 2),等于1.5oldCap
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // 希望节点数较小的时候,增长快一点
(oldCap >> 1));
//扩容之后溢出判断:如果新容量超过了MAX_ARRAY_SIZE,即 Integer.MAX_VALUE - 8
if (newCap - MAX_ARRAY_SIZE > 0) {
//最小容量=旧容量在扩容一个
int minCap = oldCap + 1;
//旧容量在扩容一个,超出最大容量,抛出OOM异常
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
//否则使用最大容量为新容量
newCap = MAX_ARRAY_SIZE;
}
//如果新容量 > 旧容量 并且 ,queue 和 array内存地址一致,说明在此之前没有其他线程进行过扩容,使用新容量,创建一个新数组
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];//新数组,此时还没有赋值
} finally {
allocationSpinLock = 0;//释放这个“锁”
}
}
//2.如果当前线程没有获取到同步状态,新容器为空,说明可能有其它线程正在扩容,导致allocationSpinLock竞争失败
if (newArray == null)
Thread.yield(); //尽量让出cpu,好让成功的线程先来执行下面的流程
//3.因为要修改内部数组queue的指针,获取全局锁,会在方法外层的finally中unlock()
lock.lock();
//如果当前线程获取到同步状态,新容器创建成功,queue 和 array内存地址一致,说明在此之前没有其他线程进行过扩容
if (newArray != null && queue == array) {
queue = newArray;/// 指向新的内部数组
System.arraycopy(array, 0, newArray, 0, oldCap);//拷贝旧数组元素到新数组中
}
}
调用tryGrow()的前提一定会先获取全局锁,所以先释放锁。因为可能有线程正在出队,扩容/出队是可以并发执行的(扩容的前半部分只是新建一个内部数组,不会对出队产生影响)。扩容后的内部数组大小一般为原来的2倍。
需要注意的是allocationSpinLock,该字段通过CAS操作,为1表示有线程正在进行扩容。
为啥在扩容之前先释放锁,并使用CAS控制只有一个线程可以扩容成功呢?
扩容是需要时间的,如果在整个扩容期间一直持有锁的话,其他线程在这时是不能进行出队和入队操作的,这大大降低了并发性能。
spinlock锁使用CAS控制只有一个线程可以进行扩容,失败的线程执行Thread.yield()让出CPU,目的是让扩容的线程优先调用lock.lock()优先获取锁,但是这得不到保证,因此需要后面的判断。另外自旋锁变量allocationSpinLock在扩容结束后重置为0,并没有使用UNSAFE方法的CAS进行设置是因为:
同时只可能有一个线程获取到该锁。allocationSpinLock是volatile修饰。
上图展示了扩容线程和出队线程并行执行的过程。
对其他的入队线程来说,如果Thread.yield()没有让出CPU的话,那么其他入队线程就只有自旋了(while ((n = size) >= (cap = (array = queue).length)))。
而最后的lock.lock()也是有必要的:
先锁住了,防止别的线程搞破坏。这个目的和正常出队入队加锁的目的一样。可见性。加锁强制让所有线程看到扩容后新数组成员。
&& queue == array判断也是很有必要的
因为两个实参一样的调用tryGrow的线程,线程1创建新数组1后更新allocationSpinLock = 0,线程2才去CAS修改allocationSpinLock,然后线程2自己又会另外创建一个新数组2。当线程1将容器queue指向新数组1后,线程2的新数组2自然不应该将queue指向新数组2
所以通过&& queue == array判断并发扩容时queue 和 array内存地址一致,说明在此之前没有进行过扩容
总之,只能有一个线程更新queue的指针。 非阻塞式入队-boolean offer(E,long,TimeUnit)
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); //底层调用offer(),不需要阻塞
}
非阻塞式入队-void put(E e)
向队列中插入指定元素,由于队列是无界的,所以不会阻塞线程.
public void put(E e) {
offer(e); //无界队列,插入操作不需要阻塞
}
八.出队
因为队列可能没有元素,所以出队线程是可能阻塞在AQS条件队列里的。 出队核心方法-E dequeue()
出队过程:
暂存堆顶元素(最小值),方法结束前返回它。把堆的最后一个节点放到堆顶,然后下移它,再次使得整个堆变成最小堆。
private E dequeue() {
//n元素个数-1,表示出队后的剩余元素个数
int n = size - 1;
// 队列为空, 则返回null
if (n < 0)
return null;
//队列不为空,
else {
//获取当前容器
Object[] array = queue;
// 保存队头的值,也就是返回这个值
// array[0]是堆顶节点, 每次出队都删除堆顶节点(即优先级最小的元素)
E result = (E) array[0];
// 准备将队尾的值 覆盖第一个
// array[n]是堆的最后一个节点, 也就是二叉树的最右下节点(即优先级最大的元素)
E x = (E) array[n];
//GC堆的最后一个节点
array[n] = null;
//最后一个元素将放到堆顶再下移
Comparator super E> cmp = comparator;
//------因为放到堆顶,所以从0索引开始冒泡下移--------
//Comparable冒泡下移
if (cmp == null)
//0=索引、x=出队元素、array=当前数组
siftDownComparable(0, x, array, n);
//Comparator冒泡下移
else
//0=索引、x=出队元素、array=当前数组、cmp比较器
siftDownUsingComparator(0, x, array, n, cmp);
//更新元素个数
size = n;
return result;//返回堆顶节点
}
}
可以看出,每次出队的元素都是“堆顶节点”,对于“小顶堆”就是队列中的最小值,对于“大顶堆”就是队列中的最大值。
我们看下siftDownComparable方法如何实现堆顶点的删除。跳转 阻塞式出队-E take()
队列为空的时候进入条件等待,take完元素之后,立刻重新构建小顶堆。
删除元素(出队)的整个过程比较简单,也是先获取全局锁,然后判断队列状态,如果是空,则阻塞线程,否则调用dequeue()方法出队:
//出队一个元素,如果队列为空, 则阻塞线程.
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//可中断加锁
E result;
try {
// 尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列为空,进行阻塞等待。
while ( (result = dequeue()) == null)
notEmpty.await();//出队线程阻塞
} finally {
lock.unlock();//解锁
}
return result;
}
非阻塞式出队-E poll()
只是一次尝试,完全有可能poll返回null(队列为空)。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
return dequeue();//返回元素或者返回null
} finally {
lock.unlock();//解锁
}
}
阻塞式超时出队-E poll(timeout, unit)
在take()阻塞式获取方法的基础上额外增加超时功能,传入一个timeout,获取元素超时会立即返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//阻塞等待时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//可中断加锁
E result;
try {
//如果队列为空,并且 还在超时获取时间内,阻塞出队线程,时间到直接返回null
//,否则获取最小元素,即小顶堆第一个元素,然后重新排序
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);//获取阻塞剩余时间
} finally {
lock.unlock();//解锁
}
return result;
}
阻塞式出队-E peek()
直接获取队首元素,只获取不出队。可能返回为null(队列为空)
获取堆顶方法
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//队列为空空,返回null,否则获取堆顶,即优先级最小的元素
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();//解锁
}
}
该方法必须加锁,不然遇到出队过程中的中间过程。比如下图的第1步。当然,第1步肯定是正确的新堆顶。所以,本方法重点还是在于可见性,不然你可能看不到下图的第1步的结果。
移除元素-boolean remove(Object o)内部删除
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//找到队列第一个相等的元素的索引,否则返回-1
int i = indexOf(o);
//如果队列中没有这个相等元素,返回false
if (i == -1)
return false;
//移除队列中该索引的元素,返回true
removeAt(i);
return true;
} finally {
lock.unlock();
}
}
//遍历底层数组, 找到匹配元素的下标
private int indexOf(Object o) {
//元素不为空
if (o != null) {
//获取当前队列
Object[] array = queue;
//获取元素个数
int n = size;
//从前往后找到equals判断相等的元素的索引
for (int i = 0; i < n; i++)
if (o.equals(array[i]))
return i;
}
//元素为空,返回-1
return -1;
}
removeAt
删除队列指定索引处的元素
// 移除下标为i的元素
private void removeAt(int i) {
//获取当前队列
Object[] array = queue;
//获取当前队列最后一个元素的索引
int n = size - 1;
// 如果删除的是最后一个元素,那么不会影响最小堆的结构,直接删除即可
if (n == i)
array[i] = null;
//如果删除的不是最后一个元素
else {
// 老套路了,让队尾的元素覆盖这里
//把元素暂存起来,然后删除该元素
E moved = (E) array[n];
array[n] = null;
Comparator super E> cmp = comparator;
//比较器为Comparable
if (cmp == null)
//将其移动到i处,可能需要下移( //老套路了,让队尾的元素覆盖这里)
siftDownComparable(i, moved, array, n);
//比较器为Comparator,同上
else
siftDownUsingComparator(i, moved, array, n, cmp);
//向下调整没成功,向上调整
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);//上移
else
siftUpUsingComparator(i, moved, array, cmp);//同上
}
// 这也是惯用做法,上下分别做一次调整
}
//更新元素个数
size = n;
}
前半段代码和删除顶点的逻辑一样,只不过删除顶点时被删除节点的索引为0,removeAt(int i)删除的是i索引节点。总之,就是 把最后一个节点放到删除处i,然后再使用下移方法。但 调用下移方法后,节点不一定会下移。
如上图,值为8的节点移动到 i处后,会下移,但不必上移。
如果if (array[i] == moved)不成立,说明节点下移了。不用管被删除节点的值是什么(所以图中是问号),现在被删除节点有父节点parent和左右子节点left、right,在删除前parent <= left && parent <= right是肯定成立的,现在moved下移了说明moved肯定大于min(left, right),那么parent < moved肯定成立。
因为moved肯定大于min(left, right),所以需要执行下移方法(siftDownXXX),执行完毕后,以i索引为root的子树则已经是最小堆了。因为parent < moved成立,所以moved的加入对i索引往上的层次没有影响。
如上图,值为6的节点移动到i处后,不需要下移,但需要上移。
如果if (array[i] == moved)成立,说明节点没有下移,节点移动过去,以索引i为root的子树马上就成为了一个最小堆了。但移动过去的节点跟上层节点的关系还没确定,因为没有下移,则parent < moved就不能说一定成立了,比如上图这种情况。
如果不是上图这种情况,上移方法(siftUpXXX)也不会把节点上移(此时,节点根本不需要就形成了最小堆)。只有执行了上移方法,整个堆的最小堆性质才得以保证。 removeEQ
void removeEQ(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//当前队列
Object[] array = queue;
//从前往后遍历队列
for (int i = 0, n = size; i < n; i++) {
//如果PriorityBlockingQueue内部数组中确实还有该元素
if (o == array[i]) {
removeAt(i);//根据索引删除它
break;
}
}
} finally {
lock.unlock();//解锁
}
}
九.源码中堆上浮和下沉调整实现
准确地说,源码中应该是调整 + 插入,不断调整,找到插入的位置,给该位置赋值。但,如果你理解了前面的调整思想,相信你会很快理解源码中的实现。 入队排序
堆的上浮-siftUpComparablesiftUpComparable90的作用其实就是堆元素的上浮调整,将元素x插入到堆中,注意这里是不断和父节点比较,最终找到插入位置。
可以把堆可以想象成一棵完全二叉树,每次插入元素都链接到二叉树的最右下方,然后将插入的元素与其父节点比较,如果父节点大,则交换元素,直到没有父节点比插入的节点大为止。这样就保证了堆顶(二叉树的根节点)一定是最小的元素。(注:以上仅针对“小顶堆”)
每从队尾添加一个元素都会从下往上挨个比较自己和“父节点”的大小,如果小就交换,否则就停止。
private static void siftUpComparable(int k, T x, Object[] array) {
// 如果不传入Comparable的实现,这里会强转失败,抛出异常
Comparable super T> key = (Comparable super T>) x;
while (k > 0) {
//求k节点的父节点索引parent:无符号右移1位:相当于(k-1)除2,int parent = (k - 1)/2(取整)
int parent = (k - 1) >>> 1;
//获取父父节点
Object e = array[parent];
// 如果插入的节点值key大于等于他的父节点,就不用交换了 ,说明现在已经是最小堆了,则退出
if (key.compareTo((T) e) >= 0)
break;
//将父元素移下来:如果插入的节点值key大于等于他的父节点,则交换父节点和当前节点的值()
array[k] = e;
//k向上移(没有实际把key上移,只是把k索引上移)
k = parent;
}
// 退出循环后,k的位置就是待插入的位置
array[k] = key;
}
堆的上浮图解
我们通过示例来理解下入队的整个过程:假设初始构造的队列大小为6,依次插入9、2、93、10、25、90。
①初始队列情况
②插入元素9(索引0处)
将上述数组想象成一棵完全二叉树,其实就是下面的结构:
③插入元素2(索引1处)
对应的二叉树:
由于节点2的父节点为9,所以要进行“上浮调整”,最终队列结构如下:
对应的二叉树如下:
④插入元素93(索引2处)
⑤插入元素10(索引3处)
⑥插入元素25(索引4处)
⑦插入元素90(索引5处)
此时,堆不满足有序条件,因为“90”的父节点“93”大于它,所以需要“上浮调整”:
最终,堆的结构如上,可以看到,经过调整后,堆顶元素一定是最小的。
堆的上浮- siftUpUsingComparator
private static void siftUpUsingComparator(int k, T x, Object[] array, Comparator super T> cmp) {
//如果到达root,自然也不用上移了
while (k > 0) {
//无符号右移1位:(k - 1)/2
int parent = (k - 1) >>> 1;
//获取父节点
Object e = array[parent];
//如果key已经大于等于它的父节点,说明现在已经是最小堆了
if (cmp.compare(x, (T) e) >= 0)
break;
//如果key已经小于它的父节点,说明key需要冒泡上移
array[k] = e;//把父节点的值弄下来
k = parent;//没有实际把key上移,只是把k索引上移
}
array[k] = x;
}
出队排序
堆的下沉-siftDownComparable
移除k位置的元素,并调整二叉堆,具体思想就是,一般通过向下调整找到覆盖位置,用x覆盖即可,x一般可以从队尾获取。
这里的k就是当前空缺的位置,x就是覆盖元素,比如我们之前说的队尾元素
在移除第一个元素后,会用堆排序将当前堆再排一次序。
private static void siftDownComparable(int k, T x, Object[] array,int n) {
if (n > 0) {
Comparable super T> key = (Comparable super T>)x;
// 二叉堆有一个性质,最后一层叶子最多 占 1 / 2
//找到索引n对应节点的父节点:无符号右移1位:相当于n除2, 即half= n/2 = 元素个数 / 2 (取整)
int half = n >>> 1;
// 循环非叶子节点
while (k < half) {
//k的左子节点:child = (k * 2) + 1
int child = (k << 1) + 1;
Object c = array[child];
// k的右子节点:right =k的左子节点索引+1
int right = child + 1;//右子节点的位置
// 始终用左子节点c表示最小的数
if (right < n &&
((Comparable super T>) c).compareTo((T) array[right]) > 0) //左和右比较,如果左比右大
// 这里如果右子节点小,更新c = child = right
c = array[child = right];
// 如果当前的k比左子节点还要小,那就不必交换了,待在那正好!
//如果key小于等于两个子节点的较小值,说明key停留在k索引处,刚好构成了最小堆(//如果当前节点比最后一个节点大,就停止)
if (key.compareTo((T) c) <= 0)
break;
//如果key大于两个子节点的较小值
array[k] = c;//小的值向上移
k = child;//k向下更新
}
// 退出循环时,一定找到了x覆盖的位置,覆盖即可
array[k] = key;
}
}
堆的下沉图解上述代码其实是经典的堆“下沉”操作,对堆中某个顶点下沉,步骤如下:
找到该顶点的左右子节点中较小的那个;与当前节点交换;重复前2步直到当前节点没有左右子节点或比左右子节点都小。
来看个示例,假设堆的初始结构如下,现在出队一个元素(索引0位置的元素2)。
①初始状态
对应二叉树结构:
②将顶点与最后一个节点调换
即将顶点“2”与最后一个节点“93” 交换,然后将索引5为止置null。
注意:为了提升效率(比如siftDownComparable的源码所示)并不一定要真正交换,可以用一个变量保存索引5处的节点值,在整个下沉操作完成后再替换。但是为了理解这一过程,示例图中全是以交换进行的。
③下沉索引0处节点
比较元素“93”和 左右子节点中的最小者,发现 “93”大于“9” ,违反了“小顶堆”的规则,所以交换“93”和“9”,这一过程称为 siftdown(下沉)
④继续下沉索引1处节点
比较元素“93” 和 左右子节点中的最小者,发现 “93”大于“10”,违反了“小顶堆”的规则,所以交换“93”和“10”:
⑤比较结束
由于 “93”已经没有左右子节点了,所以下沉结束,可以看到,此时堆恢复了有序状态,最终队列结构如下:
堆的下沉-siftDownUsingComparatorsiftDownUsingComparator分析完全类似,只是比较时使用的是Comparator而已。
private static void siftDownUsingComparator(int k, T x, Object[] array,int n, Comparator super T> cmp) {
if (n > 0) {
//无符号右移1位:int half= n/2(取整)= 元素个数/2
int half = n >>> 1;
while (k < half) {
// 子节点的索引:child = (k * 2) + 1
int child = (k << 1) + 1;
// 获得子节点
Object c = array[child];
//子节点右移一步
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
//如果右子节点存在,而右子节点更小的话
c = array[child = right];//更新child和c
//如果key小于等于两个子节点的较小值,说明key停留在k索引处,刚好构成了最小堆
if (cmp.compare(x, (T) c) <= 0)
break;
//如果key大于两个子节点的较小值
array[k] = c;//子节点较小值上移
k = child;//遍历索引k下移
}
//退出循环说明key的停留位置已经确定,就是现在的k的值
array[k] = x;
}
}
十.迭代器
PriorityBlockingQueue的迭代器也是弱一致性的,而且它弱得都有点离谱,因为在迭代器对象初始化的时候,就复制了一个新数组出来。也就是说,初始化之后,即使元素从PriorityBlockingQueue中删除 或者 新元素加入了PriorityBlockingQueue迭代器也不管。
public Iteratoriterator() { return new Itr(toArray());//拷贝容器副本,与原容器无关 } public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { //拷贝queue数组的前面[0,size)部分元素,因为后面的元素都是null //新数组大小为size,元素都是浅拷贝 return Arrays.copyOf(queue, size); } finally { //解锁 lock.unlock(); } }
创建迭代器时,浅拷贝一个新数组给迭代器,颇有点“CopyOnWrite”的意思。
final class Itr implements Iterator{ final Object[] array; // 创建迭代器时,队列内部数组的副本 int cursor; // 下一次next()即将返回的索引 int lastRet; // 上一次next()返回的索引,初始时或lastRet已经被删除时,它为-1 Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } public E next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E)array[cursor++]; } public void remove() {//用lastRet来支持remove方法 if (lastRet < 0) throw new IllegalStateException(); removeEQ(array[lastRet]);//注意传入的是数组元素 lastRet = -1; } }
在迭代器的remove()方法中,不可以直接调用removeAt,而是要先去检查该元素是否还在PriorityBlockingQueue的内部数组中,如果还在,才能去删除它。毕竟迭代器的内部数组只是一个副本。 十一.总结
经过源码分析我们了解了 PriorityBlockingQueue 为什么是无界、有优先级的队列了。因为它可以自动扩容,在出队、入队后都会进行排序。
PriorityBlockingQueue使用二叉堆的最小堆来实现优先级出队,但内部成员是数组,只是逻辑是个堆。
对于该队列来说,只需要时刻知道队列哪个元素最小,其他元素的顺序并不重要。所以使用堆这种数据结构再合适不过。
优先级排序规则有2种:Comparable和Comparator。如果比较a、 b两个元素,如果a的优先级更高,那么肯定a.compareTo(b) < 0 或者 a.compare(b) < 0 。
二叉堆的内部操作只有两种:堆的上浮 和 堆的下沉。



