栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JUC之AQS源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JUC之AQS源码解析

JUC之AQS源码解析 一、AQS概述

AQS是AbstractQueuedSynchronizer,就是抽象队列同步器,JDK的原话是Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. 翻译过来就是实现了基于FIFO队列的阻塞同步器。阻塞的对象就是线程。

AQS的用处是当我们使用线程在对资源的抢夺时,不用关心线程如果抢不到资源后如何处理,以及后续资源空闲时再抢资源时如何处理,这些AQS都已经实现,我们只要关心如何抢夺资源就行,举个例子,就跟去医院看病一样,你只用关心跟医生描述病情即可,其他的排队挂号,等待叫号都是由医院负责,AQS就是这个医院。

AQS分为两种资源抢夺模式,一种是独占模式,即一个线程抢到,其他线程只能等待占用线程释放资源后再继续抢夺。另外一种是共享模式,即可以多个线程抢到,不过一般会设置资源使用上限,只允许n个线程使用资源,其他线程则进行等待。

二、AQS的结构

AbstractQueuedSynchronizer的结构

属性属性说明
volatile head 指向等待队列的头节点,volatile修饰保证可见性
volatile tail指向等待队列的尾节点,volatile修饰保证可见性
volatile state同步状态,即线程抢到资源的标记值,volatile修饰保证可见性
headOffsethead属性在class文件中的偏移量
tailOffsettail属性在class文件中的偏移量
waitStatusOffsetwaitStatus在Node class文件中的偏移量
nextOffsetnext属性在Node class文件中的偏移量
方法方法说明
tryAcquire独占模式,获取资源,可以失败,无失败后处理
tryRelease独占模式,释放资源,可以失败,无失败后处理
tryAcquireShared共享模式,获取资源,可以失败,无失败后处理
tryReleaseShared共享模式,释放资源,可以失败,无失败后处理
isHeldExclusively独占模式下,判断占用线程是否为当前调用方法线程
acquire独占模式,获取资源以及失败的处理
release独占模式,释放资源以及失败的处理
acquireShared共享模式,获取资源以及失败的处理
releaseShared共享模式,获取资源以及失败的处理

AbstractQueuedSynchronizer有两个内部类

  • Node         Node节点用于存储线程的信息,一般使用Node,基本底层使用的是链表结构
属性属性说明
prev指向链表中的前一个Node节点
next

指向链表中的后一个Node节点

thread

节点持有的线程对象

waitStatus节点的等待状态
nextWaiter下一个等待节点
SHARED共享模式的标记
EXCLUSIVE独占模式的标记
方法方法说明
predecessor获取前置节点
isShared是否是共享模式
  • ConditionObject
属性属性说明
firstWaiter指向条件队列队首
lastWaiter指向条件队列队尾
方法方法说明
await进入等待
signal唤醒

源码分析:

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //添加到条件队列,并且清理取消节点
            Node node = addConditionWaiter();
            //释放资源,await会释放资源
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //node不在等待队列中时
            while (!isOnSyncQueue(node)) {
                //阻塞
                LockSupport.park(this);
                //在唤醒前中断返回throw_ie,唤醒后中断返回reinterrupt,没中断返回0
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;//中断退出循环
            }
            //排队获取资源发生中断并且之前没有出现唤醒前打断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // 如果node的下一节点不为空
                unlinkCancelledWaiters();//清理取消节点
            if (interruptMode != 0)//发生过中断 ,报告处理异常
                reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
            //获取条件队列的尾结点
            Node t = lastWaiter;
            //t不是null并且不是取消节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();//进行取消节点清理
                t = lastWaiter;//t重新指向尾结点
            }
            //创建节点node,节点状态为condition
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)//如果尾结点为空代表条件队列为空
                firstWaiter = node;
            else//否则尾结点的下一节点指向node
                t.nextWaiter = node;
            lastWaiter = node;//尾节点指针指向node
            return node;
}

private void unlinkCancelledWaiters() {
            //获取条件队列的首节点
            Node t = firstWaiter;
            //trail代表t的上一节点
            Node trail = null;
            while (t != null) {
                //next指向t的下一节点
                Node next = t.nextWaiter;
                //如果t是取消节点
                if (t.waitStatus != Node.CONDITION) {
                    //t的下一节点指向null
                    t.nextWaiter = null;
                    if (trail == null)//上一节点为空代表为首节点
                        firstWaiter = next;//将首节点重新指向t的下一节点
                    else//否则t的上一节点与t的下一节点直接相连
                        trail.nextWaiter = next;
                    if (next == null)//代表t为尾结点
                        lastWaiter = trail;//将尾结点指针重新指向t的上一节点
                }
                else//trail指向t,下次循环中将变为t的上一节点
                    trail = t;
                t = next;//t指向t的下一节点,即向后遍历
            }
}

