Condition接口提供了类似java.lang.Object所提供的监视器方法,配合Lock接口可以实现等待/通知模式。首先援引一张《Java并发编程的艺术》之中的表格来展示二者的特点与异同。
| 对比项 | Object监视器方法 | Condition |
|---|---|---|
| 前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁 调用Lock.newCondition()获取Condition对象 |
| 调用方式 | 直接调用 如:object.wait() | 直接调用 如:condition.await() |
| 等待队列个数 | 一个 | 多个 |
| 当前线程释放锁进入等待状态 | 支持 | 支持 |
| 当前线程释放锁并进入等待状态 且在等待状态中不响应中断 | 不支持 | 支持 |
| 当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
| 当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
| 唤醒等待队列中的一个线程 | 支持 | 支持 |
| 唤醒等待队列中的全部线程 | 支持 | 支持 |
在前文的表格中其实已经可以窥见这个问题的端倪,除了Condition所支持的等待/通知模式增加了更多等待状态的支持,最关键的在于等待队列的个数,即Object的一组监视器方法(搭配内置锁)只能支持一个等待队列,每个Condition对象都包含一个等待队列,可以使用多个Conditon(搭配Lock及同步器AQS)以实现多等待队列(每个队列对应不同等待条件)。
设想这样一种情况,若干线程在等待队列上可以有多个唤醒条件,假设出现事件A和事件B时分别唤醒唤醒某组线程,在这种情况下,如果使用内置锁搭配Object的监视器方法,显然不能满足需求,因为notify和notifyAll都不能唤醒指定线程,此时便需要使用显示锁Lock搭配Condition以实现多等待队列满足需求。
Condition接口定义public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
ConditionObject等待队列实现/设计思路
ConditionObject是Condition的具体实现类,由ConditionObject实现了等待队列,等待队列是一个FIFO队列,等待队列中的每个Node节点都保存线程及相关信息,采取链式存储。
等待队列结构示意图如下:
当有线程尝试获取资源时,线程会被封装在Node节点中并加入同步队列,同步队列的首个节点是成功获取资源的节点,其余节点均进入阻塞状态,等待尝试获取资源。同步队列中阻塞的线程都是要等待尝试获取资源的。
当同步队列中的线程调用了ConditionObject提供的等待方法后,线程会释放当前资源,并将封装了当前线程的节点加入等待队列。在等待队列中的线程均为阻塞状态且不会尝试获取资源,等待其他线程通知后重新加入同步队列尝试获取资源。
某同步器内同步队列与等待队列结构示意图如下:
调用ConditionObject提供的await()方法,同步队列的首节点会首先释放资源,然后唤醒同步队列中的后继节点,随后将当前线程加入到等待队列中并阻塞,此过程在下文源码分析中会详细介绍。
此过程示意图如下:
调用ConditionObject提供的signal()方法,会唤醒在等待队列中等待时间最长的节点,即首节点,并将对应线程重新添加到同步队列中,并尝试获取资源(如果获取资源仍会被阻塞)。
此过程示意图如下:
ConditionObject是同步器AbstractQueuedSynchronizer(以下简称AQS)的内部类,同步器AQS是用于实现阻塞锁和同步组件的基础框架,AQS详解见我之前的文章Java并发编程之队列同步器AQS源码详解,接下来一起通过ConditionObject以及AQS内相关实现方法源码分析等待队列是如何实现的。
注1:为了更好的理解Condition,建议先读AQS相关的文章,因为AQS中维护的同步队列和Condition所实现的等待队列都是基于Node节点实现的,先前文章已经分析过,本篇不再赘述,且二者在实际应用中搭配使用,Condition等待队列的分析也离不开AQS的同步队列。
注2:本文陈列并分析的不是全部源码,部分无关部分省略。
注3:由Condition实现的队列可称作条件队列,本文后文都将其称为等待队列,由AQS实现维护的称作同步队列。
Node节点首先看一下Node节点的定义,源码如下:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int ConDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
ConditionObject成员变量
随后看一下ConditionObject的成员变量
private transient Node firstWaiter;
private transient Node lastWaiter;
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
await()等待流程
await()是最基础的Condition等待队列阻塞方法,具体代码分析见注释,其中LockSupport.park()方法的作用是阻塞当前线程,更详细的可以参考我之前的Java并发编程之LockSupport源码详解,await()源码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 根据interruptMode值 来判断如何处理中断请求
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
添加新节点
addConditionWaiter()方法的作用是添加一个新的封装了当前线程的节点到等待队列,并非直接把同步队列中的节点添加到等待队列,源码如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
清除等待状态为CANCELLED的节点
unlinkCancelledWaiters()用于清除等待队列中所有等待状态为CANCELLED的节点,等待状态为CANCELLED表示当前节点封装的线程放弃等待,源码如下:
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
调用线程释放资源
fullyRelease(Node)方法的作用是释放资源并返回最后的资源状态,release()方法在AQS文中解析过,本文不再陈列,源码如下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
检查节点是否在同步队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.ConDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
findNodeFromTail(Node)方法从同步队列的队尾向队首遍历寻找Node节点,并返回相应结果
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
中断请求处理
根据interruptMode判断如何处理中断请求
- 当interruptMode为1时,进入selfInterrupt()方法重新执行一次中断
- 当interruptMode为0时,什么也不做
- 当interruptMode为-1时,立刻相应中断抛出异常
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
其他版本的await
awaitUninterruptibly()
在整个await过程中不响应中断请求,但如果在执行过程中收到中断请求,会用一个标志位记录,要退出await方法时调用selfInterrupt()重新进行一次中断。
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
awaitNanos
时限等待awaitNanos(long)、awaitUntil(Date)、await(long, TimeUnit)三个方法实现类似,此处以awaitNanos(long)为例简要分析
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
signal()通知流程
signal()会唤醒在等待队列中等待时间最长的节点(首节点),源码如下
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signalAll()的流程与signal()十分相似,不再展开讨论
总结 显示锁与内置锁等待/通知机制异同- Lock()显示锁提供的等待/通知机制,要比内置锁更加灵活,提供了awaitUninterruptibly()不响应中断的等待等监视器不能提供的方法。
- Object提供的等待/通知机制只能提供一个等待队列,而基于AQS和Condition的实现可以提供多条等待队列,能满足更多场景需求。
ConditionObject实现的await()方法首先将线程封装在Node节点里,然后加入等待队列we尾,同时从AQS同步队列中的移除相应节点。之后作为while循环条件调用isonSyncQueue(Node),检查节点是否在同步队列中,如果节点不在同步队列中则会调用park()方法阻塞线程,直到被signal()中的unpark()唤醒。被唤醒后节点会被转移回同步队列,后续交由同步队列管理,线程被唤醒后会尝试获取资源,清除等待队列中等待状态为CANCELLED的节点,最后根据不同情况分别处理中断请求。
以上便是本篇文章的全部内容
作者才疏学浅,如文中出现纰漏,还望指正



