min_heap 管理时间的问题


假设有 1000k 个超时 100ms 的事件,1000k 都是在 100ms 内添加完成,也就是再第1次超时发生前就完成。如此多是 event_add 对于 min heap 其实压力也不小,O(log(N))的复杂度不见得能接受。


因为有相同时间的这个特性,考虑把这些 event 用另外的数据结构(queue array)管理,因为 event_add 的先后顺序,决定了后面 add 的肯定是后发生的。仅需要添加一个内部event,时间设置为 100ms(纯粹的超时监控即可,不需要 fd,也就是按常规的放入 min_heap),当其发生时,相当于是 queue head 的 event 触发了,再去处理队首的 event 即可。

libevent 采用 common_timeout_list** 来管理相同的超时时间,也就是是一个队列数组,index 表示不同的超时时间,不同的超时 event 放在不同的 commont_timeout_list*,**来缓冲直接放入 min_heap 的压力**。

具体做法就是,event_add 时,如果设置了 common_timeout,则会根据设置优先进入 common_timeout_list 而不是 min_heap。但是会把队首的 event 按正常方式 add, 添加1个肯定比直接添加 10000 个效率高。

设置 common_timeout

设置这个时长需要手动设置才行,因此这个机制的的使用和应用场景是直接相关,通常并不需要使用。 调用的函数为event_base_init_common_timeout。 分配队列数组,并设置好某个时间对应 index 的队列的首元素,注册internal event,这是 libevent 第 2 个用到 internal event 的地方,之前在 sigevent 处理时用到过一次。

