栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【C++】并发编程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【C++】并发编程

一、线程创建与管理 1.1 并发 1.1.1 并发与并行

并发:同一时间段内可以交替处理多个操作,强调同一时段内交替发生。

并行:同一时刻内同时处理多个操作,强调同一时刻点同时发生。

1.1.2 硬件并发与任务切换

单片机上的单核处理器支持并发多任务处理,依靠任务切换实现,与多核处理器上的多任务并发处理方式不同。

  • 双核处理器并行执行(硬件并发)对比单核处理器并发执行(任务上下文切换)

  • 双核处理器均并发执行(一般任务数远大于处理器核心数,多核并发更常见)

1.1.3 多线程并发与多进程并发

任务

为达到某一目的而进行的一系列操作,在计算机中主要指由软件完成的一个活动;一个任务既可以是一个进程,也可以是一个线程。

举例:读取数据并放入内存,可以通过进程实现,也可以通过线程实现。

进程

系统中并发执行的单位,资源分配的基本单位,也可能作为调度运行的单位,拥有独立的数据空间和代码空间。

举例:用户运行程序,系统即创建一个进程,在为其分配资源后放入就绪队列,当被进程调度程序选中时,为其分配CPU及其他相关资源,该进程开始运行。

线程

执行处理器调度的基本单位。一个进程由一个或多个线程构成,各线程共享相同的代码和全局数据,同时也有各自私有的堆栈。

总结

进程与线程的区别在于:进程有独立的全局数据,线程存在于进程中,故一个进程的所有线程共享该进程的全局数据。由于线程共享同样的系统区域,所以操作系统分配给一个进程的资源对该进程的所有线程都是可用的。

在MAC、Windows NT等采用微内核的操作系统中,进程只作为资源分配的单位,不再是调度运行的单位。在微内核系统中,真正调度运行的基本单位是线程。因此实现并发功能的单位是线程。在Linux系统中,线程只作为特殊的进程存在,二者不做过多区分。

多进程并发与多线程并发的区别主要在于有没有共享数据:多进程间的通信较复杂且代价较大,常见的进程间通信方式有管道、信号、文件、套接字等(C++未提供进程间通信的原生支持);多线程本身共享进程的全局数据。

1.2 如何使用并发 1.2.1 为什么使用并发

程序使用并发的原因有两种:关注点分离和提高性能。

关注点分离

通过将相关的代码放在一起并与无关的代码分开,可以使程序更容易理解和测试,从而减少出错的可能性。使用并发可以分隔不同的功能区域,程序中不同的功能,使用不同的线程去执行。当为了分离关注点而使用多线程时,设计线程的数量的依据,不再是依赖于CPU中的可用内核的数量,而是依据概念上的设计(依据功能的划分)。

提高性能

为了充分发挥多核心处理器的优势,使用并发将单个任务分成几部分且各自并行运行,从而降低总运行时间。根据任务分割方式的不同,又可以将其分为两大类:一类是对同样的数据应用不同的处理算法(任务并行);另一类是用同样的处理算法共同处理数据的几部分(数据并行)。

运行越多的线程,操作系统需要为每个线程分配独立的栈空间,需要越多的上下文切换,这会消耗很多操作系统资源,如果在线程上的任务完成得很快,那么实际执行任务的时间要比启动线程的时间小很多,所以在某些时候,增加一个额外的线程实际上会降低,而非提高应用程序的整体性能,此时收益比不上成本。

1.2.2 在C++中使用并发和多线程

C++11标准中引入了多线程,提供语言级别的多线程原生支持;在此之前需要借助编译器厂商提供的平台相关的扩展多线程API来实现并发编程。

1.3 C++线程创建

函数并发运行时需要确保共享数据的并发访问是安全的。

1.3.1 C++11标准多线程支持库
多线程库功能
thread提供线程创建及管理的函数或类接口
mutex为线程提供获得独占式资源访问能力的互斥算法,保证多个线程对共享资源的同步访问
condition_variable允许一定量的线程等待(可以定时)被另一线程唤醒,然后再继续执行
future提供了一些工具来获取异步任务(即在单独的线程中启动的函数)的返回值,并捕捉其所抛出的异常
atomic为细粒度的原子操作(不能被处理器拆分处理的操作)提供组件,允许无锁并发编程
1.3.2 线程创建示例

线程创建和管理的函数或类主要由< thread >库文件来提供,该库文件的主要操作如下:

操作效果
thread t默认构造函数,构造不表示线程的 thread 对象(nonjoinable)
thread t(f, …)构造新的 std::thread 对象并将它与执行线程关联,f可调用对象将被启动于一个线程中 或 抛出 std::system_error
thread t(rv)移动构造函数,构造表示曾为 rv 所表示的执行线程的 thread 对象。此调用后 other 不再表示执行线程(nonjoinable)
t.~thread()销毁*this,若t是joinable则调用std::terminate()
t = rv移动赋值,将rv状态移动赋值到t,若t是joinable则调用std::terminate()
t.joinable检查 t 是否有一个关联线程(joinable),若是则返回true。
t.join()等待关联线程完成工作(joinable),然后令t变成nonjoinable;若t不是joinable则抛出std::system_error
t.detach()解除t与线程的关联(joinable)并让线程继续运行,然后令t变成nonjoinable;若t不是joinable便抛出std::system_error
t.get_id()返回std::thread::id(t的唯一标识符)
t.native_handle()返回依赖于平台的类型native_handle_type,用于不具可移植性的扩展

通过std::thread t(f, args…)创建线程,可以给线程函数传递参数。通过join()函数关联并阻塞线程,等待该线程执行完毕后继续;通过detach()函数解除关联使线程可以与主线程并发执行,但若主线程执行完毕退出后,detach()解除关联的线程即便没有执行完毕,也将自动退出(避免此类情况发生)。

说明:

复制构造函数被删除,thread 不可复制,即不存在两个 std::thread 对象表示同一执行线程。移动或按值复制线程函数的参数,若需要传递引用参数给线程函数,则必须使用 std::ref 或 std::cref 包装。

示例

  
#include 
#include 
#include 
 
// 可调用对象:函数
void thread_function(int n)
{
    // 获取线程ID
    std::thread::id this_id = std::this_thread::get_id();			

    for(int i = 0; i < 5; i++){    
        std::cout << "Child function thread " << this_id<< " running : " << i+1 << std::endl;
        // 进程睡眠n秒
        std::this_thread::sleep_for(std::chrono::seconds(n));   	
    }
}

// 可调用对象:仿函数
class Thread_functor
{
public:
    void operator()(int n)
    {
        std::thread::id this_id = std::this_thread::get_id();

        for(int i = 0; i < 5; i++){
            std::cout << "Child functor thread " << this_id << " running: " << i+1 << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(n));   
        }
    }	
};
 
