其实和 posix pthread 比,我还是更习惯 posix…

非常好的在线书籍

核心就是 thread 使用。

thread join && detach

和 posix pthread 里的概念一致。

join等待线程结束;detach分离方式使用线程,就像pthread_create

thread 可以用任何函数对象来构造,并传递参数:

//1 normal
void func(int a, string b) {}
thread t(func,1,"sunny"); //可以判断thread里实现了类似bind的操作

//2 lambda
thread t1([](int a){ cout << a;}) //lambda

//3 函数对象
struct funobj {

void operator()(){ cout << "good day"}
};

funcobj fo;
thread t2(f0); // function obj

//4 直接用类成员函数和对象
class X
{
public:
  void do_lengthy_work(int);
};
X my_x;
int num(0);
std::thread t(&X::do_lengthy_work, &my_x, num);

std::once_flag && call_once

类似 pthread_once。

某些场景下,我们需要代码只被执行 1 次,比如单例类的初始化,考虑到多线程安全,需要进行加锁控制。C++11 中提供的 call_once 可以很好的满足这种需求,使用又非常简单。

#include<mutex>
template <class Fn, class... Args>
void call_once (once_flag& flag, Fn&& fn, Args&&...args);

要求 once_flag 对象的生命周期要长于 fn 的生命周期,定义为全局的比较好。

总之,需要某些过程仅执行1次的,如单例实例化,某些连接初始化,都可以考虑。

想起 streamer 项目里streamer_thread就是这个需求。

std::mutex

mutex 同步原语,用来保护共享的资源的;

  • std::mutex
  • std::recursize_mutex
  • std::time_mutex 带超时的普通 lock。

而 boost::shared_mutex 可对应 posix 里读写锁。

boost::shared_mutex mut;
//读保护用共享锁
read_sth {
  boost:shared_lock<boost::shared_mutex> lk(mut);
  ...//reading job
}
//写保护用普通锁
write_sth {
  std::lock_guard<boost::shared_mutex> lk(mut);
  ...//writing job
}

std::condition_variable

  • condition_varivable
  • condition_variable_any

线程安全队列一文。

std::unique_lock && std::lock_guard

ref

对原始 mutex 一层封装,核心是自动加解锁

//normal
void process(){
  LOCK();.
  ...
  UNLOCK();
}
//unique_lock
std::mutex m;
void process(){
  lock_guard lg(m); //lg的生命周期结束则解锁。RAII again.
  ...
}

二者区别: unique_lock 可以更灵活的使用锁,有更多的 api: 2018-01-24-15-03-52

std::async && std::future

ref1,ref2

类似 thread, async 异步提交任务,future 从异步任务获取返回值,就是 posix 的int pthread_join(pthread_t thread, void **retval);

std::future 提供了一种访问异步操作结果的机制, future 的异步等待函数有:

  • wait 仅等待完成
  • wait_for 超时等待
  • get 等待并取得结果

注意异步等待只能等待一次

可以设置 async 的参数:

  • std::launch:async (默认)
  • std::launch:deferred 延迟执行,直到 get or wait 调用时才执行
int working_thread(string name ,int a, int b)
{
	cout << name << " counting...\n";
	//chrono::seconds(1);// 类似sleep()
	sleep(1);
	return a+b;
}

void async_future_test()
{
	thread t(working_thread,string("baby"),1,1);
	t.detach();

	future<int> dad = async(working_thread, string("daddy"),2,2);
	future<int> mom = async(launch::deferred , working_thread, string("mommy"),3,3);
	future<int> peggy = async(launch::deferred , working_thread, string("peggy"),4,4);

	//等待async线程执行完毕并返回结果, 类似pthread_join
	cout << "dad get " << dad.get() << endl;
	cout << "mom get " << mom.get() << endl;
	cout << "peggy wait" << endl ;
	peggy.wait();
}

std::promise

std::promise 为获取线程函数中的某个值提供便利,在线程函数中给外面传进来的promise赋值,当线程函数执行完成之后就可以通过 promise 获取该值了,值得注意的是取值是间接的通过 promise 内部提供的 future 来获取的,比函数返回值要更灵活

“期望”使得函数化编程模式并发化(FP-style concurrency)在 C++中成为可能;一个“期望”对象可以在线程间互相传递,并允许其中一个计算结果依赖于另外一个的结果,而非对共享数据的显式访问。

promise 的方法:

  • set_value 设置 promise 返回值
  • set_value_at_thread_exit 设置 thread_exit 时 primise 值
  • set_exception 设置为 exception
  • set_exception_at_thread_exit

如果要获取线程的中间结果,用普通 thread,那肯定又是全局变量满天飞,以及各种加锁。

set_value甚至在线程没有结束前就可以返回中间结果。

如果在 set_value 之前,promise 已经被析构,又叫违背承诺,则 get 可能无限等待下去。

int promise_thread(int a, int b, int c, promise<vector<int>> & p)
{
	vector<int> vec;
	if(a && b && c) {
		vec.push_back(a+b);
		vec.push_back(b+c);
		vec.push_back(a+b+c);
		if(a && b && c) {
			vec.push_back(a+b);
			vec.push_back(b+c);
			vec.push_back(a+b+c);
			try{
				//p.set_value_at_thread_exit(vec);
				p.set_value(vec);
			} catch(exception_ptr e) {
				p.set_exception(e);
		}

	} else return 1;

	} else return 1;

	sleep(1);

	cout << "promise_thread end" << endl;

	return 0;
}

void promise_test()
{
	promise<vector<int>> promise_result;

	auto task =  thread(promise_thread,1,2,3,ref(promise_result));
	future<vector<int>> f  = promise_result.get_future();
	auto finial = f.get();

	task.detach();

	//线程还没执行完毕就可以取出中间结果
	cout << "got result\n";
	for(auto i:finial) {
		cout << i << endl;
	}
}

只能说把对线程的使用做到超级灵活,就看你怎么用。

std::packaged_task

更高级的封装了,高度抽象还是为了使用更方便,应付更复杂的场景。

std::shared_future

std::future 也有局限性,在很多线程在等待的时候,只有一个线程能获取等待结果。当多个线程需要等待相同的事件的结果,你就需要使用 std::shared_future 来替代 std::future 了

std::future 是只移动的,所以其所有权可以在不同的实例中互相传递,但是只有一个实例可以获得特定的同步结果;而 std::shared_future 实例是可拷贝的,所以多个对象可以引用同一关联“期望”的结果。

多个线程等待时,每个都可以用自己的share_future去 get.

std::future 有一个 share()成员函数,可用来创建新的 std::shared_future ,并且可以直接转移“期望”的所有权。

std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future());  // 1 隐式转移所有

auto sf=p.get_future().share();