栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java ReentrantLock源码总结 ReentrantLock源码注释翻译和解析中英文对照版 AQS虚拟类的实现

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java ReentrantLock源码总结 ReentrantLock源码注释翻译和解析中英文对照版 AQS虚拟类的实现

版本
JDK8(JDK1.8)

ReentrantLock类源码重点
1.ReentrantLock是一种可重入互斥锁,可重入的实现思想是维护一个计数器,每次抢到锁计数器加1,释放锁计数器减1,直到计数器为0,才会真正释放锁

2.ReentrantLock实现了公平可重入锁和非公平可重入锁两个类,使用时可以在ReentrantLock(boolean fair) 传入true或false,则实例化FairSync类或NonfairSync类

3.ReentrantLock实现了Lock接口,其方法自然具有Lock定义的语义,lock主要定义有:

  • 阻塞锁 lock()
  • 获取非阻塞锁(即没获取到锁,线程也不阻塞等待) tryLock()
  • 获取可被中断的锁 lockInterruptibly()
  • 获取可超时的锁 tryLock(long time, TimeUnit unit)
  • 释放锁 unlock()

Lock 源码可以看我这篇文章 Lock

4.ReentrantLock的内部类 Sync、FairSync、NonfairSync,都是虚拟类AQS
(AbstractQueueSynchronizer)的子类,而AQS又是AbstractOwnableSynchronizer的子类,
AbstractOwnableSynchronizer主要存储持有当前锁的线程,以及一些方法,可以看我这篇文章 AbstractOwnableSynchronizer,

5.AQS是重点,它是一个虚拟类,使用模板方法设计模式,定义了操作队列的各种方法,在其内部有两个队列等待队列和条件队列,AQS是基于CLH的改版,即CLH是一直在自旋(循环)判断等待队列中自己的前一个结点是否持有锁或等待,如果是则继续自旋,否则则获取锁,而AQS是自旋中阻塞,即如果前一个结点持有锁或等待时,自己阻塞,直到前一个结点释放锁时,会自动唤醒后一个结点

6.ReentrantLock类定义的部分方法(不包括其内部类)

方法名作用
void lock()获取锁,没有获取则阻塞,持有锁线程获取则计数器加1(可重入)
void lockInterruptibly()获取锁,没有获取则阻塞,持有锁线程获取则计数器加1(可重入)
boolean tryLock()尝试获取锁,成功返回true,失败立即返回false,持有锁线程获取则计数器加1(可重入)
boolean tryLock(long timeout, TimeUnit unit)尝试获取锁,成功返回true,超时返回false,持有锁线程获取则计数器加1(可重入)
void unlock()计数器减1,减到0才释放锁

剩下的就是一些监测方法和创建条件队列newCondition()

7.部分方法详细实现
其他方法大致类似,可以看源码慢慢研究
lock()

ReentrantLock类
public void lock() {
        // 底层调用tryAcquire
        // tryAcquire是AbstractQueueSynchronzier指定由子类实现的
        // 在本类中有两类tryAcquire实现,一个是公平的,一个是非公平的
        // 如果获取锁没有成功 且往等待队列添加一个节点
        // 且使用acquireQueued(final Node node, int arg)方法 不断自旋
        // 判断是否排队排到队头了,排到了再次调用tryAcquire尝试获得锁
        // 没有获得锁,则设置节点状态waitStatus为SINGEL并进入阻塞状态
        sync.acquire(1);
   }
   