int main()
{
    // 通过 可调用对象-函数 构造。
    std::thread mythread1(thread_function, 1);
    // 判断是否mythread1是否关联线程
    if(mythread1.joinable())
    {
        // 合并线程:阻塞主线程等待mythread1关联的线程完成工作。
        mythread1.join(); 
    }
    
    // 通过 可调用对象-仿函数 构造。
    Thread_functor thread_functor;
    std::thread mythread2(thread_functor, 3);
    if(mythread2.joinable())
    {
        // 分离线程:使子线程和主线程并行运行,主线程不再等待子线程。
        mythread2.detach();
    }                         

    // 可调用对象:Lambda表达式
    auto thread_lambda = [](int n){
        std::thread::id this_id = std::this_thread::get_id();
        for(int i = 0; i < 5; i++)
        {
            std::cout << "Child lambda thread " << this_id << " running: " << i+1 << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(n));   
        }       
    };

    // 通过 可调用对象-Lambda表达式 构造。
    std::thread mythread3(thread_lambda, 4);     
    if(mythread3.joinable())
    {
        mythread3.join();
    }

    // 获取主线程ID
    std::thread::id this_id = std::this_thread::get_id();
    for(int i = 0; i < 5; i++){
        std::cout << "Main thread " << this_id << " running: " << i+1 << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    
#ifdef WIN32
    system("pause");
#else
    getchar();
#endif
    
    return 0;
}

线程创建的参数是函数对象,包括函数指针、成员函数指针、仿函数及lambda表达式。以上示例分别用三种函数对象创建了三个线程,其中第一个线程mythread1阻塞等待其执行完后继续往下执行,第二个线程mythread2不阻塞等待在后台与后面的第三个线程mythread3并发执行,第三个线程继续阻塞等待其完成后再继续往下执行主线程任务。为了便于观察并发过程,对三个线程均用了睡眠延时this_thread::sleep_for(duration)函数。

补充

针对任何线程(包括主线程),< thread > 声明了命名空间std::this_thread,用以提高线程专属的全局函数。函数声明和效果见下表:

操作效果
this_thread::get_id()获取当前线程的ID
this_thread::sleep_for(dur)将某个线程阻塞dur时间段
this_thread::sleep_until(tp)将某个线程阻塞到tp时间点
this_thread::yield()建议释放控制以便重新调度使下一个线程能够执行
二、线程同步之互斥锁 2.1 什么是线程同步

多线程并发: 在同一时间段内交替处理多个操作,线程切换时间片很短(一般为毫秒级),一个时间片多数时候来不及处理完对某一资源的访问。

线程间通信: 一个任务被分割为多个线程并发处理,多个线程可能都要处理某一共享内存的数据,多个线程对同一共享内存数据的访问需要准确有序。

如果多个进程都需要访问相同的共享内存数据,进行读取和写入(数据并发访问或数据竞争),就需要使读写有序(同步化),否则可能会造成数据混乱,无法得到预期的结果。

同步:是指在不同进程之间的若干程序片断,它们的运行必须严格按照规定的某种先后次序来运行,这种先后次序依赖于要完成的特定的任务。

如果用对资源的访问来定义的话,同步是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源。

互斥:是指散布在不同进程之间的若干程序片断,当某个进程运行其中一个程序片段时,其它进程就不能运行它们之中的任一程序片段,只能等到该进程运行完这个程序片段后才可以运行。

如果用对资源的访问来定义的话,互斥某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。

多个线程对共享内存数据访问的竞争条件的形成,取决于一个以上线程的相对执行顺序,每个线程都抢占资源以完成自己的任务。C++标准中对数据竞争的定义是:多个线程并发的去修改一个独立对象,数据竞争是未定义行为的起因。

2.2 如何处理数据竞争

数据竞争源于并发修改同一数据结构,则最简单的处理数据竞争的方法就是对该数据结构采用某种保护机制,确保只有进行修改的线程才能看到数据被修改的中间状态,从其他访问线程的角度看,修改不是已经完成就是还未开始。C++标准库提供了很多类似的机制,最基本的就是互斥量,由< mutex >库文件专门支持对共享数据结构的互斥访问。

2.2.1 lock与unlock保护共享资源

Mutex,全名mutual exclusion(互斥体),是个object对象,用来协助采取独占排他方式控制对资源的并发访问。此处的资源可能是个对象,或多个对象的组合。为了获得独占式的资源访问能力,相应的线程必须锁定(lock) mutex,以防止其他线程也锁定mutex,直到该线程解锁(unlock) mutex。

mutex类的主要操作函数见下表

操作效果
mutex m默认构造函数,构造未锁定(unlocked)的mutex对象
m.~mutex()销毁mutex(要求未被锁定)
m.lock()尝试锁住mutex(阻塞)
m.try_lock()尝试锁住mutex(锁定成功返回true)
m.try_lock_for(dur)尝试在时间段dur内锁定(锁定成功返回true)
m.try_lock_until(tp)尝试在时间点tp之前锁定(锁定成功返回true)
m.unlock()解除mutex(要求被锁定)
m.native_handle()返回依赖于平台的类型native_handle_type,用于不具可移植性的扩展

示例:


#include  
#include 
#include 
#include 

std::chrono::milliseconds interval(100);

std::mutex mutex;
// 多线程共享的全局变量,多线程操作,使用mutex保护
int job_shared = 0; 
// 单线程独占的全局变量,无需mutex保护
int job_exclusive = 0;

void job_1()
{
    mutex.lock();
    // 持锁等待
    std::this_thread::sleep_for(5 * interval);  
    ++job_shared;
    std::cout << "job_1 shared (" << job_shared << ")n";
    mutex.unlock();
}

void job_2()
{
    while (true) 
    {    
        // 无限循环,尝试获得锁
        if (mutex.try_lock()) 
        {     
            // 尝试获得锁成功则修改'job_shared'
            ++job_shared;
            std::cout << "job_2 shared (" << job_shared << ")n";
            mutex.unlock();
            return;
        }
        else 
        {      
            // 尝试获得锁失败则修改'job_exclusive'
            ++job_exclusive;
            std::cout << "job_2 exclusive (" << job_exclusive << ")n";
            std::this_thread::sleep_for(interval);
        }
    }
}

int main()
{
    std::thread thread_1(job_1);
    std::thread thread_2(job_2);

    thread_1.join();
    thread_2.join();

#ifdef WIN32
    system("pause");
#endif

    return 0;
}

以上程序创建了两个线程和两个全局变量,其中一个全局变量job_exclusive是排他的,两线程并不共享,不会产生数据竞争,所以不需要锁保护。另一个全局变量job_shared是两线程共享的,会引起数据竞争,因此需要锁保护。线程thread_1持有互斥锁lock的时间较长,线程thread_2为免于空闲等待,使用了尝试锁try_lock,如果获得互斥锁则操作共享变量job_shared,未获得互斥锁则操作排他变量job_exclusive,提高多线程效率。

2.2.2 lock_guard与unique_lock保护共享资源

lock与unlock必须成对合理配合使用,使用不当可能会造成资源被永远锁住,甚至出现死锁(两个线程在释放它们自己的lock之前彼此等待对方的lock)。C++针对lock与unlock引入智能锁lock_guard与unique_lock,使用RAII技术对普通锁进行封装,达到智能管理互斥锁资源释放的效果。

与之类似的new和delete,若使用不当可能会造成内存泄漏等严重问题,为此C++引入了智能指针shared_ptr与unique_ptr。智能指针借用了RAII技术(Resource Acquisition Is Initialization—资源获取即初始化,使用类来封装资源的分配和初始化,在构造函数中完成资源的分配和初始化,在析构函数中完成资源的清理,可以保证正确的初始化和资源释放)对普通指针进行封装,达到智能管理动态内存释放的效果。

lock_guard操作如下

操作效果
lock_guard lg(m)为 mutex m 建立一个 lock_guard 并锁定之
lock_guard lg(m, adopt_lock)为已被锁定的 mutex m 建立一个 lock_guard
lg.~lock_guard()解锁 mutex m 并销毁 lock_guard

unique_lock操作如下

