ngx ------ngx_cache_manager_process_cycle




static
void ngx_cache_manager_process_cycle(ngx_cycle_t *cycle, void *data) {----------------------------------- /* * Set correct process type since closing listening Unix domain socket * in a master process also removes the Unix domain socket file. */ ngx_process = NGX_PROCESS_HELPER; ngx_close_listening_sockets(cycle); /* Set a moderate number of connections for a helper process. */ cycle->connection_n = 512; //work 进程 初始化 ngx_worker_process_init(cycle, -1); ngx_memzero(&ev, sizeof(ngx_event_t)); ev.handler = ctx->handler; //ngx_cache_manager_process_handler ngx_cache_loader_process_handler ev.data = ident; ev.log = cycle->log; ident[3] = (void *) -1; ngx_use_accept_mutex = 0; ngx_setproctitle(ctx->name); ngx_add_timer(&ev, ctx->delay, NGX_FUNC_LINE); //ctx->dealy秒执行ctx->handler; for ( ;; ) { if (ngx_terminate || ngx_quit) { ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); exit(0); } if (ngx_reopen) { ngx_reopen = 0; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs"); ngx_reopen_files(cycle, -1); } ------------//work 进程 循环处理 fd io 以及 timer------------------------------------------------------ ngx_process_events_and_timers(cycle); } }
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)  
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;
    
    ---------------------------------------
    ngx_use_accept_mutex = 1;
   //ngx_use_accept_mutex表示是否需要通过对accept加锁来解决惊群问题。当nginx worker进程数>1时且配置文件中打开accept_mutex时,这个标志置为1   
    if (ngx_use_accept_mutex) {
        /*
              ngx_accept_disabled表示此时满负荷,没必要再处理新连接了,我们在nginx.conf曾经配置了每一个nginx worker进程能够处理的最大连接数,
          当达到最大数的7/8时,ngx_accept_disabled为正,说明本nginx worker进程非常繁忙,将不再去处理新连接,这也是个简单的负载均衡
              在当前使用的连接到达总连接数的7/8时,就不会再处理新连接了,同时,在每次调用process_events时都会将ngx_accept_disabled减1,
          直到ngx_accept_disabled降到总连接数的7/8以下时,才会调用ngx_trylock_accept_mutex试图去处理新连接事件  如果ngx_accept_disabled大于了0,就表示该进程接受的
连接过多,因此就放弃一次争抢accept mutex的机会,同时将 自己减1。然后,继续处理已有连接上的事件。Nginx就借用 此变量实现了进程关于连接的基本负载均衡。

*/
        if (ngx_accept_disabled > 0) { //为正说明可用连接用了超过八分之七,则让其他的进程在下面的else中来accept
            ngx_accept_disabled--;

        } else {
            /*
                 如果ngx_trylock_accept_mutex方法没有获取到锁,接下来调用事件驱动模块的process_events方法时只能处理已有的连接上的事件;
                 如果获取到了锁,调用process_events方法时就会既处理已有连接上的事件,也处理新连接的事件。
              
                如何用锁来避免惊群?
                   尝试锁accept mutex,只有成功获取锁的进程,才会将listen  
                   套接字放入epoll中。因此,这就保证了只有一个进程拥有  
                   监听套接口,故所有进程阻塞在epoll_wait时,不会出现惊群现象。  
                   这里的ngx_trylock_accept_mutex函数中,如果顺利的获取了锁,那么它会将监听端口注册到当前worker进程的epoll当中   

               获得accept锁,多个worker仅有一个可以得到这把锁。获得锁不是阻塞过程,都是立刻返回,获取成功的话ngx_accept_mutex_held被置为1。
               拿到锁,意味着监听句柄被放到本进程的epoll中了,如果没有拿到锁,则监听句柄会被从epoll中取出。 
              */
        /*
           如果ngx_use_accept_mutex为0也就是未开启accept_mutex锁,则在ngx_worker_process_init->ngx_event_process_init 中把accept连接读事件统计到epoll中
           否则在ngx_process_events_and_timers->ngx_process_events_and_timers->ngx_trylock_accept_mutex中把accept连接读事件统计到epoll中
---------------------------尝试锁accept mutex,只有成功获取锁的进程,才会将listen 套接字放入epoll中。因此,这就保证了只有一个进程拥有  监听套接口,故所有进程阻塞在epoll_wait时,不会出现惊群现象。 

*/
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { //不管是获取到锁还是没获取到锁都是返回NGX_OK
                return;
            }

            /*
                拿到锁的话,置flag为NGX_POST_EVENTS,这意味着ngx_process_events函数中,任何事件都将延后处理,会把accept事件都放到
                ngx_posted_accept_events链表中,epollin|epollout事件都放到ngx_posted_events链表中 ---
 获取锁的进程,将添加一个NGX_POST_EVENTS标志, 此标志的作用是将所有产生的事件放入一个队列中, 等释放锁后,再慢慢来处理事件。
因为,处理事件可能 会很耗时,如果不先释放锁再处理的话,该进程就长 时间霸占了锁,导致其他进程无法获取锁,这样accept 的效率就低了。            
*/ if (ngx_accept_mutex_held) { flags |= NGX_POST_EVENTS; } else { /* 拿不到锁,也就不会处理监听的句柄,这个timer实际是传给epoll_wait的超时时间,修改为最大ngx_accept_mutex_delay意味 着epoll_wait更短的超时返回,以免新连接长时间没有得到处理 */ if (timer == NGX_TIMER_INFINITE || timer > ngx_accept_mutex_delay) { //如果没获取到锁,则延迟这么多ms重新获取说,继续循环,也就是技术锁被其他进程获得,本进程最多在epoll_wait中睡眠0.5s,然后返回 timer = ngx_accept_mutex_delay; //保证这么多时间超时的时候出发epoll_wait返回,从而可以更新内存时间 } } } } delta = ngx_current_msec; /* 1.如果进程获的锁,并获取到锁,则该进程在epoll事件发生后会触发返回,然后得到对应的事件handler,加入延迟队列中,然后释放锁,然 后在执行对应handler,同时更新时间,判断该进程对应的红黑树中是否有定时器超时, 2.如果没有获取到锁,则默认传给epoll_wait的超时时间是0.5s,表示过0.5s继续获取锁,0.5s超时后,会跟新当前时间,同时判断是否有过期的 定时器,有则指向对应的定时器函数 */ /* 1.ngx_event_s可以是普通的epoll读写事件(参考ngx_event_connect_peer->ngx_add_conn或者ngx_add_event),通过读写事件触发 2.也可以是普通定时器事件(参考ngx_cache_manager_process_handler->ngx_add_timer(ngx_event_add_timer)),通过ngx_process_events_and_timers中的 epoll_wait返回,可以是读写事件触发返回,也可能是因为没获取到共享锁,从而等待0.5s返回重新获取锁来跟新事件并执行超时事件来跟新事件并且判断定 时器链表中的超时事件,超时则执行从而指向event的handler,然后进一步指向对应r或者u的->write_event_handler read_event_handler 3.也可以是利用定时器expirt实现的读写事件(参考ngx_http_set_write_handler->ngx_add_timer(ngx_event_add_timer)),触发过程见2,只是在handler中不会执行write_event_handler read_event_handler */ //linux下,普通网络套接字调用ngx_epoll_process_events函数开始处理,异步文件i/o设置事件的回调方法为ngx_epoll_eventfd_handler (void) ngx_process_events(cycle, timer, flags); delta = ngx_current_msec - delta; //(void) ngx_process_events(cycle, timer, flags)中epoll等待事件触发过程花费的时间 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait timer range(delta): %M", delta); //当感应到来自于客户端的accept事件,epoll_wait返回后加入到post队列,执行完所有accpet连接事件后,立马释放ngx_accept_mutex锁,这样其他进程就可以立马获得锁accept客户端连接
//* 这里完成对队列中的accept事件的处理,实际就是调用 ngx_event_accept函数来获取一个新的连接,然后放入 epoll中。 
ngx_event_process_posted(cycle, &ngx_posted_accept_events); //一般执行ngx_event_accept //释放锁后再处理下面的EPOLLIN EPOLLOUT请求 --------------所有accept事件处理完成,如果拥有锁的话,就赶紧释放了。 其他进程还等着抢了。 if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); } if (delta) { ngx_event_expire_timers(); //处理红黑树队列中的超时事件handler } /* 然后再处理正常的数据读写请求。因为这些请求耗时久,所以在ngx_process_events里NGX_POST_EVENTS标志将事件都放入ngx_posted_events 链表中,延迟到锁释放了再处理。 */ ngx_event_process_posted(cycle, &ngx_posted_events); //普通读写事件放在释放ngx_accept_mutex锁后执行,提高客户端accept性能 }

https://yikun.github.io/2014/03/26/nginxevent/

原文地址:https://www.cnblogs.com/codestack/p/13463877.html