栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于阻塞队列的生产者消费者模型

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于阻塞队列的生产者消费者模型

目录
  • 生产者消费者模型
    • 生活中的栗子
    • 优点
    • 代码的栗子
  • BlockQueue生产者消费者模型的实现

生产者消费者模型 生活中的栗子

我们的日常生活中常见到生产者消费者模型。比如超市。超市有很多的货架,供货商将生产好的商品放到货架上,消费者去超市的货架上拿走商品,完成交易。

  • 在这个过程中我们发现了三种关系,生产者和生产者之间是互斥关系,一个货架只能有一个供货商的商品。消费者和消费者之间是互斥关系,因为超市资源有限,你拿走了,我就没法拿。生产者和消费者之间是同步关系,如果货架满了,供应商就不再供应,通知消费者来消费。如果货架空了,消费者就不再消费,通知生产者来生产。
  • 2种身份:生产者和消费者
  • 1个场所:在这个栗子里就是超市。在计算机就是一块内存区域。
优点
  • 考虑这样的情况,如果没有超市这个中间场所,生产者直接将产品送到消费者的手中。那么效率是非常慢的。因为你还要考虑到消费者在不在,有没有空拿产品。还要考虑在哪里存放产品。
  • 而有了超市这个中间场所,生产者生成完毕直接丢到超市不用管。消费者直接去超市拿东西。
  • 中间场所相当于生产者和消费者之间的缓冲区。
代码的栗子
int add(const int lhs, const int rhs){
	return lhs + rhs;
}

int main(){
	int a = 0; 
	int b = 1;
	int ret = add(a, b);
	return 0;
}

我们在main函数中创建两个变量交给add去使用,在这里main函数就是生产者的身份。而add函数返回一个结果,在这里add函数是生产者的身份。
但是这里有一个问题,add函数必须等待main函数将变量构造完毕才能工作,而main函数必须等到add函数return才能继续工作。
也许一次还好,但是如果多次有这种事情发生,那么是非常损失效率的。而且这种串行的方式,如果一个函数出现bug,就得搞垮整个进程,我们说这种方式是高耦合的。

  • 而如果我们使用生产者消费者模型就可以优化代码。A线程只负责生产变量,B线程只负责add变量然后打印。
  • 这样A线程创建完变量后直接放到中间场所。而B线程同样的直接到中间场所拿数据。即使运行多次,也会非常节省时间。而且大部分时间A和B线程会并行的执行。
  • 这叫做解耦。
BlockQueue生产者消费者模型的实现

consumer_productor.hpp文件:

  1 #ifndef __CONSUMER_PRODUCTOR_H__  
  2 #define __CONSUMER_PRODUCTOR_H__  
  3   
  4 #include   
  5 #include   
  6 #include   
  7   
  8 using std::cout;  
  9 using std::endl;  
 10   
 11 template  
 12 class BlockQueue{  
 13 private:  
 14   std::size_t _capacity; //阻塞队列有效数据  
 15   pthread_mutex_t lock;  
 16   pthread_cond_t consumer_cond;  
 17   pthread_cond_t productor_cond;  
 18   std::queue _bq;    // 阻塞队列  
 19 public:  
 20   void LockQueue(){  
 21     pthread_mutex_lock(&lock);  
 22   }  
 23   void UnLockQueue(){  
 24     pthread_mutex_unlock(&lock);                                                                                             
 25   }  
 26   bool IsEmpty(){  
 27     return _bq.size() == 0;  
 28   }  
 29   bool IsFull(){  
 30     return _bq.size() >= _capacity;  
31   }
 32   void ConsumerWait(){
 33     cout << "consumer wait" << endl;
 34     pthread_cond_wait(&consumer_cond, &lock);
 35   }
 36   void ProductorWait(){
 37     cout << "productor wait" << endl;
 38     pthread_cond_wait(&productor_cond, &lock);
 39   }
 40   void NodifyConsumer(){
 41     cout << "wake consumer..." << endl;
 42     pthread_cond_signal(&consumer_cond);
 43   }
 44   void NodifyProductor(){
 45     cout << "wake productor..." << endl;
 46     pthread_cond_signal(&productor_cond);
 47   }
 48 public:
 49   BlockQueue(size_t capacity) : _capacity(capacity) {
 50        //   _bq.size() = 0;
 51        pthread_mutex_init(&lock, nullptr);   
 52        pthread_cond_init(&consumer_cond, nullptr);   
 53        pthread_cond_init(&productor_cond, nullptr);                                                                          
 54   }
 55   ~BlockQueue(){
 56     _capacity = 0;
 57     pthread_mutex_destroy(&lock);
 58     pthread_cond_destroy(&consumer_cond);
59     pthread_cond_destroy(&productor_cond);
 60   }
 61 
 62   void Put(const T& task){
 63     LockQueue();
 64     while(IsFull()){   // 这里的细节是使用while,因为wait是有可能失败的。如果使用if,一旦wait失败,就会导致越界
 65       //ProductorWait();  不能先等待再通知,这太蠢了! 我既然都等待了(原地阻塞),那么怎么通知呢?
 66       //NodifyConsumer();
 67       
 68       NodifyConsumer();
 69       ProductorWait();
 70     }  
 71     // 到这里,阻塞队列非满
 72     _bq.push(task);
 73     UnLockQueue();
 74   }
 75   void Take(T* ptask){ // 这里使用输出型参数的方法拿走任务
 76       LockQueue();
 77       while(IsEmpty()){   // 使用while,同上
 78         NodifyProductor();
 79         ConsumerWait();
 80       }
 81 
 82       // 到这里,阻塞队列非空
 83       *ptask = _bq.front();
 84       _bq.pop();
 85       UnLockQueue();
 86   }
 87 };
 88 #endif                                                                     