操作效果
unique_lock l默认构造函数,建立一个 lock_guard 但不关联任何 mutex
unique_lock l(m)为 mutex m 建立一个 lock_guard 并锁定
unique_lock l(m, adopt_lock)为已锁定的 mutex m 建立一个 lock_guard
unique_lock l(m, defer_lock)为 mutex m 建立一个 lock_guard 但不锁定
unique_lock l(m, try_lock)为 mutex m 建立一个 lock_guard 并试图锁定
unique_lock l(m, dur)为 mutex m 建立一个 lock_guard 并试图在时间段 dur 内锁定
unique_lock l(m, tp)为 mutex m 建立一个 lock_guard 并试图在时间点 tp 前锁定
unique_lock l(rv)移动构造函数,将 lock state 从 rv 移到 l(rv 不再关联任何 mutex)
l.~unique_lock()解锁 mutex (若被锁定)并销毁 lock_guard
unique_lock l = rv移动赋值,将 lock state 从 rv 移到 l(rv 不再关联任何 mutex)
swap(l1, l2)交换lock
l1.swap(l2)交换lock
l.release()返回指向关联的 mutex 的指针并释放 mutex
l.owns_lock()若关联的 mutex 被锁定则返回 true
if(l)检查关联的 mutex 是否被锁定
l.mutex()返回指向关联的 mutex 的指针
l.lock()锁住关联的 mutex
l.try_lock()尝试锁住关联的 mutex (若成功则返回 true)
l.try_lock_for(dur)尝试在时间段 dur 内锁住关联的 mutex(若成功则返回 true)
l.try_lock_until(tp)尝试在时间点 tp 之前锁住关联的 mutex(若成功则返回 true)
l.unlock()解除关联的 mutex

lock_guard与unique_lock描述及策略对比

类模板描述策略
std::lock_guard严格基于作用域(scope-based)的锁管理类模板,构造时是否加锁是可选的(不加锁时假定当前线程已经获得锁的所有权—使用std::adopt_lock策略),析构时自动释放锁,所有权不可转移,对象生存期内不允许手动加锁和释放锁std::adopt_lock
std::unique_lock更加灵活的锁管理类模板,构造时是否加锁是可选的,在对象析构时如果持有锁会自动释放锁,所有权可以转移。对象生命期内允许手动加锁和释放锁std::adopt_lock std::defer_lock std::try_to_lock

对mutex_1.cpp示例进行修改:

  • job_1函数:将普通锁 lock/unlock 替换为智能锁 lock_guard

    void job_1()
    {
        // 获取RAII智能锁,离开作用域会自动析构解锁
        std::lock_guard lockg(mutex);    
        std::this_thread::sleep_for(5 * interval);
        ++job_shared;
        std::cout << "job_1 shared (" << job_shared << ")n";
    }
    
  • job_2函数:将普通锁 lock/unlock 替换为智能锁 unique_lock( 使用尝试锁策略:std::try_to_lock )

    void job_2()
    {
        while (true) 
        {   
            // 以尝试锁策略创建智能锁
            std::unique_lock ulock(mutex, std::try_to_lock);		
            if (ulock) 
            {
                ++job_shared;
                std::cout << "job_2 shared (" << job_shared << ")n";
                return;
            } else 
            { 
                ++job_exclusive;
                std::cout << "job_2 exclusive (" << job_exclusive << ")n";
                std::this_thread::sleep_for(interval);
            }
        }
    }
    
2.2.3 timed_mutex与recursive_mutex提供更强大的锁

互斥量 mutex 提供的普通锁 lock/unlock 和智能锁 lock_guard/unique_lock,基本可满足大多数对共享数据资源的保护需求。但缺乏某些更复杂的功能:如线程函数中嵌套调用对共享资源的嵌套锁定需求(mutex在一个线程中只允许锁定一次),以及可阻塞特定时长的锁。针对以上需求,< mutex >库提供以下互斥类,对比见下表:

类模板描述
std::mutex同一时间只可被一个线程锁定。如果它被锁住,任何其他lock()都会阻塞(block),直到这个mutex再次可用,且try_lock()会失败。
std::recursive_mutex允许在同一时间多次被同一线程获得其lock。其典型应用是:函数捕获一个lock并调用另一函数而后者再次捕获相同的lock。
std::timed_mutex额外允许传递一个时间段或时间点,用来定义多长时间内它可以尝试捕获一个lock。提供接口:try_lock_for(duration) 和 try_lock_until(timepoint)。
std::recursive_timed_mutex允许同一线程多次取得其lock,且可指定期限。

示例如下:


#include  
#include 
#include 
#include 

std::chrono::milliseconds interval(100);

std::timed_mutex tmutex;
// 多线程共享的全局变量,多线程操作,使用mutex保护
int job_shared = 0; 
// 单线程独占的全局变量,无需mutex保护
int job_exclusive = 0;

void job_1()
{
    std::lock_guard lockg(tmutex);
    // 持锁等待
    std::this_thread::sleep_for(5 * interval);  
    ++job_shared;
    std::cout << "job_1 shared (" << job_shared << ")n";
}

void job_2()
{
    while (true) 
    {    
        // 无限循环,尝试获得锁(策略:暂不锁定)
        std::unique_lock ulock(tmutex, std::defer_lock);
        if (ulock.try_lock_for(3 * interval)) 
        {     
            // 尝试获得锁成功则修改'job_shared'
            ++job_shared;
            std::cout << "job_2 shared (" << job_shared << ")n";
            return;
        }
        else 
        {      
            // 尝试获得锁失败则修改'job_exclusive'
            ++job_exclusive;
            std::cout << "job_2 exclusive (" << job_exclusive << ")n";
            std::this_thread::sleep_for(interval);
        }
    }
}

int main()
{
    std::thread thread_1(job_1);
    std::thread thread_2(job_2);

    thread_1.join();
    thread_2.join();

#ifdef WIN32
    system("pause");
#endif

    return 0;
}
三、线程同步之条件变量 3.1 什么是条件变量

条件变量是线程的另外一种有效同步机制,为线程提供了交互场所(一个线程给另外的一个或者多个线程发送消息),指定条件变量发生的位置,一个线程修改该条件变量变量使其满足其它线程继续向下执行的条件,其它线程则等待接收条件已经发生改变的信号。当条件变量同互斥锁一起使用时,条件变量允许线程以一种无竞争的方式等待任意条件的发生。

3.2 为什么引入条件变量

为解决多线程并发访问共享数据产生的数据竞争问题,通过互斥锁保护共享数据,以保证多线程对共享数据的访问同步有序。但线程等待互斥锁的释放时,通常需要轮询该互斥锁是否已被释放,无法寻找适当的轮询周期,若轮询周期太短则浪费CPU资源,若轮询周期太长则可能互斥锁已被释放,而该线程还在睡眠导致产生延误。

示例

现有两个线程,一个线程向队列中存入数据,一个线程向队列中取出数据,在取数据之前需要判断该队列是否为空;由于此队列被这两个读写线程共享,故需要使用互斥锁保护,以保证一个线程在存数据的同时另一个线程不能取数据,反之亦然。


#include 
#include 
#include 
#include 

// 双端队列标准容器全局变量
std::deque q;
// 互斥锁全局变量
std::mutex mu;

