nginx&http 第三章 ngx 事件event accept epoll /init

tcp 三次握手成功后,listen fd  可读,在process_event_timer 中调用rev->handler(rev)处理;

其回调函数为: ngx_event_accept

/*
如何建立新连接
    上文提刭过,处理新连接事件的回调函数是ngx_event_accept,其原型如下。void ngx_event_accept (ngx_event_t★ev)
    下面简单介绍一下它的流程,如图9-6所示。
    下面对流程中的7个步骤进行说明。
    1)首先调用accept方法试图建立新连接,如果没有准备好的新连接事件,ngx_event_accept方法会直接返回。
    2)设置负载均衡阈值ngx_accept_disabled,这个阈值是进程允许的总连接数的1/8减去空闲连接数,
    3)调用ngx_get_connection方法由连接池中获取一个ngx_connection_t连接对象。
    4)为ngx_connection_t中的pool指针建立内存池。在这个连接释放到空闲连接池时,释放pool内存池。
    5)设置套接字的属性,如设为非阻塞套接字。
    6)将这个新连接对应的读事件添加到epoll等事件驱动模块中,这样,在这个连接上如果接收到用户请求epoll_wait,就会收集到这个事件。
    7)调用监听对象ngx_listening_t中的handler回调方法。ngx_listening_t结构俸的handler回调方法就是当新的TCP连接刚刚建立完成时在这里调用的。
    最后,如果监听事件的available标志位为1,再次循环到第1步,否则ngx_event_accept方法结束。事件的available标志位对应着multi_accept配置
    项。当available为l时,告诉Nginx -次性尽量多地建立新连接,它的实现原理也就在这里
*/
//这里的event是在ngx_event_process_init中从连接池中获取的 ngx_connection_t中的->read读事件
//accept是在ngx_event_process_init(但进程或者不配置负载均衡的时候)或者(多进程,配置负载均衡)的时候把accept事件添加到epoll中
void //该形参中的ngx_connection_t(ngx_event_t)是为accept事件连接准备的空间,当accept返回成功后,会重新获取一个ngx_connection_t(ngx_event_t)用来读写该连接
ngx_event_accept(ngx_event_t *ev) //在ngx_process_events_and_timers中执行              
{ //一个accept事件对应一个ev,如当前一次有4个客户端accept,应该对应4个ev事件,一次来多个accept的处理在下面的do {}while中实现
    socklen_t          socklen;
    ngx_err_t          err;
    ngx_log_t         *log;bangmang  
    ngx_uint_t         level;
    ngx_socket_t       s;

//如果是文件异步i/o中的ngx_event_aio_t,则它来自ngx_event_aio_t->ngx_event_t(只有读),如果是网络事件中的event,则为ngx_connection_s中的event(包括读和写)
    ngx_event_t       *rev, *wev; 
    ngx_listening_t   *ls;
    ngx_connection_t  *c, *lc;
    ngx_event_conf_t  *ecf;
    u_char             sa[NGX_SOCKADDRLEN];
#if (NGX_HAVE_ACCEPT4)
    static ngx_uint_t  use_accept4 = 1;
#endif

    if (ev->timedout) {
        if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
            return;
        }

        ev->timedout = 0;
    }

    ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

    if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
        ev->available = ecf->multi_accept;   
    }

    lc = ev->data;
    ls = lc->listening;
    ev->ready = 0;

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                   "accept on %V, ready: %d", &ls->addr_text, ev->available);

    do { /* 如果是一次读取一个accept事件的话,循环体只执行一次, 如果是一次性可以读取所有的accept事件,则这个循环体执行次数为accept事件数*/
        socklen = NGX_SOCKADDRLEN;

#if (NGX_HAVE_ACCEPT4) //ngx_close_socket可以关闭套接字
        if (use_accept4) {
            s = accept4(lc->fd, (struct sockaddr *) sa, &socklen,
                        SOCK_NONBLOCK);
        } else {
            s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
        }
#else
    /*
            针对非阻塞I/O执行的系统调用则总是立即返回,而不管事件足否已经发生。如果事件没有眭即发生,这些系统调用就
        返回—1.和出错的情况一样。此时我们必须根据errno来区分这两种情况。对accept、send和recv而言,事件未发牛时errno
        通常被设置成EAGAIN(意为“再来一次”)或者EWOULDBLOCK(意为“期待阻塞”):对conncct而言,errno则被
        设置成EINPROGRESS(意为“在处理中")。
          */
        s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
#endif

        if (s == (ngx_socket_t) -1) {
            err = ngx_socket_errno;

            /* 如果要去一次性读取所有的accept信息,当读取完毕后,通过这里返回。所有的accept事件都读取完毕 */
            if (err == NGX_EAGAIN) { //如果event{}开启multi_accept,则在accept完该listen ip:port对应的ip和端口连接后,会通过这里返回
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
                               "accept() not ready");
                return;
            }

            level = NGX_LOG_ALERT;

            if (err == NGX_ECONNABORTED) {
                level = NGX_LOG_ERR;

            } else if (err == NGX_EMFILE || err == NGX_ENFILE) {
                level = NGX_LOG_CRIT;
            }

#if (NGX_HAVE_ACCEPT4)
            ngx_log_error(level, ev->log, err,
                          use_accept4 ? "accept4() failed" : "accept() failed");

            if (use_accept4 && err == NGX_ENOSYS) {
                use_accept4 = 0;
                ngx_inherited_nonblocking = 0;
                continue;
            }
#else
            ngx_log_error(level, ev->log, err, "accept() failed");
#endif

            if (err == NGX_ECONNABORTED) {
                if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
                    ev->available--;
                }

                if (ev->available) {
                    continue;
                }
            }
