Condition 是个接口,依赖于 Lock 接口的实现,基本的方法就是 await() 和 signal() 方法,是在 java 1.5 中才出现的,用于替代 Object 的 wait()、notify() 实现线程间的协作,相比使用 Object 的 wait()、notify(),使用 Condition 的 await()、signal() 这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用 Conditon中的await()对应Object的wait();
Condition 接口共包含以下这些方法:
| 方法 | 功能 |
|---|---|
| void await() throws InterruptedException | 使当前线程等待,直到它收到信号或被中断。 |
| void awaitUninterruptibly() | 导致当前线程等待,直到它收到信号,不响应中断。 |
| long awaitNanos(long nanosTimeout) throws InterruptedException | 使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。 |
| boolean await(long time, TimeUnit unit) throws InterruptedException | 使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。 |
| boolean awaitUntil(Date deadline) throws InterruptedException | 导致当前线程等待,直到它被发出信号或被中断,或者指定的截止日期过去。 |
| void signal() | 唤醒一个等待线程。 |
| void signalAll() | 唤醒所有等待的线程。 |
newCondition 执行链路为 newCondition -> sync.newCondition() -> new ConditionObject()。
创建的是 ConditionObject 类的对象,ConditionObject 是 ReentrantLock 的内部类。
二、await 等待实现 2.1 条件等待流程使当前线程等待,直到它收到信号或被中断。
先创建一个条件等待节点,然后在 fullyRelease 方法中释放当前线程获取锁的状态,并存储锁的状态码。存储状态码完成,通过 fullyRelease 判断当前 node 是否在同步队列中,如果仅在等待队列中则阻塞线程。signal 方法的调用或者中断将唤醒线程,线程唤醒后进入同步队列从新排队等待获取锁。
boolean await(long time, TimeUnit unit) 方法将等待指定时间,程序执行流程和 四、awaitNanos 定时等待实现 相同,不多加赘述。
2.2 await线程等待public final void await() throws InterruptedException {
// 当前线程为中断状态
if (Thread.interrupted())
throw new InterruptedException();
// 添加一个条件等待节点
Node node = addConditionWaiter();
// 释放锁,返回获得节点状态
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果等待节点不在同步队列中,阻塞线程(等待队列进行唤醒)
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 检查中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 进行获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清理队列中取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 根据中断状态判断判断是否抛出异常或者中断
reportInterruptAfterWait(interruptMode);
}
2.3 addConditionWaiter 等待队列添加节点
为当前线程创建 CONDITION 状态的节点,并将节点插入队列。
private Node addConditionWaiter() {
// 等待队列的末尾节点
Node t = lastWaiter;
// 如果节点已被取消,则清除节点
if (t != null && t.waitStatus != Node.CONDITION) {
// 整理队列,清除已被取消的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 节点插入队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
2.4 unlinkCancelledWaiters 清除已取消节点
更新等待队列,从条件队列中取消链接已取消的等待节点。 仅在持有锁时调用。
private void unlinkCancelledWaiters() {
// 取得首个节点
Node t = firstWaiter;
// 缓存上个节点
Node trail = null;
while (t != null) {
// 下一个等待节点
Node next = t.nextWaiter;
// 当前节点已被取消
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 未缓存上个节点,表示当前节点为首节点
if (trail == null)
// 下一个节点做为首个节点
firstWaiter = next;
else
// 跳过当前节点
trail.nextWaiter = next;
// 如果没有下一个节点,表示被取消的节点是last节点
if (next == null)
lastWaiter = trail;
}
else
// 节点未被取消,更新上个节点的引用
trail = t;
t = next;
}
}
2.5 fullyRelease 释放锁
使用当前状态值调用 release; 返回保存状态。 取消节点并在失败时抛出异常。
final long fullyRelease(Node node) {
boolean failed = true;
try {
// 获取锁状态码
long savedState = getState();
// 释放锁,直接释放所有重入
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 释放锁失败
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 未释放锁,取消当前节点状态
node.waitStatus = Node.CANCELLED;
}
}
2.6 checkInterruptWhileWaiting 检查中断
如果是 signal 方法执行前中断则为 THROW_IE 否则为 REINTERRUP。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
// 如果是signal前中断则为 THROW_IE 否则为 REINTERRUPT
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
2.7 transferAfterCancelledWait 检查中断
transferAfterCancelledWait 是实际检查中断的方法。中断可以分为两种,如果修改 waitStatus 值成功,表示 signal 还未被执行,如果修改失败,表示 signal 已经执行。但是有可能 signal 中刚修改了 waitStatus ,还未将节点加入同步队列,线程就失去了CPU,所以需要进行判断,如果还未加入队列,执行 Thread.yield() 暂时让出CPU。
final boolean transferAfterCancelledWait(Node node) {
// 值修改成功,表示还未执行 signal
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 加入同步队列
enq(node);
return true;
}
// 判断是否在同步队列中
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
三、awaitUninterruptibly 等待实现
导致当前线程等待,直到它收到信号,不响应中断。
整体执行流程和 await 相同,但是 awaitUninterruptibly 不需要包含检查中断和将节点加入同步队列的逻辑,因为当前方法不响应中断,仅收到 signal 信号才会停止等待,signal 中会将节点加入到同步队列。
public final void awaitUninterruptibly() {
// 添加一个条件等待节点
Node node = addConditionWaiter();
// 释放锁,返回获得节点状态
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 保存中断状态,不响应中断
if (Thread.interrupted())
interrupted = true;
}
// 等待获取锁
if (acquireQueued(node, savedState) || interrupted)
// 中断当前线程
selfInterrupt();
}
四、awaitNanos 定时等待实现
使当前线程等待,直到它被发出信号或被中断,或者指定的等待时间过去。
此方法的实现和 boolean awaitUntil(Date deadline) 、boolean await(long time, TimeUnit unit) 相同。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
// 当前线程为中断状态
if (Thread.interrupted())
throw new InterruptedException();
// 添加一个条件等待节点
Node node = addConditionWaiter();
// 释放锁,返回获得节点状态
int savedState = fullyRelease(node);
// 取得等待到时的时间
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果等待节点不在同步队列中,阻塞线程(等待队列进行唤醒)
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
// 定时到,将节点加入同步队列
transferAfterCancelledWait(node);
break;
}
// 阻塞指定时间
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 检查中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 更新等待时间
nanosTimeout = deadline - System.nanoTime();
}
// 进行获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
// 清理队列中取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
// 根据中断状态判断判断是否抛出异常或者中断
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
五、signal 信号实现
5.1 信号实现流程
signal 用于给 await 中阻塞的一个线程一个取消等待的信号,此时 doSignal 方法将节点从等待队列剔除,transferForSignal 方法中将节点加入同步队列。接收到信号的线程并不能立即获取到锁,还需要等待占有锁的线程释放锁。
5.2 signal 取消等待public final void signal() {
// 当前线程不是持有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 取消第一个节点的线程等待
doSignal(first);
}
5.3 doSignal 取消等待
从等待队列中删除当前节点的引用。如果当前节点已被取消等待,则对当前节点唤醒失败,继续唤醒下一个等待节点。
private void doSignal(Node first) {
do {
// 当前节点已经是最后一个节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 转换节点到同步队列,节点已被取消时返回false
(first = firstWaiter) != null);
}
5.4 transferForSignal 转换节点
修改当前节点的状态,将当前节点加入同步队列。并修改前面一个节点的状态,等待排队唤醒。
final boolean transferForSignal(Node node) {
// 如果不能更改waitStatus,则该节点已被取消
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 节点加入同步队列,并获得节点前面的一个节点
Node p = enq(node);
int ws = p.waitStatus;
//前面一个节点是被取消的等待队列节点或者修改状态为 SIGNAL 失败
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒线程
LockSupport.unpark(node.thread);
return true;
}
六、signalAll 信号实现
signalAll 通过 doSignalAll 方法进行批量唤醒,传入首个节点。
private void doSignalAll(Node first) {
// 引用设置为空
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
// 清除引用
first.nextWaiter = null;
// 转换节点到同步队列,节点已被取消时返回false
transferForSignal(first);
first = next;
} while (first != null);
}