// 生产者:向队列放入数据
void function_1() 
{
    int count = 10;
    while (count > 0) 
    {
        std::unique_lock locker(mu);
        // 数据入队锁保护
        q.push_front(count);			
        locker.unlock();
        // 延时1秒
        std::this_thread::sleep_for(std::chrono::seconds(1));		
        count--;
    }
}
// 消费者:从队列提取数据
void function_2() 
{
    int data = 0;
    while ( data != 1) 
    {
        std::unique_lock locker(mu);
        //  判断队列是否为空
        if (!q.empty()) 
        {			
            data = q.back();
            // 数据出队锁保护
            q.pop_back();			
            locker.unlock();
            std::cout << "t2 got a value from t1: " << data << std::endl;
        } 
        else 
        {
            locker.unlock();
            
        }
    }
}

int main() 
{
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();

    getchar();
    return 0;
}

分析

在生产过程中,每放入一个数据延迟1秒;在消费过程中,存在while循环,只有在接收到表示结束数据时才停止,每次循环都是先加锁再判断队列非空,然后取出一个数据,最后解锁。经过测试,单核CPU占用率达到100%。

***示例优化(增加轮询间隔)***:

CPU占用率高的原因在于消费者在while循环内等待数据所致,故可使消费者在队列为空时睡眠一会儿(如500ms),如此增加了轮询的间隔周期,极大降低了CPU的占用率。修改后的方案如下:

// 消费者:从队列提取数据
void function_2() 
{
    int data = 0;
    while ( data != 1) 
    {
        std::unique_lock locker(mu);
        //  判断队列是否为空
        if (!q.empty()) 
        {			
            data = q.back();
            // 数据出队锁保护
            q.pop_back();			
            locker.unlock();
            std::cout << "t2 got a value from t1: " << data << std::endl;
        } 
        else 
        {
            locker.unlock();
            // 优化方案:队列为空时,消费者线程休息500ms。
            std::this_thread::sleep_for(std::chrono::milliseconds(500));            
        }
    }
}

以上优化的困难之处在于延长时间(即轮询间隔周期)难以确定,太短将导致过多CPU资源占用,太长会导致无法及时响应造成延误。故引入条件变量来解决此问题,条件变量使用"通知-唤醒"(事件驱动)模型,生产者产出一个数据后通知消费者使用,消费者未接到通知前处于休眠状态以节约CPU资源;当消费者收到通知后被唤醒来处理数据。

3.3 如何使用条件变量

C++标准库在< condition_variable >中提供了条件变量,通过条件变量,一个线程可以唤醒一个或多个处于等待中的其它线程。

条件变量操作函数

操作效果
condvar cv默认构造函数,建立一个条件变量
cv.~condvar()析构函数,销毁条件变量
cv.notify_one()唤醒一个等待中的线程(若存在)
cv.notify_all()唤醒所有等待中的线程
cv.wait(ul)使用unique_lock ul来等待通知
cv.wait(ul, pred)使用unique_lock ul来等待通知,直到pred在一次苏醒之后为结果 true
cv.wait_for(ul, duration)使用unique_lock ul来等待通知,等待期限为 duration
cv.wait_for(ul, duration, pred)使用unique_lock ul来等待通知,等待期限为 duration,或直到 pred 在一次苏醒之后为结果 true
cv.wait_until(ul, timepoint)使用unique_lock ul来等待通知,直到时间点 timepoint
cv.wait_until(ul, timepoint, pred)使用unique_lock ul来等待通知,直到时间点 timepoint,或直到 pred 在一次苏醒之后为结果 true
cv.native_handle()返回依赖于平台的类型native_handle_type,用于不具可移植性的扩展
notify_all_at_thread_exit(cv, ul)在调用所在的线程唤醒所有使用unique_lock ul来等待 cv 的线程

所有通知(notification)都会被自动同步化,所以并发调用notify_one()和notify_all()不会产生问题。

所有等待某个条件变量(condition variable)的线程都必须使用相同的 mutex,当 wait() 家族的某个成员被调用时该 mutex 必须被 unique_lock 锁定,否则会发生不明确的行为。

wait() 函数会执行"解锁互斥量–>陷入休眠等待–>被通知唤醒–>再次锁定互斥量–>检查条件判断式是否为真"几个步骤,这意味着传给 wait 函数的判断式总是在锁定情况下被调用的,可以安全的处理受互斥量保护的对象;但在"解锁互斥量–>陷入休眠等待"过程之间产生的通知(notification)会被遗失。

条件变量的运作如下

  1. 程序同时包含< mutex >和< condition_variable >,并声明一个 mutex 和一个 condition_variable 变量。
  2. 发出"条件已满足"通知的线程(或多个线程之一)必须调用notify_one()或notify_all(),以便条件满足时唤醒处于等待中的一个条件变量。
  3. 等待"条件被满足"通知的线程必须调用wait(),可让线程在条件未被满足时陷入休眠状态,当接收到通知时被唤醒去处理相应的任务。

将上面的cond_var1.cpp程序使用条件变量解决轮询间隔难题的示例代码如下:

***队列多线程存取优化版(使用条件变量)***:


#include 
#include 
#include 
#include 
#include 

// 双端队列标准容器全局变量
std::deque q;
// 互斥锁全局变量
std::mutex mu;
// 条件变量全局变量
std::condition_variable cond;

// 生产者:向队列放入数据
void function_1() 
{
    int count = 10;
    while (count > 0) 
    {
        std::unique_lock locker(mu);
        // 数据入队锁保护
        q.push_front(count);			
        locker.unlock();
        // 向一个等待线程发出"条件已满足"通知
        cond.notify_one();
        // 延时1秒
        std::this_thread::sleep_for(std::chrono::seconds(1));		
        count--;
    }
}
// 消费者:从队列提取数据
void function_2() 
{
    int data = 0;
    while ( data != 1) 
    {
        std::unique_lock locker(mu);
        //  判断队列是否为空
        while (q.empty()) 
        {
            // 解锁互斥量并陷入休眠等待通知到达被唤醒,唤醒后对互斥量加锁
            cond.wait(locker);
        }
	
        data = q.back();
        // 数据出队锁保护
        q.pop_back();			
        locker.unlock();
        std::cout << "t2 got a value from t1: " << data << std::endl;
    }
}

int main() 
{
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();

    getchar();
    return 0;
}

说明

  1. 在function_2中,判断队列为空时使用 while(q.empty)而非 if(q.empty),原因在于 wait()从阻塞到返回,并不一定是由 notify_one()引起,还可能是因系统的不确定原因唤醒(伪唤醒,与实现机制有关),故需要唤醒后再次检查队列是否为空,若还是为空则继续 wait()阻塞。

  2. 在管理互斥类时使用 std::unique_lock而非 std::lock_guard,因为后者不提供 lock和 unlock接口。

  3. 使用细粒度锁,尽量减小锁的范围,在 notify_one()时,无需处于互斥锁的保护范围内,故先解锁在唤醒。

cond.wait(locker)的另一种写法

wait()的第二个参数可传入一个函数表示检查条件,多使用lambda函数,若该lambda函数返回true,则wait()不阻塞直接返回;若返回false,wait()函数继续阻塞等待唤醒,被伪唤醒后依旧检查lambda函数返回值。

补充

线程同步可以保证多个线程对共享数据的有序访问,但全局共享变量的使用容易增加不同任务或线程间的耦合度,也增加了引入bug的风险,所以全局共享变量应尽可能少用。但多数场景下只需要传递某个线程或任务的执行结果,以便参与后续的运算,不必阻塞等待该线程或任务执行完毕,而是继续执行暂时不需要该线程或任务执行结果参与的运算,当需要该线程执行结果时直接获得,才能更充分发挥多线程并发的效率优势。

