栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

线程池原理及其代码实现(C和C++)【池式组件】

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

线程池原理及其代码实现(C和C++)【池式组件】

线程有哪些接口

创建线程

pthead_create(pthread_t tid, void *attr, void *func, void *arg)
			  //线程id	      线程属性   线程执行的内容  线程参数

线程退出

void pthread_exit(void *retval)
						//线程退出状态

阻塞等待线程退出

int pthread_join(pthread_t thread, void **retual)
success--0  fail--errno

获取线程id

pthread_t pthread_self(void);
return : success--0 fail--无

线程分离

int pthread_detach(pthread_t thread)
success--0 fail--errno

杀死线程

int pthread_cancel(pthread_t thread)
success--0  fail--errno

比较线程号是否相等

int pthread_equal(pthread_t t1, pthread_t2)
线程和进程控制原语对比
进程线程
创建forkpthread_create
退出exitpthread_exit
等待waitpthread_join
杀死killpthread_cancel
获取idgetpidpthread_self
如何实现线程池 生产者-消费者模型

产品:等待队列生产者:程序消费者:运行队列

使用互斥锁和条件锁共同实现
1、举例创建了含有100个线程的线程池,线程池中每个线程先给互斥锁加锁,随后等待条件锁,等待条件锁的原则是先释放互斥锁,如果争夺到了条件锁就会马上请求获取互斥锁
2、首先时主程序创建新的任务节点,跟着拿到互斥锁,将任务节点插入等待队列,跟着给条件锁发送信号,唤醒正在线程池中的线程来争夺条件锁,最后释放互斥锁
3、线程池中的线程进行争夺条件锁,争夺到的线程就获取到互斥锁,随后向等待队列中获取一个任务节点,并执行任务节点中的任务函数,随后释放互斥锁

线程池的对象有哪些 1、等待队列

任务节点就是指在等待被调度执行的节点

typedef struct NWORKER{
	pthread_t id;
	int terminate;
	struct NTHREADPOOL *pool;
	struct NWORKER *next;
	struct NWORKER *prev;
}nworker;
2、运行队列

运行队列是指正在执行的队列

typedef struct  NJOB{
	void (*func_job)(struct NJOB *job);
	void *user_data;
	
	struct NJOB *prev;
	struct NJOB *next;
}njob;
3、线程池管理节点

调度运行队列和等待队列,并加锁解锁

typedef struct NTHREADTOOL{

	struct NWORKER *workers;
	struct NJOB *wait_jobs;

	pthread_cond_t cond;
	pthread_mutex_t mtx;
	
}nthreadpool_t;
线程池的创建

在堆区创建,将新创建的节点加入等待队列

//num_thread ---需要创建执行点的数量
int thread_pool_create(nthreadpool_t *pool, int num_thread){

	
	//pool
	if(pool==NULL)return -1;
	if(num_thread<1) num_thread = 1;
	memset(pool, 0, sizeof(nthreadpool_t)); 			//清零,避免脏数据
	
	//cond
	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));

	//metux
	pthread_mutex_t blank_mtx = PTHREAD_MUTEX_INITIALIZER; 	
	memcpy(&pool->mtx, &blank_mtx, sizeof(pthread_mutex_t)); 	
	
	//create
	//一个线程对应一个执行点
	int idx = 0;
	for(idx=0; idxpool = pool;

		int ret = pthread_create(&worker->id, NULL, thread_callback, worker);
		if(ret){
			perror("pthread_create");
			free(worker);
			return idx;
		}
		LL_ADD(worker, pool->workers);
	}
	return idx;
}

线程池的销毁

将标志位设置为1,线程会在回调函数中判断标志位进行销毁,解锁并唤醒全部阻塞的线程

void thread_pool_destroy(nthreadpool_t *pool){
	nworker *worker = NULL;
	for(worker = pool->workers; worker != NULL; worker = worker->next){

		worker->terminate = 1;
		
	}
	pthread_mutex_lock(&pool->mtx);
	pthread_cond_broadcast(&pool->cond); //唤醒全部阻塞的线程
	pthread_mutex_unlock(&pool->mtx);
}
等待队列和运行队列之间的切换

很简单,直接将等待队列中的节点加入到运行队列里面

void thread_pool_push_job(nthreadpool_t *pool, njob *job){
	
	pthread_mutex_lock(&pool->mtx);
	
	LL_ADD(job, pool->wait_jobs);		//加入任务队列
	pthread_cond_signal(&pool->cond); 	//向条件锁发送信号
	
	pthread_mutex_unlock(&pool->mtx);
}
线程的回调函数

