要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
代码实现
import com.example.demo.hmjuc.Sleep;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.LinkedList;
public class Test1 {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(5);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 10; j++) {
Message message = new Message(j, "hello world" + j);
System.out.println("生成了一个消息 = " + message);
messageQueue.put(message);
Sleep.sleep(1000);
}
}).start();
}
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
System.out.println("消费了一个消息 = " + message);
Sleep.sleep(500);
}
}).start();
}
}
class MessageQueue {
private final LinkedList list = new LinkedList<>();
private final int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
public Message take() {
synchronized (list) {
while (list.isEmpty()) {
try {
System.out.println("队列为空,等待中...");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.removeFirst();
list.notifyAll();
// 获取第一个消息 并且移除
return message;
}
}
public void put(Message message) {
synchronized (list) {
// 如果队列满了 则等待
while (list.size() == capacity) {
try {
System.out.println("队列已满,等待中...");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 添加消息
list.addLast(message);
list.notifyAll();
}
}
}
@ToString
@Getter
@AllArgsConstructor
class Message {
private int id;
private Object value;
}