四、异步编程 4.1 什么是异步编程

以上章节中的线程同步主要目的是解决对共享数据的竞争访问问题,故线程同步主要是对共享数据的访问同步化(按照既定的先后次序,一个访问需要阻塞等待前一个访问完成才能开始)。而异步编程主要针对任务和线程的执行顺序,即一个任务不需要阻塞等到上一个任务完成后再执行,程序的执行顺序与任务的排列顺序是不一致的。

同步:在发出一个调用后且未得到结果前,该调用不返回;即调用者主动等待调用结果。

异步:调用在发出之后,该调用直接返回,没有返回结果;即当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。

4.2 如何使用异步编程 4.2.1 使用全局变量和条件变量传递结果

条件变量具有"通知-唤醒"功能,可将执行结果或执行状态放入全局变量,当被调用者完成任务后,通过条件变量通知调用者结果或状态已更新。


#include 
#include 
#include 
#include 
#include 
#include 
#include 

// 保存结果的全局变量
int res = 0;	
// 互斥锁全局变量					
std::mutex mu;	
// 全局条件变量					
std::condition_variable cond;       
 
void accumulate(std::vector::iterator first,
                std::vector::iterator last)
{
    // 标准库求和函数
    int sum = std::accumulate(first, last, 0);      
    std::unique_lock locker(mu);
    res = sum;
    locker.unlock();
     // 向一个等待线程发出"条件已满足"的通知
    cond.notify_one();             
}
 
int main()
{
    std::vector numbers = { 1, 2, 3, 4, 5, 6 };
    std::thread work_thread(accumulate, numbers.begin(), numbers.end());

    std::unique_lock locker(mu);
    // 如果条件变量被唤醒,检查结果是否被改变,为真则直接返回,为假则继续等待
    cond.wait(locker, [](){ return res;});   
    std::cout << "result=" << res << 'n';
    locker.unlock();
    // 阻塞等待线程执行完成
    work_thread.join();         
 
    getchar();
    return 0;
}

使用全局变量会导致全局变量较多,增大了多线程间的耦合度。为此,C++提供了< future >库函数来实现异步编程。

4.2.2 使用promise与future传递结果

< future >头文件功能允许对特定提供者设置的值进行异步访问,可能在不同的线程中。
这些提供程序(promise 对象、packaged_task对象 或 对异步的调用async)与future对象共享共享状态:提供者使共享状态就绪的点与future对象访问共享状态的点同步。< future >头文件的结构如下:

类/函数描述
promise为异步检索存储一个值(类模板)
packaged_task打包一个函数,用于存储异步检索的返回值(类模板)
future等待一个异步设置的值(类模板)
shared_future等待一个异步设置的值(可能被其它future所引用)(类模板)
future_error报告与future或promise有关的错误(类)
future_errc识别future的错误代码(枚举)
future_status用于定时的操作的返回值(枚举)
launch指定std::async的启动策略(枚举)
async异步地运行一个函数(可能在一个新的线程中)并返回一个保持结果的std::future(函数模板)
future_category识别future的错误类别(函数)

多线程间传递的返回值或抛出的异常都在共享状态中交流,多线程间并发访问共享数据是需要保持同步,共享状态是保证返回值或异常在线程间正确传递的关键,被调用线程可以通过改变共享状态通知调用线程返回值或异常已写入完毕,可进行访问或操作。future的状态(future_status)有三种:deferred(异步操作还没开始)、ready(异步操作已经完成)和timeout(异步操作超时)。

通过std::promise< T >类模板为线程提供包含共享状态的对象

操作效果
promise p默认构造函数,建立一个带shared_state的promise
promise p(allocator_arg, alloc)建立一个带shared_state的promise,以alloc为分配器
promise p(rv)移动构造函数,建立一个新的promise,取rv状态并移除其shared state
p.~promise()释放shared state,若非ready则存储(无值且非异常)std::future_error异常并夹带broken_promise
p = rv移动赋值,移动rv状态到p
swap(p1, p2)互换p1和p2的状态
p1.swap(p2)互换p1和p2的状态
p.get_future()产生一个future object用以取回shared state(即线程执行结果)
p.set_value(val)设val为值并令状态为ready或抛出std::future_error)
p.set_value_at_thread_exit(val)在当前线程结束时设val为值并令状态为ready(否则抛出std::future_error)
p.set_exception(e)设e为异常并令状态为ready(或抛出std::future_error)
p.set_expection_at_thread_exit(e)在当前线程结束时设e为异常并令状态为ready(或抛出std::future_error)

异步编程:使用promise + future传递结果

#include 
#include 
#include 
#include 
#include 
#include 
 
void accumulate(std::vector::iterator first,
                std::vector::iterator last,
                std::promise accumulate_promise)
{
    int sum = std::accumulate(first, last, 0);
    // 将结果存入,并让共享状态变为就绪以提醒future
    accumulate_promise.set_value(sum);  
}
 
int main()
{
    // 演示用 promise 在线程间传递结果。
    std::vector numbers = { 1, 2, 3, 4, 5, 6 };
    std::promise accumulate_promise;
    std::future accumulate_future = accumulate_promise.get_future();
    std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
                            std::move(accumulate_promise));
    // 等待结果
    accumulate_future.wait();  
    std::cout << "result=" << accumulate_future.get() << 'n';
    // 阻塞等待线程执行完成
    work_thread.join();  
 
    return 0;
}

说明

std::promise< T >构造时,产生一个未就绪的共享状态(包含存储的T值和是否就绪的状态)。可设置T值,并让状态变为ready;也可以通过产生一个future对象获取到已就绪的共享状态中的T值。std::promise< T >对象的成员函数get_future()产生一个std::future< T >对象。

< future > 操作函数如下

操作效果
future f默认构造函数,建立一个future(无效状态 invalid state)
future f(rv)移动构造函数,建立一个新的future对象,状态取自rv并令rv失效
f.~future()销毁future对象(包含状态)
f = rv移动赋值:销毁f的旧状态,并将rv的状态移动到f(rv失效)
f.valid()若f具备有效状态则返回true
f.get()阻塞直到后台完成,迫使被推迟的线程同步启动,产出结果或抛出异常,并使其状态失效
f.wait()阻塞直到后台完成,迫使被推迟的线程同步启动
f.wait_for(dur)阻塞dur时间段,或直到后台操作完成,但被推迟的线程不会同步启动
f.wait_until(tp)阻塞直到时间点tp,或直到后台操作完成,但被推迟的线程不会同步启动
f.share()产生一个shared_future带有当前状态,并令f的状态失效

std::future< T >在多个线程等待时,只有一个线程能获取等待结果。当需要多个线程等待相同的事件的结果(即多处访问同一个共享状态),需要使用std::shared_future< T >来替代std::future < T >,std::future< T >提供了一个将future转换为shared_future的方法f.share(),但转换后原future状态失效(类似于智能指针std::unique_ptr< T >与std::shared_ptr< T >的关系)。

4.2.3 使用packaged_task与future传递结果

除了为一个任务或线程提供一个包含共享状态的变量,还可以直接把共享状态包装进一个任务或线程中,C++通过std::packaged_task< Func >来实现此功能。

通过std::packaged_task< Func >类模板将共享状态包装进线程

