栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JDK源码——AbstractQueuedSynchronizer类

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JDK源码——AbstractQueuedSynchronizer类

摘要

在Lock 接口,它在 JDK 中的实现类主要是 ReentrantLock (可译为“重入锁”)。ReentrantLock 的实现主要依赖于其内部的一个嵌套类 Sync,而 Sync 又继承自 AbstractQueuedSynchronizer (简称 AQS)。而且,不仅 ReentrantLock,其他一些并发工具类如 CountdownLatch、CyclicBarrier 等,其实现也都是基于 AQS 类。AQS 可以理解为并发包中许多类实现的基石。因此,在分析并发包中常用类的实现原理前,有必要先理解一下 AQS,之后再分析的时候就会简单不少。

AQS 内部有一个核心变量 state;此外,以 Node 类为节点维护了两种队列:主队列(main queue)和条件队列(condition queue),简单起见,分别可以将二者理解为双链表和单链表。

AQS 就像是提供了一套基础设施的设备,其它常用类如 ReentrantLock、CountdownLatch 等的内部嵌套类 Sync,都是在 AQS 提供的基础设施之上制定了自己的“游戏规则”,进而生产出了不同的产品。而它们的游戏规则都是围绕 state 变量和这两种队列进行操作的。

可重入锁(递归锁)

是指的是在同一个线程外层方法获取锁的时候,在进入线程内的方法会自动获取锁(前提是同一个对象),不会因为之前已经获取过还没有释放而阻塞。java中的ReentrantLock和Synchrogazed都是可重入锁,可重入锁的一个优点是可以一定程度的避免死锁。

package com.example.jdk;

import sun.misc.Lock;

public class JDK_Test {

    static Object objectt1=new Object();
    public static void m1(){
        new Thread(()->{
            synchronized (objectt1){
                System.out.println(Thread.currentThread().getName()+"t"+"----------外层调用");
                synchronized (objectt1){
                    System.out.println(Thread.currentThread().getName()+"t"+"----------中层调用");
                    synchronized (objectt1){
                        System.out.println(Thread.currentThread().getName()+"t"+"----------内层调用");
                    }
                }
            }
        },"t1").start();
    }

    public static void main(String[] args) {
        v1();
    }

    public static synchronized void v1(){
        System.out.println("=======外层锁");
        v2();
    }
    public static synchronized void v2(){
        System.out.println("=======中层锁");
        v3();
    }
    public static synchronized void v3(){
        System.out.println("=======内层锁");
    }
}


实现结果
t1	----------外层调用
t1	----------中层调用
t1	----------内层调用

为什么是两个呢?:主要是为了防止异常的情况下,需要彻底释放锁资源。

AQS 类
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {}

可以看到它是一个抽象类,不能直接被实例化。它的父类 AbstractOwnableSynchronizer 的主要代码如下:

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    
    private transient Thread exclusiveOwnerThread;

    // 其他代码
}

其内部主要维护了一个变量 exclusiveOwnerThread,作用是标记独占模式下的 Owner 线程,后面涉及到的时候再进行分析。

AQS成员变量

其中,head 和 tail 为主队列的头尾节点,state 为 AQS 维护的核心变量,ReentrantLock 等类中的 Sync 类实现,都是通过操作 state 来实现各自功能的。

// 主队列头节点
private transient volatile Node head;

// 主队列尾结点
private transient volatile Node tail;

// 状态,AQS 维护的一个核心变量
private volatile int state;
嵌套类

AQS 内部有两个嵌套类,分别为 Node 和 ConditionObject。

Node类
static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;

    // waitStatus的几种状态
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int ConDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;

    // 前驱节点(主队列)
    volatile Node prev;
    // 后继节点(主队列)
    volatile Node next;
    // 节点的线程
    volatile Thread thread;
    // 后继节点(条件队列)
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

