栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

AQS之ReentrantLock源码解析

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

AQS之ReentrantLock源码解析

前言

 AQS指的是AbstractQueuedSynchronizer抽象类,我们常用的java并发包中的类都是基于这个类实现的,AQS面向的是实现方,就是说,当我们要实现一个锁的时候就可以使用AQS这个抽象类

AQS使用模板方法来设计使得他的子类来重写他的方法,在父类中回调,主要有以下几种方法

  1. tryAcquire
  2. tryRelease
  3. tryAcquireShared
  4. tryReleaseShared
  5. isHeldExclusively

前两种是在独占锁时使用,后两种是在共享锁时使用,最后一个是判断获取锁的线程是否是当前线程

AQS中还有一些重要的属性

  • state     // 表示锁的状态,为0的时候表示没有加锁,大于零表示已经有线程加锁
  • head     //AQS内部会维护一个双向链表,head指向头节点
  • tail        //指向尾接到
  • exclusiveOwnerThread   //当前持有锁的线程
ReentrantLock

ReentrantLock是一个可重入锁,下面我们通过RenntrantLock的加锁和解锁来探究一下

假设有三个线程来访问共享资源

import java.util.concurrent.locks.ReentrantLock;

public class LockTest {
    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                reentrantLock.lock();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    reentrantLock.unlock();
                }
            },"t"+i).start();
            
        }
     
    }
}

我们知道三个线程中将会有一个线程获得锁,其他两个线程将会阻塞,直到持有锁的线程释放锁,为了方便理解,我们假设t0线程首先获得锁。

Lock流程

t0.lock的执行流程是怎么样的呢?

 因为刚开始锁并没有被获取,所以t0会把state设置为1,并且让获得锁的线程指向自己,

我们通过源码来查看一下

当我们调用Lock.lock方法的时候,会执行ReentrantLock的内部类的sync的lock

public void lock() {
        sync.lock();
}

sync是什么,我们上面说了,当我们想使用AQS实现锁的时候就可以继承AQS类,来重写模板方法,ReentrantLock也是如此,它有一个内部类是sync继承了AQS类,但它仍然是一个抽象类。

它有两个实现类  NonfairSync 和 FairSync 即非公平锁和公平锁,当我们创建时默认是非公平锁

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
                //代码省略         
        }

        protected final boolean tryRelease(int releases) {
          //代码省略
        }

        protected final boolean isHeldExclusively() {
           //代码省略
        }

        final ConditionObject newCondition() {
            //代码省略
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            //代码省略
        }

        final int getHoldCount() {
            //代码省略
        }

        final boolean isLocked() {
            //代码省略
        }

        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
           //代码省略
        }
    }
