栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java并发编程之队列同步器AQS源码详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java并发编程之队列同步器AQS源码详解

什么是AQS 简介

队列同步器AbstractQueuedSynchronizer简称AQS,是用于实现阻塞锁和同步组件的基础框架,JUC中的同步工具类闭锁CountDownLatch、信号量Semaphore以及重入锁ReentrantLock和读写锁ReentrantReadWriteLock都是基于AQS实现(同步工具类栅栏CyclicBarrier虽与AQS没有直接关系,但其内含重入锁,与AQS脱不了干系)。

AQS的作用

AQS内部包含了一个volatile整型变量state和一个FIFO同步队列

  • state顾名思义用于表示状态,而状态具体是什么,取决于实现类的需求,例如在信号量Semaphore的实现中,state表示许可(permit),同时最多可以有state个许可可以分发给线程用于获取资源。
  • 同步队列用于管理同步状态(线程获取释放资源后的状态,非state变量),同步队列可以用来保存获取失败的线程引用等信息,同他提供了线程排队、同步队列中线程的阻塞与唤醒等操作。

锁和同步组件的实现难免会需要管理状态变量和各个尝试获取、释放资源的线程,AQS将这部操作抽象了出来,并提供了相关的处理方法。这样做简化了锁的实现细节,使得基于AQS实现的锁与同步组件,无需关心状态、线程管理等“底层操作”,只需要关心锁的具体实现即可。

AQS的实现思路

AQS的主要功能是状态信息state变量的管理以及使用同步队列管理线程同步状态

首先是用于表示状态的变量state,用途不再赘述,该变量被volatile修饰,保证了变量的可见性,同时提供了三个能够保证对状态的读写操作是安全的方法,供子类实现使用,他们分别是:

  • getState()
  • setState(int)
  • compareAndSetState(int,int)

同步队列是一个FIFO(先入先出)的双向队列,头节点是成功获取资源的节点,采用链式存储。AQS包含一个静态内部类Node,Node代表一个节点,用于保存获取同步状态失败的线程引用、等待状态和前驱后继节点,具体会在后文源码部分解析,同步队列的基本组成单位就是Node。同步队列基本结构示意图如下:

不同的锁和同步工具获取资源的逻辑不同,他们需要自己实现获取资源和释放资源的具体逻辑,此外AQS还提供了独占式和共享式两种资源获取方式,AQS已经封装好了部分获取资源的方法,比如acquire()方法用来独占式获取资源、acquireShared()方法用来共享式获取资源,此外AQS提供了一些方法供子类重写来实现其具体逻辑:

  • tryAcquire(int):独占式尝试获取资源,成功返回true,失败返回false。
  • tryRelease(int):独占式尝试释放资源,成功返回true,失败返回false。
  • tryAcquireShared(int):共享式尝试获取资源,返回剩余可用资源数,返回值负数时表示获取失败,返回值正数时表示获取成功。
  • tryReleaseShared(int):共享式尝试释放资源,成功返回true,失败返回false。
AQS源码解析

Condition相关内容不在本文讨论

同步队列头尾节点引用及state
	//head永远指向同步队列头节点
	private transient volatile Node head;
	//head永远指向同步队列尾节点
    private transient volatile Node tail;
	//资源状态变量
    private volatile int state;

对于以上三个变量,AQS都提供了CAS操作用于保证对变量写操作的原子性,具体方法如下:

	protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

	private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
静态内部类Node
	static final class Node {
		
		 static final Node SHARED = new Node();
		
	    static final Node EXCLUSIVE = null;
		
	    static final int CANCELLED =  1;
		
	    static final int SIGNAL    = -1;
		
	    static final int ConDITION = -2;
		
	    static final int PROPAGATE = -3;
		
	    volatile int waitStatus;
		
	    volatile Node prev;
		
	    volatile Node next;
		
	    volatile Thread thread;
		
	    Node nextWaiter;
	
	    final boolean isShared() {
	        return nextWaiter == SHARED;
	    }
		
	    final Node predecessor() throws NullPointerException {
	        Node p = prev;
	        if (p == null)
	            throw new NullPointerException();
	        else
	            return p;
	    }
	
	    Node() {
	   	}
	
	    Node(Thread thread, Node mode) {
	        this.nextWaiter = mode;
	        this.thread = thread;
	    }
	
	    Node(Thread thread, int waitStatus) {
	        this.waitStatus = waitStatus;
	        this.thread = thread;
	    }
	}
获取资源

