栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

抽象队列同步器AQS应用Lock详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

抽象队列同步器AQS应用Lock详解

抽象队列同步器AQS应用Lock详解
  • AQS应用之Lock
    • ReentrantLock
    • AQS具备特性
    • 同步等待队列
    • 条件等待队列
    • AQS源码分析
      • 案例一
      • 案例二(常用场景)
      • AQS队列同步器的架构组织图
        • 对于ReentrantLock需要掌握以下几点
      • ReentrantLock的创建
      • 上锁:lock()
        • 非公平

AQS应用之Lock

Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

ReentrantLock

ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。

使用ReentrantLock进行同步
ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁
lock.lock() //加锁
lock.unlock() //解锁

ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:
1、FairSync 公平锁的实现
2、NonfairSync 非公平锁的实现
这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。
上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现

AQS具备特性
  • 阻塞等待队列
  • 共享/独占
  • 公平/非公平
  • 可重入
  • 允许中断

除了Lock外,Java.util.concurrent当中同步器的实现如Latch,Barrier,BlockingQueue等,都是基于AQS框架实现

  • 一般通过定义内部类Sync继承AQS
  • 将同步器所有调用都映射到Sync对应的方法

AQS内部维护属性volatile int state (32位),state表示资源的可用状态
state三种访问方式:getState()、setState()、compareAndSetState()

AQS定义两种资源共享方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

AQS定义两种队列,同步等待队列和条件等待队列

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false
同步等待队列

AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

条件等待队列

Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁

AQS源码分析 案例一
public class Juc02_Thread_ReentrantLock {

    private static ReentrantLock lock = new ReentrantLock(true);

    public static void reentrantLock(){
        String threadName = Thread.currentThread().getName();
        //默认创建的是独占锁,排它锁;同一时刻读或者写只允许一个线程获取锁
        lock.lock();
        log.info("Thread:{},第一次加锁",threadName);
            lock.lock();
            log.info("Thread:{},第二次加锁",threadName);
            lock.unlock();
            log.info("Thread:{},第一次解锁",threadName);
        lock.unlock();
        log.info("Thread:{},第二次解锁",threadName);
    }

    public static void main(String[] args) {
        Thread t0 = new Thread(new Runnable() {
            @Override
            public void run() {
                reentrantLock();
            }
        },"t0");

        t0.start();

        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                reentrantLock();
            }
        },"t1");

        t1.start();
    }

}

案例二(常用场景)
package com.yg.edu.lock.reentrantlock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class WorkTask {

    int taskCount = 0;
    //声明此锁
    private Lock lock = new ReentrantLock();

    public void makeTask(){
        lock.lock();
        try{
            taskCount++;
        }finally {
            lock.unlock();
        }
    }
}

package com.yg.edu.lock.reentrantlock;

import java.util.concurrent.TimeUnit;


public class Main {
    public static void main(String[] args) {
        WorkTask task = new WorkTask();
        for(int i=0;i<20;i++) {
            new Thread(() -> {
                for(int j=0;j<1000;j++){
                    task.makeTask();
                }
            },"thread"+i).start();
        }

        //等待上面的线程都计算完成后,再用main线程取得最终结果值
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //等待所有累加线程都结束
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+"t 结果是 "+task.taskCount);
    }
}

AQS队列同步器的架构组织图

ReentrantLock–>Lock

NonfairSync/FairSync–>Sync–>AbstractQueuedSynchronizer–>AbstractOwnableSynchronizer

NonfairSync/FairSync–>Sync是ReentrantLock的三个内部类

Node是AbstractQueuedSynchronizer的内部类

注意:上边这四条线,对应关系:“子类”–>“父类”

对于ReentrantLock需要掌握以下几点
  • ReentrantLock的创建(公平锁/非公平锁)
  • 上锁:lock()
  • 解锁:unlock()
ReentrantLock的创建

支持公平锁(先进来的线程先执行)

final ReentrantLock lock = new ReentrantLock();

支持非公平锁(后进来的线程也可能先执行)

final ReentrantLock lock = new ReentrantLock(true)

源码如下
ReentrantLock:

private final Sync sync;

 
 public ReentrantLock() {
     sync = new NonfairSync();
 }

 
 public ReentrantLock(boolean fair) {
     sync = (fair)? new FairSync() : new NonfairSync();
 }

上述源代码中出现了三个内部类Sync/NonfairSync/FairSync,这里只列出类的定义,至于这三个类中的具体的方法会在后续的第一次引用的时候介绍。
Sync/NonfairSync/FairSync类定义:

 static abstract class Sync extends AbstractQueuedSynchronizer

 
 final static class NonfairSync extends Sync

 
 final static class FairSync extends Sync
上锁:lock() 非公平

具体使用方法:

lock.lock();

下面先介绍一下这个总体步骤的简化版,然后会给出详细的源代码,并在源代码的lock()方法部分给出详细版的步骤。
简化版的步骤:(非公平锁的核心)
基于CAS尝试将state(锁数量)从0设置为1

  • A、如果设置成功,设置当前线程为独占锁的线程;
  • B、如果设置失败,还会再获取一次锁数量,
  • B1、如果锁数量为0,再基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
  • B2、如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。
