栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ReentrantLock

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ReentrantLock

特点
  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁 (先到先得)
  • 支持多个条件变量( 具有多个 WaitSet)
  • 支持可重入
// 获取ReentrantLock对象
private ReentrantLock lock = new ReentrantLock();
// 加锁
lock.lock();
try {
	// 需要执行的代码
}finally {
	// 释放锁
	lock.unlock();
}
 

synchronized 和 ReentrantLock区别
  1. Synchronized 内置java关键字,lock 是java一个类
  2. Sychronized 无法判断获取锁的状态,lock可以判断是否获取到了锁
  3. Sychronized 会自动释放锁,lock需要手动释放锁,否则会死锁
  4. Sychronized 线程1(获得锁,阻塞) 线程2 (等待) Lock锁不一定会等待下去lock.tryLock()
  5. Sychronized 可重入锁,不可以中断的,非公平锁;lock可重入的,可以判断锁,可以设置公平锁
  6. Sychronized适合锁少量的代码同步问题,lock适合锁大量的同步代码
  7. Synchronized 只能在单个waitSet里等待,随机唤醒等待的线程,lock 可以搭配条件变量在多个waitset等待,并实现精准的唤醒
可重入
  • 可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
  • 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
public class LockAgain {

    public static void main(String[] args) {
        method1();
    }
    static ReentrantLock lock = new ReentrantLock();
    
    public static void method1() {
        lock.lock();
        try {
            System.out.println("execute method1");
            method2();
        } finally {
            lock.unlock();
        }
    }

    public static void method2() {
        lock.lock();
        try {
            System.out.println("execute method2");
            method3();
        } finally {
            lock.unlock();
        }
    }

    public static void method3() {
        lock.lock();
        try {
            System.out.println("execute method3");
        } finally {
            lock.unlock();
        }
    }
}
可中断

如果某个线程处于等待获取锁的状态,可以调用其 interrupt 方法让其停止阻塞,避免长时间等待下去,这也是可以避免死锁的一种方式,但是是被动的等待其他线程调用打断方法

处于阻塞的线程可以停止阻塞,让其继续获得争抢锁的权利

public class CanInterrupted {
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {

            try {
                //可打断锁
                //如果没有竞争会获得lock对象锁,如果有竞争进入等待队列,但可以被其他线程使用interrupt打断
                System.out.println("尝试获得锁!");
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return;//被打断,返回,不再向下执行
            }
            System.out.println(" 获得了锁!");
            lock.unlock();

        }, "t1");
        lock.lock();//在主线程里获得了锁
        try {
            t1.start();
            Thread.sleep(1000);
            // 打断
            t1.interrupt();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

可超时

使用 lock.tryLock 方法会返回获取锁是否成功。如果成功则返回 true ,反之则返回 false 。
并且 tryLock 方法可以指定等待时间,参数为:tryLock(long timeout, TimeUnit unit), 其中 timeout 为最长等待时间,TimeUnit 为时间单位

主动防止无限制等待,避免死锁

public class CanTimeout {

    private  static  final ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {

       Thread t1 =  new Thread(()->{
             System.out.println("尝试获得锁,获取到锁返回true,否则false");
             if (!lock.tryLock()){
                 System.out.println("获取不到锁");
                 return;
             }
             System.out.println("获取到了锁");
             lock.unlock();

         }) ;

        lock.lock();//主线程拥有了锁
        try{
            t1.start();
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }
}

公平锁

公平锁: 非常公平, 不能够插队,必须先来后到!解决线程饥饿但会降低并发度

非公平锁:非常不公平,可以插队 (默认都是非公平)

条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

synchronized 是那些不满足条件的线程都在一间休息室等消息
而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
await 前需要获得锁

1 await 执行后,会释放锁,进入 conditionObject 等待
2 await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
3 竞争 lock 锁成功后,从 await 后继续执行

public class ConditionTest {

    public static void main(String[] args) {

        Data data = new Data();

        new  Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        },"A").start();


        new  Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        },"B").start();
        new  Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.increment();
            }
        },"C").start();
        new  Thread(() -> {
            for (int i = 0; i < 10; i++) {
                data.decrement();
            }
        },"D").start();
    }
}