//NGX_EMFILE与NGX_ENFILE通常表示进程fd已经耗尽,此时一般需要禁用当前监听socket的再accept //新的连接
            if (err == NGX_EMFILE || err == NGX_ENFILE) {// Too many descriptors are in use by this process44//NGX_EMFILE与NGX_ENFILE通常表示进程fd已经耗尽,此时一般需要禁用当前监听socket的再accept //新的连接
                if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle, 1)
                    != NGX_OK)
                {
                    return;
                }

                if (ngx_use_accept_mutex) {
                    if (ngx_accept_mutex_held) {
                        ngx_shmtx_unlock(&ngx_accept_mutex);
                        ngx_accept_mutex_held = 0;
                    }
//当前进程连接accpet失败,则可以暂时设置为1,下次来的时候由其他进程竞争accpet锁,下下次该进程继续竞争该accept,因为在下次的时候ngx_process_events_and_timers
//ngx_accept_disabled = 1; 减去1后为0,可以继续竞争
                    ngx_accept_disabled = 1; 
                } else { ////如果是不需要实现负载均衡,则扫尾延时下继续在ngx_process_events_and_timers中accept
                    ngx_add_timer(ev, ecf->accept_mutex_delay, NGX_FUNC_LINE);
                }
            }

            return;
        }

