目录
一、为什么要再造管程?
二、什么是可重入锁?
三、公平锁与非公平锁
四、用锁的最佳实践原则
五、如何利用两个条件变量快速实现阻塞队列呢?
六、同步和异步
七、异步转同步——dubbo源码分析
在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。
Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
一、为什么要再造管程?
我们前面在介绍死锁问题的时候,提出了一个破坏不可抢占条件方案,但是这个方案 synchronized 没有办法解决。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,也释放不了线程已经占有的资源。但我们希望的是:
对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?有三种方案
- 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
- 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
- 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
这三种方案可以全面弥补 synchronized 的问题,也就是“重复造轮子”的主要原因,体现在 API 上,就是 Lock 接口的三个方法。详情如下:
// 支持中断的API void lockInterruptibly() throws InterruptedException; // 支持超时的API boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 支持非阻塞获取锁的API boolean tryLock();
// 支持中断的API测试程序
public class TestLock {
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
// 线程A
Thread threadA = new Thread(() -> {
System.out.println("ThreadA启动,尝试获取锁...");
lock.lock();// 获取锁
System.out.println("ThreadA获取锁成功...");
try {
Thread.sleep(3000);// 睡眠
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("ThreadA执行完,释放锁...");
}
}, "ThreadA");
// 线程A
Thread threadB = new Thread(() -> {
System.out.println("ThreadB启动,尝试获取锁...");
try {
// 能够响应中断的获取锁
lock.lockInterruptibly();
System.out.println("ThreadB获取锁成功...");
} catch (Exception e) {
lock.unlock(); // 释放锁
System.out.println("ThreadB执行完,释放锁...");
}
System.out.println("ThreadB继续执行...");
}, "ThreadB");
// 执行中断操作的线程
Thread threadC = new Thread(() -> {
// threadB打上中断标记
threadB.interrupt();
}, "ThreadC");
threadA.start();
// 睡眠
Thread.sleep(100);
threadB.start();
Thread.sleep(100);
System.out.println("---准备中断线程---");
threadC.start();
}
}
执行结果
二、什么是可重入锁?
所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁。例如下面代码中,当线程 T1 执行到 ① 处时,已经获取到了锁 rtl ,当在 ① 处调用 get() 方法时,会在 ② 再次对锁 rtl 执行加锁操作。此时,如果锁 rtl 是可重入的,那么线程 T1 可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。
public class TestReentryLock {
private final Lock rtl = new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); // ②
try {
return value;
} finally {
rtl.unlock();// 保证锁能释放
}
}
public void addOne() {
rtl.lock();// 获取锁
try {
value = 1 + get(); // ①
System.out.println("执行完后结果:"+value);
} finally {
rtl.unlock();// 保证锁能释放
}
}
public static void main(String[] args) {
TestReentryLock ts = new TestReentryLock();
ts.addOne();
}
}
三、公平锁与非公平锁
在使用 ReentrantLock 的时候,你会发现 ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。
//无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync() : new NonfairSync();
}
如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
四、用锁的最佳实践原则
并发大师 Doug Lea《Java 并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践,它们分别是:
- 永远只在更新对象的成员变量时加锁
- 永远只在访问可变的成员变量时加锁
- 永远不在调用其他对象的方法时加锁
永远不在调用其他对象的方法时加锁,因为调用其他对象的方法,实在是太不安全了,也许“其他”方法里面有线程 sleep() 的调用,也可能会有奇慢无比的 I/O 操作,这些都会严重影响性能。更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁。
五、如何利用两个条件变量快速实现阻塞队列呢?
一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队),相关的代码如下:
public class BlockedQueue{ final Lock lock = new ReentrantLock(); // 条件变量:队列不满 final Condition notFull = lock.newCondition(); // 条件变量:队列不空 final Condition notEmpty = lock.newCondition(); // 入队 void enq(T x) throws InterruptedException { lock.lock(); try { while () { // 等待队列不满 notFull.await(); } // 省略入队操作... // 入队后,通知可出队 notEmpty.signal(); } finally { lock.unlock(); } } // 出队 void deq() throws InterruptedException { lock.lock(); try { while () { // 等待队列不空 notEmpty.await(); } // 省略出队操作... // 出队后,通知可入队 notFull.signal(); } finally { lock.unlock(); } } }
Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。
六、同步和异步
同步和异步的区别到底是什么呢?通俗点来讲就是调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。
比如在下面的代码里,有一个计算圆周率小数点后 100 万位的方法pai1M(),这个方法可能需要执行俩礼拜,如果调用pai1M()之后,线程一直等着计算结果,等俩礼拜之后结果返回,就可以执行 printf("hello world")了,这个属于同步;如果调用pai1M()之后,线程不用等待计算结果,立刻就可以执行 printf("hello world"),这个就属于异步。
// 计算圆周率小说点后100万位
String pai1M() {
//省略代码无数
}
pai1M()
printf("hello world")
同步,是 Java 代码默认的处理方式。如果你想让你的程序支持异步,可以通过下面两种方式来实现:
- 调用方创建一个子线程,在子线程中执行方法调用,这种调用我们称为异步调用;
- 方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return,这种方法我们一般称为异步方法。
七、异步转同步——dubbo源码分析
在编程领域,异步的场景还是挺多的,比如 TCP 协议本身就是异步的,我们工作中经常用到的 RPC 调用,在 TCP 协议层面,发送完 RPC 请求后,线程是不会等待 RPC 的响应结果的。可能你会觉得奇怪,平时工作中的 RPC 调用大多数都是同步的啊?这是怎么回事呢?
其实很简单,一定是有人帮你做了异步转同步的事情。例如目前知名的 RPC 框架 Dubbo 就给我们做了异步转同步的事情,那它是怎么做的呢?下面我们就来分析一下 Dubbo 的相关源码。
Dubbo 异步转同步的功能是通过 DefaultFuture 这个类实现的,为了理清前后关系,还是有必要分析一下调用 DefaultFuture.get() 之前发生了什么。DubboInvoker 的 108 行调用了 DefaultFuture.get(),我稍微修改了一下列在了下面。这一行先调用了 request(inv, timeout) 方法,这个方法其实就是发送 RPC 请求,之后通过调用 get() 方法等待 RPC 返回结果。
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中108行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout)
.get();
}
}
DefaultFuture这个类是很关键,把相关的代码精简之后,列到了下面。不过有必要重复一下我们的需求:当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。不知道你有没有似曾相识的感觉,这不就是经典的等待 - 通知机制吗?
// 创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout); // 等待
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal(); // 通知
}
} finally {
lock.unlock();
}
}
调用线程通过调用 get() 方法等待 RPC 返回结果,这个方法里面,你看到的都是熟悉的“面孔”:调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁;获取锁后,通过经典的在循环中调用 await() 方法来实现等待。
当 RPC 结果返回时,会调用 doReceived() 方法,这个方法里面,调用 lock() 获取锁,在 finally 里面调用 unlock() 释放锁,获取锁后通过调用 signal() 来通知调用线程,结果已经返回,不用继续等待了。
Dubbo 的源代码在Github 上
DefaultFuture 的路径是:incubator-dubbo/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java。



