从上一篇文章《libevent是怎么选择底层实现的》可以看出来,调用event_base_new()函数就是初始化好底层实现,给event_base结构体中evsel赋值,evsel是一个eventop结构体,我们再来看下:
/*
 * Operations table for one I/O multiplexing backend. event_base stores a
 * pointer to one of these in its `evsel` member and drives the backend
 * exclusively through these function pointers (see `epollops` for the
 * epoll instance).
 */
struct eventop {
/* Backend name, e.g. "epoll". */
const char *name;
/* Backend initializer. Returns an opaque pointer; presumably the backend's
 * private state, with NULL meaning failure -- confirm against event.c. */
void *(*init)(struct event_base *);
/* Register interest in `events` on `fd`. `old` is presumably the set of
 * events previously enabled on the fd and `fdinfo` per-fd backend data
 * (see fdinfo_len) -- confirm. */
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/* Counterpart of `add`: remove interest in `events` on `fd`. */
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/* Wait for events using the given timeout and activate whatever fired.
 * event_base_loop() treats a return value of -1 as failure. */
int (*dispatch)(struct event_base *, struct timeval *);
/* Release backend resources; presumably those created by `init`. */
void (*dealloc)(struct event_base *);
/* Flag; presumably non-zero when the backend must be re-initialized
 * (e.g. after fork) -- confirm. */
int need_reinit;
/* Bitmask of EV_FEATURE_* capabilities offered by the backend. */
enum event_method_feature features;
/* Presumably the number of extra bytes to reserve per fd for the `fdinfo`
 * argument of add/del; epollops uses 0 -- confirm. */
size_t fdinfo_len;
};
可以看到结构体里面主要是回调函数(另有name、need_reinit、features、fdinfo_len几个数据成员),给evsel赋值的时候,相应的回调函数就会被注册好,以epoll为例:
/*
 * epoll implementation of the eventop interface. The initializer is
 * positional, so the entries must stay in the same order as the members
 * of struct eventop.
 */
const struct eventop epollops = {
/* name */
"epoll",
/* init */
epoll_init,
/* add */
epoll_nochangelist_add,
/* del */
epoll_nochangelist_del,
/* dispatch */
epoll_dispatch,
/* dealloc */
epoll_dealloc,
/* need_reinit */
1,
/* features */
EV_FEATURE_ET|EV_FEATURE_O1|EV_FEATURE_EARLY_CLOSE,
/* fdinfo_len */
0
};
把该结构体赋给evsel,相应的epoll的处理函数就注册到了回调函数中,当有相应的事件发生时,我们就调用相应的回调函数。
同时在evsel赋值的时候,就会调用init回调函数进行初始化。
2. 事件主循环
在event.c中有个event_base_dispatch函数,它是事件主循环的入口,里面会调用event_base_loop,event_base_loop实现如下:
/*
 * Run the event loop on `base`.
 *
 * Each iteration computes a timeout, asks the backend (base->evsel) to
 * dispatch I/O, processes expired timeouts and then runs the active
 * callbacks.
 *
 * flags: bitmask of EVLOOP_NONBLOCK, EVLOOP_ONCE and
 *        EVLOOP_NO_EXIT_ON_EMPTY.
 *
 * Returns -1 if a loop is already running on `base` or if the backend's
 * dispatch fails, 1 if the loop exited because no events were registered,
 * and 0 otherwise.
 */