public void lock() {
    sync.lock();//调用NonfairSync(非公平锁)或FairSync(公平锁)的lock()方法
}
final void lock() {
	//首先基于CAS将state(锁数量)从0设置为1,如果设置成功,设置当前线程为独占锁的线程;-->请求成功-->第一次插队
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
    //如果设置失败(即当前的锁数量可能已经为1了,即在尝试的过程中,已经被其他线程先一步占有了锁),这个时候当前线程执行acquire(1)方法
        acquire(1);
}
public final void acquire(int arg) {
	//acquire(1)方法首先调用下边的tryAcquire(1)方法,在该方法中,首先获取锁数量状态,
	//如果最后在tryAcquire(1)方法中上述的执行都没成功,即请求没有成功,则返回false,继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果为0(证明该独占锁已被释放,当下没有线程在使用),这个时候我们继续使用CAS将state(锁数量)从0设置为1,如果设置成功,当前线程独占锁;-->请求成功-->第二次插队;当然,如果设置不成功,直接返回false
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果不为0,就去判断当前的线程是不是就是当下独占锁的线程,如果是,就将当前的锁数量状态值+1(这也就是可重入锁的名称的来源)-->请求成功
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
private Node addWaiter(Node mode) {
	//使用addWaiter(Node.EXCLUSIVE)将当前线程封装进Node节点node,然后将该节点加入等待队列(先快速入队,如果快速入队不成功,其使用正常入队方法无限循环一直到Node节点入队为止)
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        //快速入队:如果同步等待队列存在尾节点,将使用CAS尝试将尾节点设置为node,并将之前的尾节点插入到node之前
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
	//正常入队:如果同步等待队列不存在尾节点或者上述CAS尝试不成功的话,就执行正常入队(该方法是一个无限循环的过程,即直到入队为止)-->第一次阻塞
    for (;;) {
        Node t = tail;
        //如果尾节点为空(初始化同步等待队列),创建一个dummy节点,并将该节点通过CAS尝试设置到头节点上去,设置成功的话,将尾节点也指向该dummy节点(即头节点和尾节点都指向该dummy节点)
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
        //如果尾节点不为空,执行与快速入队相同的逻辑,即使用CAS尝试将尾节点设置为node,并将之前的尾节点插入到node之前,最后,如果顺利入队的话,就返回入队的节点node,如果不顺利的话,无限循环去执行2.2)下边的流程,直到入队为止
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
//node节点入队之后,就去执行acquireQueued(final Node node, int arg)(这又是一个无限循环的过程,这里需要注意的是,无限循环等于阻塞,多个线程可以同时无限循环--每个线程都可以执行自己的循环,这样才能使在后边排队的节点不断前进)
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
        	//获取node的前驱节点p,如果p是头节点,就继续使用tryAcquire(1)方法去尝试请求成功,-->第三次插队(当然,这次插队不一定不会使其获得执行权,请看下边一条),
            final Node p = node.predecessor();
            //如果第一次请求就成功,不用中断自己的线程,如果是之后的循环中将线程挂起之后又请求成功了,使用selfInterrupt()中断自己(注意p==head&&tryAcquire(1)成功是唯一跳出循环的方法,在这之前会一直阻塞在这里,直到其他线程在执行的过程中,不断的将p的前边的节点减少,直到p成为了head且node请求成功了--即node被唤醒了,才退出循环)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果p不是头节点,或者tryAcquire(1)请求不成功,就去执行shouldParkAfterFailedAcquire(Node pred, Node node)来检测当前节点是不是可以安全的被挂起,
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	int ws = pred.waitStatus;
	//如果node的前驱节点pred的等待状态是SIGNAL(即可以唤醒下一个节点的线程),则node节点的线程可以安全挂起)
    if (ws == Node.SIGNAL)
        
        return true;
    //如果node的前驱节点pred的等待状态是CANCELLED,则pred的线程被取消了,我们会将pred之前的连续几个被取消的前驱节点从队列中剔除,返回false(即不能挂起)
    if (ws > 0) {
        
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
    	//如果node的前驱节点pred的等待状态是除了上述两种的其他状态,则使用CAS尝试将前驱节点的等待状态设为SIGNAL,并返回false(因为CAS可能会失败,这里不管失败与否,都返回false,下一次执行该方法的之后,pred的等待状态就是SIGNAL了)
        
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
//如果可以安全挂起,就执行parkAndCheckInterrupt()挂起当前线程
//最后,直到该节点的前驱节点p之前的所有节点都执行完毕为止,我们的p成为了头节点,并且tryAcquire(1)请求成功,跳出循环,去执行。(在p变为头节点之前的整个过程中,我们发现这个过程是不会被中断的)
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
//如果产生了异常,我们就会执行cancelAcquire(Node node)取消node的获取锁的意图。
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/349038.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号