添加到主队列用的是第二个构造器,Node 类可以理解为对线程 Thread 的封装。因此,在主队列中排队的一个个节点可以理解为一个个有模式(mode)、有状态(waitStatus)的线程。

ConditionObject 类
public class ConditionObject implements Condition, java.io.Serializable {
    
    private transient Node firstWaiter;

    
    private transient Node lastWaiter;
    // ...
}

ConditionObject 实现了 Condition 接口,它主要操作的是条件队列,这里只贴了其类签名和头尾节点,后面用到的时候再具体分析。

CAS 操作

AQS 内部通过 Unsafe 类实现了一系列 CAS (Compare And Swap) 操作。AQS 内部的许多操作是通过 CAS 来实现线程安全的。

// 获取 Unsafe 实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state、head、tail 等变量的内存偏移地址
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));
    } catch (Exception ex) { throw new Error(ex); }
}

// 一些 CAS 操作
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}

private static final boolean compareAndSetNext(Node node,
                                               Node expect,
                                               Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

 Node 节点的「独占模式」和「共享模式」,其实 AQS 也主要是围绕对这两种模式的操作进行的。Node 节点是对线程 Thread 类的封装,因此两种模式可以理解如下:

  • 独占模式(exclusive):线程对资源的访问是排他的,即某个时间只能一个线程单独访问资源;
  • 共享模式(shared):与独占模式不同,多个线程可以同时访问资源。
AQS中Node独占模式

独占模式下的操作主要有以下几个方法(可与前面分析的 Lock 接口的方法类比):

  • acquire(int arg):以独占模式获取资源,忽略中断;可以类比 Lock 接口的 lock 方法;
  • acquireInterruptibly(int arg):以独占模式获取资源,响应中断;可以类比 Lock 接口的 lockInterruptibly 方法;
  • tryAcquireNanos(int arg, long nanosTimeout):以独占模式获取资源,响应中断,且有超时等待;可以类比 Lock 接口的 tryLock(long, TimeUnit) 方法;
  • release(int arg):释放资源,可以类比 Lock 接口的 unlock 方法。
独占模式获取资源(忽略中断)
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

该方法看似很短,其实是内部做了封装。这几行代码包含了如下四个操作步骤:

  1. tryAcquire
  2. addWaiter(Node.EXECUSIVE)
  3. acquireQueued(final Node node, arg))
  4. selfInterrupt
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

该方法的作用是尝试以独占模式获取资源,若成功则返回 true。可以看到该方法是一个 protected 方法,而且 AQS 中该方法直接抛出了异常,其实是它把实现委托给了子类。这也是 ReentrantLock、CountdownLatch 等类(严格来说是其内部类 Sync)的实现功能不同的地方,这些类正是通过对该方法的不同实现来制定了自己的“游戏规则”。

若 step 1 中的 tryAcquire 方法返回 true,则表示当前线程获取资源成功,方法直接返回,该线程接下来就可以“为所欲为”了;否则表示获取失败,接下来会依次执行 step 2 和 step 3。

