本文通过实例介绍如何用 C 语言实现线程池,支持线程的动态扩展和销毁,供大家参考。具体内容如下:
实现功能
- 1.初始化指定个数的线程
- 2.使用链表来管理任务队列
- 3.支持动态扩展线程
- 4.如果闲置线程过多,动态销毁部分线程
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/** One queued task: a callback plus its argument, linked in FIFO order. */
typedef struct thread_worker_s {
    void *(*process)(void *arg); /* task entry point */
    void *arg;                   /* argument handed to process() */
    struct thread_worker_s *next;
} thread_worker_t;

/* Worker life-cycle states; each value is also an index into thread_state_map. */
#define THREAD_STATE_RUN 0
#define THREAD_STATE_TASK_WAITING 1
#define THREAD_STATE_TASK_PROCESSING 2
#define THREAD_STATE_TASK_FINISHED 3
#define THREAD_STATE_EXIT 4

/** Bookkeeping node for one worker thread; nodes form pool->threads. */
typedef struct thread_info_s {
    pthread_t id;
    int state; /* one of THREAD_STATE_*; written only with queue_lock held */
    struct thread_info_s *next;
} thread_info_t;

/* Human-readable labels printed by display_thread(), indexed by state. */
static const char *const thread_state_map[] = {"创建", "等待任务", "处理中", "处理完成", "已退出"};

/* Grow the pool while running_workers / queued_tasks is below this ratio. */
#define THREAD_BUSY_PERCENT 0.5
/* Not referenced by the code in this file; kept for existing users. */
#define THREAD_IDLE_PERCENT 2

enum {
    THREAD_EXTEND_QUEUE_MIN = 100, /* backlog that must be exceeded before growing */
    THREAD_HOUSEKEEPING_SECS = 5   /* period of the display and recovery threads */
};

/**
 * Thread pool. Every field below is protected by queue_lock unless noted.
 */
typedef struct thread_pool_s {
    pthread_mutex_t queue_lock;  /* guards the task queue, the thread list and the counters */
    pthread_cond_t queue_ready;  /* signalled on new task, worker retirement and shutdown */
    thread_worker_t *head;       /* oldest queued task, NULL when the queue is empty */
    bool is_destroy;             /* set once thread_pool_destory() has started */
    int num;                     /* workers created over the pool's lifetime */
    int rnum;                    /* workers currently alive */
    int knum;                    /* workers that have been retired or have exited */
    int queue_size;              /* number of queued tasks */
    thread_info_t *threads;      /* linked list of live workers */
    pthread_t display;           /* statistics printer thread */
    pthread_t destroy;           /* idle-worker recovery thread */
    pthread_t extend;            /* unused by the code in this file */
    float percent;               /* unused by the code in this file */
    int init_num;                /* size requested at creation; recovery never shrinks below it */
    pthread_cond_t extend_ready; /* unused by the code in this file (never initialised) */
    thread_worker_t *tail;       /* newest queued task, for O(1) append; valid iff head != NULL */
    bool display_started;        /* display thread exists and must be joined */
    bool destroy_started;        /* recovery thread exists and must be joined */
} thread_pool_t;

thread_pool_t *thread_pool_create(int num);
void *thread_excute_route(void *arg);
void *display_thread(void *arg);
/* Append a function (a task) to the pool's work queue. */
int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg);
void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);

/** Print message followed by a newline when flag is non-zero. */
void debug(char *message, int flag)
{
    if (flag && message != NULL) {
        printf("%s\n", message);
    }
}

/*
 * Start one worker and register it in pool->threads.
 * The caller must hold queue_lock: the new thread looks itself up by id under
 * that lock, so holding it guarantees the node is linked and its id is written
 * before the lookup can run.
 * Returns 0 on success, -1 if memory or a thread could not be obtained.
 */
static int spawn_worker_locked(thread_pool_t *pool)
{
    thread_info_t *node = malloc(sizeof *node);
    if (node == NULL) {
        return -1;
    }
    node->state = THREAD_STATE_RUN;
    node->next = pool->threads;
    if (pthread_create(&node->id, NULL, thread_excute_route, pool) != 0) {
        free(node);
        return -1;
    }
    pool->threads = node;
    pool->num++;
    pool->rnum++;
    return 0;
}