#if (NGX_STAT_STUB)
        (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
#endif
//worker工作负载较高,此种情况下后续一段时间Nginx将会放弃抢占accept_mutex锁,这样将不会再有新的客户端连接
        //到此worker进程中; 如果ngx_accept_disabled<=0,那么表示空闲连接较多,后续会与其他worker进程来抢占accept_mutex锁
//设置负载均衡阀值 最开始free_connection_n=connection_n,见ngx_event_process_init ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; //判断可用连接的数目和总数目的八分之一大小,如果可用的小于八分之一,为正 //在服务器端accept客户端连接成功(ngx_event_accept)后,会通过ngx_get_connection从连接池获取一个ngx_connection_t结构,也就是每个客户端连接对于一个ngx_connection_t结构, //并且为其分配一个ngx_http_connection_t结构,ngx_connection_t->data = ngx_http_connection_t,见ngx_http_init_connection //从连接池中获取一个空闲ngx_connection_t,用于客户端连接建立成功后向该连接读写数据,函数形参中的ngx_event_t对应的是为accept事件对应的 //ngx_connection_t中对应的event 获取一个ngx_connection_t对象来容纳上面accept的socket c = ngx_get_connection(s, ev->log); //ngx_get_connection中c->fd = s; //注意,这里的ngx_connection_t是从连接池中从新获取的,和ngx_epoll_process_events中的ngx_connection_t是两个不同的。 if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return; } #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_active, 1); #endif c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; } c->sockaddr = ngx_palloc(c->pool, socklen); if (c->sockaddr == NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(c->sockaddr, sa, socklen); log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_close_accepted_connection(c); return; } /* set a blocking mode for iocp and non-blocking mode for others */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & NGX_USE_IOCP_EVENT) { if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; 设置当前新建ngx_connection_t的接收发送函数。这里默认情况下我们采用epoll事件驱动机制,因此 //ngx_io的值为ngx_os_io c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; c->unexpected_eof = 1; #if (NGX_HAVE_UNIX_DOMAIN) if (c->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; #if (NGX_SOLARIS) /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ c->sendfile = 0; #endif } #endif //注意,这里的ngx_connection_t是从连接池中从新获取的,和ngx_epoll_process_events中的ngx_connection_t是两个不同的。
设置当前connection所关联的读写事件的相关标志。当前为客户端新连接,因此wev->ready=1,而rev->ready保持
		//默认的值0即可。(注: 在调用ngx_get_connection()函数获取connection时,一般会默认将相应的标志清0)
        rev = c->read; 
        wev = c->write;

        wev->ready = 1;

        if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
            rev->ready = 1;
        }

        if (ev->deferred_accept) {
            rev->ready = 1;
#if (NGX_HAVE_KQUEUE)
            rev->available = 1;
#endif
        }

        rev->log = log;
        wev->log = log;

        /*
         * TODO: MT: - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         *
         * TODO: MP: - allocated in a shared memory
         *           - ngx_atomic_fetch_add()
         *             or protection by critical section or light mutex
         */

        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

#if (NGX_STAT_STUB)
        (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
#endif

        if (ls->addr_ntop) {
            c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
            if (c->addr_text.data == NULL) {
                ngx_close_accepted_connection(c);
                return;
            }

            c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
                                             c->addr_text.data,
                                             ls->addr_text_max_len, 0);
            if (c->addr_text.len == 0) {
                ngx_close_accepted_connection(c);
                return;
            }
        }

#if (NGX_DEBUG)
        {

        ngx_str_t             addr;
        struct sockaddr_in   *sin;
        ngx_cidr_t           *cidr;
        ngx_uint_t            i;
        u_char                text[NGX_SOCKADDR_STRLEN];
#if (NGX_HAVE_INET6)
        struct sockaddr_in6  *sin6;
        ngx_uint_t            n;
#endif

        cidr = ecf->debug_connection.elts;
        for (i = 0; i < ecf->debug_connection.nelts; i++) {
            if (cidr[i].family != (ngx_uint_t) c->sockaddr->sa_family) {
                goto next;
            }

            switch (cidr[i].family) {

#if (NGX_HAVE_INET6)
            case AF_INET6:
                sin6 = (struct sockaddr_in6 *) c->sockaddr;
                for (n = 0; n < 16; n++) {
                    if ((sin6->sin6_addr.s6_addr[n]
                        & cidr[i].u.in6.mask.s6_addr[n])
                        != cidr[i].u.in6.addr.s6_addr[n])
                    {
                        goto next;
                    }
                }
                break;
#endif

#if (NGX_HAVE_UNIX_DOMAIN)
            case AF_UNIX:
                break;
#endif

            default: /* AF_INET */
                sin = (struct sockaddr_in *) c->sockaddr;
                if ((sin->sin_addr.s_addr & cidr[i].u.in.mask)
                    != cidr[i].u.in.addr)
                {
                    goto next;
                }
                break;
            }

            log->log_level = NGX_LOG_DEBUG_CONNECTION|NGX_LOG_DEBUG_ALL;
            break;

        next:
            continue;
        }

        if (log->log_level & NGX_LOG_DEBUG_EVENT) {
            addr.data = text;
            addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
                                     NGX_SOCKADDR_STRLEN, 1);

            ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,
                           "*%uA accept: %V fd:%d", c->number, &addr, s);
        }

        }
