- 多线程
- std::thread
- detach
- joinable
- operator =
- 静态函数
- 线程同步
- 成员函数
- lock
- try_lock
- unlock
- 代码
- std::lock_guard
- std::recursive_mutex
- std::timed_mutex
- 读写锁
- 条件变量
- 生产消费者模型
- 原子变量
- 死锁
- 永远不要同时持有两把锁
- 保证双方上锁顺序一致
- 用std::lock同时对多个上锁
- 自旋锁
- 线程池
多线程
多线程主要是
线程的函数,创建、传参
互斥锁
条件变量
生产消费者模型
源自变量
死锁
*线程异步操作
c++提供的线程类std::thread基于这个类创建一个新的线程非常简单,只需要提供线程函数或者函数对象即可,并且可以同时指定线程函数的参数。
// ① thread() noexcept; // ② thread( thread&& other ) noexcept; // ③ template< class Function, class... Args > explicit thread( Function&& f, Args&&... args ); // ④ thread( const thread& ) = delete;
- 构造函数1:默认构造函数,构造一个线程对象,在这个线程中不执行任何处理动作
- 构造函数2:移动构造函数,将other的线程所有权转移给新的thread对象。之后other不在表示执行线程。
- 执行函数3:创建线程对象,并在该线程中之形函数f中的业务逻辑,args是要传递给函数f的参数
void func(int num, string str)
{
for (size_t i = 0; i < 10; i++)
{
cout << "子线程 " << i << " num :" << num << " str " << str << endl;
}
}
void do_some_work()
{
for (size_t i = 0; i < 5; i++)
{
cout << "work " << i << endl;
}
}
void test0()
{
//创建线程1,传入参数
thread t(func, 520, "i love you");
//创建线程2,无参
std::thread my_tread(do_some_work);
//获取线程id
cout << " t " << t.get_id() << endl;
cout << " t1 " << my_tread.get_id() << endl;
//等待线程终止
t.join();
my_tread.join();
}
detach
线程分离函数detach,在线程分离之后,主线程退出也会一并销毁创建出的所有子线程,在主线程退出之前,它可以脱离主线程继续独立的运行,任务执行完毕之后,这个子线程会自动释放自己占用的系统资源。
void detach();joinable
判断主线程和子线程是否处于关联状态,一般情况下两者处于关联状态,该函数返回一个布尔类型,但是调用detach后线程就不关联了
线程中的资源是不能被复制的,因此通过=操作符进行赋值操作最终并不会得到两个完全相同的对象。
// move (1) thread& operator= (thread&& other) noexcept; // copy [deleted] (2) thread& operator= (const other&) = delete;
通过上面=操作符重载声明可以得知:
- 如果 other 是一个右值,会进行资源所有权的转移
- 如果 other 不是右值,禁止拷贝,该函数被显示删除(=delete),不可用
thread线程类还提供了一个静态方法,计算当前计算机的cpu核心数,根据这个可以在程序中创建出数量相等的线程,每个线程独占一个cpu核心,这些线程就不用分时复用cpu时间片,此时程序并发效率是最高的
#include线程同步#include using namespace std; int main() { int num = thread::hardware_concurrency(); cout << "CPU number: " << num << endl; }
进行多线程编程,如果需要多个线程对一块内存操作,比如:同时读、同时写、同时读写对于后两种情况来说,如果不做任何的认为干涉就会出现各种各样的错误数据(比如变量自加一,首先从内存中取出变量放入寄存器,寄存器自加一,再从寄存器放回内存)
为了解决多线程数据混乱的方案就是进行线程同步,最常用的就是互斥锁,在c++11中提供了四种互斥锁
- std::mutex独占的互斥锁,不能递归使用
- std::timed_mutex带超时的独占互斥锁,不能递归使用
- std::recursive_mutex递归互斥锁,不带超时功能
- std::recursive_timed_mutex带超时的递归锁
也有称作为互斥量,两者是一个东西
用于给临界区加锁,并且只能有一个线程获得锁的所有权。他有阻塞线程的作用
void lock();try_lock
也可以对临界区进行加锁
区别是
二者的区别在于 try_lock() 不会阻塞线程,lock() 会阻塞线程:
- 如果互斥锁是未锁定状态,得到了互斥锁所有权并加锁成功,函数返回 true
- 如果互斥锁是锁定状态,无法得到互斥锁所有权加锁失败,函数返回 false
对被锁定的锁解锁,但是-只有拥有互斥锁所有权的线程,也就是对互斥锁上锁的线程才能对其解锁,其他线程是没有权限做这件事情的
int g_num = 0;
mutex g_num_mutex;
void slow_increment(int id)
{
for (size_t i = 0; i < 3; i++)
{
g_num_mutex.lock();
g_num++;
cout << id << " - >" << g_num << endl;
g_num_mutex.unlock();
this_thread::sleep_for(chrono::seconds(1));
}
}
void test0()
{
thread t1(slow_increment, 0);
thread t2(slow_increment, 1);
t1.join();
t2.join();
}
std::lock_guard
lock_guard是c++11新增的一个模板类,使用这个类可以简化互斥锁lock和unlock的写法,同时也更加安全。这个模板类的定义和常用的构造函数原型如下:
// 类的定义,定义于头文件template< class Mutex > class lock_guard; // 常用构造函数 explicit lock_guard( mutex_type& m );
lock_guard 在使用上面提供的这个构造函数构造对象时,会自动锁定互斥量,而在退出作用域后进行析构时就会自动解锁,从而保证了互斥量的正确操作,避免忘记 unlock() 操作而导致线程死锁。lock_guard 使用了 RAII 技术,就是在类构造函数中分配资源,在析构函数中释放资源,保证资源出了作用域就释放。
//由于lock_guard是独占锁,所以递归调用会导致死锁
struct Calculate
{
Calculate() : m_i(6){}
void mul(int x)
{
cout << "mul" << endl;
lock_guard locker(m_mutex);
m_i *= x;
void test2()
{
//Calculate cal;
Calculate1 cal;
cal.both(6, 3);
}
}
void div(int x){
cout << "div" << endl;
lock_guard locker(m_mutex);
m_i /= x;
}
void both(int x, int y)
{
cout << "both" << endl;
lock_guard locker(m_mutex);
mul(x);
div(y);
}
int m_i;
mutex m_mutex;
};
void test2()
{
Calculate cal;
cal.both(6, 3);
}
std::recursive_mutex
递归互斥锁允许统一鲜橙多次获得互斥锁,可以用来解决同一线程多次获取互斥量时死锁的问题
//std::recursive_mutex允许统一线程多次获得互斥锁
struct Calculate1
{
Calculate1() : m_i(6){}
void mul(int x)
{
cout << "mul" << endl;
lock_guard locker(m_mutex);
m_i *= x;
}
void div(int x){
cout << "div" << endl;
lock_guard locker(m_mutex);
m_i /= x;
}
void both(int x, int y)
{
cout << "both" << endl;
lock_guard locker(m_mutex);
mul(x);
div(y);
}
int m_i;
recursive_mutex m_mutex;
};
void test2()
{
//Calculate cal;
Calculate1 cal;
cal.both(6, 3);
}
- 使用递归互斥锁的场景往往都是可以简化的,使用递归互斥锁很容易放纵复杂逻辑的产生,从而导致bug的产生
- 递归互斥锁比非递归互斥锁效率要低一些。
- 递归互斥锁虽然允许同一个线程多次获得同一个互斥锁的所有权,但最大次数并未具体说明,一旦超过一定的次数,就会抛出std::system错误。
超时独占锁,主要是在获取互斥锁资源时增加了超时等待功能,因为不知道获取锁资源需要等待多长时间,为了保证不一直等待下去,设置了一个超时时长,超时后线程就可以解除阻塞去做其他事情了。
std::timed_mutex 比 std::_mutex 多了两个成员函数:try_lock_for() 和try_lock_until():
timed_mutex g_mutex;
void work()
{
chrono::seconds timeout(1);
while (true)
{
if (g_mutex.try_lock_for(timeout))
{
cout << "当前线程id " << this_thread::get_id() << " 得到互斥锁所有权 " << endl;
//模拟处理任务用了一定时长
this_thread::sleep_for(chrono::seconds(10));
g_mutex.unlock();
break;
}else
{
cout << "当前线程id " << this_thread::get_id() << " 没有得到互斥锁所有权 " << endl;
//模拟其他任务
this_thread::sleep_for(chrono::seconds(1));
}
}
}
void test3()
{
thread t1(work);
thread t2(work);
t1.join();
t2.join();
}
读写锁
读可以共享,写必须独占,且写和读不能共存
shared_mutex
class MTVVector
{
std::vecotr m_arr;
mutable std::shared_mutex m_mtx;
public:
void push_back(int val)
{
//写锁
m_mtx.lock();
m_arr.push_back(cal);
m_mtx.unlock();
}
size_t size() const{
//读锁
m_mtx.lock_shared();
m_arr.size();
m_mtx.unlock();
}
};
raii思想的
class MTVVector
{
std::vecotr m_arr;
mutable std::shared_mutex m_mtx;
public:
void push_back(int val)
{
//写锁
std::unique_lock qrd(m_mtx);
m_arr.push_back(cal);
}
size_t size() const{
//读锁
std::shared_lock qrd(m_mtx);
m_arr.size();
}
};
条件变量
条件变量是c++11提供的另外一种用于等待的同步机制,他能阻塞一个或多个线程,直到收到另外一个线程发出的通知或者超时时,才会唤醒当前阻塞的线程,条件变量需要和互斥量配合起来使用,c++11提供了两种条件变量
- condition_variable:需要配合 std::unique_lockstd::mutex 进行 wait 操作,也就是阻塞线程的操作。
- condition_variable_any:可以和任意带有 lock()、unlock() 语义的 mutex 搭配使用,也就是说有四种:
- std::mutex:独占的非递归互斥锁
- std::timed_mutex:带超时的独占非递归互斥锁
- std::recursive_mutex:不带超时功能的递归互斥锁
- std::recursive_timed_mutex:带超时的递归互斥锁
条件变量通常用于生产者消费者模型,大致使用过程如下
- 拥有条件变量的线程获取互斥量
- 循环检查某个条件,如果条件不满足阻塞当前线程,否则继续乡下执行
- 产品数量达到上限,生产者阻塞,否则一直生产
- 产品的数量为0,消费者阻塞,否则消费者一直消费
- 条件满足之后调用notify_one()或者notify_all()唤醒一个或者所有被阻塞的线程
#include原子变量#include #include #include #include #include #include using namespace std; class SyncQueue { private: //存储队列数据 list
m_queue; //互斥锁 mutex m_mutex; //不为空的条件变量 condition_variable m_notEmpty; //没有满的条件变量 condition_variable m_notFull; //任务队列的最大任务数 int m_max_size; public: SyncQueue(int max_size) : m_max_size(max_size){} //入队 void put(const int& x) { unique_lock locker(m_mutex); //判断任务队列是否已经满了 while (m_queue.size() == m_max_size) { cout << "任务队列已满,请耐心等候 ..." << endl; //阻塞线程 m_notFull.wait(locker); } //将任务放入到任务队列中 m_queue.push_back(x); cout << x << " 被生产" << endl; //通知消费者消费 m_notEmpty.notify_one(); } int take() { unique_lock locker(m_mutex); while (m_queue.empty()) { cout << "任务队列已空,请耐心等候..." << endl; m_notEmpty.wait(locker); } //从任务队列中取出任务(消费) int x = m_queue.front(); m_queue.pop_front(); cout << x << " 被消费" << endl; //通知生产者进行生产 m_notFull.notify_one(); return x; } bool empty() { lock_guard locker(m_mutex); return m_queue.empty(); } bool full() { lock_guard locker(m_mutex); return m_queue.size() == m_max_size; } int size() { lock_guard locker(m_mutex); return m_queue.size(); } ~SyncQueue(){m_queue.clear();} }; void test0() { SyncQueue taskQ(50); //生产者 auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1); //消费者 auto consume = bind(&SyncQueue::take, &taskQ); //生产者消费者各三个线程 thread t1[3]; thread t2[3]; for (size_t i = 0; i < 3; i++) { t1[i] = thread(produce, i+100); t2[i] = thread(consume); } for (size_t i = 0; i < 3; i++) { t1[i].join(); t2[i].join(); } return; } class SyncQueueT { private: //存储队列数据 list m_queue; //互斥锁 mutex m_mutex; //不为空的条件变量 condition_variable_any m_not_empty; //没有满的条件变量 condition_variable_any m_not_full; //任务队列的最大任务个数 int m_max_size; public: SyncQueueT(int maxsize): m_max_size(maxsize){} void put(const int& x){ lock_guard locker(m_mutex); //根据条件阻塞线程 m_not_full.wait(m_mutex, [this](){ return m_queue.size() != m_max_size; }); //将任务放入到任务队列中去 m_queue.push_back(x); cout << x << " +++" << endl; this_thread::sleep_for(chrono::seconds(1)); //通知消费者消费 m_not_empty.notify_one(); } int take(){ lock_guard locker(m_mutex); m_not_empty.wait(m_mutex, [this](){ return !m_queue.empty(); }); //从任务队列中取出任务去消费 int x = m_queue.front(); m_queue.pop_front(); //通知生产者去生产 m_not_full.notify_one(); cout << x << " ---" << endl; return x; } bool empty(){} bool full(){} int size(){} }; void test1(){ SyncQueueT taskQ(50); //生产者 auto produce = bind(&SyncQueueT::put, &taskQ, placeholders::_1); //消费者 auto consume = bind(&SyncQueueT::take, &taskQ); //生产者消费者各三个线程 thread t1[100]; thread t2[100]; for (size_t i = 0; i < 100; i++) { t1[i] = thread(produce, i+100); t2[i] = thread(consume); } for (size_t i = 0; i < 100; i++) { t1[i].join(); t2[i].join(); } return; } int main() { test0(); cout << "--------------------------- " << endl; test1(); return 0; }
c++提供了原子类型std::atomic,通过这个原子类型管理的内部变量就可以称之为源自变量,我们可以给原子类型指定bool/char/int/long/指针等类型作为模板参数(不支持浮点型和复合型)
原子指的是一系列不可被cpu上下文交换的机器指令,这些指令组合在一起就形成了原子操作,在多核cpu下,当某个cpu核心开始运行原子操作时,会暂停其他cpu内核对内存的操作,以保证原子操作不会被其他cpu内核做干扰
原子操作是通过指令提供的,所以性能比锁和消息传递好很多。
#include死锁 永远不要同时持有两把锁 保证双方上锁顺序一致 用std::lock同时对多个上锁#include #include #include #include using namespace std; struct Couter { // Couter():m_value(0){} void increment(){ for (size_t i = 0; i < 10000; i++) { m_value++; // cout << "increment number " << m_value // << ", thread id " << this_thread::get_id() << endl; this_thread::sleep_for(chrono::milliseconds(1)); } } void decrement() { for (size_t i = 0; i < 10000; i++) { m_value --; // cout << "decrement number " << m_value << ", thread id" // << this_thread::get_id() << endl; this_thread::sleep_for(chrono::milliseconds(1)); } } // int m_value; atomic_int m_value; // atomic m_value = 0; }; void test0() { Couter c; c.m_value = 0; auto increment = bind(&Couter::increment, &c); auto decrement = bind(&Couter::decrement, &c); thread t1(increment); thread t2(decrement); t1.join(); t2.join(); std::cout << " value " << c.m_value << std::endl; return; } int main() { test0(); return 0; }
std::mutex mtx1;
std::mutex mtx2;
void thread()
{
std::lock(mtx1, mtx2);
mtx1.unlock();
mtx2.unlock();
}
void thread1()
{
std::lock(mtx1, mtx2);
mtx1.unlock();
mtx2.unlock();
}
自旋锁
使用递归锁
#pragma once #include#include #include #include #include #include #include #include #include #include #include namespace std { //线程池最大容量 #define THREADPOOL_MAX_NUM 16 //#define THREADPOOL_AUTO_GROW class ThreadPool { private: using Task = function ; //定义类型 //线程池 vector _pool; //任务队列 queue _tasks; //同步 mutex _lock; //条件阻塞 condition_variable _task_cv; //线程池是否运行 atomic _run{true}; //空闲线程池数量 atomic _idlThrNum{0}; public: inline ThreadPool(unsigned short size = 4){addThread(size);} inline ~ThreadPool() { _run = false; //唤醒所有线程执行 _task_cv.notify_all(); for(thread& thread : _pool) { if (thread.joinable()) { //等待任务结束 thread.join(); } } } public: //提交一个任务 //调用.get()方法获取返回值会等待任务执行完,获取返回值 //两种方法可以实现调用类成员 //一种是 bind //另外一种是 mem_fn template auto commit(F&& f, Args&&... args) ->future { //停止? if (!_run) { throw runtime_error("commit on Thread is stoped."); } //typename std::result_of ::type, 函数f的返回值类型 using RetType = decltype(f(args...)); auto task = make_shared >( bind(forward (f), forward (args)...) );//把函数入口及参数,打包(绑定) future future = task->get_future(); { //添加任务到队列 //对当前块的语句加锁 lock_guard lock{_lock}; _tasks.emplace([task](){ (*task)(); }); } #ifdef THREADPOOL_AUTO_GROW if (_idlThrNum < 1 && _pool.size() < THREADPOOL_MAX_NUM) { addThread(1); } #endif //唤醒一个线程执行 _task_cv.notify_one(); return future; } //空闲线程数量 int idlCount(){return _idlThrNum;} //线程数量 int hrCount(){return _pool.size();} #ifdef THREADPOOL_AUTO_GROW private: #endif void addThread(unsigned short size) { for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size) { _pool.emplace_back([this]{ while (_run) { //获取一个待执行的task Task task; { unique_lock lock{_lock}; _task_cv.wait(lock, [this]{ return !_run || !_tasks.empty(); }); //wait 直到有 task if (!_run && _tasks.empty()) { return; } //按照先入先出从队列取出一个task task = move(_tasks.front()); _tasks.pop(); } _idlThrNum --; //执行任务 task(); _idlThrNum ++; } }); _idlThrNum ++; } } }; }



