- 可中断
- 可以设置超时时间
- 可以设置为公平锁 (先到先得)
- 支持多个条件变量( 具有多个 WaitSet)
- 支持可重入
// 获取ReentrantLock对象
private ReentrantLock lock = new ReentrantLock();
// 加锁
lock.lock();
try {
// 需要执行的代码
}finally {
// 释放锁
lock.unlock();
}
synchronized 和 ReentrantLock区别
- Synchronized 内置java关键字,lock 是java一个类
- Sychronized 无法判断获取锁的状态,lock可以判断是否获取到了锁
- Sychronized 会自动释放锁,lock需要手动释放锁,否则会死锁
- Sychronized 线程1(获得锁,阻塞) 线程2 (等待) Lock锁不一定会等待下去lock.tryLock()
- Sychronized 可重入锁,不可以中断的,非公平锁;lock可重入的,可以判断锁,可以设置公平锁
- Sychronized适合锁少量的代码同步问题,lock适合锁大量的同步代码
- Synchronized 只能在单个waitSet里等待,随机唤醒等待的线程,lock 可以搭配条件变量在多个waitset等待,并实现精准的唤醒
- 可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁
- 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
public class LockAgain {
public static void main(String[] args) {
method1();
}
static ReentrantLock lock = new ReentrantLock();
public static void method1() {
lock.lock();
try {
System.out.println("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
System.out.println("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
System.out.println("execute method3");
} finally {
lock.unlock();
}
}
}
可中断
如果某个线程处于等待获取锁的状态,可以调用其 interrupt 方法让其停止阻塞,避免长时间等待下去,这也是可以避免死锁的一种方式,但是是被动的等待其他线程调用打断方法
处于阻塞的线程可以停止阻塞,让其继续获得争抢锁的权利
public class CanInterrupted {
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try {
//可打断锁
//如果没有竞争会获得lock对象锁,如果有竞争进入等待队列,但可以被其他线程使用interrupt打断
System.out.println("尝试获得锁!");
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
return;//被打断,返回,不再向下执行
}
System.out.println(" 获得了锁!");
lock.unlock();
}, "t1");
lock.lock();//在主线程里获得了锁
try {
t1.start();
Thread.sleep(1000);
// 打断
t1.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
可超时
使用 lock.tryLock 方法会返回获取锁是否成功。如果成功则返回 true ,反之则返回 false 。
并且 tryLock 方法可以指定等待时间,参数为:tryLock(long timeout, TimeUnit unit), 其中 timeout 为最长等待时间,TimeUnit 为时间单位
主动防止无限制等待,避免死锁
public class CanTimeout {
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(()->{
System.out.println("尝试获得锁,获取到锁返回true,否则false");
if (!lock.tryLock()){
System.out.println("获取不到锁");
return;
}
System.out.println("获取到了锁");
lock.unlock();
}) ;
lock.lock();//主线程拥有了锁
try{
t1.start();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
公平锁
公平锁: 非常公平, 不能够插队,必须先来后到!解决线程饥饿但会降低并发度
非公平锁:非常不公平,可以插队 (默认都是非公平)
条件变量synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比
synchronized 是那些不满足条件的线程都在一间休息室等消息
而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
await 前需要获得锁
1 await 执行后,会释放锁,进入 conditionObject 等待
2 await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
3 竞争 lock 锁成功后,从 await 后继续执行
public class ConditionTest {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
},"C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"D").start();
}
}
class Data{
private int number = 0;
ReentrantLock lock = new ReentrantLock ();
Condition condition = lock.newCondition();
public void increment() {
try {
lock.lock();
while (number !=0){
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName()+"-->"+number);
condition.signalAll();//唤醒condition里waitSet 里所有的线程,重新竞争锁
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() {
try {
lock.lock();
while (number ==0){
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+"-->"+number);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
特点:Condition 精准的通知和唤醒线程
JUC版的生产者与消费者问题
public class ProducerAndCustomerByCondition {
public static void main(String[] args) {
MessageBlockingQueue queue = new MessageBlockingQueue(2);
for (int i = 0; i < 3; i++) {
int finalI = i;
new Thread(()->{
queue.put(new Messages(finalI,"produce"+finalI));
},"producer"+finalI).start();
}
new Thread(() -> {
while(true) {
try {
TimeUnit.SECONDS.sleep(1);
Messages message = queue.take();
System.out.println(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者").start();
}
}
class MessageBlockingQueue{
private final linkedList linkedList = new linkedList<>();
private final int capacity;
ReentrantLock lock = new ReentrantLock ();
Condition conditionPro = lock.newCondition();
Condition conditionCus = lock.newCondition();
public MessageBlockingQueue(int capacity) {
this.capacity = capacity;
}
public Messages take() {
lock.lock();
try {
while ( linkedList.size() == 0) {
System.out.println("队列为空,消费者等待");
conditionCus.await();
}
Messages messages = linkedList.removeFirst();
System.out.println("获取消息"+messages);
conditionPro.signalAll();
return messages;
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
return null;
}
public void put(Messages messages) {
Objects.requireNonNull(messages);
try {
lock.lock();
while (linkedList.size() == capacity){
System.out.println("队列已满,生产者等待");
conditionPro.await();
}
linkedList.addLast(messages);
System.out.println("生产消息"+messages);
conditionCus.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
record Messages(int id, Object value) {
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
线程同步控制
固定线程的执行顺序
wait/notify
public class AlternateByWait {
static boolean isRunning = false;
static final Object lock = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock){
while (!isRunning) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t1");
}
}, "t1");
Thread t2 = new Thread(()->{
synchronized (lock){
isRunning = true;
System.out.println("t2");
lock.notifyAll();
}
},"t2");
t1.start();
t2.start();
}
await()/signal()
static boolean isRunning = false;
static final ReentrantLock lock = new ReentrantLock();
static final Condition condition = lock.newCondition();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
lock.lock();
try {
while (!isRunning) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t1");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1");
Thread t2 = new Thread(()->{
lock.lock();
try {
isRunning = true;
System.out.println("t2");
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"t2");
t1.start();
t2.start();
}
park/unpark
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
LockSupport.park();
System.out.println("t1");
}, "t1");
Thread t2 = new Thread(() -> {
System.out.println("t2");
LockSupport.unpark(t1);
}, "t2");
t1.start();
t2.start();
}
线程交替输出
线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。 输出 abcabcabcabcabc
wait/notify()
public class PrintByWait {
public static void main(String[] args) {
WaitFlag waitFlag = new WaitFlag(1, 5);
new Thread(()->{
waitFlag.print("a", 1, 2);
}).start();
new Thread(()->{
waitFlag.print("b", 2, 3);
}).start();
new Thread(()->{
waitFlag.print("c", 3, 1);
}).start();
}
}
class WaitFlag{
//等待标记
private int flag;
private final int loopName;//循环次数
public WaitFlag(int flag, int loopName) {
this.flag = flag;
this.loopName = loopName;
}
//当前操作线程和等待标记进行比较,一致继续运行,否则等待,并唤醒其他线程
public void print(String content, int flag, int nextFlag){
for (int i = 0; i < loopName; i++) {
synchronized(this){
while (this.flag != flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+content);
this.flag = nextFlag;
this.notifyAll();
}
}
}
}
await/signal
public class PrintByAwait {
public static void main(String[] args) {
AwaitSignal signal = new AwaitSignal(5);
ReentrantLock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
new Thread(()-> signal.print(lock, condition1, condition2,"a")).start();
new Thread(()-> signal.print(lock, condition2, condition3,"b")).start();
new Thread(()-> signal.print(lock, condition3, condition1,"c")).start();
try {
TimeUnit.SECONDS.sleep(1);
lock.lock();
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
record AwaitSignal(int loopNum) {
public void print(Lock lock, Condition cur, Condition next, String content) {
for (int i = 0; i < loopNum; i++) {
lock.lock();
try {
cur.await();
System.out.println(content);
next.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
JDK11实现原理
加锁解锁流程
public ReentrantLock() {
sync = new NonfairSync();
}
没有竞争时
final void lock() {
// 没有竞争时, 直接加锁
if (compareAndSetState(0, 1))
// 设置持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 有竞争, 会调用这个方法
acquire(1);
}
出现竞争时
Thread-1 执行了
- lock方法中CAS 尝试将 state 由 0 改为 1,结果失败
- lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败
- 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列
- 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
- Node 的创建是懒惰的
- 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquireQueued 逻辑
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时
state 仍为 1,失败 - 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回
true - 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0 释放锁,进入 tryRelease 流程,如果成功
-
设置 exclusiveOwnerThread 为 null
-
state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置
-
exclusiveOwnerThread 为 Thread-1,state = 1
-
head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
-
原本的 head 因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先 -
Thread-4 被设置为 exclusiveOwnerThread,state = 1
-
Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁实现
final void lock() {
// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果尝试失败,进入 ㈠
acquire(1);
}
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
// ㈡ tryAcquire
if (
!tryAcquire(arg) &&
// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// ㈡ 进入 ㈢
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败, 回到调用处
return false;
}
// ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式,新建的Node的waitstatus默认为0,因为waitstatus是成员变量,默认被初始化为0
Node node = new Node(Thread.currentThread(), mode);
// 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 双向链表
pred.next = node;
return node;
}
}
//如果tail为null,尝试将 Node 加入 AQS, 进入 ㈥
enq(node);
return node;
}
// ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// cas 尝试将 Node 对象加入 AQS 队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
if (p == head && tryAcquire(arg)) {
// 获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
// 上一个节点 help GC
p.next = null;
failed = false;
// 返回中断标记 false
return interrupted;
}
if (
// 判断是否应当 park, 进入 ㈦
shouldParkAfterFailedAcquire(p, node) &&
// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// 上一个节点都在阻塞, 那么自己也阻塞好了
return true;
}
// > 0 表示取消状态
if (ws > 0) {
//前继节点是CANCELLED ,则需要充同步队列中删除,并检测新接上的前继节点的状态,若还是为CANCELLED ,还需要重复上述步骤
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这次还没有阻塞
// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ㈧ 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
解锁
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// 解锁实现
public void unlock() {
sync.release(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
// 尝试释放锁, 进入 ㈠
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
if (
// 队列不为 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
// unpark AQS 中等待的线程, 进入 ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}
// ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0, 如果线程获取到了锁那么后来头结点会被抛弃掉
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
可重入
static final class NonfairSync extends Sync {
// ...
// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
可打断
不可打断模式:
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了。
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// ...
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// interrupted 会清除打断标记
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// 还是需要获得锁后, 才能返回打断状态
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// 如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 如果打断状态为 true
selfInterrupt();
}
}
static void selfInterrupt() {
// 重新产生一次中断,这时候线程是如果正常运行的状态,那么不是出于sleep等状态,interrupt方法就不会报错
Thread.currentThread().interrupt();
}
}
可打断模式:
在 park 过程中如果被 interrupt 会抛出异常
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁, 进入 ㈠
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// ㈠ 可打断的获取锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
(
// (s = h.next) == null 表示队列中还有没有老二
(s = h.next) == null || // 或者队列中老二线程不是此线程
s.thread != Thread.currentThread()
);
}
}
条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject
await 流程开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
park 阻塞 Thread-0
假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的
waitStatus 改为 -1
Thread-1 释放锁,进入 unlock 流程
ublic class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 第一个等待节点
private transient Node firstWaiter;
// 最后一个等待节点
private transient Node lastWaiter;
public ConditionObject() { }
// ㈠ 添加一个 Node 至等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个关联当前线程的新 Node, 添加至队列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 唤醒 - 将没取消的第一个节点转移至 AQS 队列
private void doSignal(Node first) {
do {
// 已经是尾节点了
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (
// 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
!transferForSignal(first) &&
// 队列还有节点
(first = firstWaiter) != null
);
}
// 外部类方法, 方便阅读, 放在此处
// ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
final boolean transferForSignal(Node node) {
// 设置当前node状态为0(因为处在队列末尾),如果状态已经不是 Node.CONDITION, 说明被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入 AQS 队列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (
// 插入节点的上一个节点被取消
ws > 0 ||
// 插入节点的上一个节点不能设置状态为 Node.SIGNAL
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
) {
// unpark 取消阻塞, 让线程重新同步状态
LockSupport.unpark(node.thread);
}
return true;
}
// 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// ㈡
private void unlinkCancelledWaiters() {
// ...
}
// 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
public final void signal() {
// 如果没有持有锁,会抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 不可打断等待 - 直到被唤醒
public final void awaitUninterruptibly() {
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁, 见 ㈣
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 仅设置打断状态
if (Thread.interrupted())
interrupted = true;
}
// 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// 外部类方法, 方便阅读, 放在此处
// ㈣ 因为某线程可能重入,需要将 state 全部释放,获取state,然后把它全部减掉,以全部释放
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 唤醒等待队列队列中的下一个节点
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 打断模式 - 在退出等待时重新设置打断状态
private static final int REINTERRUPT = 1;
// 打断模式 - 在退出等待时抛出异常
private static final int THROW_IE = -1;
// 判断打断模式
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// ㈤ 应用打断模式
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 等待 - 直到被唤醒或打断
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式, 见 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 等待 - 直到被唤醒或打断或超时
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
// 获得最后期限
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// 已超时, 退出等待队列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式, 见 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
public final boolean awaitUntil(Date deadline) throws InterruptedException {
// ...
}
// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
// ...
}
// 工具方法 省略 ...
}
JDK14 实现原理
jdk14 以后对ReentrantLock 进行了修改,主要是AQS上的区别
内部结构同样继承于AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
@ReservedStackAccess
final boolean tryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, 1)) {//cas线程锁状态为1则表示成功
setExclusiveOwnerThread(current);//设置当前线程为owner线程
return true;
}
} else if (getExclusiveOwnerThread() == current) {//持有锁的线程==当前线程
if (++c < 0) // 每次重入计数+1 溢出一般不会发生
throw new Error("Maximum lock count exceeded");
setState(c);//设置锁的状态为c
return true;
}
return false;
}
abstract boolean initialTryLock();
@ReservedStackAccess
final void lock() {//上锁
if (!initialTryLock())
acquire(1);//进入aqs内部方法acquire()在此之前会先tryAcquire 尝试获取一次,失败在进入重载的acquire方法自旋,每次自旋会尝试获取锁直到被park阻塞
}
//可中断的锁
final void lockInterruptibly() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!initialTryLock())
acquireInterruptibly(1);//调用AQS的中断获取锁的方法
}
//超时尝试获取锁
final boolean tryLockNanos(long nanos) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return initialTryLock() || tryAcquireNanos(1, nanos);
}
//释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
//是否为当前持有锁的线程
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
非公平锁
//默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//true则为公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//初始化尝试获取锁,若获取到则不会调用AQS方法
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { //第一次尝试CAS获取锁
setExclusiveOwnerThread(current);
return true;
//如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入--可重入原理
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;//每次重入状态计数+1。释放也需要释放相应的次数
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//只有同步队列里没有获取锁或者阻塞的线程才会尝试CAS设置锁的状态为1,和非公平锁的唯一区别
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);//将当前线程设置为独占意思就是加锁
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
if (p.status >= 0)//说明队列里有正在获取锁的线程或者wait的线程
return true;
return false;
}
加锁
不管公平与非公平都是调用父类的Sync的lock方法,区别在于initialTryLock() 由Sync子类实现公平与非公平逻辑
JDK14的实现比jdk11更要通俗易懂
public void lock() {
sync.lock();
}
final void lock() {
if (!initialTryLock())
acquire(1);//调取AQS的acquire方法,解析见AQS
}
释放锁
public void unlock() {
sync.release(1);//调用AQS的释放锁的release方法。重入几次则需要释放相应次数。否则其他线程则抢不到锁一直阻塞。解析见AQS
}
可中断
public void lockInterruptibly() throws InterruptedException {
sync.lockInterruptibly();
}
final void lockInterruptibly() throws InterruptedException {
if (Thread.interrupted())//线程已经中断抛出中断异常
throw new InterruptedException();
if (!initialTryLock())
acquireInterruptibly(1);//调用AQS的可中断获取锁的方法
}



