前言常用的多线程同步机制了解什么是Dueue一.JDK提供的并发容器二.特点三.继承关系四.主要属性五.不变性和可变性六.CLD中对Node的定义
1.节点分类
1.1.live node1.2.first node & last node1.3.active node1.4.self-node1.5.head/tail节点 2.节点删除步骤 七.双向链表节点-Node八.构造方法九.入队
1.Dueue入队方法
队首入队底层核心方法-void linkFirst(E e)队尾入队底层核心方法- void linkLast(E e)
队首入队-boolean offerFirst(E e)队尾入队-boolean offerLast(E e)队首入队-void addFirst(E e)队尾入队-void addLast(E e) 2.Queue入队方法
队尾入队-boolean add(E e)队尾入队-void put(E e)队尾入队-boolean offer(E e)队尾入队-void push(E e) 十.出队
1.Dueue出队方法
出队核心方法-void unlink(Node x)队首出队底层核心方法-unlinkFirst(Node first, Node next)队尾出队底层核心方法- unlinkLast(Node last, Node prev)
队首出队-E pollFirst()队尾出队-E pollLast()队首出队(抛异常)-E removeFirst()队尾出队(抛异常)--E removeLast()不移除元素队首出队-E peekFirst()不移除元素队尾出队-E peekLast()不移除元素队首出队(抛异常)-E getFirst()不移除元素队尾出队(抛异常)-E getLast()从队首向后移除指定元素-boolean removeFirstOccurrence(Object o)从队尾向前移除指定元素-boolean removeLastOccurrence(Object o) 2.Queue出队方法
队首出队-E poll()获取队首元素-E peek()移除队首元素- E remove()移除队尾元素- E pop()获取队首元素- E element() 十一.重要内部方法
获取前驱节点-Node pred(Node p)获取后继节点-Node succ(Node p)获取first节点-Node first()获取last节点-Node last()更新队头-void updateHead()更新队尾-void updateTail()void skipDeletedPredecessors(Node x)void skipDeletedSuccessors(Node x)Node prevTerminator() 和 Node nextTerminator()初始化队头/队尾-void initHeadTail(Node h, Node t) 十二.迭代器十三.总结
前言并发编程最佳学习路线
【Java多线程】高并发修炼基础之高并发必须了解的概念
【Java多线程】了解线程的锁池和等待池概念
【Java多线程】了解Java锁机制
【Java多线程】线程通信
【Java基础】多线程从入门到掌握-第十五节.使用Concurrent集合
【Java多线程】JUC之线程池(一)与线程池的初识第四节.线程池的工作队列
常用的多线程同步机制
volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。互斥锁:重量级多线程同步机制,可能会引起上下文切换和线程调度, 它同时提供内存可见性和原子性。 了解什么是Dueue
【Java基础】Queue基础
ConcurrentlinkedDeque: 基于双向链表实现的无界非阻塞线程安全队列,可以从队列的头和尾同时操作(插入/删除),底层基于自旋+CAS的方式实现,适合“多生产,多消费”的场景。
双端队列与普通队列的入队区别是:双端队列中元素可以在“队首、队尾进行入队、出队”
“队首入队”底层调用了linkFirst()方法,而“队尾入队”是调用了linkLast()方法“队首出队”底层调用了unlinkFirst()方法,而“队尾出队”是调用了unlinkLast()方法 三.继承关系
public class ConcurrentlinkedDeque四.主要属性extends AbstractCollection implements Deque , java.io.Serializable {
//队头,不一定first node
private transient volatile Node head;
//队尾,不一定是last node
private transient volatile Node tail;
// 终止节点,在gc-unlinking阶段将无用节点链接到这两个节点上,自行处理减少内内存滞留风险
private static final Node
需要特别注意的是ConcurrentlinkedDeque包含2个特殊字段:PREV_TERMINATOR、NEXT_TERMINATOR。 这2个字段在节点出队时使用,
PREV_TERMINATOR:prev的终止节点,next指向自身,即PREV_TERMINATOR.next = PREV_TERMINATOR。在 first节点出队后,会把first.next指向自身(first.next=first),然后把prev设为PREV_TERMINATOR。NEXT_TERMINATOR:next的终止节点,prev指向自身,即NEXT_TERMINATOR.pre = NEXT_TERMINATOR。在last节点出队后,会把last.prev指向自身(last.prev=last),然后把next设为NEXT_TERMINATOR。 五.不变性和可变性
head/tail 的不变性:
第一个节点总是能以O(1)的时间复杂度从 head 通过 prev 链接到达;最后一个节点总是能以O(1)的时间复杂度从 tail 通过 next 链接到达;所有live节点(item不为null的节点),都能从第一个节点通过调用 succ()方法遍历可达;所有live节点(item不为null的节点),都能从最后一个节点通过调用 pred()方法遍历可达;head/tail 不能为 nullhead 节点的 next 域不能指向到自身;head/tail 不会是GC-unlinked节点(但它可能是unlink节点)。
head/tail的可变性:
head/tail 节点的 item 域可能为 null,也可能不为 null;head/tail 节点 可能 从first/last/tail/head节点访问时不可达;tail 节点的 next 域可以指向自身。 六.CLD中对Node的定义 1.节点分类 1.1.live node
item!=null的节点被称为有效节点、存活(live node)。当节点的 item 被 CAS 改为 null,逻辑上来讲这个节点已经从链表中移除;一个新的元素通过 CAS 链接到一个包含空 prev 或空 next的 first 或 last 节点,这个元素的节点在这时是有效节点。
1.2.first node & last node任何时刻,队列中只有一个首节点(first node),且它的prev指针一定为null,用于终止任何从 live 节点开始的 prev 引用链;只有一个尾节点(last node),且它的next指针一定为null,用于终止任何从 live 节点开始的 next引用链;
first/last node的item 可能为 null(不一定是live node)。它们之间可以通过prev和next链相互到达。。当一个新node被链接在first node的prev上,或者last node的next上时,它就成功入队了。head或tail并不一定指向first node或last node。但通过head肯定能找到first node,tail同理。 1.3.active node
指处于队列上的节点,也被称为活动节点(active node),它们之间通过prev和next链能相互到达,当然active node的item可以是null。所以active node包括live node。
如果p节点为活动节点,则:p.item != null || (p.prev == null && p.next != p) || (p.next == null && p.prev != p) 1.4.self-node
self-node:自链接节点,prev 或 next 指向自身的节点,自链接节点用在解除链接操作中,并且它们都不是active node。
1.5.head/tail节点head/tail 也可能不是 first/last 节点。从head 节点通过prev 引用必定能找到first 节点,从tail 节点通过next 引用必定能找到last 节点。允许 head 和 tail 引用已删除的节点,这些节点没有链接,因此可能无法从 live 节点访问到。
2.节点删除步骤删除节点有3个步骤
逻辑删除(logical deletion):将node的item置为null,它在逻辑上已经被认为是删除了的。未链接( unlinking):。使得从active node出发不能到达这些逻辑删除节点(反之,从逻辑删除节点出发还是可以到达active node的),这样,GC最终会发现这些逻辑删除节点。gc未链接( gc-unlinking):使得从逻辑删除节点也不能到达active node了,这样,会使得GC更快发现它们。当然,第三步只是一种优化,即使没有第三步也可以。这只是为了保持GC健壮性。
在这一步中,会使得变成self-node(即指针指向自己),另一个指针指向end终止符(end指first端或last端)。head和tail可能被unlinking,但不可以被gc-unlinking。 七.双向链表节点-Node
Node实现与ConcurrentlinkedQueue不同之处也就在于多了prev属性用于指向前驱节点
//双向链表节点内部类
static final class Node {
//前驱节点:上一个节点
volatile Node prev;
//当前节点值
volatile E item;
//后继节点:下一个节点
volatile Node next;
//默认构造函数
Node() { }
//带参数构造函数
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
//CAS更新当前节点item属性
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
//CAS更新当前节点next属性
boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
//CAS更新当前节点orev属性
boolean casPrev(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
}
//更新当前节点next属性(保证有序性,同时通过volatile保证可见性)
void lazySetNext(Node val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//更新当前节点prev属性(保证有序性,同时通过volatile保证可见性)
void lazySetPrev(Node val) {
UNSAFE.putOrderedObject(this, prevOffset, val);
}
//-------------省略UNSAFE相关获取属性偏移量代码----------------------
}
八.构造方法
//初始化,默认head和tail指向item为null的哨兵节点
public ConcurrentlinkedDeque() {
head = tail = new Node(null);
}
//根据已有集合,构造队列
public ConcurrentlinkedDeque(Collection extends E> c) {
Node h = null, t = null;//用于标记队尾、队头元素
//遍历元素
for (E e : c) {
//元素非空校验
checkNotNull(e);
//构建入队节点
Node newNode = new Node(e);
//队列为空,t队尾、h队头指向第一个节点
if (h == null)
h = t = newNode;
//队列不为空,将入队节点链接到队尾节点的next上,队尾节点链接到入队节点的prev上
else {
t.lazySetNext(newNode);
newNode.lazySetPrev(t);
t = newNode;//标记newNode暂时队尾节点
}
}
//更新head/tail 为t、h
initHeadTail(h, t);
}
重点看下空构造器,,其head和tail指针并非指向null,而是指向一个item值为null的Node节点——哨兵节点initHeadTail 九.入队 1.Dueue入队方法
队首入队底层核心方法-void linkFirst(E e)将节点e链接到队首, 并更新head为e
一般认为队首在左边,队尾在右边。所以往prev链方向前进称为左移,往next链方向前进称为右移。
private void linkFirst(E e) {
//非空校验
checkNotNull(e);
//构建入队节点
final Node newNode = new Node(e);
//【1】循环跳转,goto语法, 用于在"continue restartFromHead" 后进行第二个for循环中的初始化
restartFromHead:
for (;;)
//【2】初始化p=h=head、q=null, 从 head节点开始向前循环找到 first 节点(p.prev==null&&p.next!=p)
for (Node h = head, p = h, q;;) {
//【3】head前驱的前驱不为空 并且 head前驱的前驱的前驱不为空,说明head前面有2个节点
if ((q = p.prev) != null &&
(q = (p = q).prev) != null)
//判断p的更新动作是前移(prev),还是变成新head
//如果head被修改,返回head重新查找,否则向前移动一个节点
p = (h != (h = head)) ? h : q;
//【4】头节点的后继节点指向自身(自链接节点),说明p已经被删除了,继续外循环,重新读取head
else if (p.next == p) // PREV_TERMINATOR
continue restartFromHead;
//【5】p是 first node(p.prev为null)
else {
//【6】通过lazySetNext设置新节点的 next 节点为 first node
newNode.lazySetNext(p);
//【7】 CAS 修改 first 的 prev 为新节点,成功说明p已经成功从first端入队
if (p.casPrev(null, newNode)) {
//【8】更新成功后会判断 first 节点是否已经跳了两个节点,只有在跳了两个节点才会 CAS更新head
//这是为了节省 CAS 指令执行开销
if (p != h)
casHead(h, newNode);// p和h的距离至少为1,现在p左边还加了个新节点,所以head需要更新了(允许失败)
return;
}
}
//【9】 执行到此处说明CAS操作失败,有其它线程也在队首插入元素,继续循环直到成功
}
}
if ((q = p.prev) != null && (q = (p = q).prev) != null)分支的理解很关键,让我们来拆分一下逻辑:
| 条件1 | 附带效果 | 条件2 | 附带效果 | 结果 |
|---|---|---|---|---|
| p.prev≠null | 获得了p的前驱q | q.prev≠null | q和p整体左移(prev) | 左移后(prev),p离first node距离至少为1,进入分支,分支内逻辑为再次移动p |
| p.prev≠null | 获得了p的前驱q | q.prev=null | q和p整体左移(q为null) | p刚移动到first node了,不进入分支 |
| p.prev =null | 获得了p的前驱q(q为null) | 短路不执行 | - | p就是first node时,不进入分支 |
很巧妙的是,只要p到达了first node,就不会进入这个分支了。
p = (h != (h = head)) ? h : q;的逻辑就是,如果检测到head相比之前取的局部变量h发生了变化,p就赋值为新head。否则,p就是左移一下(prev)即可。
if (p != h)说明head距离新入队的节点的距离至少为2,所以此时需要更新head。这说明每隔一次入队动作,head或tail才会更新一次,这个和ConcurrentlinkedQueue一样(这一句就不画图表现了)。
为了便于理解,我们以示例来看:假设有2个线程ThreadA 和ThreadB同时进行入队操作。
①ThreadA先单独入队一个元素9,此时,ThreadA会执行分支【4】:
②ThreadA入队一个元素2,同时ThreadB入队一个元素10,此时,依然执行【4】分支,我们假设ThreadA操作成功,ThreadB操作失败:
ThreadA的CAS操作成功后,会进入以下判断:
上述判断的作用就是重置head头指针,可以看到,ConcurrentlinkedDeque其实是以每次跳2个节点的方式移动指针,这主要考虑到并发环境以这种hop跳的方式可以提升效率。
注意,此时ThreadB的p.casPrev(null, newNode)操作失败了,所以会进入下一次自旋,在下一次自旋中继续进入【4】。如果ThreadA的casHead操作没有完成,ThreadB就进入了下一次自旋,则会进入【2】,重置指针p指向队首。最终队列结构如下:
在队尾插入元素和队首类似,不再赘述,读者可以自己阅读源码。
队尾入队底层核心方法- void linkLast(E e)将节点e链接到队尾, 并更新tail为e
一般认为队首在左边,队尾在右边。所以往prev链方向前进称为左移,往next链方向前进称为右移。
private void linkLast(E e) {
//非空校验
checkNotNull(e);
//构建入队节点
final Node newNode = new Node(e);
//【1】循环跳转,goto语法, 用于在"continue restartFromHead" 后进行第二个for循环中的初始化
restartFromTail:
for (;;)
//【2】初始化p=t=tail、q=null, 从 tail节点开始向后循环找到 last节点(p.next==null&&p.next!=p)
for (Node t = tail, p = t, q;;) {
//【3】tail的后继不为空 并且 tail后继的后继的后继不为空,说明tail后面有2个节点
if ((q = p.next) != null &&
(q = (p = q).next) != null)
//判断p的更新动作是后移(next),还是变成新head
//如果tail被修改,返回tail重新查找,否则向后移动一个节点
p = (t != (t = tail)) ? t : q;
//【4】尾节点的前驱节点指向自身(自链接节点),说明p已经被删除了,继续外循环,重新读取tail
else if (p.prev == p) // NEXT_TERMINATOR
continue restartFromTail;
//【5】p是 last node(p.next为null)
else {
//【6】通过lazySetPrev设置新节点的 prev节点为 last node
newNode.lazySetPrev(p);
//【7】 CAS 修改 last的 next 为新节点,成功说明p已经成功从last端入队
if (p.casNext(null, newNode)) {
//【8】更新成功后会判断 last 节点是否已经跳了两个节点,只有在跳了两个节点才会 CAS更新tail
//这是为了节省 CAS 指令执行开销
if (p != t) // 跳两个节点时才修改tail
casTail(t, newNode);// p和t的距离至少为1,现在p右边还加了个新节点,所以tail需要更新了(允许失败)
return;
}
//【9】 执行到此处说明CAS操作失败,有其它线程也在队尾插入元素,继续循环直到成功
}
}
}
队首入队-boolean offerFirst(E e)
底层调用linkFirst(), 向队首插入元素,无论结果如何都返回true
public boolean offerFirst(E e) {
linkFirst(e);
return true;
}
队尾入队-boolean offerLast(E e)
底层调用linkLast(), 向队尾插入元素,无论结果如何都返回true
public boolean offerLast(E e) {
linkLast(e);
return true;
}
队首入队-void addFirst(E e)
底层调用linkFirst(), 向队首插入元素
public void addFirst(E e) {
linkFirst(e);
}
队尾入队-void addLast(E e)
底层调用linkLast(e), 向队尾插入元素
public void addLast(E e) {
linkLast(e);
}
2.Queue入队方法
队尾入队-boolean add(E e)
底层调用offerLast(), 向队尾插入元素,如果插入成功返回true,反之false
public boolean add(E e) {
return offerLast(e);
}
队尾入队-void put(E e)
底层调用offerLast(), 向队尾插入元素,如果插入成功返回true,反之false
public boolean offer(E e) {
return offerLast(e);
}
队尾入队-boolean offer(E e)
底层调用offerLast(e), 向队尾插入元素,如果插入成功返回true,反之false
public boolean offer(E e) {
return offerLast(e);
}
队尾入队-void push(E e)
底层调用addFirst(e), 向队尾插入元素
public void push(E e) {
addFirst(e);
}
十.出队
1.Dueue出队方法
出队核心方法-void unlink(Node x)
unlink方法断开节点的链接:
该方法通过判断入参Node的prev、next来决定是队首出队、队尾出队、还是队中出队
void unlink(Node x) {
//出队节点的前驱节点
final Node prev = x.prev;
//出队节点的后继节点
final Node next = x.next;
//【1】出队节点的前驱节点为空,说明是第一个节点(first节点),调用unlinkFirst(x, next)队首出队,
if (prev == null) {
unlinkFirst(x, next);
//【2】出队节点的后继节点为空,说明是最后一个节点(first节点),调用 unlinkLast(x, prev)队尾出队
} else if (next == null) {
unlinkLast(x, prev);
//【3】不是第一个节点 和 最后一个节点,说明是在中间出队,执行unlink本身的通用节点逻辑。
} else {
Node activePred, activeSucc;//用于标记当前节点的前驱有效节点、后继有效节点
boolean isFirst, isLast;//用于标记当前节点是首节点、还是尾节点
int hops = 1; // 记录逻辑删除节点数
//【4】从被删除节点prev节点开始往前找到第一个有效前驱节点
for (Node p = prev; ; ++hops) {
//【5】当前节点p为有效节点
if (p.item != null) {
activePred = p;//记录有效前驱节点
isFirst = false;//标记为非头部
break;//结束循环
}
//【5】当前节点不是有效节点,获取当前节点的前驱节点
Node q = p.prev;
//【6】 已经到了头部了
if (q == null) {
//当前节点为自链接节点,后继节点指向自身,直接退出
if (p.next == p) return;
//当前节点为非自链接节点,//记录有效前驱节点
activePred = p;
isFirst = true;//标记为头部
break;//结束循环
}
//【7】当前节点为自链接节点,前驱节点指向自身,直接退出
else if (p == q) return;
//【8】更新循环指针,继续循环向前查找
else p = q;
}
//【9】从被删除节点next开始往后找到第一个有效后继节点
for (Node p = next; ; ++hops) {
//【10】当前节点p为有效节点
if (p.item != null) {
activeSucc = p;;//记录有效后继节点
isLast = false;//标记为非尾部
break;//结束循环
}
//【11】当前节点不是有效节点,获取当前节点的前驱节点
Node q = p.next;
//【12】 已经到了尾部了
if (q == null) {
//当前节点为自链接节点,前驱节点指向自身,直接退出
if (p.prev == p) return;
activeSucc = p;//记录有效后继节点
isLast = true;//标记为尾部
break;//结束循环
}
//【13】当前节点为自链接节点,后继节点指向自身,直接退出
else if (p == q) return;
//【14】更新循环指针,继续循环向后查找
else p = q;
}
//【14】无节点跳跃并且操作节点有first或last节点时不更新链表
//达到逻辑删除节点阈值或者是内部删除节点则需要进行额外处理unlink/gc-unlink
if (hops < HOPS && (isFirst | isLast)) return;
//【15】 移除有效前驱和后继节点之间的有效节点,包括x,使得前驱和后继互连
skipDeletedSuccessors(activePred);
skipDeletedPredecessors(activeSucc);
//【16】 有效前驱和后继是队头或队尾,尝试gc-unlink
if ((isFirst | isLast) &&
//前驱和后继的状态,确保没有被改变
(activePred.next == activeSucc) &&
(activeSucc.prev == activePred) &&
(isFirst ? activePred.prev == null : activePred.item != null) &&
(isLast ? activeSucc.next == null : activeSucc.item != null)) {
// 更新head和tail 确保x不可达
updateHead();
updateTail();
// 最后更新x,使得从x到活动节点不可达
x.lazySetPrev(isFirst ? prevTerminator() : x); // 前驱终结节点
x.lazySetNext(isLast ? nextTerminator() : x); // 后继终结节点
}
}
}
队首出队底层核心方法-unlinkFirst(Node first, Node next)unlinkFirst详见unlinkLast详见updateHead详见updateTail详见skipDeletedSuccessors详见skipDeletedPredecessors详见
断开队首出队节点与其后继节点的链接
一般认为队首在左边,队尾在右边。所以往prev链方向前进称为左移,往next链方向前进称为右移。
// 从first开始往后(队尾)找到第一个有效节点,直到找到或者到达队列的最后一个节点为止,并把first的直接后继指向该有效节点:
// 1) 如果first的后继本身就是有效节点,不做任何处理
// 2) 否则往后依次找到第一个有效节点,并把first的后继指向该有效节点
private void unlinkFirst(Node first, Node next) {
//【1】从next节点开始向后(右移)寻找有效节点,o:已删除节点(item为null)
for (Node o = null, p = next, q;;) {//初始化o=null,p=next,q=null
// 【2】p是有效节点 || p是最后一个节点last node
if (p.item != null || (q = p.next) == null) {
//【3】如果第一次循环就执行到此则不需要进行操作直接返回,p本来就是first的后继
// p的前驱不能指向自己,first的后继更新成p
if (o != null && p.prev != p && first.casNext(next, p)) {
//【4】将找到的后继节点的前驱也指向节点p,即完成它们的互连,这一步就是所谓的unlinking,使队列的活动节点无法访问被删除的节点;
skipDeletedPredecessors(p);
// 【5】确保first还是第一个节点,没有被其他线程改变状态,并且它的后继节点p是直接相链接的(指向first),这种关系没有被破坏才进行gc-unlink操作
if (first.prev == null &&
(p.next == null || p.item != null) &&
p.prev == first) {
//【6】实现GC-unlinking了,通过updateHead、updateTail使被删除的节点无法从head/tail可达,最后让被删除节点后继自链接,前驱指向前向终结节点。
//更新head节点,确保已删除节点o从head不可达(unlinking)
updateHead();
//更新tail节点,确保已删除节点o从tail不可达(unlinking)
updateTail();
//使unlinking节点next指向自身
o.lazySetNext(o);
//设置删除节点的prev为PREV_TERMINATOR
o.lazySetPrev(prevTerminator());
}
}
return;
}
//【7】当前节点为自链接节点,说明已被移除,退出方法
else if (p == q) return; // p非活动节点同时p的后继q指向自身则直接返回
//【8】 p非活动节点,p还有后继节点,更新循环指针,继续循环向后(右移)查找
else {
o = p;//注意这里o才被赋值
p = q;
}
}
}
解除已删除节点的链接,并链接 first 节点到下一个 active 节点(注意,在执行完此方法之后 first 节点是没有改变的)
skipDeletedPredecessors详见updateHead详见updateTail详见prevTerminator详见
队尾出队底层核心方法- unlinkLast(Node last, Node prev)断开队尾出队节点与其前驱节点的链接
一般认为队首在左边,队尾在右边。所以往prev链方向前进称为左移,往next链方向前进称为右移。
// 从last开始往后(队首)找到第一个有效节点,直到找到或者到达队列的first node为止,并把last的直接前驱指向该有效节点:
// 1) 如果last的前驱本身就是有效节点,不做任何处理
// 2) 否则往钱依次找到第一个有效节点,并把last的前驱指向该有效节点
private void unlinkLast(Node last, Node prev) {
//【1】从prev节点开始向前(左移)寻找有效节点,o:已删除节点(item为null)
for (Node o = null, p = prev, q;;) {//初始化o=null,p=prev,q=null
//【2】 p是有效节点 || p是第一个节点first node
if (p.item != null || (q = p.prev) == null) {
//【3】如果第一次循环就执行到此则不需要进行操作直接返回,p本来就是last的前驱
// p的后继不能指向自己,last的前驱更新成p
if (o != null && p.next != p && last.casPrev(prev, p)) {
//【4】将找到的前驱节点的后继也指向节点p,即完成它们的互连,这一步就是所谓的unlinking,使队列的活动节点无法访问被删除的节点;
skipDeletedSuccessors(p);
// 【5】确保last还是最后一个节点,没有被其他线程改变状态,并且它的前驱节点p是直接相链接的(指向last),这种关系没有被破坏才进行gc-unlink操作
if (last.next == null &&
(p.prev == null || p.item != null) &&
p.next == last) {
//【6】实现GC-unlinking了,通过updateHead、updateTail使被删除的节点无法从head/tail可达,最后让被删除节点后继自链接,前驱指向前向终结节点。
//更新head节点,确保已删除节点o从head不可达(unlinking)
updateHead();
//更新tail节点,确保已删除节点o从tail不可达(unlinking)
updateTail();
//使unlinking节点prev指向自身
o.lazySetPrev(o);
//设置删除节点的next为NEXT_TERMINATOR
o.lazySetNext(nextTerminator());
}
}
return;
}
//【8】当前节点为自链接节点,说明已被移除,退出方法
else if (p == q) // p非活动节点同时p的前驱q指向自身则直接返回
return;
//【8】 p非活动节点,p还有前驱节点,更新循环指针,继续循环向前(左移)查找
else {
o = p;//注意这里o才被赋值
p = q;
}
}
}
解除已删除节点的链接,并链接 last节点到下一个 active 节点(注意,在执行完此方法之后 last节点是没有改变的)
skipDeletedPredecessors详见updateHead详见updateTail详见prevTerminator详见 队首出队-E pollFirst()
从队首获取并移除元素,如果队列为空返回null,,否则返回队首元素值
public E pollFirst() {
// first() 找到第一个一个节点, succ() 返回后继节点
for (Node p = first(); p != null; p = succ(p)) {
// 节点数据
E item = p.item;
//【1】如果当前节点为有效节点(item!=null),CAS将当前节点item置空(第一步-逻辑删除)
//(注意并不是first节点,因为first节点的item可以为null)
if (item != null && p.casItem(item, null)) {
//断开当前节点链接(第二步-unlinking)
unlink(p);
//返回节点数据
return item;
}
}
return null;
}
源码已经分析完毕,我们以pollFirst出队操作为例进行一个总结说明:
通过first()获取到队列头部的第一个节点(first node)如果为活动节点(first.item==null),则CAS 修改节点的 item 为 null 即执行 logical deletion(逻辑删除继续执行unlinking阶段继续执行gc-unlinking阶段
在unlinking阶段根据节点位置进行不同情况的处理:
如果出队的节点是队列的第一个节点p,则执行unlinkFirst,其过程如下:
找到p之后的第一个有效节点,直到最后一个节点为止,p的后继节点指向这个找到的节点skipDeletedPredecessors完成unlinking阶段,使队列的活动节点无法访问被删除的节点进行gc-unlinking阶段,通过updateHead、updateTail使被删除的节点无法从head/tail可达,最后让被删除节点后继指向自己,前驱指向 终结节点
如果出队的节点是队列的最后一个节点p,则执行unlinkLast,其过程与第1种情况类似,只是方向不同
如果出队的节点是队列的中间位置,则执行unlink中的一个分支代码:
先找到删除节点x的有效前驱和有效后继,统计中间已经处于逻辑删除的节点个数如果统计个数已经超过阈值个数或者是内部节点删除,有效前驱和后继互连,即活动节点不能访问逻辑删除节点了(unlinking阶段)有效前驱和后继是队头或队尾,尝试进行gc-unlink,通过updateHead、updateTail使被删除的节点无法从head/tail可达,最后让被删除节点指向自己或者执行终结节点
队尾出队-E pollLast()first()详见succ()详见unlink()详见
从队尾获取并移除元素,如果队列为空返回null,,否则返回队尾元素值
public E pollLast() {
//从后往前遍历-> last() 找到最后一个节点, pred() 返回前驱节点
for (Node p = last(); p != null; p = pred(p)) {
// 节点数据
E item = p.item;
//【1】如果当前节点为有效节点(item!=null),CAS将当前节点item置空(第一步-逻辑删除)
//(注意并不是last节点,因为last节点的item可以为null)
if (item != null && p.casItem(item, null)) {
//断开当前节点链接
unlink(p);
//返回节点数据
return item;
}
}
return null;
}
主要逻辑
- 首先通过last()方法找到 last节点,last 节点必须为active 节点 (p.next==null&&p.prev!=p)如果last.item==null(这里是允许的,具体见上面我们对 first/last 节点的介绍),则继续调用pred()方法寻找前驱节点。CAS 修改节点的 item 为 null(即 “逻辑删除-logical deletion”),然后调用unlink(p)方法断开解除节点链接,最后返回 item。
队首出队(抛异常)-E removeFirst()last()详见pred()详见unlink()详见
底层调用pollFirst(),队列不为空, 获取并移除队首元素,如果队列为空,抛出NoSuchElementException异常
public E removeFirst() {
return screenNullResult(pollFirst());
}
队尾出队(抛异常)–E removeLast()
底层调用pollLast(),队列不为空, 获取并移除队尾元素,如果队列为空,抛出NoSuchElementException异常
public E removeLast() {
return screenNullResult(pollLast());
}
不移除元素队首出队-E peekFirst()
不移除元素队首出队,队列为空返回null,否则返回元素值
代码和pollFirst()几乎一模一样,只是除去了删除的3个步骤,所以简单了很多。
public E peekFirst() {
//【1】从前往后遍历-> first() 找到第一个节点, succ() 返回后继节点
for (Node p = first(); p != null; p = succ(p)) {
E item = p.item;
//【2】如果当前节点为有效节点(item!=null),CAS将当前节点item置空(第一步-逻辑删除)
if (item != null)return item;
}
return null;
}
不移除元素队尾出队-E peekLast()
不移除元素队尾出队,队列为空返回null,否则返回元素值
public E peekLast() {
//【1】从后往前遍历-> last() 找到最后一个节点, pred() 返回前驱节点
for (Node p = last(); p != null; p = pred(p)) {
E item = p.item;
//【2】如果当前节点为有效节点(item!=null),CAS将当前节点item置空(第一步-逻辑删除)
if (item != null)return item;
}
return null;
}
不移除元素队首出队(抛异常)-E getFirst()
底层调用peekFirst() ,不移除元素队首出队,如果队列为空,则抛出NoSuchElementException异常,否则返回元素值
public E getFirst() {
return screenNullResult(peekFirst());
}
private E screenNullResult(E v) {
if (v == null)
throw new NoSuchElementException();
return v;
}
不移除元素队尾出队(抛异常)-E getLast()
底层调用peekLast() ,不移除元素队尾出队,如果队列为空,则抛出NoSuchElementException异常,否则返回元素值
public E getLast() {
return screenNullResult(peekLast());
}
从队首向后移除指定元素-boolean removeFirstOccurrence(Object o)
从队首至队尾遍历(从前往后),移除通过equals判断相等的第一个元素,并返回true
removeFirstOccurrence()的实现跟pollFirst()差不多,只是更新item为null的前提条件不同。
public boolean removeFirstOccurrence(Object o) {
checkNotNull(o);
//【1】从前往后遍历-> first() 找到第一个节点, succ() 返回后继节点
for (Node p = first(); p != null; p = succ(p)) {
E item = p.item;
//【2】如果当前节点为有效节点(item!=null)且equals判断相等,且CAS 将当前节点item置空成功(第一步-逻辑删除)
if (item != null && o.equals(item) && p.casItem(item, null)) {
unlink(p);//执行删除节点的第二步-unlinking
return true;
}
}
return false;
}
从队尾向前移除指定元素-boolean removeLastOccurrence(Object o)
从队尾至队首遍历(从后往前),移除通过equals判断相等的第一个元素,并返回true
public boolean removeLastOccurrence(Object o) {
checkNotNull(o);
//【1】从后往前遍历-> last() 找到最后一个节点, pred() 返回前驱节点
for (Node p = last(); p != null; p = pred(p)) {
E item = p.item;
//【2】如果当前节点为有效节点(item!=null)且equals判断相等,且CAS 将当前节点item置空成功(第一步-逻辑删除)
if (item != null && o.equals(item) && p.casItem(item, null)) {
unlink(p);//执行删除节点的第二步-unlinking
return true;
}
}
return false;
}
2.Queue出队方法
队首出队-E poll()
底层调用pollFirst(), 从队首获取并移除元素,如果队列为空返回null,,否则返回队尾元素值
public E poll() {
return pollFirst();
}
获取队首元素-E peek()
底层调用peekFirst(), 从队首获取元素(不移除),如果队列为空返回null,,否则返回队尾元素值
public E peek() {
return peekFirst();
}
移除队首元素- E remove()
底层调用removeFirst(),队列不为空, 获取并移除队首元素,如果队列为空,抛出NoSuchElementException异常
public E remove() {
return removeFirst();
}
移除队尾元素- E pop()
底层调用removeFirst(),队列不为空, 获取并移除队首元素,如果队列为空,抛出NoSuchElementException异常
public E pop(){
return removeFirst();
}
获取队首元素- E element()
底层调用getFirst(),获取first node
public E element() {
return getFirst();
}
十一.重要内部方法
获取前驱节点-Node pred(Node p)
pred方法用于寻找当前节点的前驱节点(如果前驱指向自身,则返回最后一个节点):
final Node获取后继节点-Node succ(Node p)pred(Node p) { //【1】获取前驱节点 Node q = p.prev; //【2】当前节点已被移除(自链接),则获取重新获取last node,否则返回q return (p == q) ? last() : q; }
pred方法用于寻找当前节点的后继节点(如果后继指向自身,则返回第一个节点):
final Node succ(Node p) {
//【1】获取后继节点
Node q = p.next;
//【2】当前节点已被移除(自链接),则获取重新获取firstnode,否则返回q
return (p == q) ? first() : q;
}
获取first节点-Node first()
first()方法用于寻找first节点,即满足p.prev== null && p.next!= p的节点:
// 返回首节点
Node first() {
restartFromHead:
for (;;)
// 从head开始往前(左移)找,初始化h=p=head,q=null
for (Node h = head, p = h, q;;) {
//【1】p的前驱和前驱的前驱都非空, 表示p节点之前有2个以上的活动节点
if ((q = p.prev) != null &&
(q = (p = q).prev) != null)
// 如果head已经被更新则将p指向新的head,未更新则直接将p指向q
p = (h != (h = head)) ? h : q;
//【2】p的前驱为空或者前驱的前驱为空
// p == h 表明p的前驱为空(第一个条件里判断),p就是第一个节点
// p == h 不满足则p的前驱非空,前驱的前驱为空,则p的前驱为第一个节点,此时尝试更新head并返回第一个节点
else if (p == h
// 找到的节点不是head节点,CAS修改head
|| casHead(h, p))
return p;
//【3】第二个条件中尝试更新head失败,则说明其他线程更新了head,重新初始h、p、q开始循环处理
else continue restartFromHead;
}
}
获取last节点-Node last()
last方法用于寻找last节点,即满足p.next == null && p.prev != p的节点:
Node更新队头-void updateHead()last() { restartFromTail: for (;;) // 从tail开始往后(右移)找,初始化t=p=tail,q=null for (Node t = tail, p = t, q;;) { //【1】p的后继和后继的后继都非空,表示p节点后面有2个以上的活动节点 if ((q = p.next) != null && (q = (p = q).next) != null) // 如果tail已经被更新则将p指向新的tail,未更新则直接将p指向q p = (t != (t = tail)) ? t : q; //【2】p的后继为空或者后继的后继为空 // p == h 表明p的后继为空(第一个条件里判断),p就是最后一个节点 // p == h 不满足则p的后继非空,后继的后继为空,则p的后继为最后一个节点,此时尝试更新tail并返回最后一个节点 else if (p == t // 找到的节点不是tail节点,CAS修改tail || casTail(t, p)) return p; //【3】第二个条件中尝试更新tail失败,则说明其他线程更新了tail,重新初始t、p、q开始循环处理 else continue restartFromTail; } }
更新head节点,确保在调用此方法之前unlinked的任何节点在该方法返回之后都不能从head访问,updateTail更新tail节点,同updateHead,基本操作一致,只是方向不同而已
private final void updateHead() {
Node h, p, q;
restartFromHead:
//【1】head指向非活动节点 并且 head非第一个节点
while ((h = head).item == null && (p = h.prev) != null) {
for (;;) {
//【2】head的前驱和前驱的前驱都非空,表示head节点前面有2个以上的活动节点
if ((q = p.prev) == null ||
(q = (p = q).prev) == null) {
// 将head更新指向为第一个节点(first node)
if (casHead(h, p))
return;
// 未成功更新说明head已经被其他线程更新了,重新获取head循环判断
else
continue restartFromHead;
}
//【3】h前有超过2个的节点,表明当前h指向的节点已经与第一个节点距离超过2,同时h已经不指向head了,重新循环
else if (h != head) continue restartFromHead;
//【4】 h前有超过2个的节点,同时h还指向head,则更新p为q再次判断,相当于p向前(左移)跳了1或2个节点位置
else p = q;
}
}
}
更新队尾-void updateTail()
private final void updateTail() {
Node t, p, q;
restartFromTail:
//【1】tail指向非活动节点 并且 tail非第一个节点
while ((t = tail).item == null && (p = t.next) != null) {
for (;;) {
//【2】tail的后继和后继的后继都非空,表示tail节点后面有2个以上的活动节点
if ((q = p.next) == null ||
(q = (p = q).next) == null) {
// 将head更新指向为最后一个节点(lastnode)
if (casTail(t, p))
return;
// 未成功更新说明tail已经被其他线程更新了,重新获取tail循环判断
else
continue restartFromTail;
}
//【3】t后有超过2个的节点,表明当前t指向的节点已经与最后一个节点距离超过2,同时t已经不指向tail了,重新循环
else if (t != tail)
continue restartFromTail;
//【4】 t后有超过2个的节点,同时t还指向tail,则更新p为q再次判断,相当于p向后(右移)跳了1或2个节点位置
else
p = q;
}
}
}
void skipDeletedPredecessors(Node x)
这里有个java语法需要注意:continue lable和break lable的作用,可下列参考代码理解:
@Test
public void testLoop() {
System.out.println("continue lable start ");
aaa:
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 3; j++) {
System.out.println("continue=>"+j);
if (j == 1) {
continue aaa;
}
}
}
System.out.println("continue lable end ");
System.out.println("break lable start ");
bbb:
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 3; j++) {
System.out.println("break=>"+j);
if (j == 1) {
break bbb;
}
}
}
System.out.println("break lable end ");
}
执行结果
将刚刚找到的后继节点的前驱指向节点p,即完成它们的互联,这一步就是所谓的unlinking,使队列的活动节点无法访问被删除的节点。skipDeletedSuccessors代码逻辑同skipDeletedPredecessors,可参考理解
private void skipDeletedPredecessors(Nodevoid skipDeletedSuccessors(Node x)x) { whileActive: //【1】 do { //获取x的前驱节点 Node prev = x.prev; //初始p为x.prev Node p = prev; findActive: //【2】 for (;;) { //【3】p的item非空,说明p为活动节点,退出循环进行关联更新操作 if (p.item != null) break findActive; //【4】 p的item为空,再继续向前查找其前驱 Node q = p.prev; //【5】p的前驱节点为空 // 若p节点处于gc-unlinking状态,即通过p已经无法到达其他活动节点,则需重头开始继续循环判断 // 上面条件不满足,则表示p节点处于unlinking状态,还可以到达其他活动节点,可以继续被使用 // 表明找到了有效节点,退出循环 if (q == null) { if (p.next == p) continue whileActive; break findActive; } //【6】p的前驱节点非空 且 p.prev == p // 相等则表示p节点处于gc-unlinking状态,即通过p已经无法到达其他有效节点 // 无法再向前遍历,只能重头开始循环判断 else if (p == q) continue whileActive; //【7】 到此表示p的item为空,p的前驱非空且不处于gc-unlinking状态,循环向前(左移)继继续判断前驱节点 else p = q; } //【7】CAS更新x的前驱为活动节点, 更新成功返回否则继续循环判断更新 if (prev == p || x.casPrev(prev, p)) return; } while (x.item != null || x.next == null);//节点为活动节点 或者为last node }
实现将刚刚找到的前驱节点的后继指向节点p,即完成它们的互联,这一步就是所谓的unlinking,使队列的活动节点无法访问被删除的节点。
private void skipDeletedSuccessors(NodeNode prevTerminator() 和 Node nextTerminator()x) { whileActive: do { //获取x的后继节点 Node next = x.next; //初始p为x.next Node p = next; findActive: for (;;) { //【3】p的item非空,说明p为活动节点,退出循环进行关联更新操作 if (p.item != null) break findActive; //【4】 p的item为空,再继续向后查找其后继 Node q = p.next; //【5】p的前驱节点为空 if (q == null) { // 若p节点处于gc-unlinking状态,即通过p已经无法到达其他活动节点,则需重头开始继续循环判断 if (p.prev == p) continue whileActive; // 上面条件不满足,则表示p节点处于unlinking状态,还可以到达其他活动节点,可以继续被使用 // 表明找到了有效节点,退出循环 break findActive; } //【6】p的前驱节点非空 且 p.next== p // 相等则表示p节点处于gc-unlinking状态,即通过p已经无法到达其他有效节点 // 无法再向后遍历,只能重头开始循环判断 else if (p == q) continue whileActive; //【7】 到此表示p的item为空,p的后继非空且不处于gc-unlinking状态,循环向后(右移)继续判断前驱节点 else p = q; } //【7】CAS更新x的后继为活动节点, 更新成功返回否则继续循环判断更新 if (next == p || x.casNext(next, p)) return; } while (x.item != null || x.prev == null);//节点为活动节点 或者为first node }
用于队首出队时改变prev指针、队尾出队时改变next指针
//队首出队,会设置出队节点的next指向自身,prev指向PREV_TERMINATOR
Node prevTerminator() {
return (Node) PREV_TERMINATOR;
}
//队尾出队,会设置出队节点的prev指向自身,next指向NEXT_TERMINATOR
Node nextTerminator() {
return (Node) NEXT_TERMINATOR;
}
初始化队头/队尾-void initHeadTail(Node h, Node t)
在通过有参构造ConcurrentlinkedDeque(Collection extends E> c)实例时用到
private void initHeadTail(Node十二.迭代器h, Node t) { //队列为空,有2种情况 if (h == t) { //h、t都为空,说明队列未初始化,将t、g指向一个item为空的哨兵节点 if (h == null) h = t = new Node (null); //队列初始化了 else { // Avoid edge case of a single Node with non-null item. //创建一个哨兵节点 Node newNode = new Node (null); //链接哨兵节点newNode到尾节点t的next上 t.lazySetNext(newNode); //链接尾节点t到哨兵节点newNode的prev上 newNode.lazySetPrev(t); //标记newNode为尾节点 t = newNode; } } //将头节点head指向h、尾节点tail指向t head = h; tail = t; }
ConcurrentlinkedDeque的迭代器是弱一致性的,这在并发容器中是比较普遍的现象,主要是指在A线程在遍历队列节点,而B线程尝试对某个队列节点进行修改的话不会抛出ConcurrentModificationException,这会导致在遍历某个尚未被修改的节点时,在next()返回时可以看到该节点的修改,但在遍历后再对该节点修改时就看不到这种变化。
//升序迭代器
public Iterator iterator() {
return new Itr();
}
//倒序迭代器
public Iterator descendingIterator() {
return new DescendingItr();
}
ConcurrentlinkedDeque由于其双向链表的实现,迭代器可分为升序迭代器(Itr)和倒序迭代器(DescendingItr),通过AbstractItr封装公共操作方法,Itr和DescendingItr分别实现对应不同的方法,一个从头节点开始向后进行遍历,一个从尾节点向后进行遍历,这部分和之前讲解过的linkedBlockingDeque是类似的
private class Itr extends AbstractItr {
Node startNode() { return first(); }
Node nextNode(Node p) { return succ(p); }
}
private class DescendingItr extends AbstractItr {
Node startNode() { return last(); }
Node nextNode(Node p) { return pred(p); }
}
//-------ConcurrentlinkedDeque方法--------
private abstract class AbstractItr implements Iterator {
//下一个返回的节点。即使节点放到nextNode后马上被删除,也会返回这个节点
private Node nextNode;
//nextNode的item域
private E nextItem;
//上一个返回过的节点,用来支持删除操作
private Node lastRet;
//获得first或last
abstract Node startNode();
//获得first或last
abstract Node nextNode(Node p);
AbstractItr() {
advance();
}
private void advance() {
//nextNode的item即将返回,所以把nextNode放到lastRet
lastRet = nextNode;
Node p = (nextNode == null) ? startNode() : nextNode(nextNode);
for (;; p = nextNode(p)) {
//前进方向已经到头了
if (p == null) {
nextNode = null;
nextItem = null;
//这个迭代器之后不能再前进了,已经废了
break;
}
E item = p.item;
if (item != null) {//找到了下一个,先存起来
nextNode = p;
nextItem = item;
break;
}
}
}
public boolean hasNext() {
return nextItem != null;
}
public E next() {
E item = nextItem;
if (item == null) throw new NoSuchElementException();
advance();
return item;
}
public void remove() {
Node l = lastRet;
if (l == null) throw new IllegalStateException();
l.item = null;
unlink(l);
lastRet = null;
}
}
十三.总结
ConcurrentlinkedDeque使用了自旋+CAS的非阻塞算法来保证线程并发访问时的数据一致性。由于队列本身是一种双链表结构,所以虽然算法看起来很简单,但其实需要考虑各种并发的情况,实现复杂度较高,并且ConcurrentlinkedDeque不具备实时的数据一致性,实际运用中,如果需要一种线程安全的栈结构,可以使用ConcurrentlinkedDeque。



