libevent源码剖析

  libevent是一个使用C语言编写的,轻量级的开源高性能网络库,使用者很多,研究者也很多。由于代码简洁,设计思想简明巧妙,因此很适合用来学习,提升自己C语言的能力。

  libevent有这样显著地几个亮点:

    1.事件驱动,高性能

    2.轻量级,专注于网络,不如ACE那么庞大臃肿

    3.代码精炼易读

    4.跨平台,支持Windows,Linux,*BSD和Mac Os;

    5.支持多种IO多路复用技术,epoll,poll,dev/poll、select和kqueue等

    6.支持IO,定时器和信号等事件

    7.注册事件优先级

  基于以上优点,libevent已经被广泛的应用,作为底层的网络库;比如memcached、Vomit、Nylon、Netchat等。

  下面我们就来从程序的基本使用场景和代码的整体处理流程入手来对libevent库进行学习。

     当应用程序向libevent注册一个事件后,libevent内部是怎样处理的呢,以下是基本流程:

        1)首先应用程序准备并初始化event,设置好事件类型和回调函数

        2)向libevent添加该事件event。

        3)程序调用event_base_dispatch()系列函数进入无线循环,等待事件.

     这只是大概的流程,以下是对于流程中的一些基本概念的讲解:

     一、事件event

      libevent是基于事件驱动的,从名字上可以看出event是整个库的核心。首先给出event结构体的声明,它位于event.h文件中:

 1 struct event {
 2  TAILQ_ENTRY (event) ev_next;
 3  TAILQ_ENTRY (event) ev_active_next;
 4  TAILQ_ENTRY (event) ev_signal_next;
 5  unsigned int min_heap_idx; /* for managing timeouts */
 6  struct event_base *ev_base;
 7  int ev_fd;
 8  short ev_events;
 9  short ev_ncalls;
10  short *ev_pncalls; /* Allows deletes in callback */
11  struct timeval ev_timeout;
12  int ev_pri;  /* smaller numbers are higher priority */
13  void (*ev_callback)(int, short, void *arg);
14  void *ev_arg;
15  int ev_res;  /* result passed to event callback */
16  int ev_flags;
17 };

        1) ev_events:event关注的事件类型,它可以是以下3种类型:

          IO事件:EV_WRITE和EV_READ

          定时事件:EV_TIMEOUT

          信号:EV_SIGNAL

          辅助选项:EV_PERSIST,表明是一个永久事件

        2) ev_next,ev_active_next,ev_signal_next都是双向链表节点指针;他们是libevent对不同事件类型和在不同的时期,对事件的管理时使用到的字段。

          libevent使用双向链表保存所有注册的IO和Signal事件,ev_next就是该IO事件在链表中的位置,称此链表为“已注册事件链表”;

          同样ev_signal_next就是signal事件的signal事件链表中的位置。

          ev_active_next:libevent将所有激活事件放入到链表active list中,然后遍历active list执行调度。

          每当事件event转变为就绪状态时,libevent就会把它移入到active event list[priority]中,其中priority是event的优先级;接着libevent会根据自己的调度策略选择就绪事件,调用其callback()函数执行事件处理;并根据就绪的句柄和时间类型填充callback函数的参数。

        3)min_heap_idx和ev_timeout:如果事件是timeout事件,他们是event在小根堆中的索引和超时值,libevent使用小根堆来管理定时事件

        4)ev_base:该事件所属反应堆实例,这是一个event_base结构体。

        5)ev_fd:对于IO事件,这是绑定的文件描述符,对于signal事件,是绑定的信号

        6)ev_callback:event的回调函数,被ev_base调用,执行事件处理程序,这是一个函数指针,原型为:

          void (*ev_callback)(int fd,short events,void* arg)

         其中参数fd对应于ev_fd;events对应于events;arg对应于ev_arg;

        7)ev_arg:void*,表明可以是任意类型的数据,在设置event时指定;

        8)eb_flags:libevent用于标记event信息的字段,表明其当前的状态,可能的值有:

          #define EVLIST_TIMEOUT 0x01 // event在time堆中
          #define EVLIST_INSERTED 0x02 // event在已注册事件链表中
          #define EVLIST_SIGNAL 0x04 // 未见使用
          #define EVLIST_ACTIVE 0x08 // event在激活链表中
          #define EVLIST_INTERNAL 0x10 // 内部使用标记
          #define EVLIST_INIT     0x80 // event已被初始化

        9)ev_ncalls:事件就绪执行时,调用ev_callback的次数,通常为1

        10)ev_res:记录了当前激活事件的类型

      

      要想向libevent添加一个事件,首先需要设置event对象,这通过调用libevent提供的函数有:event_set(),event_base_set(),event_priority_set()来完成;下面分别讲解:

      void event_set(struct event *ev,int fd,short events,void (*callback)(int,short,void*),void *arg)

        ev:执行要初始化的event对象

        fd:对于信号来说是绑定的文件描述符,对于信号来说是绑定的signal信号

        events:在该fd上关注的事件类型,它可以是EV_READ,EV_WRITE,EV_SIGNAL;

        callback:这是一个函数指针,当fd上的事件event发生时,调用该函数执行处理

        arg:传递给callback函数指针的参数

      int event_base_set(struct event_base* base,struct event *ev)

        设置event ev将要注册到的evnet_base;

      int event_priority_set(struct event* ev,int pri)

        设置event ev的优先级,注意,当ev正处于就绪状态时,不能设置,返回-1.

    二、事件处理框架event_base

      以下是base_event结构体的声明,它位于event-internal.h文件中:

 1 struct event_base {
 2  const struct eventop *evsel;
 3  void *evbase; 
 4  int event_count;  /* counts number of total events */
 5  int event_count_active; /* counts number of active events */
 6  int event_gotterm;  /* Set to terminate loop */
 7  int event_break;  /* Set to terminate loop immediately */
 8  /* active event management */
 9  struct event_list **activequeues;
10  int nactivequeues;
11  /* signal handling info */
12  struct evsignal_info sig;
13  struct event_list eventqueue;
14  struct timeval event_tv;
15  struct min_heap timeheap;
16  struct timeval tv_cache;
17 };

        以下是结构体各字段的含义:

        1)evsel和evbase这两个字段的设置可能会让人有些迷惑,这里我们可以把evsel和evbase看做是类和静态函数的关系,比如添加事件时的调用行为:evsel->add(evbase,ev),实际执行操作的是evbase,这相当于class::add(instance,ev),instance就是class的一个对象实例。

        2)activequeues是一个二级指针,前面讲过libevent支持事件优先级,因此你可以你把它看做是数组,其中的元素activequeues[priority]是一个链表,链表的每个节点指向一个优先级为priority的就绪事件event。

        3)eventqueue:链表,保存了所有的注册事件event的指针。

        4)sig是用来管理信号的结构体

        5)timeheap是管理定时事件的小根堆

        6)event_tv和tv_cache是libevent用于事件管理的变量

      我们已经对event_base有了一个初步的了解,那么event_base如何创建和初始化的呢?

        创建一个event_base对象也即是创建了一个新的libevent实例,程序需要通过调用event_init()函数来创建,该函数首先为event_base实例申请空间,然后初始化timer mini-heap,选择并初始化合适的系统多路复用机制,初始化各事件的链表;函数还检测了系统的时间设置,为后面的事件管理打下了基础

     三、事件主循环

        libevent将IO事件、定时器和信号事件处理很好的结合到了一起,那么它是如何做到的呢?

        libevent的事件主循环主要是通过event_base_loop()函数完成的,主要操作流程如下图所示,event_base_loop所做的就是持续执行下面的循环

        

        下面是源码,可以参考

