0%

C++并发

并发概念

  • 同一个系统中多个活动同时进行

并发的方式

  • 多进程
    • 进程间通信:
      • 设置复杂或者速度慢
      • OS有额外保护或者高级通信机制
  • 多线程
    • 共享内存带来隐患

C++11 多线程

C++11 新标准中引入了四个头文件来支持多线程编程,他们分别是<atomic> ,<thread>,<mutex>,<condition_variable>,<future>

  • atomic, 声明了atomic 和atomic_flag 类,原子变量
  • thread, std::this_thread类在其中
  • condition_variable 条件变量相关的类std::condition_variable 和std::condition_variable_any
  • future: std::promise, std::async()

std::thread

构造函数:

  • thread() noexcept;
  • template <class Fn, class... Args> explicit thread (Fn&& fn, Args&&... args);
  • 拷贝构造函数delete
  • thread (thread&& x) noexcept;

赋值运算符函数:

  • thread& operator= (thread&& rhs) noexcept;
    仅支持move赋值,
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    void f();               // 函数

    struct F { // 函数对象
    void operator()(); // F的调用操作符(§6.3.2)
    };

    void user()
    {
    thread t1 {f}; // f() 在单独的线程里和执行
    thread t2 {F()}; // F()() 在单独的线程里和执行

    t1.join(); // 等待t1
    t2.join(); // 等待t2
    }
  • std::this_thread::get_id() 返回线程号
  • std::thread::hardware_concurrency() 返回系统线程数

std::mutex

  • std::mutex
  • std::recursive_mutex
  • std::time_mutex: try_lock_for(std::chrono::seconds(1)), try_lock_until()
  • std::recursive_timed_mutex

lock类

  • std::lock_guard
  • std::unique_lock (std::shared_lock要到14才支持,11里面用读写锁建议用boost)

其他类型

  • std::once_flag
  • std::adopt_lock_t
  • std::defer_lock_t
  • std::try_to_lock_t

函数

  • std::try_lock 尝试同时对多个互斥量上锁, std::lock
  • std::call_once 只掉一次

初始化

使用call_once 初始化:

1
2
3
4
5
6
7
8
9
10
11
auto f = []()                // 在线程里运行的lambda表达式
{
std::call_once(flag, // 仅一次调用,注意要传flag
[](){ // 匿名lambda,初始化函数,只会执行一次
cout << "only once" << endl;
} // 匿名lambda结束
); // 在线程里运行的lambda表达式结束
};

thread t1(f); // 启动两个线程,运行函数f
thread t2(f);

线程管控

  • detach() 分离线程,将std::thread对象同线程分离
  • join() 汇合线程:等待线程结束后再结束
  • 异常境况下可能跳过join()

利用RAII等待线程完结

1
2
3
4
5
6
7
8
9
10
11
12
class thread_guard {
std::thread& t;
public:
explicit thread_guard(std::thread& t_) :t(t_){}
~thread_guard() {
if (t.joinable()) {
t.join();
}
}
thread_guard(thread_guard const&)=delete;
thread_guard& operator=(thread_guard const&)=delete;
};

传递参数

参数会按照默认方式复制到内部存储空间,新创建的线程才能访问它们。然后,这些副本被当成临时变量,右值传给新线程的函数或者可调用对象。

  • 传递时尽量减小副本开销

注意

  • java的volatile会让编译器在被修饰变量的写操作后插入写屏障,C++不会
  • C++的volatile只是表示变量易变,禁止编译器优化重排指令,跟多线程完全没有关系
  • 不加锁应该用atomic变量

异步任务

future&promise

[!NOTE] future & promise
future和promise侧重点在于,两个任务之间传递值时,它们能避免锁的显式使用; “系统”高效地实现这个传递。

基本思路很简单:一个任务需要给另一个任务传递值时,就把这个值放进promise。 “大变活值”之后,具体实现会把这个值弄进对应的future里,(通常是该任务的启动者)就能从future里把值读出来了。

![[C++并发.png]]

手动创建线程用std::promise,在线程返回的时候,用set_value() 设置返回值。在主线程中用get_future()获得std::future对象,进一步再进行get。

1
2
3
4
5
6
7
8
9
10
11
12
std::promise<int> pret;
std::thread t1([&] {
auto ret = download("hello.zip");
pret.set_value(ret);
});

std::future<int> fret = pret.get_future();

...//dosomething

int ret = fret.get();
t1.join();

只要线程没执行完, wait()会一直等,而wait_for可以设置一个最长等待时间,返回一个std::future_status表示等待是否成功。

package_task

packaged_task封装了代码,能够把任务的返回值或异常放进promise, 它拥有其promise并且(间接地)要对该任务占有的资源负责。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
double accum(double* beg, double* end, double init)
// 计算[beg:end)的和,初始值为init
{
return accumulate(beg,end,init);
}