const struct timeval *
event_base_init_common_timeout(struct event_base *base,
    const struct timeval *duration)
	int i;
	struct timeval tv;
	const struct timeval *result=NULL;
	struct common_timeout_list *new_ctl;

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	if (duration->tv_usec > 1000000) {
		memcpy(&tv, duration, sizeof(struct timeval));
		if (is_common_timeout(duration, base))
			tv.tv_usec &= MICROSECONDS_MASK;
		tv.tv_sec += tv.tv_usec / 1000000;
		tv.tv_usec %= 1000000;
		duration = &tv;
	for (i = 0; i < base->n_common_timeouts; ++i) {
		const struct common_timeout_list *ctl =
		//设置的duration已经分配过common_timeout_list  跳出
		if (duration->tv_sec == ctl->duration.tv_sec &&
		    duration->tv_usec ==
		    (ctl->duration.tv_usec & MICROSECONDS_MASK)) {
			EVUTIL_ASSERT(is_common_timeout(&ctl->duration, base));
			result = &ctl->duration;
			goto done;
	//超出限额 最多设置256个
	if (base->n_common_timeouts == MAX_COMMON_TIMEOUTS) {
		event_warnx("%s: Too many common timeouts already in use; "
		    "we only support %d per event_base", __func__,
		goto done;

	if (base->n_common_timeouts_allocated == base->n_common_timeouts) {
		int n = base->n_common_timeouts < 16 ? 16 :
		struct common_timeout_list **newqueues =
			n*sizeof(struct common_timeout_queue *));
		if (!newqueues) {
			event_warn("%s: realloc",__func__);
			goto done;
		base->n_common_timeouts_allocated = n;
		base->common_timeout_queues = newqueues;
	//作为list* 首元素
	new_ctl = mm_calloc(1, sizeof(struct common_timeout_list));
	if (!new_ctl) {
		event_warn("%s: calloc",__func__);
		goto done;
	new_ctl->duration.tv_sec = duration->tv_sec;
	new_ctl->duration.tv_usec =
	    duration->tv_usec | COMMON_TIMEOUT_MAGIC |
	    (base->n_common_timeouts << COMMON_TIMEOUT_IDX_SHIFT);
	//分配一个内部event, 注册回调为common_timeout_callback
	//类似signal里为socket pair的读端fd 注册了internal event的READ事件
	//注册回调为common_timeout_callback 参数为这个common_timeout结构
	evtimer_assign(&new_ctl->timeout_event, base,
	    common_timeout_callback, new_ctl);
	new_ctl->timeout_event.ev_flags |= EVLIST_INTERNAL;
	event_priority_set(&new_ctl->timeout_event, 0);
	new_ctl->base = base;
	base->common_timeout_queues[base->n_common_timeouts++] = new_ctl;
	result = &new_ctl->duration;

	if (result)
		EVUTIL_ASSERT(is_common_timeout(result, base));

	EVBASE_RELEASE_LOCK(base, th_base_lock);
	return result;

回调 common_timeout_callback

common_timeout_callback里,只激活队头的 event,如果队列仍然有 event,依据当前绝对时间重新 event_add 事件。

/* Callback: invoked when the timeout for a common timeout queue triggers.
 * This means that (at least) the first event in that queue should be run,
 * and the timeout should be rescheduled if there are more events. */
static void
common_timeout_callback(evutil_socket_t fd, short what, void *arg)
	struct timeval now;
	struct common_timeout_list *ctl = arg;
	struct event_base *base = ctl->base;
	struct event *ev = NULL;
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	gettime(base, &now);
	while (1) {
		ev = TAILQ_FIRST(&ctl->events);
		if (!ev || ev->ev_timeout.tv_sec > now.tv_sec ||
		    (ev->ev_timeout.tv_sec == now.tv_sec &&
			(ev->ev_timeout.tv_usec&MICROSECONDS_MASK) > now.tv_usec))
		event_active_nolock(ev, EV_TIMEOUT, 1);
	if (ev)
		common_timeout_schedule(ctl, &now, ev);
	EVBASE_RELEASE_LOCK(base, th_base_lock);

event_add 中的 common_timeout

event_add 是在event_add_internal中,前面讲过了。 其中有这么一段代码,现在很清楚它在干嘛了:

  gettime(base, &now);
    * 是否是common_timeout,是根据tv.usec的12bit来决定的
    * 4bit COMMON_TIMEOUT_MAGIC +  8bit index
  common_timeout = is_common_timeout(tv, base);
  if (tv_is_absolute) {
    ev->ev_timeout = *tv;
  } else if (common_timeout) {
    struct timeval tmp = *tv;
    tmp.tv_usec &= MICROSECONDS_MASK;
    evutil_timeradd(&now, &tmp, &ev->ev_timeout);
    ev->ev_timeout.tv_usec |=
        (tv->tv_usec & ~MICROSECONDS_MASK);
  } else {
    evutil_timeradd(&now, tv, &ev->ev_timeout);

      "event_add: timeout in %d seconds, call %p",
      (int)tv->tv_sec, ev->ev_callback));

  event_queue_insert(base, ev, EVLIST_TIMEOUT);
  if (common_timeout) {
    struct common_timeout_list *ctl =
        get_common_timeout_list(base, &ev->ev_timeout);
    if (ev == TAILQ_FIRST(&ctl->events)) {
      common_timeout_schedule(ctl, &now, ev);
  } else {
    /* See if the earliest timeout is now earlier than it
      * was before: if so, we will need to tell the main
      * thread to wake up earlier than it would
      * otherwise. */
    if (min_heap_elt_is_top(ev))
      notify = 1;


		//如果是common_timeout 放入common_timeout_list 而不是min heap
		if (is_common_timeout(&ev->ev_timeout, base)) {
			struct common_timeout_list *ctl =
			    get_common_timeout_list(base, &ev->ev_timeout);
			insert_common_timeout_inorder(ctl, ev);
		} else
			min_heap_push(&base->timeheap, ev);


  if (is_common_timeout(&ev->ev_timeout, base)) {
    struct common_timeout_list *ctl =
        get_common_timeout_list(base, &ev->ev_timeout);
    TAILQ_REMOVE(&ctl->events, ev,
  } else {
    min_heap_erase(&base->timeheap, ev);