栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

C++线程池(C语言版本)

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C++线程池(C语言版本)

本线程池代码是在Vscode下创建项目并结合CMakeList.txt使用GCC进行编译的。

我的文件组织形式为:

CMakeList.txt:

cmake_minimum_required(VERSION 3.1.0)
project(ProjectName)#ProjectName 需要你自己取项目名字!

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall ")
set(CMAKE_BUILD_TYPE Debug)
include_directories(${CMAKE_SOURCE_DIR}/include)
file(GLOB SRC_FILES    #注意这里定义都shell 变量 SRC_FILES 一定要对应在add_executable中!
    "${PROJECT_SOURCE_DIR}/src
    Task* taskQ;
    int queueCapacity;      //容量
    int queueSize;          //当前任务个数
    int queueFront;         //队头 -> 取数据
    int queueRear;          //队尾  -> 放数据
    pthread_t managerID;    //管理者线程ID(只有1个!)
    pthread_t *threadIDs;   //工作的线程ID(有许多个!)
    int minNum;
    int maxNum;
    int busyNum;
    int liveNum;
    int exitNum;
    pthread_mutex_t mutexPool; //锁整个线程池 pthread_mutex_t <==> std::mutex(C++11)
    pthread_mutex_t mutexBusy; //锁busyNum变量 pthread_mutex_t <==> std::mutex(C++11)
    pthread_cond_t notFull;    //任务队列是不是满了 pthread_cond_t <==> std::condition_varibale类(C++11)
    pthread_cond_t notEmpty;   //任务队列是不是满了
    int shutdown;              //是不是要销毁线程池,销毁为1,不销毁为0 

};
    //创建线程池并且初始化
    ThreadPool* threadPoolCreate(int min,int max,int queueSize);

    //销毁线程池
    int threadPoolDestroy(ThreadPool* pool);

    //给线程池添加任务
    void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg);

    //获取线程池中工作的线程的个数
    int threadPoolBusyNum(ThreadPool* pool);

    //获取线程池中活着的线程的个数
    int threadPoolAliveNum(ThreadPool* pool);

    //注意:工作的线程一定是活着的!但是,活着的线程不一定在工作!!!
    /
    //工作的线程(消费者线程)的任务函数
    void* worker(void* arg);
    //管理者线程任务函数
    void* manager(void* arg);// (主要用于创建和销毁线程池的!)
    //单个线程退出
    void threadExit(ThreadPool* pool);
#endif

threadpool.cpp:

 #include"threadpool.h"
 #include//用memset函数
 #include//用sleep函数
 #include//使用pthread_self() 打印当前线程的id的函数
 #include//使用malloc操作
 const int ADDORDESTROYNUMBER { 2 };//每次添加/销毁的线程的number个数