/*
 * Sleep with cancellation enabled. Housekeeping threads keep cancellation
 * disabled everywhere else, so they can only be cancelled here, where they
 * hold no lock and own no memory.
 */
static void housekeeping_sleep(unsigned int seconds)
{
    int previous;
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &previous);
    sleep(seconds);
    pthread_setcancelstate(previous, NULL);
}

/**
 * Create a pool with num worker threads plus the display and recovery
 * housekeeping threads.
 *
 * @param num initial (and minimum) number of workers; must be >= 1.
 * @return the new pool, or NULL if num < 1 or a worker could not be created.
 *         Failure to start a housekeeping thread is not fatal: the pool still
 *         works, it just lacks that service.
 *         Release with thread_pool_destory() followed by free().
 */
thread_pool_t *thread_pool_create(int num)
{
    if (num < 1) {
        return NULL;
    }
    /* calloc so that every counter, pointer and flag starts out zero/NULL. */
    thread_pool_t *pool = calloc(1, sizeof *pool);
    if (pool == NULL) {
        return NULL;
    }
    if (pthread_mutex_init(&pool->queue_lock, NULL) != 0) {
        free(pool);
        return NULL;
    }
    if (pthread_cond_init(&pool->queue_ready, NULL) != 0) {
        pthread_mutex_destroy(&pool->queue_lock);
        free(pool);
        return NULL;
    }
    pool->init_num = num;

    bool ok = true;
    pthread_mutex_lock(&pool->queue_lock);
    for (int i = 0; i < num; i++) {
        if (spawn_worker_locked(pool) != 0) {
            ok = false;
            break;
        }
    }
    pthread_mutex_unlock(&pool->queue_lock);

    if (!ok) {
        /* Joins and frees whatever workers were started before the failure. */
        thread_pool_destory(pool);
        free(pool);
        return NULL;
    }

    pool->display_started =
        pthread_create(&pool->display, NULL, display_thread, pool) == 0;
    pool->destroy_started =
        pthread_create(&pool->destroy, NULL, thread_pool_is_need_recovery, pool) == 0;
    return pool;
}

/**
 * Queue process(arg) for execution by a worker.
 *
 * @return 1 on success; -1 if pool or process is NULL, memory is exhausted,
 *         or the pool is being destroyed.
 * Must not be called after thread_pool_destory() has returned.
 */
int thread_pool_add_worker(thread_pool_t *pool, void *(*process)(void *arg), void *arg)
{
    if (pool == NULL || process == NULL) {
        return -1;
    }
    thread_worker_t *worker = malloc(sizeof *worker);
    if (worker == NULL) {
        return -1;
    }
    worker->process = process;
    worker->arg = arg;
    worker->next = NULL;

    /* Grow the pool first if the existing backlog is already too deep. */
    thread_pool_is_need_extend(pool);

    pthread_mutex_lock(&pool->queue_lock);
    if (pool->is_destroy) {
        pthread_mutex_unlock(&pool->queue_lock);
        free(worker);
        return -1;
    }
    if (pool->head == NULL) {
        pool->head = worker;
    } else {
        pool->tail->next = worker;
    }
    pool->tail = worker;
    pool->queue_size++;
    pthread_mutex_unlock(&pool->queue_lock);

    pthread_cond_signal(&pool->queue_ready);
    return 1;
}

/**
 * Join every worker still registered in the pool.
 *
 * Workers only exit once the pool is being destroyed, so this is meant to be
 * called from thread_pool_destory(), after the housekeeping threads have been
 * stopped (the thread list is walked without the lock).
 */
void thread_pool_wait(thread_pool_t *pool)
{
    if (pool == NULL) {
        return;
    }
    for (thread_info_t *node = pool->threads; node != NULL; node = node->next) {
        pthread_join(node->id, NULL);
        node->state = THREAD_STATE_EXIT;
    }
}

/**
 * Shut the pool down: stop housekeeping, let the workers drain the queue,
 * join them, and release the queue, the thread list, the mutex and the
 * condition variable.
 *
 * The thread_pool_t itself is NOT freed; the caller may free() it afterwards.
 * A second sequential call is a no-op. Must not be called concurrently with
 * itself or from a pool task.
 */
