栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

C++并发编程

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C++并发编程

C++11多线程编程
  • 并发概念
  • C++11线程创建
  • 互斥锁std::mutex
  • 自动加锁
  • 读写共享锁std::shared_mutex(C++17)
  • 一次执行std::call_once
  • 线程同步
  • 异步编程:future+async
  • 基于锁的并发数据结构

并发概念

所谓并发,是指两个或更多独立的活动同时发生。并发在生活中随处可见:边走路一 边说话,也可以两只手同时作不同的动作,每个人都过着相互独立的生活;当我在学习的时候,你在玩游戏。

并发(concuttrnt)与并行(parallel)

  • 并发:一个时间段内有很多的线程或进程在执行,但任何时间点上都只有一个在执行,多个线程或进程争抢时间片轮流执行。并发的意义就是:让一个程序同时做多件事情。
  • 并行:一个时间段和时间点上都有多个线程或进程在执行(多核)。
  • 并发是并行的必要不充分条件。并行一定是并发的,但并发不一定是并行的。

实现方式:

  • 多线程/多进程
  • 多个进程间的通信(包含启动)要比单一进程中的多线程间的通信的开销大,若不考虑共享内存可能会带来的问题,多线程将会成为主流语言(包括C++)更青睐的并发途径。此外,C++ 标准并未对进程间通信提供任何原生支持,所以使用多进程的方式实现,这会依赖与平台相关的API。
C++11线程创建
  • pthread_create/-beginthreadex/Create Thread
  • std::thread t(func, param), //#include
    • join, detach, param, std::ref
  • std::thread::hardware_concurrency()
    • 硬件支持的线程数量
  • std::this_thread::get_id, sleep_for

简单示例:

#include 
#include 

int thread_func(int ¶m)
{
	std::cout << "线程id:" << std::this_thread::get_id() << std::endl;

	for (int i = 0; i < 10; i++)
	{
		std::cout << "hello world" << std::endl;
		std::this_thread::sleep_for(std::chrono::seconds(1));
	}
	return 0;
}


int main()
{
	std::cout << "获取系统硬件线程数:" << std::thread::hardware_concurrency() << std::endl;
	int a = 100;

	//第一种创建线程的方式
	//std::thread t1(thread_func, std::ref(a));
	

	//第二种创建线程的方式 采用Lambda
	std::thread t2([=](int x) {
        std::cout << "lambda-线程id" << std::this_thread::get_id() << std::endl;
		std::cout << "x:" << x << " a:" << a << std::endl;
		for (int i = 0; i < 10; i++)
		{
			std::cout << "lambda-hello world" << std::endl;
			std::this_thread::sleep_for(std::chrono::seconds(1));
		}
		}, a);

	//采用join等待线程结束,避免主线程结束了子线程还在运行带来的问题
	t1.join();
	t2.join();
	//t.detach(); //可以采用detach设置为守护线程

	std::cout << "hello world" << std::endl;
	return 0;
}
互斥锁std::mutex

Mutex 又称互斥量,C++ 11中与 Mutex 相关的类(包括锁类型)和函数都声明在 头文件中,所以如果你需要使用 std::mutex,就必须包含 头文件。

Mutex系列类

  • std::mutex, 基本的mutex类。
  • std::resursive_mutex, 递归mutex类
  • std::time_mutex, 定时mutex类
  • std::recursive_timed_mutex,定时递归mutex类

Lock类

  • std::lock_guard, 与mutex RAII相关,方便线程对互斥锁上锁。
  • std::unique_lock, 与mutex RAII相关,方便线程对互斥量上锁,但提供了更好的上锁和解锁控制。

其他类型

  • std::once_flag
  • std::adopt_lock_t
  • std::defer_lock_t
  • std::try_to_lock_t

函数

  • std::try_lock, 尝试同时对多个互斥量上锁
  • std::lock, 可以同时对多个互斥量上锁
  • std::call_once, 同时多个线程需要同时调用某个函数,call_once可以保证多个线程对该函数只能调用一次。

