本线程池代码是在Vscode下创建项目并结合CMakeList.txt使用GCC进行编译的。
我的文件组织形式为:
CMakeList.txt:
cmake_minimum_required(VERSION 3.1.0)
project(ProjectName)#ProjectName 需要你自己取项目名字!
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall ")
set(CMAKE_BUILD_TYPE Debug)
include_directories(${CMAKE_SOURCE_DIR}/include)
file(GLOB SRC_FILES #注意这里定义都shell 变量 SRC_FILES 一定要对应在add_executable中!
"${PROJECT_SOURCE_DIR}/src
Task* taskQ;
int queueCapacity; //容量
int queueSize; //当前任务个数
int queueFront; //队头 -> 取数据
int queueRear; //队尾 -> 放数据
pthread_t managerID; //管理者线程ID(只有1个!)
pthread_t *threadIDs; //工作的线程ID(有许多个!)
int minNum;
int maxNum;
int busyNum;
int liveNum;
int exitNum;
pthread_mutex_t mutexPool; //锁整个线程池 pthread_mutex_t <==> std::mutex(C++11)
pthread_mutex_t mutexBusy; //锁busyNum变量 pthread_mutex_t <==> std::mutex(C++11)
pthread_cond_t notFull; //任务队列是不是满了 pthread_cond_t <==> std::condition_varibale类(C++11)
pthread_cond_t notEmpty; //任务队列是不是满了
int shutdown; //是不是要销毁线程池,销毁为1,不销毁为0
};
//创建线程池并且初始化
ThreadPool* threadPoolCreate(int min,int max,int queueSize);
//销毁线程池
int threadPoolDestroy(ThreadPool* pool);
//给线程池添加任务
void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg);
//获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
//获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
//注意:工作的线程一定是活着的!但是,活着的线程不一定在工作!!!
/
//工作的线程(消费者线程)的任务函数
void* worker(void* arg);
//管理者线程任务函数
void* manager(void* arg);// (主要用于创建和销毁线程池的!)
//单个线程退出
void threadExit(ThreadPool* pool);
#endif
threadpool.cpp:
#include"threadpool.h" #include//用memset函数 #include //用sleep函数 #include //使用pthread_self() 打印当前线程的id的函数 #include //使用malloc操作 const int ADDORDESTROYNUMBER { 2 };//每次添加/销毁的线程的number个数 struct ThreadPool; //创建线程池并且初始化 ThreadPool* threadPoolCreate(int min,int max,int queueSize){//形参表分别为:最小线程个数,最大线程个数以及队列大小! ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool)); do{ if(pool == nullptr){ printf("malloc threadpool fail ...n");//分配内存失败 return 空 break; // return nullptr; } //succeed to create a thread pool! pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);//工作线程中,分别Max个空间的heap区数组空间 if(pool->threadIDs == nullptr){ printf("malloc threadIDs fail ...n");//分配内存失败 return 空 break; // return nullptr; } //初始化工作的线程IDs们 memset(pool->threadIDs, 0, sizeof(pthread_t) * max);//memset为cstring中的函数,用来赋值! //把线程id数组中的元素全都赋值为0 pool->minNum = min; pool->maxNum = max; pool->busyNum = 0; pool->liveNum = min;//初始化时以线程的个数最小值来创建活着的线程的个数! pool->exitNum = 0;//一开始初始化肯定没有销毁线程的,这个数量要根据程序运行中的状态来decide! if(pthread_mutex_init(&pool->mutexPool,nullptr) != 0|| pthread_mutex_init(&pool->mutexBusy,nullptr) != 0|| pthread_cond_init(&pool->notEmpty,nullptr) != 0|| pthread_cond_init(&pool->notFull,nullptr) != 0) { //此时创建失败 printf("mutex or condition inti fail ...n"); break; // return 0; } //此时创建锁mutex和条件变量cond成功! //然后创建任务队列 pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize); //开辟一块arr内存存放任务队列,arr所存储的任务最大值为容量那么大 pool->queueCapacity = queueSize; pool->queueSize = 0;//当前任务数为0个 pool->queueFront = 0;//因为没有任务,所有头部执行0index pool->queueRear = 0;//因为没有任务,所有尾部执行0index pool->shutdown = 0;//初始化时肯定不能销毁线程池,所有标记为0(自己规定的) //创建线程 pthread_create(&pool->managerID,nullptr,manager,pool);//创建管理者这一个线程 for(int i=0;i threadIDs[i],nullptr,worker,pool); } //如果能成功执行到这里,那么就表示成功执行了线程池! //此时直接返回线程池即可! return pool; }while(0);//只会执行一次!只要有开辟不成功的case,马上break出while循环! //下面再进行资源释放的工作! if(pool && pool->threadIDs) free(pool->threadIDs);//线程池存在的case下,开辟了线程IDs的空间时,就释放它! if(pool && pool->taskQ) free(pool->taskQ);//线程池存在的case下,开辟了taskQ的空间时,就释放它! if(pool) free(pool);//释放线程池 return nullptr; } //销毁线程池(当线程池被销毁时,线程池中的所有成员都必须被销毁!) int threadPoolDestroy(ThreadPool* pool){ if(pool == nullptr) return -1;// 表示此时线程池已空,不需要销毁了 // 线程池不空(没被销毁时) pool->shutdown = 1;//关闭线程池! // 阻塞回收管理者线程 pthread_join(pool->managerID,nullptr); // 唤醒阻塞的(活着的)消费者线程 // 唤醒后他们会自动退出,为什么退出呢?(因为我们写了让他们退出的条件判断代码!) for(int i = 0;i < pool->liveNum; i++){ pthread_cond_signal(&pool->notEmpty); } // 释放申请的堆区内存 if(pool->taskQ){ free(pool->taskQ); pool->taskQ = nullptr; } if(pool->threadIDs){ free(pool->threadIDs); pool->threadIDs = nullptr; } //再释放互斥量锁还有条件类锁 pthread_mutex_destroy(&pool->mutexBusy); pthread_mutex_destroy(&pool->mutexPool); pthread_cond_destroy(&pool->notEmpty); pthread_cond_destroy(&pool->notFull); free(pool); pool = nullptr; return 0;//return 0 就表示的是成功Destory了线程池并返回了! } //给线程池的任务队列中添加任务 void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg){ //由于你给该线程池的任务队列中添加任务时,很有可能此时你正在对任务进行读or写的操作 //那么因此这里就必须要用线程池的锁mutex_pool来锁住(防止这份共享代码因为OS的调度切换搞乱了!) pthread_mutex_lock(&pool->mutexPool); while(pool->queueSize == pool->queueCapacity && !pool->shutdown){ // 此时线程池的任务队列数 = 其最大容量了 并且 该线程池还没有被销毁 时 // 阻塞生产者线程 pthread_cond_wait(&pool->notFull,&pool->mutexPool); } // 此时被堵塞的生产者线程 被唤醒了(注意:此时该线程还是拿到了mutex互斥锁的状态的!) // 先判断线程池是否已经被销毁了! // 线程池被销毁 if(pool->shutdown){ pthread_mutex_unlock(&pool->mutexPool);// 解锁 return;// 并 退出程序 } // 线程池没被销毁时 // 给线程池中的任务队列 添加任务 pool->taskQ[pool->queueRear].function = func; pool->taskQ[pool->queueRear].arg = arg; // 让队尾index (循环)后移! pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity; pool->queueSize++;// 队列任务+1 pthread_cond_signal(&pool->notEmpty);//pool->notEmpty用来worker中! //通知pool->notEmpty这个condition_variable条件变量的对象 返回true 让他唤醒了去工作干活了! pthread_mutex_unlock(&pool->mutexPool); } // 获取线程池中工作(忙)的线程的个数 int threadPoolBusyNum(ThreadPool* pool){ // 注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据 // 应该加上锁! pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy); return busyNum; } // 获取线程池中活着的线程的个数 int threadPoolAliveNum(ThreadPool* pool){ // 注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据 // 应该加上锁!且这里因为没有给aliveNum 定义对应的互斥量 so 直接用线程池的锁即可 pthread_mutex_lock(&pool->mutexPool); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexPool); return busyNum; } / // 工作的线程(消费者线程)的任务函数 void* worker(void* arg){ //首先把传入进来的参数类型转换为线程池类型 ThreadPool* pool = (ThreadPool*)arg; //接下来就算不断地读取任务队列中的任务(每个任务就是一个函数) while(1){ pthread_mutex_lock(&pool->mutexPool);//加锁 // 当前任务队列是否为空 while(pool->queueSize == 0 && !pool->shutdown){ // 在当前队列中任务为空 并且 线程池没有被销毁时 就阻塞在这个工作线程中 pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);//只有当pool->notEmpty的返回值为true时,就继续往下执行! // 当该哦工作的线程被唤醒时,判断一下该线程是否需要被销毁 if(pool->exitNum > 0){ pool->exitNum--;//每次判断好要让该线程自杀后,必须让exitNum--才行! // 当然,当条件变量wait被唤醒后,肯定是拿到了互斥锁才继续往下面执行的 // 此时就必须要把pool->mutexPool这把锁头给unlock解锁一下,再退出该程序,否则你没解锁就退出的话该程序就直接死锁了! if(pool->liveNum > pool->minNum){ pool->liveNum--; pthread_mutex_unlock(&pool->mutexPool);//解锁 // pthread_exit(nullptr);//退出该线程! threadExit(pool); } } } // 判断线程池是否被关闭了 if(pool->shutdown){// 若线程池被销毁了,可以直接退出线程! pthread_mutex_unlock(&pool->mutexPool);//解锁 // pthread_exit(nullptr);//退出线程 threadExit(pool); } // 若线程池没有被销毁了,可以直接做任务了(消费) // 从任务队列中取出一个任务(真正 do things!) Task task; task.function = pool->taskQ[pool->queueFront].function; task.arg = pool->taskQ[pool->queueFront].arg; // 移动头节点 使之do循环移动 构成一个循环队列 pool->queueFront = (pool->queueFront+1) % pool->queueCapacity; pool->queueSize--;//取出1个任务了,所有-- //解锁 pthread_cond_signal(&pool->notFull); pthread_mutex_unlock(&pool->mutexPool); // do 真正的工作(在function中) printf("thread %ld start working...n",pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum++;//工作时++ pthread_mutex_unlock(&pool->mutexBusy); // do things task.function(task.arg);// <==> (*task.function)(task.arg); // 同时,因为此时这个线程在忙,所有busyNum++且用对应的锁来锁住,防止这个线程的小任务还没真正开始干活呢, // OS就因为调度切换到别的线程去了 free(task.arg); task.arg = nullptr; printf("thread %ld end working...n",pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum--;//工作完成后-- pthread_mutex_unlock(&pool->mutexBusy); pthread_mutex_unlock(&pool->mutexPool);//解锁 } return nullptr; } //管理者线程任务函数(主要用于创建和销毁线程池的!) void* manager(void* arg){ ThreadPool* pool = (ThreadPool*)arg; while(!pool->shutdown){ // 只要线程池没有销毁 就继续管理它 // 每隔3s钟 管理一下 sleep(3);//linux 下要包含头文件unistd.h才能使用! // 取出线程池钟的任务数量以及当前线程的数量 // (由于你读取的过程钟有可能别的线程正在写数据),为了防止这种case你就必须要上锁才行! pthread_mutex_lock(&pool->mutexPool);//加锁 int queueSize = pool->queueSize; int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool);//解锁 // 取出忙的线程的数量 pthread_mutex_lock(&pool->mutexBusy);//加锁 int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy);//解锁 // 添加线程(此时可以根据你项目的需要来制定相应的添加线程个数的规则) // 比如:当存活的线程数 < max && 当前存活的线程数 < 当前任务队列钟的任务数量 时 就添加线程!(此时表明当前的线程数已经忙不过来了) if(liveNum < pool->maxNum && liveNum < queueSize){ pthread_mutex_lock(&pool->mutexPool);//加锁 int cnt = 0; for(int i=0;i< pool->maxNum && cnt < ADDORDESTROYNUMBER && pool->liveNum < pool->maxNum;i++){ if(pool->threadIDs[i] == 0)//为0就 表明此时该线程ID可用!(之前没被使用,线程我可以用了!) { // 创建线程元素并放进去数组中! pthread_create(&pool->threadIDs[i],nullptr,worker,pool); cnt++; pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool);//解锁 } // 销毁线程(此时可以根据你项目的需要来制定相应的销毁线程个数的规则) // 比如:当 忙的线程数*2 < 存活的线程数 && 最小线程数 < 存活的线程数 时 就销毁线程 if(busyNum * 2 < liveNum && pool->minNum < liveNum){ pthread_mutex_lock(&pool->mutexPool); pool->exitNum = ADDORDESTROYNUMBER;//要销毁的线程数字定义为ADDORDESTROYNUMBER(2) pthread_mutex_unlock(&pool->mutexPool); // 让工作的线程自杀 for(int i=0;inotEmpty);// ==> condition_variable中的.notify_one() } } } return nullptr; } //单个线程退出 void threadExit(ThreadPool* pool){ //先获取当前线程的线程id pthread_t threadId = pthread_self();// <==> std::this_thread::get_id(); for(int i=0;i maxNum;i++){ if(pool->threadIDs[i] == threadId){ pool->threadIDs[i] = 0;//重新置为0,表示该线程ID可在后续被使用了! printf("threadExit() called,%ld existing...n",threadId); break; } } pthread_exit(nullptr); }
main.cpp:
#include#include"threadpool.h" #include //用sleep函数 #include #include using namespace std; // 工作的函数 // 注意:只要你在线程池中创建的函数类型与这里传入的函数类型一致,即可以作为工作函数去使用! void taskFunc(void* arg){ int num = *( (int*)arg ); printf("thread %ld is working, number = %dn",pthread_self(),num); sleep(1);//休眠1s钟! } int main(void){ //创建线程池 ThreadPool* myPool = threadPoolCreate(3,10,100);// 线程池最大线程数为10,最小线程数为3,线程池中的任务队列最大容量为100个 for(int i =0;i<100;i++){ int *pnum = (int*)malloc(sizeof(int)); *pnum = i + 100; threadPoolAdd(myPool,taskFunc,pnum); } //注意:当你想要销毁线程池之前,应该让主线程休眠一段时间,以让仍然在线程池中的线程执行完他们进行中的任务 sleep(10);//主线程休眠10s,等待子线程中的线程池做完他们手头上的工作! //销毁线程池! threadPoolDestroy(myPool); return 0; }
在对应的build文件夹下用cmake .. 命令,并make即可成功运行!(无任何Error和Warnings!)



