栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

CAS共享锁机制分析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

CAS共享锁机制分析

CAS共享锁机制分析

什么是共享锁?AQS原理概述共享锁逻辑分析(Semaphore角度分析AQS共享逻辑)

共享锁获取共享锁释放共用方法

什么是共享锁?

能被多个线程同时获得的锁,为共享锁。也称之为读锁。与互斥锁(写锁)互斥。

AQS原理概述

其核心是一个volatile关键字修饰的int类型的state变量,以及一个由双向指针组成的链表队列。队列再初始化的时候,会CAS生成一个head空节点,后继被阻塞的节点会添加到这个空节点的后面,并相互建立pre和next指针。

而state变量在不同的锁中,使用方式也有锁不同,比如

ReentrantLock 重入锁中,state变量用于记录锁的重入次数,即state>0表示有锁,state=0表示无锁。ReentrantReadWriteLock读写锁,它把int类型的state看作是一个32位的位图,高16位和低16位分别代表了读锁和写锁,高16位表示所有线程读锁的总次数。低16位表示写锁的重入次数。CountDownLatch门闩,state用于记录初始门闩数量,await方法用于把线程加入队列,countDown方法用于减少门闩,当门闩等于0的时候,释放所有阻塞在队列中的线程。Semaphore用来记录信号数量,当state=0时,即新进入的线程会被阻塞到队列。

AQS再代码中使用了模版方法模式,AQS只负责把线程封装成node节点加入列表进行阻塞以及唤醒的工作,至于ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore这些子类如何实现的具体上锁逻辑,AQS并不关心,它只是一个抽象类,把一些公用逻辑进行提炼。

共享锁逻辑分析(Semaphore角度分析AQS共享逻辑) 共享锁获取
	
	//AQS添加锁入口
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            // 小于0表示获取共享锁失败
            
            //获取共享锁失败,会再次尝试获取锁,如果失败添加队列并阻塞线程
            doAcquireShared(arg);
    }
	
    private void doAcquireShared(int arg) {
        // 封装当前线程以及类型
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是head节点(获取了锁的节点),此时head节点可能已经完成了锁释放,抢锁
                if (p == head) {
                    // 尝试获取锁
                    int r = tryAcquireShared(arg);
                    // 如果获取锁成功,更新head节点,尝试唤醒队列中的后继节点(因为共享锁是可以多线程同时获取,参考:读写锁)
                    if (r >= 0) {
                        // 将当前获取锁的节点更新头部,然后唤醒后继节点。
                        // 四个线程:A B(有锁) {(head初始化的空节点) -> C -> D }
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //确保pre节点能够通知当前节点,并阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
            	//如果中途发生异常,则取消线程
                cancelAcquire(node);
        }
    }

检查并矫正当前节点的前驱节点,确保当前节点可被通知:

	//确保pre节点能够通知当前节点
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            
            return true;
        if (ws > 0) { // 只有取消状态是大于1(但是肯定不是head节点,head节点是已经获得锁的)
            
            do {
                // 拆分代码,即:如果前驱节点是取消状态,则找前驱节点的前驱节点,一直往前找,直到找到活着的节点,
                // 然后相互建立pre以及next指针
                // Node predprev = pred.prev;
                // pred = predprev;
                // node.prev = pred;
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        
        return false;
    }

唤醒阻塞线程

    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // this 指明当前线程是阻塞在哪个对象上,后去方便使用jstack命令排查问题
        // 判断是否是中断的方式来唤醒线程的
        // 唤醒线程的两种方式 1.unpark 2. interrupt
        return Thread.interrupted();
    }

更新头节点,并唤醒后继线程:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 更新头节点
        setHead(node);

        if (propagate > 0 ||
                h == null || // 假设C线程在获取锁失败,准备进入队列前尝试获取锁的时候A线程正好释放了锁。head节点还没有初始化
                h.waitStatus < 0 || // 头节点SIGNAL/PROPAGATE状态,表示活跃 可通知 或 传播
                (h = head) == null ||
                h.waitStatus < 0) {
            Node s = node.next;
            // 队列中下一个节点为 读锁节点
            if (s == null || s.isShared())
            	//此方法见目录 加锁、解锁的共用方法。同时解释node节点中PROPAGATE状态的含义
                doReleaseShared();
        }
    }

此方法在获取共享锁逻辑中被调用,共享锁是可以被多个线程同时持有的。

共享锁释放
    public final boolean releaseShared(int arg) {
        // 模版方法,子类实现释放锁逻辑,如果成功,唤醒后继节点
        if (tryReleaseShared(arg)) {
        	//此方法见目录 加锁、解锁的共用方法。同时解释node节点中PROPAGATE状态的含义
            doReleaseShared();
            return true;
        }
        return false;
    }
共用方法
    
    private void doReleaseShared() {

        
        for (;;) {

            Node h = head;
            // h != tail 表明后续还有节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 当前节点状态为活跃节点
                if (ws == Node.SIGNAL) {
					//CAS修改头节点状态为0,成功则唤醒线程,失败则跳过当前循环
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                        continue; // loop to recheck cases
                    }
                    //唤醒线程
                    unparkSuccessor(h);
                } else if (ws == 0 &&		//ws=0 表示有线程已经唤醒了后继节点,当前线程会跳过唤醒再次循环走到这里
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                    // 如果head没有改变,但是CAS失败,则跳过此次循环,从新刷新head节点
                    continue;
                }
            }
            // 从始至终,head节点未发生改变 ,表明head没有被更新
            if (h == head) // loop if head changed
                break;
        }
    }

以上就是我对AQS共享机制的分析,如果有理解错误的地方,欢迎大家留言指正,一起学习共同进步

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781653.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号