void thread_pool_destory(thread_pool_t *pool)
{
    if (pool == NULL || pool->is_destroy) {
        return;
    }
    pthread_mutex_lock(&pool->queue_lock);
    pool->is_destroy = true;
    pthread_mutex_unlock(&pool->queue_lock);

    /* Stop housekeeping first so nothing else edits the thread list below. */
    if (pool->display_started) {
        pthread_cancel(pool->display);
        pthread_join(pool->display, NULL);
        pool->display_started = false;
    }
    if (pool->destroy_started) {
        pthread_cancel(pool->destroy);
        pthread_join(pool->destroy, NULL);
        pool->destroy_started = false;
    }

    pthread_cond_broadcast(&pool->queue_ready);
    thread_pool_wait(pool);

    /* Normally empty by now; non-empty only if no worker was left to drain it. */
    while (pool->head != NULL) {
        thread_worker_t *task = pool->head;
        pool->head = task->next;
        free(task);
    }
    pool->tail = NULL;
    pool->queue_size = 0;

    while (pool->threads != NULL) {
        thread_info_t *node = pool->threads;
        pool->threads = node->next;
        free(node);
    }

    pthread_mutex_destroy(&pool->queue_lock);
    pthread_cond_destroy(&pool->queue_ready);
}

/**
 * Find the bookkeeping node of thread id in pool->threads.
 * The caller must hold queue_lock. Returns NULL when not found.
 */
thread_info_t *get_thread_by_id(thread_pool_t *pool, pthread_t id)
{
    if (pool == NULL) {
        return NULL;
    }
    for (thread_info_t *node = pool->threads; node != NULL; node = node->next) {
        if (pthread_equal(node->id, id)) {
            return node;
        }
    }
    return NULL;
}

/**
 * Worker thread body. arg is the owning thread_pool_t.
 *
 * Repeatedly takes the oldest task and runs it with the lock released.
 * Returns NULL when the recovery thread has retired this worker, or when the
 * pool is being destroyed and the queue has been drained.
 */
void *thread_excute_route(void *arg)
{
    thread_pool_t *pool = arg;
    if (pool == NULL) {
        return NULL;
    }
    /* Stand-in used when the calling thread is not registered in the pool. */
    thread_info_t unregistered = { .state = THREAD_STATE_RUN };

    pthread_mutex_lock(&pool->queue_lock);
    thread_info_t *self = get_thread_by_id(pool, pthread_self());
    bool registered = self != NULL;
    if (!registered) {
        self = &unregistered;
    }

    for (;;) {
        while (pool->queue_size == 0 && !pool->is_destroy &&
               self->state != THREAD_STATE_EXIT) {
            self->state = THREAD_STATE_TASK_WAITING;
            pthread_cond_wait(&pool->queue_ready, &pool->queue_lock);
        }
        if (self->state == THREAD_STATE_EXIT) {
            /* Retired by the recovery thread, which already updated the
             * counters and will join this thread and free its node. */
            break;
        }
        if (pool->queue_size == 0 || pool->head == NULL) {
            /* Shutdown requested and nothing left to run. */
            self->state = THREAD_STATE_EXIT;
            if (registered) {
                pool->knum++;
                pool->rnum--;
            }
            break;
        }

        thread_worker_t *task = pool->head;
        pool->head = task->next;
        if (pool->head == NULL) {
            pool->tail = NULL;
        }
        pool->queue_size--;
        self->state = THREAD_STATE_TASK_PROCESSING;
        pthread_mutex_unlock(&pool->queue_lock);

        task->process(task->arg);
        free(task);

        pthread_mutex_lock(&pool->queue_lock);
        self->state = THREAD_STATE_TASK_FINISHED;
    }

    pthread_mutex_unlock(&pool->queue_lock);
    return NULL;
}

/**
 * Add workers when the backlog is deep and too few workers are running.
 *
 * Acts only when more than THREAD_EXTEND_QUEUE_MIN tasks are queued and
 * rnum / queue_size is below THREAD_BUSY_PERCENT; it then starts enough
 * workers to bring the ratio back to THREAD_BUSY_PERCENT (stopping early if a
 * thread cannot be created). arg is the thread_pool_t. Always returns NULL;
 * the void* signature matches a pthread start routine.
 */
