栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

AQS、ReentrantLock实现原理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

AQS、ReentrantLock实现原理

大纲:

什么是AQS:

        AQS( AbstractQueuedSynchronizer )是一个用来构建多线程访问共享资源的同步器抽象框架,只需要继承 AQS 就可以很方便的实现我们自定义的多线程同步器、锁,是典型的模板设计模式,父类定义好骨架和部分内部操作细节,具体规则由子类去实现。

       是整个JUC包的基石,ReentrantLock、ReadWriteLock、CountDowndLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor等都是在AQS的基础上实现的。

 AQS的原理概述:

        如果被请求的共享资源未被占用,将当前请求资源的线程设置为独占线程,并将共享资源设置为锁定状态,AQS 使用一个 Volatile 修饰的 int 类型的成员变量 State 来表示同步状态,修改同步状态成功即为获得锁,修改 State 值时通过 CAS 机制来保证修改的原子性, 如果共享资源被占用,需要一定的阻塞等待唤醒机制来保证锁的分配,AQS 中会将竞争共享资源失败的线程添加到一个FIFO(First input first)队列(变体的CLH队列)中。

AQS队列模型:

                CLH队列:Craig、Landin and Hagersten 队列,是以人名命名的,是 单向链表实现的队列。申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现 前驱节点释放了锁就结束自旋

1. 是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。
2. CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。
3. AQS就是基于CLH队列,AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配。

         AQS中使用了CLH队列,但是实现方式略有变动,Head 指向节点为已获得锁的节点,是一个虚拟节点,节点本身不持有具体线程;  获取不到同步状态,会将节点进行自旋获取锁,自旋一定次数失败后会将线程阻塞,相对于 CLH 队列性能较好

AQS同步器整体结构:

AQS代码,head, tail记录了队列的头尾节点(Node中记录了prev和next,中间节点可通过头尾节点获取到):

public abstract class AbstractQueuedSynchronizer 
  extends AbstractOwnableSynchronizer implements java.io.Serializable {
  	// CLH 变体队列头、尾节点,双向链表,记录头尾指针就维护了一个队列
    private transient volatile Node head;
  	private transient volatile Node tail;
  	// AQS 同步状态
   	private volatile int state;
  	// CAS 方式更新 state
  	protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}

Node节点代码, 记录了当前节点的状态waitStatus,当前节点的前后节(双向链表结构),当前节点的线程对象:

static final class Node {
    static final Node SHARED = new Node();//标识等待节点处于共享模式
    static final Node EXCLUSIVE = null;//标识等待节点处于独占模式
 
    static final int CANCELLED =  1;//由于超时或中断,节点已被取消,不再执行了
    static final int SIGNAL    = -1;//表示下一个节点是通过park堵塞的,需要通过unpark唤醒,也就是说这个节点有义务去唤醒它后继节点
    static final int CONDITION = -2;//表示线程在等待条件变量(先获取锁,加入到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之后才能返回)
    static final int PROPAGATE = -3;//表示后续结点会传播唤醒的操作,共享模式下起作用
 
    //等待状态:对于condition节点,初始化为CONDITION;其它情况,默认为0,通过CAS操作原子更新
    volatile int waitStatus;
    //前节点
    volatile Node prev;
    //后节点
    volatile Node next;
    //线程对象
    volatile Thread thread;
    //对于Condtion表示下一个等待条件变量的节点;其它情况下用于区分共享模式和独占模式;
    Node nextWaiter;
}

AOS:

        AQS继承自AOS,用于记录独占锁(互斥锁)获取锁的线程。

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    // 独占线程(不参与序列化)
    private transient Thread exclusiveOwnerThread;
    // 设置当前独占的线程
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    // 返回当前独占的线程
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
ReentrantLock实现: 类结构图:

(1)lock.lock() 方法后,第一步试图获取锁,主要为tryAcquire()方法

(2)获取锁成功,则执行业务代码,如果失败,则需要初始化队列,如果队列已初始化则加入队列尾部进入阻塞 执行addWaiter(),enq(),acquireQueued方法。

1. 获取锁 tryAcquire():

        非公平锁在执行 nofaireTryAcquire之前会先cas修改state尝试获取锁一次,在获取失败后执行nofaireTryAcquire,判断 state ==0 之后,非公平锁会直接cas修改state尝试获取锁,公平锁则需要先执行hasQueuedPredecessors(),如果当前线程前面有一个排队的线程则返回true,如果当前线程位于队列的头部或队列为空则返回false,  返回false 才会去cas修改state获取锁。

        公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非公平锁大。

        非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU 不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。

 

 2. 创建当前线程node 并加入等待队列

         第一步尝试获得锁失败,则接下来会将线程组装成为 Node 进行入队流程,Node 是 AQS 中最基本的数据结构,也是 CLH 队列中的节点,Node 有分为SHARED(共享)、EXCLUSIVE(独占) 两种模式。