final int fullyRelease(Node node) {
        //假设失败
        boolean failed = true;
        try {
            //获取同步状态
            int savedState = getState();
            //释放资源
            if (release(savedState)) {
                failed = false;//释放成功
                return savedState;//放回同步状态
            } else {//释放失败抛出异常
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)//失败后将node变为取消节点
                node.waitStatus = Node.CANCELLED;
        }
}

private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE) //唤醒前中断,抛出异常
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT) //唤醒后中断,自我中断
                selfInterrupt();
}

需要注意几点:

  1. 在向条件队列中添加节点时,如果尾结点不是取消节点的话,会进行一次取消节点清理
  2. 条件队列是单链表结构,虽然使用Node节点类型,但是只是用nextWaiter节点作为链接点
  3. 在await中被唤醒后,还是要去竞争资源,也就是可能进入等待队列继续阻塞
 public final void signal() {
            //非独占的抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //获取条件队列的首节点进行唤醒
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
}

private void doSignal(Node first) {
            do {
                //首节点指向首节点的下一节点,并且为空代表队列只有首节点一个节点
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //将旧首节点的下一节点指向null 
                first.nextWaiter = null;
                //这里其实就是首节点移除条件队列中了
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
        //将node置为0中间状态
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //将条件队列的节点放入等待队列
        Node p = enq(node);
        //获取节点p的等待状态,这里p是尾结点的前一节点,也就是node的上一节点
        int ws = p.waitStatus;
        //等待状态为取消状态或者将p设置为signal失败进行唤醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
}

需要注意几点:

  1. 独占模式的判断,因为这里唤醒后的入队调用的是独占模式的方法,所以要做这个判断
  2. 在没有进入到等待队列中,transferForSignal里的修改node的ws为0判断是因为node节点可能被取消掉,变成取消状态,那就没有唤醒的意义了,所以继续循环就行了
  3. 在transferForSignal里的唤醒前判断,ws>0代表可能node的前节点已取消,这时候唤醒node并不会违反fifo的原则,因为acquireQueued中还会进行阻塞等待,如果没有将p的状态更新成signal代表node的前一个节点已经唤醒获取到资源,所以可以唤醒node节点,很大机会获得资源。
  4. signalAll是全部唤醒,没有signal唤醒的条件限制,从firstWaiter往后遍历唤醒就行了。
三、AQS模式分析 3.1 AQS独占模式获取资源

acquire方法

public final void acquire(int arg) {
        //先尝试获取锁,不成功的会进入等待队列进行阻塞
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //自我打断的意思我的理解是,当线程阻塞过程中被打断过,我们会自我打断。
            //由于线程执行到这里已经从阻塞中出来了,所以在下次阻塞会自我打断一次
            //也是一种提升线程利用率的方式,避免线程一直阻塞,由于还在循环内,相当于进行了一            
            //次自旋
            selfInterrupt();
}

private Node addWaiter(Node mode) {
        //创建等待线程节点,节点类型为独占类型
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //pred指向尾节点
        Node pred = tail;
        //当尾结点不为空时,将新节点加入到队列的尾部
        if (pred != null) {
            //将新创建的等待节点node的前节点设置成当前队列的尾节点
            node.prev = pred;
            //同时将队列的尾节点指向新创建的等待节点node
            if (compareAndSetTail(pred, node)) {
                //将原来尾结点的下一节点设置成新创建的等待节点node
                pred.next = node;
                //返回新创建的等待节点node
                return node;
            }
        }
        //由于上面compareAndSetTail方法并不能保证尾节点的置换,从而使node进入等待队列,
        //所以使用enq(node)方法保证node进行等待队列
        enq(node);
        //返回新创建的等待节点node
        return node;
}

private Node enq(final Node node) {
        //无限循环,保证node一定进入等待队列
        for (;;) {
            //获取当前队列的尾结点,由于新的节点未进入队列,所以这里的尾结点还是旧的尾结点
            Node t = tail;
            //当尾结点为空,代表等待队列是空的
            if (t == null) { // Must initialize
                //将一个空节点(哨兵节点)放入到队列中,由于没有节点,将head指向哨兵节点
                if (compareAndSetHead(new Node()))
                    //同时tail也指向哨兵节点
                    tail = head;
            } else {//当尾结点不为空,代表等待队列中有其他等待节点
                //新的等待节点node的前节点设置为当前等待队列的尾节点
                node.prev = t;
                //将tail指向新的等待节点node
                if (compareAndSetTail(t, node)) {
                    t.next = node;//将等待队列的旧尾节点的next设置成新的node节点
                    return t;//返回旧的尾结点
                }
            }
        }
}

final boolean acquireQueued(final Node node, int arg) {
        //假定获取资源失败
        boolean failed = true;
        try {
            //假定打断状态为false
            boolean interrupted = false;
            for (;;) {
                //获取node的前节点,由于哨兵节点的存在,p不会为null
                final Node p = node.predecessor();
                //如果p等于头结点(也就是哨兵节点),代表你是首个等待节点,尝试获取一次资源
                if (p == head && tryAcquire(arg)) {
                    //获取资源成功后,将头结点设置为新的等待节点,并且置为哨兵节点
                    setHead(node);
                    //将哨兵节点的下一节点设置为null,方便gc掉哨兵节点
                    p.next = null; // help GC
                    //获取资源成功,将失败标志置为false
                    failed = false;
                    //代表线程未打断进入等待状态
                    return interrupted;
                }
                //获取资源失败,会阻塞进入等待状态
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //代表线程打断进入过阻塞状态
                    interrupted = true;
            }
        } finally {
            //获取资源失败
            if (failed)
                cancelAcquire(node);
        }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取前一节点的等待状态。
        int ws = pred.waitStatus;
        //当前一节点表示唤醒状态时,代表前一节节点不是死等,node可以park阻塞等待了
        if (ws == Node.SIGNAL)
            //返回true表示可以进行阻塞等待了
            return true;
        //当前一节点的等待状态大于0,表示该节点已取消
        if (ws > 0) {
            //不断向队列前面寻找不是取消状态的节点
            do {
                //将新等待节点node的前置节点设置为前置节点的前置节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //当找到后,将该节点从队列中去掉,将正常节点的下一节点设置为新的等待节点node
            pred.next = node;
        } else {
            //将前置节点的状态设置为唤醒状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        //返回false,表示节点还不能进行阻塞等待状态
        return false;
}

private final boolean parkAndCheckInterrupt() {
        //阻塞线程,这里就阻塞了不会往下进行,等待唤醒
        LockSupport.park(this);
        //阻塞唤醒后返回线程的打断状态并且重置打断状态
        return Thread.interrupted();
}

private void cancelAcquire(Node node) {
        //如果node为空,不需要进行取消操作
        if (node == null)
            return;
        //将node的thread置为空
        node.thread = null;
        //获取node的前一节点
        Node pred = node.prev;
        //如果前置节点是取消状态,继续往前找
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        //获取前一节点的下一节点
        Node predNext = pred.next;
        //将node节点的状态置为取消
        node.waitStatus = Node.CANCELLED;
        //如果node为尾结点,就将他的前置节点设置为尾节点
        if (node == tail && compareAndSetTail(node, pred)) {
            //同时将尾结点的后置节点置为null
            compareAndSetNext(pred, predNext, null);
        } else {//node不为尾结点
            int ws;
            //node前置节点不是头结点并且前置节点不是取消状态以及哨兵节点
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                //获取node的后置节点
                Node next = node.next;
                //如果后置节点不为null并且等待状态不为取消状态
                if (next != null && next.waitStatus <= 0)
                    //将node从队列中取出,将node的前置节点与后置节点相连
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            //node的后置节点置为它本身,方便gc回收
            node.next = node; // help GC
        }
}

private void unparkSuccessor(Node node) {
        //获取node的等待状态
        int ws = node.waitStatus;
        //当ws不是取消状态时
        if (ws < 0)
            //将等待状态置为0
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;//获取node的后置节点
        if (s == null || s.waitStatus > 0) {//后置节点为空或者等待状态为取消时
            s = null;//将s置为空
            //从尾部开始寻找异常后置节点后的首个正常状态节点将s指向该节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //当找到正常节点后
        if (s != null)
            //唤醒他
            LockSupport.unpark(s.thread);
}

static void selfInterrupt() {
        Thread.currentThread().interrupt();
}

需要注意的几点:

  1. selfInterrupt方法是为了提升线程利用率,尽可能使线程使用效益最大化。
  2. cancelAcquire方法的执行条件,当获取资源失败,也就是出现异常时,才会调用该方法。
  3. 当我们取消节点是,会唤醒另外一个节点,正常是后置节点,但当后置节点也为取消节点时,会从队列尾向前扫描,找到这个后置节点后的首个正常节点,唤醒该正常节点。
  4. 节点阻塞的前提是需要他的前置节点是signal状态,否则不能阻塞等待。
  5. 取消节点的清除,是在其后置节点进行入队阻塞时触发。
  6. setHead方法会将改变head指针的同时,会将指向的节点的thread,prev置为null,变为哨兵节点。
  7. 哨兵节点的作用,主要是为了队列为空时的特殊处理,有了哨兵节点,队列永远不为空,就不用了考虑特殊处理了。

acquireInterruptibly方法

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        //判断是否中断过,中断过就抛出中断异常,并重置中断状态
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            //没有获取资源成功,进入下面的方法
            doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //阻塞唤醒后会抛出中断异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

需要注意的一点:

  1. 与acquireQueued方法不同的地方在于阻塞唤醒后的处理方式不一样,acquireInterruptibly方法会直接抛出中断异常,并且退出资源竞争,将当前节点取消掉。

3.2 AQS独占模式释放资源
public final boolean release(int arg) {
        //独占模式下释放资源不会有争抢的问题
        if (tryRelease(arg)) {
            //获取当前头结点,也就是持有资源的节点
            Node h = head;
            //h不是null并且不是中间状态节点
            if (h != null && h.waitStatus != 0)
                //唤醒队列的中的一个等待节点
                unparkSuccessor(h);
            return true;//代表成功释放资源
        }
        return false;//代表释放资源失败
}

需要注意的几点:

  1. 独占模式下release不会出现争抢,所以代码相对简单
  2. 唤醒原则是首个等待节点,即哨兵节点后的第一个节点,但是当该节点为取消状态时,会从队尾向前扫描找到这个节点后的首个正常节点后唤醒。与获取资源时取消获取操作唤醒线程是一样的。
  3. h.waitStatus不等于0的限定是要求队列中必须有等待节点,才能进行唤醒,由于哨兵节点的存在,每次在队列加入等待节点阻塞时,前一个节点的状态会变成-1,所以哨兵节点在后置节点存在的情况下,是不等于0,也就是告诉我们等待队列有除哨兵节点外的等待节点需要唤醒。
3.3 AQS共享模式获取资源

acquireShared方法

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //获取共享资源,返回为所剩资源标志,大于等于0代表还有共享资源可用
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//设置头结点,并且置为哨兵节点
        //查看下方表格解析
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;//获取node的后置节点
            if (s == null || s.isShared())//如果后置节点为空,或者后者节点也为共享模式
                doReleaseShared();//唤醒另一个节点使用资源
        }
}

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //当队列中不止你一个节点时
            if (h != null && h != tail) {
                //获取h的等待状态
                int ws = h.waitStatus;
                //如果等待状态为signal,可以唤醒后继节点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                   //当h的等待状态从唤醒修改成0后,唤醒后继节点                                    
                   unparkSuccessor(h);
                }
                
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //头结点没有发生改变退出
            if (h == head)                   // loop if head changed
                break;
        }
}

