创建线程
pthread_create(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg) //线程id 线程属性 线程执行的内容 线程参数
线程退出
void pthread_exit(void *retval) //线程退出状态
阻塞等待线程退出
int pthread_join(pthread_t thread, void **retval) success--0 fail--errno
获取线程id
pthread_t pthread_self(void); return : 调用线程自身的线程id(该函数总是成功,不会失败)
线程分离
int pthread_detach(pthread_t thread) success--0 fail--errno
杀死线程
int pthread_cancel(pthread_t thread) success--0 fail--errno
比较线程号是否相等
int pthread_equal(pthread_t t1, pthread_t t2)
线程和进程控制原语对比
| 操作 | 进程 | 线程 |
|---|---|---|
| 创建 | fork | pthread_create |
| 退出 | exit | pthread_exit |
| 等待 | wait | pthread_join |
| 杀死 | kill | pthread_cancel |
| 获取id | getpid | pthread_self |
产品:等待队列中的任务节点;生产者:主程序;消费者:运行队列中的线程
线程池的对象有哪些
1、等待队列
使用互斥锁和条件变量共同实现
1、举例创建了含有100个线程的线程池,线程池中每个线程先给互斥锁加锁,随后等待条件锁,等待条件锁的原则是先释放互斥锁,如果争夺到了条件锁就会马上请求获取互斥锁
2、首先时主程序创建新的任务节点,跟着拿到互斥锁,将任务节点插入等待队列,跟着给条件锁发送信号,唤醒正在线程池中的线程来争夺条件锁,最后释放互斥锁
3、线程池中的线程进行争夺条件锁,争夺到的线程就获取到互斥锁,随后向等待队列中获取一个任务节点,并执行任务节点中的任务函数,随后释放互斥锁
任务节点就是指在等待被调度执行的节点
typedef struct NWORKER{
pthread_t id;
int terminate;
struct NTHREADPOOL *pool;
struct NWORKER *next;
struct NWORKER *prev;
}nworker;
2、运行队列
运行队列是指正在执行的队列
typedef struct NJOB{
void (*func_job)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
}njob;
3、线程池管理节点
调度运行队列和等待队列,并加锁解锁
typedef struct NTHREADTOOL{
struct NWORKER *workers;
struct NJOB *wait_jobs;
pthread_cond_t cond;
pthread_mutex_t mtx;
}nthreadpool_t;
线程池的创建
在堆区创建,将新创建的节点加入等待队列
//num_thread ---需要创建执行点的数量
int thread_pool_create(nthreadpool_t *pool, int num_thread){
//pool
if(pool==NULL)return -1;
if(num_thread<1) num_thread = 1;
memset(pool, 0, sizeof(nthreadpool_t)); //清零,避免脏数据
//cond
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
//metux
pthread_mutex_t blank_mtx = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->mtx, &blank_mtx, sizeof(pthread_mutex_t));
//create
//一个线程对应一个执行点
int idx = 0;
for(idx=0; idxpool = pool;
int ret = pthread_create(&worker->id, NULL, thread_callback, worker);
if(ret){
perror("pthread_create");
free(worker);
return idx;
}
LL_ADD(worker, pool->workers);
}
return idx;
}
线程池的销毁
将标志位设置为1,线程会在回调函数中判断标志位进行销毁,解锁并唤醒全部阻塞的线程
void thread_pool_destroy(nthreadpool_t *pool){
nworker *worker = NULL;
for(worker = pool->workers; worker != NULL; worker = worker->next){
worker->terminate = 1;
}
pthread_mutex_lock(&pool->mtx);
pthread_cond_broadcast(&pool->cond); //唤醒全部阻塞的线程
pthread_mutex_unlock(&pool->mtx);
}
等待队列和运行队列之间的切换
很简单,直接将等待队列中的节点加入到运行队列里面
void thread_pool_push_job(nthreadpool_t *pool, njob *job){
pthread_mutex_lock(&pool->mtx);
LL_ADD(job, pool->wait_jobs); //加入任务队列
pthread_cond_signal(&pool->cond); //向条件锁发送信号
pthread_mutex_unlock(&pool->mtx);
}
线程的回调函数
生产者和消费者模型,条件变量和互斥锁共同实现
void *thread_callback(void *arg){
nworker *worker = (nworker*) arg;
//取任务并执行
while(1){
//加互斥锁
pthread_mutex_lock(&worker->pool->mtx);
//判断任务队列是否为空
while(worker->pool->wait_jobs == NULL){
if(worker->terminate)break;
//条件等待唤醒
pthread_cond_wait(&worker->pool->cond, &worker->pool->mtx);
}
if(worker->terminate){
pthread_mutex_unlock(&worker->pool->mtx);
break;
}
//从等待队列取任务,取了就删除
njob *job = worker->pool->wait_jobs;
if(job){
LL_REMOVE(job, worker->pool->wait_jobs);
}
//解互斥锁
pthread_mutex_unlock(&worker->pool->mtx);
if(job == NULL)continue; // 为何为空,多个线程竞争一个任务,会出现有的线程竞争到空的资源,避免死锁则加入判断跳出
//执行任务内容
job->job_func(job);
}
free(worker);
运行示例
开一百个线程,并打印自身线程id
#if 1
void counter(njob *job){
if(job==NULL)return;
int idx = *(int*)job->user_data;
printf("[%lu]idx=%d,n", pthread_self(), idx);
free(job->user_data);
free(job);
}
#define MAX_COUNT 1000000
int main(){
nthreadpool_t pool = {0};
int num_thread = 100;
thread_pool_create(&pool, num_thread);
int i=0;
for(i=0; ijob_func = counter;
job->user_data = malloc(sizeof(int)); //定义在堆上,防止栈上系统自动回收
*(int*)job->user_data = i;
//将job加入任务队列
thread_pool_push_job(&pool,job);
}
getchar();
return 0;
}
#endif
C++实现的线程池类
原理和C的一模一样,不过等待队列用的是数组(链表),调度的时候并不像C那样会自动从等待队列将节点放入运行队列
#ifndef _THREADPOOL_H #define _THREADPOOL_H #include#include "locker.h" #include
#include #include //线程池类 template class threadpool{ public: threadpool(int thread_number = 8, int max_requests = 10000); ~threadpool(); bool apend(T *request); //将request加入工作队列 private: int m_thread_pool_number; //线程池的数量 pthread_t *m_threads; //线程数组 int m_max_request; std::list m_workqueue; //工作队列 mutex m_queuemutex; //线程安全 cond m_queuecond; sem m_queuesem; bool m_stop; private: static void* worker(void *arg); void run(); }; template threadpool ::threadpool(int thread_number, int max_requests): m_thread_pool_number(thread_number), m_max_request(max_requests), m_stop(NULL), m_threads(NULL){ if((thread_number<=0) || (m_max_request<=0)){ throw std::execption(); } m_threads = new pthread_t[m_thread_number]; if(!m_threads){ throw std::execption(); } this->m_queuemutex = new mutex(); this->m_queuecond = new cond(); this->m_queuesem = new sem(); for(int i=0; i threadpool ::~threadpool(){ delete [] m_threads; delete m_queuemutex; delete m_queuecond; delete m_queuesem; m_stop = true; } template bool threadpool ::append(T *request){ m_queuemutex.lock(); if(m_workqueue.size() > m_max_request){ m_queuemutex.unlock(); return false; } m_workqueue.push_back(request); m_queuemutex.unlock(); m_queuesem.post(); return true; } template void* threadpool ::worker(void* arg){ threadpool * pool = (threadpool*)arg; pool->run(); return pool; } template void threadpool ::run(){ while(!m_stop){ m_queuesem.wait(); m_queuemutex.lock(); if(m_workqueue.empty()){ m_queuemutex.unlock(); continue; } T* request = m_workqueue.front(); m_workqueue.pop_back(); m_queuemutex.unlock(); if(!request){ continue; } request->process(); } } #endif