main.cc文件:

  1 #include "consumer_productor.hpp"
  2 #include 
  3 #include 
  4 
  5 using namespace std;
  6 
  7 struct task{
  8   int _x;
  9   int _y;
 10   task(int x = 0, int y = 0): _x(x), _y(y){}
 11   int add(){
 12     return _x + _y;
 13   }
 14 };
 15                                                                                                                              
 16 void* consumer_thread(void* arg){
 17   BlockQueue* bq = static_cast*>(arg);
 18   while(1){
 19     task t;
 20     bq->Take(&t); //从阻塞队列中拿走任务
 21     cout << t._x << " + " << t._y << " = "<< t.add() << endl;
 22     sleep(1);
 23   }
 24 
 25 }
 26 void* productor_thread(void* arg){
 27   BlockQueue* bq = static_cast*>(arg);
 28   srand(0);
 29   while(1){
 30     task t(rand()%5 + 1, rand()%5 + 2);
31     bq->Put(t); //将任务加入阻塞队列
 32   //cout << "queue size is " << bq->_bq.size() << endl;  // 用来调试
 33     cout << t._x << " + " << t._y << " = ?"<< endl;
 34   }
 35 }
 36 int main(){
 37   pthread_t consumer, productor;
 38   BlockQueue* bq = new BlockQueue(5); 
 39   pthread_create(&consumer, nullptr, consumer_thread, (void*)bq);
 40   pthread_create(&productor, nullptr, productor_thread, (void*)bq);
 41 
 42   pthread_join(consumer, nullptr);
 43   pthread_join(productor, nullptr);
 44 
 45 }                           
  • 上面的代码是一个经典的单生产者单消费者模型的栗子。关于上面的代码我要说明几点。
  • 1,关于pthread_cond_wait函数。这个函数等待条件变量的满足,同时还需要锁作为参数。究其原因,是wait函数等待的时候需要释放锁,不然它占用的临界资源无法被其他的线程使用,那就等不到cond的满足。其次,当条件满足的时候,还需要再次上锁。
//pthread_cond_wait函数
unlock(); 
wait();  //真正的等待
lock();
  • 2, 上面的栗子是消费者每隔1S拿走任务,而生产者相对较快。这就会导致队列很快就会满,虽然我们使用了条件变量来防止极端的情况出现——队列满了之后生产者仍在检查锁,实现了同步,但是这种同步还是有点不尽人意。

  • 我们想要让消费者去适配生产者的速度,我们可以多创建几个生产者线程,然后创建一个多生产者多消费者的模型。此时我们就需要更多的锁。第一把锁决定谁是最快的生产者,第二把锁决定谁是最快的消费者,第三把锁决定生产者和消费者谁先工作。

  • 这种锁和环境变量的搭配是很好了,但是不够好。阻塞队列有许多的空间,但是每次生产者和消费者只能一者进入该队列。有没有这样一种办法,生产者将任务生产出来,同时消费者就在消费。它们使用的是同一临界资源不同的空间。

  • 这就好像在超市,生产者消费者模型要求每次只能进来一个人,即使有很多的空间,很多的货架。但是我们想要的是你生产者生产你的,你去摆你的货架,我消费者就消费我的,我消费你摆好的货架!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/287887.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号