NonfairSync
 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        
        final void lock() {
            //通过CAS来尝试设置当前线程持有锁
            if (compareAndSetState(0, 1))
                //如果设置成功,将当前线程设置为持有锁线程
                setExclusiveOwnerThread(Thread.currentThread());
            
           else
                //否则调用父类,即AQS的acquire方法
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

所以当我们调用Lock()方法时,实际上执行的是NonfairSync的Lock()方法

lock方法中会先使用cas尝试设置当前线程为持有锁线程,失败再走父类的acquire方法,在我们的例子中,因为我们假设t0是首先进入的线程,那么在它之前没有任何线程持有这把锁,所以t0可以设置自己为持有锁的线程然后调用结束

在t0持有锁期间 t1来获取锁

 t0 一直占有锁所以t1 会被阻塞

t1.lock执行流程

final void lock() {
        //使用cas尝试获得锁,但是此时state已经为1被其他线程占用
       if (compareAndSetState(0, 1))
             setExclusiveOwnerThread(Thread.currentThread());
       else
            //cas失败,调用父类AQS的acquire的方法
             acquire(1);
}

AQS的acquire。tryAcquire是子类重写的方法,父类来调用。也是尝试获得锁的方法

​
 public final void acquire(int arg) {
         //当tryAcquire返回true时调用结束
        if (!tryAcquire(arg) &&
            //tryAcquire返回false时继续执行
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           

            // 打断自身,给自身设置打断标记,只有acquireQueued返回true执行
            selfInterrupt();
}

​

tryAcquire方法执行流程

//NonfairSync类
protected final boolean tryAcquire(int acquires) {
          //调用父类Sync的方法
         return nonfairTryAcquire(acquires);
}
//sync类
final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取当前state
    int c = getState();
    //c==0说明当前没有线程持有锁
    if (c == 0) {
        //使用cas来尝试获取锁,成功返回true失败false
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //c!=0说明当前锁已经被持有了,判断持有锁的线程是否是当前线程,是当前线程就将
    //state+1  主要是实现锁的重入性
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //上面没能满足,返回false,尝试获得锁失败           
    return false;
}

根据上面的逻辑,我们的t1线程来tryAcquire的时候,state!=0,而且只有锁的线程也不是t1线程,那么tryAcquire失败,进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法来执行

首先执行的是addWaiter(Node.EXCLUSIVE)方法,这个方法的作用就是 将当前的线程封装为一个Node来链入AQS

private Node addWaiter(Node mode) {
    //创建一个Node将当前线程封装,mode为null当前不用考虑
    Node node = new Node(Thread.currentThread(), mode);
    //拿到尾指针
    Node pred = tail;
    //尾指针不为空
    if (pred != null) {
        //使用cas来将当前节点链入尾部,并返回
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //尾节点为空
    enq(node);
    return node;
}

我们当前t1进入此方法时,tail为null 将会执行enq(node)方法,enq方法会首先建立一个哨兵节点然后将我们的node添加到哨兵节点后,然后返回。

在第一次循环时,tail为null会创建一个哨兵节点,第二次循环将当前节点设置为尾节点,即 哨兵节点-->当前节点   然后返回当前节点。

private Node enq(final Node node) {
    //死循环
    for (;;) {
        //拿到尾指针
        Node t = tail;
        if (t == null) {
            //尾指针为空建立一个node设置为head和tail
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //不为空
            node.prev = t;
            //将当前节点设置为尾节点然后返回
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

现在为止,我们的addwaiter执行完毕,将当前节点添加到尾节点并且返回,接着将要执行

acquireQueued方法,进入这个方法时还会进行tryAcquire,就是说如果当前节点在链中是第二个,并且在当前线程阻塞之前,锁被释放,那么当前线程将获得锁,而不需要被阻塞,如果锁没有被释放,或者当前节点不是第二个节点就会执行下面的判断逻辑   

final boolean acquireQueued(final Node node, int arg) {
     //失败标志
    boolean failed = true;
    try {
        //打断标志
        boolean interrupted = false;
        for (;;) {
            //拿到当前节点的前驱节点
            final Node p = node.predecessor();
             //前驱节点为头节点,tryAcquire获取锁成功
            if (p == head && tryAcquire(arg)) {
                //当前节点设置为头节点
                setHead(node);
                //当前节点的前驱节点退出链,等待gc
                p.next = null; 
                failed = false;
                //返回中断状态
                return interrupted;
            }
            //当上面的逻辑并没有执行成功返回时,线程将被阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //当try中出现异常,failed没有置为false时
        //撤销node,使node退出锁竞争
        if (failed)
            cancelAcquire(node);

 shouldParkAfterFailedAcquire(p, node),方法会将当前节点的前驱接的的waiterState改为-1,

我们在前面的enq中默认创建的node的waitState的值为0,当执行此方法时,会用cas来设置前驱节点waitState为 -1 然后返回false,返回false会导致后面的parkAndCheckInterrupt()方法不会执行,而是再次判断(p == head && tryAcquire(arg))来尝试获得锁,失败后再次执行shouldParkAfterFailedAcquire方法返回true,执行parkAndCheckInterrupt()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //拿到前驱节点的waitState
    int ws = pred.waitStatus;
    //如果等于 -1  那么当前接的可以阻塞    
    if (ws == Node.SIGNAL)
      
        return true;
    if (ws > 0) {
         //大于0表示撤销状态
          //一直向前查找一个不是撤销状态的节点设置为当前节点的前驱节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //使用cas来设置前驱的waitState为 -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt(),至此为止,t1线程已经被阻塞了,

主要做了以下事情,创建哨兵节点,创建当前线程的节点,让哨兵节点的next指向当前节点,然后设置哨兵节点的waiterstate的值为-1 然后当前线程阻塞

private final boolean parkAndCheckInterrupt() {
    //阻塞当前线程
    LockSupport.park(this);
    //唤醒后返回当前线程是否被中断
    return Thread.interrupted();
}

当t2线程进来时,t0线程依然持有锁,那么t2线程的执行逻辑与t1线程大致一致,t2线程也会加入到等待队列中,然后将t2线程的前驱节点就是t1线程的waiterState设置为-1,然后进入阻塞

 此后如果还有线程来请求获得锁,只要锁没有释放,都会进入阻塞队列,并设置它的前驱节点的waierState值为-1,来标志它有后继节点。

lock的总结:我们一共定义了三个线程,t0获取锁,是设置state为1,然后开始持有锁,假设在t1和t2线程加锁期间t0线程都不释放锁,那么t1的加锁流程就是,首先会建立一个哨兵节点,然后创建当前线程的节点,让哨兵节点的next节点执行当前节点,之后设置前驱节点(此时是哨兵节点)的waiterState值为-1,然后t1线程阻塞。t2的加锁流程是创建一个节点,让t1节点的next指向当前节点,设置t1节点的waiterState的值为-1,然后当前线程阻塞。

了解过加锁流程后,我们再来看一下,当t0解锁时会发生什么

t0.unlock,首先调用父类AQS的release方法,会调用子类重写的tryRekese方法,解锁成功并且等待队列中有节点,就会进入唤醒流程,没有就直接返回

public void unlock() {
    //调用父类(AQS)的release方法
    sync.release(1);
}
//AQS
public final boolean release(int arg) {
    //调用子类重写的tryRelease方法
    if (tryRelease(arg)) {
       //成功释放锁之后,如果等待队列中有节点
        //调用unparkSuccessor()来唤醒
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

接下来我们要调用的是tryRelsese方法,我们t0只lock了一次,所以,会设置持有锁线程为null,设置state为0,返回true,表示锁已经被释放了。

//sync类
protected final boolean tryRelease(int releases) {
    //获取state然后减1
    int c = getState() - releases;
    //如果持有锁的线程不是当前线程,那么就抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //如果c==0说明锁已经释放,设置持有锁线程为null
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    //如果state不是0说明锁被重入了,那么就让state-1,锁不能释放
    setState(c);
    return free;
}

释放锁之后,就会执行唤醒流程,因为我们的等待队列中有三个节点(包括哨兵节点),那么就会执行unparkSuccessor(h);  按照我们的流程,将会唤醒t1

private void unparkSuccessor(Node node) {
    //传入的是头节点
    int ws = node.waitStatus;
    //小于0先设置为0,防止再有线程来释放锁,
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    
    Node s = node.next;
    //当头节点下一个节点为null时就什么也不做,因为头节点是哨兵节点
    //当下一个节点不为null但是waitStatus大于0,(被撤销)就会在尾节点找到一个节点来
    //代替它
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //唤醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

t1被唤醒会干嘛呢?t1如果在阻塞过程中被中断了,interrupted 会被设置为true,然后执行(p == head && tryAcquire(arg))条件,那么当前条件是成立的,tryAcquire也是可以获得锁的,当t1获得锁后,会将t1节点设置为head,将t1节点的前驱节点断开链,等待垃圾回收。

final boolean acquireQueued(final Node node, int arg) {
     //失败标志
    boolean failed = true;
    try {
        //打断标志
        boolean interrupted = false;
        for (;;) {
            //拿到当前节点的前驱节点
            final Node p = node.predecessor();
             //前驱节点为头节点,tryAcquire获取锁成功
            if (p == head && tryAcquire(arg)) {
                //当前节点设置为头节点
                setHead(node);
                //当前节点的前驱节点退出链,等待gc
                p.next = null; 
                failed = false;
                //返回中断状态
                return interrupted;
            }
            //当上面的逻辑并没有执行成功返回时,线程将被阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //当try中出现异常,failed没有置为false时
        //撤销node,使node退出锁竞争
        if (failed)
            cancelAcquire(node);

继续向前返回,当我们在阻塞中被打断了时,会返回true,然后执行 selfInterrupt方法,这个方法中只有 Thread.currentThread().interrupt();一行代码,是打断当前线程的,为什么要这样呢

在parkAndCheckInterrupt方法中通过Thread.interrupted();来判断线程是否被打断,但是Thread.interrupted();会擦除打断标记,所以要再次打断,然后返回。至此解锁完成。t1解锁流程类似,然后t2获得锁。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         //如果上述条件成立执行下面方法
        selfInterrupt();
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/862418.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号