需要注意的几点:

  1. setHeadAndPropagate方法,当获取资源r=0,代表的只有当前节点可以获取资源,不能进行传播获取,当获取资源r>0,代表除了当前节点,还可传播后继共享节点进行获取。
  2. setHeadAndPropagate的传播条件
条件条件说明
propagate>0说明共享资源还有剩余可以进行传播
h==null||h.waitStatus<0这两个条件可以看成一个条件,因为哨兵节点的存在,h是不可能为空的,所以当h.waitStatus<0时,也就是signal和propagate这两个状态,前者是因为有新的线程阻塞导致,后者是由于有共享锁被释放了导致,但是这两个动作顺序是不确定的,所以导致这个判断会可能无效唤醒线程。
(h=head)==null||h.waitStatus<0同理,上面的判断是在setHead方法之前就有线程释放共享锁的操作,这个是setHead方法之后的线程释放共享锁的操作,当然也会导致无效唤醒。

acquireSharedInterruptibly方法

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

与acquireInterruptibly同理,如果发生中断抛出中断异常,停止竞争资源,取消当前节点

3.4 AQS共享模式释放资源
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
}

释放没有太多代码,doReleaseShared()与获取中的方法一样,所以就不加以赘述了。

3.5 AQS的其他方法
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}

protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
}

protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
}

上面的五种方法是需要在子类中进行实现的,所以在AQS代码中只是抛出异常而已。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //获取资源的截止时间
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                //由于入队会花费时间,所以这里要减去入队的时间
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                //spinForTimeoutThreshold是1000纳秒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
}

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //获取截止时间
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                //剔除排队时间
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

tryAcquireNanos与tryAcquireSharedNanos其实是设置了阻塞的最长时间限制,但是这个时间会剔除节点入队时间,因为入队操作是cas操作,是有可能浪费大量时间的。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/831304.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号