double comp2(vector<double>& v)
{
using Task_type = double(double*,double*,double); // 任务的类型

packaged_task<Task_type> pt0 {accum}; // 把任务(即accum)打包
packaged_task<Task_type> pt1 {accum};

future<double> f0 {pt0.get_future()}; // 获取pt0的future
future<double> f1 {pt1.get_future()}; // 获取pt1的future

double* first = &v[0];
thread t1 {move(pt0),first,first+v.size()/2,0}; // 为pt0启动一个线程
thread t2 {move(pt1),first+v.size()/2,first+v.size(),0}; // 为pt1启动一个线程

// ...

return f0.get()+f1.get(); // 取结果
}

async

想发起一个可能异步执行的任务,可以用async(),async()把函数调用的“调用部分”和“获取结果部分”拆开, 并把它们都与实际执行的任务分离开。 使用async(),就不用再去操心线程和锁, 相反,你需要考虑的就只是那个有可能异步执行的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
double comp4(vector<double>& v)
// 如果v足够大,就触发多个任务
{
if (v.size()<10000) // 值得采用并发吗?
return accum(v.begin(),v.end(),0.0);

auto v0 = &v[0];
auto sz = v.size();

auto f0 = async(accum,v0,v0+sz/4,0.0); // 第一份
auto f1 = async(accum,v0+sz/4,v0+sz/2,0.0); // 第二份
auto f2 = async(accum,v0+sz/2,v0+sz*3/4,0.0); // 第三份
auto f3 = async(accum,v0+sz*3/4,v0+sz,0.0); // 最后一份

return f0.get()+f1.get()+f2.get()+f3.get(); // 收集并求和结果
}

std::async接受一个带返回值的lambda,自身返回一个std::future对象。lambda函数体在另一个线程执行。

1
2
3
4
5
6
7
8
9
10
11
std::future<int> fret = std::async([&] {
....
return 1;
});

int ret = fret.get();
// or fret.wait(); // 等待,但是不要返回值
// std::future_status s=fret.wait_for(std::chrono::milliseconds(1000));等1000ms
std::future<int> fret = std::async(std::launch::deferred, [&]{ // 函数式编程的异步
...
});

互斥量

std::lock_guard 符合RAII思想的上锁解锁
std::unique_lock 符合RAII思想,但是自由度高,可以提前解锁

  • std::unique_lock grd(mtx, std::defer_lock)
  • std::unique_lock grd(mtx, std::try_to_lock) 调用mtx.try_lock
  • std::unique_lock grd(mtx, std::adopt_lock) mutex已经上锁
1
2
3
4
std::unique_lock grd(mtx);
...
grd.unlock();
...// outside the lock

使用std::unique_lock grd(mtx, std::defer_lock) 则不会在构造函数中调用mtx.lock,需要手动调用lock才能上锁。

mtx.try_lock() 如果发现已经上锁,返回false,无阻塞
mtx.try_lock_for(std::chrono::milliseconds(500)) 等待一段时间
mtx.try_lock_until

死锁

上锁顺序不一致就可能导致死锁,例如thread1先锁了1, t2锁2, 然后这时候t1请求2, t2
请求1,就会发生死锁。上锁顺序保持一致即可,或者使用:

1
2
3
4
5
6
std::lock(mtx1, mtx2); //保证上锁顺序一致

{
// lock 的RAII 版本,std::scoped_lock,可以上锁多个lock
std::scoped_lock gcd(mtx1, mtx2);
}

条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
std::condition_variable cv;
std::mutex mtx;

bool ready = false;

std::thread t1( [&] {
std::cout << "t1" <<std::endl;
std::unique_lock lck(mtx);
cv.wait(lck, [&] {return ready;});
std::cout << "t1 is awake" <<std::endl;
});

std::this_thread::sleep_for(std::chrono::seconds(1));

std::cout << "notifying.." << std::endl;
cv.notify_one();

ready = true;
cv.notify_one();

t1.join();

多个等待者:cv.notify_all()唤醒全部

原子变量

两种概念:

  • 获得:一个内存的读操作,当前线程的任后面的读写操作不能重排到前面(否则就读错了)x.load(memory_order_acquire)
  • 释放:一个内存的写操作,当前线程前面的读写操作不允许重拍到后面(以防读到写过的值)。x.store(2, memory_order_release)

acquire 和 release 通常都是配对出现的,目的是保证如果对同一个原子对象的 release 发生在 acquire 之前的话,release 之前发生的内存修改能够被 acquire 之后的内存读取全部看到。

mutex上锁需要通过系统调用,线程阻塞后可能会调度到其他线程进行执行,开销很大。

atomic 会锁住内存总线,放弃乱序优化等策略

  • fetch_add : +=,返回增加前的值
  • store : =
  • exchange : 返回旧值
  • compare_exchange_strong 比较是否相等,相等则写入