浅谈几种服务器端模型——反应堆模式(基于epoll的反应堆)

引言:前面一章简单介绍了关于epoll 的使用方式,这一章介绍一下一个简单的反应堆模型,没有实现超时机制的管理。最主要的是要介绍一下关于异步事件反应堆的设计方式。

反应堆的模型图在上一张可以看到,但是那个是盗来的一张图,twisted 的反应堆。今天给不熟悉这个部分的朋友介绍一下基于 epoll 的反应堆,过程类似于libevent.

反应堆可以提供几个操作:

(0)创建一个反应堆:

mc_event_base_t * mc_base_new(void) ;

返回一个操作句柄.  

(1)为某一个需要监听的文件描述符加入回调函数,并注册事件类型。

1
2
3
4
5
6
int mc_event_set( mc_event_t *ev , short revent , int fd , mc_ev_callback callback , void *args )  ;
    /*
     * Initialize a event , add callback and event type
     * if the event exists , this function will change the mode of this event
     * and fd
     */

 这里的 revent 由宏定义为几种类型:

  

1
2
3
4
5
#define MC_EV_READ     0x0001
#define MC_EV_WRITE    0x0002
#define MC_EV_SIGNAL   0x0004
#define MC_EV_TIMEOUT  0x0008
#define MC_EV_LISTEN   0x0010

相应的操作可以使用 | 运算来并几个需要监听的事件类型。

事件类型定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typedef struct mc_event_s
{
     
      struct mc_event_s   *next    ;
      
      
      struct mc_event_s   *prev    ;
     
     unsigned int min_heap_index  ;
      
     int ev_fd      ;   // file des of event
     short revent   ;   // event type
      
     struct timeval  ev_timeval   ; // event timeout time
     mc_ev_callback callback ;// callback of this event
     void  *args                  ;
     int ev_flags                 ;
      
     mc_event_base_t    *base     ;
}mc_event_t ;

事件结构本身后面解释。 

(2)将需要监听的并且已经初始化的事件加入反应堆。

1
2
3
4
5
6
int mc_event_post( mc_event_t *ev , mc_event_base_t * base ) ;
    /*
     * Post this event to event_base
     * struct base has two queue , active queue and added queue
     * this function will post event to added queue , but not in active queue
     */

将刚才注册了事件类型和回调函数的事件加入 base, 即将其看做一个反应堆。

(3)最后提供了一个 dispatch 函数,反应堆开始循环,等待事件的发生。如果对应的 fd 上的事件发生,调用相应的回调函数。由第一步注册。

1
2
3
4
5
6
int mc_dispatch( mc_event_base_t * base ) ;
    /*
     * start loop
     * and dispatch event by
     * mc_event_loop
     */

反应堆支持在循环过程中,通过相应的回调函数再注册事件,类似于热加入,热移除。

实现方式很简单,就是在第一个事件的回调函数上调用 mc_event_set()然后注册。再加入 base.

base 的结构如下 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct mc_event_base_s
{
    void         *  added_list      ;
    void         *  active_list     ;
    unsigned int    event_num       ;
    unsigned int    event_active_num;
     
    /*
     *mc_minheap        minheap         ;
     */
    int             epoll_fd        ;  //for epoll only
    int             ev_base_stop    ;
    int             magic           ;
    struct timeval  event_time      ;  
}mc_event_base_t ;

让我们来看一个简单的 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*_____________________test bellow ______________________*/
#define mc_sock_fd  int
 
 
#define DEFAULT_NET AF_INET
#define DEFAULT_DATA_GRAM   SOCK_STREAM
#define DEFAULT_PORT        (1115)
#define DEFAULT_BACKLOG     (200)
 
/* simple connection */
struct _connection
{
    int fd            ;
    mc_event_t  read  ;
    mc_event_t  write ;
    char buf[1024]    ;
    mc_event_base_t * base ;
};
void setreuseaddr( mc_sock_fd fd )
{
    int yes = 1 ;
    setsockopt( fd , SOL_SOCKET , SO_REUSEADDR , &yes , sizeof(int) );
}
int mc_socket()
{
    int retsock = socket(DEFAULT_NET,DEFAULT_DATA_GRAM,0) ;
    if( retsock < 0  )
    {
        /* we should add some debug information here
        fprintf(LOGPATH,"socket error\n");
        */
        return -1 ;
    }
    return retsock ;
}      
 
