栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

03 重修C++之并发实战4.1

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

03 重修C++之并发实战4.1

03 重修C++之并发实战4.1

上一篇:03 重修C++之并发实战3.5-3.8(3end)

文章目录
    • 03 重修C++之并发实战4.1
      • 4.1等待事件或其它条件
        • 4.1.1 用条件变量等待
        • 4.1.2 使用条件变量建立一个线程安全的队列

【同步并发操作】

有些时候我们不仅要保护数据,还需要在独立的线程上进行同步操作。例如,一个线程在能够完成其认为u之前可能需要等待另一个线程完成任务。一般来说一个西安城等待特定事件的发生或是一个条件变为true是常见的。虽然通过定期检查“任务完成”的标识或是在共享数据中存储类似的东西也能实现这种功能,但是效果和效率却不够理想。对于这样的需求,C++标准库提供了以**条件变量(condition)和期值(future)**为形式的工具来处理它。

4.1等待事件或其它条件

如果一个线程正等待另一个线程完成一项任务,那么它有以下几个选择:

  1. 它可以一直检查共享数据(由互斥元保护)中的标识,并且让执行任务的线程在完成任务时设置该标识。这个方法是有一定问题的,首先线程占用了宝贵的时间反复检查该标识,其次就是当互斥元被等待的线程锁定后就不能被任何其它线程锁定。这两个问题都造成了线程的忙等,限制了等待中的线程的可用资源,甚至可能导致完成任务的时候无法设置标识。
  1. 第二个选择是使用std::this_thread::sleep_for()函数,让等待中的线程在检查之间休眠一会。这是一个进步,因为在线程休眠时并不浪费处理时间,但是得到正确的休眠时间是很难的。检查之间休眠过短仍然会浪费处理时间,过长又会导致延迟,而且更多时候等待的时间是不固定的,可能会出现不同时间段需要休眠的时间各不相同。
#include 
#include 
#include 

bool flag = false; //任务完成标志
std::mutex m;

void wait_for_flag()
{
    std::unique_lock lk(m);
    
    while (!flag) //循环等待
    {
        lk.unlock(); //休眠之前解锁互斥元
        std::cout << "waiting target finish...... ......" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        lk.lock(); //休眠之后解锁互斥元 以保证其它线程能够设置完成标识
    }
    std::cout << "target has finished!" << std::endl;
    
}

void target()
{
    std::cout << "" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    
    std::unique_lock lk(m);
    flag = true;
    std::cout << "" << std::endl;

}

int main(int argc, const char** argv) {
    
    std::thread t1(wait_for_flag);
    std::thread t2(target);

    t1.join();
    t2.join();

    return 0;
}

waiting target finish...... ......
waiting target finish...... ......
waiting target finish...... ......
waiting target finish...... ......
waiting target finish...... ......
waiting target finish...... ......
waiting target finish...... ......
waiting target finish...... ......
waiting target finish...... ......
waiting target finish...... ......

target has finished!
*************************************************/
  1. 第三个选择,也是目前最好的选择,使用C++标准库提供的工具来等待事件本身。等待由另一个线程触发一个事件的最基本机制。从概念上说,条件变量与某些事件或其他条件相关,并且一个或多个线程可以等待该条件被满足。当某个线程已经确定条件得到满足,他就可以通过通知一个或多个正在条件变量上进行等待的线程,以便唤醒它们继续处理。下面将详细介绍这种方法。
4.1.1 用条件变量等待

标准C++库提供了两个条件变量的实现:std::condition_variable和std::condition_variable_any。这两个实现都在库的头文件声明。两者都需要和互斥元一起工作,以便提供恰当的同步;前者仅限于和std::mutex一起工作,而后者可以与符合称为雷士互斥元的最低标准的任何东西一起工作,因此以_any为后缀。因为std::condition_variable_any泛用性更强,所以会有大小、性能或者操作系统资源方面形式的额外代价的可能,因此一般都会优先考虑std::condition_variable。下面的示例将展示条件变量的使用。

#include 
#include 
#include 
#include 
#include 
#include 

std::mutex mut; 
std::queue data_queue; //传递数据的队列
std::condition_variable data_cond; //条件变量

void data_preparation_thread() //准备数据线程
{
    for (int i = 0; i < 10; i++)
    {   //上锁 入队
        std::lock_guard lk(mut);
        data_queue.push(i);
        std::cout << "----------------------" << std::endl;
        //通知
        data_cond.notify_one();
        //休眠1sm不然执行太快看不到效果
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    } 
}

void data_processing_thread() //处理产生的数据
{
    while (true)
    {
        //获取锁的状态
        std::unique_lock lk(mut);
        //等待通知 并检查队列状态
        data_cond.wait(lk, []{ //可以不传参数 或者传一个现有的检查函数
            return !data_queue.empty();
        });
        //取数据 解锁
        int data = data_queue.front();
        data_queue.pop();
        lk.unlock(); //这个地方解锁和不解锁好像没区别,有待研究
        //打印数据
        std::cout << data << std::endl;
        if(data == 9)
        {
            break;
        }
    }
}

int main(int argc, const char** argv) {
    std::thread t1(data_processing_thread);
    sleep(1);
    std::thread t2(data_preparation_thread);

    t1.join();
    t2.join();
    return 0;
}