操作效果
packaged_task pt默认构造函数,建立一个packaged_task,不带shared state也不带stored task
packaged_task pt(f)为task f建立一个对象
packaged_task pt(alloc, f)为task f建立一个对象,使用分配器alloc
packaged_task pt(rv)移动构造函数,将packaged_task rv(task和state)移至pt(其后rv将不再拥有shared state)
pt.~packaged_task()销毁*this(可能导致shared state变成ready)
pt = rv移动赋值,将packaged_task rv(task和state)移至pt(其后rv将不再拥有shared state)
swap(pt1, pt2)两个packaged_task互换
pt1.swap(pt2)两个packaged_task互换
pt.valid()若pt有一个shared state则返回true
pt.get_future()返回一个future object,用来取回shared state(包含task执行结果)
pt(args)调用task并令shared state变为ready
pt.make_ready_at_thread_exit(args)调用task并在线程退出时令shared state变为ready
pt.reset()为pt建立一个新的shared state(可能导致shared state变成ready)

std::packaged_task< Func >构造时绑定一个函数对象,也产生一个未就绪的共享状态。相比于promise,未提供set_value()公用接口,而是当执行完绑定的函数对象,其执行结果返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。

异步编程:使用packaged_task+ future传递结果


#include 
#include 
#include 
#include 
#include 
#include 
 
int accumulate(std::vector::iterator first,
                std::vector::iterator last)
{
    int sum = std::accumulate(first, last, 0);
    return sum;
}
 
int main()
{
    // 演示用 packaged_task 在线程间传递结果。
    std::vector numbers = { 1, 2, 3, 4, 5, 6 };
    std::packaged_task::iterator, std::vector::iterator)> accumulate_task(accumulate);
    std::future accumulate_future = accumulate_task.get_future();
    std::thread work_thread(std::move(accumulate_task), numbers.begin(), numbers.end());
    // 等待结果
    accumulate_future.wait();  
    std::cout << "result=" << accumulate_future.get() << 'n';
    // 阻塞等待线程执行完成
    work_thread.join();  
 
    return 0;
}

总结

一般不同函数间传递数据时,主要借助全局变量、返回值、函数参数等来实现。以上第一种方法使用全局变量传递数据,会使得不同函数间的耦合度较高,不利于模块化编程。后面两种方法分别通过函数参数与返回值来传递数据,可以降低函数间的耦合度,使编程和维护更简单快捷。

4.2.4 使用async传递结果

std::promise< T >与std::packaged_task< Func >虽提供了较丰富的异步编程工具,但在使用时既需要创建提供共享状态的对象(promise与packaged_task),又需要创建访问共享状态的对象(future与shared_future),使用较为负责;为此,future头文件封装了更高级别的函数std::async。

关于std::future std::async(std::launch policy, Func, Args)函数

std::async是一个函数而非类模板,其函数执行完后的返回值绑定给使用std::async的std::futrue对象(std::async封装了thread,packged_task的功能,使异步执行一个任务更为方便)。其中,Func为可调用对象,Args是传递给可调用对象的参数,std::launch policy是启动策略,启动策略控制std::async的异步行为,共有以下三种不同的启动策略:

  • std::launch::async:保证异步行为,传递函数将在单独的线程中运行。
  • std::launch::deferred:当其他线程调用 get() / wait() 来访问共享状态时,将调用非异步行为。
  • std::launch::async | std::launch::deferred:默认行为,异步运行与否取决于系统负载。

使用async传递结果


#include 
#include 
#include 
#include 
#include 
#include 
 
int accumulate(std::vector::iterator first,
                std::vector::iterator last)
{
    int sum = std::accumulate(first, last, 0);
    return sum;
}
 
int main()
{
    // 演示用 async 在线程间传递结果。
    std::vector numbers = { 1, 2, 3, 4, 5, 6 };
    // auto可以自动推断变量的类型
    auto accumulate_future = std::async(std::launch::async, accumulate, numbers.begin(), numbers.end());		
    std::cout << "result=" << accumulate_future.get() << 'n';
    
    return 0;
}

五、原子操作与无锁编程 5.1 什么是原子操作

多线程之间通过互斥锁与条件变量来保证共享数据的同步,互斥锁主要是针对过程加锁来实现对共享资源的排他性访问。很多时候,对共享资源的访问主要是对某一数据结构的读写操作,如果数据结构本身就带有排他性访问的特性,也就相当于该数据结构自带一个细粒度的锁,对该数据结构的并发访问就能更加简单高效,C++11提供原子数据类型< atomic >来实现。

原子操作

不可分割的操作,该操作只存在未开始和已完成两种状态,不存在中间状态。

原子类型

原子库中定义的数据类型,对这些类型的所有操作都是原子操作,包括通过原子类模板std::atomic< T >实例化的数据类型,也支持原子操作。

5.2 如何使用原子类型 5.2.1 原子库atomic支持的原子操作

原子库< atomic >提供了一些基本原子类型,也可以通过原子类模板实例化一个原子对象。

基本原子类型

原子类型相关特化类
atomic_boolstd::atomic< bool >
atomic_charstd::atomic< char>
atomic_scharstd::atomic< signed char >
atomic_ucharstd::atomic< unsigned char>
atomic_intstd::atomic< int >
atomic_uintstd::atomic< unsigned >
atomic_shortstd::atomic< short >
atomic_ushortstd::atomic< unsigned short >
atomic_longstd::atomic< long >
atomic_ulongstd::atomic< unsigned long >
atomic_llongstd::atomic< long long>
atomic_ullongstd::atomic< unsigned long long>
atomic_char16_tstd::atomic< char16_t >
atomic_char32_tstd::atomic< char32_t >
atomic_wchar_tstd::atomic< wchar_t >

原子库提供原子操作load()和store(val)对原子类型进行读写。

原子操作

操作trivint typeptr type效果
atomic a = valYYY以val为a的初始值
atomic a; atomic_init(&a, val)YYY同上,无atomic_init则初始化不完整
a.is_lock_free()YYY若类型内部不使用lock则返回true
a.store(val)YYY赋值val
a.load()YYY返回数值a的拷贝
a.exchange(val)YYY赋值val并返回旧值a的拷贝
a.compare_exchange_strong(exp, des)YYYCAS操作
a.compare_exchange_weak(exp, des)YYYweak CAS操作
a = valYYY赋值并返回val的拷贝
a.operator atomic()YYY返回数值a的拷贝
a.fetch_add(val)YY不可切割之 a += val
a.fetch_sub(val)YY不可切割之 a -= val
a += valYY等同于 a.fetch_add(val)
a -=valYY等同于 a.fetch_sub(val)
++a, a++YY调用 a.fetch_add(1) 并返回 a 或 a+1 的拷贝
–a, a–YY调用 a.fetch_sub(1) 并返回a或a-1的拷贝
a.fetch_and(val)Y不可切割之 a &= val,(返回新值的拷贝)
a.fetch_or(val)Y不可切割之 a |= val,(返回新值的拷贝)
a.fetch_xor(val)Y不可切割之 a ^= val,(返回新值的拷贝)
a &= valY等同于 a.fetch_and(val)
a |= valY等同于 a.fetch_or(val)
a ^= valY等同于 a.fetch_xor(val)
5.2.2 原子操作中的内存访问模型