std::mutex

  • 构造函数,std::mutex不允许拷贝构造,也不可以使用move拷贝,最初产生的mutex对象是出于unlock状态的。
  • lock(),调用线程将锁住该互斥量。
  • unlock(),解锁,释放对互斥锁的所有权。
  • try_lock(),尝试锁住互斥量,如果互斥量被占有,那么当前线程也不会被租塞。

std::time_mutex

std::time_mutex比std::mutex多了两个成员函数,try_lock_for(), try_lock_until()。

  • try_lock_for()函数接受一个时间范围,表示在这一段范围内线程如果没有获取到锁则被阻塞(与 std::mutex 的 try_lock() 不同,try_lock 如果被调用时没有获得锁则直接返回 false),如果在此期间其他线程释放了锁,则该线程可以获得对互斥量的锁,如果超时(即在指定时间内还是没有获得锁),则返回 false。
  • try_lock_until 函数则接受一个时间点作为参数,在指定时间点未到来之前线程如果没有获得锁则被阻塞住,如果在此期间其他线程释放了锁,则该线程可以获得对互斥量的锁,如果超时(即在指定时间内还是没有获得锁),则返回 false。

std::recursive_timed_mutex

和std::recursive_mutex与std::mutex的关系一样,std::recursive_timed_mutex的特种也可以从std::timed_mutex推导出来

自动加锁

原理:把锁封装到一个类里,在类生成对象时,会调用构造函数,所以我们在构造函数中拿锁,类销毁时,会调用析构函数,所以在析构函数中解锁,就使得了自动加锁和解锁。‘

std::mutex mut;
std::lock_guard lgk(mut);
std::unique_lock ulk(mut);

std::lock_guard

  • 类的构造函数禁用拷贝构造,且禁用移动构造。std:lock_ guard类除了构造函数和析构函数没有其它成员函数。在std::lock_ guard对象构造时,传入的mutex对象(即它所管理的mutex对象)会被当前线程锁住。在lock_ guard对 象被析构时,它所管理的mutex对象会自动解锁,不需要程序员手动调用lock和unlock对mutex进行上锁和解锁操作。它的特点如下:
    • 创建即加锁,作用域结束自动析构并解锁,无需手工解锁。
    • 不能中途解锁,必须等作用域结束才解锁
    • 不能复制

std::unique_lock

  • lock_ guard的升级加强版,它具有lock guard 的所有功能,同时又具有其他很多方法,允许延迟锁定,限时深度锁定,递归锁定,锁定所有权的转移以及与条件变量一起使用,使用起来更强灵活方便,能够应对更复杂的锁定需要。特点如下:
    • 创建时可以不锁定(通过指定第二个参数为std::defer_lock),而在需要时再解锁
    • 可以随时加锁解锁
    • 作用域规则同lock_guard,析构时自动释放锁
    • 不可复制,可以移动
    • 条件变量需要该类型的锁作为参数
    • 所有lock_guard能做到的,都可以使用unique_lock坐到

如何选择?

  • 如果使用锁的时候,首先考虑lock_guard
  • 它简单、明了、易读。如果用它完全OK,就不要考虑其他了。
  • 如果无法满足需求,就使用unique_lock
读写共享锁std::shared_mutex(C++17)
  • std::shared_mutex
  • std::shared_lock rlock(sharedmutex)
  • std::lock_guard wlock(sharedmutx)

示例:

#include 
#include 
#include 
#include 

class ThreadSafeCounter
{
public:
	ThreadSafeCounter() {};
	~ThreadSafeCounter() {};

	unsigned int get() const
	{
		std::shared_lock rlock(shmutex);
		return m_value;
	}

	unsigned int increment()
	{
		std::unique_lock wlock(shmutex);
		m_value++;
		return m_value;
	}

	void reset()
	{
		std::unique_lock wlock(shmutex);
		m_value = 0;
	}

private:
	unsigned int m_value = 0;
	mutable std::shared_mutex shmutex;
};

//全局
ThreadSafeCounter counter;
std::mutex mtx;

void reader()
{
	while (true)
	{
		std::lock_guard lck(mtx);
		std::cout << "reader" << std::this_thread::get_id() << ":" << counter.get() << std::endl;
		//std::this_thread::sleep_for(std::chrono::seconds(1));
	}
}

