简介:
生产者、消费者模型是多线程编程的常见问题,最简单的一个生产者、一个消费者线程模型大多数人都能够写出来,但是一旦条件发生变化,我们就很容易掉进多线程的bug中。这篇文章主要讲解了生产者和消费者的数量,商品缓存位置数量,商品数量等多个条件的不同组合下,写出正确的生产者消费者模型的方法。
欢迎探讨,如有错误敬请指正
生产消费者模型
生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。
定义商品类
package demo;
public class Goods {
public final String name;
public final int price;
public final int id;
public Goods(String name, int price, int id){
this.name = name;
this.price = price;
this.id = id;
}
@Override
public String toString(){
return "name: " + name + ", price:"+ price + ", id: " + id;
}
}
基本要求:
1)生产者不能重复生产一个商品,也就是说不能有两个id相同的商品
2)生产者不能覆盖一个商品(当前商品还未被消费,就被下一个新商品覆盖)。也就是说消费商品时,商品的id属性可以不连续,但不能出现缺号的情况
3)消费者不能重复消费一个商品
1.生产者线程无线生产,消费者线程无限消费的模式
1.1使用线程对象,一个生产者线程,一个消费者线程,一个商品存储位置
package demo;
import java.util.Random;
public class ProducterComsumerDemo1 {
private volatile Goods goods;
private Object obj = new Object();
private volatile Boolean isFull = false;
private int id = 1;
private Random rnd = new Random();
public class ComsumeThread implements Runnable{
@Override
public void run(){
try{
while(true){
synchronized(obj){
if(!isFull){
obj.wait();
}
Thread.sleep(rnd.nextint(250));
System.out.println(goods);
Thread.sleep(rnd.nextint(250));
isFull = false;
obj.notify();
}
Thread.sleep(rnd.nextint(250));
}
}
catch (InterruptedException e){
}
}
}
public class ProductThread implements Runnable{
@Override
public void run(){
try {
while(true){
synchronized(obj){
if(isFull){
obj.wait();
}
Thread.sleep(rnd.nextint(500));
if(id % 2 == 0){
goods = new Goods("A", 2, id);
} else{
goods = new Goods("B", 1, id);
}
Thread.sleep(rnd.nextint(250));
id++;
isFull = true;
obj.notify();
}
}
}
catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
new Thread(p).start();
new Thread(c).start();
}
}
运行结果
name: B, price:1, id: 1 name: A, price:2, id: 2 name: B, price:1, id: 3 name: A, price:2, id: 4 name: B, price:1, id: 5 name: A, price:2, id: 6 name: B, price:1, id: 7 name: A, price:2, id: 8 name: B, price:1, id: 9 name: A, price:2, id: 10 name: B, price:1, id: 11 name: A, price:2, id: 12 name: B, price:1, id: 13 ……
从结果看出,商品类型交替生产,每个商品的id都不相同,且不会漏过任何一个id,生产者没有重复生产,消费者没有重复消费,结果完全正确。
1.2.使用线程对象,多个生产者线程,多个消费者线程,1个缓存位置
1.2.1一个经典的bug
对于多生产者,多消费者这个问题,看起来我们似乎不用修改代码,只需在main方法中多添加几个线程就好。假设我们需要三个消费者,一个生产者,那么我们只需要在main方法中再添加两个消费者线程。
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
new Thread(c).start();
new Thread(p).start();
new Thread(c).start();
new Thread(c).start();
}
运行结果
name: B, price:1, id: 1 name: A, price:2, id: 2 name: A, price:2, id: 2 name: B, price:1, id: 3 name: B, price:1, id: 3 name: A, price:2, id: 4 name: A, price:2, id: 4 name: B, price:1, id: 5 name: B, price:1, id: 5 name: A, price:2, id: 6 ……
从结果中,我们发现消费者重复消费了商品,所以这样做显然是错误的。这里我们定义多个消费者,一个生产者,所以遇到了重复消费的问题,如果定义成一个消费者,多个生产者就会遇到id覆盖的问题。如果我们定义多个消费者,多个生产者,那么即会遇到重复消费,也会遇到id覆盖的问题。注意,上面的代码使用的notifyAll唤醒方法,如果使用notify方法唤醒bug仍然可能发生。
现在我们来分析一下原因。当生产者生产好了商品,会唤醒因没有商品而阻塞消费者线程,假设唤醒的消费者线程超过两个,这两个线程会竞争获取锁,获取到锁的线程就会从obj.wait()方法中返回,然后消费商品,并把isFull置为false,然后释放锁。当被唤醒的另一个线程竞争获取到锁了以后也会从obj.wait()方法中返回。会再次消费同一个商品。显然,每一个被唤醒的线程应该再次检查isFull这个条件。所以无论是消费者,还是生产者,isFull的判断必须改成while循环,这样才能得到正确的结果而不受生产者的线程数和消费者的线程数的影响。
而对于只有一个生产者线程,一个消费者线程,用if判断是没有问题的,但是仍然强烈建议改成while语句进行判断。
1.2.2正确的姿势
package demo;
import java.util.Random;
public class ProducterComsumerDemo1 {
private volatile Goods goods;
private Object obj = new Object();
private volatile Boolean isFull = false;
private int id = 1;
private Random rnd = new Random();
public class ComsumeThread implements Runnable{
@Override
public void run(){
try{
while(true){
synchronized(obj){
while(!isFull){
obj.wait();
}
Thread.sleep(rnd.nextint(250));
System.out.println(goods);
Thread.sleep(rnd.nextint(250));
isFull = false;
obj.notifyAll();
}
Thread.sleep(rnd.nextint(250));
}
}
catch (InterruptedException e){
}
}
}
public class ProductThread implements Runnable{
@Override
public void run(){
try {
while(true){
synchronized(obj){
while(isFull){
obj.wait();
}
Thread.sleep(rnd.nextint(500));
if(id % 2 == 0){
goods = new Goods("A", 2, id);
} else{
goods = new Goods("B", 1, id);
}
Thread.sleep(rnd.nextint(250));
id++;
isFull = true;
obj.notifyAll();
}
}
}
catch (InterruptedException e) {
}
}
}
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
new Thread(p).start();
new Thread(p).start();
new Thread(p).start();
new Thread(c).start();
new Thread(c).start();
new Thread(c).start();
}
}
1.3使用线程对象,多个缓存位置(有界),多生产者,多消费者
1)当缓存位置满时,我们应该阻塞生产者线程
2)当缓存位置空时,我们应该阻塞消费者线程
下面的代码我没有用java对象内置的锁,而是用了ReentrantLock对象。是因为普通对象的锁只有一个阻塞队列,如果使用notify方式,无法保证唤醒的就是特定类型的线程(消费者线程或生产者线程),而notifyAll方法会唤醒所有的线程,当剩余的缓存商品的数量小于生产者线程数量或已缓存商品的数量小于消费者线程时效率就比较低。所以这里我们通过ReentrantLock对象构造两个阻塞队列提高效率。
1.3.1普通方式
package demo;
import java.util.linkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducterComsumerDemo2 {
private final int MAX_SLOT = 2;
private linkedList queue = new linkedList();
private Lock lock = new ReentrantLock();
private Condition full = lock.newCondition();
private Condition empty = lock.newCondition();
private int id = 1;
private Random rnd = new Random();
public class ComsumeThread implements Runnable{
@Override
public void run(){
while(true){
lock.lock();
try {
while(queue.isEmpty()){
System.out.println("queue is empty");
empty.await();
}
Thread.sleep(rnd.nextint(200));
Goods goods = queue.remove();
System.out.println(goods);
Thread.sleep(rnd.nextint(200));
full.signal();
}
catch (InterruptedException e) {
}
finally{
lock.unlock();
}
try {
Thread.sleep(rnd.nextint(200));
}
catch (InterruptedException e) {
}
}
}
}
public class ProductThread implements Runnable{
@Override
public void run(){
while(true){
lock.lock();
try{
while(queue.size() == MAX_SLOT){
System.out.println("queue is full");
full.await();
}
Thread.sleep(rnd.nextint(200));
Goods goods = null;
switch(id%3){
case 0 : goods = new Goods("A", 1, id);
break;
case 1 : goods = new Goods("B", 2, id);
break;
case 2 : goods = new Goods("C", 3, id);
break;
}
Thread.sleep(rnd.nextint(200));
queue.add(goods);
id++;
empty.signal();
}
catch(InterruptedException e){
}
finally{
lock.unlock();
}
try {
Thread.sleep(rnd.nextint(100));
}
catch (InterruptedException e) {
}
}
}
}
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo2 pcd = new ProducterComsumerDemo2();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
new Thread(p).start();
new Thread(p).start();
new Thread(c).start();
new Thread(c).start();
}
}
运行结果
queue is empty queue is empty name: B, price:2, id: 1 name: C, price:3, id: 2 name: A, price:1, id: 3 queue is full name: B, price:2, id: 4 name: C, price:3, id: 5 queue is full name: A, price:1, id: 6 name: B, price:2, id: 7 name: C, price:3, id: 8 name: A, price:1, id: 9 name: B, price:2, id: 10 name: C, price:3, id: 11 name: A, price:1, id: 12 name: B, price:2, id: 13 name: C, price:3, id: 14 ……
1.3.2 更优雅的实现方式
下面使用线程池(ThreadPool)和阻塞队列(linkedBlockingQueue)原子类(AtomicInteger)以更加优雅的方式实现上述功能。linkedBlockingQueue阻塞队列仅在take和put方法上锁,所以id必须定义为原子类。
package demo;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.linkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ProducterComsumerDemo4 {
private final int MAX_SLOT = 3;
private linkedBlockingQueue queue = new linkedBlockingQueue(MAX_SLOT);
private AtomicInteger id = new AtomicInteger(1);
private Random rnd = new Random();
public class ComsumeThread implements Runnable{
@Override
public void run(){
while(true){
try {
Thread.sleep(rnd.nextint(200));
Goods goods = queue.take();
System.out.println(goods);
Thread.sleep(rnd.nextint(200));
}
catch (InterruptedException e) {
}
}
}
}
public class ProductThread implements Runnable{
@Override
public void run(){
while(true){
try{
int x = id.getAndIncrement();
Goods goods = null;
Thread.sleep(rnd.nextint(200));
switch(x%3){
case 0 : goods = new Goods("A", 1, x);
break;
case 1 : goods = new Goods("B", 2, x);
break;
case 2 : goods = new Goods("C", 3, x);
break;
}
Thread.sleep(rnd.nextint(200));
queue.put(goods);
Thread.sleep(rnd.nextint(100));
}
catch(InterruptedException e){
}
}
}
}
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
ExecutorService es = Executors.newCachedThreadPool();
es.execute(p);
es.execute(p);
es.execute(p);
es.execute(c);
es.execute(c);
es.shutdown();
}
}
2.有限商品个数
这个问题显然比上面的问题要复杂不少,原因在于要保证缓存区的商品要全部消费掉,没有重复消费商品,没有覆盖商品,同时还要保证所有线程能够正常结束,防止存在一直阻塞的线程。
2.1使用线程对象,多个缓存位置(有界),多生产者,多消费者
思路定义一下三个变量
private final int TOTAL_NUM = 30; private volatile int productNum = 0; private volatile int comsumedNum = 0;
每生产一个商品 productNum 自增1,直到TOTAL_NUM为止,如果不满足条件 productNum < TOTAL_NUM 则结束进程,自增操作必须在full.await()方法调用之前,防止生产者线程无法唤醒。
同理,每消费一个商品 comsumedNum 自增1,直到TOTAL_NUM为止,如果不满足条件 comsumedNum < TOTAL_NUM 则结束进程,自增操作必须在empty.await()方法调用之前,防止消费者线程无法唤醒。
comsumedNum和productNum相当于计划经济时代的粮票一样,有了它能够保证生产者线程在唤醒后一定需要生产一个商品,消费者线程在唤醒以后一定能够消费一个商品
package demo;
import java.util.linkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducterComsumerDemo3 {
private final int TOTAL_NUM = 30;
private volatile int productNum = 0;
private volatile int comsumedNum = 0;
private final int MAX_SLOT = 2;
private Lock lock = new ReentrantLock();
private Condition full = lock.newCondition();
private Condition empty = lock.newCondition();
private linkedList queue = new linkedList();
private int id = 1;
private Random rnd = new Random();
public class ComsumeThread implements Runnable{
@Override
public void run(){
while(true){
lock.lock();
try {
Thread.sleep(rnd.nextint(250));
if(comsumedNum < TOTAL_NUM){
comsumedNum++;
} else{
break;
}
while(queue.isEmpty()){
System.out.println("queue is empty");
empty.await();
}
Thread.sleep(rnd.nextint(250));
Goods goods = queue.remove();
System.out.println(goods);
Thread.sleep(rnd.nextint(250));
full.signal();
}
catch (InterruptedException e) {
}
finally{
lock.unlock();
}
try {
Thread.sleep(rnd.nextint(250));
}
catch (InterruptedException e) {
}
}
System.out.println(
"customer "
+ Thread.currentThread().getName()
+ " is over");
}
}
public class ProductThread implements Runnable{
@Override
public void run(){
while(true){
lock.lock();
try{
Thread.sleep(rnd.nextint(250));
if(productNum < TOTAL_NUM){
productNum++;
} else{
break;
}
Thread.sleep(rnd.nextint(250));
while(queue.size() == MAX_SLOT){
System.out.println("queue is full");
full.await();
}
Thread.sleep(rnd.nextint(250));
Goods goods = null;
switch(id%3){
case 0 : goods = new Goods("A", 1, id);
break;
case 1 : goods = new Goods("B", 2, id);
break;
case 2 : goods = new Goods("C", 3, id);
break;
}
queue.add(goods);
id++;
empty.signal();
}
catch(InterruptedException e){
}
finally{
lock.unlock();
}
try {
Thread.sleep(rnd.nextint(250));
}
catch (InterruptedException e) {
}
}
System.out.println(
"producter "
+ Thread.currentThread().getName()
+ " is over");
}
}
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo3 pcd = new ProducterComsumerDemo3();
ComsumeThread c = pcd.new ComsumeThread();
ProductThread p = pcd.new ProductThread();
new Thread(p).start();
new Thread(p).start();
new Thread(p).start();
new Thread(c).start();
new Thread(c).start();
new Thread(c).start();
System.out.println("main Thread is over");
}
}
2.2利用线程池,原子类,阻塞队列,以更优雅的方式实现
linkedBlockingQueue阻塞队列仅在take和put方法上锁,所以productNum和comsumedNum必须定义为原子类。
package demo;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.linkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class linkedBlockingQueueDemo {
private final int TOTAL_NUM = 20;
volatile AtomicInteger productNum = new AtomicInteger(0);
volatile AtomicInteger comsumedNum = new AtomicInteger(0);
private final int MAX_SLOT = 5;
private linkedBlockingQueue lbq = new linkedBlockingQueue(MAX_SLOT);
private Random rnd = new Random();
private volatile AtomicInteger pn = new AtomicInteger(1);
public class CustomerThread implements Runnable{
@Override
public void run(){
while(comsumedNum.getAndIncrement() < TOTAL_NUM){
try{
Thread.sleep(rnd.nextint(500));
Goods goods = lbq.take();
Thread.sleep(rnd.nextint(500));
System.out.println(goods);
Thread.sleep(rnd.nextint(500));
}
catch(InterruptedException e){
}
}
System.out.println(
"customer "
+ Thread.currentThread().getName()
+ " is over");
}
}
public class ProducerThread implements Runnable{
@Override
public void run(){
while(productNum.getAndIncrement() < TOTAL_NUM){
try {
int x = pn.getAndIncrement();
Goods goods = null;
switch(x%3){
case 0 : goods = new Goods("A", 1, x);
break;
case 1 : goods = new Goods("B", 2, x);
break;
case 2 : goods = new Goods("C", 3, x);
break;
}
Thread.sleep(rnd.nextint(500));
lbq.put(goods);
Thread.sleep(rnd.nextint(500));
}
catch (InterruptedException e1) {
}
}
System.out.println(
"producter "
+ Thread.currentThread().getName()
+ " is over ");
}
}
public static void main(String[] args){
linkedBlockingQueueDemo lbqd = new linkedBlockingQueueDemo();
Runnable c = lbqd.new CustomerThread();
Runnable p = lbqd.new ProducerThread();
ExecutorService es = Executors.newCachedThreadPool();
es.execute(c);
es.execute(c);
es.execute(c);
es.execute(p);
es.execute(p);
es.execute(p);
es.shutdown();
System.out.println("main Thread is over");
}
}
总结
以上就是本文关于Java多线程中不同条件下编写生产消费者模型方法介绍的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:
Java设计模式之代理模式原理及实现代码分享
快速理解Java设计模式中的组合模式
Java设计模式之访问者模式使用场景及代码示例
如有不足之处,欢迎留言指出。



