栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

linux线程同步--生产者消费者模型

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

linux线程同步--生产者消费者模型

小编最近面试一家公司3面的时候,面试官就提出了一个他们的项目用的就是一个生产者消费者模型,可见这种模型还是有使用价值的。那么小编今天就带搞一下生产者消费者模型。(题外话:面试问的最多的就是操作系统,多线程,网络编程,数据库的知识,语言的话会让你手撕代码,当然数据结构和算法肯定是要过关的,这是一门语言的基础,万一人家让你手撕一下红黑树不就拉闸了吗?所以大家这些知识一定扎实呀,当然小编也会一直更新到数据库截止,c++的内容就算完成了,当然像高阶的数据结构(B+树,跳表等)和高阶的算法等,小编能力有限,不过小编也会一直努力加油的) 我们先回顾上节我们说到的互斥锁代码如下:
#include
using namespace std;
#include
#include
#define NUM 5
class Ticket
{
private:
   size_t  tickets;
   pthread_mutex_t mutex;
public:
    Ticket()
        :tickets(1000)  //1000张票
    {
        pthread_mutex_init(&mutex,nullptr);
    }
    bool GetTicket()
    {
        bool res = true;
        pthread_mutex_lock(&mutex);
        if(tickets > 0)
        {
            usleep(1000);
            cout<<"我是线程["<GetTicket())
        {
            break;
        }
    }
}
int main()
{
    pthread_t tid[NUM];
    Ticket * t = new Ticket();
    for(size_t i = 0; i< NUM; ++i)
    {
        pthread_create(tid+i,nullptr,pthread_run,(void*) t);
    }
    for(size_t i = 0; i< NUM; ++i)
    {
        pthread_join(tid[i],nullptr);
    }
}

可以看出有的线程抢占cpu资源一直不放,这样其实是不公平的,所以今天我们还需要学习条件变量 Linux线程同步 条件变量 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情 况就需要用到条件变量。 举个例子:比如你喜欢一个女生,你就说我可以做你男朋友吗?人家说不好意思呀,我有男朋友了,明天你又去问她,他说不好意思呀,我有男朋友了,你天天都去问人家。这样肯定是不好的,所以你找到她,说要不我们加个微信,等你啥时候和男朋友分手了,我就做你男朋友好不好。条件变量类似这种情景。但是想告诫XDM不要做舔狗(舔狗添到最后一无所有)。 同步概念与竞态条件 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解 下面我就看一组接口: 条件变量函数 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 
参数:cond:
要初始化的条件变量 attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:cond:要在这个条件变量上等待 
mutex:互斥量,后面详细解释

这个接口可以理解为你喜欢的那个女生和她男朋友分手了 

唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

 pthread_cond_signal:可以理解为你喜欢的女生只告诉你,她和她男朋友分手了,你有机会了

 pthread_cond_broadcast:可以理解为你喜欢的女生告诉她的鱼塘的所有的鱼,她和她男朋友分手了,你们都有机会了。

