min_heap 管理时间的问题

推荐这篇文章,基于他的理解。

假设有 1000k 个超时 100ms 的事件,1000k 都是在 100ms 内添加完成,也就是再第1次超时发生前就完成。如此多是 event_add 对于 min heap 其实压力也不小,O(log(N))的复杂度不见得能接受。

解决思路

因为有相同时间的这个特性,考虑把这些 event 用另外的数据结构(queue array)管理,因为 event_add 的先后顺序,决定了后面 add 的肯定是后发生的。仅需要添加一个内部event,时间设置为 100ms(纯粹的超时监控即可,不需要 fd,也就是按常规的放入 min_heap),当其发生时,相当于是 queue head 的 event 触发了,再去处理队首的 event 即可。

libevent 采用 common_timeout_list** 来管理相同的超时时间,也就是是一个队列数组,index 表示不同的超时时间,不同的超时 event 放在不同的 commont_timeout_list*,**来缓冲直接放入 min_heap 的压力**。

具体做法就是,event_add 时,如果设置了 common_timeout,则会根据设置优先进入 common_timeout_list 而不是 min_heap。但是会把队首的 event 按正常方式 add, 添加1个肯定比直接添加 10000 个效率高。

设置 common_timeout

设置这个时长需要手动设置才行,因此这个机制的的使用和应用场景是直接相关,通常并不需要使用。 调用的函数为event_base_init_common_timeout。 分配队列数组,并设置好某个时间对应 index 的队列的首元素,注册internal event,这是 libevent 第 2 个用到 internal event 的地方,之前在 sigevent 处理时用到过一次。

const struct timeval *
event_base_init_common_timeout(struct event_base *base,
    const struct timeval *duration)
{
	int i;
	struct timeval tv;
	const struct timeval *result=NULL;
	struct common_timeout_list *new_ctl;

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	//如果有进位处理进位
	if (duration->tv_usec > 1000000) {
		memcpy(&tv, duration, sizeof(struct timeval));
		if (is_common_timeout(duration, base))
			tv.tv_usec &= MICROSECONDS_MASK;
		tv.tv_sec += tv.tv_usec / 1000000;
		tv.tv_usec %= 1000000;
		duration = &tv;
	}
	for (i = 0; i < base->n_common_timeouts; ++i) {
		const struct common_timeout_list *ctl =
		    base->common_timeout_queues[i];
		//设置的duration已经分配过common_timeout_list  跳出
		if (duration->tv_sec == ctl->duration.tv_sec &&
		    duration->tv_usec ==
		    (ctl->duration.tv_usec & MICROSECONDS_MASK)) {
			EVUTIL_ASSERT(is_common_timeout(&ctl->duration, base));
			result = &ctl->duration;
			goto done;
		}
	}
	//超出限额 最多设置256个
	if (base->n_common_timeouts == MAX_COMMON_TIMEOUTS) {
		event_warnx("%s: Too many common timeouts already in use; "
		    "we only support %d per event_base", __func__,
		    MAX_COMMON_TIMEOUTS);
		goto done;
	}

	//如果空间不够,扩大数组队列
	if (base->n_common_timeouts_allocated == base->n_common_timeouts) {
		int n = base->n_common_timeouts < 16 ? 16 :
		    base->n_common_timeouts*2;
		struct common_timeout_list **newqueues =
		    mm_realloc(base->common_timeout_queues,
			n*sizeof(struct common_timeout_queue *));
		if (!newqueues) {
			event_warn("%s: realloc",__func__);
			goto done;
		}
		base->n_common_timeouts_allocated = n;
		base->common_timeout_queues = newqueues;
	}
	//分配一个新的common_timeout_list,代表一个新的超时时间
	//作为list* 首元素
	new_ctl = mm_calloc(1, sizeof(struct common_timeout_list));
	if (!new_ctl) {
		event_warn("%s: calloc",__func__);
		goto done;
	}
	TAILQ_INIT(&new_ctl->events);
	new_ctl->duration.tv_sec = duration->tv_sec;
	new_ctl->duration.tv_usec =
	    duration->tv_usec | COMMON_TIMEOUT_MAGIC |
	    (base->n_common_timeouts << COMMON_TIMEOUT_IDX_SHIFT);
	//分配一个内部event, 注册回调为common_timeout_callback
	//类似signal里为socket pair的读端fd 注册了internal event的READ事件
	//evtime_assign表示仅监控的超时,不需要fd
	//注册回调为common_timeout_callback 参数为这个common_timeout结构
	evtimer_assign(&new_ctl->timeout_event, base,
	    common_timeout_callback, new_ctl);
	new_ctl->timeout_event.ev_flags |= EVLIST_INTERNAL;
	event_priority_set(&new_ctl->timeout_event, 0);
	new_ctl->base = base;
	base->common_timeout_queues[base->n_common_timeouts++] = new_ctl;
	result = &new_ctl->duration;

done:
	if (result)
		EVUTIL_ASSERT(is_common_timeout(result, base));

	EVBASE_RELEASE_LOCK(base, th_base_lock);
	return result;
}

