栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java并发编程之Condition接口与ConditionObject等待队列源码详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java并发编程之Condition接口与ConditionObject等待队列源码详解

Condition接口 Condition接口能做什么

Condition接口提供了类似java.lang.Object所提供的监视器方法,配合Lock接口可以实现等待/通知模式。首先援引一张《Java并发编程的艺术》之中的表格来展示二者的特点与异同。

对比项Object监视器方法Condition
前置条件获取对象的锁调用Lock.lock()获取锁
调用Lock.newCondition()获取Condition对象
调用方式直接调用
如:object.wait()
直接调用
如:condition.await()
等待队列个数一个多个
当前线程释放锁进入等待状态支持支持
当前线程释放锁并进入等待状态
且在等待状态中不响应中断
不支持支持
当前线程释放锁并进入超时等待状态支持支持
当前线程释放锁并进入等待状态到将来的某个时间不支持支持
唤醒等待队列中的一个线程支持支持
唤醒等待队列中的全部线程支持支持
为什么需要Condition

在前文的表格中其实已经可以窥见这个问题的端倪,除了Condition所支持的等待/通知模式增加了更多等待状态的支持,最关键的在于等待队列的个数,即Object的一组监视器方法(搭配内置锁)只能支持一个等待队列,每个Condition对象都包含一个等待队列,可以使用多个Conditon(搭配Lock及同步器AQS)以实现多等待队列(每个队列对应不同等待条件)。

设想这样一种情况,若干线程在等待队列上可以有多个唤醒条件,假设出现事件A和事件B时分别唤醒唤醒某组线程,在这种情况下,如果使用内置锁搭配Object的监视器方法,显然不能满足需求,因为notify和notifyAll都不能唤醒指定线程,此时便需要使用显示锁Lock搭配Condition以实现多等待队列满足需求。

Condition接口定义
public interface Condition {
	
	void await() throws InterruptedException;
	
	void awaitUninterruptibly();
	
	long awaitNanos(long nanosTimeout) throws InterruptedException;
	
	boolean await(long time, TimeUnit unit) throws InterruptedException;
	
	boolean awaitUntil(Date deadline) throws InterruptedException;
	
	void signal();
	
	void signalAll();
}
ConditionObject等待队列实现/设计思路

ConditionObject是Condition的具体实现类,由ConditionObject实现了等待队列,等待队列是一个FIFO队列,等待队列中的每个Node节点都保存线程及相关信息,采取链式存储。
等待队列结构示意图如下:

同步队列与等待队列

当有线程尝试获取资源时,线程会被封装在Node节点中并加入同步队列,同步队列的首个节点是成功获取资源的节点,其余节点均进入阻塞状态,等待尝试获取资源。同步队列中阻塞的线程都是要等待尝试获取资源的。

当同步队列中的线程调用了ConditionObject提供的等待方法后,线程会释放当前资源,并将封装了当前线程的节点加入等待队列。在等待队列中的线程均为阻塞状态且不会尝试获取资源,等待其他线程通知后重新加入同步队列尝试获取资源。

某同步器内同步队列与等待队列结构示意图如下:

等待

调用ConditionObject提供的await()方法,同步队列的首节点会首先释放资源,然后唤醒同步队列中的后继节点,随后将当前线程加入到等待队列中并阻塞,此过程在下文源码分析中会详细介绍。
此过程示意图如下:

通知

调用ConditionObject提供的signal()方法,会唤醒在等待队列中等待时间最长的节点,即首节点,并将对应线程重新添加到同步队列中,并尝试获取资源(如果获取资源仍会被阻塞)。
此过程示意图如下:

ConditionObject源码解析

ConditionObject是同步器AbstractQueuedSynchronizer(以下简称AQS)的内部类,同步器AQS是用于实现阻塞锁和同步组件的基础框架,AQS详解见我之前的文章Java并发编程之队列同步器AQS源码详解,接下来一起通过ConditionObject以及AQS内相关实现方法源码分析等待队列是如何实现的。

注1:为了更好的理解Condition,建议先读AQS相关的文章,因为AQS中维护的同步队列和Condition所实现的等待队列都是基于Node节点实现的,先前文章已经分析过,本篇不再赘述,且二者在实际应用中搭配使用,Condition等待队列的分析也离不开AQS的同步队列。

注2:本文陈列并分析的不是全部源码,部分无关部分省略。

注3:由Condition实现的队列可称作条件队列,本文后文都将其称为等待队列,由AQS实现维护的称作同步队列。

Node节点

首先看一下Node节点的定义,源码如下:

	static final class Node {
		
		 static final Node SHARED = new Node();
		
	    static final Node EXCLUSIVE = null;
		
	    static final int CANCELLED =  1;
		
	    static final int SIGNAL    = -1;
		
	    static final int ConDITION = -2;
		
	    static final int PROPAGATE = -3;
		
	    volatile int waitStatus;
		
	    volatile Node prev;
		
	    volatile Node next;
		
	    volatile Thread thread;
		
	    Node nextWaiter;
	
	    final boolean isShared() {
	        return nextWaiter == SHARED;
	    }
		
	    final Node predecessor() throws NullPointerException {
	        Node p = prev;
	        if (p == null)
	            throw new NullPointerException();
	        else
	            return p;
	    }
	
	    Node() {
	   	}
	
	    Node(Thread thread, Node mode) {
	        this.nextWaiter = mode;
	        this.thread = thread;
	    }
	
	    Node(Thread thread, int waitStatus) {
	        this.waitStatus = waitStatus;
	        this.thread = thread;
	    }
	}