我们现在就使用一下这个接口:场景就用上面的场景,一个女生和多个男朋友的故事吧。
#include
using namespace std;
#define NUMMAN  5  //鱼塘里有5只鱼
#include
#include
#include
//逻辑1个女生线程,5个男生线程,女生可以控制谁做他男朋友
pthread_mutex_t mutex;
pthread_cond_t cond;
void *pthread_woman(void *args)
{
    while(true)
    {
        cout<<"我是女生:我需要一个男朋友"< 
结果如下图: 

可以看出女生每次唤醒的男生线程一定要遵循队列的原则,先来后到的原则,不会出现一个男生线程占着女生线程不放的情况了,这就是我们说的同步机制。

上面是一次唤醒一个男生线程,也可以一次唤醒全部的男生线程。用

int pthread_cond_broadcast(pthread_cond_t *cond);就可以了如下图:

 为什么 pthread_cond_wait 需要互斥量?

条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须 要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件 变量上的线程。 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有 互斥锁就无法安全的获取和修改共享数据。  生产者消费者模型 为何要使用生产者消费者模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而 通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者 要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队 列就是用来给生产者和消费者解耦的。 生产者消费者模型优点 解耦 支持并发 支持忙闲不均 举个例子:

 如果用我们之前的单进程单线程的模式。funA将参数传递给funB加工,而funA是阻塞的,必须要要等funB执行完毕才能运行下一条代码,而生产者消费者模型解决这种问题,两个线程(可以使多个生产者多个消费者)线程生产出a,b放入队列中,线程B直接从队列中取数据,这样两个线程就进行了解耦,可以并行的执行各自的代码,提高了效率。 在举个例子

 消费者要消费不用直接从生产者哪里那,而是从超市购买即可,超市充当了交易场所。超市就是临界资源。

所以生产者消费者模型:可以记为“321”模式, 3.三种关系 (生产者VS生产者,消费者VS消费者,生产者VS消费者) 2.两种角色 (生产者 和消费者) 1.一段缓冲区 (内存空间,STL容器等) 下面我们先实现一个生产者消费者模型: 基于 BlockingQueue 的生产者消费者模型 BlockingQueue 在多线程编程中阻塞队列 (Blocking Queue) 是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出( 以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程 操作时会被阻塞)

C++ queue模拟阻塞队列的生产消费模型

代码如下 :

BlockQueue.hpp

#include
#include
#include
#include
#include

namespace Ns_Block_Queue
{
    template
    class BlocQueue
    {
    private:
        std::queue _qb;  //队列
        size_t _cap; //队列大小
        pthread_mutex_t mutex;   //STL容器中的一般都是要加锁的,queue是临界资源所以要加锁
        pthread_cond_t is_empty;  //条件变量,队列为空
        pthread_cond_t is_fill;  //条件变量,队列为满
    private:
    void LockQueue()
    {
        pthread_mutex_lock(&mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&mutex);
    }
    bool IsFill()
    {
        return _qb.size() == 5;        
    }
    bool IsEmpty()
    {
        return _qb.empty();
    }
    void WaitProducter()
    {
        //生产者等待队列为空在生产,这里我们先这样搞等会可以修改策略
         pthread_cond_wait(&is_empty,&mutex); 
    }
    void WaitConsumer()
    {
        //消费者等待队列为满在生产,这里我们先这样搞等会可以修改策略
         pthread_cond_wait(&is_fill,&mutex); 
    }
    void WackUpConsumer()
    {
        pthread_cond_signal(&is_fill);
    }
    void WackUpProducter()
    {
        pthread_cond_signal(&is_empty);
    }
    public:
        BlocQueue(size_t cap = 5)  //我们默认给5个的缺省值就可以,方便做实验
            :_cap(cap)
        {
            pthread_mutex_init(&mutex,nullptr);
            pthread_cond_init(&is_empty,nullptr);
            pthread_cond_init(&is_fill,nullptr);
        }
        //这里告诉小伙伴一个职业化的内容
        //一般纯输入型参数用: const &
        //纯输出型参数用:*
        //输入也是输出用 *
        void Push(const T& data)  //生产者生产
        {
            LockQueue();   //加锁
            if(IsFill())   //队列为满,不能在生产了
            {
                WaitProducter(); //生产者等待
            }
            _qb.push(data);    //队列是临界资源需要加锁
            //插入一个就可以唤醒消费者线程消费了
            WackUpConsumer(); 
            UnlockQueue(); //解锁
            //WackUpConsumer();  这里写到解锁前和解锁后都可以,机制不一样,但是效果一样
        }
        void Pop(T* out)   //消费者消费
        {
            LockQueue();   //加锁
            if(IsEmpty()) //为空不能消费,等待生产者线程生产
            {
                WaitConsumer();   //消费者等待
            }
            *out = _qb.front(); //队列是临界资源需要加锁
            _qb.pop();
            //唤醒生产者进行生产
            WackUpProducter();
            UnlockQueue(); //解锁
             //WackUpConsumer();  这里写到解锁前和解锁后都可以,机制不一样,但是效果一样
        }
        ~BlocQueue()
        {
            pthread_mutex_destroy(&mutex);
            pthread_cond_destroy(&is_empty);
            pthread_cond_destroy(&is_fill);
        }
    };
}

 CpTest.cc

#include"BlockQueue.hpp"
#include

void *consumer(void *args)
{
    Ns_Block_Queue::BlocQueue * bq = (Ns_Block_Queue::BlocQueue *)args;
    while(true)   //消费者一定是在不断消费
    {
        int data = 0;
        bq->Pop(&data);
        std::cout<<"消费者消费数据:"< * bq = (Ns_Block_Queue::BlocQueue *)args;
    while(true)    //生产者一定是在不断生产
    {
        int data = rand() % 20 + 1;
        std::cout<<"生产者生产数据:"<Push(data); 
        sleep(2);
    }
}
int main()
{
    pthread_t c,p; //创建一个消费者一个生产者线程
    Ns_Block_Queue::BlocQueue * bq = new Ns_Block_Queue::BlocQueue;
    pthread_create(&c,nullptr,consumer,(Ns_Block_Queue::BlocQueue *)bq);
    pthread_create(&p,nullptr,producter,(Ns_Block_Queue::BlocQueue *)bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    return 0;
}
运行结果如下:

 为了做实验代码我都是尽量写的简单易懂,小伙伴也可以根据自己的实际需求,实现程序健壮性更强,比较加入多态,继承,或者实现自己的算法等都可以,小编只是为了阐述生产者消费者这种设计模型,还有小伙伴可能会问c++11不是引进了线程库吗?小编为啥不用c++11,小编想说的是,我主要是在讲Linux系统编程,为了配合后面的网络编程,毕竟c++并没有提供标准的网络编程库,当然小编也一直期待着c++大佬能设计出一套标准的网络编程库,毕竟c++是小编的母语。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/875144.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号