生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
如何解决?如上百度百科所示,它是一个经典的多线程同步问题,然后解决方案就用我们标题所说的信号量和互斥锁共同解决:
如上图所示,由主线程main创建子线程生产者以及消费者,生产者向缓冲区中写入数据,消费者向缓冲区读取数据,当缓冲区为满(full)时,生产者写入数据的操作必须堵塞,而当缓冲区为空(NULL)时,消费者向缓冲区读取数据的操作也必须堵塞
然后子线程中的生产者线程和消费者线程必定会产生竞争,如何让这若干生产者和若干消费者有序的进行呢,这时候就必须想到多线程的同步,最常用的就是信号量和互斥锁,关于信号量和互斥锁还有不太懂得朋友们可以去看看这篇:Linux多线程的同步-----信号量和互斥锁_神厨小福贵!的博客-CSDN博客
上面说到的缓冲区为NULL时,读取阻塞,如果缓冲区为满(full)时,写入数据进行阻塞,那么我们如何判断这个缓冲区是NULL或者是满(full)的呢?其实很简单,下标来追踪数据,定义两个下标in和out,in即是我们写入数据的下标,out是我们读取数据的下标,每写入一次in++,每读取一次,out++,然后这两下标的初始值都为0,因为刚开始的时候没有数据。
然后我们数据追踪的事情解决了,下面就来进行信号量初值的给定,假设我们这里缓冲区最大可以存储一个buff[10]的int整型数据,定义一个buff数组最大值为初始写入的信号量,表示还有空间,可以写入那么生产者写入操作的信号量初值为10
那么消费者读取数据的操作的信号量的初值代表的含义就是,缓冲区中有没有数据,刚开始缓冲区中肯定是没有数据的,所以读取操作的信号量的初值给0就好
上图中的条件就是我们所说的生产者和消费者有序进行的条件,但是生产者消费者不可能只是单个的一条执行路线,可能会有多个生产者和多个消费者,一起共同运作,那么当生产者/消费者对缓冲区写入/读取数据的时候,怎么保证其他的生产者/消费者不会对共同下标的数据进行操作呢,也急速在某一时间内,只能有单个生产者或消费者对缓冲区进行操作,这是不是很像互斥锁的概念:
对的,就是互斥锁,这块为了不让若干生产者/消费者共同对同一下标的数据进行操作,这块在信号量之后加上互斥锁,就可以保证同样一时间只有某个单一的操作对缓冲区进行操作:
下面来看代码:
#include下面来看运行截图:#include #include #include #include #include #include #define BUFF_MAX 10 //假设缓冲区buff最大存储数据为10 #define SC_NUM 2 //假设两个生产者 #define XF_NUM 3 //假设两个消费者 sem_t empty; //信号量empty sem_t full; //信号量full pthread_mutex_t mutex; //互斥锁的定义 int buff[BUFF_MAX]; //缓冲区存储10十个数据 int in = 0; //初始写入下标为0 int out = 0; //初始读取下标为0 void * sc_thread(void * arg) //生产者线程函数 sc的意思就是生产的缩写 { int index = (int)arg; for(int i = 0 ; i < 30 ; i++) { sem_wait(&empty); //对初始值为10的信号量进行减一的操作 pthread_mutex_lock(&mutex); //上锁 buff[in] = rand() % 100; //在缓冲区中存储的数据是多少我们不关心,只需要知道那玩意存进去就好了 printf("第%d个生产者,产生了%d数据,在%d位置上n",index,buff[in],in); //打印做个标记 in = (in + 1) % BUFF_MAX; //假设写满之后,重头开始覆盖前面的数据,不会产生越界 pthread_mutex_unlock(&mutex); //解锁 sem_post(&full); //对full为0的初始值信号量进行加一,告诉消费者函数,可以进行读取了 int n = rand() % 3; sleep(n); //随机进行睡眠一到三秒 } } void * xf_thread(void * arg) //xf是消费者的缩写 { int index = (int)arg; for(int i = 0 ; i < 20 ; i++) //因为生产者有两个,各自写入30次总共就是60次 { //然后消费者有三个,各自读取20次,刚好可以读完 sem_wait(&full); pthread_mutex_lock(&mutex); printf("第%d个消费者,读取了%d数据,在%d位置上n",index,buff[out],out); out = (out + 1) % BUFF_MAX; pthread_mutex_unlock(&mutex); sem_post(&empty); int n = rand() % 10; sleep(n); } } int main() { sem_init(&empty,0,BUFF_MAX); sem_init(&full,0,0); pthread_mutex_init(&mutex,NULL); srand(time(NULL)); pthread_t sc_id[SC_NUM]; pthread_t xf_id[XF_NUM]; for(int i = 0 ; i < SC_NUM ; i++) { pthread_create(&sc_id[i],NULL,sc_thread,(void*)i); } for(int i = 0 ; i < XF_NUM ; i++) { pthread_create(&xf_id[i],NULL,xf_thread,(void*)i); } for(int i = 0 ; i < SC_NUM ; i++) { pthread_join(sc_id[i],NULL); } for(int i = 0 ; i < XF_NUM ; i++) { pthread_join(xf_id[i],NULL); } sem_destroy(&empty); sem_destroy(&full); pthread_mutex_destroy(&mutex); printf("main run overn"); exit(0); }
运行结果如上所示,需要注意的一点就是这道题的难点就在于,信号量里面嵌套一个互斥锁,来达到让若干生产者/消费者不会同时对某一块缓冲区进操作,代码理解的话,都不是很难,在前面一篇也都有讲到!
“加油 兄弟们!”



