C++11 并发1 原语
- thread join && detach
- std::once_flag && call_once
- std::mutex
- std::condition_variable
- std::unique_lock && std::lock_guard
- std::async && std::future
- std::promise
- std::packaged_task
- std::shared_future
其实和 posix pthread 比,我还是更习惯 posix…
核心就是 thread 使用。
thread join && detach
和 posix pthread 里的概念一致。
join
等待线程结束;detach
分离方式使用线程,就像pthread_create
。
thread 可以用任何函数对象来构造,并传递参数:
//1 normal
void func(int a, string b) {}
thread t(func,1,"sunny"); //可以判断thread里实现了类似bind的操作
//2 lambda
thread t1([](int a){ cout << a;}) //lambda
//3 函数对象
struct funobj {
void operator()(){ cout << "good day"}
};
funcobj fo;
thread t2(f0); // function obj
//4 直接用类成员函数和对象
class X
{
public:
void do_lengthy_work(int);
};
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);
std::once_flag && call_once
类似 pthread_once。
某些场景下,我们需要代码只被执行 1 次,比如单例类的初始化,考虑到多线程安全,需要进行加锁控制。C++11 中提供的 call_once 可以很好的满足这种需求,使用又非常简单。
#include<mutex>
template <class Fn, class... Args>
void call_once (once_flag& flag, Fn&& fn, Args&&...args);
要求 once_flag 对象的生命周期要长于 fn 的生命周期,定义为全局的比较好。
总之,需要某些过程仅执行1次的,如单例实例化,某些连接初始化,都可以考虑。
想起 streamer 项目里streamer_thread
就是这个需求。
std::mutex
mutex 同步原语,用来保护共享的资源的;
- std::mutex
- std::recursize_mutex
- std::time_mutex 带超时的普通 lock。
而 boost::shared_mutex 可对应 posix 里读写锁。
boost::shared_mutex mut;
//读保护用共享锁
read_sth {
boost:shared_lock<boost::shared_mutex> lk(mut);
...//reading job
}
//写保护用普通锁
write_sth {
std::lock_guard<boost::shared_mutex> lk(mut);
...//writing job
}
std::condition_variable
- condition_varivable
- condition_variable_any
见线程安全队列一文。
std::unique_lock && std::lock_guard
对原始 mutex 一层封装,核心是自动加解锁
。
//normal
void process(){
LOCK();.
...
UNLOCK();
}
//unique_lock
std::mutex m;
void process(){
lock_guard lg(m); //lg的生命周期结束则解锁。RAII again.
...
}
二者区别
:
unique_lock 可以更灵活的使用锁
,有更多的 api:
std::async && std::future
类似 thread, async 异步提交任务,future 从异步任务获取返回值,就是 posix 的int pthread_join(pthread_t thread, void **retval);
std::future 提供了一种访问异步操作结果的机制, future 的异步等待函数有:
- wait 仅等待完成
- wait_for 超时等待
- get 等待并取得结果
注意异步等待只能等待一次
。
可以设置 async 的参数:
- std::launch:async (默认)
- std::launch:deferred 延迟执行,直到 get or wait 调用时才执行
int working_thread(string name ,int a, int b)
{
cout << name << " counting...\n";
//chrono::seconds(1);// 类似sleep()
sleep(1);
return a+b;
}
void async_future_test()
{
thread t(working_thread,string("baby"),1,1);
t.detach();
future<int> dad = async(working_thread, string("daddy"),2,2);
future<int> mom = async(launch::deferred , working_thread, string("mommy"),3,3);
future<int> peggy = async(launch::deferred , working_thread, string("peggy"),4,4);
//等待async线程执行完毕并返回结果, 类似pthread_join
cout << "dad get " << dad.get() << endl;
cout << "mom get " << mom.get() << endl;
cout << "peggy wait" << endl ;
peggy.wait();
}
std::promise
std::promise 为获取线程函数中的某个值提供便利,在线程函数中给外面传进来的promise
赋值,当线程函数执行完成之后就可以通过 promise 获取该值了,值得注意的是取值是间接的通过 promise 内部提供的 future 来获取的,比函数返回值要更灵活
:
“期望”使得函数化编程模式并发化(FP-style concurrency)在 C++中成为可能;一个“期望”对象可以在线程间互相传递,并允许其中一个计算结果依赖于另外一个的结果,而非对共享数据的显式访问。
promise 的方法:
- set_value 设置 promise 返回值
- set_value_at_thread_exit 设置 thread_exit 时 primise 值
- set_exception 设置为 exception
- set_exception_at_thread_exit
如果要获取线程的中间结果,用普通 thread,那肯定又是全局变量满天飞,以及各种加锁。
用set_value
甚至在线程没有结束前就可以返回中间结果。
如果在 set_value 之前,promise 已经被析构,又叫违背承诺
,则 get 可能无限等待下去。
int promise_thread(int a, int b, int c, promise<vector<int>> & p)
{
vector<int> vec;
if(a && b && c) {
vec.push_back(a+b);
vec.push_back(b+c);
vec.push_back(a+b+c);
if(a && b && c) {
vec.push_back(a+b);
vec.push_back(b+c);
vec.push_back(a+b+c);
try{
//p.set_value_at_thread_exit(vec);
p.set_value(vec);
} catch(exception_ptr e) {
p.set_exception(e);
}
} else return 1;
} else return 1;
sleep(1);
cout << "promise_thread end" << endl;
return 0;
}
void promise_test()
{
promise<vector<int>> promise_result;
auto task = thread(promise_thread,1,2,3,ref(promise_result));
future<vector<int>> f = promise_result.get_future();
auto finial = f.get();
task.detach();
//线程还没执行完毕就可以取出中间结果
cout << "got result\n";
for(auto i:finial) {
cout << i << endl;
}
}
只能说把对线程的使用做到超级灵活
,就看你怎么用。
std::packaged_task
更高级的封装了,高度抽象还是为了使用更方便,应付更复杂的场景。
std::shared_future
std::future 也有局限性,在很多线程在等待的时候,只有一个线程能获取等待结果。当多个线程需要等待相同的事件的结果,你就需要使用 std::shared_future 来替代 std::future 了
std::future 是只移动的,所以其所有权可以在不同的实例中互相传递,但是只有一个实例可以获得特定的同步结果;而 std::shared_future 实例是可拷贝的,所以多个对象可以引用同一关联“期望”的结果。
多个线程等待时,每个都可以用自己的share_future
去 get.
std::future 有一个 share()成员函数,可用来创建新的 std::shared_future ,并且可以直接转移“期望”的所有权。
std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future()); // 1 隐式转移所有
auto sf=p.get_future().share();