栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

C++基础组件——线程池实现

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C++基础组件——线程池实现

C++系列文章目录

1、C++设计模式——单例模式
2、C++基础组件——线程池实现

文章目录
  • C++系列文章目录
  • 前言
  • 一、线程池定义
    • 线程池解决什么问题
  • 二、C语言实现
  • 三、C++实现
    • 1.v1版本
      • 问题
    • 2.v2版本
  • 总结


前言

实现线程池,给出C语言版以及两个C++版的实现方法,解决了一些常见问题,总结了遇到的问题及解决方法。


一、线程池定义

池化技术,起到了建立缓冲区的作用:内存池、数据库连接池、消息池(消息队列)、对象池

线程池解决什么问题

1、解决任务处理

2、阻塞IO

3、解决线程创建与销毁的成本问题

4、管理线程

在日志系统中,直接写磁盘的话,性能会被限制在磁盘的读写能力上面,引入线程池,达到异步解耦的作用

二、C语言实现

三大组件:

1、执行队列,线程

2、任务队列,任务

3、管理组件

#include 
#include 
#include 
#include 
#include 
#include 
#include 

//双链表表示 
typedef struct Worker{
	pthread_t thread;
	struct Manager *pool;//指向管理者
	int terminate;//工作线程终止符
	struct Worker *prev;
	struct Worker *next;
}WORK;

typedef struct Job{
	void *(*process) (void *arg);//执行函数
    void *arg;
    struct Job *prev;
    struct Job *next;
}JOB;

typedef struct Manager{
	WORK *workers;//指向第一个工作线程
	JOB *jobs;//指向第一个任务
	
	pthread_cond_t jobs_cond;//接收工作信号
    pthread_mutex_t jobs_mutex;//标记锁定任务
}ThreadPool;

int pool_add_job(ThreadPool *pool,JOB *job);//添加任务
void *nThreadCallback(void *arg);

//线程池初始化
int ThreadPool_init(ThreadPool *pool,int max_thread_num){
	if(max_thread_num < 1)	max_thread_num = 1;
	if(pool == NULL)	return -1;
	memset(pool,0,sizeof(ThreadPool));
	
    //初始化信号量
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
    //初始化信号量
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));

    for(int i=0;i
    	WORK *worker = (WORK*)malloc(sizeof(WORK));
    	if(worker == NULL){
    		printf("malloc");
    		return -2;
		}
		memset(worker,0,sizeof(WORK));
		worker->pool = pool;
		worker->next=NULL;  
		worker->prev=NULL;
		
		int ret = pthread_create(&worker->thread,NULL,nThreadCallback,worker);//创建工作线程,监听nThreadCallback函数,传入worker参数
		if(ret){
			printf("pthread_create");
			free(worker);
			return -3;
		}
		//加入线程池
		if(pool->workers == NULL){//第一个结点:pool->workers指向第一个结点
			pool->workers=worker; 
			worker->next=NULL;
			worker->prev=NULL;
		}
		else{//插入其余结点
			worker->next=pool->workers;
			pool->workers->prev=worker;
			pool->workers=worker; 
		}
	}
	return 0;
}
//加入任务 
int pool_add_job(ThreadPool *pool,JOB *job){
	pthread_mutex_lock(&pool->jobs_mutex);//加锁
	//加入任务链表
	if(pool->jobs == NULL){
		pool->jobs=job;
		job->next=NULL;
		job->prev=NULL;
	}
	else{
		job->next=pool->jobs;
		pool->jobs->prev=job;
		pool->jobs=job;
	}
	pthread_cond_signal(&pool->jobs_cond);//传递信号:告诉工作线程开始执行
	pthread_mutex_unlock(&pool->jobs_mutex);//解锁
	return 0;
}

