栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

java锁-AQS

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

java锁-AQS

java锁-AQS

详细讨论java中AQS底层加锁原理和应用。


文章目录
  • java锁-AQS
  • 前言
  • 1. 原理
    • 1.1 核心变量
    • 1.2 核心方法
    • 1.3 加锁、释放锁流程
      • 1.3.1 独占锁加锁流程
      • 1.3.2 独占锁释放流程
      • 1.3.3 共享锁加锁流程
      • 1.3.4 共享锁释放流程
  • 2. 源码分析
    • 2.1 独占锁加锁源码分析
    • 2.2 独占锁释放源码分析
    • 2.3 共享锁加锁源码分析
    • 2.4 共享锁释放源码分析
  • 总结


前言

java中AQS是java.util.concurrent.locks.AbstractQueuedSynchronizer抽象类的缩写,大部分的锁都是基于这个类来实现的,如:ReentrantLock,ReentrantReadWriteLock和CountDownLatch等。这是一个基于队列实现的同步器,AbstractQueuedSynchronizer内部维护一张以双向链表的队列和一个全局持有锁状态state变量。本文详细讨论AQS的加锁和释放锁原理。

1. 原理

1.1 核心变量

内部维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源类型:Exclusive(独占,同一时刻只有一个线程能执行,如ReentrantLock)和Share(共享,同一时刻能有多个共享线程同时执行,如Semaphore/CountDownLatch)。

1.2 核心方法

不同锁的实现,同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等)由AQS实现。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively()
    该线程是否正在独占资源。只有用到condition才需要去实现它。

  • tryAcquire(int)
    独占方式。尝试获取资源,成功则返回true,失败则返回false。

  • tryRelease(int)
    独占方式。尝试释放资源,成功则返回true,失败则返回false。

  • tryAcquireShared(int)
    共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  • tryReleaseShared(int)
    共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    上面5个方法都需要具体的实现类去实现获取锁和释放锁的逻辑,阻塞和唤醒由AQS来处理,这也是证明了它为什么是其他锁实现的基础类,想知道java中锁的实现原理,则理解AQS原理就 非常重要,非常重要,非常重要。

1.3 加锁、释放锁流程

这里详细说明AbstractQueuedSynchronizer加锁和释放锁的实现逻辑。

1.3.1 独占锁加锁流程

1.3.2 独占锁释放流程

1.3.3 共享锁加锁流程

1.3.4 共享锁释放流程


看完这些流程图是不是有点头晕,别急,下面我们跟着java8中AbstractQueuedSynchronizer源码来分析,在代码中是如何实现。

2. 源码分析

这一节我们跟着源码来分析,博主使用java8中AbstractQueuedSynchronizer进行源码分析,不同版本源码可能存在一点差异。

2.1 独占锁加锁源码分析

独占锁的加锁入口方法是**acquire(1)**源码如下:

   
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
            selfInterrupt();
    }
  • 步骤1:执行tryAcquire(arg)方法尝试获取锁,返回true说明获取锁成功直接结束,否则执行步骤2
  • 步骤2:执行addWaiter(Node.EXCLUSIVE)方法,创新的独占模式等待节点,加入到队列尾部,返回这个新节点node(新尾节点)。
  • 步骤3:执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法重新尝试获取锁,获取失败将当前线程挂起,返回当前线程的中断状态,返回ture时执行步骤4
  • 步骤4:执行selfInterrupt()方法将当前线程中断。

addWaiter(Node.EXCLUSIVE) 源码如下:

  
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 申明一个变量pred 将队列的尾节点赋值给它(老尾节点)
        Node pred = tail;
        // 如果尾节点不为null,说明队列已被初始化
        if (pred != null) {
            // 将当前节点的前一个节点指针指向队列尾结点
            node.prev = pred;
            // 采用CAS方式将node节点修改为尾节点(新尾节点)
            if (compareAndSetTail(pred, node)) {
                // 修改成功后将老尾节点的下一个节点指针指向新尾节点
                pred.next = node;
                // 返回新尾节点
                return node;
            }
        }
         // 如果尾节点为null,说明队列未被初始化
        enq(node);
        // 返回新尾节点
        return node;
    }

    
    private Node enq(final Node node) {
        // 采用自旋方式操作
        for (;;) {
            Node t = tail;
            // 如果尾节点为null,说明未初始化,必须先初始化
            if (t == null) { 
                 // 将头节点通过CAS操作赋值初始值 new Node()
                if (compareAndSetHead(new Node()))
                    // 将头节点赋值给尾节点
                    tail = head;
            } else {
               // 申明一个变量pred 将队列的尾节点赋值给它(老尾节点)
                node.prev = t;
                 // 采用CAS方式将node节点修改为尾节点(新尾节点)
                if (compareAndSetTail(t, node)) {
                    // 修改成功后将老尾节点的下一个节点指针指向新尾节点
                    t.next = node;
                     // 返回新尾节点
                    return t;
                }
            }
        }
    }

acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法源码如下:

    
    final boolean acquireQueued(final Node node, int arg) {
        // 定义一个变量记录操作 failed == true 操作失败,  failed == false 操作成功  
        boolean failed = true;
        try {
         // 定义一个变量记录当前线程的中断状态 
            boolean interrupted = false;
            // 自旋方式操作
            for (;;) {
                // 获取上一个线程
                final Node p = node.predecessor();
                   
                if (p == head && tryAcquire(arg)) {
                    // 获取锁成功将当前节点设置为头节点
                    setHead(node);
                    // 将原来头节点的下一个节点指针设置为null,使该对象变成垃圾对象,有利于垃圾回收
                    p.next = null; 
                    // 将操作记录设置为成功
                    failed = false;
                    // 返回当前线程的中断状态
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 检查或修改前一个节点的waitStatus状态
                    parkAndCheckInterrupt())// 挂起线程
                    // 将线程中断状态设置为true
                    interrupted = true;
            }
        } finally {
            // 操作出现异常
            if (failed)
                // 如果当前节点是唤醒节点则唤醒下一个节点,否则删除
                cancelAcquire(node);
        }
    }


    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
         // 获取前一个节点的waitStatus值
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            
            return true;
            // ws > 0 表示上一个节点的线程已被取消
        if (ws > 0) {
            
            do {
                // 寻找前一个节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 将当前节点插入到未被取消的节点后面
            pred.next = node;
        } else {
            
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 表示当前线程还不能挂起
        return false;
    }


    
    private final boolean parkAndCheckInterrupt() {
        // 挂起线程 底层是使用Unsafe工具类实现,Unsafe直接操作虚拟机底层的方法,不了解请自行百度
        LockSupport.park(this);
        // 返回当前线程中断状态
        return Thread.interrupted();
    }


   
    private void cancelAcquire(Node node) {
        // 节点为null 直接返回
        if (node == null)
            return;
        // 设置节点线程为null
        node.thread = null;

        // 过滤掉所有已取消的前置节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
         // 找到未被取消的前节点
        node.prev = pred = pred.prev;

        // 记录前节点的下一个节点的指针
        Node predNext = pred.next;

        // 将当前的节点状态标记为取消
        node.waitStatus = Node.CANCELLED;

        // 当前节点是尾节点直接移除
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    // 前节点是中间节点则修改下一个节点的指针(相当于删除中间被取消的节点)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 当前节点为头节点直接唤醒下个节点
                unparkSuccessor(node);
            }
            // 释放引用,有利于GC回收
            node.next = node; // help GC
        }
    }

到这里独占锁的加锁方式就分析完成了,根据分析源码我们发现:

1、当只有一个线程竞争资源时会立刻得到锁
2、当第二个线程参与竞争资源时先立刻尝试获取锁,获取失败后又自旋两次获取锁,两次获取失败后才会挂起线程。
3、当第三个线程参与竞争资源时立刻尝试获取锁,获取失败后直接挂起线程
4、知道synchronized底层逻辑的不难发现,它们的加锁原理几乎一致

2.2 独占锁释放源码分析

独占锁释放的入口方法是release(int arg) 源码如下:

    public final boolean release(int arg) {
       // 尝试释放锁(由具体实现类完成)
        if (tryRelease(arg)) {
            Node h = head;
            // 如果存在等待节点,则唤醒后续节点
            if (h != null && h.waitStatus != 0)
                 // 唤醒下一个等待的节点
                unparkSuccessor(h);
                // 释放成功
            return true;
        }
        // 释放失败
        return false;
    }

  • 步骤1:执行tryRelease(arg)方法尝试释放锁,释放逻辑由实现类完成,返回true说明释放锁成功执行步骤2,否则释放失败。
  • 步骤2:执unparkSuccessor(h)方法,唤醒下一个等待的节点。

unparkSuccessor(h) 源码分析如下:

   
   private void unparkSuccessor(Node node) {
       
       // 获取当前节点的等待状态
       int ws = node.waitStatus;
       if (ws < 0)
           // waitStatus 值大于0设置为0
           compareAndSetWaitStatus(node, ws, 0);

       
       Node s = node.next;
       // 如果下一个节点存在且已经取消
       if (s == null || s.waitStatus > 0) {
           s = null;
           for (Node t = tail; t != null && t != node; t = t.prev)
               if (t.waitStatus <= 0)
                  // 用递归方式向后找到未被取消的节点
                   s = t;
       }
       if (s != null)
          // 如果等待唤醒的节点存在,则唤醒(使用Unsafe来唤醒,如不知道Unsafe什么请自行百度)
           LockSupport.unpark(s.thread);
   }

到这里释放独占锁的代码分析就完成了,其实很简单,就两步骤:

1、释放锁(将state设置为0,设置独占线程为null)
2、唤醒下一个等待线程

2.3 共享锁加锁源码分析

AQS中共享锁加锁的入口方法为acquireShared(int arg) 下面源码分析

    
    public final void acquireShared(int arg) {
          // 尝试获取共享锁
        if (tryAcquireShared(arg) < 0)
            // 获取失败处理逻辑
            doAcquireShared(arg);
    }
  • 步骤1:执行tryAcquireShared(arg)方法尝试获取锁,获取逻辑由实现类完成,返回值负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。当返回负数时执行步骤2
  • 步骤2:执doAcquireShared(arg)方法,尝试获取锁和等待操作

