生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况:
解决方法存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品。互相等待,从而发生死锁。
采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。
在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。
Java能实现的几种方法核心:保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。
wait() / notify()方法
当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;
当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。
当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;
当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
public class ProducerConsumer {
public static void main(String[] args) {
PublicBox box = new PublicBox();
Consumer con = new Consumer(box);
Producer pro = new Producer(box);
Thread t1 = new Thread(pro,"A");
Thread t2 = new Thread(pro,"B");
Thread t3 = new Thread(con);
t1.start();
t2.start();
t3.start();
}
}
class Producer implements Runnable {
private PublicBox box;
public Producer(PublicBox box) {
this.box = box;
}
@Override
public void run() {
while (true) {
try {
System.out.println("Put Number:" + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
box.put();
}
}
}
class Consumer implements Runnable {
private PublicBox box;
public Consumer(PublicBox box) {
this.box = box;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
box.get();
}
}
}
class PublicBox {
private int product = 0;
public synchronized void put() {
while (product == 5) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
product++;
System.out.println("Put SUCCESS!Product Nums:" + product);
notify();
}
public synchronized void get() {
while (product == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
product--;
System.out.println("Get SUCCESS!Product Nums:" + product);
notify();
}
}
notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行。
await() / signal()方法
在JDK5.0以后,JAVA提供了新的更加健壮的线程处理机制,包括了同步、锁定、线程池等等,可以实现更小粒度上的控制。await()和signal()就是其中用来同步的两种方法,功能基本上和wait()/notify()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。
public class ProducerConsumer {
public static void main(String[] args) {
PublicBox box = new PublicBox();
Consumer con = new Consumer(box);
Producer pro = new Producer(box);
Thread t1 = new Thread(pro,"A");
Thread t2 = new Thread(pro,"B");
Thread t3 = new Thread(con);
t1.start();
t3.start();
}
}
class Producer implements Runnable {
private PublicBox box;
public Producer(PublicBox box) {
this.box = box;
}
@Override
public void run() {
while (true) {
try {
System.out.println("Put Number:" + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
box.put();
}
}
}
class Consumer implements Runnable {
private PublicBox box;
public Consumer(PublicBox box) {
this.box = box;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
box.get();
}
}
}
class PublicBox {
private int product = 0;
// 锁
private final Lock lock = new ReentrantLock();
// 仓库满的条件变量
private final Condition full = lock.newCondition();
// 仓库空的条件变量
private final Condition empty = lock.newCondition();
public void put() {
lock.lock();
try {
while (product == 5) {
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
product++;
System.out.println("Put SUCCESS!Product Nums:" + product);
empty.signalAll();
} finally {
lock.unlock();
}
}
public void get() {
lock.lock();
try {
while (product == 0) {
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
product--;
System.out.println("Get SUCCESS!Product Nums:" + product);
full.signalAll();
} finally {
lock.unlock();
}
}
}
BlockingQueue阻塞队列方法
BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。
put()方法: 类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
take()方法: 类似于我们上面的消费者线程,容量为0时,自动阻塞。
public class ProducerConsumer {
public static void main(String[] args) {
PublicBox box = new PublicBox();
Consumer con = new Consumer(box);
Producer pro = new Producer(box);
Thread t1 = new Thread(pro,"A");
Thread t2 = new Thread(pro,"B");
Thread t3 = new Thread(con);
t1.start();
t2.start();
t3.start();
}
}
class Producer implements Runnable {
private PublicBox box;
public Producer(PublicBox box) {
this.box = box;
}
@Override
public void run() {
while (true) {
try {
System.out.println("Put Number:" + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
box.put();
}
}
}
class Consumer implements Runnable {
private PublicBox box;
public Consumer(PublicBox box) {
this.box = box;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
}
box.get();
}
}
}
class PublicBox {
private linkedBlockingQueue
信号量
Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。
Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行)。
public class TestSemaphore {
static WareHouse wareHouse = new WareHouse();
//生产者
static class Producer implements Runnable {
static int num = 1;
@Override
public void run() {
while (true) {
try {
wareHouse.insert(num);
System.out.println("生产物品" + num);
num++;
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
//消费者
static class Consumer implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
while (true) {
try {
System.out.println("消费物品" + wareHouse.remove());
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
//仓库,可以放置和拿走物品
static class WareHouse {
private final int capacity = 10;
private final Semaphore full = new Semaphore(0); //仓库中被占用的槽的信号量
private final Semaphore empty = new Semaphore(capacity); //仓库中空的槽的信号量
private final Semaphore mutex = new Semaphore(1); //互斥信号量
private int insertIndex = 0; //仓库中当前可以放置物品的位置
private int removeIndex = 0; //仓库中当前可以拿走物品的位置
private final Object[] items = new Object[capacity]; //仓库中的所有物品
int count = 0; //仓库中的现有物品数
//向仓库中放置物品
public void insert(Object item) throws InterruptedException {
empty.acquire();
mutex.acquire();
items[insertIndex++] = item;
if (insertIndex == capacity) {
insertIndex = 0;
}
count++;
mutex.release();
full.release();
}
//从仓库中拿走物品
public Object remove() throws InterruptedException {
full.acquire();
mutex.acquire();
Object item = items[removeIndex++];
if (removeIndex == capacity) {
removeIndex = 0;
}
count--;
mutex.release();
empty.release();
return item;
}
}
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}
管道
一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。
inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输出与输入。
这种方式只适用于两个线程之间通信,不适合多个线程之间通信。