int event_base_loop(struct event_base *base, int flags)
{
    const struct eventop *evsel = base->evsel;
    void *evbase = base->evbase;
    struct timeval tv;
    struct timeval *tv_p;
    int res, done;
    // 清空时间缓存
    base->tv_cache.tv_sec = 0;
    // evsignal_base是全局变量,在处理signal时,用于指名signal所属的event_base实例
    if (base->sig.ev_signal_added)
        evsignal_base = base;
    done = 0;
    while (!done) { // 事件主循环
        // 查看是否需要跳出循环,程序可以调用event_loopexit_cb()设置event_gotterm标记
        // 调用event_base_loopbreak()设置event_break标记
        if (base->event_gotterm) {
            base->event_gotterm = 0;
            break;
        }
        if (base->event_break) {
            base->event_break = 0;
            break;
        }
        // 校正系统时间,如果系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间
        // 在timeout_correct函数里,比较last wait time和当前时间,如果当前时间< last wait time
        // 表明时间有问题,这是需要更新timer_heap中所有定时事件的超时时间。
        timeout_correct(base, &tv);
   
        // 根据timer heap中事件的最小超时时间,计算系统I/O demultiplexer的最大等待时间
        tv_p = &tv;
        if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
            timeout_next(base, &tv_p);
        } else {
            // 依然有未处理的就绪时间,就让I/O demultiplexer立即返回,不必等待
            // 下面会提到,在libevent中,低优先级的就绪事件可能不能立即被处理
            evutil_timerclear(&tv);
        }
        // 如果当前没有注册事件,就退出
        if (!event_haveevents(base)) {
            event_debug(("%s: no events registered.", __func__));
            return (1);
        }
        // 更新last wait time,并清空time cache
        gettime(base, &base->event_tv);
        base->tv_cache.tv_sec = 0;
        // 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等;
        // 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中
        res = evsel->dispatch(base, evbase, tv_p);
        if (res == -1)
            return (-1);
        // 将time cache赋值为当前系统时间
        gettime(base, &base->tv_cache);
        // 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中
        timeout_process(base);
        // 调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理
        // 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,
        // 然后处理链表中的所有就绪事件;
        // 因此低优先级的就绪事件可能得不到及时处理;
        if (base->event_count_active) {
            event_process_active(base);
            if (!base->event_count_active && (flags & EVLOOP_ONCE))
                done = 1;
        } else if (flags & EVLOOP_NONBLOCK)
            done = 1;
    }
    // 循环结束,清空时间缓存
    base->tv_cache.tv_sec = 0;
    event_debug(("%s: asked to terminate loop.", __func__));
    return (0);
}
View Code

       I/O和Timer事件的统一

         libevent将Timer和Signal事件都统一到了系统的IO多路复用机制中了从上面的流程图中我们可以看出一点端倪,那么libevent是如何做到的呢?

         首先是I/O和Timer事件的统一。系统的I/O机制向select()和epoll_wait()都允许程序制定一个最大等待时间timeout,即使没有事件发生,他们也能保证在timeout时间内返回。那么根据所有Timer事件的最小超时时间来设置系统I/O的timeout时间;当系统I/O返回时,再激活所有就绪的Timer事件就可以了,这样就能将Timer事件完美的融合到系统的I/O机制中了。

         libevent使用堆来管理Timer事件,其key值就是事件的超时时间,根据堆中具有最小超时值的事件和当前时间来计算等待时间。

       I/O和Signal事件的统一

         Signal是异步事件的经典,将Signal事件统一到系统的I/O多路复用中就不像Timer事件那么自然了。Signal事件的出现对于进程来讲完全是随机的,进程不能只是测试一个变量来判别是否发生了一个信号,而是必须告诉内核“在此信号发生时,请执行如下操作”。当Signal发生时,系统并不立即调用event的callback函数处理信号,而是设法通知系统的I/O机制,让其返回,然后再统一和I/O事件以及Timer一起处理。

          那么,系统是如何设法通知系统的I/O机制呢?基本的方法就是采用“消息机制”。在libevent中是通过socket pair完成的。socket pair就是一个socket对,一个读socket,一个写socket。Socket pair创建好了之后,读socket在libevent的event_base实例上注册了一个persist的读事件。这样当写socket写入数据时,读socket就会相应的得到通知了。前面提到过,libevent会在事件主循环中检查标记,来确定是否有触发的Signal,如果 标记被设置就处理这些signal。这段代码在各个具体的I/O机制中,以epoll为例,在epoll_dispatch()函数中,代码片段如下:

          res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
             if (res == -1) {
                  if (errno != EINTR) {
                     event_warn("epoll_wait");
                     return (-1);
                }
                evsignal_process(base);// 处理signal事件
                return (0);
            } else if (base->sig.evsignal_caught) {
                evsignal_process(base);// 处理signal事件
            }

      注册、注销signal事件

        注册signal事件是通过evsignal_add(struct event *ev)函数完成的,libevent对所有的信号注册同一个处理函数evsignal_handler(),该函数注册过程如下:

        1.取得ev要注册到的信号signo;

        2.如果信号signo未被注册,那么就为signo注册信号处理函数evsignal_handler();

        3.如果事件ev_signal还没有注册,就注册ev_signal事件;

        4.将事件ev添加到signo的event链表中

        注销一个已注册的signal事件就更简单了,直接从其已注册事件的链表中移除即可。如果链表已空,那么就恢复旧有的处理函数;处理函数的evsignal_handler()函数就是记录信号的发生次数,并通知event_base有信号触发。

     三、支持多路复用

        libevent的核心是事件驱动、同步非阻塞,为了达到这一目标必须采用系统提供的I/O多路复用技术,而这些复用技术在不同的平台上却各有不同,如何能提供统一的支持方式呢?

        libevent支持多种I/O多路复用的关键就在于结构体evnetop,这个结构体前面也提到过,他的成员是一系列函数指针,定义在event-internal.h文件中:

        struct eventop {
              const char *name;
              void *(*init)(struct event_base *); // 初始化
              int (*add)(void *, struct event *); // 注册事件
              int (*del)(void *, struct event *); // 删除事件
              int (*dispatch)(struct event_base *, void *, struct timeval *); // 事件分发
              void (*dealloc)(struct event_base *, void *); // 注销,释放资源
              /* set if we need to reinitialize the event base */
              int need_reinit;
        };

        在libevent中,每种IO多路复用技术的实现都必须提供这五种函数接口来完成自身的初始化、销毁释放;对事件的注册、注销和分发。比如对于epoll,libevent实现了5个对应的接口函数,并在初始化时经eventop的5个函数指针指向这5个函数,那么程序就可以使用epoll作为IO多路复用机制了。

     四、时间管理

        为了支持定时器,libevent必须需和系统时间打交道主要涉及到时间的加减辅助函数、时间缓存、时间校正和定时器堆的时间值调整等。

        libevent在初始化时会检测系统时间的类型,通过调用detect_monotonic()完成,它通过clock_gettime()来检测系统是否支持monotonic时钟类型

          static void detect_monotonic(void)
          {
            #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
               struct timespec    ts;
               if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0)
                  use_monotonic = 1; // 系统支持monotonic时间
            #endif
          }

        monotonic事件值得是系统从boot后到现在的时间,如果系统支持monotonic事件就将全局变量use_monotonic设置为1。

        1.时间缓存

          结构体event_base中的tv_cache,用来记录时间缓存。如果tv_cache已经设置,那么就直接使用缓存的时间,否则需要再次

 执行系统调用获取系统时间

        2.时间校正

          如果系统支持monotonic时间,该时间是从boot后到现在所经过的时间,因此不需要执行校正。如果系统不支持monotonic时间,用户可能会手动调整时间,校正由函数timeout_correct()完成。

            

 1 static void timeout_correct(struct event_base *base, struct timeval *tv)
 2 {
 3     struct event **pev;
 4     unsigned int size;
 5     struct timeval off;
 6     if (use_monotonic) // monotonic时间就直接返回,无需调整
 7         return;
 8     gettime(base, tv); // tv <---tv_cache
 9     // 根据前面的分析可以知道event_tv应该小于tv_cache
10     // 如果tv < event_tv表明用户向前调整时间了,需要校正时间
11     if (evutil_timercmp(tv, &base->event_tv, >=)) {
12         base->event_tv = *tv;
13         return;
14     }
15     // 计算时间差值
16     evutil_timersub(&base->event_tv, tv, &off);
17     // 调整定时事件小根堆
18     pev = base->timeheap.p;
19     size = base->timeheap.n;
20     for (; size-- > 0; ++pev) {
21         struct timeval *ev_tv = &(**pev).ev_timeout;
22         evutil_timersub(ev_tv, &off, ev_tv);
23     }
24     base->event_tv = *tv; // 更新event_tv为tv_cache
25 }

        在调整小根堆时,因为所有定时事件的时间值都会被减去相同的值,因此虽然堆中元素的时间键值被改变了,但是相对关系没有改变,不会改变堆的整体结构。因此只要遍历堆中的所有元素,将每个元素的时间键值减去相同的值即可完成调整,不需要重新调整堆的结构。调整完后,要将event_tv值重新设置为tv_cache值。

    五、libevent支持多线程

      libevent不是线程安全的,但是这并不表示libevent不支持多线程模式,其实方法在前面已经将signal事件处理时就接触到了,那就是消息通知机制。以下:

        1)暴力抢占

          停止正在执行的任务,马上去执行新来的任务。好处是消息可以立即得到处理,需要注意的是必须处理好线程切换问题。

        2)纯粹的消息通知机制

          执行完正在执行的任务,再执行新任务。通过消息通知,切换问题省心了,不过消息是不能立即处理的,而且所有的内容都是通过消息发送,增加了通信的开销。

        3)消息通知+同步层

          有个折中的办法可以减少消息通信的开销,就是提取一个同步层,把工作安排都存放在一个工作队列中,而且能够保证“任何人把新任务扔到这个队列”和“自己取出当前第一个任务”等这些操作都能够保证不会把队列搞乱

          工作队列实际上就是一个加锁的容器(队列,链表),这个很容易实现,而消息通知仅需一个字节,具体的任务都push到了工作队列中,减少了开销

原文地址:https://www.cnblogs.com/qingjiaowoxiaoxioashou/p/7227990.html