栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

线程池简介

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

线程池简介

线程池是较为底层的组件,平时我们在开发的过程中,可能更多的是调用这些组件的接口。对于服务器而言,这些底层的组件代码是很通用的。线程池主要用在写日志、计算结果、增删改查中。
线程池主要是由任务队列、执行队列和池管理组件构成,这里直接给出代码。

#define LL_ADD(item, list) do           
{ 	                                
	item->prev = NULL;				
	item->next = list;				    
	list = item;				     	
} while(0)

#define LL_REMOVE(item, list) do                       
{					                             	
	if (item->prev != NULL) item->prev->next = item->next;	
	if (item->next != NULL) item->next->prev = item->prev;	
	if (list == item) list = item->next;					
	item->prev = item->next = NULL;							
} while(0)

typedef struct NWORKER 
{
	pthread_t thread;
	int terminate;
	struct NWORKQUEUE *workqueue;
	struct NWORKER *prev;
	struct NWORKER *next;
} nWorker;

typedef struct NJOB 
{
	void (*job_function)(struct NJOB *job);
	void *user_data;
	struct NJOB *prev;
	struct NJOB *next;
} nJob;

typedef struct NWORKQUEUE 
{
	struct NWORKER *workers;
	struct NJOB *waiting_jobs;
	pthread_mutex_t jobs_mtx;
	pthread_cond_t jobs_cond;
} nWorkQueue;

typedef nWorkQueue nThreadPool;

static void *ntyWorkerThread(void *ptr) 
{
	nWorker *worker = (nWorker*)ptr;
	while (1) 
{
		pthread_mutex_lock(&worker->workqueue->jobs_mtx);
		while (worker->workqueue->waiting_jobs == NULL) 
        {
			if (worker->terminate) break;
			pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);
		}
		if (worker->terminate) 
        {
			pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
			break;
		}
		nJob *job = worker->workqueue->waiting_jobs;
		if (job != NULL) 
        {
			LL_REMOVE(job, worker->workqueue->waiting_jobs);
		}
		pthread_mutex_unlock(&worker->workqueue->jobs_mtx);
		if (job == NULL) continue;
		job->job_function(job);
	}
	free(worker);
	pthread_exit(NULL);
}

int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers) 
{
	if (numWorkers < 1) numWorkers = 1;
	memset(workqueue, 0, sizeof(nThreadPool));
	
	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));
	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));

	int i = 0;
	for (i = 0;i < numWorkers;i ++) 
    {
		nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
		if (worker == NULL) 
        {
			perror("malloc");
			return 1;
		}
		memset(worker, 0, sizeof(nWorker));
		worker->workqueue = workqueue;
		int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);
		if (ret) 
        {
			perror("pthread_create");
			free(worker);
			return 1;
		}
		LL_ADD(worker, worker->workqueue->workers);
	}
	return 0;
}

void ntyThreadPoolShutdown(nThreadPool *workqueue) 
{
	nWorker *worker = NULL;
	for (worker = workqueue->workers;worker != NULL;worker = worker->next) 
    {
		worker->terminate = 1;
	}
	pthread_mutex_lock(&workqueue->jobs_mtx);
	workqueue->workers = NULL;
	workqueue->waiting_jobs = NULL;
	pthread_cond_broadcast(&workqueue->jobs_cond);
	pthread_mutex_unlock(&workqueue->jobs_mtx);
}

void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) 
{
	pthread_mutex_lock(&workqueue->jobs_mtx);
	LL_ADD(job, workqueue->waiting_jobs);
	pthread_cond_signal(&workqueue->jobs_cond);
	pthread_mutex_unlock(&workqueue->jobs_mtx);
}

代码中有几点需要解释,一是对链表的操作使用了宏函数,因为这样可以避免因为数据类型不同而需要定义多个函数的问题,二是互斥锁和条件变量的初始化方法,这里用的静态初始化方法,却避免了变量在定义好后不能用静态初始化赋值的问题。
这里再啰嗦一点,服务器的处理流程,一是检测IO事件是否就绪,二是对IO进行读写操作,三是对数据进行解析和操作。这三步分别对应epoll、recv()/send()、parse。因此,服务器有三种做法,一是单线程做法,一个线程处理以上三步,二是把IO读写和解析都放在线程池中处理,三是只把解析过程放在线程池中,读写IO还是由单线程处理。以上三种做法中,第二种方法服务器响应最快,但是有个问题,就是多个线程会共用一个fd。想象这样一个情况,线程池中有两个任务,操作的都是同一fd,而这两个任务分配给了两个线程,那么这两个线程如果同时发数据,就会出现脏数据的现象,或者是,一个线程正在收发数据,另一个线程却调用了close(),导致崩溃。第二个方法适用于针对fd操作时间较长的情景,而第三种方法适用于针对业务处理时间较长的情形。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/738942.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号