栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

基于c++11,尽量简单的语法实现线程池

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于c++11,尽量简单的语法实现线程池

文章目录
  • 前言
  • 一、用到的c++11
  • 二、代码
    • 1.threadPool.h
    • 2.threadPool.cpp
    • 3.main.cpp
  • 总结


前言

本人菜鸟一枚,最近在学c++11多线程特性(也是在看面试八股文),看了网上很多代码后手写了一个线程池。代码中如有错误之处,希望各位大佬不吝赐教。


一、用到的c++11
  1. mutex互斥锁
  2. condition_variable 条件变量
  3. functional 函数包装器
  4. atomic 原子操作
  5. thread 线程类
  6. 其它一些c++11常用技术
二、代码 1.threadPool.h

代码如下:

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#include 
#include 
#include 
#include 
#include 
#include 
#include 

using Task=std::function;

class Thread_pool{
public:
    Thread_pool(int max,int min);
    ~Thread_pool();

    //向任务队列中添加新任务
    void add_work_queue(Thread_pool* pool,Task task);
    //工作线程执行函数
    static void worker_fun(void* arg);
    //管理者线程执行函数
    static void manager_fun(void* arg);
    //销毁线程池
    bool destroy();
    //得到当前线程池内线程数量
    int get_threads_num();

private:
    std::queue task_queue;   //任务队列
    std::vector work_threads;   //工作线程
    std::thread manager_thread;     //管理者线程--根据实际运行可1.5倍动态增长线程数量(模仿vector只增不减,实际工作中可能并不适合 哈哈)
    int max_num;    //线程池中最大、最小线程数量
    int min_num;
    std::atomic_int cur_num;        //线程池当前的线程数量--原子操作
    std::atomic_int cur_work_num;   //当前正在工作的线程数量--原子操作
    bool isShutdown;    //是否开始销毁线程池

    //声明互斥锁和条件变量:用于同步任务队列
    std::mutex task_queue_mutex;
    std::condition_variable task_queue_cond;
};


#endif
2.threadPool.cpp

代码如下:

#include "threadPool.h"
#include 

Thread_pool::Thread_pool(int max=15,int min=3)
{
    this->max_num=max;
    this->min_num=min;
    this->cur_work_num=0;   //当前正在工作的线程数量为0
    this->cur_num=min;      //起始时创建min个线程
    this->isShutdown=false;
    
    //创建管理者线程
    manager_thread=std::thread(Thread_pool::manager_fun,this);
    //创建工作线程
    for(int i=0;iisShutdown)
        return;
    //注意线程同步
    std::unique_lock lck(pool->task_queue_mutex);
    pool->task_queue.push(task);
    //唤醒一个阻塞的线程
    pool->task_queue_cond.notify_one();	
}
//管理者线程
void Thread_pool::manager_fun(void* arg)
{
    Thread_pool* p=(Thread_pool*)arg;
    while(1){
        if(p->cur_num<=0)
            break;

        //管理者线程每隔3s苏醒一次,检查当前线程池线程数量
        //管理策略:忙碌线程数量>=线程池线程数量*80%   自动扩容1.5倍
        std::this_thread::sleep_for(std::chrono::seconds(3));
        int thread_num=p->cur_num.load();   //原子取值操作
        int work_num=p->cur_work_num.load();
     
        if(work_num>=thread_num*0.8 && thread_num*1.5<=p->max_num){
            int new_num=thread_num*0.5;
            //此处需加锁???
            while(new_num--){
                p->work_threads.emplace_back(std::thread(Thread_pool::worker_fun,p));
                p->cur_num++;   //原子操作
            }
        }
    }
    std::cout<<"manager_thread exitn";
}

//工作线程
void Thread_pool::worker_fun(void* arg)
{
    Thread_pool* p=(Thread_pool*) arg;
    while(1){
        if(p->task_queue.empty()&&p->isShutdown)
            break;

        std::unique_lock lck(p->task_queue_mutex);
        p->task_queue_cond.wait(lck,[=](){return p->task_queue.size()>0||p->isShutdown;});
        //加上p->isShutdown为了解决死锁问题
        if(p->task_queue.empty())
            break;
        //取出任务
        auto t=p->task_queue.front();
        p->task_queue.pop();
        //释放任务队列锁
        lck.unlock(); //在执行任务前必须释放任务队列锁

        p->cur_work_num++;  //忙碌线程数加一
        t();    //执行任务
        p->cur_work_num--;  //忙碌线程数减一
    }
    p->cur_num--;	//线程退出
    std::cout<<"worker_thread exitn";
}
//销毁线程池
bool Thread_pool::destroy()
{
    this->isShutdown=true;
    this->task_queue_cond.notify_all();	//唤醒所有阻塞的线程,前面添加新任务只唤醒一个
    //销毁之前队列中存在的所有任务
    // while(!this->task_queue.empty()){
    // }
    for(int i=0;iwork_threads.size();i++){
        work_threads[i].join();
    }
	this->manager_thread.join();
	
    return true;
}
//得到当前线程池内线程数量
int Thread_pool::get_threads_num()
{
    return this->cur_num.load();    //原子
}

Thread_pool::~Thread_pool()
{

}
3.main.cpp
#include "threadPool.h"
#include 
#include 
//这里用到了linux的线程库pthread.h,用来获得本线程的id
//我的实验环境是deepin+vscode,若在windows环境运行 可删去
std::mutex m;
void fun()
{
    //cout不是线程安全的
    std::unique_lock lck(m);
    std::cout<<"Thread: "<get_threads_num()<add_work_queue(pool,fun);
    }
    
    pool->destroy();
    delete pool;

    return 0;
}


总结 期间经历了一个处理死锁的过程: 当主线程发出销毁线程池的信号后,不再给任务队列添加新任务,如果此时任务队列为空,会导致正处于条件变量wait状态的线程死锁!(最后加了个判断操作 得以解决)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/312081.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号