- AQS应用之Lock
- ReentrantLock
- AQS具备特性
- 同步等待队列
- 条件等待队列
- AQS源码分析
- 案例一
- 案例二(常用场景)
- AQS队列同步器的架构组织图
- 对于ReentrantLock需要掌握以下几点
- ReentrantLock的创建
- 上锁:lock()
- 非公平
Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。
ReentrantLockReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。
使用ReentrantLock进行同步 ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁 lock.lock() //加锁 lock.unlock() //解锁
ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:
1、FairSync 公平锁的实现
2、NonfairSync 非公平锁的实现
这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。
上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现
- 阻塞等待队列
- 共享/独占
- 公平/非公平
- 可重入
- 允许中断
除了Lock外,Java.util.concurrent当中同步器的实现如Latch,Barrier,BlockingQueue等,都是基于AQS框架实现
- 一般通过定义内部类Sync继承AQS
- 将同步器所有调用都映射到Sync对应的方法
AQS内部维护属性volatile int state (32位),state表示资源的可用状态
state三种访问方式:getState()、setState()、compareAndSetState()
AQS定义两种资源共享方式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
- Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列,同步等待队列和条件等待队列
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁
public class Juc02_Thread_ReentrantLock {
private static ReentrantLock lock = new ReentrantLock(true);
public static void reentrantLock(){
String threadName = Thread.currentThread().getName();
//默认创建的是独占锁,排它锁;同一时刻读或者写只允许一个线程获取锁
lock.lock();
log.info("Thread:{},第一次加锁",threadName);
lock.lock();
log.info("Thread:{},第二次加锁",threadName);
lock.unlock();
log.info("Thread:{},第一次解锁",threadName);
lock.unlock();
log.info("Thread:{},第二次解锁",threadName);
}
public static void main(String[] args) {
Thread t0 = new Thread(new Runnable() {
@Override
public void run() {
reentrantLock();
}
},"t0");
t0.start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
reentrantLock();
}
},"t1");
t1.start();
}
}
案例二(常用场景)
package com.yg.edu.lock.reentrantlock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class WorkTask {
int taskCount = 0;
//声明此锁
private Lock lock = new ReentrantLock();
public void makeTask(){
lock.lock();
try{
taskCount++;
}finally {
lock.unlock();
}
}
}
package com.yg.edu.lock.reentrantlock;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
WorkTask task = new WorkTask();
for(int i=0;i<20;i++) {
new Thread(() -> {
for(int j=0;j<1000;j++){
task.makeTask();
}
},"thread"+i).start();
}
//等待上面的线程都计算完成后,再用main线程取得最终结果值
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
//等待所有累加线程都结束
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"t 结果是 "+task.taskCount);
}
}
AQS队列同步器的架构组织图
ReentrantLock–>Lock
NonfairSync/FairSync–>Sync–>AbstractQueuedSynchronizer–>AbstractOwnableSynchronizer
NonfairSync/FairSync–>Sync是ReentrantLock的三个内部类
Node是AbstractQueuedSynchronizer的内部类
注意:上边这四条线,对应关系:“子类”–>“父类”
对于ReentrantLock需要掌握以下几点- ReentrantLock的创建(公平锁/非公平锁)
- 上锁:lock()
- 解锁:unlock()
支持公平锁(先进来的线程先执行)
final ReentrantLock lock = new ReentrantLock();
支持非公平锁(后进来的线程也可能先执行)
final ReentrantLock lock = new ReentrantLock(true)
源码如下
ReentrantLock:
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = (fair)? new FairSync() : new NonfairSync();
}
上述源代码中出现了三个内部类Sync/NonfairSync/FairSync,这里只列出类的定义,至于这三个类中的具体的方法会在后续的第一次引用的时候介绍。
Sync/NonfairSync/FairSync类定义:
static abstract class Sync extends AbstractQueuedSynchronizer final static class NonfairSync extends Sync final static class FairSync extends Sync上锁:lock() 非公平
具体使用方法:
lock.lock();
下面先介绍一下这个总体步骤的简化版,然后会给出详细的源代码,并在源代码的lock()方法部分给出详细版的步骤。
简化版的步骤:(非公平锁的核心)
基于CAS尝试将state(锁数量)从0设置为1
- A、如果设置成功,设置当前线程为独占锁的线程;
- B、如果设置失败,还会再获取一次锁数量,
- B1、如果锁数量为0,再基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
- B2、如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。
public void lock() {
sync.lock();//调用NonfairSync(非公平锁)或FairSync(公平锁)的lock()方法
}
final void lock() {
//首先基于CAS将state(锁数量)从0设置为1,如果设置成功,设置当前线程为独占锁的线程;-->请求成功-->第一次插队
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//如果设置失败(即当前的锁数量可能已经为1了,即在尝试的过程中,已经被其他线程先一步占有了锁),这个时候当前线程执行acquire(1)方法
acquire(1);
}
public final void acquire(int arg) {
//acquire(1)方法首先调用下边的tryAcquire(1)方法,在该方法中,首先获取锁数量状态,
//如果最后在tryAcquire(1)方法中上述的执行都没成功,即请求没有成功,则返回false,继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果为0(证明该独占锁已被释放,当下没有线程在使用),这个时候我们继续使用CAS将state(锁数量)从0设置为1,如果设置成功,当前线程独占锁;-->请求成功-->第二次插队;当然,如果设置不成功,直接返回false
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果不为0,就去判断当前的线程是不是就是当下独占锁的线程,如果是,就将当前的锁数量状态值+1(这也就是可重入锁的名称的来源)-->请求成功
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
//使用addWaiter(Node.EXCLUSIVE)将当前线程封装进Node节点node,然后将该节点加入等待队列(先快速入队,如果快速入队不成功,其使用正常入队方法无限循环一直到Node节点入队为止)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//快速入队:如果同步等待队列存在尾节点,将使用CAS尝试将尾节点设置为node,并将之前的尾节点插入到node之前
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
//正常入队:如果同步等待队列不存在尾节点或者上述CAS尝试不成功的话,就执行正常入队(该方法是一个无限循环的过程,即直到入队为止)-->第一次阻塞
for (;;) {
Node t = tail;
//如果尾节点为空(初始化同步等待队列),创建一个dummy节点,并将该节点通过CAS尝试设置到头节点上去,设置成功的话,将尾节点也指向该dummy节点(即头节点和尾节点都指向该dummy节点)
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果尾节点不为空,执行与快速入队相同的逻辑,即使用CAS尝试将尾节点设置为node,并将之前的尾节点插入到node之前,最后,如果顺利入队的话,就返回入队的节点node,如果不顺利的话,无限循环去执行2.2)下边的流程,直到入队为止
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//node节点入队之后,就去执行acquireQueued(final Node node, int arg)(这又是一个无限循环的过程,这里需要注意的是,无限循环等于阻塞,多个线程可以同时无限循环--每个线程都可以执行自己的循环,这样才能使在后边排队的节点不断前进)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取node的前驱节点p,如果p是头节点,就继续使用tryAcquire(1)方法去尝试请求成功,-->第三次插队(当然,这次插队不一定不会使其获得执行权,请看下边一条),
final Node p = node.predecessor();
//如果第一次请求就成功,不用中断自己的线程,如果是之后的循环中将线程挂起之后又请求成功了,使用selfInterrupt()中断自己(注意p==head&&tryAcquire(1)成功是唯一跳出循环的方法,在这之前会一直阻塞在这里,直到其他线程在执行的过程中,不断的将p的前边的节点减少,直到p成为了head且node请求成功了--即node被唤醒了,才退出循环)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果p不是头节点,或者tryAcquire(1)请求不成功,就去执行shouldParkAfterFailedAcquire(Node pred, Node node)来检测当前节点是不是可以安全的被挂起,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果node的前驱节点pred的等待状态是SIGNAL(即可以唤醒下一个节点的线程),则node节点的线程可以安全挂起)
if (ws == Node.SIGNAL)
return true;
//如果node的前驱节点pred的等待状态是CANCELLED,则pred的线程被取消了,我们会将pred之前的连续几个被取消的前驱节点从队列中剔除,返回false(即不能挂起)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果node的前驱节点pred的等待状态是除了上述两种的其他状态,则使用CAS尝试将前驱节点的等待状态设为SIGNAL,并返回false(因为CAS可能会失败,这里不管失败与否,都返回false,下一次执行该方法的之后,pred的等待状态就是SIGNAL了)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//如果可以安全挂起,就执行parkAndCheckInterrupt()挂起当前线程
//最后,直到该节点的前驱节点p之前的所有节点都执行完毕为止,我们的p成为了头节点,并且tryAcquire(1)请求成功,跳出循环,去执行。(在p变为头节点之前的整个过程中,我们发现这个过程是不会被中断的)
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
//如果产生了异常,我们就会执行cancelAcquire(Node node)取消node的获取锁的意图。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}