private Node addWaiter(Node mode) {
    // 将当前线程封装为一个 Node 节点,指定 mode
    // PS: 独占模式 Node.EXECUSIVE, 共享模式 Node.SHARED
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 通过 CAS 操作设置主队列的尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 尾节点 tail 为 null,表示主队列未初始化
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾节点为空,表明当前队列未初始化
        if (t == null) { // Must initialize
            // 将队列的头尾节点都设置为一个新的节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 将 node 节点插入主队列末尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

可以看到 addWaiter(Node.EXECUSIVE) 方法的作用是:把当前线程封装成一个独占模式的 Node 节点,并插入到主队列末尾(若主队列未初始化,则将其初始化后再插入)。

step 3: acquireQueued(final Node node, arg))

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 中断标志位
        boolean interrupted = false;
        for (;;) {
            // 获取该节点的前驱节点
            final Node p = node.predecessor();
            // 若前驱节点为头节点,则尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 若获取成功,则将该节点设置为头节点并返回
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 若上面条件不满足,即前驱节点不是头节点,或尝试获取失败
            // 判断当前线程是否可以休眠
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

若当前节点的前驱节点为头节点,则会再次尝试获取资源(tryAcuqire),若获取成功,则将当前节点设置为头节点并返回;否则若前驱节点不是头节点,或者获取资源失败,执行如下两个方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前驱节点的等待状态
    int ws = pred.waitStatus;
    // 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠
    if (ws == Node.SIGNAL)
        
        return true;
    // 若前驱节点的状态大于 0,表示前驱节点处于取消(CANCELLED)状态
    // 则将前驱节点跳过(相当于踢出队列)
    if (ws > 0) {
        
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        
         // 此时 waitStatus 只能为 0 或 PROPAGATE 状态,将前驱节点的等着状态设置为 SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

该方法的流程:

  1. 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠(park);
  2. 若前驱节点是取消状态 (ws > 0),则将其清理出队列,以此类推;
  3. 若前驱节点为 0 或 PROPAGATE,则将其设置为 SIGNAL 状态。

正如其名,该方法(shouldParkAfterFailedAcquire)的作用就是判断当前线程在获取资源失败后,是否可以休眠(park)。

private final boolean parkAndCheckInterrupt() {
    // 将当前线程休眠
    LockSupport.park(this);
    return Thread.interrupted();
}

该方法的作用:

  1. 使当前线程休眠(park);
  2. 返回该线程是否被中断(其他线程对其发过中断信号)

上面就是 acquireQueued(final Node node, arg)) 方法的执行过程,为了便于理解,可参考下面的流程图:

若此期间被其他线程中断过,则此时再去执行 selfInterrupt 方法去响应中断请求: 

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

以上就是 acquire 方法执行的整体流程。

以独占模式获取资源(响应中断)

该操作其实与前面的过程类似,因此分析相对简单些,代码如下:

public final void acquireInterruptibly(int arg)
        throws InterruptedException 
    // 若线程被中断过,则抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取资源
    if (!tryAcquire(arg))
        // 尝试获取资源失败
        doAcquireInterruptibly(arg);
}

tryAcquire 与前面的操作一样,若尝试获取资源成功则直接返回;否则,执行doAcquireInterruptibly:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException 
    // 将当前线程封装成 Node 节点插入主队列末尾
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 抛出中断异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

通过与前面的 acquire 方法对比可以发现,二者代码几乎一样,区别在于 acquire 方法检测到中断(parkAndCheckInterrupt)时只是记录了标志位,并未响应;而此处直接抛出了异常。这也是二者仅有的区别。

以独占模式获取资源(响应中断,且有超时)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException 
    // 若被中断,则响应
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 若超时时间小于等于 0,直接获取失败
    if (nanosTimeout <= 0L)
        return false;
    // 计算截止时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            // 已经超时了,获取失败
            if (nanosTimeout <= 0L)
                return false;
            // 若大于自旋时间,则线程休眠;否则自旋
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 若被中断,则响应
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这里有个变量 spinForTimeoutThreshold,表示自旋时间,若大于该值则将线程休眠,否则继续自旋。个人理解这里增加该时间是为了提高效率,即,只有在等待时间较长的时候才让线程休眠。该方法与 acquireInterruptibly 也是类似的,在前者的基础上增加了 timeout。

释放资源

前面分析了三种获取资源的方式,自然也有释放资源。下面分析释放资源的 release 操作:

public final boolean release(int arg) {
    // 尝试释放资源,若成功则返回 true
    if (tryRelease(arg)) {
        Node h = head;
        // 若头节点不为空,且等待状态不为 0(此时为 SIGNAL)
        // 则唤醒其后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

与 tryAcquire 方法类似,tryRelease 方法在 AQS 中也是抛出异常,同样交由子类实现:

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

unparkSuccessor 的主要作用是唤醒 node 的后继节点,代码如下:

private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    
    // 后继节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // 若后继节点是取消状态,则从尾节点向前遍历,找到 node 节点后面一个未取消状态的节点
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒node节点的后继节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

若 node 节点的后继节点是取消状态(ws > 0),则从主队列中取其后面一个非取消状态的线程唤醒。前面三个获取资源的方法中,finally 代码块中都用到了 cancelAcquire 方法,都是获取失败时的操作,这里也分析一下:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    // 跳过取消状态的前驱节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 前驱节点的后继节点引用
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 将当前节点设置为取消状态
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // 若该节点为尾节点(后面没其他节点了),将 predNext 指向 null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 前驱节点为头节点,表明当前节点为第一个,取消时唤醒它的下一个节点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

该方法的主要操作:

  1. 将 node 节点设置为取消(CANCELLED)状态;
  2. 找到它在队列中非取消状态的前驱节点 pred:
    1. 若 node 节点是尾节点,则前驱节点的后继设为空,
    2. 若 pred 不是头节点,且状态为 SIGNAL,则后继节点设为 node 的后继节点;
    3. 若 pred 是头节点,则唤醒 node 的后继节点。

该过程可以跟双链表删除一个节点的过程进行对比分析

AQS的Node的共享模式

与独占模式类似,共享模式下也有与之类似的相应操作,分别如下:

  1. acquireShared(int arg): 以共享模式获取资源,忽略中断;
  2. acquireSharedInterruptibly(int arg): 以共享模式获取资源,响应中断;
  3. tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式获取资源,响应中断,且有超时等待;
  4. releaseShared(int arg): 释放资源,唤醒后继节点,并确保传播。
共享模式获取资源(忽略中断)
public final void acquireShared(int arg) {
    // 返回值小于 0,表示获取失败
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// 尝试以共享模式获取资源(返回值为 int 类型)
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

与独占模式的 tryAcquire 方法类似,tryAcquireShared 方法在 AQS 中也抛出异常,由子类实现其逻辑。不同的地方在于,tryAcquire 方法的返回结果是 boolean 类型,表示获取成功与否;而 tryAcquireShared 的返回结果是 int 类型,分别为:

  1. 负数:表示获取失败;
  2. 零:表示获取成功,但后续共享模式的获取会失败;
  3. 正数:表示获取成功,后续共享模式的获取可能会成功(需要进行检测)

若 tryAcquireShared 获取成功,则直接返回;否则执行 doAcquireShared 方法:

private void doAcquireShared(int arg) {
    // 把当前线程封装成共享模式的 Node 节点,插入主队列末尾
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 中断标志位
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 若前驱节点为头节点,则尝试获取资源
            if (p == head) {
                int r = tryAcquireShared(arg);
                // 这里表示当前线程成功获取到了资源
                if (r >= 0) {
                    // 设置头节点,并传播状态(注意这里与独占模式不同)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 是否应该休眠(与独占模式相同,不再赘述)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 取消操作(与独占模式相同)
            cancelAcquire(node);
    }
}

doAcquireShared 方法会把当前线程封装成一个共享模式(SHARED)的节点,并插入主队列末尾。addWaiter(Node mode) 方法前文已经分析过。该方法与 acquireQueued 方法的区别在于 setHeadAndPropagate 方法,把当前节点设置为头节点之后,还会有传播(propagate)行为。

private void setHeadAndPropagate(Node node, int propagate) {
    // 记录旧的头节点
    Node h = head; // Record old head for check below
    // 将 node 设置为头节点
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 后继节点为空或共享模式唤醒
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
private void doReleaseShared() {
    
    for (;;) {
        // 这里的头节点已经是上面设置后的头节点了
        Node h = head;
        // 由于该方法有两个入口(setHeadAndPropagate 和 releaseShared),需考虑并发控制
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 若头节点不变,则跳出循环;否则继续循环
        if (h == head)                   // loop if head changed
            break;
    }
}

该方法与独占模式下的获取方法 acquire 大体相似,不同在于该方法中,节点获取资源后会传播状态,即,有可能会继续唤醒后继节点。值得注意的是:该方法有两个入口 setHeadAndPropagate 和 releaseShared,可能有多个线程操作,需考虑并发控制。

以共享模式获取资源(响应中断)
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

tryAcquireShared 方法前面已分析,若获取资源失败,会执行 doAcquireSharedInterruptly 方法:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 把当前线程封装成共享模式节点,并插入主队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 与 doAcquireShared 相比,区别在于这里抛出了异常
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

从代码可以看到,acquireSharedInterruptibly 方法与 acquireShared 方法几乎完全一样,不同之处仅在于前者会抛出 InterruptedException 异常响应中断;而后者仅记录标志位,获取结束后才响应。

以共享模式获取资源(响应中断,且有超时)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

doAcquireSharedNanos:该方法可与独占模式下的超时等待方法 tryAcquireNanos(int arg, long nanosTimeout) 进行对比,二者操作基本一致

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
 释放资源,唤醒节点,传播状态
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
AQS中的Node两种模式下的场景分析

场景如下:有 T0~T4 共 5 个线程按先后顺序获取资源,其中 T2 和 T3 为共享模式,其他均为独占模式。就此场景分析:T0 先获取到资源(假设占用时间较长),而后 T1~T4 再获取则失败,会依次进入主队列。此时主队列中各个节点的状态示意图如下:

之后,T0 操作完毕并释放资源,会将 T1 唤醒。T1(独占模式) 会从 acquireQueued(final Node node, int arg) 方法的循环中继续获取资源,这时会获取成功,并将 T1 设置为头节点(T 被移除)。此时主队列节点示意图如下:

此时,T1 获取到资源并进行相关操作。而后,T1 操作完释放资源,并唤醒下一个节点 T2,T2(共享模式) 继续从 doAcquireShared(int) 方法的循环中执行。此时 T2 获取资源成功,将自身设为头节点(T1 被移除),由于后继节点 T3 也是共享模式,因此 T1 会继续唤醒T3;T3 唤醒后的操作与 T2 相同,但后继节点 T4 不是共享模式,因此不再继续唤醒。此时队列节点状态示意图如下:

此时,T2 和 T3 同时获取到资源。之后,当二者都释放资源后会唤醒 T4:

T4 获取资源的与 T1 类似。

AQS源码总结
  • AQS 是一个抽象类,无法直接进行实例化;
  • AQS 内部维护了一个核心变量 state,以及两种队列:主队列(main queue)和条件队列(condition queue);
  • AQS 提供了一套基础设施,ReentrantLock 等类通常用一个内部嵌套类 Sync 继承 AQS,并在 Sync 类中制定自己的“游戏规则”。
  • acquire: 独占模式获取资源,忽略中断;
  • acquireInterruptibly: 独占模式获取资源,响应中断;
  • tryAcquireNanos: 独占模式获取资源,响应中断,有超时;
  • release: 释放资源,唤醒主队列中的下一个线程。
  • 本文分析了以共享模式获取资源的三种方式,以及释放资源的操作。分别为:

  • acquireShared: 共享模式获取资源,忽略中断;
  • acquireSharedInterruptibly: 共享模式获取资源,响应中断;
  • tryAcquireSharedNanos: 共享模式获取资源,响应中断,有超时;
  • releaseShared: 释放资源,唤醒后继节点,并确保传播。
博文参考

JDK源码分析-AbstractQueuedSynchronizer(1) - 知乎

JDK源码分析-AbstractQueuedSynchronizer(2) - 知乎

JDK源码分析-AbstractQueuedSynchronizer(3) - 知乎

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/592427.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号