栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

AQS源码分析

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

AQS源码分析

目录
  • 同步队列
    • 加锁
      • 自定义同步器
      • 线程加入AQS队列#addwaiter
      • 线程自旋#acquireQueued
    • 释放锁
  • 条件队列
    • 条件等待
    • 条件唤醒


同步队列

双向链表

加锁 自定义同步器

自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

独占锁#accquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上
        selfInterrupt();
}

共享锁#acquireShared

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
线程加入AQS队列#addwaiter

自旋加入AQS队列。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
    	//这儿可能会出现两个节点的prev都指向tail,compareAndSetTail有一个会失败,失败后执行enq会重新赋值
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //队列为空或者加入队尾竞争失败
    enq(node);
    return node;
}

private Node enq(final Node node) {
   for (;;) {
       Node t = tail;
       if (t == null) { // Must initialize
           if (compareAndSetHead(new Node()))
               tail = head;
       } else {
           node.prev = t;
           if (compareAndSetTail(t, node)) {
               t.next = node;
               return t;
           }
       }
   }
}
线程自旋#acquireQueued

加入AQS队列后,线程开始自旋。自旋过程中,第2个节点是有机会直接获取锁的,第3个及以后的节点或第2个节点获取锁失败,会判断前驱节点的状态(SIGNAL就挂起,CANCELLED往前移,0就设置前驱节点为SIGNAL)来决定当前线程是否需要挂起。

final boolean acquireQueued(final Node node, int arg) {
   //标记是否成功拿到锁
   boolean failed = true;
   try {
   	   //标记等待过程中是否被中断过
       boolean interrupted = false;
       //自旋
       for (;;) {
           final Node p = node.predecessor();
           if (p == head && tryAcquire(arg)) {
           	   //这里不需要再进行CAS操作了,获取锁成功的线程作为新的head节点
               setHead(node);
               p.next = null; // help GC
               failed = false;
               //假中断,在这个过程中即使线程被中断了,忽略
               return interrupted;
           }
           
           //第2个节点获取锁失败或第3个及以后的节点,则根据前驱节点的waitStatus决定是否挂起当前线程
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())
               interrupted = true;
       }
   } finally {
       if (failed)
           cancelAcquire(node);
   }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    //新加入AQS队列的节点的waitStatus=0
    //如果是SIGNAL状态,意味着当前线程需要被unpark唤醒
    if (ws == Node.SIGNAL)
        
        return true;
    //如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”的节点,并设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。    
    if (ws > 0) {
        
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //如果前继节点为“0”或者“共享锁”状态,设置前继节点为SIGNAL状态
        
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt(){
	LockSupport.park(this);
	return Thread.interrupted();
}
释放锁

尝试将state减1,如果为0说明可以被其它线程竞争了,唤醒下一个节点,下一个节点醒来后就会回到acquireQueued逻辑。

public final boolean release(int arg) {
   if (tryRelease(arg)) {
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
   }
   return false;
}

protected final boolean tryRelease(int releases) {
   int c = getState() - releases;
   if (Thread.currentThread() != getExclusiveOwnerThread())
       throw new IllegalMonitorStateException();
   boolean free = false;
   if (c == 0) {
       free = true;
       setExclusiveOwnerThread(null);
   }
   setState(c);
   return free;
}

private void unparkSuccessor(Node node) {
   
   int ws = node.waitStatus;
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);

   
   Node s = node.next;
   if (s == null || s.waitStatus > 0) {
       s = null;
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
   }
   if (s != null)
       LockSupport.unpark(s.thread);
}
条件队列

单向链表,每个condition都维护1个条件队列。

条件等待

调用condition.await时,会先往条件队列里添加Node,然后释放锁,就相当于把同步队列的head移到了条件队列。

条件唤醒

唤醒会从条件队列中移动到同步队列,等待获取锁,获取锁成功后才真正唤醒。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1026116.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号