AQS指的是AbstractQueuedSynchronizer抽象类,我们常用的java并发包中的类都是基于这个类实现的,AQS面向的是实现方,就是说,当我们要实现一个锁的时候就可以使用AQS这个抽象类
AQS使用模板方法来设计使得他的子类来重写他的方法,在父类中回调,主要有以下几种方法
-
tryAcquire
-
tryRelease
-
tryAcquireShared
-
tryReleaseShared
-
isHeldExclusively
前两种是在独占锁时使用,后两种是在共享锁时使用,最后一个是判断获取锁的线程是否是当前线程
AQS中还有一些重要的属性
- state // 表示锁的状态,为0的时候表示没有加锁,大于零表示已经有线程加锁
- head //AQS内部会维护一个双向链表,head指向头节点
- tail //指向尾接到
- exclusiveOwnerThread //当前持有锁的线程
ReentrantLock是一个可重入锁,下面我们通过RenntrantLock的加锁和解锁来探究一下
假设有三个线程来访问共享资源
import java.util.concurrent.locks.ReentrantLock;
public class LockTest {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
reentrantLock.lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
reentrantLock.unlock();
}
},"t"+i).start();
}
}
}
我们知道三个线程中将会有一个线程获得锁,其他两个线程将会阻塞,直到持有锁的线程释放锁,为了方便理解,我们假设t0线程首先获得锁。
Lock流程t0.lock的执行流程是怎么样的呢?
因为刚开始锁并没有被获取,所以t0会把state设置为1,并且让获得锁的线程指向自己,
我们通过源码来查看一下
当我们调用Lock.lock方法的时候,会执行ReentrantLock的内部类的sync的lock
public void lock() {
sync.lock();
}
sync是什么,我们上面说了,当我们想使用AQS实现锁的时候就可以继承AQS类,来重写模板方法,ReentrantLock也是如此,它有一个内部类是sync继承了AQS类,但它仍然是一个抽象类。
它有两个实现类 NonfairSync 和 FairSync 即非公平锁和公平锁,当我们创建时默认是非公平锁
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
//代码省略
}
protected final boolean tryRelease(int releases) {
//代码省略
}
protected final boolean isHeldExclusively() {
//代码省略
}
final ConditionObject newCondition() {
//代码省略
}
// Methods relayed from outer class
final Thread getOwner() {
//代码省略
}
final int getHoldCount() {
//代码省略
}
final boolean isLocked() {
//代码省略
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
//代码省略
}
}
NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//通过CAS来尝试设置当前线程持有锁
if (compareAndSetState(0, 1))
//如果设置成功,将当前线程设置为持有锁线程
setExclusiveOwnerThread(Thread.currentThread());
else
//否则调用父类,即AQS的acquire方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
所以当我们调用Lock()方法时,实际上执行的是NonfairSync的Lock()方法
lock方法中会先使用cas尝试设置当前线程为持有锁线程,失败再走父类的acquire方法,在我们的例子中,因为我们假设t0是首先进入的线程,那么在它之前没有任何线程持有这把锁,所以t0可以设置自己为持有锁的线程然后调用结束
在t0持有锁期间 t1来获取锁
t0 一直占有锁所以t1 会被阻塞
t1.lock执行流程
final void lock() {
//使用cas尝试获得锁,但是此时state已经为1被其他线程占用
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//cas失败,调用父类AQS的acquire的方法
acquire(1);
}
AQS的acquire。tryAcquire是子类重写的方法,父类来调用。也是尝试获得锁的方法
public final void acquire(int arg) {
//当tryAcquire返回true时调用结束
if (!tryAcquire(arg) &&
//tryAcquire返回false时继续执行
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 打断自身,给自身设置打断标记,只有acquireQueued返回true执行
selfInterrupt();
}
tryAcquire方法执行流程
//NonfairSync类
protected final boolean tryAcquire(int acquires) {
//调用父类Sync的方法
return nonfairTryAcquire(acquires);
}
//sync类
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取当前state
int c = getState();
//c==0说明当前没有线程持有锁
if (c == 0) {
//使用cas来尝试获取锁,成功返回true失败false
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//c!=0说明当前锁已经被持有了,判断持有锁的线程是否是当前线程,是当前线程就将
//state+1 主要是实现锁的重入性
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//上面没能满足,返回false,尝试获得锁失败
return false;
}
根据上面的逻辑,我们的t1线程来tryAcquire的时候,state!=0,而且只有锁的线程也不是t1线程,那么tryAcquire失败,进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法来执行
首先执行的是addWaiter(Node.EXCLUSIVE)方法,这个方法的作用就是 将当前的线程封装为一个Node来链入AQS
private Node addWaiter(Node mode) {
//创建一个Node将当前线程封装,mode为null当前不用考虑
Node node = new Node(Thread.currentThread(), mode);
//拿到尾指针
Node pred = tail;
//尾指针不为空
if (pred != null) {
//使用cas来将当前节点链入尾部,并返回
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//尾节点为空
enq(node);
return node;
}
我们当前t1进入此方法时,tail为null 将会执行enq(node)方法,enq方法会首先建立一个哨兵节点然后将我们的node添加到哨兵节点后,然后返回。
在第一次循环时,tail为null会创建一个哨兵节点,第二次循环将当前节点设置为尾节点,即 哨兵节点-->当前节点 然后返回当前节点。
private Node enq(final Node node) {
//死循环
for (;;) {
//拿到尾指针
Node t = tail;
if (t == null) {
//尾指针为空建立一个node设置为head和tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
//不为空
node.prev = t;
//将当前节点设置为尾节点然后返回
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
现在为止,我们的addwaiter执行完毕,将当前节点添加到尾节点并且返回,接着将要执行
acquireQueued方法,进入这个方法时还会进行tryAcquire,就是说如果当前节点在链中是第二个,并且在当前线程阻塞之前,锁被释放,那么当前线程将获得锁,而不需要被阻塞,如果锁没有被释放,或者当前节点不是第二个节点就会执行下面的判断逻辑
final boolean acquireQueued(final Node node, int arg) {
//失败标志
boolean failed = true;
try {
//打断标志
boolean interrupted = false;
for (;;) {
//拿到当前节点的前驱节点
final Node p = node.predecessor();
//前驱节点为头节点,tryAcquire获取锁成功
if (p == head && tryAcquire(arg)) {
//当前节点设置为头节点
setHead(node);
//当前节点的前驱节点退出链,等待gc
p.next = null;
failed = false;
//返回中断状态
return interrupted;
}
//当上面的逻辑并没有执行成功返回时,线程将被阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//当try中出现异常,failed没有置为false时
//撤销node,使node退出锁竞争
if (failed)
cancelAcquire(node);
shouldParkAfterFailedAcquire(p, node),方法会将当前节点的前驱接的的waiterState改为-1,
我们在前面的enq中默认创建的node的waitState的值为0,当执行此方法时,会用cas来设置前驱节点waitState为 -1 然后返回false,返回false会导致后面的parkAndCheckInterrupt()方法不会执行,而是再次判断(p == head && tryAcquire(arg))来尝试获得锁,失败后再次执行shouldParkAfterFailedAcquire方法返回true,执行parkAndCheckInterrupt()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//拿到前驱节点的waitState
int ws = pred.waitStatus;
//如果等于 -1 那么当前接的可以阻塞
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//大于0表示撤销状态
//一直向前查找一个不是撤销状态的节点设置为当前节点的前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//使用cas来设置前驱的waitState为 -1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt(),至此为止,t1线程已经被阻塞了,
主要做了以下事情,创建哨兵节点,创建当前线程的节点,让哨兵节点的next指向当前节点,然后设置哨兵节点的waiterstate的值为-1 然后当前线程阻塞
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
//唤醒后返回当前线程是否被中断
return Thread.interrupted();
}
当t2线程进来时,t0线程依然持有锁,那么t2线程的执行逻辑与t1线程大致一致,t2线程也会加入到等待队列中,然后将t2线程的前驱节点就是t1线程的waiterState设置为-1,然后进入阻塞
此后如果还有线程来请求获得锁,只要锁没有释放,都会进入阻塞队列,并设置它的前驱节点的waierState值为-1,来标志它有后继节点。
lock的总结:我们一共定义了三个线程,t0获取锁,是设置state为1,然后开始持有锁,假设在t1和t2线程加锁期间t0线程都不释放锁,那么t1的加锁流程就是,首先会建立一个哨兵节点,然后创建当前线程的节点,让哨兵节点的next节点执行当前节点,之后设置前驱节点(此时是哨兵节点)的waiterState值为-1,然后t1线程阻塞。t2的加锁流程是创建一个节点,让t1节点的next指向当前节点,设置t1节点的waiterState的值为-1,然后当前线程阻塞。
了解过加锁流程后,我们再来看一下,当t0解锁时会发生什么
t0.unlock,首先调用父类AQS的release方法,会调用子类重写的tryRekese方法,解锁成功并且等待队列中有节点,就会进入唤醒流程,没有就直接返回
public void unlock() {
//调用父类(AQS)的release方法
sync.release(1);
}
//AQS
public final boolean release(int arg) {
//调用子类重写的tryRelease方法
if (tryRelease(arg)) {
//成功释放锁之后,如果等待队列中有节点
//调用unparkSuccessor()来唤醒
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
接下来我们要调用的是tryRelsese方法,我们t0只lock了一次,所以,会设置持有锁线程为null,设置state为0,返回true,表示锁已经被释放了。
//sync类
protected final boolean tryRelease(int releases) {
//获取state然后减1
int c = getState() - releases;
//如果持有锁的线程不是当前线程,那么就抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果c==0说明锁已经释放,设置持有锁线程为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//如果state不是0说明锁被重入了,那么就让state-1,锁不能释放
setState(c);
return free;
}
释放锁之后,就会执行唤醒流程,因为我们的等待队列中有三个节点(包括哨兵节点),那么就会执行unparkSuccessor(h); 按照我们的流程,将会唤醒t1
private void unparkSuccessor(Node node) {
//传入的是头节点
int ws = node.waitStatus;
//小于0先设置为0,防止再有线程来释放锁,
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//当头节点下一个节点为null时就什么也不做,因为头节点是哨兵节点
//当下一个节点不为null但是waitStatus大于0,(被撤销)就会在尾节点找到一个节点来
//代替它
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
t1被唤醒会干嘛呢?t1如果在阻塞过程中被中断了,interrupted 会被设置为true,然后执行(p == head && tryAcquire(arg))条件,那么当前条件是成立的,tryAcquire也是可以获得锁的,当t1获得锁后,会将t1节点设置为head,将t1节点的前驱节点断开链,等待垃圾回收。
final boolean acquireQueued(final Node node, int arg) {
//失败标志
boolean failed = true;
try {
//打断标志
boolean interrupted = false;
for (;;) {
//拿到当前节点的前驱节点
final Node p = node.predecessor();
//前驱节点为头节点,tryAcquire获取锁成功
if (p == head && tryAcquire(arg)) {
//当前节点设置为头节点
setHead(node);
//当前节点的前驱节点退出链,等待gc
p.next = null;
failed = false;
//返回中断状态
return interrupted;
}
//当上面的逻辑并没有执行成功返回时,线程将被阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//当try中出现异常,failed没有置为false时
//撤销node,使node退出锁竞争
if (failed)
cancelAcquire(node);
继续向前返回,当我们在阻塞中被打断了时,会返回true,然后执行 selfInterrupt方法,这个方法中只有 Thread.currentThread().interrupt();一行代码,是打断当前线程的,为什么要这样呢
在parkAndCheckInterrupt方法中通过Thread.interrupted();来判断线程是否被打断,但是Thread.interrupted();会擦除打断标记,所以要再次打断,然后返回。至此解锁完成。t1解锁流程类似,然后t2获得锁。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果上述条件成立执行下面方法
selfInterrupt();
}


