栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JDK8 AbstractQueuedSynchronizer(AQS) 源码 之 独占模式与共享模式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JDK8 AbstractQueuedSynchronizer(AQS) 源码 之 独占模式与共享模式

目录
  • 一、前言
  • 二、带着问题看源码
    • 2.1 源码里独占模式和共享模式字段对应的值是什么,这样设计带来的影响是什么
    • 2.2 Node.waitStatus在AQS中有哪些状态,代表意义,设置或变更时机是什么
    • 2.3 acquire/acquireInterruptibly/tryAcquireNanos三者的区别是什么
  • 三、public方法
    • 3.1 独占模式
      • 3.1.1 简略流程图
        • 3.1.1.1 acquire方法
        • 3.1.1.2 acquireInterruptibly方法
        • 3.1.1.3 tryAcquireNanos方法
        • 3.1.1.4 release方法
      • 3.1.2 方法列表
    • 3.2 共享模式
      • 3.2.1 简略流程图
        • 3.2.1.1 acquireShared
        • 3.2.1.2 releaseShared
      • 3.2.2 共享模式用到,独占模式没用到的方法列表
  • 四、独占模式源码分析
    • 4.1 acquire 可以被外部类调用的public方法
    • 4.2 tryAcquire 需要被子类重写(这里以ReentrantLock为例)
    • 4.3 addWaiter 将当前线程包装成节点并入队
    • 4.4 enq 初始化头结点并将当前节点入队
    • 4.5 acquireQueued 线程在当前方法循环阻塞、唤醒、尝试获取资源、失败阻塞,成功退出的过程
    • 4.6 setHead 设置头结点,头结点占有线程
    • 4.7 shouldParkAfterFailedAcquire 判断获取资源失败后是否应该获取资源。主要工作:修改前驱节点的状态为SIGNAL,清除队列中被取消的节点(waitStatus为CANCELLED)
    • 4.8 parkAndCheckInterrupt 阻塞当前线程,当其唤醒后,会返回当前线程是否被中断,并清除中断标记位
    • 4.9 cancelAcquire 当前节点设为CANCELLED,并移除同步等待队列
    • 4.10 selfInterrupt 当前线程设置中断标记
    • 4.11 unparkSuccessor 唤醒后继者
    • 4.12 tryAcquireNanos 尝试指定时间内能否获取锁。可以被外部类调用的public方法
    • 4.13 doAcquireNanos 获取锁,等待指定时间,超时被唤醒再次尝试获取资源失败后返回false
    • 4.1.4 acquireInterruptibly 可中断地获取资源。可以被外部类调用的public方法
    • 4.15 doAcquireInterruptibly 可中断地获取资源。
    • 4.16 release 释放资源/释放锁。可以被外部类调用的public方法
    • 4.17 tryRelease 需要被子类重写。尝试释放资源(这里以ReentrantLock为例)
  • 五、共享模式源码分析

一、前言

  AQS逻辑分为三部分:独占模式(锁标识为EXCLUSIVE)、共享模式(锁标识为SHARED)、条件队列(也就是内部类ConditionObject里的逻辑)。

  其中,条件队列依附于独占模式(只有独占模式情况下才会用到ConditionObject)。本文主要讲独占模式及共享模式下涉及到的源码。

  本文适合在理解Node内部类、CAS后阅读

二、带着问题看源码 2.1 源码里独占模式和共享模式字段对应的值是什么,这样设计带来的影响是什么 2.2 Node.waitStatus在AQS中有哪些状态,代表意义,设置或变更时机是什么 2.3 acquire/acquireInterruptibly/tryAcquireNanos三者的区别是什么 三、public方法 3.1 独占模式

独占模式有四个公共方法:
获取资源:acquire,acquireInterruptibly,tryAcquireNanos
释放资源:release

3.1.1 简略流程图 3.1.1.1 acquire方法

3.1.1.2 acquireInterruptibly方法

3.1.1.3 tryAcquireNanos方法

3.1.1.4 release方法

3.1.2 方法列表
  • acquire
  • tryAcquire
  • addWaiter
  • enq
  • acquireQueued
  • setHead
  • shouldParkAfterFailedAcquire
  • parkAndCheckInterrupt
  • cancelAcquire
  • unparkSuccessor
  • LockSupport.unpark
  • tryAcquireNanos
  • doAcquireNanos
  • acquireInterruptibly
  • doAcquireInterruptibly
  • release
  • tryRelease
3.2 共享模式