void writer()
{
	while (true)
	{
		std::lock_guard lck(mtx);
		std::cout << "write" << std::this_thread::get_id() << ":" << counter.increment() << std::endl;
		//std::this_thread::sleep_for(std::chrono::seconds(1));
	}
}

int main()
{
	std::thread readers[5];
	std::thread writers[5];

	for (int i = 0; i < 5; i++)
	{
		readers[i] = std::thread(reader);
	}

	for (int  i = 0; i < 5; i++)
	{
		writers[i] = std::thread(writer);
	}

	for (int i = 0; i < 5; i++)
	{
		readers[i].join();
		writers[i].join();
	}

	std::cout << counter.get() << std::endl;
}

C++11实现共享锁:

来源:访问原创地址

#ifndef __WRITE_FIRST_RW_LOCK_H
#define __WRITE_FIRST_RW_LOCK_H
 
#include 
#include 
 
class WfirstRWLock
{
public:
	WfirstRWLock() = default;
	~WfirstRWLock() = default;
public:
	void lock_read()
	{
		std::unique_lock ulk(counter_mutex);
		cond_r.wait(ulk, [=]()->bool {return write_cnt == 0; });
		++read_cnt;
	}
	void lock_write()
	{
		std::unique_lock ulk(counter_mutex);
		++write_cnt;
		cond_w.wait(ulk, [=]()->bool {return read_cnt == 0 && !inwriteflag; });
		inwriteflag = true;
	}
	void release_read()
	{
		std::unique_lock ulk(counter_mutex);
		if (--read_cnt == 0 && write_cnt > 0)
		{
			cond_w.notify_one();
		}
	}
	void release_write()
	{
		std::unique_lock ulk(counter_mutex);
		if (--write_cnt == 0)
		{
			cond_r.notify_all();
		}
		else
		{
			cond_w.notify_one();
		}
		inwriteflag = false;
	}
	
private:
	volatile size_t read_cnt{ 0 };
	volatile size_t write_cnt{ 0 };
	volatile bool inwriteflag{ false };
	std::mutex counter_mutex;
	std::condition_variable cond_w;
	std::condition_variable cond_r;
};
 
template 
class unique_writeguard
{
public:
	explicit unique_writeguard(_RWLockable &rw_lockable)
		: rw_lockable_(rw_lockable)
	{
		rw_lockable_.lock_write();
	}
	~unique_writeguard()
	{
		rw_lockable_.release_write();
	}
private:
	unique_writeguard() = delete;
	unique_writeguard(const unique_writeguard&) = delete;
	unique_writeguard& operator=(const unique_writeguard&) = delete;
private:
	_RWLockable &rw_lockable_;
};
template 
class unique_readguard
{
public:
	explicit unique_readguard(_RWLockable &rw_lockable)
		: rw_lockable_(rw_lockable)
	{
		rw_lockable_.lock_read();
	}
	~unique_readguard()
	{
		rw_lockable_.release_read();
	}
private:
	unique_readguard() = delete;
	unique_readguard(const unique_readguard&) = delete;
	unique_readguard& operator=(const unique_readguard&) = delete;
private:
	_RWLockable &rw_lockable_;
};
 
#endif
一次执行std::call_once
  • std::call_once:代码只被执行一次
    • template
    • void call_once(once_flag&flag, Fn &&fn, Args && args);
    • call_once函数第一个参数是std::once_flag的一个对象,迭戈参数可以是函数、成员函数、函数对象、Lambda函数。
线程同步
  • std::condition_variable
  • wait
  • notify_one/all

示例:

#include 
#include 
#include 
#include 

bool ready = false;
std::mutex mtx;
std::condition_variable cv;

void print_thrd(int id)
{
	std::unique_lock lck(mtx);
	//while (!ready)cv.wait(lck);
	cv.wait(lck, [] {return ready; });
	std::cout << "thread:" << id << std::endl;
}