原子操作保证了对数据的访问只有未开始和已完成两种状态,不会访问到中间状态,原子操作函数支持控制读写顺序,即带有一个数据同步内存模型参数std::memory_order,用于对同一时间的读写操作进行排序。C++11定义的6种类型如下:

  • memory_order_relaxed:宽松操作,没有同步或顺序制约,仅对此操作要求原子性。

  • memory_order_release & memory_order_acquire:两个线程A&B,A线程Release后,B线程Acquire能保证一定读到的是最新被修改过的值;这种模型更强大的地方在于它能保证发生在A-Release前的所有写操作,在B-Acquire后都能读到最新值。

  • memory_order_release & memory_order_consume:上一个模型的同步是针对所有对象的,而这种模型只针对依赖于该操作涉及的对象,比如这个操作发生在变量a上,而s = a + b; 那s依赖于a,但b不依赖于a;当然这里也有循环依赖的问题,例如:t = s + 1,因为s依赖于a,那t其实也是依赖于a的。

  • memory_order_seq_cst:顺序一致性模型,这是C++11原子操作的默认模型;大概行为为对每一个变量都进行Release-Acquire操作,当然这也是一个最慢的同步模型。

5.2.3 使用原子类型替代互斥锁编程

#include 
#include 
#include 
#include  

std::chrono::milliseconds interval(100);
// 原子布尔类型,取代互斥量
std::atomic readyFlag(false);    
// 两个线程都能修改'job_shared',将该变量特化为原子类型 
std::atomic job_shared(0); 
// 只有一个线程能修改'job_exclusive',不需要保护
int job_exclusive = 0; 

// 此线程只能修改 'job_shared'
void job_1()
{   
    std::this_thread::sleep_for(5 * interval);
    job_shared.fetch_add(1);
    std::cout << "job_1 shared (" << job_shared.load() << ")n";
    // 改变布尔标记状态为真
    readyFlag.store(true);      
}

// 此线程能修改'job_shared'和'job_exclusive'
void job_2()
{
    // 无限循环,直到可访问并修改"job_shared"
    while (true) 
    {    
        // 判断布尔标记状态是否为真,为真则修改"job_shared"
        if (readyFlag.load()) 
        {   
            
            job_shared.fetch_add(1);
            std::cout << "job_2 shared (" << job_shared.load() << ")n";
            return;
        } 
        else // 布尔标记为假,则修改"job_exclusive"
        {      
            
            ++job_exclusive;
            std::cout << "job_2 exclusive (" << job_exclusive << ")n";
            std::this_thread::sleep_for(interval);
        }
    }
}

int main() 
{
    std::thread thread_1(job_1);
    std::thread thread_2(job_2);
    thread_1.join();
    thread_2.join();

    return 0;
}

5.2.4 使用互斥锁实现自旋锁

自旋锁(spinlock)与互斥锁(mutex)类似,在任一时刻最多只能有一个持有者,但如果资源已被占用,互斥锁会让资源申请者进入睡眠状态,而自旋锁不会引起调用者睡眠,会一直循环判断该锁是否成功获取。自旋锁是专为防止多处理器并发而引入的一种锁,它在内核中大量应用于中断处理等部分(对于单处理器来说,防止中断处理中的并发可简单采用关闭中断的方式,即在标志寄存器中关闭/打开中断标志位,不需要自旋锁)。对于多核处理器来说,检测到锁可用与设置锁状态两个动作需要实现为一个原子操作,如果分为两个原子操作,则可能一个线程在获得锁后设置锁前被其余线程抢到该锁,导致执行错误。故需原子库提供对原子变量"读-修改-写(Read-Modify-Write)"的原子操作,原子类型支持的操作中提供了RMW(Read-Modify-Write)原子操作,如a.exchange(val)与a.compare_exchange(expected, desired)。

标准库专门提供了一个原子布尔类型std::atomic_flag,不同于所有 std::atomic 的特化,它保证是免锁的,不提供load()与store(val)操作,但提供了test_and_set()与clear()操作,其中test_and_set()就是支持RMW的原子操作,可用std::atomic_flag实现自旋锁的功能,代码如下:


#include 
#include 
#include 
#include 

// 初始化原子布尔类型
std::atomic_flag lock = ATOMIC_FLAG_INIT;       
 
void f(int n)
{
    for (int cnt = 0; cnt < 100; ++cnt) 
    {
        // 获得锁
        while (lock.test_and_set(std::memory_order_acquire)); // 自旋
        std::cout << n << " thread Output: " << cnt << 'n';
        // 释放锁
        lock.clear(std::memory_order_release);
    }
}
 
int main()
{
    // 实例化一个元素类型为std::thread的向量
    std::vector v;     
    for (int n = 0; n < 10; ++n) 
    {
        // 以参数(f,n)为初值的元素放到向量末尾,相当于启动新线程f(n)
        v.emplace_back(f, n);       
    }
    for (auto& t : v) 
    {     
        // 遍历向量v中的元素,基于范围的for循环,auto&自动推导变量类型并引用指针指向的内容
        t.join();   // 阻塞主线程直至子线程执行完毕
    }

    return 0;
}

5.3 如何进行无锁编程 5.3.1 什么是无锁编程

在原子操作出现之前,对共享数据的读写可能得到不确定的结果,所以多线程并发编程时要对使用锁机制对共享数据的访问过程进行保护。但锁的申请释放增加了访问共享资源的消耗,且可能引起线程阻塞、锁竞争、死锁、优先级反转、难以调试等问题。基于原子操作的支持,对单个基础数据类型的读、写访问无需锁保护,但对于复杂数据类型比如链表,有可能出现多个核心在链表同一位置同时增删节点的情况,这将会导致操作失败或错序。所以在对某节点操作前,需要先判断该节点的值是否跟预期的一致,如果一致则进行操作,不一致则更新期望值,这几步操作依然需要实现为一个RMW(Read-Modify-Write)原子操作,即CAS(Compare And Swap)原子操作。无锁编程就可以理解为不使用锁机制就可保证多线程间原子变量同步的编程。无锁(lock-free)的实现只是将多条指令合并成了一条指令形成一个逻辑完备的最小单元,通过兼容CPU指令执行逻辑形成的一种多线程编程模型。无锁编程是基于原子操作的,对基本原子类型的共享访问由load()与store(val)即可保证其并发同步,对抽象复杂类型的共享访问则需要更复杂的CAS来保证其并发同步,并发访问过程只是不使用锁机制了,但还是可以理解为有锁止行为的,其粒度很小,性能更高。对于某个无法实现为一个原子操作的并发访问过程还是需要借助锁机制来实现。

5.3.2 CAS原子操作实现无锁编程

CAS原子操作主要是通过函数a.compare_exchange(expected,desired)实现的,其语义为“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”,CAS算法的实现伪码如下:

bool compare_exchange_strong(T& expected, T desired) 
{ 
    if( this->load() == expected ) { 
        this->store(desired); 
        return true; 
    } else {
        expected = this->load();
    	return false; 
    } 
}

案例:无锁栈


#include 
#include 

template
class lock_free_stack
{
private:
    struct node
    {
        T data;
        node* next;
        node(const T& data) : data(data), next(nullptr) {}
    };
    std::atomic head;

 public:
    lock_free_stack(): head(nullptr) {}
    void push(const T& data)
    {
        node* new_node = new node(data);
        do
        {
            new_node->next = head.load();   //将 head 的当前值放入new_node->next
        }while(!head.compare_exchange_strong(new_node->next, new_node));
        // 如果新元素new_node的next和栈顶head一样,证明在你之前没人操作它,使用新元素替换栈顶退出即可;
        // 如果不一样,证明在你之前已经有人操作它,栈顶已发生改变,该函数会自动更新新元素的next值为改变后的栈顶;
        // 然后继续循环检测直到状态1成立退出;
    }
    T pop()
    {
        node* node;
        do
        {
            node = head.load();
        }while (node && !head.compare_exchange_strong(node, node->next));

        if(node) 
        {
            return node->data;
        }
    }
};
 