int event_base_loop(struct event_base *base, int flags)
{
	const struct eventop *evsel = base->evsel;
	struct timeval tv;
	struct timeval *tv_p;
	int res, done, retval = 0;

	/* The base lock is held from here until the function returns. */
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	/* Only one loop may run on a given event_base at a time. */
	if (base->running_loop) {
		event_warnx("%s: reentrant invocation. only one event_base_loop"
		    " can run on each event_base at once.", __func__);
		EVBASE_RELEASE_LOCK(base, th_base_lock);
		return -1;
	}

	base->running_loop = 1;

	clear_time_cache(base);

	if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
		evsig_set_base_(base);

	done = 0;

#ifndef EVENT__DISABLE_THREAD_SUPPORT
	/* Record which thread is running the loop. */
	base->th_owner_id = EVTHREAD_GET_ID();
#endif

	/* Discard any termination/break request left over from a prior run. */
	base->event_gotterm = base->event_break = 0;

	while (!done) {
		base->event_continue = 0;
		base->n_deferreds_queued = 0;

		/* Leave the loop if termination or a break was requested. */
		if (base->event_gotterm) {
			break;
		}

		if (base->event_break) {
			break;
		}

		tv_p = &tv;
		if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
			/* Nothing pending: let timeout_next() pick the wait time. */
			timeout_next(base, &tv_p);
		} else {
			/* Callbacks are pending or the caller asked not to block:
			 * poll the backend with a zero timeout. */
			evutil_timerclear(&tv);
		}

		/* With nothing registered and nothing active there is no work
		 * left, unless the caller asked to keep running when empty. */
		if (0==(flags&EVLOOP_NO_EXIT_ON_EMPTY) &&
		    !event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
			event_debug(("%s: no events registered.", __func__));
			retval = 1;
			goto done;
		}

		event_queue_make_later_events_active(base);

		clear_time_cache(base);

		/* Hand control to the backend (e.g. epoll_dispatch). */
		res = evsel->dispatch(base, tv_p);

		if (res == -1) {
			event_debug(("%s: dispatch returned unsuccessfully.",
				__func__));
			retval = -1;
			goto done;
		}

		update_time_cache(base);

		timeout_process(base);

		if (N_ACTIVE_CALLBACKS(base)) {
			int n = event_process_active(base);
			/* EVLOOP_ONCE: stop once something was processed and
			 * nothing is left active. */
			if ((flags & EVLOOP_ONCE)
			    && N_ACTIVE_CALLBACKS(base) == 0
			    && n != 0)
				done = 1;
		} else if (flags & EVLOOP_NONBLOCK)
			done = 1;
	}
	event_debug(("%s: asked to terminate loop.", __func__));

done:
	clear_time_cache(base);
	base->running_loop = 0;

	EVBASE_RELEASE_LOCK(base, th_base_lock);

	return (retval);
}
可以看到,这是个死循环,除了某些特殊情况下会跳出循环外,其他时候会一直循环调用回调函数中dispatch函数进行处理。
3. 事件处理
关于事件处理,我们还是跟着流程走,同样以epoll为例,在epoll.c的epoll_dispatch函数中,有如下代码:
/* Excerpt from epoll_dispatch(): block in epoll_wait(), then translate the
 * reported epoll flags into libevent event flags. */
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);

/* NOTE(review): the base lock is presumably released just before the
 * blocking call, outside this excerpt -- confirm against epoll.c. */
EVBASE_ACQUIRE_LOCK(base, th_base_lock);

if (res == -1) {
	/* An interrupted wait (EINTR) is not an error: report "no events". */
	if (errno != EINTR) {
		event_warn("epoll_wait");
		return (-1);
	}

	return (0);
}

event_debug(("%s: epoll_wait reports %d", __func__, res));
EVUTIL_ASSERT(res <= epollop->nevents);

for (i = 0; i < res; i++) {
	int what = events[i].events;
	short ev = 0;
#ifdef USING_TIMERFD
	/* The timerfd is not forwarded as an I/O event. */
	if (events[i].data.fd == epollop->timerfd)
		continue;
#endif

	if (what & (EPOLLHUP|EPOLLERR)) {
		/* Hang-up or error: report both readable and writable. */
		ev = EV_READ | EV_WRITE;
	} else {
		if (what & EPOLLIN)
			ev |= EV_READ;
		if (what & EPOLLOUT)
			ev |= EV_WRITE;
		if (what & EPOLLRDHUP)
			ev |= EV_CLOSED;
	}

	/* Nothing libevent cares about was reported for this fd. */
	if (!ev)
		continue;

	/* Activate the events registered on this fd. */
	evmap_io_active_(base, events[i].data.fd, ev | EV_ET);
}
可以看到,所有的文件描述符变化最后都交给了evmap_io_active_去处理,evmap_io_active_函数实现如下:
/*
 * Activate every event registered on `fd` whose requested event set
 * overlaps `events`.
 *
 * base:   event base whose I/O map (base->io) is consulted.
 * fd:     descriptor that became ready.
 * events: event flags reported for `fd`.
 *
 * Returns silently when `fd` has no entry in the map. The `_nolock_`
 * activation routine is used, so the caller is presumably expected to
 * hold the base lock already -- confirm against the callers.
 */
