C++多线程-生产者和消费者
#include#include #include #include #include #include using namespace std; //缓冲区存储的数据类型 struct CacheData { //商品id int id; //商品属性 string data; }; queue Q; //缓冲区最大空间 const int MAX_CACHEDATA_LENGTH = 10; //互斥量,生产者之间,消费者之间,生产者和消费者之间,同时都只能一个线程访问缓冲区 mutex m; //同步线程 condition_variable condConsumer;//生产者调用condConsumer.notify_one()消费者开始启动,不在堵塞 condition_variable condProducer;//消费者调用condProducer.notify_one()生产者开始启动,不在堵塞 //全局商品id int ID = 1; //消费者动作 void ConsumerActor() { unique_lock lockerConsumer(m); cout << "[" << this_thread::get_id() << "] 获取了锁" << endl; while (Q.empty()) { cout << "因为队列为空,所以消费者Sleep" << endl; cout << "[" << this_thread::get_id() << "] 不再持有锁" << endl; //队列空, 消费者停止,等待生产者唤醒 condConsumer.wait(lockerConsumer); cout << "[" << this_thread::get_id() << "] Weak, 重新获取了锁" << endl; } cout << "[" << this_thread::get_id() << "] "; CacheData temp = Q.front(); cout << "- ID:" << temp.id << " data:" << temp.data << endl; Q.pop(); condProducer.notify_one();//通知生产者队列不为满了,可以生产了 cout << "[" << this_thread::get_id() << "] 释放了锁" << endl; } //生产者动作 void ProducerActor() { unique_lock lockerProducer(m); cout << "[" << this_thread::get_id() << "] 获取了锁" << endl; while (Q.size() > MAX_CACHEDATA_LENGTH) { cout << "因为队列为满,所以生产者Sleep" << endl; cout << "[" << this_thread::get_id() << "] 不再持有锁" << endl; //对列慢,生产者停止,等待消费者唤醒 condProducer.wait(lockerProducer); cout << "[" << this_thread::get_id() << "] Weak, 重新获取了锁" << endl; } cout << "[" << this_thread::get_id() << "] "; CacheData temp; temp.id = ID++; temp.data = "*****"; cout << "+ ID:" << temp.id << " data:" << temp.data << endl; Q.push(temp); condConsumer.notify_one();//通知消费者队列不为空了,可以消费了 cout << "[" << this_thread::get_id() << "] 释放了锁" << endl; } //消费者 void ConsumerTask() { while (1) { ConsumerActor(); } } //生产者 void ProducerTask() { while (1) { ProducerActor(); } } //管理线程的函数 void Dispatch(int ConsumerNum, int ProducerNum) { vector thsC; for (int i = 0; i < ConsumerNum; ++i) { thsC.push_back(thread(ConsumerTask)); } vector thsP; for (int j = 0; j < ProducerNum; ++j) { thsP.push_back(thread(ProducerTask)); } for (int i = 0; i < ConsumerNum; ++i) { if (thsC[i].joinable()) { thsC[i].join(); } } for (int j = 0; j < ProducerNum; ++j) { if (thsP[j].joinable()) { thsP[j].join(); } } } int main() { //一个消费者线程,5个生产者线程,则生产者经常要等待消费者 Dispatch(1, 5); getchar(); return 0; }
原文参考链接