ConditionObject成员变量

随后看一下ConditionObject的成员变量

	
    private transient Node firstWaiter;
	
    private transient Node lastWaiter;
    
    private static final int REINTERRUPT =  1;
    
    private static final int THROW_IE    = -1;
    
await()等待流程

await()是最基础的Condition等待队列阻塞方法,具体代码分析见注释,其中LockSupport.park()方法的作用是阻塞当前线程,更详细的可以参考我之前的Java并发编程之LockSupport源码详解,await()源码如下:

public final void await() throws InterruptedException {
	
    if (Thread.interrupted())
        throw new InterruptedException();
    
    Node node = addConditionWaiter();
    
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    
    while (!isOnSyncQueue(node)) {
    	
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    // 根据interruptMode值 来判断如何处理中断请求
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
添加新节点

addConditionWaiter()方法的作用是添加一个新的封装了当前线程的节点到等待队列,并非直接把同步队列中的节点添加到等待队列,源码如下:

	private Node addConditionWaiter() {
        Node t = lastWaiter;
        
        if (t != null && t.waitStatus != Node.CONDITION) {
        	
            unlinkCancelledWaiters();
            
            t = lastWaiter;
        }
        
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
清除等待状态为CANCELLED的节点

unlinkCancelledWaiters()用于清除等待队列中所有等待状态为CANCELLED的节点,等待状态为CANCELLED表示当前节点封装的线程放弃等待,源码如下:

	
	private void unlinkCancelledWaiters() {
       Node t = firstWaiter;
       Node trail = null;
       while (t != null) {
           Node next = t.nextWaiter;
           if (t.waitStatus != Node.CONDITION) {
               t.nextWaiter = null;
               if (trail == null)
                   firstWaiter = next;
               else
                   trail.nextWaiter = next;
               if (next == null)
                   lastWaiter = trail;
           }
           else
               trail = t;
           t = next;
       }
    }
调用线程释放资源

fullyRelease(Node)方法的作用是释放资源并返回最后的资源状态,release()方法在AQS文中解析过,本文不再陈列,源码如下:

	final int fullyRelease(Node node) {
		
	    boolean failed = true;
	    try {
	    	
	        int savedState = getState();
	        
	        if (release(savedState)) {
	            failed = false;
	            return savedState;
	        } else {
	            throw new IllegalMonitorStateException();
	        }
	    } finally {
	    	
	        if (failed)
	            node.waitStatus = Node.CANCELLED;
	    }
	}
检查节点是否在同步队列中
	final boolean isOnSyncQueue(Node node) {
		
        if (node.waitStatus == Node.ConDITION || node.prev == null)
            return false;
        
        if (node.next != null) // If has successor, it must be on queue
            return true;
        
        return findNodeFromTail(node);
    }

findNodeFromTail(Node)方法从同步队列的队尾向队首遍历寻找Node节点,并返回相应结果

	private boolean findNodeFromTail(Node node) {
		
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
中断请求处理

根据interruptMode判断如何处理中断请求

  1. 当interruptMode为1时,进入selfInterrupt()方法重新执行一次中断
  2. 当interruptMode为0时,什么也不做
  3. 当interruptMode为-1时,立刻相应中断抛出异常
	private void reportInterruptAfterWait(int interruptMode)
       throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
其他版本的await awaitUninterruptibly()

在整个await过程中不响应中断请求,但如果在执行过程中收到中断请求,会用一个标志位记录,要退出await方法时调用selfInterrupt()重新进行一次中断。

	public final void awaitUninterruptibly() {
		
	    Node node = addConditionWaiter();
	    int savedState = fullyRelease(node);
	    boolean interrupted = false;
	    while (!isOnSyncQueue(node)) {
	        LockSupport.park(this);
	        
	        if (Thread.interrupted())
	            interrupted = true;
	    }
	    
	    if (acquireQueued(node, savedState) || interrupted)
	        selfInterrupt();
	}
awaitNanos

时限等待awaitNanos(long)、awaitUntil(Date)、await(long, TimeUnit)三个方法实现类似,此处以awaitNanos(long)为例简要分析

	public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
        	
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
signal()通知流程

signal()会唤醒在等待队列中等待时间最长的节点(首节点),源码如下

	public final void signal() {
		
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
	private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            
            first.nextWaiter = null;
            
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    final boolean transferForSignal(Node node) {
        
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        
        Node p = enq(node);
        int ws = p.waitStatus;
        
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

signalAll()的流程与signal()十分相似,不再展开讨论

总结 显示锁与内置锁等待/通知机制异同
  • Lock()显示锁提供的等待/通知机制,要比内置锁更加灵活,提供了awaitUninterruptibly()不响应中断的等待等监视器不能提供的方法。
  • Object提供的等待/通知机制只能提供一个等待队列,而基于AQS和Condition的实现可以提供多条等待队列,能满足更多场景需求。
等待通知流程

ConditionObject实现的await()方法首先将线程封装在Node节点里,然后加入等待队列we尾,同时从AQS同步队列中的移除相应节点。之后作为while循环条件调用isonSyncQueue(Node),检查节点是否在同步队列中,如果节点不在同步队列中则会调用park()方法阻塞线程,直到被signal()中的unpark()唤醒。被唤醒后节点会被转移回同步队列,后续交由同步队列管理,线程被唤醒后会尝试获取资源,清除等待队列中等待状态为CANCELLED的节点,最后根据不同情况分别处理中断请求。


以上便是本篇文章的全部内容
作者才疏学浅,如文中出现纰漏,还望指正

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/631891.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号