class  Data{

    private  int number = 0;

    ReentrantLock lock = new ReentrantLock ();
    Condition condition = lock.newCondition();

    public  void increment()  {

        try {
            lock.lock();
            while (number !=0){

                condition.await();
            }

            number++;
            System.out.println(Thread.currentThread().getName()+"-->"+number);

            condition.signalAll();//唤醒condition里waitSet 里所有的线程,重新竞争锁
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    public  void decrement()  {

        try {
            lock.lock();
            while (number ==0){

                condition.await();
            }

            number--;
            System.out.println(Thread.currentThread().getName()+"-->"+number);

            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

}

特点:Condition 精准的通知和唤醒线程

生产者和消费者

JUC版的生产者与消费者问题

public class ProducerAndCustomerByCondition {

    public static void main(String[] args) {
        MessageBlockingQueue queue = new MessageBlockingQueue(2);
        for (int i = 0; i < 3; i++) {
            int finalI = i;
            new  Thread(()->{
                queue.put(new Messages(finalI,"produce"+finalI));
            },"producer"+finalI).start();
        }

        new Thread(() -> {
            while(true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    Messages message = queue.take();
                    System.out.println(message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }, "消费者").start();
    }


}
class MessageBlockingQueue{
    
    private final linkedList linkedList = new linkedList<>();

    private final int capacity;

    ReentrantLock lock = new ReentrantLock ();
    Condition conditionPro = lock.newCondition();
    Condition conditionCus = lock.newCondition();

    public MessageBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public  Messages  take()   {

        lock.lock();
        try {
             while ( linkedList.size() == 0) {
                 System.out.println("队列为空,消费者等待");
                 conditionCus.await();
             }

            Messages messages = linkedList.removeFirst();
            System.out.println("获取消息"+messages);

            conditionPro.signalAll();
            return messages;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }

         return null;
    }

    public void  put(Messages messages)   {

        Objects.requireNonNull(messages);
        try {
            lock.lock();
            while (linkedList.size() == capacity){
                System.out.println("队列已满,生产者等待");
                conditionPro.await();
            }
            linkedList.addLast(messages);
            System.out.println("生产消息"+messages);

            conditionCus.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
record Messages(int id, Object value) {

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", value=" + value +
                '}';
    }
}
线程同步控制 固定线程的执行顺序

wait/notify

public class AlternateByWait {

    static   boolean isRunning = false;
    static  final  Object lock = new Object();
    public static void main(String[] args) {

        Thread t1 = new Thread(() -> {

            synchronized (lock){
                while (!isRunning) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t1");

            }
        }, "t1");
        Thread t2 = new Thread(()->{
            synchronized (lock){
                isRunning = true;

                System.out.println("t2");
                lock.notifyAll();
            }
        },"t2");

        t1.start();
        t2.start();

    }

await()/signal()

static   boolean isRunning = false;

    static final  ReentrantLock  lock = new ReentrantLock();
    static final Condition condition = lock.newCondition();

    public static void main(String[] args) {

        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                while (!isRunning) {
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t1");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

                lock.unlock();
            }

        }, "t1");
        Thread t2 = new Thread(()->{
            lock.lock();
      
            try {
                isRunning = true;
                System.out.println("t2");
                condition.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            
        },"t2");

        t1.start();
        t2.start();

    }

park/unpark

public static void main(String[] args) {

        Thread t1 = new Thread(() -> {
            LockSupport.park();
            System.out.println("t1");

        }, "t1");

        Thread t2 = new Thread(() -> {
            System.out.println("t2");
            LockSupport.unpark(t1);
        }, "t2");

        t1.start();
        t2.start();
    }
线程交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。 输出 abcabcabcabcabc
wait/notify()

public class PrintByWait {

    public static void main(String[] args) {

        WaitFlag waitFlag = new WaitFlag(1, 5);

        new Thread(()->{
            waitFlag.print("a", 1, 2);
        }).start();
        new Thread(()->{
            waitFlag.print("b", 2, 3);
        }).start();
        new Thread(()->{
            waitFlag.print("c", 3, 1);
        }).start();

    }
}



class  WaitFlag{

    //等待标记
    private  int flag;

    private final int loopName;//循环次数

    public WaitFlag(int flag, int loopName) {
        this.flag = flag;
        this.loopName = loopName;
    }

    //当前操作线程和等待标记进行比较,一致继续运行,否则等待,并唤醒其他线程
    public    void  print(String content, int flag, int nextFlag){

        for (int i = 0; i < loopName; i++) {
             synchronized(this){
                while (this.flag != flag){

                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName()+content);
                this.flag = nextFlag;
                this.notifyAll();
             }
        }
    }
}

await/signal

public class PrintByAwait {

    public static void main(String[] args) {

        AwaitSignal signal = new AwaitSignal(5);

        ReentrantLock lock = new ReentrantLock();

        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();
        Condition condition3 = lock.newCondition();
        new Thread(()-> signal.print(lock, condition1, condition2,"a")).start();
        new Thread(()-> signal.print(lock, condition2, condition3,"b")).start();
        new Thread(()-> signal.print(lock, condition3, condition1,"c")).start();

        try {
            TimeUnit.SECONDS.sleep(1);
            lock.lock();
            condition1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }

    }

}

record AwaitSignal(int loopNum) {

    public void print(Lock lock, Condition cur, Condition next, String content) {

        for (int i = 0; i < loopNum; i++) {

            lock.lock();

            try {

                cur.await();
                System.out.println(content);
                next.signal();

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

        }
    }

}
JDK11实现原理

加锁解锁流程
public ReentrantLock() {
        sync = new NonfairSync();
    }

没有竞争时

final void lock() {
		 	// 没有竞争时, 直接加锁
            if (compareAndSetState(0, 1))
            	// 设置持有锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
            	// 有竞争, 会调用这个方法
                acquire(1);
  }


出现竞争时

Thread-1 执行了

  1. lock方法中CAS 尝试将 state 由 0 改为 1,结果失败
  2. lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败
  3. 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
    1. 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
    2. Node 的创建是懒惰的
    3. 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

      当前线程进入 acquireQueued 逻辑
  4. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
  5. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
  6. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
  7. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时
    state 仍为 1,失败
  8. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回
    true
  9. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

    再次有多个线程经历上述过程竞争失败,变成这个样子

    Thread-0 释放锁,进入 tryRelease 流程,如果成功
  • 设置 exclusiveOwnerThread 为 null

  • state = 0


当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程

如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread 为 Thread-1,state = 1

  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread

  • 原本的 head 因为从链表断开,而可被垃圾回收

    如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

    如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1

  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

加锁

// Sync 继承自 AQS
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

     // 加锁实现
    final void lock() {
        // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 如果尝试失败,进入 ㈠
            acquire(1);
    }

    // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
    public final void acquire(int arg) {
        // ㈡ tryAcquire
        if (
                !tryAcquire(arg) &&
            	// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }

    // ㈡ 进入 ㈢
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 如果还没有获得锁
        if (c == 0) {
            // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 获取失败, 回到调用处
        return false;
    }

    // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
    private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式,新建的Node的waitstatus默认为0,因为waitstatus是成员变量,默认被初始化为0
        Node node = new Node(Thread.currentThread(), mode);
        // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // 双向链表
                pred.next = node;
                return node;
            }
        }
        //如果tail为null,尝试将 Node 加入 AQS, 进入 ㈥
        enq(node);
        return node;
    }

    // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                // cas 尝试将 Node 对象加入 AQS 队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
                if (p == head && tryAcquire(arg)) {
                    // 获取成功, 设置自己(当前线程对应的 node)为 head
                    setHead(node);
                    // 上一个节点 help GC
                    p.next = null;
                    failed = false;
                    // 返回中断标记 false
                    return interrupted;
                }
                if (
                    // 判断是否应当 park, 进入 ㈦
                    shouldParkAfterFailedAcquire(p, node) &&
                    // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
                    parkAndCheckInterrupt()
                ) {
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取上一个节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) {
            // 上一个节点都在阻塞, 那么自己也阻塞好了
            return true;
        }
        // > 0 表示取消状态
        if (ws > 0) {
           //前继节点是CANCELLED ,则需要充同步队列中删除,并检测新接上的前继节点的状态,若还是为CANCELLED ,还需要重复上述步骤

            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 这次还没有阻塞
            // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    // ㈧ 阻塞当前线程
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
}

解锁

// Sync 继承自 AQS
static final class NonfairSync extends Sync {
    // 解锁实现
    public void unlock() {
        sync.release(1);
    }

    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean release(int arg) {
        // 尝试释放锁, 进入 ㈠
        if (tryRelease(arg)) {
            // 队列头节点 unpark
            Node h = head;
            if (
                // 队列不为 null
                h != null &&
                // waitStatus == Node.SIGNAL 才需要 unpark
                h.waitStatus != 0
            ) {
                // unpark AQS 中等待的线程, 进入 ㈡
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryRelease(int releases) {
        // state--
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 支持锁重入, 只有 state 减为 0, 才释放成功
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
    private void unparkSuccessor(Node node) {
        // 如果状态为 Node.SIGNAL 尝试重置状态为 0, 如果线程获取到了锁那么后来头结点会被抛弃掉
        // 不成功也可以
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
        Node s = node.next;
        // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}
可重入
static final class NonfairSync extends Sync {
    // ...

    // Sync 继承过来的方法, 方便阅读, 放在此处
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
        else if (current == getExclusiveOwnerThread()) {
            // state++
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryRelease(int releases) {
        // state--
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 支持锁重入, 只有 state 减为 0, 才释放成功
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}
可打断

不可打断模式:
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了。

// Sync 继承自 AQS
static final class NonfairSync extends Sync {
    // ...

    private final boolean parkAndCheckInterrupt() {
        // 如果打断标记已经是 true, 则 park 会失效
        LockSupport.park(this);
        // interrupted 会清除打断标记
        return Thread.interrupted();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    // 还是需要获得锁后, 才能返回打断状态
                    return interrupted;
                }
                if (
                        shouldParkAfterFailedAcquire(p, node) &&
                                parkAndCheckInterrupt()
                ) {
                    // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
                    interrupted = true;
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    public final void acquire(int arg) {
        if (
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            // 如果打断状态为 true
            selfInterrupt();
        }
    }

    static void selfInterrupt() {
        // 重新产生一次中断,这时候线程是如果正常运行的状态,那么不是出于sleep等状态,interrupt方法就不会报错
        Thread.currentThread().interrupt();
    }
}
 

可打断模式:

在 park 过程中如果被 interrupt 会抛出异常

static final class NonfairSync extends Sync {
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果没有获得到锁, 进入 ㈠
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    // ㈠ 可打断的获取锁流程
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt()) {
                    // 在 park 过程中如果被 interrupt 会进入此
                    // 这时候抛出异常, 而不会再次进入 for (;;)
                    throw new InterruptedException();
                }
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}
公平锁
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {
        acquire(1);
    }

    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final void acquire(int arg) {
        if (
                !tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        ) {
            selfInterrupt();
        }
    }
    // 与非公平锁主要区别在于 tryAcquire 方法的实现
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        // h != t 时表示队列中有 Node
        return h != t &&
                (
                        // (s = h.next) == null 表示队列中还有没有老二
                        (s = h.next) == null || // 或者队列中老二线程不是此线程
                                s.thread != Thread.currentThread()
                );
    }
}
 
条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

await 流程

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

park 阻塞 Thread-0

signal 流程

假设 Thread-1 要来唤醒 Thread-0

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的
waitStatus 改为 -1

Thread-1 释放锁,进入 unlock 流程

ublic class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    // 第一个等待节点
    private transient Node firstWaiter;

    // 最后一个等待节点
    private transient Node lastWaiter;
    public ConditionObject() { }
    // ㈠ 添加一个 Node 至等待队列
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 所有已取消的 Node 从队列链表删除, 见 ㈡
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 创建一个关联当前线程的新 Node, 添加至队列尾部
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    // 唤醒 - 将没取消的第一个节点转移至 AQS 队列
    private void doSignal(Node first) {
        do {
            // 已经是尾节点了
            if ( (firstWaiter = first.nextWaiter) == null) {
                lastWaiter = null;
            }
            first.nextWaiter = null;
        } while (
            // 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
                !transferForSignal(first) &&
                        // 队列还有节点
                        (first = firstWaiter) != null
        );
    }

    // 外部类方法, 方便阅读, 放在此处
    // ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
    final boolean transferForSignal(Node node) {
        // 设置当前node状态为0(因为处在队列末尾),如果状态已经不是 Node.CONDITION, 说明被取消了
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 加入 AQS 队列尾部
        Node p = enq(node);
        int ws = p.waitStatus;
        if (
            // 插入节点的上一个节点被取消
                ws > 0 ||
                        // 插入节点的上一个节点不能设置状态为 Node.SIGNAL
                        !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
        ) {
            // unpark 取消阻塞, 让线程重新同步状态
            LockSupport.unpark(node.thread);
        }
        return true;
    }
// 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

    // ㈡
    private void unlinkCancelledWaiters() {
        // ...
    }
    // 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
    public final void signal() {
        // 如果没有持有锁,会抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    // 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    // 不可打断等待 - 直到被唤醒
    public final void awaitUninterruptibly() {
        // 添加一个 Node 至等待队列, 见 ㈠
        Node node = addConditionWaiter();
        // 释放节点持有的锁, 见 ㈣
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        // 如果该节点还没有转移至 AQS 队列, 阻塞
        while (!isOnSyncQueue(node)) {
            // park 阻塞
            LockSupport.park(this);
            // 如果被打断, 仅设置打断状态
            if (Thread.interrupted())
                interrupted = true;
        }
        // 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }
    // 外部类方法, 方便阅读, 放在此处
    // ㈣ 因为某线程可能重入,需要将 state 全部释放,获取state,然后把它全部减掉,以全部释放
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 唤醒等待队列队列中的下一个节点
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    // 打断模式 - 在退出等待时重新设置打断状态
    private static final int REINTERRUPT = 1;
    // 打断模式 - 在退出等待时抛出异常
    private static final int THROW_IE = -1;
    // 判断打断模式
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }
    // ㈤ 应用打断模式
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
    // 等待 - 直到被唤醒或打断
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // 添加一个 Node 至等待队列, 见 ㈠
        Node node = addConditionWaiter();
        // 释放节点持有的锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 如果该节点还没有转移至 AQS 队列, 阻塞
        while (!isOnSyncQueue(node)) {
            // park 阻塞              
            LockSupport.park(this);
            // 如果被打断, 退出等待队列
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 退出等待队列后, 还需要获得 AQS 队列的锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 所有已取消的 Node 从队列链表删除, 见 ㈡
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // 应用打断模式, 见 ㈤
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    // 等待 - 直到被唤醒或打断或超时
    public final long awaitNanos(long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // 添加一个 Node 至等待队列, 见 ㈠
        Node node = addConditionWaiter();
        // 释放节点持有的锁
        int savedState = fullyRelease(node);
        // 获得最后期限
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        // 如果该节点还没有转移至 AQS 队列, 阻塞
        while (!isOnSyncQueue(node)) {
            // 已超时, 退出等待队列
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            // park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 如果被打断, 退出等待队列
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        // 退出等待队列后, 还需要获得 AQS 队列的锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 所有已取消的 Node 从队列链表删除, 见 ㈡
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // 应用打断模式, 见 ㈤
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
    // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
    public final boolean awaitUntil(Date deadline) throws InterruptedException {
        // ...
    }
    // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
    public final boolean await(long time, TimeUnit unit) throws InterruptedException {
        // ...
    }
    // 工具方法 省略 ...
}
JDK14 实现原理

jdk14 以后对ReentrantLock 进行了修改,主要是AQS上的区别

内部结构

同样继承于AQS

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        
        @ReservedStackAccess
        final boolean tryLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, 1)) {//cas线程锁状态为1则表示成功
                    setExclusiveOwnerThread(current);//设置当前线程为owner线程
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) {//持有锁的线程==当前线程
                if (++c < 0) // 每次重入计数+1  溢出一般不会发生
                    throw new Error("Maximum lock count exceeded");
                setState(c);//设置锁的状态为c
                return true;
            }
            return false;
        }

        
        abstract boolean initialTryLock();

        @ReservedStackAccess
        final void lock() {//上锁
            if (!initialTryLock())
                acquire(1);//进入aqs内部方法acquire()在此之前会先tryAcquire  尝试获取一次,失败在进入重载的acquire方法自旋,每次自旋会尝试获取锁直到被park阻塞
        }

        //可中断的锁
        final void lockInterruptibly() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!initialTryLock())
                acquireInterruptibly(1);//调用AQS的中断获取锁的方法
        }

       //超时尝试获取锁
        final boolean tryLockNanos(long nanos) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return initialTryLock() || tryAcquireNanos(1, nanos);
        }

        //释放锁
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            boolean free = (c == 0);
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free;
        }

      //是否为当前持有锁的线程 
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
非公平锁
//默认为非公平锁
public ReentrantLock() {
        sync = new NonfairSync();
    }
//true则为公平锁
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
       //初始化尝试获取锁,若获取到则不会调用AQS方法
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            if (compareAndSetState(0, 1)) { //第一次尝试CAS获取锁
                setExclusiveOwnerThread(current);
                return true;
                //如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入--可重入原理
            } else if (getExclusiveOwnerThread() == current) {
                int c = getState() + 1;//每次重入状态计数+1。释放也需要释放相应的次数
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            } else
                return false;
        }

        
        protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }
公平锁
static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //只有同步队列里没有获取锁或者阻塞的线程才会尝试CAS设置锁的状态为1,和非公平锁的唯一区别
                if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(current);//将当前线程设置为独占意思就是加锁
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
            return false;
        }

        
        protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && !hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }
   public final boolean hasQueuedThreads() {
        for (Node p = tail, h = head; p != h && p != null; p = p.prev)
            if (p.status >= 0)//说明队列里有正在获取锁的线程或者wait的线程
                return true;
        return false;
    }
加锁

不管公平与非公平都是调用父类的Sync的lock方法,区别在于initialTryLock() 由Sync子类实现公平与非公平逻辑

JDK14的实现比jdk11更要通俗易懂

public void lock() {
        sync.lock();
    }

 final void lock() {
            if (!initialTryLock())
                acquire(1);//调取AQS的acquire方法,解析见AQS
  }
释放锁
public void unlock() {
        sync.release(1);//调用AQS的释放锁的release方法。重入几次则需要释放相应次数。否则其他线程则抢不到锁一直阻塞。解析见AQS
    }
可中断
public void lockInterruptibly() throws InterruptedException {
        sync.lockInterruptibly();
    }

 final void lockInterruptibly() throws InterruptedException {
            if (Thread.interrupted())//线程已经中断抛出中断异常
                throw new InterruptedException();
            if (!initialTryLock())
                acquireInterruptibly(1);//调用AQS的可中断获取锁的方法
   }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/288489.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号