并发概念
- 同一个系统中多个活动同时进行
并发的方式
- 多进程
- 进程间通信:
- 设置复杂或者速度慢
- OS有额外保护或者高级通信机制
- 进程间通信:
- 多线程
- 共享内存带来隐患
C++11 多线程
C++11 新标准中引入了四个头文件来支持多线程编程,他们分别是<atomic> ,<thread>,<mutex>,<condition_variable>,<future>
。
- atomic, 声明了atomic 和atomic_flag 类,原子变量
- thread, std::this_thread类在其中
- condition_variable 条件变量相关的类std::condition_variable 和std::condition_variable_any
- future: std::promise, std::async()
std::thread
构造函数:
- thread() noexcept;
template <class Fn, class... Args> explicit thread (Fn&& fn, Args&&... args);
- 拷贝构造函数delete
- thread (thread&& x) noexcept;
赋值运算符函数:
- thread& operator= (thread&& rhs) noexcept;
仅支持move赋值,1
2
3
4
5
6
7
8
9
10
11
12
13
14void f(); // 函数
struct F { // 函数对象
void operator()(); // F的调用操作符(§6.3.2)
};
void user()
{
thread t1 {f}; // f() 在单独的线程里和执行
thread t2 {F()}; // F()() 在单独的线程里和执行
t1.join(); // 等待t1
t2.join(); // 等待t2
} - std::this_thread::get_id() 返回线程号
- std::thread::hardware_concurrency() 返回系统线程数
std::mutex
- std::mutex
- std::recursive_mutex
- std::time_mutex: try_lock_for(std::chrono::seconds(1)), try_lock_until()
- std::recursive_timed_mutex
lock类
- std::lock_guard
- std::unique_lock (std::shared_lock要到14才支持,11里面用读写锁建议用boost)
其他类型
- std::once_flag
- std::adopt_lock_t
- std::defer_lock_t
- std::try_to_lock_t
函数
- std::try_lock 尝试同时对多个互斥量上锁, std::lock
- std::call_once 只掉一次
初始化
使用call_once 初始化:
1 | auto f = []() // 在线程里运行的lambda表达式 |
线程管控
- detach() 分离线程,将std::thread对象同线程分离
- join() 汇合线程:等待线程结束后再结束
- 异常境况下可能跳过join()
利用RAII等待线程完结
1 | class thread_guard { |
传递参数
参数会按照默认方式复制到内部存储空间,新创建的线程才能访问它们。然后,这些副本被当成临时变量,右值传给新线程的函数或者可调用对象。
- 传递时尽量减小副本开销
注意
- java的volatile会让编译器在被修饰变量的写操作后插入写屏障,C++不会
- C++的volatile只是表示变量易变,禁止编译器优化重排指令,跟多线程完全没有关系
- 不加锁应该用atomic变量
异步任务
future&promise
[!NOTE] future & promise
future和promise侧重点在于,两个任务之间传递值时,它们能避免锁的显式使用; “系统”高效地实现这个传递。
基本思路很简单:一个任务需要给另一个任务传递值时,就把这个值放进promise。 “大变活值”之后,具体实现会把这个值弄进对应的future里,(通常是该任务的启动者)就能从future里把值读出来了。
![[C++并发.png]]
手动创建线程用std::promise,在线程返回的时候,用set_value() 设置返回值。在主线程中用get_future()获得std::future对象,进一步再进行get。
1 | std::promise<int> pret; |
只要线程没执行完, wait()会一直等,而wait_for可以设置一个最长等待时间,返回一个std::future_status表示等待是否成功。
package_task
packaged_task
封装了代码,能够把任务的返回值或异常放进promise
, 它拥有其promise并且(间接地)要对该任务占有的资源负责。
1 | double accum(double* beg, double* end, double init) |
async
想发起一个可能异步执行的任务,可以用async(),async()把函数调用的“调用部分”和“获取结果部分”拆开, 并把它们都与实际执行的任务分离开。 使用async(),就不用再去操心线程和锁, 相反,你需要考虑的就只是那个有可能异步执行的任务。
1 | double comp4(vector<double>& v) |
std::async接受一个带返回值的lambda,自身返回一个std::future对象。lambda函数体在另一个线程执行。
1 | std::future<int> fret = std::async([&] { |
互斥量
std::lock_guard
符合RAII思想的上锁解锁std::unique_lock
符合RAII思想,但是自由度高,可以提前解锁
- std::unique_lock grd(mtx, std::defer_lock)
- std::unique_lock grd(mtx, std::try_to_lock) 调用mtx.try_lock
- std::unique_lock grd(mtx, std::adopt_lock) mutex已经上锁
1 | std::unique_lock grd(mtx); |
使用std::unique_lock grd(mtx, std::defer_lock)
则不会在构造函数中调用mtx.lock,需要手动调用lock才能上锁。
mtx.try_lock()
如果发现已经上锁,返回false,无阻塞mtx.try_lock_for(std::chrono::milliseconds(500))
等待一段时间mtx.try_lock_until
死锁
上锁顺序不一致就可能导致死锁,例如thread1先锁了1, t2锁2, 然后这时候t1请求2, t2
请求1,就会发生死锁。上锁顺序保持一致即可,或者使用:
1 | std::lock(mtx1, mtx2); //保证上锁顺序一致 |
条件变量
1 | std::condition_variable cv; |
多个等待者:cv.notify_all()唤醒全部
原子变量
两种概念:
- 获得:一个内存的读操作,当前线程的任后面的读写操作不能重排到前面(否则就读错了)
x.load(memory_order_acquire)
- 释放:一个内存的写操作,当前线程前面的读写操作不允许重拍到后面(以防读到写过的值)。
x.store(2, memory_order_release)
acquire 和 release 通常都是配对出现的,目的是保证如果对同一个原子对象的 release 发生在 acquire 之前的话,release 之前发生的内存修改能够被 acquire 之后的内存读取全部看到。
mutex上锁需要通过系统调用,线程阻塞后可能会调度到其他线程进行执行,开销很大。
atomic 会锁住内存总线,放弃乱序优化等策略
- fetch_add : +=,返回增加前的值
- store : =
- exchange : 返回旧值
- compare_exchange_strong 比较是否相等,相等则写入