- 生产者消费者模型
- 生活中的栗子
- 优点
- 代码的栗子
- BlockQueue生产者消费者模型的实现
我们的日常生活中常见到生产者消费者模型。比如超市。超市有很多的货架,供货商将生产好的商品放到货架上,消费者去超市的货架上拿走商品,完成交易。
- 在这个过程中我们发现了三种关系,生产者和生产者之间是互斥关系,一个货架只能有一个供货商的商品。消费者和消费者之间是互斥关系,因为超市资源有限,你拿走了,我就没法拿。生产者和消费者之间是同步关系,如果货架满了,供应商就不再供应,通知消费者来消费。如果货架空了,消费者就不再消费,通知生产者来生产。
- 2种身份:生产者和消费者
- 1个场所:在这个栗子里就是超市。在计算机就是一块内存区域。
- 考虑这样的情况,如果没有超市这个中间场所,生产者直接将产品送到消费者的手中。那么效率是非常慢的。因为你还要考虑到消费者在不在,有没有空拿产品。还要考虑在哪里存放产品。
- 而有了超市这个中间场所,生产者生成完毕直接丢到超市不用管。消费者直接去超市拿东西。
- 中间场所相当于生产者和消费者之间的缓冲区。
int add(const int lhs, const int rhs){
return lhs + rhs;
}
int main(){
int a = 0;
int b = 1;
int ret = add(a, b);
return 0;
}
我们在main函数中创建两个变量交给add去使用,在这里main函数就是生产者的身份。而add函数返回一个结果,在这里add函数是生产者的身份。
但是这里有一个问题,add函数必须等待main函数将变量构造完毕才能工作,而main函数必须等到add函数return才能继续工作。
也许一次还好,但是如果多次有这种事情发生,那么是非常损失效率的。而且这种串行的方式,如果一个函数出现bug,就得搞垮整个进程,我们说这种方式是高耦合的。
- 而如果我们使用生产者消费者模型就可以优化代码。A线程只负责生产变量,B线程只负责add变量然后打印。
- 这样A线程创建完变量后直接放到中间场所。而B线程同样的直接到中间场所拿数据。即使运行多次,也会非常节省时间。而且大部分时间A和B线程会并行的执行。
- 这叫做解耦。
consumer_productor.hpp文件:
1 #ifndef __CONSUMER_PRODUCTOR_H__ 2 #define __CONSUMER_PRODUCTOR_H__ 3 4 #include5 #include 6 #include 7 8 using std::cout; 9 using std::endl; 10 11 template 12 class BlockQueue{ 13 private: 14 std::size_t _capacity; //阻塞队列有效数据 15 pthread_mutex_t lock; 16 pthread_cond_t consumer_cond; 17 pthread_cond_t productor_cond; 18 std::queue _bq; // 阻塞队列 19 public: 20 void LockQueue(){ 21 pthread_mutex_lock(&lock); 22 } 23 void UnLockQueue(){ 24 pthread_mutex_unlock(&lock); 25 } 26 bool IsEmpty(){ 27 return _bq.size() == 0; 28 } 29 bool IsFull(){ 30 return _bq.size() >= _capacity; 31 } 32 void ConsumerWait(){ 33 cout << "consumer wait" << endl; 34 pthread_cond_wait(&consumer_cond, &lock); 35 } 36 void ProductorWait(){ 37 cout << "productor wait" << endl; 38 pthread_cond_wait(&productor_cond, &lock); 39 } 40 void NodifyConsumer(){ 41 cout << "wake consumer..." << endl; 42 pthread_cond_signal(&consumer_cond); 43 } 44 void NodifyProductor(){ 45 cout << "wake productor..." << endl; 46 pthread_cond_signal(&productor_cond); 47 } 48 public: 49 BlockQueue(size_t capacity) : _capacity(capacity) { 50 // _bq.size() = 0; 51 pthread_mutex_init(&lock, nullptr); 52 pthread_cond_init(&consumer_cond, nullptr); 53 pthread_cond_init(&productor_cond, nullptr); 54 } 55 ~BlockQueue(){ 56 _capacity = 0; 57 pthread_mutex_destroy(&lock); 58 pthread_cond_destroy(&consumer_cond); 59 pthread_cond_destroy(&productor_cond); 60 } 61 62 void Put(const T& task){ 63 LockQueue(); 64 while(IsFull()){ // 这里的细节是使用while,因为wait是有可能失败的。如果使用if,一旦wait失败,就会导致越界 65 //ProductorWait(); 不能先等待再通知,这太蠢了! 我既然都等待了(原地阻塞),那么怎么通知呢? 66 //NodifyConsumer(); 67 68 NodifyConsumer(); 69 ProductorWait(); 70 } 71 // 到这里,阻塞队列非满 72 _bq.push(task); 73 UnLockQueue(); 74 } 75 void Take(T* ptask){ // 这里使用输出型参数的方法拿走任务 76 LockQueue(); 77 while(IsEmpty()){ // 使用while,同上 78 NodifyProductor(); 79 ConsumerWait(); 80 } 81 82 // 到这里,阻塞队列非空 83 *ptask = _bq.front(); 84 _bq.pop(); 85 UnLockQueue(); 86 } 87 }; 88 #endif
main.cc文件:
1 #include "consumer_productor.hpp" 2 #include3 #include 4 5 using namespace std; 6 7 struct task{ 8 int _x; 9 int _y; 10 task(int x = 0, int y = 0): _x(x), _y(y){} 11 int add(){ 12 return _x + _y; 13 } 14 }; 15 16 void* consumer_thread(void* arg){ 17 BlockQueue * bq = static_cast *>(arg); 18 while(1){ 19 task t; 20 bq->Take(&t); //从阻塞队列中拿走任务 21 cout << t._x << " + " << t._y << " = "<< t.add() << endl; 22 sleep(1); 23 } 24 25 } 26 void* productor_thread(void* arg){ 27 BlockQueue * bq = static_cast *>(arg); 28 srand(0); 29 while(1){ 30 task t(rand()%5 + 1, rand()%5 + 2); 31 bq->Put(t); //将任务加入阻塞队列 32 //cout << "queue size is " << bq->_bq.size() << endl; // 用来调试 33 cout << t._x << " + " << t._y << " = ?"<< endl; 34 } 35 } 36 int main(){ 37 pthread_t consumer, productor; 38 BlockQueue * bq = new BlockQueue (5); 39 pthread_create(&consumer, nullptr, consumer_thread, (void*)bq); 40 pthread_create(&productor, nullptr, productor_thread, (void*)bq); 41 42 pthread_join(consumer, nullptr); 43 pthread_join(productor, nullptr); 44 45 }
- 上面的代码是一个经典的单生产者单消费者模型的栗子。关于上面的代码我要说明几点。
- 1,关于pthread_cond_wait函数。这个函数等待条件变量的满足,同时还需要锁作为参数。究其原因,是wait函数等待的时候需要释放锁,不然它占用的临界资源无法被其他的线程使用,那就等不到cond的满足。其次,当条件满足的时候,还需要再次上锁。
//pthread_cond_wait函数 unlock(); wait(); //真正的等待 lock();
-
2, 上面的栗子是消费者每隔1S拿走任务,而生产者相对较快。这就会导致队列很快就会满,虽然我们使用了条件变量来防止极端的情况出现——队列满了之后生产者仍在检查锁,实现了同步,但是这种同步还是有点不尽人意。
-
我们想要让消费者去适配生产者的速度,我们可以多创建几个生产者线程,然后创建一个多生产者多消费者的模型。此时我们就需要更多的锁。第一把锁决定谁是最快的生产者,第二把锁决定谁是最快的消费者,第三把锁决定生产者和消费者谁先工作。
-
这种锁和环境变量的搭配是很好了,但是不够好。阻塞队列有许多的空间,但是每次生产者和消费者只能一者进入该队列。有没有这样一种办法,生产者将任务生产出来,同时消费者就在消费。它们使用的是同一临界资源不同的空间。
-
这就好像在超市,生产者消费者模型要求每次只能进来一个人,即使有很多的空间,很多的货架。但是我们想要的是你生产者生产你的,你去摆你的货架,我消费者就消费我的,我消费你摆好的货架!!