本部分主要聚焦独占式获取资源方法acquire()方法以及共享式获取资源方法acquireShared()的执行流程,同时会简要说明acquireInterruptibly()方法与tryAcquireNanos()方法的执行流程(因为实现大同小异)。

独占式与共享式获取资源方式对比
  • 独占式:当资源被独占式获取时,其余尝试获取资源的线程均会被阻塞。
  • 共享式:当资源被共享式获取时,独占式获取会被阻塞,如果有剩余资源,则允许其他共享式同时获取资源。

示意图如下:

独占式获取资源 acquire(int)
	public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire方法是独占式获取资源的顶层方法,该方法会尝试获取资源,如果成功获取则返回,获取失败则进入同步队列,直到成功获取为止,该方法执行流程如下

  1. tryAcquire()方法,该方法在AQS中只会抛出一个UnsupportedOperationException,故需要子类具体实现,在该方法尝试获取资源成功时返回true(自然if条件也就不成立,acquire方法也会返回),获取失败是返回false,继续执行下面的方法。
  2. addWaiter()方法,将当前线程(创建一个新的Node实例)插入同步队列尾,并标记为独占模式。
  3. acquireQueued()方法,自旋的获取资源,如果成功拿到资源则返回false,如果在等待过程中收到了中断通知,则返回true,执行下一个方法。
  4. selfInterrupt()方法,线程等待过程中收到中断通知不会立刻响应,而是延迟到成功获取资源后使用该方法再次中断。

接下来来关注上述出现的方法和其执行流程(tryAcquire方法具体实现取决于子类,此处不表)

addWaiter(Node)

addWaiter(Node):将当前线程插入同步队列尾,并将封装了当前线程的Node节点返回

	
	private Node addWaiter(Node mode) {
		
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        
        if (pred != null) {
            node.prev = pred;
             
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        
        enq(node);
        return node;
    }
enq(Node)

enq(Node):自旋的将节点插入同步队列队尾,成功时返回

	private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
acquireQueued(Node,int)

acquireQueued(Node,int):自旋的获取资源

	final boolean acquireQueued(final Node node, int arg) {
		
        boolean failed = true;
        try {
        	
            boolean interrupted = false;
            for (;;) {
            	
                final Node p = node.predecessor();
                
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    
                    interrupted = true;
            }
        } finally {
        	
            if (failed)
                cancelAcquire(node);
        }
    }

当成功获取资源时,设置首节点操作过程示意图如下:

当try块发生异常时,会进入cancelAcquire()方法,用于取消当前线程获取资源的操作

cancelAcquire(Node)

cancelAcquire():通俗点说,cancelAcquire()的核心功能就是要使node节点从同步队列出队,node节点所处位置可能有三种情况

  1. node节点是尾节点:则将pred(node节点前最近一个等待状态非CANCELLEDd的节点)节点设为tail尾节点并将pred的next设为null。
  2. pred是头节点:则将node节点后第一个等待状态非CANCELLED的节点唤醒。
  3. 非以上两种情况:将pred的等待状态设为SIGNAL,使pred的next指向node的next节点。
	private void cancelAcquire(Node node) {
        if (node == null)
            return;
		
        node.thread = null;

        
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;
		
        node.waitStatus = Node.CANCELLED;

        
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
            	
                unparkSuccessor(node);
            }

            node.next = node;
        }
    }
unparkSuccessor(Node)

unparkSuccessor():找到并唤醒节点node后可以获取资源的节点

	private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        
        if (s != null)
            LockSupport.unpark(s.thread);
    }
shouldParkAfterFailedAcquire(Node,Node) && parkAndCheckInterrupt()

shouldParkAfterFailedAcquire():主要用于判断判断当前节点是否需要被挂起

本方法做的事情通俗点讲就是,当前线程无法获取资源,应该暂时被挂起,那么当前线程在被挂起之前,必须要找到一个线程在他可以获取资源时将其唤醒,这个线程就是他等待状态为SIGNAL的前驱节点,什么时候找到了这个节点,什么时候当前线程就可用安全的被挂起。

	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        
        if (ws == Node.SIGNAL)
            return true;
        
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        	
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

挂起当前线程

	private final boolean parkAndCheckInterrupt() {
        LockSupport不在本文介绍
        LockSupport.park(this);
        return Thread.interrupted();
    }
selfInterrupt()

将当前线程的中断标志位设为true

	static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
