event

event 是 libeven reactor 模式中的基本处理单位,所有事件都是基于 event 去添加,事件类型有:

  • 普通 IO
  • signal 事件
  • 定时器 以上事件都可以设置超时返回时间,成为相应的超时事件。 详细可以看看之前的这篇文章

event_add

本质调用event_add_internal,调用前加了把锁。

  1. 只要是超时事件,分配 min_heap 资源

  1. signal 事件的多线程 add 处理,在 signal hander 时,又要 add 相同事件,则需要排队 signal 事件和普通 io 的处理是不一样的。

  1. 以上两种事件分别添加到对应的数据结构里,再将 event 添加到 acitive_queue 里,标记状态为 INSERTED;

添加到数据结构

  • evmap_io_add 普通 io 事件添加

event 按 fd 作为索引,添加到链表数组中. 因为要当 queue 用,先进先出,所以是insert_tail,remove_head 如果 fd 没被添加过,通过 eventop(IO 复用模型)的 add 把 fd 添加到 IO 复用的监控列表里。 如果已经是 IO 多路复用已经添加过的 fd(event list 不为空),那就在队尾插入该事件。

  • evmap_signal_add signal 事件的添加

用的数据结构与上面的 evmap_io_add 一致。 这篇文章讲了 signal 这种异步机制是怎么纳入框架。sig_hander 里用 socketpair 写,读端用 event 标记为 READ…

event_queue_insert

设置 queue 状态,然后依据状态把 event 添加到 base 的管理链表中.

#define EVLIST_TIMEOUT	0x01
#define EVLIST_INSERTED	0x02
#define EVLIST_SIGNAL	0x04
#define EVLIST_ACTIVE	0x08
#define EVLIST_INTERNAL	0x10
#define EVLIST_INIT	0x80
  • EVLIST_INSERTED: 刚添加的事件,在 base 的 event_queue 中
  • EVLIST_ACTIVE: base 的 active_queues 中,类似 event_io_map 的链表数组,不同优先级索引寻访不同的链表头 active_queues[pri].
  • EVLIST_TIMEOUT: 超时事件,大量的相等时间的超时用 common_timeout_list 管理,否则用 min_heap_push;

特别的处理

  • n 个 event 可以从多个线程添加,处理多线程
  • 等待 event 时,添加 event? io 复用中包括一个专门的内部 event, add 时,向该 fd 发送一个字节,即可唤醒 IO 复用,然后添加新的事件(fd).

event_add_internal 代码附录

static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
    int tv_is_absolute)
{
    struct event_base *base = ev->ev_base;
    int res = 0;
    int notify = 0;

    //锁确认
	EVENT_BASE_ASSERT_LOCKED(base);
	_event_debug_assert_is_setup(ev);

	event_debug((
		 "event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%scall %p",
		 ev,
		 EV_SOCK_ARG(ev->ev_fd),
		 ev->ev_events & EV_READ ? "EV_READ " : " ",
		 ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
		 tv ? "EV_TIMEOUT " : " ",
		 ev->ev_callback));

EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));

	/*
	 * prepare for timeout insertion further below, if we get a
	 * failure on any step, we should not change any state.
	 */
	 //如果有超时时间tv,而且该event之前没有设置过超时,分配min_heap
	if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
		if (min_heap_reserve(&base->timeheap,
			1 + min_heap_size(&base->timeheap)) == -1)
			return (-1);  /* ENOMEM == errno */
	}

	/* If the main thread is currently executing a signal event's
	 * callback, and we are not the main thread, then we want to wait
	 * until the callback is done before we mess with the event, or else
	 * we can race on ev_ncalls and ev_pncalls below. */
	 //signal类的多线程event不能在callback期间同时添加,需挂起等待
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	if (base->current_event == ev && (ev->ev_events & EV_SIGNAL)
	    && !EVBASE_IN_THREAD(base)) {
		++base->current_event_waiters;
		EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
	}
#endif

	if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
	    !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
		//普通io
		if (ev->ev_events & (EV_READ|EV_WRITE))
			res = evmap_io_add(base, ev->ev_fd, ev);
		else if (ev->ev_events & EV_SIGNAL)
		//signal
			res = evmap_signal_add(base, (int)ev->ev_fd, ev);
		//激活队列
		if (res != -1)
			event_queue_insert(base, ev, EVLIST_INSERTED);
		if (res == 1) {
			/* evmap says we need to notify the main thread. */
			notify = 1;
			res = 0;
		}
	}

	/*
	 * we should change the timeout state only if the previous event
	 * addition succeeded.
	 */
	if (res != -1 && tv != NULL) {
		struct timeval now;
		int common_timeout;

		/*
		 * for persistent timeout events, we remember the
		 * timeout value and re-add the event.
		 *
		 * If tv_is_absolute, this was already set.
		 */
		if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
			ev->ev_io_timeout = *tv;

		/*
		 * we already reserved memory above for the case where we
		 * are not replacing an existing timeout.
		 */
		if (ev->ev_flags & EVLIST_TIMEOUT) {
			/* XXX I believe this is needless. */
			if (min_heap_elt_is_top(ev))
				notify = 1;
			event_queue_remove(base, ev, EVLIST_TIMEOUT);
		}

		/* Check if it is active due to a timeout.  Rescheduling
		 * this timeout before the callback can be executed
		 * removes it from the active list. */
		if ((ev->ev_flags & EVLIST_ACTIVE) &&
		    (ev->ev_res & EV_TIMEOUT)) {
			if (ev->ev_events & EV_SIGNAL) {
				/* See if we are just active executing
				 * this event in a loop
				 */
				if (ev->ev_ncalls && ev->ev_pncalls) {
					/* Abort loop */
					*ev->ev_pncalls = 0;
				}
			}

			event_queue_remove(base, ev, EVLIST_ACTIVE);
		}

		gettime(base, &now);

		common_timeout = is_common_timeout(tv, base);
		if (tv_is_absolute) {
			ev->ev_timeout = *tv;
		} else if (common_timeout) {
			struct timeval tmp = *tv;
			tmp.tv_usec &= MICROSECONDS_MASK;
			evutil_timeradd(&now, &tmp, &ev->ev_timeout);
			ev->ev_timeout.tv_usec |=
			    (tv->tv_usec & ~MICROSECONDS_MASK);
		} else {
			evutil_timeradd(&now, tv, &ev->ev_timeout);
		}

		event_debug((
			 "event_add: timeout in %d seconds, call %p",
			 (int)tv->tv_sec, ev->ev_callback));

		event_queue_insert(base, ev, EVLIST_TIMEOUT);
		if (common_timeout) {
			struct common_timeout_list *ctl =
			    get_common_timeout_list(base, &ev->ev_timeout);
			if (ev == TAILQ_FIRST(&ctl->events)) {
				common_timeout_schedule(ctl, &now, ev);
			}
		} else {
			/* See if the earliest timeout is now earlier than it
			 * was before: if so, we will need to tell the main
			 * thread to wake up earlier than it would
			 * otherwise. */
			if (min_heap_elt_is_top(ev))
				notify = 1;
		}
	}

	/* if we are not in the right thread, we need to wake up the loop */
	if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
		evthread_notify_base(base);

	_event_debug_note_add(ev);

	return (res);
}

参考