栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

[C++多线程]1.3-多线程控制的另一种姿势-条件变量(condition

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

[C++多线程]1.3-多线程控制的另一种姿势-条件变量(condition

文章目录

条件变量(C++11)

为什么要引入条件变量条件变量的用法条件变量引发的虚假唤醒 信号量(C++20)

std::binary_semaphore使用counting_semaphore使用

条件变量(C++11) 为什么要引入条件变量

我们先来看看一个由互斥量加锁构成的生产者消费者模型:

//
// Created by Alone on 2022-3-27.
//
#include 
#include 
#include 
#include 
std::mutex mtx;
std::deque q;

// producer
void task1(){
    int i = 0;
    while (1){
        std::unique_lock lock(mtx);
        //std::this_thread::sleep_for(std::chrono::milliseconds(10));
        q.push_back(i);
        if (i < 9999999) {
            i++;
        }else {
            i = 0;
        }
    }
}

// consumer
void task2(){
    int data = 0;
    while (1) {
        std::unique_lock lock(mtx);
        if(!q.empty()){
            data = q.front();
            q.pop_front();
            std::cout<<"Get value from que task2:"< lock(mtx);
        if (!q.empty()) {
            data = q.front();
            q.pop_front();
            std::cout<<"Get value from que task3:"< 

以上代码,由于直接的while(1)循环会导致cpu资源占用的非常厉害,我们可以通过延时sleep_for来进行优化,但这个延时的时间我们并不好控制!

我们这个生产者、消费者线程,想要实现的愿景就是,当生成者生产出资源后,我们能够及时的唤醒消费者线程,让其获取资源。

但如果是简单的对生产者和消费者进行加锁来实现这一过程,可能中间会有很多过程是在消费者拿到锁后,发现生产者并没有生产出资源,而这个过程很明显就是一个无用功,那么有没有一种方式能够让生产者生产出资源后,立马通知消费者线程来读取,且在没有资源的时候,消费者线程能够阻塞让出cpu时间片呢?实现这个需求有很多种方法,而条件变量就是其中的一种!

条件变量的用法

从C++11起,标准库开始引入条件变量。

它的成员函数也不复杂,就下面这些:

更多详细描述

void wait (unique_lock& lck);

这是非模板成员函数类型,接收一个unique_lock,调用后,会帮你unlock,并且线程陷入等待状态,直到被调用notify唤醒。

有关notify的成员函数也就这两个:notify_one和notify_all。

顾名思义,随机唤醒一个,和唤醒全部处于等待被唤醒的线程。

我们再利用新学的条件变量改造下前面的代码如下:

#include 
#include 
#include 
#include 
#include 
std::mutex mtx;
std::deque q;
std::condition_variable cv;

// producer
void task1(){
    int i = 0;
    while (1){
        std::unique_lock lock(mtx);
        q.push_back(i);
        cv.notify_one();
        if (i < 9999999) {
            i++;
        }else {
            i = 0;
        }
    }
}

// consumer
void task2(){
    int data = 0;
    while (1) {
        std::unique_lock lock(mtx);
        if(q.empty()) {
            cv.wait(lock);
        }
        data = q.front();
        q.pop_front();
        std::cout<<"Get value from que task2:"< lock(mtx);
        if(q.empty()){
            cv.wait(lock);
        }
        data = q.front();
        q.pop_front();
        std::cout<<"Get value from que task3:"< 
条件变量引发的虚假唤醒 

什么是虚假唤醒?

前面我们写的利用条件变量写的生产者消费者线程,我可以肯定的告诉你,它是有问题的,运行起来肯定是会报错的!

这是因为虚假唤醒的原因,那么什么是虚假唤醒呢?

虚假唤醒的意思是,当一个正在等待条件变量的线程由于条件变量被触发而唤醒时,却发现它等待的条件(共享数据)没有满足(也就是没有共享数据)。

简而言之就是:明明当前线程已经被唤醒了,却得不到需要的数据。

虚假唤醒的产生分析:

那么我们来分析一下,上面的代码是如何发生的虚假唤醒,如果出现以下情形:task1刚好生成出一个数据到q中,而此时task2被唤醒,把数据读出后又pop掉,然后进入mutex争夺,进入阻塞或者是得到锁,按理来说,只要notify_one真的只会唤醒一个在等待的线程,那么一个生产者对应多个消费者的情况下,是不会产生虚假唤醒的。后面我多番查找资料,说是在多核处理器的环境下,notify_one可能会唤醒不止一个线程,所以会产生一个虚假唤醒,这就导致明明q是空的,却在被读取!

如何避免虚假唤醒?

一个简单粗暴的避免虚假唤醒的法子就是把if语句改为while语句就行,这个产生的直接作用就是,本来唤醒后会因为没有达到预期情况却还往下执行,而while的加入则确保被唤醒的线程一定要是满足预期情况!

代码如下:

#include 
#include 
#include 
#include 
#include 
std::mutex mtx;
std::deque q;
std::condition_variable cv;

// producer
void task1(){
    int i = 0;
    while (1){
        std::unique_lock lock(mtx);
        q.push_back(i);
        cv.notify_one();
        if (i < 9999999) {
            i++;
        }else {
            i = 0;
        }
    }
}

// consumer
void task2(){
    int data = 0;
    while (1) {
        std::unique_lock lock(mtx);
        while (q.empty()) {
            cv.wait(lock);
        }
        data = q.front();
        q.pop_front();
        std::cout<<"Get value from que task2:"< lock(mtx);
        while (q.empty()){
            cv.wait(lock);
        }
        data = q.front();
        q.pop_front();
        std::cout<<"Get value from que task3:"< 
信号量(C++20) 

定义于头文件

信号量是C++20正式加入标准库的,之前使用信号量都是直接调用Linux或者window的底层API,没有统一的接口。

信号量应该算是操作系统里面的一个概念。

具体而言:

维基百科:信号量(英语:Semaphore)又称为信号量、旗语,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;当线程完成一次对semaphore对象的释放(release)时,计数值加一。当计数值为0,则线程等待该semaphore对象不能成功直至该semaphore对象变成signaled状态。semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态.

其中,信号量又分为两种:二进制信号量和计数信号量。

对应到C++20里面的semaphore就是:

std::binary_semaphore 和 counting_semaphore

std::binary_semaphore使用

其实binary_semaphore就是counting_sesmaphore的一个特化而已。

定义如下:

using binary_semaphore = std::counting_semaphore<1>;

讲信号量使用前,我们需要讲讲它的基本运用场景,它一般不使用在存在资源竞争的多线程情况下,比如之前的生产者消费者线程,用信号量是非常不适合的。

比较适合的情况是:某些线程需要在满足某个情况后被通知执行,有点类似于Qt的信号槽机制。

以下有个使用示例:

以下代码已经还有充分的注释了,具体而言就是可以通过release方法让计数器+1,从而使得信号量状态发生改变,由于binary_semaphore只有0和1两个状态,当状态为1的时候,会使得被阻塞的线程激活,而被激活后会立马把状态-1为0,使得其他线程还是被阻塞状态,所以binary的信号量只能通知一个线程执行任务。

以下代码定义了两个信号量,一个是从main线程传递到子线程的信号量,一个是从子线程传递到main线程的信号量。

#include 
#include 
#include 
#include 


// global binary semaphore instances
// object counts are set to zero
// objects are in non-signaled state
std::binary_semaphore
        smphSignalMainToThread{0},
        smphSignalThreadToMain{0};

void ThreadProc()
{
    // wait for a signal from the main proc
    // by attempting to decrement the semaphore
    smphSignalMainToThread.acquire();

    // this call blocks until the semaphore's count
    // is increased from the main proc

    std::cout << "[thread] Got the signaln"; // response message

    // wait for 3 seconds to imitate some work
    // being done by the thread
    using namespace std::literals;
    std::this_thread::sleep_for(3s);

    std::cout << "[thread] Send the signaln"; // message

    // signal the main proc back
    smphSignalThreadToMain.release();
}

int main()
{
    // create some worker thread
    std::thread thrWorker(ThreadProc);

    std::cout << "[main] Send the signaln"; // message

    // signal the worker thread to start working
    // by increasing the semaphore's count
    smphSignalMainToThread.release();

    // wait until the worker thread is done doing the work
    // by attempting to decrement the semaphore's count
    smphSignalThreadToMain.acquire();

    std::cout << "[main] Got the signaln"; // response message
    thrWorker.join();
}
counting_semaphore使用

原理与binary版本完全一致只是状态不只是0和1,它能够自定义上限的状态,如下代码,我将上限定为了3,那么release调用的时候可以设置最多+3,那么它就能成功唤醒三个线程.

#include 
#include 
#include 
#include 


// global binary semaphore instances
// object counts are set to zero
// objects are in non-signaled state
std::counting_semaphore<3>
        smphSignalMainToThread{0},
        smphSignalThreadToMain{0};
void ThreadProc2()
{
    // wait for a signal from the main proc
    // by attempting to decrement the semaphore
    smphSignalMainToThread.acquire();

    // this call blocks until the semaphore's count
    // is increased from the main proc

    std::cout << "[thread] Got the signal2n"; // response message

    // wait for 3 seconds to imitate some work
    // being done by the thread
    using namespace std::literals;
    std::this_thread::sleep_for(3s);

    std::cout << "[thread] Send the signal2n"; // message

    // signal the main proc back
    smphSignalThreadToMain.release();
}

void ThreadProc1()
{
    // wait for a signal from the main proc
    // by attempting to decrement the semaphore
    smphSignalMainToThread.acquire();

    // this call blocks until the semaphore's count
    // is increased from the main proc

    std::cout << "[thread] Got the signal1n"; // response message

    // wait for 3 seconds to imitate some work
    // being done by the thread
    using namespace std::literals;
    std::this_thread::sleep_for(3s);

    std::cout << "[thread] Send the signal1n"; // message

    // signal the main proc back
    smphSignalThreadToMain.release();
}

void ThreadProc3()
{
    // wait for a signal from the main proc
    // by attempting to decrement the semaphore
    smphSignalMainToThread.acquire();

    // this call blocks until the semaphore's count
    // is increased from the main proc

    std::cout << "[thread] Got the signal3n"; // response message

    // wait for 3 seconds to imitate some work
    // being done by the thread
    using namespace std::literals;
    std::this_thread::sleep_for(3s);

    std::cout << "[thread] Send the signal3n"; // message

    // signal the main proc back
    smphSignalThreadToMain.release();
}

int main()
{
    // create some worker thread
    std::thread thrWorker1(ThreadProc1);
    std::thread thrWorker2(ThreadProc2);

    std::cout << "[main] Send the signaln"; // message

    // signal the worker thread to start working
    // by increasing the semaphore's count
    smphSignalMainToThread.release(3);

    // wait until the worker thread is done doing the work
    // by attempting to decrement the semaphore's count
    smphSignalThreadToMain.acquire();

    std::cout << "[main] Got the signaln"; // response message
    thrWorker1.join();
    thrWorker2.join();
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/780137.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号