共享模式同样有四个公共方法:
获取资源:acquireShared,acquireSharedInterruptibly,tryAcquireSharedNanos
释放资源:releaseShared

共享模式下的四个公共方法基本逻辑与独占模式相同,最主要的不同点在于获取到资源时的处理不同setHeadAndPropagate

3.2.1 简略流程图 3.2.1.1 acquireShared

3.2.1.2 releaseShared

3.2.2 共享模式用到,独占模式没用到的方法列表
  • acquireShared
  • tryAcquireShared
  • doAcquireShared
  • setHeadAndPropagate
  • doReleaseShared
  • doAcquireSharedInterruptibly
  • doAcquireSharedNanos
四、独占模式源码分析 4.1 acquire 可以被外部类调用的public方法
	
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
4.2 tryAcquire 需要被子类重写(这里以ReentrantLock为例)

用于被子类重写,判断当前能否直接获取到锁。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

以ReentrantLock重写的非公平方法为例。先获取当前占用资源数,判断如果是0,或者当前线程已经获取过了,则获取成功,否则获取锁失败,返回false。

   protected final boolean tryAcquire(int acquires) {
       return nonfairTryAcquire(acquires);
   }
   
   final boolean nonfairTryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       if (c == 0) {
           if (compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
       }
       else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0) // overflow
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       return false;
   }
4.3 addWaiter 将当前线程包装成节点并入队
	
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 如果队列中尾指针不为空,尝试将当前节点放到尾节点上,并且cas尝试将尾指针指向当前节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 不断尝试入队
        enq(node);
        return node;
    }

可以看到,Node构造函数传了两个参数:当前线程和mode模式。
mode模式有两种:

  • 共享锁标识 static final Node SHARED = new Node();
  • 独占锁标识 static final Node EXCLUSIVE = null;

nextWaiter被赋值为mode,说明如果使用了等待队列,那么当前模式一定是独占模式的(独占模式下nextWaiter才可以被赋值,共享模式不可以被赋值,赋值后无法判断是否为共享模式)

 		
        Node(Thread thread, Node mode) {     
            this.nextWaiter = mode;
            this.thread = thread;
        }
4.4 enq 初始化头结点并将当前节点入队
 
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 如果尾节点为null,说明队列为空,需要新建头结点,然后下次循环将当前节点加进去
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 有头结点则cas设置尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
4.5 acquireQueued 线程在当前方法循环阻塞、唤醒、尝试获取资源、失败阻塞,成功退出的过程
	
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前序节点,如果前序节点是头,并且当前线程能够获取到资源,则将当前节点变为头,并且使之前的头脱离队列
                final Node p = node.predecessor();
                // 如果p是头结点(说明node是队列中除了头的唯一节点)并且尝试获取资源成功
                if (p == head && tryAcquire(arg)) {
                    // 设置当前节点为头结点,并且将原来的头移除队列
                    setHead(node);
                    // 原来头的后继指向null
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果前序节点不是头,或者获取资源失败,
                // 正常来讲,shouldParkAfterFailedAcquire这里node为队尾,p为node的前驱节点。
                // 如果p的waitStatus为0,设为SIGNAL,返回false进入下轮循环。
                // 如果为SIGNAL返回true进入parkAndCheckInterrupt
                if (shouldParkAfterFailedAcquire(p, node) &&
                        // 在这里阻塞,被唤醒后返回当前线程中断状态并清除中断标记。如果被中断过返回true,进入if方法内部
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // try抛异常则取消获取资源
            if (failed)
                cancelAcquire(node);
        }
    }
4.6 setHead 设置头结点,头结点占有线程
 
    private void setHead(Node node) {
        head = node;
        // 头结点不占有线程
        node.thread = null;
        node.prev = null;
    }
4.7 shouldParkAfterFailedAcquire 判断获取资源失败后是否应该获取资源。主要工作:修改前驱节点的状态为SIGNAL,清除队列中被取消的节点(waitStatus为CANCELLED)
 
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 如果前序节点的waitStatus状态为SIGNAL,说明当前节点等待被唤醒,可以park阻塞
        if (ws == Node.SIGNAL)
            return true;
        // 如果是CANCELLED状态,从前序节点往前找,将所有CANCELLED状态的节点移除队列
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 如果是0、-3、-2,比较并设置前驱节点的状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 不进行park阻塞操作
        return false;
    }
4.8 parkAndCheckInterrupt 阻塞当前线程,当其唤醒后,会返回当前线程是否被中断,并清除中断标记位