void *nThreadCallback(void *arg){//回调函数
	printf("starting thread 0x%ldn",pthread_self());
	WORK *worker = (WORK*)arg;
	while(1){
		pthread_mutex_lock(&(worker->pool->jobs_mutex));//上锁
		while(worker->pool->jobs == NULL){//任务队列为空则等待 
			if(worker->terminate)	break;
			pthread_cond_wait(&worker->pool->jobs_cond,&worker->pool->jobs_mutex);//若无信号传来则一直阻塞 
		}
		if(worker->terminate){//销毁工作线程 
			pthread_mutex_unlock(&(worker->pool->jobs_mutex));
			printf ("thread 0x%ld will exitn", pthread_self ());
             pthread_exit (NULL);
			break;
		}
		//得到执行信号,开始执行任务
		printf ("thread 0x%ld is starting to workn", pthread_self ());
		JOB *job=worker->pool->jobs;
		
		//移除任务
		assert(job != NULL);//断言,任务结点不为空
		worker->pool->jobs=job->next;
		if(worker->pool->jobs != NULL)//worker->pool->jobs指向真正的任务结点时才将前驱结点置空
			worker->pool->jobs->prev=NULL;
		job->next=NULL;
		pthread_mutex_unlock(&(worker->pool->jobs_mutex));//解锁
		(*(job->process)) (job->arg);//执行函数
	}
    //释放资源
	free(worker);
	pthread_exit(NULL);
}
//销毁线程池
void ThreadPoolDestroy(ThreadPool *pool){
	for(WORK *worker = pool->workers; worker != NULL; worker = worker->next){
		worker->terminate = 1;//将每个工作线程的终止符设为1
	}
	pthread_mutex_lock(&(pool->jobs_mutex));//上锁
	pthread_cond_broadcast(&pool->jobs_cond);//传出信号量:结束到信号的工作线程开始销毁
	pthread_mutex_unlock(&(pool->jobs_mutex));//解锁
}

void *myprocess (void *arg)
{
    printf ("threadid is 0x%ld, working on task %dn", pthread_self (),*(int *) arg);
    sleep (1);
    return NULL;
}
int main (int argc, char **argv)
{
    ThreadPool *pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    ThreadPool_init(pool,3);
    
    int tasknum[10];
    for(int i=0;i<10;i++){
    	tasknum[i]=i;
    	JOB *job = (JOB*)malloc(sizeof(JOB));
    	memset(job,0,sizeof(JOB));
    	job->process = myprocess;
    	job->arg = &tasknum[i];//需要传入不同的地址
    	job->next=NULL;
    	job->prev=NULL;
    	pool_add_job(pool,job);
    	
	}
    
    sleep (5);  //这句可能出问题,偷懒写法。任务执行时间长时,当main函数退出时,所有子线程被迫退出
    
    ThreadPoolDestroy (pool);
    return 0;
}

每个工作线程实际上一直在执行回调函数,在wait语句处阻塞,等待插入任务。传递信号量后,所有线程争抢任务,得到任务的线程执行,其余线程继续等待。
对任务进行加锁是防止多个线程同时取得任务,对任务进行修改,任务只能是固定参数。


三、C++实现 1.v1版本

使用单例模式包装线程池,全局仅产生一个对象

#pragma once
#ifndef _MYTHREADPOOL_V1_H_
#define _MYTHREADPOOL_V1_H_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

class MyThreadPool_v1{
private:
	static MyThreadPool_v1* instance;
	using ThreadPoolTask = std::function;//	包装器包装void类型
	typedef struct Task {
		ThreadPoolTask task;
		std::string name;
		Task() {
			this->task = nullptr;
			this->name = "";
		}
		Task(ThreadPoolTask task, std::string name) {
			this->task = task;
			this->name = name;
		}
	}Task;
	std::queue Tasks;		//任务队列
	std::vector> WorkThreads;		//工作线程组,独占指针指向每一个线程
	std::mutex taskmutex;		//互斥锁
	std::condition_variable cond;		//条件变量
	static std::atomicrunning;	//原子操作保证多线程不会产生争夺
	MyThreadPool_v1();
	~MyThreadPool_v1();
	void CallBack(int id);
	//防止复制
	MyThreadPool_v1(MyThreadPool_v1 const&) = delete;
	MyThreadPool_v1& operator=(MyThreadPool_v1 const&) = delete;
public:
	static MyThreadPool_v1* GetInstance();
	void PushTask(ThreadPoolTask task, std::string name);
	void desstory();
};
#endif // !_MYTHREADPOOL_V1_H_
#include"MyThreadPool_v1.h"
#include "windows.h"
#include