doAcquireShared(arg) 源码分析如下

    private void doAcquireShared(int arg) {
         // 创建和添加一个共享类型的节点(和独占锁一样,只是节点类型为共享模型,这里不再分析)
        final Node node = addWaiter(Node.SHARED);
        // 定义一个变量记录操作 failed == true 操作失败,  failed == false 操作成功  
        boolean failed = true;
        try {
           // 定义一个变量记录当前线程的中断状态 
            boolean interrupted = false;
            // 自旋方式操作
            for (;;) {
                // 获取前一个节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 如果上一个节点上头节点,则尝试获取锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取到锁设置头节点,且唤醒下一个节点(这个需要满足条件,在setHeadAndPropagate方法中详细说明)
                        // 这个方法也是和独占模式处理不同之处,需要特别注意
                        setHeadAndPropagate(node, r);
                        // 释放引用,有利于垃圾回收
                        p.next = null; // help GC
                        if (interrupted)
                             // 如果线程中断过,这设置当前线程中断
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 这里和独占模式一模一样,不再分析
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 操作出现异常
            if (failed)
                // 如果当前节点是唤醒节点则唤醒下一个节点,否则删除 (和独占模式一样,这里不再分析)
                cancelAcquire(node);
        }
    }


    
    private void setHeadAndPropagate(Node node, int propagate) {
        // 记录老头节点
        Node h = head; 
        // 设置当前节点为头节点
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 下一个节点是共享节点或者是未知节点是,尝试唤醒,可能存在不必要的唤醒
            if (s == null || s.isShared())
                 // 释放后续节点,下一个章节详解
                doReleaseShared();
        }
    }

到这里共享锁加锁就讲完了,大部分逻辑和独占锁的加锁逻辑一致,不同点主要体现在setHeadAndPropagate(Node node, int propagate)这个方法,详细请看代码注释。

1、 尝试获取锁,获取不到锁一般是至少有一个独占线程在阻塞,只要加入到阻塞队列说明队列已被创建
2、只要尝试获取到锁,会尝试去释放后续共享节点(递归执行会把后面连续的共享节点全部释放)

2.4 共享锁释放源码分析

共享锁释放方法入口releaseShared(int arg) 源码如下:

    
    public final boolean releaseShared(int arg) {
         // 尝试释放锁,如果释放后允许唤醒后续等待结点返回true,否则返回false。
        if (tryReleaseShared(arg)) {
            // 存在等待节点,则唤醒
            doReleaseShared();
            return true;
        }
        return false;
    }
  • 步骤1:执行tryReleaseShared(arg)方法尝试释放共享锁,释放逻辑由实现类完成,如果释放后允许唤醒后续等待结点返回true,否则返回false。当返回true时执行步骤2
  • 步骤2:执doReleaseShared()方法,唤醒后续节点

doReleaseShared() 源码分析

    
    private void doReleaseShared() {
        
        for (;;) {
            // 获取头节点
            Node h = head;
            // 如果存在等待唤醒的节点
            if (h != null && h != tail) {
                // 获取当前头节点的等待状态
                int ws = h.waitStatus;
                // 第三次尝试获取到锁或被挂起的线程
                //(已执行过shouldParkAfterFailedAcquire(p, node)方法,前节点的waitStatus就必须是 Node.SIGNAL)
                // 用来唤醒下一个被阻塞的节点
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;  // 修改失败则,从新检查修改
                        // 唤醒后续等待的节点
                    unparkSuccessor(h);
                }
                   
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // 修改失败继续自旋修改
            }
            // 在自旋期间,可能有另一个线程执行setHead(node),导致head发生变化,则一直自旋,直到head是同一个对象为止。
            if (h == head)               
                break;
        }
    }

共享锁释放到这就分析完了,和独占锁有点像,但是更复杂一些。

1、尝试释放锁释放成功判断是否有等待的后续节点
2、判断头节点的waitStatus值是否为Node.SIGNAL,如果是,则修改waitStatus=0且唤醒后续节点
3、判断头节点的waitStatus值是否为0,如果是,则修改waitStatus = Node.PROPAGATE

AQS的方法和参数有很多,本文主要分析加锁和释放锁逻辑。如果上文都能看懂我相信AQS的其他源码你也一定能看懂。

总结

AQS是java锁里重要的基础类,全局维护一个state资源,通过一个队列来同步线程的顺序。原子操作部分大量采用CAS方式提高同步性能,也保证了数据的原子性,线程唤醒和挂起使用了Unsafe工具类实现。希望读者读完本文后不仅知道AQS的原理和使用还知道CAS的使用。AQS都能搞定,其他锁你还怕吗?还想了解java其他锁欢迎阅读博主的《java锁系列》


如果对您有帮助,请支持博主点一个鼓励的赞
原创不易,转载请标明来源

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/606561.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号