栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

C语言实现支持动态拓展和销毁的线程池

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C语言实现支持动态拓展和销毁的线程池

本文实例介绍了C 语言实现线程池,支持动态拓展和销毁,分享给大家供大家参考,具体内容如下

实现功能

  • 1.初始化指定个数的线程
  • 2.使用链表来管理任务队列
  • 3.支持拓展动态线程
  • 4.如果闲置线程过多,动态销毁部分线程
#include 
#include 
#include 
#include 
 


typedef struct thread_worker_s{
  void *(*process)(void *arg); //处理函数
  void *arg;   //参数
  struct thread_worker_s *next;
}thread_worker_t;
 
#define bool int
#define true 1
#define false 0
 

#define THREAD_STATE_RUN 0
#define THREAD_STATE_TASK_WAITING   1
#define THREAD_STATE_TASK_PROCESSING  2
#define THREAD_STATE_TASK_FINISHED   3
#define THREAD_STATE_EXIT4   
 
 
typedef struct thread_info_s{
  pthread_t id;
  int    state; 
  struct thread_info_s *next;
}thread_info_t;
 
static char* thread_state_map[] ={"创建","等待任务","处理中","处理完成","已退出"};

 
 

#define THREAD_BUSY_PERCENT 0.5  
#define THREAD_IDLE_PERCENT 2   
 
typedef struct thread_pool_s{
  pthread_mutex_t queue_lock ; //队列互斥锁,即涉及到队列修改时需要加锁
  pthread_cond_t queue_ready; //队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了
 
  thread_worker_t *head   ;    //任务队列头指针
  bool    is_destroy   ;    //线程池是否已经销毁
  int num;//线程的个数
  int rnum;  ;    //正在跑的线程
  int knum;  ;    //已杀死的线程
  int queue_size;    //工作队列的大小 
  thread_info_t *threads   ;    //线程组id,通过pthread_join(thread_ids[0],NULL) 来执行线程
  pthread_t   display   ;    //打印线程
  pthread_t   destroy   ;    //定期销毁线程的线程id
  pthread_t   extend    ;
  float percent;    //线程个数于任务的比例 rnum/queue_size
  int  init_num      ;
  pthread_cond_t  extend_ready     ;    //如果要增加线程
}thread_pool_t;
 


thread_pool_t* thread_pool_create(int num);
void *thread_excute_route(void *arg);
 
 

void debug(char *message,int flag){
  if(flag)
    printf("%sn",message);
}
 
void *display_thread(void *arg);

int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //网线程池的队列中增加一个需要执行的函数,也就是任务
 

 
 
 
void *thread_pool_is_need_recovery(void *arg);
void *thread_pool_is_need_extend(void *arg);
void thread_pool_destory(thread_pool_t *pool);
 
 
thread_pool_t *thread_pool_create(int num){
  if(num<1){
    return NULL;
  }
  thread_pool_t *p;
  p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));
  if(p==NULL)
    return NULL;
  p->init_num = num;
  
  pthread_mutex_init(&(p->queue_lock),NULL);
  pthread_cond_init(&(p->queue_ready),NULL);
 
  
  p->num  = num; 
  p->rnum = num;
  p->knum = 0;
 
  p->head = NULL;
  p->queue_size =0; 
  p->is_destroy = false;
 
   
  int i=0;
  thread_info_t *tmp=NULL;
  for(i=0;inext = p->threads;
      p->threads = tmp;
    }
    pthread_create(&(tmp->id),NULL,thread_excute_route,p);
    tmp->state = THREAD_STATE_RUN;
  }
 
  
  pthread_create(&(p->display),NULL,display_thread,p);
  
  //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p);
  
  pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p);
  return p;
}
 
int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){
  thread_pool_t *p= pool;
  thread_worker_t *worker=NULL,*member=NULL;
  worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));
  int incr=0;
  if(worker==NULL){
    return -1;
  }
  worker->process = process;
  worker->arg   = arg;
  worker->next  = NULL;
  thread_pool_is_need_extend(pool);
  pthread_mutex_lock(&(p->queue_lock));
  member = p->head;
  if(member!=NULL){
    while(member->next!=NULL)
      member = member->next;
    member->next = worker;
  }else{
    p->head = worker;
  }
  p->queue_size ++;
  pthread_mutex_unlock(&(p->queue_lock));
  pthread_cond_signal(&(p->queue_ready));
  return 1;
}
 
 
void thread_pool_wait(thread_pool_t *pool){
  thread_info_t *thread;
  int i=0;
  for(i=0;inum;i++){
    thread = (thread_info_t*)(pool->threads+i);
    thread->state = THREAD_STATE_EXIT;
    pthread_join(thread->id,NULL);
  }
}
void thread_pool_destory(thread_pool_t *pool){
  thread_pool_t  *p   = pool;
  thread_worker_t *member = NULL;
 
  if(p->is_destroy)
    return ;
  p->is_destroy = true;
  pthread_cond_broadcast(&(p->queue_ready));
  thread_pool_wait(pool);
  free(p->threads);
  p->threads = NULL;
  
  while(p->head){
    member = p->head;
    p->head = member->next;
    free(member);
  }
  
  thread_info_t *tmp=NULL;
  while(p->threads){
    tmp = p->threads;
    p->threads = tmp->next;
    free(tmp);
  }
 
  pthread_mutex_destroy(&(p->queue_lock));
  pthread_cond_destroy(&(p->queue_ready));
  return ;
}

thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){
  thread_info_t *thread=NULL;
  thread_info_t *p=pool->threads;
  while(p!=NULL){
    if(p->id==id)
      return p;
    p = p->next;
  }
  return NULL;
}
 
 

void *thread_excute_route(void *arg){
  thread_worker_t *worker = NULL;
  thread_info_t  *thread = NULL; 
  thread_pool_t*  p = (thread_pool_t*)arg;
  //printf("thread %lld create successn",pthread_self());
  while(1){
    pthread_mutex_lock(&(p->queue_lock));
 
    
    pthread_t pthread_id = pthread_self();
    
    thread = get_thread_by_id(p,pthread_id);
 
    
    if(p->is_destroy==true && p->queue_size ==0){
      pthread_mutex_unlock(&(p->queue_lock));
      thread->state = THREAD_STATE_EXIT;
      p->knum ++;
      p->rnum --;
      pthread_exit(NULL);
    }
    if(thread){
      thread->state = THREAD_STATE_TASK_WAITING; 
    }
    
    while(p->queue_size==0 && !p->is_destroy){
      pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));
    }
    p->queue_size--;
    worker = p->head;
    p->head = worker->next;
    pthread_mutex_unlock(&(p->queue_lock));
     
 
    if(thread)
      thread->state = THREAD_STATE_TASK_PROCESSING; 
    (*(worker->process))(worker->arg);
    if(thread)
      thread->state = THREAD_STATE_TASK_FINISHED;  
    free(worker);
    worker = NULL;
  }
}
 
 
 

void *thread_pool_is_need_extend(void *arg){
  thread_pool_t *p = (thread_pool_t *)arg;
  thread_pool_t *pool = p;
  
  if(p->queue_size>100){
    int incr =0;
    if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){
      incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; 
      int i=0;
      thread_info_t *tmp=NULL;
      thread_pool_t *p = pool;
      pthread_mutex_lock(&pool->queue_lock);
      if(p->queue_size<100){
 pthread_mutex_unlock(&pool->queue_lock);
 return ;
      }
      for(i=0;inext = p->threads;
   p->threads = tmp;
 }
 p->num ++;
 p->rnum ++;
 pthread_create(&(tmp->id),NULL,thread_excute_route,p);
 tmp->state = THREAD_STATE_RUN;
      }
      pthread_mutex_unlock(&pool->queue_lock);
    }
  }
  //pthread_cond_signal(&pool->extend_ready);
}
pthread_cond_t sum_ready;

void *thread_pool_is_need_recovery(void *arg){
  thread_pool_t *pool = (thread_pool_t *)arg;
  int i=0;
  thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL;
  
  while(1){
    i=0;
    if(pool->queue_size==0 && pool->rnum > pool->init_num ){
      sleep(5);
      
      if(pool->queue_size==0 && pool->rnum > pool->init_num ){
 pthread_mutex_lock(&pool->queue_lock);
 tmp = pool->threads;
 while((pool->rnum != pool->init_num) && tmp){
   
   if(tmp->state != THREAD_STATE_TASK_PROCESSING){
     i++;
     if(prev)
prev->next  = tmp->next;
     else
pool->threads = tmp->next;
     pool->rnum --; 
     pool->knum ++; 
     kill(tmp->id,SIGKILL); 
     p1 = tmp;
     tmp = tmp->next;
     free(p1);
     continue;
   }
   prev = tmp;
   tmp = tmp->next;
 }
 pthread_mutex_unlock(&pool->queue_lock);
 printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程n",i);
      }
    }
    sleep(5);
  }
}
 
 
 

void *display_thread(void *arg){
  thread_pool_t *p =(thread_pool_t *)arg;
  thread_info_t *thread = NULL;
  int i=0;
  while(1){
    printf("threads %d,running %d,killed %dn",p->num,p->rnum,p->knum);  
    thread = p->threads;
    while(thread){
      printf("id=%ld,state=%sn",thread->id,thread_state_map[thread->state]);
      thread = thread->next;
    }
    sleep(5);
  }
}

希望本文所述对大家学习C语言程序设计有所帮助。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/64657.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号