MyThreadPool_v1* MyThreadPool_v1::instance = nullptr;
std::atomic MyThreadPool_v1::running = true;	//原子操作保证多线程不会产生争夺
MyThreadPool_v1* MyThreadPool_v1::GetInstance() {
	std::cout << "instance" << std::endl;
	if (nullptr == instance) {	//双重检验
		static std::mutex mutex;	//新建一个静态锁
		std::lock_guardlock(mutex);	//上锁
		if (nullptr == instance) {
			//生成MyThreadPool_v1对象封装为共享指针赋值给单例对象
			instance = new MyThreadPool_v1();
		}
	}
	return instance;
}

MyThreadPool_v1::MyThreadPool_v1() {
	std::cout << "构造函数" << std::endl;
	int cpunums = 4;
	//std::condition_variable(cond);	//初始化条件变量
	for (int i = 0; i < cpunums; i++) {
		//std::lock_guardlock(taskmutex);	//上锁
		auto func = std::bind(&MyThreadPool_v1::CallBack, this, i);//绑定
		WorkThreads.push_back(std::make_unique(func));//加入指向func的独占指针,类型是线程类
	}
	printf("WorkThreads.size(): %dn", WorkThreads.size());
}

MyThreadPool_v1::~MyThreadPool_v1() {
	{
		std::unique_lock lock(taskmutex);
		running = false;
	}
	//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
	cond.notify_all();//通知全部线程
	// 等待所有工作线程结束
	for (const auto &workthread : WorkThreads) {
		workthread->join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
	}
}

void MyThreadPool_v1::CallBack(int id) {
	printf("WorkThread:  %d startn", id);
	while (1) {
		std::unique_lock lock(this->taskmutex);
		while (Tasks.empty()) {	//当任务队列为空时一直循环
			if (running == false) {
				break;
			}
			cond.wait(lock);	//阻塞
		}
		if (running == false) {
			break;
		}
		Task task;
		{	//同步操作
			task = Tasks.front();
			Tasks.pop();
		}
		//解锁
		printf("Work(%d):startn", id);
		task.task();//执行任务
		char ch[10];
		strcpy_s(ch, task.name.c_str());
		printf("正在执行的任务id:%sn",ch);
		printf("Work(%d):endn", id);
	}
	printf("Work(%d):stopedn", id);
}

void MyThreadPool_v1::PushTask(ThreadPoolTask task, std::string name) {
	{
		//加锁
		std::unique_lock lock(taskmutex);
		Tasks.push({ task,name });
		char ch[10];
		strcpy_s(ch, name.c_str());
	}
	cond.notify_one();//通知一个进程
}
void MyThreadPool_v1::desstory() {
	{
		std::unique_lock lock(taskmutex);
		running = false;
	}
	//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
	cond.notify_all();//通知全部线程
	// 等待所有工作线程结束
	for (const auto &workthread : WorkThreads) {
		workthread->join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
	}
}
#include"MyThreadPool_v1.h"
#include
#include 
using namespace std;
void funct() {
	cout << "进入到函数:" << endl; 
	Sleep(2);
}

int main() {
	auto ThreadPool = MyThreadPool_v1::GetInstance();//	拿到线程池单例对象
	for (int i = 0; i < 10; i++) {
		ThreadPool->PushTask(funct, to_string(i));
	}
	Sleep(5);	//防止执行过快程序直接结束
	ThreadPool->desstory();	//线程池销毁时,判断所有线程是否执行完毕,进行阻塞,避免直接return 0中止未完成的线程
	return 0;	//任务执行时间长,但是程序已经运行到结尾,会直接中止所有线程
}
问题

当前台线程结束后,程序结束,main函数结束会强制所有后台子线程结束

1.任务执行时间长时,当main函数退出时,所有子线程被迫退出

2.单例线程池无法自动析构

3.任务函数参数不可变

2.v2版本

使用智能指针的单例模式实现线程池

使用C++11新特性实现可变参数模板,可以执行不同参数的任务

#pragma once
#ifndef _MYTHREADPOOL_V2_H_
#define _MYTHREADPOOL_V2_H_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include 
class MyThreadPool_v2 {
public:
	static std::shared_ptr GetInstacne(int threadnums);
	~MyThreadPool_v2();
	//可变参数,利用后置decltype推导类型传递给auto
	template
	auto PushTask(F&& f, Args&&... args) //任务管道函数
		-> std::future::type>;
private:
	MyThreadPool_v2(int threadnums);
	using ThreadPoolTask = std::function;//	包装器包装void类型
	std::queue Tasks;		//任务队列
	std::vector WorkThreads;		//工作线程组,独占指针指向每一个线程

