- 为什么要用线程池
- 线程池api接口的使用
- 线程池工作原理(流程)
- 互斥锁和条件变量
- 线程池组成
- 管理的数据
- 创建线程池
- 工作线程
- 管理者线程
- 添加任务
- 销毁线程池
- 源代码
我们平时会在使用的时候创建线程,实现很方便。
有某个场景:并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了。
问题:频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
我们需要复用线程:执行完一个任务,并不销毁,继续执行其他的任务。
线程池api接口的使用用户会使用到的接口:
typedef struct threadpool_t threadpool_t; // 创建线程池 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size); // 销毁线程池 int threadpool_destroy(threadpool_t *pool); // 向线程池的任务队列中添加一个任务 int threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg);
某种使用方法:
- 创建线程池
- 添加任务
- 销毁线程池
void* call_back(void *param)
{
if (xxxx)
{
free(param); // 必要的话释放参数
}
return NULL;
}
int mian(void)
{
threadpool_t *pool = threadpool_create(10, 100, 100); // 初始化线程池
// 循环处理
while (1)
{
if (xxx)
{
break; // 退出循环
}
threadpool_add_task(pool, call_back, (void*)param); // 添加任务 param为参数 可以new一个结构体 线程中记得释放
}
threadpool_destroy(pool); // 销毁线程池
return 0;
}
线程池工作原理(流程)
互斥锁和条件变量
条件变量的主要作用不是处理线程同步,而是进行线程的阻塞。如果在多线程程序中只使用条件变量无法实现线程的同步,必须要配合互斥锁来使用。虽然条件变量和互斥锁都能阻塞线程,但是二者的效果是不一样的,二者的区别如下:
- 假设有 A-Z 26 个线程,这 26 个线程共同访问同一把互斥锁,如果线程 A 加锁成功,那么其余 B-Z 线程访问互斥锁都阻塞,所有的线程只能顺序访问临界区
- 条件变量只有在满足指定条件下才会阻塞线程,如果条件不满足,多个线程可以同时进入临界区,同时读写临界资源,这种情况下还是会出现共享资源中数据的混乱。
int pthread_mutex_lock(pthread_mutex_t *mutex); // 上锁 int pthread_mutex_unlock(pthread_mutex_t * mutex); // 解锁
// 线程阻塞函数, 哪个线程调用这个函数, 哪个线程就会被阻塞 // 在阻塞线程时候,如果线程已经对互斥锁 mutex 上锁,那么会将这把锁打开,这样做是为了避免死锁 // 当线程解除阻塞的时候,函数内部会帮助这个线程再次将这个 mutex 互斥锁锁上,继续向下访问临界区 int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); // 唤醒阻塞在条件变量上的线程, 至少有一个被解除阻塞 int pthread_cond_signal(pthread_cond_t *cond); // 唤醒阻塞在条件变量上的线程, 被阻塞的线程全部解除阻塞 int pthread_cond_broadcast(pthread_cond_t *cond);线程池组成
- 任务队列(由线程池使用者生产)
- 将一个待处理的任务添加到任务队列
- 工作的线程处理结束的任务会被从任务队列中删除
- 工作的线程(任务队列任务的消费者) ,N个
- 线程池中维护了一定数量的工作线程,他们的作用是不停的读任务队列,从里边取出任务并处理
- 如果任务队列为空,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)
- 如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作
- 管理者线程(不处理任务队列中的任务),1个
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
- 当任务过多的时候,可以适当的创建一些新的工作线程
- 当任务过少的时候,可以适当的销毁一些工作的线程
任务就是pthread_create最后两个参数,线程函数的入口地址和传给新线程执行函数的参数。
线程池的管理数据
- 线程状态 — shutdown,true为关闭
- 任务队列相关信息 — task_queue、queue_max_size、queue_front、queue_rear、queue_size
- 线程池的线程数量 — min_thr_num、max_thr_num、live_thr_num、busy_thr_num、wait_exit_thr_num
- 线程id — admin_tid(管理者线程)、threads(工作线程数组)
- 锁和条件变量
- 整个结构体threadpool_t的互斥锁 — lock
- 忙线程数busy_thr_num的互斥锁 — thread_counter
- 队列不为空条件变量 — queue_not_empty
- 队列不为满条件变量 — queue_not_full
// 任务
typedef struct
{
void *(*function)(void *);
void *arg;
} threadpool_task_t;
// 线程池管理
struct threadpool_t
{
pthread_mutex_t lock; // 锁住整个结构体
pthread_mutex_t thread_counter; // 用于使用忙线程数时的锁
pthread_cond_t queue_not_full; // 条件变量,任务队列不为满
pthread_cond_t queue_not_empty; // 任务队列不为空
pthread_t *threads; // 存放线程的tid,实际上就是管理了线程数组
pthread_t admin_tid; // 管理者线程tid
threadpool_task_t *task_queue; // 任务队列
// 线程池信息
int min_thr_num; // 线程池中最小线程数
int max_thr_num; // 线程池中最大线程数
int live_thr_num; // 线程池中存活的线程数
int busy_thr_num; // 忙线程,正在工作的线程
int wait_exit_thr_num; // 需要销毁的线程数
// 任务队列信息
int queue_front; // 队头
int queue_rear; // 队尾
int queue_size; // 当前任务个数
// 存在的任务数
int queue_max_size; // 队列能容纳的最大任务数
// 状态
int shutdown; // true为关闭
};
创建线程池
参数为线程池中最小线程数、线程池中最大线程数、队列能容纳的最大任务数。
- 分配threadpool_t的内存
- 初始化数量等信息
- 分配工作线程id内存(按照最大线程分配,因为需要的内存不大)
- 分配队列内存
- 初始化互斥锁和条件变量
- 启动最少工作线程和管理者线程
这边有个do {} while (0);的设计属实很好看。
threadpool_t* threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
threadpool_t *pool = NULL;
do
{
if (NULL == (pool = (threadpool_t*)malloc(sizeof(threadpool_t))))
{
printf("malloc threadpool false; n");
break;
}
// 信息初始化
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num;
pool->wait_exit_thr_num = 0;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->queue_size = 0;
pool->queue_max_size = queue_max_size;
pool->shutdown = false;
// 根据最大线程数,给工作线程数组开空间,清0
if (NULL == (pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * max_thr_num)))
{
printf("malloc threads false;n");
break;
}
memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);
// 队列开空间
if (NULL == (pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_task_t) * queue_max_size)))
{
printf("malloc task queue false;n");
break;
}
memset(pool->task_queue, 0, sizeof(threadpool_task_t) * queue_max_size);
// 初始化互斥锁和条件变量
if (pthread_mutex_init(&(pool->lock), NULL) != 0 ||
pthread_mutex_init(&(pool->thread_counter), NULL) != 0 ||
pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 ||
pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
{
printf("init lock or cond false;n");
break;
}
// 启动min_thr_num个工作线程
for (int i = 0; i < min_thr_num; ++i)
{
// pool指向当前线程池
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
printf("start thread 0x%p... n", pool->threads[i].p);
}
// 管理者线程
pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);
return pool;
} while (0);
// 释放pool的空间
threadpool_free(pool);
return NULL;
}
工作线程
首先操作前先给整个结构体上锁,然后进入循环,退出条件为线程池关闭以及线程太多自杀。
进入一个循环,如果没有任务就阻塞在queue_not_empty这个条件变量上(如果有需要销毁的线程,在条件变量解除阻塞后线程会自杀)。
如果线程池关闭,线程就自杀。
从队列中取出任务。
解除条件变量queue_not_full的阻塞(如果任务队列已经满了,处理掉一个就可以继续添加任务了)。
整个结构体解锁。
增加在忙的线程数量(当然要使用互斥锁thread_counter)。
执行任务(这边可以设计为执行完任务释放参数)。
减少在忙的线程数量(当然要使用互斥锁thread_counter)。
void* threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
while (true)
{
pthread_mutex_lock(&(pool->lock));
// 无任务则阻塞在 任务队列不为空 上,有任务则跳出
while ((pool->queue_size == 0) && (!pool->shutdown))
{
//printf("thread 0x%p is waiting n", pthread_self().p);
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
// 判断是否需要清除线程,自杀功能
if (pool->wait_exit_thr_num > 0)
{
pool->wait_exit_thr_num--;
// 判断线程池中的线程数是否大于最小线程数,是则结束当前线程
if (pool->live_thr_num > pool->min_thr_num)
{
printf("thread 0x%p is exiting n", pthread_self().p);
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL); // 结束线程
}
}
}
// 线程池开关状态
if (pool->shutdown) // 关闭线程池
{
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%p is exiting n", pthread_self().p);
pthread_exit(NULL); // 线程自己结束自己
}
// 否则该线程可以拿出任务
task.function = pool->task_queue[pool->queue_front].function; // 出队操作
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; // 环型结构
pool->queue_size--;
// 通知可以添加新任务
pthread_cond_broadcast(&(pool->queue_not_full));
// 释放线程锁
pthread_mutex_unlock(&(pool->lock));
// 执行刚才取出的任务
//printf("thread 0x%p start working n", pthread_self().p);
pthread_mutex_lock(&(pool->thread_counter)); // 锁住忙线程变量
pool->busy_thr_num++;
pthread_mutex_unlock(&(pool->thread_counter));
(*(task.function))(task.arg); // 执行任务
// 清空工作线程的回调函数的参数arg
//if (task.arg != NULL)
//{
// free(task.arg);
// task.arg = NULL;
//}
// 任务结束处理
//printf("thread 0x%p end working n", pthread_self().p);
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num--;
pthread_mutex_unlock(&(pool->thread_counter));
}
}
管理者线程
进入循环,线程池关闭才退出。
睡眠等待DEFAULT_TIME毫秒。
上锁lock得到任务数和存活的线程数。
上锁thread_counter得到忙线程数。
如果工作线程没处理完的任务多余规定的数量,并且线程还没有到达规定的最大数量,就添加新的线程。
如果存活的线程有一半都不在忙,并且大于规定的数量,就销毁多余的线程。
int is_thread_alive(pthread_t tid)
{
int kill_rc = pthread_kill(tid, 0); // 发送0号信号,测试是否存活
if (kill_rc == ESRCH) // 线程不存在
{
return false;
}
return true;
}
void* admin_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
while (!pool->shutdown)
{
printf("admin -----------------n");
Sleep(DEFAULT_TIME); // 隔一段时间再管理
pthread_mutex_lock(&(pool->lock));
int queue_size = pool->queue_size; // 任务数
int live_thr_num = pool->live_thr_num; // 存活的线程数
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num; // 忙线程数
pthread_mutex_unlock(&(pool->thread_counter));
printf("admin busy live -%d--%d-n", busy_thr_num, live_thr_num);
// 创建新线程 实际任务数量大于 最小正在等待的任务数量,存活线程数小于最大线程数
if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)
{
printf("admin add-----------n");
pthread_mutex_lock(&(pool->lock));
int add = 0;
// 一次增加 DEFAULT_THREAD_NUM 个线程
for (int i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_NUM
&& pool->live_thr_num < pool->max_thr_num; ++i)
{
if (pool->threads[i].p == NULL || !is_thread_alive(pool->threads[i]))
{
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
add++;
pool->live_thr_num++;
printf("new thread -----------------------n");
}
}
pthread_mutex_unlock(&(pool->lock));
}
// 销毁多余的线程 忙线程x2 都小于 存活线程,并且存活的大于最小线程数
if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num)
{
// printf("admin busy --%d--%d----n", busy_thr_num, live_thr_num);
// 一次销毁DEFAULT_THREAD_NUM个线程
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
pthread_mutex_unlock(&(pool->lock));
for (int i = 0; i < DEFAULT_THREAD_NUM; ++i)
{
// 通知正在处于空闲的线程,自杀
pthread_cond_signal(&(pool->queue_not_empty));
printf("admin cler --n");
}
}
}
return NULL;
}
添加任务
上锁lock。
如果任务队列满了,就阻塞到有任务被处理。
如果线程池关闭就什么都不干。
添加任务。
解除queue_not_empty阻塞,唤醒线程池中的一个线程。
解锁lock。
int threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
pthread_mutex_lock(&(pool->lock));
// 如果队列满了,调用wait阻塞
while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
{
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
}
// 如果线程池处于关闭状态
if (pool->shutdown)
{
pthread_mutex_unlock(&(pool->lock));
return -1;
}
// 添加任务到任务队列
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; // 逻辑环
pool->queue_size++;
// 添加完任务后,队列就不为空了,唤醒线程池中的一个线程
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
销毁线程池
结束管理者线程和工作线程,然后释放各种资源。
// 释放线程池
int threadpool_free(threadpool_t *pool)
{
if (NULL == pool)
{
return -1;
}
if (pool->task_queue)
{
free(pool->task_queue);
}
if (pool->threads)
{
free(pool->threads);
pthread_mutex_lock(&(pool->lock)); // 先锁住再销毁
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
pool = NULL;
return 0;
}
// 销毁线程池
int threadpool_destroy(threadpool_t *pool)
{
if (NULL == pool)
{
return -1;
}
pool->shutdown = true;
// 销毁管理者线程
pthread_join(pool->admin_tid, NULL);
// 通知所有线程去自杀(在自己领任务的过程中)
for (int i = 0; i < pool->live_thr_num; ++i)
{
pthread_cond_broadcast(&(pool->queue_not_empty));
}
// 等待线程结束 先是pthread_exit 然后等待其结束
for (int i = 0; i < pool->live_thr_num; ++i)
{
pthread_join(pool->threads[i], NULL);
}
threadpool_free(pool);
return 0;
}
源代码
在一个简单的http服务器中使用线程池:
百度云链接:https://pan.baidu.com/s/1DZw6mx8sOX0C_9m52CMHBg
提取码:10pb