int main()
{
	std::thread threads[5];
	for (int i = 0; i < 5; i++)
	{
		threads[i] = std::thread(print_thrd, i);
	}

	std::cout << "start to notify threads:n";
	std::this_thread::sleep_for(std::chrono::seconds(5));
	{
		std::unique_lock lck(mtx);
		ready = true;
		//cv.notify_all();
		cv.notify_one();
	}

	for (auto& th : threads)
	{
		th.join();
	}

	std::cout << "-------" << std::endl;
}
异步编程:future+async
  • 在编程的过程中,程序中往往会有一些功能或者很耗时或者很占计算资源,这样的程序我们往往倾向于让它在另一个线程中运行,我们的主线程可以先干其他事情,等干完了其他事情再来看看之前的工作干完了没有,如果干完了,那么拿出结果,如果没干完就等它干完或者再等一会。
  • 假如你等待一趟航班, 行旦你到达机杨,清除各种办理登机手续之后,你仍然必须等待通知登机,可能需要几个小时。你可以找点事做,比如看书、上网、进餐等,从根本上说你在等一件事。 而且你只是等这一次, 这次完了就是下-一趟就不是你的航班了。
  • 在C++中使用future+async来做达到这个目的。如果一个线程要等待一个一次性事件,可以使用future, 线程可以以一定的间隔检查这个future,在空闲期间线程可以干别的事。或者线程可以去忙其它的事,直到他想要检查这个结果的时候。
  • C++标准库有两种future,一种是唯一futures ( std::future)另一种是shared_ future(std: shared future<>), std::future只能与一个实例关联,而std:: shared_ future可 以在同一个事件上关联多个实例,第二种多个实实例会在同一时间变为ready状态。他们全部都可以访问实例里的数据。尽管future是用来多线程通信的工具,他是他们本身不提供同步访问。也就是多个线程访问一个future必须要使用同步手段,比如mutex等。
  • std::async不是唯一使用std::future的方法,也可以通过使用std:: packaged_ task或者std:: promise来使用

示例:

#include 
#include 
#include 

int async_task(int count)
{
	int sum = 0;
	for (int i = 1; i <= count; i++)
	{
		sum += i;
		std::this_thread::sleep_for(std::chrono::seconds(1));
	}
	return sum;
}

int main()
{
	std::cout << "start asyncn";
	//int为返回类型
	std::future fut = std::async(async_task, 10);
	std::cout << "wait for result for 15 seconds...n";

	std::chrono::seconds span(15);
	if (fut.wait_for(span) == std::future_status::timeout)
	{
		std::cout << "async timeoutn";
	}
	else
	{
		std::cout << "result:" << fut.get() << std::endl;
	}

}
基于锁的并发数据结构
#include 
#include 
#include 
#include 
#include 
#include 

struct empty_stack:std::exception
{
	const char* what() const throw()
	{
		return "empty stack";
	}
};

template
class threadsafe_stack
{
public:
	threadsafe_stack() {};
	threadsafe_stack(const threadsafe_stack& other)
	{
		std::lock_guard lock(other.mtx);
		data = other.data;
	}
	threadsafe_stack& operator=(const threadsafe_stack&) = delete;

	void push(T value)
	{
		std::lock_guard lock(mtx);
		data.push(std::move(value));
	}

	std::shared_ptr pop()
	{
		std::lock_guard lock(mtx);
		if (data.empty()) throw empty_stack();

		std::shared_ptr const res(std::make_shared(data.top()));
		data.pop();
		return res;
	}

	void pop(T& val)
	{
		std::lock_guard lock(mtx);
		if (data.empty()) throw empty_stack();
		val = std::move(data.top());
		data.pop();
	}

	bool empty()const
	{
		std::lock_guard lock(mtx);
		return data.empty();
	}

	~threadsafe_stack() {};

private:
	std::stack data;
	mutable std::mutex mtx;
};

threadsafe_stack g_stack;

void push_thrd()
{
	for (int i = 0; i < 1000; i++)
	{
		g_stack.push(i + 1);
	}
}

void pop_thrd()
{
	for (int i = 0; i < 1010; i++)
	{
		int value = 0;
		try
		{
			g_stack.pop(value);
			std::cout << "poped:" << value << std::endl;
		}
		catch (const std::exception& e)
		{
			std::cout << e.what() << std::endl;
		}
		
	}
}

int main()
{
	std::thread t1(push_thrd);
	std::thread t2(pop_thrd);

	t1.join();
	t2.join();
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/875869.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号