前面关于线程池的封装一帖,仅仅是将指定工作线程创建后就保持阻塞等待任务,相对来说比较简单。现在对线程池来进行一些优化:
- 设置核心线程(最小线程)
- 非核心线程超时等待自动销毁
- 给任务队列配置了互斥锁(保证多个线程在竞争任务队列时同步)
根据以上优化,那么工作线程就没必要一次性全部创建。在任务量少时仅需要维持核心线程运作就能够处理已有任务量; 当任务量所需要运作的线程大于核心线程数,那么视任务情况逐个撞见非核心线程来处理任务,并且非核心线程如果超时间没有被唤醒会自动销毁,由以上优化线程池调度会十分方便。
二. 封装源码 类声明
#pragma once #include#include #include #include #include #include #include #include #include "CTask.h" #define ETIMEDOUT 110 class ThreadPool { public: ThreadPool(int max = 20, int min = 3, int waitsec = 60); // 最大线程数、核心线程数、超时等待时间 ~ThreadPool(); public: static void* start_routine_A(void* arg); // 工作线程 void add_task(CTask* task); // 添加任务 void start(); void destroy(); // 销毁线程(这里需要先把所有睡眠的线程唤醒才能将其所有销毁 void create_thread(); // 创建线程 public: void lock_mutex(); void unlock_mutex(); void lock_task_mutex(); void unlock_task_mutex(); void broadcast(); // 唤醒所有线程 void signal_cond(); // 唤醒条件变量,也就是唤醒线程 bool timewait(int waitsec); // 超时等待(需要转成绝对时间要用到timeval与timespec private: bool b_stop; // 结束标志位 int max_num; // 最大工作线程数 int min_num; // 核心线程数 int cur_count; // 当前线程数 int wait_count; // 睡眠线程数 int waitsec; // 超时等待时间(s) std::queue task_que; // 任务队列 pthread_cond_t cond; // 线程唤醒的条件变量 pthread_mutex_t mutex; // 条件变量的互斥所锁 pthread_mutex_t mutex2; // 任务队列的互斥锁 保证多个被唤醒的线程在竞争任务队列时同步 };
类定义
#include "ThreadPool.h" #include三. 这次优化的几个要点using namespace std; ThreadPool::ThreadPool(int max, int min, int waitsec) { this->b_stop = false; this->max_num = max; this->min_num = min; this->waitsec = waitsec; this->wait_count = 0; this->cur_count = 0; pthread_cond_init(&this->cond, NULL); pthread_mutex_init(&this->mutex, NULL); } ThreadPool::~ThreadPool() { destroy(); } void* ThreadPool::start_routine_A(void* arg) { pthread_detach(pthread_self()); ThreadPool* pool = (ThreadPool*)arg; while (1) { pool->lock_mutex(); if( pool->task_que.empty()) { if (pool->b_stop) { cout << "thread is destory,pid = " << pthread_self() << endl; pool->cur_count--; pool->unlock_mutex(); pthread_exit(NULL); } pool->wait_count++; // 阻塞前等待线程++ bool b_signal = pool->timewait(pool->waitsec); pool->wait_count--; // 结束阻塞等待线程-- if (!b_signal && pool->cur_count > pool->min_num) // 超时等待 + 当前线程数大于核心线程数时允许销毁线程(也就是保留核心线程,只销毁非核心线程) { pool->cur_count--; pool->unlock_mutex(); pthread_exit(NULL); } } pool->unlock_mutex(); pool->lock_task_mutex(); if (!pool->task_que.empty()) { CTask* t = pool->task_que.front(); pool->task_que.pop(); pool->unlock_task_mutex(); t->run(); delete t; } else { pool->unlock_task_mutex(); } } } void ThreadPool::add_task(CTask* task) { cout << "-------------Add task-------------" << endl; if (b_stop) { return; } lock_task_mutex(); task_que.push(task); unlock_task_mutex(); lock_mutex(); if (wait_count) { signal_cond(); // 唤醒一个线程 } else if (cur_count < max_num) { create_thread(); } unlock_mutex(); } void ThreadPool::start() { lock_mutex(); b_stop = false; unlock_mutex(); } void ThreadPool::destroy() { b_stop = true; while (cur_count > 0) { lock_mutex(); broadcast(); unlock_mutex(); sleep(1); } } void ThreadPool::create_thread() { pthread_t pid; int res = pthread_create(&pid, NULL, start_routine_A, (void*)this); if (res < 0) { perror("ThreadPool::create_thread"); } else { cur_count++; } } void ThreadPool::lock_mutex() { pthread_mutex_lock(&this->mutex); } void ThreadPool::unlock_mutex() { pthread_mutex_unlock(&this->mutex); } void ThreadPool::lock_task_mutex() { pthread_mutex_lock(&this->mutex2); } void ThreadPool::unlock_task_mutex() { pthread_mutex_unlock(&this->mutex2); } void ThreadPool::broadcast() { cout << "All thread is wake" << endl; pthread_cond_broadcast(&cond); } void ThreadPool::signal_cond() { pthread_cond_signal(&cond); } bool ThreadPool::timewait(int waitsec) { struct timeval now; struct timespec outtime; gettimeofday(&now, NULL); outtime.tv_sec = now.tv_sec + waitsec; int ret = pthread_cond_timedwait(&this->cond, &this->mutex, &outtime); if (ret == ETIMEDOUT) { cout << "no core thread is time out." << endl; return false; } else if (ret != 0) { perror("no core thread error:"); } cout << "no core Thread is wake up" << endl; return true; }
- 线程阻塞方法由pthread_cond_wait换成了pthread_cond_timewait
- pthread_cond_timewait的第三个参数(超时参数)需要的是绝对时间
struct timeval now; struct timespec outtime; gettimeofday(&now, NULL); outtime.tv_sec = now.tv_sec + waitsec;
- 进程结束时候,先唤醒所有线程在挨个进行销毁
- 非核心线程超时销毁条件: pthread_cond_timewait超时返回 + 当前线程 > 核心线程数(当前线程是否为核心线程)
- 后面还会把条件变量以及对它的操作封装成类
线程池优化的过程中, 在非核心线程超时等待销毁,以及进程关闭时候所有线程的销毁上代码逻辑有些不正确,多亏了下面博文提供的源码才顺利解决我面对的问,感谢大神的源码分享
https://blog.csdn.net/qq_42766268/article/details/88368799
旧帖共享内存配合消息队列与信号量的类封装
简单线程池的类封装