#endif
//#define ngx_add_conn         ngx_event_actions.add_conn //connect和accept返回的时候用到  已经channel读的时候用
//ngx_epoll_add_connection----add fd----2 ---ep if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { //如果是epoll,不会走到这里面去 if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } } log->data = NULL; log->handler = NULL; 调用某一种监听socket的handler回调函数。查询'ls->handler' 可以看到,
对于http模块,其绑定的handler为
ngx_http_init_connection; 对于mail模块,其绑定的handler为ngx_mail_init_connection();
对于stream模块 其绑定的handler为ngx_stream_init_connection()
ls
->handler(c);//ngx_http_init_connection if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } } while (ev->available); //一次性读取所有当前的accept,直到accept返回NGX_EAGAIN,然后退出 }
//在创建子进程的里面执行  ngx_worker_process_init,
static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
    ngx_uint_t           m, i;
    ngx_event_t         *rev, *wev;
    ngx_listening_t     *ls;
    ngx_connection_t    *c, *next, *old;
    ngx_core_conf_t     *ccf;
    ngx_event_conf_t    *ecf;
    ngx_event_module_t  *module;

    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
    ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);

    /*
         当打开accept_mutex负载均衡锁,同时使用了master模式并且worker迸程数量大于1时,才正式确定了进程将使用accept_mutex负载均衡锁。
     因此,即使我们在配置文件中指定打开accept_mutex锁,如果没有使用master模式或者worker进程数量等于1,进程在运行时还是不会使用
     负载均衡锁(既然不存在多个进程去抢一个监听端口上的连接的情况,那么自然不需要均衡多个worker进程的负载)。
         这时会将ngx_use_accept_mutex全局变量置为1,ngx_accept_mutex_held标志设为0,ngx_accept_mutex_delay则设为在配置文件中指定的最大延迟时间。
     */
    if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
        ngx_use_accept_mutex = 1;
        ngx_accept_mutex_held = 0;
        ngx_accept_mutex_delay = ecf->accept_mutex_delay;

    } else {
        ngx_use_accept_mutex = 0;
    }

#if (NGX_WIN32)

    /*
     * disable accept mutex on win32 as it may cause deadlock if
     * grabbed by a process which can't accept connections
     */

    ngx_use_accept_mutex = 0;

#endif

    ngx_queue_init(&ngx_posted_accept_events);
    ngx_queue_init(&ngx_posted_events);

    //初始化红黑树实现的定时器。
    if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
        return NGX_ERROR;
    }

    //在调用use配置项指定的事件模块中,在ngx_event_module_t接口下,ngx_event_actions_t中的init方法进行这个事件模块的初始化工作。
    for (m = 0; ngx_modules[m]; m++) {
        if (ngx_modules[m]->type != NGX_EVENT_MODULE) {
            continue;
        }

        if (ngx_modules[m]->ctx_index != ecf->use) { //找到epoll或者select的module模块
            continue;
        }

        module = ngx_modules[m]->ctx;

        if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) { //执行epoll module中的ngx_epoll_init
            /* fatal */
            exit(2);
        }

        break; /*跳出循环,只可能使用一个具体的事件模型*/  
    }

#if !(NGX_WIN32)
    /*
    如果nginx.conf配置文件中设置了timer_resolution酡置项,即表明需要控制时间精度,这时会调用setitimer方法,设置时间间隔
    为timer_resolution毫秒来回调ngx_timer_signal_handler方法
     */
    if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) {
        struct sigaction  sa;
        struct itimerval  itv;
        
        //设置定时器
        /*
            在ngx_event_ actions t的process_events方法中,每一个事件驱动模块都需要在ngx_event_timer_alarm为1时调
            用ngx_time_update方法()更新系统时间,在更新系统结束后需要将ngx_event_timer_alarm设为0。
          */
        ngx_memzero(&sa, sizeof(struct sigaction)); //每隔ngx_timer_resolution ms会超时执行handle
        sa.sa_handler = ngx_timer_signal_handler;
        sigemptyset(&sa.sa_mask);

        if (sigaction(SIGALRM, &sa, NULL) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "sigaction(SIGALRM) failed");
            return NGX_ERROR;
        }

        itv.it_interval.tv_sec = ngx_timer_resolution / 1000;
        itv.it_interval.tv_usec = (ngx_timer_resolution % 1000) * 1000;
        itv.it_value.tv_sec = ngx_timer_resolution / 1000;
        itv.it_value.tv_usec = (ngx_timer_resolution % 1000 ) * 1000;

        if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "setitimer() failed");
        }
    }

    /* 
     如果使用了epoll事件驱动模式,那么会为ngx_cycle_t结构体中的files成员预分配旬柄。
     */
    if (ngx_event_flags & NGX_USE_FD_EVENT) {
        struct rlimit  rlmt;

        if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                          "getrlimit(RLIMIT_NOFILE) failed");
            return NGX_ERROR;
        }

        cycle->files_n = (ngx_uint_t) rlmt.rlim_cur; //每个进程能够打开的最多文件数

        cycle->files = ngx_calloc(sizeof(ngx_connection_t *) * cycle->files_n,
                                  cycle->log);
        if (cycle->files == NULL) {
            return NGX_ERROR;
        }
    }