int main()
{
    lock_free_stack s;
    s.push(1);
    s.push(2);
    s.push(3);
    std::cout << s.pop() << std::endl;
    std::cout << s.pop() << std::endl;
    
    return 0;
}

在将数据压栈前,先通过比较原子类型head与新元素的next指向对象是否相等来判断head是否已被其他线程修改,根据判断结果选择是继续操作还是更新期望,而这一切都是在一个原子操作中完成的,保证了在不使用锁的情况下实现共享数据的并发同步。

CAS的缺陷

ABA 问题,假设一个变量 A ,修改为 B之后又修改为 A,CAS 的机制是无法察觉的,但实际上已经被修改过了。(增加版本号解决)

六、线程池

线程池:管理一个任务队列,一个线程队列,每次分配一个任务给一个线程,循环往复。

实现思路:每一个thread创建后,执行调度函数,循环获取task,然后执行。

线程池的组成

  • 任务队列(Task Queue):
  • 线程池(Thread Pool):

线程池与任务队列之间的匹配操作,是典型的生产者-消费者模型,本模型使用了两个工具:一个mutex + 一个条件变量。mutex锁,保证任务的添加和移除(获取)的互斥性;条件变量保证多个线程获取task的同步性:当任务队列为空时,线程应该等待(阻塞)。

6.1 任务队列(Task Queue)

线程池中的线程会持续查询任务队列是否有可用工作,当两个甚至多个线程试图同时执行查询工作时,将引发错误。因此需要对C++的std::queue进行包装,实现一个线程安全的SafeQueue(通过std::mutex限制并发访问)。

SafeQueue代码:

// 线程安全任务队列
template 
class SafeQueue
{
private:
    std::queue _queue;
    std::mutex _mutex;
public:
    SafeQueue() {}
    SafeQueue(SafeQueue&& other) {}
    ~SafeQueue() {}

    bool empty()
    {
        std::unique_lock lock(_mutex);
        return _queue.empty();
    }

    int size()
    {
        std::unique_lock lock(_mutex);
        return _queue.size();
    }

    void enqueue(T& t)
    {
        std::unique_lock lock(_mutex);
        _queue.emplace(t);
    }

    bool dequeue(T& t)
    {
        std::unique_lock lock(_mutex);
        if (_queue.empty())
        {
            return false;
        }
        t = std::move(_queue.front());
        _queue.pop();
        return true;
    }
};

6.2 线程池(Thread Queue)
#ifndef THREAD_POOL_HPP
#define THREAD_POOL_HPP



#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

// 线程安全任务队列
template 
class SafeQueue
{
private:
    std::queue _queue;
    std::mutex _mutex;
public:
    SafeQueue() {}
    SafeQueue(SafeQueue&& other) {}
    ~SafeQueue() {}

    bool empty()
    {
        std::unique_lock lock(_mutex);
        return _queue.empty();
    }

    int size()
    {
        std::unique_lock lock(_mutex);
        return _queue.size();
    }

    void enqueue(T& t)
    {
        std::unique_lock lock(_mutex);
        _queue.emplace(t);
    }

    bool dequeue(T& t)
    {
        std::unique_lock lock(_mutex);
        if (_queue.empty())
        {
            return false;
        }
        t = std::move(_queue.front());
        _queue.pop();
        return true;
    }
};

// 线程池
class ThreadPool
{
private:
    // 线程内置工作类
    class ThreadWorker
    {
    private:
        int _id;
        ThreadPool* _pool;
    public:
        ThreadWorker(ThreadPool* pool, const int id) : _pool(pool), _id(id) {}

        void operator()()
        {
            // 基础函数类func
            std::function func; 
            // 任务队列元素出队标志
            bool dequeued;  
            // 线程池关闭标志:否
            while (!_pool->_shutdown)
            {
                {
                    std::unique_lock lock(_pool->_mutex);
                    // 若任务队列为空,则阻塞当前线程
                    if (_pool->_queue.empty())
                    {
                        // 等待条件变量通知时唤醒
                        _pool->_conditional_lock.wait(lock);
                    }
                    // 任务队列元素出队
                    dequeued = _pool->_queue.dequeue(func);
                }
                // 若出队标志:true,执行工作函数
                if (dequeued)
                {
                    func();
                }
            }
        }
    };

    // 线程池关闭标志
    bool _shutdown;    
    // 工作任务队列
    SafeQueue> _queue;   
    // 工作线程队列
    std::vector _threads; 
    // 线程休眠互斥锁
    std::mutex _mutex; 
    // 条件变量,使线程休眠或唤醒
    std::condition_variable _conditional_lock; 

public:
    ThreadPool(const int threads = 4) : _threads(std::vector(threads)), _shutdown(false) {}
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    // 声明并分配工作线程,将工作线程放入工作线程队列
    void init()
    {
        for (int i = 0; i < _threads.size(); i++)
        {
            _threads.at(i) = std::thread(ThreadWorker(this, i));
        }
    }

    // 唤醒所有工作线程,并等待所有线程完成工作后关闭线程池
    void shutdown()
    {
        // 线程池关闭标志:是
        _shutdown = true;
        // 唤醒所有工作线程
        _conditional_lock.notify_all();
        for (int i = 0; i < _threads.size(); i++)
        {
            if (_threads.at(i).joinable())
            {
                _threads.at(i).join();
            }
        }
    }

    // 提交函数
    template
    auto submit(F&& f, Args &&...args) -> std::future
    {
        std::function func = std::bind(std::forward(f), std::forward(args)...);
        auto task_ptr = std::make_shared>(func);
        std::function warpper_func = [task_ptr]()
        {
            (*task_ptr)();
        };
        _queue.enqueue(warpper_func);
        _conditional_lock.notify_one();
        return task_ptr->get_future();
    }
};

#endif // !THREAD_POOL_HPP

6.3 测试程序
#include 
#include 
#include "ThreadPool.hpp"

//  真实随机数产生器
std::random_device rd;  
//  生成计算随机数mt​
std::mt19937 mt(rd());  
//  生成-1000到1000之间的离散均匀分布数​
std::uniform_int_distribution dist(-1000, 1000);   
auto rnd = std::bind(dist, mt);

//  设置线程睡眠时间(1000 ~ 3000)ms
void simulate_hard_computation()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}

// 打印执行结果,不传递
void multiply(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
}

// 通过输出参数传递执行结果
void multiply_output(int& out, const int a, const int b)
{
    simulate_hard_computation();
    out = a * b;
    std::cout << a << " * " << b << " = " << out << std::endl;
}

// 通过返回值传递执行结果
int multiply_return(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
    return res;
}

void example()
{
    // 创建线程池:3个线程
    ThreadPool pool(3);

    // 初始化线程池
    pool.init();

    // 提交乘法操作:共30个
    for (int i = 1; i <= 3; ++i)
    {
        for (int j = 1; j <= 10; ++j)
        {
            pool.submit(multiply, i, j);
        }
    }

    // 使用ref传递的输出参数提交函数
    int output_ref;
    auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
    future1.get();
    std::cout << "Last operation result is equals to " << output_ref << std::endl;

    // 使用return返回值提交函数
    auto future2 = pool.submit(multiply_return, 5, 3);
    int res = future2.get();
    std::cout << "Last operation result is equals to " << res << std::endl;

    // 关闭线程池
    pool.shutdown();

}

int main()
{
    example();

    return 0;
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/273583.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号