void
evmap_io_active_(struct event_base *base, evutil_socket_t fd, short events)
{
	struct event_io_map *iomap = &base->io;
	struct evmap_io *fdctx;
	struct event *cur;

#ifndef EVMAP_USE_HT
	/* Without the hash-table map, reject descriptors outside
	 * [0, nentries). */
	if (fd < 0)
		return;
	if (fd >= iomap->nentries)
		return;
#endif

	/* Look up the evmap_io entry for this descriptor. */
	GET_IO_SLOT(fdctx, iomap, fd, evmap_io);
	if (fdctx == NULL)
		return;

	LIST_FOREACH(cur, &fdctx->events, ev_io_next) {
		/* Flags that were both requested by the event and reported. */
		const short matched = cur->ev_events & events;

		if (matched == 0)
			continue;
		event_active_nolock_(cur, matched, 1);
	}
}
里面调用了event_active_nolock_,接下来流程如下:
event_active_nolock_ --> event_callback_activate_nolock_ --> event_queue_insert_active
这样就将事件插入到了已激活事件队列中。
然后回到主事件循环中,看如下代码:
/*
 * Run the event loop on `base` (same listing as earlier in the article,
 * repeated to show where active events get processed).
 *
 * flags: bitmask of EVLOOP_NONBLOCK, EVLOOP_ONCE and
 *        EVLOOP_NO_EXIT_ON_EMPTY.
 *
 * Returns -1 if a loop is already running on `base` or if the backend's
 * dispatch fails, 1 if the loop exited because no events were registered,
 * and 0 otherwise.
 */
int event_base_loop(struct event_base *base, int flags)
{
	const struct eventop *evsel = base->evsel;
	struct timeval tv;
	struct timeval *tv_p;
	int res, done, retval = 0;

	/* The base lock is held from here until the function returns. */
	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	/* Only one loop may run on a given event_base at a time. */
	if (base->running_loop) {
		event_warnx("%s: reentrant invocation. only one event_base_loop"
		    " can run on each event_base at once.", __func__);
		EVBASE_RELEASE_LOCK(base, th_base_lock);
		return -1;
	}

	base->running_loop = 1;

	clear_time_cache(base);

	if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
		evsig_set_base_(base);

	done = 0;

#ifndef EVENT__DISABLE_THREAD_SUPPORT
	/* Record which thread is running the loop. */
	base->th_owner_id = EVTHREAD_GET_ID();
#endif

	/* Discard any termination/break request left over from a prior run. */
	base->event_gotterm = base->event_break = 0;

	while (!done) {
		base->event_continue = 0;
		base->n_deferreds_queued = 0;

		/* Leave the loop if termination or a break was requested. */
		if (base->event_gotterm) {
			break;
		}

		if (base->event_break) {
			break;
		}

		tv_p = &tv;
		if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
			/* Nothing pending: let timeout_next() pick the wait time. */
			timeout_next(base, &tv_p);
		} else {
			/* Callbacks are pending or the caller asked not to block:
			 * poll the backend with a zero timeout. */
			evutil_timerclear(&tv);
		}

		/* With nothing registered and nothing active there is no work
		 * left, unless the caller asked to keep running when empty. */
		if (0==(flags&EVLOOP_NO_EXIT_ON_EMPTY) &&
		    !event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
			event_debug(("%s: no events registered.", __func__));
			retval = 1;
			goto done;
		}

		event_queue_make_later_events_active(base);

		clear_time_cache(base);

		/* The backend marks ready events as active here. */
		res = evsel->dispatch(base, tv_p);

		if (res == -1) {
			event_debug(("%s: dispatch returned unsuccessfully.",
				__func__));
			retval = -1;
			goto done;
		}

		update_time_cache(base);

		timeout_process(base);

		/* Run the callbacks of whatever dispatch/timeouts activated. */
		if (N_ACTIVE_CALLBACKS(base)) {
			int n = event_process_active(base);
			/* EVLOOP_ONCE: stop once something was processed and
			 * nothing is left active. */
			if ((flags & EVLOOP_ONCE)
			    && N_ACTIVE_CALLBACKS(base) == 0
			    && n != 0)
				done = 1;
		} else if (flags & EVLOOP_NONBLOCK)
			done = 1;
	}
	event_debug(("%s: asked to terminate loop.", __func__));

done:
	clear_time_cache(base);
	base->running_loop = 0;

	EVBASE_RELEASE_LOCK(base, th_base_lock);

	return (retval);
}
我们可以看到调用完回调函数dispatch以后,如果存在激活事件,就会调用event_process_active函数,event_process_active调用event_process_active_single_queue函数,event_process_active_single_queue函数会调用事件上绑定的回调函数进行事件处理。
文章同步发表在cpp加油站(ID:xy13640954449), 欢迎来撩!