	std::mutex taskmutex;		//互斥锁
	std::condition_variable cond;		//条件变量
	std::atomicrunning = true;	//原子操作保证多线程不会产生争夺
};

//模板不能和定义分离,在编译时期不知道参数定义
template 
auto MyThreadPool_v2::PushTask(F&& f, Args&& ...args)
	->std::future::type> {
	using return_type = typename std::result_of::type;

	//指向packaged_task的共享指针
	auto task = std::make_shared>(		//packaged_task包装一个可调用对象,可以传递给future对象,使其在另一线程获取结果
		std::bind(std::forward(f), std::forward(args)...)		//完美转发,保持函数和参数类型不变
		);
	//获取任务future
	std::future res = task->get_future();
	{
		std::unique_locklock(taskmutex);//独占加锁

		if (!running) {
			throw std::runtime_error("PushTask on stopped ThreadPool");
		}
		Tasks.emplace([task]() { (*task)(); });
		// 发送通知,唤醒一个wait状态的工作线程重新抢锁并重新判断wait条件
		cond.notify_one();
		return res;
	}
}
#endif // !_MYTHREADPOOL_V2_H_
#include"MyThreadPool_v2.h"
std::shared_ptr MyThreadPool_v2::GetInstacne(int threadnums) {
	//第一次声明静态对象是线程安全的,存在静态区域
	static std::shared_ptr instacne_v2(new MyThreadPool_v2(threadnums));
	return instacne_v2;
}

MyThreadPool_v2::MyThreadPool_v2(int threadnums) {
	//int threadnums = 4;
	printf("构造函数n");
	for (int i = 0; i < threadnums; ++i) {
		//每一个线程执行一个匿名函数
		printf("线程%d开始执行n", i);
		//匿名函数包含加锁,阻塞,取任务,执行,即之前的回调函数
		WorkThreads.emplace_back
		(
			[this]
		{
			for (;;)
			{
				ThreadPoolTask task;//任务对象
				{
					std::unique_lock lock(this->taskmutex);
					
					this->cond.wait(lock, [this] { return !this->running || !this->Tasks.empty(); });
					// 如果线程池停止且任务队列为空,说明需要关闭线程池结束返回
					if (!running&&Tasks.empty()) {
						return;
					}
					// 取得任务队首任务(注意此处的std::move)
					task = std::move(this->Tasks.front());
					// 从队列移除
					this->Tasks.pop();
				}
				//执行任务
				task();
			}
		}
		);
	}
}

MyThreadPool_v2::~MyThreadPool_v2() {
	printf("析构函数n");
	{
		std::unique_lock lock(taskmutex);
		running = false;
	}
	//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
	cond.notify_all();//通知全部线程
	// 等待所有工作线程结束
	for (auto &workthread : WorkThreads) {
		workthread.join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
	}
}
#include 
#include 
#include 
#include 
#include "MyThreadPool_v2.h"
int main() {
	auto pool = MyThreadPool_v2::GetInstacne(4);
	
	std::vector< std::future > results;    //线程执行结果
	// 批量执行线程任务
	for (int i = 0; i < 8; ++i) {
		results.emplace_back(   // 保存线程执行结果到results
			pool->PushTask([i] {  // 添加一个新的工作任务到线程池
			std::cout << "hello " << i << std::endl;
			//std::this_thread::sleep_for(std::chrono::seconds(1));
			//std::cout << "world " << i << std::endl;
			return i * i;
		})
		);
	}
	// 打印线程执行结果
	for (auto && result : results)
		std::cout << "result:" << result.get() << std::endl;
	std::cout << std::endl;
	return 0;
}

在单例模式中使用了智能指针,最后的析构交由智能指针实现。C++11标准中静态对象的线程安全创建,其静态对象必须在GetInstance函数中第一次声明,不必使用双重检验来保证线程安全。PushTask函数定义了可变参数模板,使用了C++11的新特性,后置推导参数类型传递给auto,这两篇文章讲的非常详细参考1,参考2。


总结

记录一下实现线程池遇到的问题,并和之前的单例模式结合实现,其应用场景非常广泛,其中若有bug欢迎大家指出,期待指导。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/883164.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号