void *thread_pool_is_need_extend(void *arg)
{
    thread_pool_t *pool = arg;
    if (pool == NULL) {
        return NULL;
    }
    pthread_mutex_lock(&pool->queue_lock);
    if (!pool->is_destroy && pool->queue_size > THREAD_EXTEND_QUEUE_MIN &&
        (float)pool->rnum / pool->queue_size < THREAD_BUSY_PERCENT) {
        int incr = (int)(pool->queue_size * THREAD_BUSY_PERCENT - pool->rnum);
        for (int i = 0; i < incr; i++) {
            if (spawn_worker_locked(pool) != 0) {
                break;
            }
        }
    }
    pthread_mutex_unlock(&pool->queue_lock);
    return NULL;
}

/* Not referenced by the code in this file; kept because it has external linkage. */
pthread_cond_t sum_ready;

/**
 * Recovery thread body. arg is the thread_pool_t.
 *
 * Every THREAD_HOUSEKEEPING_SECS seconds it checks whether the queue is empty
 * while more than init_num workers are alive. When that holds on two
 * consecutive checks, waiting workers are retired until rnum is back to
 * init_num: each one is unlinked, marked THREAD_STATE_EXIT, woken, joined and
 * freed. Runs until cancelled by thread_pool_destory().
 */
void *thread_pool_is_need_recovery(void *arg)
{
    thread_pool_t *pool = arg;
    if (pool == NULL) {
        return NULL;
    }
    /* Only housekeeping_sleep() may be interrupted by cancellation. */
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    bool idle_seen = false;
    for (;;) {
        thread_info_t *retired = NULL;
        int reaped = 0;

        pthread_mutex_lock(&pool->queue_lock);
        bool idle = !pool->is_destroy && pool->queue_size == 0 &&
                    pool->rnum > pool->init_num;
        bool reap = idle && idle_seen;
        if (reap) {
            thread_info_t **link = &pool->threads;
            while (*link != NULL && pool->rnum > pool->init_num) {
                thread_info_t *node = *link;
                if (node->state == THREAD_STATE_TASK_WAITING) {
                    *link = node->next;
                    node->state = THREAD_STATE_EXIT;
                    node->next = retired;
                    retired = node;
                    pool->rnum--;
                    pool->knum++;
                    reaped++;
                } else {
                    link = &node->next;
                }
            }
            if (retired != NULL) {
                pthread_cond_broadcast(&pool->queue_ready);
            }
        }
        idle_seen = idle && !idle_seen;
        pthread_mutex_unlock(&pool->queue_lock);

        /* Join outside the lock: a retiring worker needs it to wake up. */
        while (retired != NULL) {
            thread_info_t *node = retired;
            retired = node->next;
            pthread_join(node->id, NULL);
            free(node);
        }
        if (reap) {
            printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程\n", reaped);
        }
        housekeeping_sleep(THREAD_HOUSEKEEPING_SECS);
    }
}

/**
 * Display thread body. arg is the thread_pool_t.
 *
 * Every THREAD_HOUSEKEEPING_SECS seconds prints the pool counters and the id
 * and state of each registered worker. Runs until cancelled by
 * thread_pool_destory().
 */
void *display_thread(void *arg)
{
    thread_pool_t *pool = arg;
    if (pool == NULL) {
        return NULL;
    }
    /* printf may be a cancellation point; never get cancelled holding the lock. */
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    const int known_states = (int)(sizeof thread_state_map / sizeof thread_state_map[0]);
    for (;;) {
        pthread_mutex_lock(&pool->queue_lock);
        printf("threads %d,running %d,killed %d\n", pool->num, pool->rnum, pool->knum);
        for (thread_info_t *node = pool->threads; node != NULL; node = node->next) {
            const char *label = (node->state >= 0 && node->state < known_states)
                                    ? thread_state_map[node->state]
                                    : "?";
            /* NOTE(review): assumes pthread_t is an integer type, as the original %ld did. */
            printf("id=%lu,state=%s\n", (unsigned long)node->id, label);
        }
        pthread_mutex_unlock(&pool->queue_lock);
        housekeeping_sleep(THREAD_HOUSEKEEPING_SECS);
    }
}
希望本文所述对大家学习C语言程序设计有所帮助。