关于LockSupport可以看这篇文章 LockSupport源码理解

	
    private final boolean parkAndCheckInterrupt() {
        // 线程中断或者被unpark都可以被唤醒
        LockSupport.park(this);
        return Thread.interrupted();
    }

Thread.interrupted() 方法会返回中断标记和清除中断标记。true为已中断

4.9 cancelAcquire 当前节点设为CANCELLED,并移除同步等待队列

一个比较令人费解的方法。unparkSuccessor的时机我不太理解。

	
    private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;
        // 取到此节点之前最近的状态小于0的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // 循环执行结束后,node.prev的waitStatus<=0
        // 记录pred的下一个节点,用于cas设置pred的next
        Node predNext = pred.next;
        // 设置当前节点状态为CANCELLED
        node.waitStatus = Node.CANCELLED;
        // 如果这个节点已经是尾节点,就把尾节点设置为pred(非CANCELLED的节点)
        if (node == tail && compareAndSetTail(node, pred)) {
            // cas设置pred的next为null
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            // 如果node已经不是尾节点了
            int ws;
            // 防止pred为头结点,并且pred的等待状态为SIGNAL时才能往下走
            // 如果pred不是头结点 并且 (pred.waitStatus为SIGNAL 或 pred.waitStatus<=0且cas设置为SIGNAL成功) 并且 pred.thread不为null
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                // 进到这里说明node后面被加了节点。所以要将pred的后继指向node的后继
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // node != tail && ( pred == head || pred.thread == null || pred.waitStatus!=Node.SIGNAL && (pred.waitStatus>0|| casFailed))
                // pred为头结点,或者 pred被取消了 或者 cas设置pred的状态为SIGNAL失败了?才去唤醒node的后继节点
                unparkSuccessor(node);
            }
            // 为什么这样会help GC呢
            node.next = node; // help GC
        }
    }
4.10 selfInterrupt 当前线程设置中断标记
	
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
4.11 unparkSuccessor 唤醒后继者

我一直理解唤醒的都是头结点的后继者。但是该方法是唤醒了入参的后继,我还没有找到传参为头之外的情况。

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 如果状态小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3,将其设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾结点开始从后往前开始遍历
            for (Node t = tail; t != null && t != node; t = t.prev)
                // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点, 保存结点
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 该结点不为为空,释放许可,在这里阻塞的线程被唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }
4.12 tryAcquireNanos 尝试指定时间内能否获取锁。可以被外部类调用的public方法
 
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 先尝试获取资源是否成功,失败的话尝试指定时间内是否能获取锁成功
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }
4.13 doAcquireNanos 获取锁,等待指定时间,超时被唤醒再次尝试获取资源失败后返回false

和requireQueued的区别:

  • 唤醒方式不同:
    • requireQueued: 被其它线程LockSupport.unpark、被中断才会被唤醒
    • doAcquireNanos: 被其它线程LockSupport.unpark、被中断、超时会被唤醒
  • 退出循环方式不同:
    • requireQueued: 被唤醒后,在循环中通过判断当前节点是否为头的后继节点和是否能获取资源来退出循环
    • doAcquireNanos: 被唤醒后,在循环中判断完是否获取资源后,还会判断是否超时来退出循环
  • 中断处理不同:
    • requireQueued: 不处理中断,只返回中断信息
    • doAcquireNanos: 中断后抛出异常
	
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 绝对结束时间
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                // 如果超时了,直接返回false
                if (nanosTimeout <= 0L)
                    return false;
                // 判断是否需要阻塞,
                // 如果等待时间比自旋时间小,则用for循环等待获取锁。
                // 否则用LockSupport阻塞对应时间,被唤醒后在下一轮循环判断能否获取锁并退出
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 可以响应中断,抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
4.1.4 acquireInterruptibly 可中断地获取资源。可以被外部类调用的public方法
	
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
4.15 doAcquireInterruptibly 可中断地获取资源。

和acquireQueued基本完全相同,只是对中断的处理不同。

    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                // 遇到中断则抛出异常
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
4.16 release 释放资源/释放锁。可以被外部类调用的public方法
    public final boolean release(int arg) {
        // 如果尝试释放资源成功(释放资源,如果全部释放,资源不再被锁住)
        if (tryRelease(arg)) {
            Node h = head;
            // 如果头节点不为空且头的waitStatus不为0,释放后继者
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
4.17 tryRelease 需要被子类重写。尝试释放资源(这里以ReentrantLock为例)
 		
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
五、共享模式源码分析
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/854510.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号