int mc_bind(mc_sock_fd listenfd )
{
    struct sockaddr_in serveraddr ;
    bzero(&serveraddr,sizeof(serveraddr));
 
    serveraddr.sin_family = AF_INET ;
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    serveraddr.sin_port = htons(DEFAULT_PORT);
    return bind(listenfd,(struct sockaddr *)&serveraddr , sizeof(serveraddr ));
}
 
int mc_isten(mc_sock_fd listenfd)
{
    return listen(listenfd,DEFAULT_BACKLOG);
}
 
 
 
void handler_accept( int fd , short revent , void *args )
{
    struct sockaddr_in in_addr ;
    size_t in_len ;
    int s   ;
    int done = 0 ;
    struct _connection * lc = (struct _connection *)args ;
     
    in_len = sizeof( in_addr );
    mc_setnonblocking(fd) ;
    while( !done )
    {
        s = accept( fd , (struct sockaddr *)&in_addr , &in_len );
        if( s == -1 )
        {
            if( (errno == EAGAIN )|| (errno == EWOULDBLOCK ) )
            {
                break;
            }
            else
            {
                perror("accept");
                break;
            }
        }
        if( s == 0 )
        {
            fprintf(stderr,"Accept a connection on %d \n",fd );
        }
        done = 1 ;
    }
        mc_setnonblocking(s) ;
        lc->fd = s ;
        mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );
         
         
        mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );
        mc_event_post( &(lc->write) , lc->base );
          
         
}
void handler_read( int fd , short revent , void *args )
{
    mc_setnonblocking(fd) ;
    struct _connection * lc ;
    lc  = (struct _connection *)args ;
    read( fd , lc->buf , 1024 );
    mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );
}
 
void handler_write( int fd , short revent , void *args )
{
    mc_setnonblocking(fd) ;
    struct _connection * lc ;
    lc  = (struct _connection *)args ;
    write( fd , lc->buf , 1024 );
    mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );
}
 
void cab( int fd , short revent , void *args )
{
    mc_setnonblocking(fd) ;
    char buf[1024] = "xx00xx00xx00xx00\n";
    write(fd,buf,1024);
}
int main()
{
    mc_event_t mev ;
    mc_event_base_t  *base = mc_base_new() ;
    struct _connection lc ;
    lc.base = base ;
     
    int sockfd = mc_socket() ;
    mc_bind(sockfd);
    mc_isten(sockfd);
     
    mc_event_set( &(lc.read) , MC_EV_READ , sockfd , handler_accept , &lc );
    mc_event_post( &(lc.read) , base );
    mc_dispatch(base);
    return 0;
}

  

首先:封装的几个套接口操作没有考虑错误处理,作为简单的实例。

定义了一个 connection 结构,用于表示每一个到来的连接,这里的 struct _connection 中包含读写事件和一个缓冲区,还有指向反应堆的指针和对应注册的fd

工作过程如下:(集中看  main函数)

(1)创建一个反应堆。

(2)实例化一个 connection

(3)创建套接口,bind,listen 老生常谈,这里就不多说了

(4)将这个监听套接口注册相应的回调函数,这里我们注册的是 handler_accept() 函数,回调函数类型都是  void *XXX(  int  , short , void *) ;

       当监听套接口发生可读事件时,第一次我们认为是相应的监听套接口得到了新的连接,所以,第一次调用的时候直接调用注册了的回调函数 handler_accept().

在handler_accept() 函数中,我们为这个连接的读写事件添加了相应的回调函数,并把连接描述符(不是监听描述符)注册到这个上。下次这个套接口可读的时候调用handler_read(),可写的时候调用handler_write(). 如果需要改变状态或改变回调函数,只需要一个状态机或者别的方式来确定需要的回调函数是哪一个,在我们的handler_write() 和 handler_read()中可以改变回调函数,代码所示。

PS:注意一点的是我们的事件是一个实例,不管是在connection结构中或是自己定义,都需要不断的向操作系统申请空间,如果采用对象池或者connection池的方式,可以减少服务器的负载。

总结:反应堆模式最基本的操作就是:注册事件(为需要监听的fd加入回调函数)----->将事件加入反应堆------>开始事件循环------>事件发生,调用回调函数。

异步操作的精髓就是在这里,而不是同步的等待每一个事件。下一章讲解这个反应堆的实现,越来越带感咯.

文章属原创,转载请注明出处 联系作者: Email:zhangbolinux@sina.com QQ:513364476
原文地址:https://www.cnblogs.com/Leo_wl/p/2470495.html