#endif

    cycle->connections =
        ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
    if (cycle->connections == NULL) {
        return NGX_ERROR;
    }

    c = cycle->connections;

    cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                   cycle->log);
    if (cycle->read_events == NULL) {
        return NGX_ERROR;
    }

    rev = cycle->read_events;
    for (i = 0; i < cycle->connection_n; i++) {
        rev[i].closed = 1;
        rev[i].instance = 1;
    }

    cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                    cycle->log);
    if (cycle->write_events == NULL) {
        return NGX_ERROR;
    }

    wev = cycle->write_events;
    for (i = 0; i < cycle->connection_n; i++) {
        wev[i].closed = 1;
    }

    i = cycle->connection_n;
    next = NULL;

    /*
    接照序号,将上述3个数组相应的读/写事件设置到每一个ngx_connection_t连接对象中,同时把这些连接以ngx_connection_t中的data成员
    作为next指针串联成链表,为下一步设置空闲连接链表做好准备
     */
    do {
        i--;

        c[i].data = next;
        c[i].read = &cycle->read_events[i];
        c[i].write = &cycle->write_events[i];
        c[i].fd = (ngx_socket_t) -1;

        next = &c[i];
    } while (i);
----// 也就是 connect  和 event 进行赋值 关联
    /*
    将ngx_cycle_t结构体中的空闲连接链表free_connections指向connections数组的最后1个元素,也就是第10步所有ngx_connection_t连
    接通过data成员组成的单链表的首部。
     */
    cycle->free_connections = next;
    cycle->free_connection_n = cycle->connection_n;

    /* for each listening socket */
    /*
     在刚刚建立好的连接池中,为所有ngx_listening_t监听对象中的connection成员分配连接,同时对监听端口的读事件设置处理方法
     为ngx_event_accept,也就是说,有新连接事件时将调用ngx_event_accept方法建立新连接()。
     */
    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) { 

#if (NGX_HAVE_REUSEPORT)
        //master进程执行ngx_clone_listening中如果配置了多worker,监听80端口会有worker个listen赋值,master进程在ngx_open_listening_sockets
        //中会监听80端口worker次,那么子进程创建起来后,不是每个字进程都关注这worker多个 listen事件了吗?为了避免这个问题,nginx通过
        //在子进程运行ngx_event_process_init函数的时候,通过ngx_add_event来控制子进程关注的listen,最终实现只关注master进程中创建的一个listen事件
        if (ls[i].reuseport && ls[i].worker != ngx_worker) {
            continue;
        }
#endif
        
        c = ngx_get_connection(ls[i].fd, cycle->log); //从连接池中获取一个ngx_connection_t

        if (c == NULL) {
            return NGX_ERROR;
        }

        c->log = &ls[i].log;

        c->listening = &ls[i]; //把解析到listen配置项信息赋值给ngx_connection_s中的listening中
        ls[i].connection = c;

        rev = c->read;

        rev->log = c->log;
        rev->accept = 1;

#if (NGX_HAVE_DEFERRED_ACCEPT)
        rev->deferred_accept = ls[i].deferred_accept;
#endif

        if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
            if (ls[i].previous) {

                /*
                 * delete the old accept events that were bound to
                 * the old cycle read events array
                 */

                old = ls[i].previous->connection;

                if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
                    == NGX_ERROR)
                {
                    return NGX_ERROR;
                }

                old->fd = (ngx_socket_t) -1;
            }
        }