生产者和消费者模型,条件变量和互斥锁共同实现

void *thread_callback(void *arg){

	nworker *worker = (nworker*) arg;

	//取任务并执行
	while(1){
		//加互斥锁
		pthread_mutex_lock(&worker->pool->mtx);
		
		//判断任务队列是否为空
		while(worker->pool->wait_jobs == NULL){
			if(worker->terminate)break;
			//条件等待唤醒
			pthread_cond_wait(&worker->pool->cond, &worker->pool->mtx);
		}
		if(worker->terminate){
			pthread_mutex_unlock(&worker->pool->mtx);
			break;
		}
		
		//从等待队列取任务,取了就删除
		njob *job = worker->pool->wait_jobs;
		if(job){
			LL_REMOVE(job, worker->pool->wait_jobs);
		}
		
		//解互斥锁
		pthread_mutex_unlock(&worker->pool->mtx);

		if(job == NULL)continue; //	为何为空,多个线程竞争一个任务,会出现有的线程竞争到空的资源,避免死锁则加入判断跳出

		//执行任务内容
		job->job_func(job);
	}
	free(worker);

运行示例

开一百个线程,并打印自身线程id

#if 1

void  counter(njob *job){

	if(job==NULL)return;

	int idx = *(int*)job->user_data;
	printf("[%lu]idx=%d,n", pthread_self(), idx);

	free(job->user_data);
	free(job);
}

#define MAX_COUNT 		1000000

int main(){

	nthreadpool_t pool = {0};

	int num_thread = 100;

	thread_pool_create(&pool, num_thread);

	int i=0;
	for(i=0; ijob_func = counter;
		job->user_data = malloc(sizeof(int));  		//定义在堆上,防止栈上系统自动回收
		*(int*)job->user_data = i;
		//将job加入任务队列
		thread_pool_push_job(&pool,job);
	}
	getchar();
	return 0;
}

#endif
C++实现的线程池类

原理和C的一摸一样,不过等到队列用的是数组,调度的时候并不像C那样会自动从等待队列将节点放入运行队列

#ifndef _THREADPOOL_H
#define _THREADPOOL_H


#include 
#include "locker.h"
#include 
#include 
#include 

//线程池类
template
class threadpool{
public:
	threadpool(int thread_number = 8, int max_requests = 10000);

	~threadpool();		

	bool apend(T *request);		//将request加入工作队列
	
private:
	int m_thread_pool_number;	//线程池的数量

	pthread_t *m_threads;			//线程数组

	int m_max_request;

	std::list m_workqueue;   //工作队列

	mutex m_queuemutex;		//线程安全
	cond m_queuecond;
	sem m_queuesem;

	bool m_stop;
	
private:
	static void* worker(void *arg);
	void run();
};


template
threadpool::threadpool(int thread_number, int max_requests):
	m_thread_pool_number(thread_number), m_max_request(max_requests), 
	m_stop(NULL), m_threads(NULL){

	if((thread_number<=0) || (m_max_request<=0)){
		throw std::execption();
	}

	m_threads = new pthread_t[m_thread_number];
	if(!m_threads){
		throw std::execption();
	}
	
	this->m_queuemutex = new mutex();
	this->m_queuecond = new cond();
	this->m_queuesem = new sem();

	for(int i=0; i
threadpool::~threadpool(){
	delete [] m_threads;
	delete m_queuemutex;
	delete m_queuecond;
	delete m_queuesem;
	m_stop  = true;
}

template
bool threadpool::append(T *request){
	m_queuemutex.lock();
	if(m_workqueue.size() > m_max_request){
		m_queuemutex.unlock();
		return false;
	}
	m_workqueue.push_back(request);
	m_queuemutex.unlock();
	m_queuesem.post();
	return true;
}

template
void* threadpool::worker(void* arg){
	threadpool * pool = (threadpool*)arg;

	pool->run();
	return pool;
}

template
void threadpool::run(){
	while(!m_stop){
		m_queuesem.wait();
		m_queuemutex.lock();
		if(m_workqueue.empty()){
			m_queuemutex.unlock();
			continue;
		}
		T* request = m_workqueue.front();
		m_workqueue.pop_back();
		m_queuemutex.unlock();
		if(!request){
			continue;
		}
		request->process();
	}
}

#endif
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/749420.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号