addWaiter() : 创建当前线程的Node,  队列为空则初始化队列,不为空则加入到队列尾部,核心方法 enq(node) 中会循环cas去加入队列尾部.

// 为当前线程和给定模式 并创建排队节点。
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 这里的逻辑和 enq(node)方法是一幕一样的,只是为了在指定条件下快速enq
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
}
    
 // 将节点插入队列,必要时进行初始化
 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acqureQueued: 实现同步的核心方法,是一个自旋的过程, 如果当前Node的prev为头节点,则会cas获取锁,获取成功则将当前Node设置为head, 将原来的head节点设置为null 方便GC回收, 退出自旋,执行业务代码。其他情况如果Node的prev为不为头节点或者为头节点但是获取锁失败,说明仍需排队,会调用shouldParkAfterFailedAcquire() 方法, 设置前驱节点状态为-1,然后调用parkAndCheckInterrupt()阻塞线程直到unlock()的时候被唤醒

shouldParkAfterFailedAcquire(): 该方法的主要目的在于 

        1. 排除被取消的前驱节点

        2. 告诉前驱节点,当前节点被parkAndCheckInterrupt()阻塞

        3. realse()释放锁的时候会判断head.waitStatus != 0,才会去LockSupport.unpark(head.next.thread)。

会有下面几种情况:

        1. 前驱节点waitStatus =0 则将其设置为-1

        2. 前驱节点waitStatus = -1 则返回true,且仅有这种情况才会返回true, 执行后续的parkAndCheckInterrupt()方法阻塞线程, 就是只有某个节点的前驱节点 为 -1,该节点才会通过parkAndCheckInterrupt阻塞, 同样的,一个节点要执行后续的parkAndCheckInterrupt方法,就需要想将其前驱节点设置为-1,然后下次自旋执行shouldParkAfterFailedAcquire的时候才会返回true,这个有点绕,可以多看几遍源码理解下。

        3. 前驱节点 waitStatus > 0 被取消了, 则一直往链表头取,直到找到没有被取消的节点为止,将当前节点的前驱节点修改为取到的节点。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
             // SIGNAL    = -1;//表示下一个节点是通过park堵塞的,需要通过unpark唤醒,也就是说这个节点有义务去唤醒它后继节点
            return true;
        if (ws > 0) {
            // CANCELLED =  1;//由于超时或中断,节点已被取消,不再执行了
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 初始化的状态 0 
            // CONDITION = -2;//表示线程在等待条件变量(先获取锁,加入到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之后才能返回)
            // int PROPAGATE = -3;//表示后续结点会传播唤醒的操作,共享模式下起作用
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

执行流程:

 队列变化逻辑图:

API:

lock.lock();

lock.unlock();

公平锁&非公平锁

实现代码流程:

实现自己的自旋锁代码

彻底理解其思想和api的原理

记住所有的细节,记录在博客中,

问题:

enq(final Node node) 总自旋的作用,就是为防止并发的时候,两个线程竞争加入队列尾部,则会有失败的需要自旋去加入队列尾部,所以后面执行acquireQueued()各个想成互不干扰

结合 shouldParkAfterFailedAcquire 理解节点取消操作

shouldParkAfterFailedAcquire 这个方法这个涉及的目的是什么?

cancelAcquire 何时执行

tryAcquire 是 AQS 中抽象模版方法,但是内部会有默认实现,虽然默认的方法内部抛出异常,为什么不直接定义为抽象方法呢?

因为 AQS 不只是对独占锁实现了抽象,同时还包括共享锁;不同锁定义了不同类别的方法,共享锁就不需要 tryAcquire,如果定义为抽象方法,继承 AQS 子类都需要实现该方法

为何tryAcquire() 公平锁的在 FairSync中,nofairTryAcqurie在 Sync中,Sync中级

为何称为自旋锁

修改state为何需要 cas修改

直接继承 AQS就好了,为什么要定义一个内部类来实现AQS

CONDITION 的作用

PROPAGATE 的作用

参考:

聊一聊ReentrantLock和AQS那点事,不刷别后悔! - 知乎

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/880797.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号