版本
JDK8(JDK1.8)
ReentrantLock类源码重点
1.ReentrantLock是一种可重入互斥锁,可重入的实现思想是维护一个计数器,每次抢到锁计数器加1,释放锁计数器减1,直到计数器为0,才会真正释放锁
2.ReentrantLock实现了公平可重入锁和非公平可重入锁两个类,使用时可以在ReentrantLock(boolean fair) 传入true或false,则实例化FairSync类或NonfairSync类
3.ReentrantLock实现了Lock接口,其方法自然具有Lock定义的语义,lock主要定义有:
- 阻塞锁 lock()
- 获取非阻塞锁(即没获取到锁,线程也不阻塞等待) tryLock()
- 获取可被中断的锁 lockInterruptibly()
- 获取可超时的锁 tryLock(long time, TimeUnit unit)
- 释放锁 unlock()
Lock 源码可以看我这篇文章 Lock
4.ReentrantLock的内部类 Sync、FairSync、NonfairSync,都是虚拟类AQS
(AbstractQueueSynchronizer)的子类,而AQS又是AbstractOwnableSynchronizer的子类,
AbstractOwnableSynchronizer主要存储持有当前锁的线程,以及一些方法,可以看我这篇文章 AbstractOwnableSynchronizer,
5.AQS是重点,它是一个虚拟类,使用模板方法设计模式,定义了操作队列的各种方法,在其内部有两个队列等待队列和条件队列,AQS是基于CLH的改版,即CLH是一直在自旋(循环)判断等待队列中自己的前一个结点是否持有锁或等待,如果是则继续自旋,否则则获取锁,而AQS是自旋中阻塞,即如果前一个结点持有锁或等待时,自己阻塞,直到前一个结点释放锁时,会自动唤醒后一个结点
6.ReentrantLock类定义的部分方法(不包括其内部类)
| 方法名 | 作用 |
|---|---|
| void lock() | 获取锁,没有获取则阻塞,持有锁线程获取则计数器加1(可重入) |
| void lockInterruptibly() | 获取锁,没有获取则阻塞,持有锁线程获取则计数器加1(可重入) |
| boolean tryLock() | 尝试获取锁,成功返回true,失败立即返回false,持有锁线程获取则计数器加1(可重入) |
| boolean tryLock(long timeout, TimeUnit unit) | 尝试获取锁,成功返回true,超时返回false,持有锁线程获取则计数器加1(可重入) |
| void unlock() | 计数器减1,减到0才释放锁 |
剩下的就是一些监测方法和创建条件队列newCondition()
7.部分方法详细实现
其他方法大致类似,可以看源码慢慢研究
lock()
ReentrantLock类
public void lock() {
// 底层调用tryAcquire
// tryAcquire是AbstractQueueSynchronzier指定由子类实现的
// 在本类中有两类tryAcquire实现,一个是公平的,一个是非公平的
// 如果获取锁没有成功 且往等待队列添加一个节点
// 且使用acquireQueued(final Node node, int arg)方法 不断自旋
// 判断是否排队排到队头了,排到了再次调用tryAcquire尝试获得锁
// 没有获得锁,则设置节点状态waitStatus为SINGEL并进入阻塞状态
sync.acquire(1);
}
AbstractQueueSynchronizer类
public final void acquire(int arg) {
// 此tryAcquire(arg)调用的是ReentrantLock重写的方法,
// 且公平锁有一个版本,非公平锁有一个版本
// 如果获取锁没有成功 且
if (!tryAcquire(arg) &&
// 自旋判断排队是否到队头,到了尝试获取锁,没有则设置前一个结点状态为SIGNEL
// SIGNEL表示前一个结点释放锁后会自动唤醒后一个阻塞结点
// 设置成功则当前线程进行阻塞
// addWaiter(Node.EXCLUSIVE), 就是新建个等待队列结点并添加到队尾的操作
// 每个结点中都有Thread类型成员变量,用来表示处于排队的线程
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果没有获得锁,又收到了中断标志则进行中断
// 下面这行是满足上面的if的两个条件触发的
// 进行自我中断
selfInterrupt();
}
AbstractQueueSynchronizer类
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
// 自旋操作,是CLH改进,CLH不停自旋浪费CPU,
// 而这个自旋一次会阻塞,等待队列中前一个结点的唤醒
for (;;) {
// 返回node的上一个节点
// 循环后如果node上一个节点改变,则会抛出异常
final Node p = node.predecessor();
//当排队到前一个结点是头结点才调用tryAcquire重新尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获得锁成功
// 设置该节点为头节点
// 内部实现
// head = node;
// node.thread = null;
// node.prev = null;
setHead(node);
// 之前的头节点让GC回收
p.next = null; // help GC
return interrupted; // 返回中断标志,默认为false即不中断
}
// 没有获取到锁
// 则尝试设为p的状态为SIGNAL,如果p为要释放节点,则
// 在队列先前找一个状态为SIGNAL的结点c和node结点相互连接
// 即 node.prev = c
// c.next = node
if (shouldParkAfterFailedAcquire(p, node))
// 位操作 | 相对于 逻辑||
// 方便的阻塞方法,然后检查是否中断。
// 内部实现
// LockSupport.park(this); 阻塞当前线程
// return Thread.interrupted(); 判断是否有中断标志
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 取消获得
// 从队列中移除该节点
cancelAcquire(node);
// 判断是否要添加中断标记
if (interrupted)
selfInterrupt();
throw t;
}
}
ReentrantLock类
公平锁实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //state 就是计数器
// 为0说明此时没有线程持有该锁
if (c == 0) {
// 重点 公平锁和非公平锁的区别,会不会判断等待队列中有无排队的线程
// 如果前面没有排队的线程 且
if (!hasQueuedPredecessors() &&
// 设置状态成功 CAS,抢锁的操作
compareAndSetState(0, acquires)) {
// 则设置当前线程为独占锁线程
// 获取锁后的操作就是调用 AbstractOwnableSynchronizer类的方法
// 将其中的独占锁线程设为自生
setExclusiveOwnerThread(current);
return true;
}
}
// 不为零说明当前有线程持有锁
// 如果当前线程为持有锁线程,则计数器加一
else if (current == getExclusiveOwnerThread()) {
// 则可重入次数增加
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 可重入次数设置到state中
setState(nextc);
return true;
}
// 如果有线程持有锁,又不是当前线程,则返回false
return false;
}
8.为什么等待队列有些方法总是要从尾处开始进行
关键就在于AQS中定义的往等待队列中插入的结点的enq()方法
private Node enq(Node node) {
for (;;) {
// 获取尾节点
Node oldTail = tail;
// 如果尾节点不为空
if (oldTail != null) {
// 重点在这里 新结点总是先指向原先的尾结点 即node.prev = oldTail
// 再把自己设为尾节点,然后旧的尾结点才会指向新的尾节点 即 oldTail.next = node
// 所以如果从头开始找,可能会漏掉新的尾节点,因为此时旧的尾节点还未指向新的尾节点
// 因为AQS是双向链表有prev也有next
// 将node节点的prev设置为尾节点
node.setPrevRelaxed(oldTail);
// 若此时尾节点还是oldTail,则将尾节点设为node
if (compareAndSetTail(oldTail, node)) {
// 如果上面操作成功则将以前的尾节点的后一个节点设为新节点
// 否则自旋直至设置node为尾节点成功
oldTail.next = node;
return oldTail;
}
}
// 如果尾节点为空
else {
// 将头和尾设置为同一个新创建的节点
initializeSyncQueue();
}
}
}
ReentrantLock类源码
package java.util.concurrent.locks;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import jdk.internal.vm.annotation.ReservedStackAccess;
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 非公平锁,不判断队列头部有无线程排队,
// 直接调用CAS尝试设置状态,成功则抢占锁成功,并设置独占线程为当前线程
if (compareAndSetState(0, acquires)) {
// 设置独占锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程是独占锁的线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果当前线程没有获得锁,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//可重入锁,如果没有到退出的时候则只是设置c,表示可重入次数减少
//如果当可重入次数为0,则直接释放锁
if (c == 0) {
free = true;
// 设置当前拥有独占访问权限的线程
// null表示没有线程拥有访问权限
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 返回当前线程是否独占锁
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
// 虽然我们通常必须在所有者之前读取状态,但我们不需要这样做来检查当前线程是否是所有者
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 在AQS AbstractQueueSynchronizer中实现 ConditionObject类
// 条件等待队列
final ConditionObject newCondition() {
// 创建了一条条件队列
return new ConditionObject();
}
// Methods relayed from outer class 从外部类中继的方法
final Thread getOwner() {
//getState() 返回同步状态的当前值
// state 为 0 没有线程持有锁 所以返回当前持有锁的为空
// 不为0,则表示可重入次数,且有线程持有锁
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 返回当前线程持有该锁的次数
final int getHoldCount() {
// 如果当前线程持有该锁,则返回state表示持有该锁的次数
// 如果该线程没有持有该锁,则返回0
// 该锁就是一个对象,ReentrantLock的实例
return isHeldExclusively() ? getState() : 0;
}
// 通过查看state表示某线程持有该锁的次数
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state 重置为解锁状态
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 为0说明此时没有线程持有该锁
if (c == 0) {
// 如果前面没有排队的线程 且
if (!hasQueuedPredecessors() &&
// 设置状态成功
compareAndSetState(0, acquires)) {
// 则设置当前线程为独占锁线程
setExclusiveOwnerThread(current);
return true;
}
}
// 不为零说明当前有线程持有锁
// 如果当前线程为独占线程
else if (current == getExclusiveOwnerThread()) {
// 则可重入次数增加
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 可重入次数设置到state中
setState(nextc);
return true;
}
// 如果有线程持有锁,又不是当前线程,则返回false
return false;
}
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
// 底层调用tryAcquire
// tryAcquire是AbstractQueueSynchronzier指定由子类实现的
// 在本类中有两类tryAcquire实现,一个是公平的,一个是非公平的
// 如果获取锁没有成功 且往等待队列添加一个节点
// 且使用acquireQueued(final Node node, int arg)方法 不断自旋
// 判断是否排队排到队头了,排到了再次调用tryAcquire尝试获得锁
// 没有获得锁,则设置节点状态waitStatus为SINGEL并进入阻塞状态
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
// 由于不需要排队,只是尝试插队获取下锁
// 故不需要使用AQS AbstractQueueSynchronizer中的等待队列操作
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.tonanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
// 返回AQS内部实现的ConditionObject
return sync.newCondition();
}
public int getHoldCount() {
return sync.getHoldCount();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public boolean isLocked() {
return sync.isLocked();
}
public final boolean isFair() {
// 如果公平性设为true,则创建的AQS子类是FairSync
// 否则创建的AQS子类是 NonfairSync
return sync instanceof FairSync;
}
protected Thread getOwner() {
// 先判断getState()是否为0,为0返回null,
// 否则调用 getExclusiveOwnerThread() 获取独占线程
// getExclusiveOwnerThread()是AbstractOwnableSynchronizer类定义的方法
return sync.getOwner();
}
public final boolean hasQueuedThreads() {
// 从等待队列尾部开始往队头找,直到找到waitStatus <= 0的节点,则返回true,
// 找不到返回false
return sync.hasQueuedThreads();
}
public final boolean hasQueuedThread(Thread thread) {
// 从等待队列尾部开始往队头找,直到找到一个节点中的thread等于传入的thread,则返回true,
return sync.isQueued(thread);
}
public final int getQueueLength() {
// 从等待队列尾部开始往队头找,初始计数器为0,遇到一个节点线程不为空,
// 则计数器加一,找到头部返回计数器
return sync.getQueueLength();
}
protected Collection getQueuedThreads() {
// 从等待队列尾部开始往队头找,初始化集合,遇到一个节点线程不为空,
// 则将该线程添加进集合
return sync.getQueuedThreads();
}
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
// 如果condition 是AbstractQueuedSynchronizer.ConditionObject类的实例
// 或其子类实例才返回true
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
// 判断线程是否持有锁,不持有抛出异常
// 从条件队列头部开始找,直到找到一个节点 waitStatus == Node.ConDITION 返回true
// 否者返回false
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
// 判断线程是否持有锁,不持有抛出异常
// 初始化计数器为0.从条件队列头部开始找,
// 找到一个节点满足 waitStatus == Node.ConDITION 则计数器加1
// 循环到尾部后,返回计数器
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
protected Collection getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
// 判断线程是否持有锁,不持有抛出异常
// 初始化集合.从条件队列头部开始找,
// 找到一个节点满足 waitStatus == Node.ConDITION 则将该节点中的线程添加进集合
// 循环到尾部后,返回集合
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public String toString() {
// 先调用getState() 返回同步状态的当前值 state
// 若state 为 0 没有线程持有锁 所以返回当前持有锁的为null
// 若state不为0,则返回当前持有锁的线程
Thread o = sync.getOwner();
// super.toString()是父类Object的toString()方法
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
}