独占式获取资源的流程梳理
  1. 首先调用由子类自定义实现的方法tryAcquire()尝试获取资源,如果成功获取,则返回。
  2. 如果尝试获取失败,说明有其他线程正在持有目标资源,需要将当前线程封装到一个Node实例中并加入同步队列尾,使用addWaiter()和enq()方法完成此步操作。
  3. acquireQueued()方法的中的try块内操作如果出现异常,则调用cancelAcquire()方法进行处理,让当前节点出队并做相关处理。
  4. 当try块内操作未出现异常时,如果封装了当前线程的节点是头节点,则继续使用tryAcquire()尝试获取资源如获取资源成功,则旧头节点出队等待垃圾收集,将当前节点设置为新的头节点后返回。
  5. 如果当前节点不是头节点,则在acquireQueued()方法中会反复调用shouldParkAfterFailedAcquire()方法,此时当前节点会在同步队列中寻找合适的节点(等待状态为SIGNAL的节点)并插入其后,以实现安全的挂起,等待前驱节点释放资源后唤醒。
  6. 如果在上述过程中,线程被中断过,parkAndCheckInterrupt()方法会返回true,并将当前线程中断标志位复位,随后调用selfInterrupt()方法将当前线程中断标志位设为true。

调用流程图如下(图片内容源自《Java并发编程的艺术》):

独占式可中断的获取资源

前文提到的独占式获取资源,节点在等待队列收到中断通知时,不会立即处理,而是延迟到成功获取资源后补上一次中断通知,再根据后续程序进行处理。响应的AQS还提供了可以响应中断的资源获取方式,顶层方法acquireInterruptibly()。许多实现与独占式相同,下文不再过多重复。

acquireInterruptibly(int)

与acquire()方法内容基本相同,只是声明了会抛出异常InterruptedException

	public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
doAcquireInterruptibly(int)

方法中不再像acquireQueued()方法一样设一个interrupted中断标记,而是检查到中断通知后立即抛出异常

	private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
独占超时等待式获取资源

在限定时间内等待并尝试获取锁, 超出限定时间则会取消获取资源

tryAcquireNanos(int,long)

可以看出此种获取方式也是即时响应中断通知的

	public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
doAcquireNanos(int,long)
	private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    
                    LockSupport.parkNanos(this, nanosTimeout);
                
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
共享式获取资源

共享式获取资源的实现与独占式获取资源的实现有许多重复的地方,后文不再赘述,只选取共享式获取资源独特的部分讲解,首先看一下共享式获取资源的顶层方法

acquireShared(int)

与独占式相同,同样先调用由子类自定义实现的tryAcquireShared()方法尝试获取资源,如果获取资源成功将返回一个大于等于0的值,具体意义取决于子类实现,获取失败时进入执行doAcquireShared()方法。

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
doAcquireShared()

本方法与acquireQueued(Node,int)方法实现大体相同

	private void doAcquireShared(int arg) {
		
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                    	
                        setHeadAndPropagate(node, r);
                        p.next = null;
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        	
            if (failed)
                cancelAcquire(node);
        }
    }
setHeadAndPropagate(Node,int)
	private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
            	
                doReleaseShared();
        }
    }

本方法体现了共享式的传播性,即共享模式同时允许多条线程获取资源,当一个线程获取了共享资源后,如果资源还有剩余,要通知后继共享节点获取资源,目的是使其他线程尽快的获取资源。

共享式获取资源流程简单总结

共享式获取资源与独占式获取资料流程大体相同,可以参考之前的分析,二者最主要的区别体现在唤醒了一个节点后,还要继续检查是否仍有剩余资源,如果仍有剩余资源会继续唤醒后继共享节点

释放资源 独占式释放资源 release()

release方法非常容易理解,首先调用一次tryRelease()方法,如果释放资源成功,则唤醒可以获取资源的后继节点(unparkSuccessor()方法前文已提及,不再此处赘述),并返回true,通知调用者释放资源成功,如果释放资源失败,则返回false

	public final boolean release(int arg) {
		
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
共享式释放资源 releaseShared()

同样容易理解,先调用由子类实现的tryReleaseShared()尝试释放资源,成功时则进入doReleaseShared()唤醒后继节点以获取资源,成功返回true,失败返回false

	public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
doReleaseShared()
	private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                	
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    
                    unparkSuccessor(h);
                }
                
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            
            if (h == head)
                break;
        }
    }

以上便是本篇文章的全部内容
作者才疏学浅,如文中出现纰漏,还望指正

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/445326.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号