struct ThreadPool;


 //创建线程池并且初始化
    ThreadPool* threadPoolCreate(int min,int max,int queueSize){//形参表分别为:最小线程个数,最大线程个数以及队列大小!
        ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
        do{
            if(pool == nullptr){
                printf("malloc threadpool fail ...n");//分配内存失败 return 空
                break;
                // return nullptr;
            }
            //succeed to create a thread pool!
            pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);//工作线程中,分别Max个空间的heap区数组空间
            if(pool->threadIDs == nullptr){
                printf("malloc threadIDs fail ...n");//分配内存失败 return 空
                break;
                // return nullptr;
            }
            //初始化工作的线程IDs们
            memset(pool->threadIDs, 0, sizeof(pthread_t) * max);//memset为cstring中的函数,用来赋值!
            //把线程id数组中的元素全都赋值为0
            pool->minNum = min;
            pool->maxNum = max;
            pool->busyNum = 0;
            pool->liveNum = min;//初始化时以线程的个数最小值来创建活着的线程的个数!
            pool->exitNum = 0;//一开始初始化肯定没有销毁线程的,这个数量要根据程序运行中的状态来decide!


            if(pthread_mutex_init(&pool->mutexPool,nullptr) != 0||
                pthread_mutex_init(&pool->mutexBusy,nullptr) != 0||
                pthread_cond_init(&pool->notEmpty,nullptr) != 0||
                pthread_cond_init(&pool->notFull,nullptr) != 0)
            {
                //此时创建失败
                printf("mutex or condition inti fail ...n");
                break;
                // return 0;
            }

            //此时创建锁mutex和条件变量cond成功!

            //然后创建任务队列
            pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
            //开辟一块arr内存存放任务队列,arr所存储的任务最大值为容量那么大
            pool->queueCapacity = queueSize;
            pool->queueSize = 0;//当前任务数为0个
            pool->queueFront = 0;//因为没有任务,所有头部执行0index
            pool->queueRear = 0;//因为没有任务,所有尾部执行0index
            
            pool->shutdown = 0;//初始化时肯定不能销毁线程池,所有标记为0(自己规定的)

            //创建线程
            pthread_create(&pool->managerID,nullptr,manager,pool);//创建管理者这一个线程
            for(int i=0;ithreadIDs[i],nullptr,worker,pool);
            }

            //如果能成功执行到这里,那么就表示成功执行了线程池!
            //此时直接返回线程池即可!
            return pool;
        }while(0);//只会执行一次!只要有开辟不成功的case,马上break出while循环!
        
        //下面再进行资源释放的工作!
        if(pool && pool->threadIDs) free(pool->threadIDs);//线程池存在的case下,开辟了线程IDs的空间时,就释放它!
        if(pool && pool->taskQ) free(pool->taskQ);//线程池存在的case下,开辟了taskQ的空间时,就释放它!
        if(pool) free(pool);//释放线程池

        return nullptr;
    }

    //销毁线程池(当线程池被销毁时,线程池中的所有成员都必须被销毁!)
    int threadPoolDestroy(ThreadPool* pool){
        if(pool == nullptr) return -1;//    表示此时线程池已空,不需要销毁了
        //  线程池不空(没被销毁时)
        pool->shutdown = 1;//关闭线程池!

        //  阻塞回收管理者线程
        pthread_join(pool->managerID,nullptr);

        //  唤醒阻塞的(活着的)消费者线程
        //  唤醒后他们会自动退出,为什么退出呢?(因为我们写了让他们退出的条件判断代码!)
        for(int i = 0;i < pool->liveNum; i++){
            pthread_cond_signal(&pool->notEmpty);
        }

        //  释放申请的堆区内存
        if(pool->taskQ){
            free(pool->taskQ);
            pool->taskQ = nullptr;
        }
        if(pool->threadIDs){
            free(pool->threadIDs);
            pool->threadIDs = nullptr;
        }


        //再释放互斥量锁还有条件类锁
        pthread_mutex_destroy(&pool->mutexBusy);
        pthread_mutex_destroy(&pool->mutexPool);
        pthread_cond_destroy(&pool->notEmpty);
        pthread_cond_destroy(&pool->notFull);

        free(pool);
        pool = nullptr;
        return 0;//return 0 就表示的是成功Destory了线程池并返回了!
    }

    //给线程池的任务队列中添加任务
    void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg){
        //由于你给该线程池的任务队列中添加任务时,很有可能此时你正在对任务进行读or写的操作
        //那么因此这里就必须要用线程池的锁mutex_pool来锁住(防止这份共享代码因为OS的调度切换搞乱了!)
        pthread_mutex_lock(&pool->mutexPool);
        while(pool->queueSize == pool->queueCapacity && !pool->shutdown){
            //  此时线程池的任务队列数 = 其最大容量了 并且 该线程池还没有被销毁 时
            
            //  阻塞生产者线程
            pthread_cond_wait(&pool->notFull,&pool->mutexPool);
                
        }
        //  此时被堵塞的生产者线程 被唤醒了(注意:此时该线程还是拿到了mutex互斥锁的状态的!)

        //  先判断线程池是否已经被销毁了!
        //  线程池被销毁
        if(pool->shutdown){
            pthread_mutex_unlock(&pool->mutexPool);//   解锁
            return;//   并 退出程序
        }

        //  线程池没被销毁时 
        //  给线程池中的任务队列 添加任务
        
        pool->taskQ[pool->queueRear].function = func;
        pool->taskQ[pool->queueRear].arg = arg;
        //  让队尾index (循环)后移!
        pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
        pool->queueSize++;//    队列任务+1

        pthread_cond_signal(&pool->notEmpty);//pool->notEmpty用来worker中!
        //通知pool->notEmpty这个condition_variable条件变量的对象 返回true 让他唤醒了去工作干活了!

        pthread_mutex_unlock(&pool->mutexPool);
    }

    //  获取线程池中工作(忙)的线程的个数
    int threadPoolBusyNum(ThreadPool* pool){
        //  注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据
        //  应该加上锁!
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);
        return busyNum;
    }

    //  获取线程池中活着的线程的个数
    int threadPoolAliveNum(ThreadPool* pool){
        //  注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据
        //  应该加上锁!且这里因为没有给aliveNum 定义对应的互斥量 so 直接用线程池的锁即可
        pthread_mutex_lock(&pool->mutexPool);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexPool);
        return busyNum;
    }

    /
    //  工作的线程(消费者线程)的任务函数
    void* worker(void* arg){
        //首先把传入进来的参数类型转换为线程池类型
        ThreadPool* pool = (ThreadPool*)arg;

        //接下来就算不断地读取任务队列中的任务(每个任务就是一个函数)

        while(1){
            pthread_mutex_lock(&pool->mutexPool);//加锁
            //  当前任务队列是否为空
            while(pool->queueSize == 0 && !pool->shutdown){
                //  在当前队列中任务为空 并且 线程池没有被销毁时 就阻塞在这个工作线程中
                pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);//只有当pool->notEmpty的返回值为true时,就继续往下执行!

                //  当该哦工作的线程被唤醒时,判断一下该线程是否需要被销毁
                if(pool->exitNum > 0){
                    pool->exitNum--;//每次判断好要让该线程自杀后,必须让exitNum--才行!
                    //  当然,当条件变量wait被唤醒后,肯定是拿到了互斥锁才继续往下面执行的
                    //  此时就必须要把pool->mutexPool这把锁头给unlock解锁一下,再退出该程序,否则你没解锁就退出的话该程序就直接死锁了!
                    if(pool->liveNum > pool->minNum){
                        pool->liveNum--;
                        pthread_mutex_unlock(&pool->mutexPool);//解锁
                        // pthread_exit(nullptr);//退出该线程!
                        threadExit(pool);
                    }
                    
                }

            }

            //  判断线程池是否被关闭了
            if(pool->shutdown){//   若线程池被销毁了,可以直接退出线程!
                pthread_mutex_unlock(&pool->mutexPool);//解锁
                //  pthread_exit(nullptr);//退出线程
                threadExit(pool);
            }

            //  若线程池没有被销毁了,可以直接做任务了(消费)
            //  从任务队列中取出一个任务(真正 do things!)
            Task task;
            task.function = pool->taskQ[pool->queueFront].function;
            task.arg = pool->taskQ[pool->queueFront].arg;
            //  移动头节点 使之do循环移动 构成一个循环队列
            pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;
            pool->queueSize--;//取出1个任务了,所有--

            //解锁
            pthread_cond_signal(&pool->notFull);
            pthread_mutex_unlock(&pool->mutexPool);

            //  do 真正的工作(在function中)
            printf("thread %ld start working...n",pthread_self());
            pthread_mutex_lock(&pool->mutexBusy);
            pool->busyNum++;//工作时++
            pthread_mutex_unlock(&pool->mutexBusy);
            
            //  do things
            task.function(task.arg);// <==> (*task.function)(task.arg);
            //  同时,因为此时这个线程在忙,所有busyNum++且用对应的锁来锁住,防止这个线程的小任务还没真正开始干活呢,
            //  OS就因为调度切换到别的线程去了

            free(task.arg);
            task.arg = nullptr;
            printf("thread %ld end working...n",pthread_self());
           
            pthread_mutex_lock(&pool->mutexBusy);
            pool->busyNum--;//工作完成后--
            pthread_mutex_unlock(&pool->mutexBusy);

            pthread_mutex_unlock(&pool->mutexPool);//解锁
        }
        return nullptr;
    }
    //管理者线程任务函数(主要用于创建和销毁线程池的!)
    void* manager(void* arg){
        ThreadPool* pool = (ThreadPool*)arg;
        while(!pool->shutdown){
            //  只要线程池没有销毁 就继续管理它
            //  每隔3s钟 管理一下
            sleep(3);//linux 下要包含头文件unistd.h才能使用!
            
            //  取出线程池钟的任务数量以及当前线程的数量
            //  (由于你读取的过程钟有可能别的线程正在写数据),为了防止这种case你就必须要上锁才行!
            pthread_mutex_lock(&pool->mutexPool);//加锁
            int queueSize = pool->queueSize;
            int liveNum = pool->liveNum;
            pthread_mutex_unlock(&pool->mutexPool);//解锁


            //  取出忙的线程的数量
            pthread_mutex_lock(&pool->mutexBusy);//加锁
            int busyNum = pool->busyNum;
            pthread_mutex_unlock(&pool->mutexBusy);//解锁

            //  添加线程(此时可以根据你项目的需要来制定相应的添加线程个数的规则)
            
            //  比如:当存活的线程数 < max && 当前存活的线程数 < 当前任务队列钟的任务数量 时 就添加线程!(此时表明当前的线程数已经忙不过来了)
            
            if(liveNum < pool->maxNum && liveNum < queueSize){
                pthread_mutex_lock(&pool->mutexPool);//加锁
                int cnt = 0; 
                for(int i=0;i< pool->maxNum && cnt < ADDORDESTROYNUMBER 
                    && pool->liveNum < pool->maxNum;i++){
                    if(pool->threadIDs[i] == 0)//为0就 表明此时该线程ID可用!(之前没被使用,线程我可以用了!)
                    {
                        //  创建线程元素并放进去数组中!
                        pthread_create(&pool->threadIDs[i],nullptr,worker,pool);
                        cnt++;
                        pool->liveNum++;
                    }
                }
                pthread_mutex_unlock(&pool->mutexPool);//解锁
            }
            
            //  销毁线程(此时可以根据你项目的需要来制定相应的销毁线程个数的规则)
            
            //  比如:当 忙的线程数*2 < 存活的线程数 && 最小线程数 < 存活的线程数 时 就销毁线程
            if(busyNum * 2 < liveNum && pool->minNum < liveNum){
                pthread_mutex_lock(&pool->mutexPool);
                pool->exitNum = ADDORDESTROYNUMBER;//要销毁的线程数字定义为ADDORDESTROYNUMBER(2)
                pthread_mutex_unlock(&pool->mutexPool);

                //  让工作的线程自杀
                for(int i=0;inotEmpty);// ==> condition_variable中的.notify_one()
                }
            }
        }
        return nullptr;
    }
    //单个线程退出
    void threadExit(ThreadPool* pool){
        //先获取当前线程的线程id
        pthread_t threadId = pthread_self();// <==> std::this_thread::get_id();
        for(int i=0;imaxNum;i++){
            if(pool->threadIDs[i] == threadId){
                pool->threadIDs[i] = 0;//重新置为0,表示该线程ID可在后续被使用了!
                printf("threadExit() called,%ld existing...n",threadId);
                break;
            }
        }
        pthread_exit(nullptr);
    }

main.cpp:

#include
#include"threadpool.h"
#include//用sleep函数
#include
#include
using namespace std;
//  工作的函数
//  注意:只要你在线程池中创建的函数类型与这里传入的函数类型一致,即可以作为工作函数去使用!
void taskFunc(void* arg){
    int num = *( (int*)arg );
    printf("thread %ld is working, number = %dn",pthread_self(),num);
    sleep(1);//休眠1s钟!
}
int main(void){

    //创建线程池
    ThreadPool* myPool = threadPoolCreate(3,10,100);//  线程池最大线程数为10,最小线程数为3,线程池中的任务队列最大容量为100个
    for(int i =0;i<100;i++){
        int *pnum = (int*)malloc(sizeof(int));
        *pnum = i + 100;
        threadPoolAdd(myPool,taskFunc,pnum);
    }
    //注意:当你想要销毁线程池之前,应该让主线程休眠一段时间,以让仍然在线程池中的线程执行完他们进行中的任务

    sleep(10);//主线程休眠10s,等待子线程中的线程池做完他们手头上的工作!

    //销毁线程池!
    threadPoolDestroy(myPool);
    return 0;
}

在对应的build文件夹下用cmake .. 命令,并make即可成功运行!(无任何Error和Warnings!)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/690883.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号