线程池的作用
在具体的代码中,经常会遇到以下场景:
① 监听机制:在代码正常运行时,需要随时监听主线程的状态或者某个变量的状态,一旦状态变化立刻需要处理。
② 耗时任务:在主进程执行某个耗时特别长的任务时,会导进程长时间阻塞卡顿现象。
③ 任务状态控制:即在任务执行过程中,能够终止此任务的执行。等等。
以上情景都可以用线程来实现,当以上场景在一个代码中多次出现时,可能就要创建很多个线程来满足相应的需求了。但线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能。
在线程池中只存在几个固定的线程,由线程池来维护,等待调度器派发已存在空闲的线程去执行对应的任务。
由此,便实现了线程的一次创建多次使用的功能,从而避免了短时间内的任务时创建与销毁线程的代价。线程池不仅能保护资源的充分利用,还能保证不被过分调度。
线程池的原理
线程池的在初始化时,会先创建固定数量的线程;具体的任务会放在任务队列中,类似于生产者-消费者概念。
多个线程作为消费者,任务队列作为生产者。当任务队列存在多个任务时,便会由调度器依次将任务派发给现有的线程执行。
某个任务执行完毕后,当前线程就会被释放,此时调度器可继续派发任务给线程执行。如此反复便实现了,多个任务并发的执行。
一般一个简单的线程池有下列组件:
线程池管理器(ThreadPoolManager):用于创建并管理线程池
工作线程(WorkThread): 线程池中线程
任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
任务队列:用于存放没有处理的任务。提供一种缓冲机制。
我们定义以下3个结构体:
1、工作线程
// 工作线程
typedef struct NWORKER {
pthread_t id;
int terminate;
struct NWORKER* prev;
struct NWORKER* next;
struct NTHREADPOOL* pool;
} nworker;
2、任务
typedef struct NTASK {
void (*task_func)(struct NTASK* arg);
void* user_data;
struct NTASK* prev;
struct NTASK* next;
} ntask;
3、线程池管理器
typedef struct NTHREADPOOL {
struct NWORKER* workers;
struct NTASK* wait_tasks;
pthread_cond_t cond;
pthread_mutex_t mutex;
} nthreadpool;
4、线程池我们对外提供3个接口,创建,添加任务和销毁。
int thread_pool_create(nthreadpool* pool, int num_thread)
int thread_pool_push_task(nthreadpool* pool, ntask* task)
int thread_pool_destroy(nthreadpool* pool)
int thread_pool_create(nthreadpool* pool, int num_thread)
{
if (NULL == pool) return -1;
if (num_thread < 1) num_thread = 1;
memset(pool, 0, sizeof(nthreadpool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
int idx = 0;
for (idx = 0; idx < num_thread; idx++)
{
nworker* worker = (nworker*)malloc(sizeof(nworker));
if (NULL == worker)
{
perror("malloc error");
return idx;
}
memset(worker, 0, sizeof(nworker));
worker->pool = pool;
int ret = pthread_create(&worker->id, NULL, thread_callback, worker);
if (ret)
{
perror("pthread_create error");
free(worker);
worker = NULL;
return idx;
}
LL_ADD(worker, pool->workers);
}
return idx;
}
int thread_pool_push_task(nthreadpool* pool, ntask* task)
{
if (pool == NULL || task == NULL) return -1;
pthread_mutex_lock(&pool->mutex);
LL_ADD(task, pool->wait_tasks);
// 向thread_callback发送信号
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
return 0;
}
int thread_pool_destroy(nthreadpool* pool)
{
if (NULL == pool) return -1;
nworker* worker = NULL;
for (worker = pool->workers; worker != NULL; worker = worker->next)
{
worker->terminate = 1;
}
pthread_mutex_lock(&pool->mutex);
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
pool->workers = NULL;
pool->wait_tasks = NULL;
return 0;
}
5、最后我们实现一下线程的回调函数
void* thread_callback(void* arg)
void* thread_callback(void* arg)
{
nworker* worker = (nworker*)arg;
while (1)
{
pthread_mutex_lock(&worker->pool->mutex);
while (worker->pool->wait_tasks == NULL)
{
if (worker->terminate) break;
// 如果任务队列为为空,阻塞等待pthread_cond_signal或者pthread_cond_broadcast给条件变量发送信号
pthread_cond_wait(&worker->pool->cond, &worker->pool->mutex);
}
// 如果线程已经终止,释放mutex,结束循环
if (worker->terminate)
{
pthread_mutex_unlock(&worker->pool->mutex);
break;
}
ntask* task = worker->pool->wait_tasks;
// 抓取任务成功,将该任务从任务队列中移除
if (task)
{
LL_REMOVE(task, worker->pool->wait_tasks);
}
pthread_mutex_unlock(&worker->pool->mutex);
if (task == NULL) continue;
// 调用task_func处理任务
task->task_func(task);
}
if (worker != NULL) free(worker);
worker = NULL;
}