现在整理一下流程,首先当数据准备就绪时,准备数据线程使用std::lock_guard锁定保护队列的互斥元,并将数据入队;然后在std::condition_varoable的实例上调用notify_one()通知等待中的线程。在另一侧如果有处理线程。该线程会锁定互斥元,但是这次使用的是unique_lock,不使用std::lock_guard的原因是该线程下面会在std::condition_varoable的实例上调用wait()传入锁对象以及等待 条件的lambda函数,以检查队列中是否有数据,如果返回false就解锁互斥元并将该线程置于阻塞等待状态。当来自数据准备线程调用对应条件变量的notify_one()时,处理线程从睡眠中被唤醒,重新获取到互斥元上的锁并检查条件,如果条件满足就从wait返回(此时互斥元依然是锁定状态),如果条件不满足,该线程就解锁互斥元,并从新回到阻塞状态等待唤醒。这就是不使用std::lock_guard的原因,如果使用数据处理线程使用std::lock_guard,那么互斥元将始终被锁定,数据准备线程会一直无法锁定互斥元,队列就会无法更新,数据线程就会一直在等待。

条件变量可能会对所提供的检查条件检查任意多次。然而,这总是在互斥元被锁定的情况下这样做,并且当且仅当测试条件返回true时立即返回。当等待线程重新获取互斥元并检查条件时,如果它并非是直接相应另一个线程的通知,那么这种就是所谓的伪唤醒。由于这种伪唤醒的次数和频率根据定义是不确定的,所以使用对于条件检查具有副作用的函数是不可取的。如果非要这么做就要准备好多次产生副作用的准备。

使用队列在线程之间传输数据,是很常见的场景。做得好的话,同步可以被现在在队列本身,大大减少了同步问题和竞争条件大概得数量。

4.1.2 使用条件变量建立一个线程安全的队列

如果要设计一个泛型队列,就需要好好考虑需要怎么组织数据结构怎么设计接口。首先除去必要的构造函数、析构函数、赋值和交换函数,那么还需要查询队列状态的函数(empty和size)、查询队列元素(front和back)、以及修改队列(push、pop和emplace),这里为了简化操作,将查询和修改操作合并,直接设置到push和pop操作中,但是由于该队列要在多线程中提供线程安全的队列模式,所以在出队的地方要考虑到阻塞(wait_and_pop)和非阻塞(try_pop)的操作。然后我们需要一个内置锁,一个条件变量还有一个承载数据的队列容器。大致定义如下:

#include 
#include 
#include 

template
class threadsafe_queue
{
private:
    mutable std::mutex queue_mutex; //互斥元必须是可变的
    std::condition_variable queue_cond;
    std::queue data_queue;

public:
    threadsafe_queue() { }
    threadsafe_queue(const threadsafe_queue &) { }
    //删除移动构造函数和赋值操作符
    threadsafe_queue(threadsafe_queue &&) = delete;  
    threadsafe_queue &operator=(threadsafe_queue &&) = delete;
    threadsafe_queue &operator=(const threadsafe_queue &) = delete;
    
    ~threadsafe_queue() { }

public: //这里的阻塞非阻塞是相对队列是否为空 不是能不能拿到锁
    void push(T new_value); //入队
    //非阻塞出队 必须携带能反映是否成功的标识
    bool try_pop(T& value); 
    std::shared_ptr try_pop();
	//阻塞出队
    void wait_and_pop(T& value);
    std::shared_ptr wait_and_pop();
	//判空
    bool empty() const;

};

下面开始实现功能,注意模板类的方法实现必须在头文件中,否则编译成库的时候找不到。

#include 
#include 
#include 

template
class threadsafe_queue
{
private:
    mutable std::mutex queue_mutex;
    std::condition_variable queue_cond;
    std::queue data_queue;

public:
    threadsafe_queue() { }
    threadsafe_queue(const threadsafe_queue &other) 
    {
        std::lock_guard lk(other.queue_mutex);
        this->data_queue = other.data_queue;
    }
    
    threadsafe_queue(threadsafe_queue &&) = delete;  
    threadsafe_queue &operator=(threadsafe_queue &&) = delete;
    threadsafe_queue &operator=(const threadsafe_queue &) = delete;
    ~threadsafe_queue() { }

public:
    void push(T new_value)
    {
        std::lock_guard lk(queue_mutex);
        data_queue.push(new_value);
        queue_cond.notify_one();
    }
    
    bool try_pop(T& value)
    {
        std::lock_guard lk(queue_mutex);
        if (data_queue.empty())
            return false;
        value = data_queue.front();
        data_queue.pop();
        return true;
    }
    std::shared_ptr try_pop()
    {
        std::lock_guard lk(queue_mutex);
        if (data_queue.empty())
            return std::shared_ptr ();
        std::shared_ptr ret( std::make_shared(data_queue.front()) );
        data_queue.pop();
        return ret;
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock lk(queue_mutex);
        queue_cond.wait(lk, [&]{ return !data_queue.empty(); });
        value = data_queue.front();
        data_queue.pop();
    }
    std::shared_ptr wait_and_pop()
    {
        std::unique_lock lk(queue_mutex);
        queue_cond.wait(lk, [&]{ return !data_queue.empty(); });
        std::shared_ptr ret( std::make_shared(data_queue.front()) );
        data_queue.pop();
        return ret;
    }

    bool empty() const
    {
        std::lock_guard lk(queue_mutex);
        return data_queue.empty();
    }

};

上述代码经过测试,功能都能可靠运行,特点是wait_and_pop会一直等,try_pop则会一直返回(由于数据push是很快的,所以try_pop不会因为等锁而一直等待,只要push一释放就很快会拿到锁返回,无论是否成功)。
【2021.11.04】
下一篇:待更新。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/429790.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号