栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

C++线程池(固定线程数,条件变量通知,future返回)

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C++线程池(固定线程数,条件变量通知,future返回)

参考资料:
  • 《C++并发编程实战》
  • https://en.cppreference.com
代码下载

https://github.com/541380000/csdn_share/tree/main/threadpool_cpp

实现(见注释)
#pragma once
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
using std::thread;
using std::max;
using std::min;

class ThreadPool
{
    using Task = std::function;
    // 执行任务的线程
    std::vector pool;
    // 任务队列
    std::queue tasks;
    // 保护任务队列的锁
    std::mutex taskQueueLock;
    // 条件变量,所有线程在该变量上等待。有任务加入任务队列时,唤醒该条件变量
    std::condition_variable cvTask;
    // 是否已经关闭,关闭后,无法再提交任务
    std::atomic isStopped;
    // 空闲线程个数
    std::atomic idleThreadNumber;
public:
    inline ThreadPool(uint32_t size = 4) : isStopped{ false }
    {
        idleThreadNumber = max(1U, size);
        idleThreadNumber = min(idleThreadNumber.load(), MAX_THREAD_NUMBER);
        for (size = 0; size < idleThreadNumber; ++size)
        {   // 初始化任务执行线程
            pool.emplace_back(
                    [this]
                    {
                        while (!this->isStopped)
                        {
                            std::function task;
                            {   // 在条件变量上等待
                                std::unique_lock lock{ this->taskQueueLock };
                                // 等待,第二个参数Predicate,用于解决假唤醒,see also: https://en.cppreference.com/w/cpp/thread/condition_variable/wait
                                // 当线程池被终止,或者任务队列不为空的时候,不等待
                                // 相当于
                                
                                this->cvTask.wait(lock,
                                                   [this] {
                                                       return this->isStopped.load() || !this->tasks.empty();
                                                   }
                                ); // wait 直到有 task
                                // 线程池已关闭且没有任务可以做
                                if (this->isStopped && this->tasks.empty())
                                    return;
                                // 使用了notify_all后,多个线程被唤醒,但可能该线程没有拿到任务,那就继续等待
                                if (this->tasks.empty()) continue;
                                task = std::move(this->tasks.front()); // 取一個 task
                                this->tasks.pop();
                            }
                            idleThreadNumber--;
                            task();
                            idleThreadNumber++;
                        }
                    }
            );
        }
    }
    inline ~ThreadPool()
    {
        isStopped.store(true);
        cvTask.notify_all(); // 唤醒所有线程,执行任务
        for (std::thread& thread : pool) {
            if (thread.joinable())
                thread.join(); // 等待线程结束
        }
    }

public:
    const uint32_t MAX_THREAD_NUMBER = thread::hardware_concurrency();
    // 提交任务接口
    template
    auto commit(F&& f, Args&&... args) ->std::future
    {
        if (isStopped.load())    // stop == true ??
            throw std::runtime_error("commit on ThreadPool is stopped.");

        // 获取函数的返回值类型
        using RetType = decltype(f(args...));
        // 创建一个新的packaged_task,可以通过packaged_task获得future,进而在函数执行完成后获取返回值
        auto task = std::make_shared >(
                std::bind(std::forward(f), std::forward(args)...)
        );
        std::future future = task->get_future();
        {    // 将任务添加到任务队列
            std::lock_guard lock{ taskQueueLock };
            //
            tasks.emplace(
                    [task]()
                    {
                        (*task)();
                    }
            );
        }
        // 唤醒一个线程进行执行
        cvTask.notify_one();
        return future;
    }
    [[nodiscard]] uint32_t GetIdleThreadNumber() const { return idleThreadNumber; }
    bool IsAllFinished(){
        std::lock_guard lock{ taskQueueLock };
        return tasks.empty();
    }
};
如何使用示例
#include 
#include "threadpool.hpp"
using namespace std;

atomic_int64_t cnt = 0;
int64_t PrintHelloWorld(){
    return cnt++;
}

struct Number{
    bool isnan;
    double value;
    void operator()() const{
        if (isnan)
            cout << "nann";
        else {
            char buf[1024];
            sprintf(buf, "%lfn", value);
            cout << string(buf);
        }
    }
};

int main() {
    ThreadPool pool(10);
    vector results1;
    // 相当于vector> results1;
    results1.reserve(50);
    // 存储future,然后调用get,get方法会阻塞直到future可用
    for (int i=0; i<50; i++){
        results1.emplace_back(pool.commit(PrintHelloWorld));
    }
    for (auto& r : results1){
        cout << r.get() << endl;
    }
    cout << "nn";
    for (int i=0; i<50; i++){
        pool.commit(Number{static_cast(rand() % 2), static_cast(rand() % 200)});
    }
    while(!pool.IsAllFinished());
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(ThreadPool)

set(CMAKE_CXX_STANDARD 17)
# 需要链接pthread
find_package( Threads )
add_executable(ThreadPool main.cpp)
target_link_libraries( ThreadPool ${CMAKE_THREAD_LIBS_INIT} )
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/853474.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号