AbstractQueueSynchronizer类
public final void acquire(int arg) {
		// 此tryAcquire(arg)调用的是ReentrantLock重写的方法,
		// 且公平锁有一个版本,非公平锁有一个版本
        // 如果获取锁没有成功 且
        if (!tryAcquire(arg) &&
            // 自旋判断排队是否到队头,到了尝试获取锁,没有则设置前一个结点状态为SIGNEL
            // SIGNEL表示前一个结点释放锁后会自动唤醒后一个阻塞结点
            // 设置成功则当前线程进行阻塞
            // addWaiter(Node.EXCLUSIVE), 就是新建个等待队列结点并添加到队尾的操作
            // 每个结点中都有Thread类型成员变量,用来表示处于排队的线程
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果没有获得锁,又收到了中断标志则进行中断
            // 下面这行是满足上面的if的两个条件触发的
            // 进行自我中断
            selfInterrupt();
  }
  
AbstractQueueSynchronizer类
final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
        	// 自旋操作,是CLH改进,CLH不停自旋浪费CPU,
        	// 而这个自旋一次会阻塞,等待队列中前一个结点的唤醒
            for (;;) {
                // 返回node的上一个节点
                // 循环后如果node上一个节点改变,则会抛出异常
                final Node p = node.predecessor();
                
                //当排队到前一个结点是头结点才调用tryAcquire重新尝试获取锁
                if (p == head && tryAcquire(arg)) {
                	// 获得锁成功
                    // 设置该节点为头节点
                    // 内部实现
                    // head = node;
        			// node.thread = null;
        			// node.prev = null;
                    setHead(node);
                    // 之前的头节点让GC回收
                    p.next = null; // help GC
                    return interrupted; // 返回中断标志,默认为false即不中断
                }
                //  没有获取到锁
                // 则尝试设为p的状态为SIGNAL,如果p为要释放节点,则
                // 在队列先前找一个状态为SIGNAL的结点c和node结点相互连接
                // 即 node.prev = c
                // c.next = node
                if (shouldParkAfterFailedAcquire(p, node))
                    // 位操作 | 相对于 逻辑||
                    // 方便的阻塞方法,然后检查是否中断。
                    // 内部实现
                    // LockSupport.park(this); 阻塞当前线程
                    // return Thread.interrupted(); 判断是否有中断标志
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            // 取消获得
            // 从队列中移除该节点
            cancelAcquire(node);
            // 判断是否要添加中断标记
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
    
    
ReentrantLock类
公平锁实现
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState(); //state 就是计数器
            // 为0说明此时没有线程持有该锁
            if (c == 0) {
                  // 重点 公平锁和非公平锁的区别,会不会判断等待队列中有无排队的线程
                 // 如果前面没有排队的线程 且
                if (!hasQueuedPredecessors() &&
                     // 设置状态成功 CAS,抢锁的操作
                    compareAndSetState(0, acquires)) {
                    // 则设置当前线程为独占锁线程
                    // 获取锁后的操作就是调用 AbstractOwnableSynchronizer类的方法
                    // 将其中的独占锁线程设为自生
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 不为零说明当前有线程持有锁
            // 如果当前线程为持有锁线程,则计数器加一
            else if (current == getExclusiveOwnerThread()) {
                // 则可重入次数增加
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 可重入次数设置到state中
                setState(nextc);
                return true;
            }
            // 如果有线程持有锁,又不是当前线程,则返回false
            return false;
        }

8.为什么等待队列有些方法总是要从尾处开始进行
关键就在于AQS中定义的往等待队列中插入的结点的enq()方法

 private Node enq(Node node) {
        for (;;) {
            // 获取尾节点
            Node oldTail = tail;
            // 如果尾节点不为空
            if (oldTail != null) {
            	// 重点在这里 新结点总是先指向原先的尾结点 即node.prev = oldTail
            	// 再把自己设为尾节点,然后旧的尾结点才会指向新的尾节点 即 oldTail.next = node
            	// 所以如果从头开始找,可能会漏掉新的尾节点,因为此时旧的尾节点还未指向新的尾节点
            	// 因为AQS是双向链表有prev也有next
                // 将node节点的prev设置为尾节点 
                node.setPrevRelaxed(oldTail);
                // 若此时尾节点还是oldTail,则将尾节点设为node
                if (compareAndSetTail(oldTail, node)) {
                    // 如果上面操作成功则将以前的尾节点的后一个节点设为新节点
                    // 否则自旋直至设置node为尾节点成功
                    oldTail.next = node;
                    return oldTail;
                }
            } 
            // 如果尾节点为空
            else {
                // 将头和尾设置为同一个新创建的节点
                initializeSyncQueue();
            }
        }
    }

ReentrantLock类源码

package java.util.concurrent.locks;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import jdk.internal.vm.annotation.ReservedStackAccess;


public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    
    private final Sync sync;

    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        
        @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 非公平锁,不判断队列头部有无线程排队,
                // 直接调用CAS尝试设置状态,成功则抢占锁成功,并设置独占线程为当前线程
                if (compareAndSetState(0, acquires)) {
                    // 设置独占锁的线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果当前线程是独占锁的线程
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            // 如果当前线程没有获得锁,抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //可重入锁,如果没有到退出的时候则只是设置c,表示可重入次数减少
            //如果当可重入次数为0,则直接释放锁
            if (c == 0) {
                
                free = true;
                // 设置当前拥有独占访问权限的线程
                // null表示没有线程拥有访问权限
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        // 返回当前线程是否独占锁
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            // 虽然我们通常必须在所有者之前读取状态,但我们不需要这样做来检查当前线程是否是所有者
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // 在AQS AbstractQueueSynchronizer中实现 ConditionObject类
        // 条件等待队列
        final ConditionObject newCondition() {
            // 创建了一条条件队列
            return new ConditionObject();
        }

        // Methods relayed from outer class 从外部类中继的方法

        final Thread getOwner() {
            //getState() 返回同步状态的当前值
            // state 为 0 没有线程持有锁 所以返回当前持有锁的为空
            // 不为0,则表示可重入次数,且有线程持有锁
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        // 返回当前线程持有该锁的次数
        final int getHoldCount() {
            // 如果当前线程持有该锁,则返回state表示持有该锁的次数
            // 如果该线程没有持有该锁,则返回0
            // 该锁就是一个对象,ReentrantLock的实例
            return isHeldExclusively() ? getState() : 0;
        }

        // 通过查看state表示某线程持有该锁的次数
        final boolean isLocked() {
            return getState() != 0;
        }

        
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state 重置为解锁状态
        }
    }

    
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 为0说明此时没有线程持有该锁
            if (c == 0) {
                
                    // 如果前面没有排队的线程 且
                if (!hasQueuedPredecessors() &&
                     // 设置状态成功
                    compareAndSetState(0, acquires)) {
                    // 则设置当前线程为独占锁线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 不为零说明当前有线程持有锁
            // 如果当前线程为独占线程
            else if (current == getExclusiveOwnerThread()) {
                // 则可重入次数增加
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 可重入次数设置到state中
                setState(nextc);
                return true;
            }
            // 如果有线程持有锁,又不是当前线程,则返回false
            return false;
        }
    }

    
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    
    public void lock() {
        // 底层调用tryAcquire
        // tryAcquire是AbstractQueueSynchronzier指定由子类实现的
        // 在本类中有两类tryAcquire实现,一个是公平的,一个是非公平的
        // 如果获取锁没有成功 且往等待队列添加一个节点
        // 且使用acquireQueued(final Node node, int arg)方法 不断自旋
        // 判断是否排队排到队头了,排到了再次调用tryAcquire尝试获得锁
        // 没有获得锁,则设置节点状态waitStatus为SINGEL并进入阻塞状态
        sync.acquire(1);
    }

    
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    
    public boolean tryLock() {
        // 由于不需要排队,只是尝试插队获取下锁
        // 故不需要使用AQS AbstractQueueSynchronizer中的等待队列操作
        return sync.nonfairTryAcquire(1);
    }

    
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.tonanos(timeout));
    }

    
    public void unlock() {
        sync.release(1);
    }

    
    public Condition newCondition() {
        // 返回AQS内部实现的ConditionObject
        return sync.newCondition();
    }

    
    public int getHoldCount() {
        return sync.getHoldCount();
    }

    
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    
    public boolean isLocked() {
        return sync.isLocked();
    }

    
    public final boolean isFair() {
        // 如果公平性设为true,则创建的AQS子类是FairSync
        // 否则创建的AQS子类是 NonfairSync
        return sync instanceof FairSync;
    }

    
    protected Thread getOwner() {
        // 先判断getState()是否为0,为0返回null,
        // 否则调用 getExclusiveOwnerThread() 获取独占线程
        // getExclusiveOwnerThread()是AbstractOwnableSynchronizer类定义的方法
        return sync.getOwner();
    }

    
    public final boolean hasQueuedThreads() {
        // 从等待队列尾部开始往队头找,直到找到waitStatus <= 0的节点,则返回true,
        // 找不到返回false
        return sync.hasQueuedThreads();
    }

    
    public final boolean hasQueuedThread(Thread thread) {
        // 从等待队列尾部开始往队头找,直到找到一个节点中的thread等于传入的thread,则返回true,
        return sync.isQueued(thread);
    }

    
    public final int getQueueLength() {
        // 从等待队列尾部开始往队头找,初始计数器为0,遇到一个节点线程不为空,
        // 则计数器加一,找到头部返回计数器
        return sync.getQueueLength();
    }

    
    protected Collection getQueuedThreads() {
         // 从等待队列尾部开始往队头找,初始化集合,遇到一个节点线程不为空,
        // 则将该线程添加进集合
        return sync.getQueuedThreads();
    }

    
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        // 如果condition 是AbstractQueuedSynchronizer.ConditionObject类的实例
        // 或其子类实例才返回true
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        // 判断线程是否持有锁,不持有抛出异常
        // 从条件队列头部开始找,直到找到一个节点 waitStatus == Node.ConDITION 返回true
        // 否者返回false
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        // 判断线程是否持有锁,不持有抛出异常
        // 初始化计数器为0.从条件队列头部开始找,
        // 找到一个节点满足 waitStatus == Node.ConDITION 则计数器加1
        // 循环到尾部后,返回计数器
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    
    protected Collection getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        // 判断线程是否持有锁,不持有抛出异常
        // 初始化集合.从条件队列头部开始找,
        // 找到一个节点满足 waitStatus == Node.ConDITION 则将该节点中的线程添加进集合
        // 循环到尾部后,返回集合
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    
    public String toString() {
        // 先调用getState() 返回同步状态的当前值 state
        // 若state 为 0 没有线程持有锁 所以返回当前持有锁的为null
        // 若state不为0,则返回当前持有锁的线程
        Thread o = sync.getOwner();
        // super.toString()是父类Object的toString()方法
        return super.toString() + ((o == null) ?
                                   "[Unlocked]" :
                                   "[Locked by thread " + o.getName() + "]");
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/274533.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号