#if (NGX_WIN32)

        if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
            ngx_iocp_conf_t  *iocpcf;
            rev->handler = ngx_event_acceptex;

            if (ngx_use_accept_mutex) {
                continue;
            }

            if (ngx_add_event(rev, 0, NGX_IOCP_ACCEPT) == NGX_ERROR) {
                return NGX_ERROR;
            }

            ls[i].log.handler = ngx_acceptex_log_error;

            iocpcf = ngx_event_get_conf(cycle->conf_ctx, ngx_iocp_module);
            if (ngx_event_post_acceptex(&ls[i], iocpcf->post_acceptex)
                == NGX_ERROR)
            {
                return NGX_ERROR;
            }

        } else {
            rev->handler = ngx_event_accept;

            if (ngx_use_accept_mutex) {
                continue;
            }

            if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }
        }

#else
        /*
        对监听端口的读事件设置处理方法
        为ngx_event_accept,也就是说,有新连接事件时将调用ngx_event_accept方法建立新连接
          */
        rev->handler = ngx_event_accept; 

        /* 
          使用了accept_mutex,暂时不将监听套接字放入epoll中, 而是等到worker抢到accept互斥体后,再放入epoll,避免惊群的发生。 
          */ //在建连接的时候,为了避免惊群,在accept的时候,只有获取到该原子锁,才把accept添加到epoll事件中,见ngx_process_events_and_timers->ngx_trylock_accept_mutex
        if (ngx_use_accept_mutex
#if (NGX_HAVE_REUSEPORT)
            && !ls[i].reuseport
#endif
           ) //如果是单进程方式
        {
            continue;
        }

        /*
          将监听对象连接的读事件添加到事件驱动模块中,这样,epoll等事件模块就开始检测监听服务,并开始向用户提供服务了。
          */ //如果ngx_use_accept_mutex为0也就是未开启accept_mutex锁,则在ngx_worker_process_init->ngx_event_process_init 中把accept连接读事件统计到epoll中
          //否则在ngx_process_events_and_timers->ngx_process_events_and_timers->ngx_trylock_accept_mutex中把accept连接读事件统计到epoll中

        char tmpbuf[256];
        
        snprintf(tmpbuf, sizeof(tmpbuf), "<%25s, %5d> epoll NGX_READ_EVENT(et) read add", NGX_FUNC_LINE);
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, tmpbuf);
        if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) { //如果是epoll则为ngx_epoll_add_event
            return NGX_ERROR;
        }

#endif  

    }

    return NGX_OK;
}
//ngx_epoll_add_event表示添加某种类型的(读或者写,使用LT促发方式)事件?
//ngx_epoll_add_connection (读写一起添加上去, 使用EPOLLET边沿触发方式)
static ngx_int_t
ngx_epoll_add_connection(ngx_connection_t *c) //该函数封装为ngx_add_conn的,使用的时候为ngx_add_conn
{
    struct epoll_event  ee;

    ee.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP; //注意这里是水平触发 
    ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                   "epoll add connection(read and write): fd:%d ev:%08XD", c->fd, ee.events);

    if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                      "epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
        return NGX_ERROR;
    }

    c->read->active = 1;
    c->write->active = 1;

    return NGX_OK;
}
ngx_epoll_del_connection(ngx_connection_t *c, ngx_uint_t flags)
{
    int                 op;
    struct epoll_event  ee;

    /*
     * when the file descriptor is closed the epoll automatically deletes
     * it from its queue so we do not need to delete explicitly the event
     * before the closing the file descriptor
     */

    if (flags & NGX_CLOSE_EVENT) { //如果该函数紧接着会调用close(fd),那么就不用epoll del了,系统会自动del
        c->read->active = 0;
        c->write->active = 0;
        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
                   "epoll del connection: fd:%d", c->fd);

    op = EPOLL_CTL_DEL;
    ee.events = 0;
    ee.data.ptr = NULL;

    if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                      "epoll_ctl(%d, %d) failed", op, c->fd);
        return NGX_ERROR;
    }

    c->read->active = 0;
    c->write->active = 0;

    return NGX_OK;
}




原文地址:https://www.cnblogs.com/codestack/p/12153512.html