回调 common_timeout_callback

common_timeout_callback里,只激活队头的 event,如果队列仍然有 event,依据当前绝对时间重新 event_add 事件。

/* Callback: invoked when the timeout for a common timeout queue triggers.
 * This means that (at least) the first event in that queue should be run,
 * and the timeout should be rescheduled if there are more events. */
static void
common_timeout_callback(evutil_socket_t fd, short what, void *arg)
{
	struct timeval now;
	struct common_timeout_list *ctl = arg;
	struct event_base *base = ctl->base;
	struct event *ev = NULL;
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	gettime(base, &now);
	while (1) {
		//拿到队首event,应该是最近的发生的common_timeout事件。
		ev = TAILQ_FIRST(&ctl->events);
		//如果没有发生超时,则退出
		if (!ev || ev->ev_timeout.tv_sec > now.tv_sec ||
		    (ev->ev_timeout.tv_sec == now.tv_sec &&
			(ev->ev_timeout.tv_usec&MICROSECONDS_MASK) > now.tv_usec))
			break;
		//处理队列,第1个将remove掉,(下一次就是下一个)
		event_del_internal(ev);
		//将event激活,将执行回调
		event_active_nolock(ev, EV_TIMEOUT, 1);
	}
	//如果event不为空,用当前的绝对时间再event_add一次,即重新将会重新进入该回调
	if (ev)
		common_timeout_schedule(ctl, &now, ev);
	EVBASE_RELEASE_LOCK(base, th_base_lock);
}

event_add 中的 common_timeout

event_add 是在event_add_internal中,前面讲过了。 其中有这么一段代码,现在很清楚它在干嘛了:

  gettime(base, &now);
   /*
    * 是否是common_timeout,是根据tv.usec的12bit来决定的
    * 4bit COMMON_TIMEOUT_MAGIC +  8bit index
    */
  common_timeout = is_common_timeout(tv, base);
  if (tv_is_absolute) {
    ev->ev_timeout = *tv;
  } else if (common_timeout) {
    //计算绝对时间
    struct timeval tmp = *tv;
    tmp.tv_usec &= MICROSECONDS_MASK;
    evutil_timeradd(&now, &tmp, &ev->ev_timeout);
    ev->ev_timeout.tv_usec |=
        (tv->tv_usec & ~MICROSECONDS_MASK);
  } else {
    evutil_timeradd(&now, tv, &ev->ev_timeout);
  }

  event_debug((
      "event_add: timeout in %d seconds, call %p",
      (int)tv->tv_sec, ev->ev_callback));

  //insert时,common_timeout将insert到自己的list
  event_queue_insert(base, ev, EVLIST_TIMEOUT);
  if (common_timeout) {
    struct common_timeout_list *ctl =
        get_common_timeout_list(base, &ev->ev_timeout);
    //如果是首元素,马上就会开始common_time的计时。
    if (ev == TAILQ_FIRST(&ctl->events)) {
      common_timeout_schedule(ctl, &now, ev);
    }
  } else {
    /* See if the earliest timeout is now earlier than it
      * was before: if so, we will need to tell the main
      * thread to wake up earlier than it would
      * otherwise. */
    if (min_heap_elt_is_top(ev))
      notify = 1;
  }

event_queue_insert时:

case EVLIST_TIMEOUT: {
		//如果是common_timeout 放入common_timeout_list 而不是min heap
		if (is_common_timeout(&ev->ev_timeout, base)) {
			struct common_timeout_list *ctl =
			    get_common_timeout_list(base, &ev->ev_timeout);
			insert_common_timeout_inorder(ctl, ev);
		} else
			min_heap_push(&base->timeheap, ev);
		break;
	}

自然的,在event_queue_remove时,也需要处理:

case EVLIST_TIMEOUT:
  if (is_common_timeout(&ev->ev_timeout, base)) {
    struct common_timeout_list *ctl =
        get_common_timeout_list(base, &ev->ev_timeout);
    TAILQ_REMOVE(&ctl->events, ev,
        ev_timeout_pos.ev_next_with_common_timeout);
  } else {
    min_heap_erase(&base->timeheap, ev);
  }
  break;