1、保护性暂停模式
1.1、概述1.2、单任务版GuardedObject1.3、多任务版GuardedObject 2、生产者消费者模式
2.1、概述2.2、代码实现 3、终止模式之两阶段终止4、同步模式之犹豫Balking5、双重检查加锁单例模式6、异步模式之工作线程
6.1、定义6.2、饥饿6.3、创建多少线程池合适
6.3.1、CPU 密集型运算6.3.2、I/O 密集型运算 7、享元模式
7.1、定义7.2、体现4.3、自定义连接池
1、保护性暂停模式 1.1、概述旨在用一个线程等待另一个线程的执行结果,因为要等待另一方的结果,因此归类到同步模式
JDK 中,join 的实现、Future 的实现,采用的就是此模式
1.2、单任务版GuardedObjectclass GuardedObject {
private Object response;
public Object get() {
synchronized (this) {
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public Object get(long timeout) {
synchronized (this) {
long beginTime = System.currentTimeMillis();
long passTime = 0;
while (response == null) {
long waitTime = timeout - passTime;
if (waitTime <= 0) {
break;
}
try {
// 重点,避免虚假唤醒导致睡眠时间延长
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passTime = System.currentTimeMillis() - beginTime;
}
return response;
}
}
public void complete(Object response) {
synchronized (this) {
this.response = response;
}
}
}
为什么要讲这个模式,观察一下 Thread 中的 join 方法,你就明白了
1.3、多任务版GuardedObject是不是很熟悉?
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
@Slf4j
public class Test {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
TimeUnit.SECONDS.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
@Slf4j
class People extends Thread {
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.info("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.info("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j
class Postman extends Thread {
private final int id;
private final String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.info("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
class Mailboxes {
private static final Map BOXES = new Hashtable<>();
private static final AtomicInteger ID = new AtomicInteger(1);
private static synchronized int generateId() {
return ID.incrementAndGet();
}
public static GuardedObject getGuardedObject(int id) {
return BOXES.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
BOXES.put(go.getId(), go);
return go;
}
public static Set getIds() {
return BOXES.keySet();
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class GuardedObject {
private int id;
public GuardedObject(int id) {
this.id = id;
}
private Object response;
public Object get() {
synchronized (this) {
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public Object get(long timeout) {
synchronized (this) {
long beginTime = System.currentTimeMillis();
long passTime = 0;
while (response == null) {
long waitTime = timeout - passTime;
if (waitTime <= 0) {
break;
}
try {
// 重点,避免虚假唤醒导致睡眠时间延长
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passTime = System.currentTimeMillis() - beginTime;
}
return response;
}
}
public void complete(Object response) {
synchronized (this) {
this.response = response;
}
}
}
2、生产者消费者模式
2.1、概述
与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应,所以生产者消费者模型应该归类到异步模型
JDK 中各种阻塞队列,采用的就是这种模式
2.2、代码实现@Slf4j
public class Test {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Message message = queue.take();
log.info("消费者拿到消息 {}", message);
}
}, "消费者").start();
}
}
// 消息队列类 , java 线程之间通信
@Slf4j
class MessageQueue {
private final linkedList list = new linkedList<>();
private final int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
public Message take() {
// 检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
log.info("队列为空, 消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列头部获取消息并返回
Message message = list.removeFirst();
log.info("已消费消息 {}", message);
list.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (list) {
// 检查对象是否已满
while (list.size() == capcity) {
try {
log.info("队列已满, 生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(message);
log.info("已生产消息 {}", message);
list.notifyAll();
}
}
}
final class Message {
private final int id;
private final Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
3、终止模式之两阶段终止
在一个线程 T1 中如何优雅的终止线程 T2 ?也就是说,T1 终止 T2 后,还需要留给 T2 处理后续事情的机会。
我们不能直接使用 T1.stop() 方法来直接停止线程,因为这个方法会立刻杀死线程,如果此时 T1 正好锁住了一个共享资源,那么它死后锁将永远得不到释放,使得其他线程永远无法获得锁
@Slf4j
public class TwoPhaseTermination {
private Thread monitorThread;
public void start() {
monitorThread = new Thread(() -> {
Thread curThread = Thread.currentThread();
while (true) {
if (!curThread.isInterrupted()) {
log.info("正常运行。。");
} else {
log.info("料理后事。。");
break;
}
log.info("执行监控。。");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
//在监控线程sleep的时候打断,会抛出InterruptedException异常,然后打断标记清除,所以需要重新设置好打断标记
curThread.interrupt();
}
}
}, "monitor");
monitorThread.start();
}
public void stop() {
monitorThread.interrupt();
}
}
//测试
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
twoPhaseTermination.start();
log.info("开始执行..");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
twoPhaseTermination.stop();
log.info("准备结束..");
如上方法确实能够正确运行,但是实现起来多少有点繁琐,还需要考虑线程的打断标记,正常情况下,打断后,标记就会为 true,如果是 sleep 的时候被打断,因为会重置打断标记,使我们不得不重新设置打断标记,对于程序维护来说比较麻烦
@Slf4j
public class TwoPhaseTermination {
private Thread monitorThread;
private volatile boolean stopTag;
public void start() {
stopTag = false;
monitorThread = new Thread(() -> {
while (true) {
if (!stopTag) {
log.info("正常运行。。");
} else {
log.info("料理后事。。");
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行监控。。");
}
}, "monitor");
monitorThread.start();
}
public void stop() {
stopTag = true;
//如果正好在睡觉直接打断
monitorThread.interrupt();
}
}
4、同步模式之犹豫Balking
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回,就拿两阶段终止模式的代码举例,如果我们现在监控了某一个系统资源的回收,只需要使用一个线程就可以了,那如果多个线程调用多次 start 方法,那么每次都会创建一个新的线程去监控,这样是不合理的,所以就可以使用一个开始标记来判断是否需要执行 start 方法
但是我们还能使用 volatile 关键字去做吗?答案是否定的,因为这里可能涉及到多个线程对开始标记进行操作,还是不能保证原子性,所以我们只好使用 synchronized 关键字来实现了
@Slf4j
public class TwoPhaseTermination {
private Thread monitorThread;
private volatile boolean startTag = false;
private boolean stopTag;
public void start() {
synchronized (TwoPhaseTermination.class) {
if (startTag) {
log.info("启动过了么 {} ", startTag);
return;
}
startTag = true;
}
startTag = true;
stopTag = false;
monitorThread = new Thread(() -> {
while (true) {
if (!stopTag) {
log.info("正常运行。。");
} else {
log.info("料理后事。。");
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行监控。。");
}
}, "monitor");
monitorThread.start();
log.info("正在启动:");
}
public void stop() {
stopTag = true;
//如果正好在睡觉直接打断
monitorThread.interrupt();
}
}
其实懒加载的单例模式,就是这个原理
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
5、双重检查加锁单例模式
首先看一下懒汉式的单例模式
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
加载静态方法上的 synchronized 实际上是给类 class 加的锁
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
synchronized{
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
}
上述实现有一个问题:对于第一次进入该方法,加锁后实例化,没有问题,但是后续已经创建完成了,每次 getInstance 都需要加锁,性能太低了,所以我们便有了如下改进
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static Singleton getInstance() {
if(INSTANCE == null) {
synchronized{
if (INSTANCE == null) { // t1
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}
但是上述代码,还是存在一定的问题,我们一起来观察它对应的字节码
0: getstatic #2 // Field INSTANCE:Lcom/phz/test/Singleton; 3: ifnonnull 37 6: ldc #3 // 获得类对象 class com/phz/test/Singleton,用于加锁 8: dup // 复制引用地址 9: astore_0 // 存储一份,用于解锁 10: monitorenter // 创建 Monitor 开始进入同步代码块 11: getstatic #2 // Field INSTANCE:Lcom/phz/test/Singleton; 14: ifnonnull 27 17: new #3 // class com/phz/test/Singleton 20: dup 21: invokespecial #4 // Method "":()V 24: putstatic #2 // Field INSTANCE:Lcom/phz/test/Singleton; 27: aload_0 // 把锁对象拿出来,用于解锁 28: monitorexit 29: goto 37 32: astore_1 33: aload_0 34: monitorexit 35: aload_1 36: athrow 37: getstatic #2 // Field INSTANCE:Lcom/phz/test/Singleton; 40: areturn
其中
17 表示创建对象,将对象引用入栈20 表示复制一份对象引用,因为调用构造方法还会消耗一份21 表示利用一个对象引用,调用构造方法,消耗一份24 表示利用一个对象引用,赋值给 static INSTANCE,将最后一份引用消耗
但是 JIT 可能会优化为:先执行 24,再执行 21。如果两个线程 t1,t2 按如下时间序列执行:
按照代码来看就是,第一个线程进入,由于 INSTANCE 还为空,所以获取到了锁,代码执行到了 INSTANCE = new Singleton(); 此时先将引用传递给了 INSTANCE,但是还没有执行 new 操作(或者操作太多,时间有点长),此时第二个线程也来了,此时判断 INSTANCE 已经不为空了,就直接 return,然后去使用这个空对象,最后第一个线程才执行 new 操作,然后释放锁
6、异步模式之工作线程 6.1、定义这个问题出现的原因,就是因为指令的重排序,所以我们为了保证程序的正确性,需要禁止指令重排序,我们可以给 INSTANCE 变量加上 volatile 关键字
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
6.2、饥饿固定大小线程池会有饥饿现象
两个工人是同一个线程池中的两个线程他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待(也就是说)后厨做菜:没啥说的,做就是了 比如工人 A 处理了点餐任务,接下来它要等着工人 B 把菜做好,然后上菜,他俩也配合的蛮好但现在同时来了两个客人,这个时候工人 A 和工人 B 都去处理点餐了,这时没人做饭了,饥饿
static final ListMENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅"); static Random RANDOM = new Random(); static String cooking() { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.execute(() -> { log.info("处理点餐..."); Future f = executorService.submit(() -> { log.info("做菜"); return cooking(); }); try { log.info("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); executorService.execute(() -> { log.info("处理点餐..."); Future f = executorService.submit(() -> { log.info("做菜"); return cooking(); }); try { log.info("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }
使用 jconsole 工具检测一下
可以发现,这并不是死锁,只是由于线程数不足,导致的饥饿
如何解决呢?简单想,那就是增加几个线程(工人),但是这不能解决根本上的问题,不同任务类型应该使用不同的线程池,只有这样才能避免饥饿,并且还能够提升效率
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.info("处理点餐...");
Future f = cookPool.submit(() -> {
log.info("做菜");
return cooking();
});
try {
log.info("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.info("处理点餐...");
Future f = cookPool.submit(() -> {
log.info("做菜");
return cooking();
});
try {
log.info("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
6.3、创建多少线程池合适
过小会导致程序不能充分地利用系统资源、容易导致饥饿过大会导致更多的线程上下文切换,占用更多内存 6.3.1、CPU 密集型运算
6.3.2、I/O 密集型运算通常采用 CPU 核数 + 1 能够实现最优的 CPU 利用率,+ 1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU 计算时间 + 等待时间) / CPU 计算时间
4 * 100% * 100% / 50% = 8
4 * 100% * 100% / 10% = 40
7、享元模式7.1、定义对于像 String 这样的不可变类,如果我们有修改需求的时候,本身并不是修改其内容,而是创建了新的对象,这样虽然能够避免共享带来的线程安全问题,但是也带来一个严重的问题,那就是对象创建的太频繁,对象创建的太多,为了解决这个矛盾,便引入这个享元模式
英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时,通过与其他类似对象共享尽可能多的数据来最小化内存使用的对象,属于 GOF 23种设计模式中的一种结构性设计模式
wikipedia: A flyweight is an object that minimizes memory usage by sharing as much data as possible with other similar objects
出自 “Gang of Four” design patterns
归类 Structual patterns
7.2、体现在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法
例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
private static class LongCache {
private LongCache(){}
static final Long cache[] = new Long[-(-128) + 127 + 1];
static {
for(int i = 0; i < cache.length; i++)
cache[i] = new Long(i - 128);
}
}
注意以下几点:
String 也使用了享元模式,具体体现可以参考其串池的原理,不展开讲了,然后还有 BigInteger,BigDecimal 等,因为都是不可变类,所以都用了享元模式
4.3、自定义连接池
@Slf4j
public class Test {
@SneakyThrows
public static void main(String[] args) {
MockPool mockPool = new MockPool(5);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
MockConnection connection = mockPool.getConnection();
System.out.println("拿到连接:" + connection.getConnectionName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("归还连接:" + connection.getConnectionName());
mockPool.free(connection);
}).start();
}
}
}
final class MockPool {
private final MockConnection[] connections;
private final AtomicIntegerArray state;
private static final Integer DEFAULT_POOL_SIZE = 5;
public MockPool() {
connections = new MockConnection[DEFAULT_POOL_SIZE];
state = new AtomicIntegerArray(DEFAULT_POOL_SIZE);
initPool();
}
public MockPool(Integer poolSize) {
connections = new MockConnection[poolSize];
state = new AtomicIntegerArray(poolSize);
initPool();
}
private void initPool() {
for (int i = 0; i < connections.length; i++) {
connections[i] = new MockConnection("连接-" + (i + 1));
}
}
@SneakyThrows
public MockConnection getConnection() {
for (; ; ) {
for (int i = 0; i < connections.length; i++) {
if (state.get(i) == 0) {
if (state.compareAndSet(i, 0, 1)) {
return connections[i];
}
}
}
synchronized (this) {
System.out.println("无可用连接,等待...");
wait();
}
}
}
public void free(MockConnection connection) {
for (int i = 0; i < connections.length; i++) {
if (connections[i] == connection) {
state.set(i, 0);
synchronized (this) {
notifyAll();
}
return;
}
}
System.out.println("要释放的连接不存在");
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class MockConnection {
private String connectionName;
}



