http代理阅读3 发送mem处理

每次客户端有可读数据触发时,优先检测是否还有数据没有发送,如果有则发送数据,然后在读取client数据

//向后端发送请求的调用过程
//ngx_http_upstream_send_request_body->ngx_output_chain->ngx_chain_writer
static ngx_int_t
ngx_http_upstream_send_request_body(ngx_http_request_t *r,
    ngx_http_upstream_t *u, ngx_uint_t do_write)
{
    int                        tcp_nodelay;
    ngx_int_t                  rc;
    ngx_chain_t               *out, *cl, *ln;
    ngx_connection_t          *c;
    ngx_http_core_loc_conf_t  *clcf;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream send request body");

    if (!r->request_body_no_buffering) {

       /* buffered request body */
    /* request_sent 标志位为 1 表示是否已经传递了 request_bufs 缓冲区 */
       if (!u->request_sent) { 
        /* 在第一次以 request_bufs 作为参数调用 ngx_output_chain 
             * 方法后,request_sent 会置为 1 */
           u->request_sent = 1;
           out = u->request_bufs; //如果是fastcgi这里面为实际发往后端的数据(包括fastcgi格式头部+客户端包体等)

       } else {
           out = NULL;
       }
       /* 1. 调用 ngx_out_chain 方法向上游服务器发送 ngx_http_upstream_t 结构体
               * 中的 request_bufs 链表,这个方法对于发送缓冲区构成的 ngx_chain_t 链表
               * 非常有用,它会把未发送完成的链表缓冲区保存下来,这样就不用每次调用时
               * 都携带上 request_bufs 链表。*/
       return ngx_output_chain(&u->output, out);
    }

    if (!u->request_sent) {
        u->request_sent = 1;
        out = u->request_bufs;

        if (r->request_body->bufs) {
            for (cl = out; cl->next; cl = out->next) { /* void */ }
            cl->next = r->request_body->bufs;
            r->request_body->bufs = NULL;
        }

        c = u->peer.connection;
        clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

        if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");

            tcp_nodelay = 1;

            if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
                           (const void *) &tcp_nodelay, sizeof(int)) == -1)
            {
                ngx_connection_error(c, ngx_socket_errno,
                                     "setsockopt(TCP_NODELAY) failed");
                return NGX_ERROR;
            }

            c->tcp_nodelay = NGX_TCP_NODELAY_SET;
        }

        r->read_event_handler = ngx_http_upstream_read_request_handler;// 每次有请求数据来时,先发送后读取

    } else {
         /* 进入到这里表示之前的请求数据还未发送完全,现在是再次触发
         * 写事件时,将未发送完全的数据发送该上游服务器,这里将 out
         * 置为 NULL,表示下面的 ngx_output_chain 函数不需要在传递参数 */
        out = NULL;
    }

    for ( ;; ) {

        if (do_write) {
            rc = ngx_output_chain(&u->output, out);

            if (rc == NGX_ERROR) {
                return NGX_ERROR;//send error return 
            
            }

            while (out) { // free 归还 free mem
                ln = out;
                out = out->next;
                ngx_free_chain(r->pool, ln);
            }

            if (rc == NGX_OK && !r->reading_body) {
                break;
            }
        }

        if (r->reading_body) {
            /* read client request body   也就是 请求缓存是否已经读玩 */

            rc = ngx_http_read_unbuffered_request_body(r);

            if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
                return rc;
            }

            out = r->request_body->bufs;
            r->request_body->bufs = NULL;
        }

        /* stop if there is nothing to send */

        if (out == NULL) {
            rc = NGX_AGAIN;
            break;
        }
        //out != NULL 继续向上游发送数据
        do_write = 1;
    }

    if (!r->reading_body) {// 不懂------------不明白
        if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
            r->read_event_handler =
                                  ngx_http_upstream_rd_check_broken_connection;
        }
    }

    return rc;
}

发送状态保存 :发送需要多次才能完成,所以发送时需要有缓存以及状态机记录发送进展

/* 
函数目的是发送 in 中的数据,ctx 用来保存发送的上下文,因为发送通常情况下,不能一次完成。
nginx 因为使用了 ET 模式,在网络编程事件管理上简单了,但是编程中处理事件复杂了,
需要不停的循环做处理;事件的函数回调,次数也不确定,因此需要使用 context 上下文对象来保存发送到什么环节了。
注意:这里的in实际上是已经指向数据内容部分,或者如果发送的数据需要从文件中读取,
in中也会指定文件file_pos和file_last已经文件fd等,
   可以参考ngx_http_cache_send ngx_http_send_header ngx_http_output_filter */
ngx_output_chain(ngx_output_chain_ctx_t *ctx, ngx_chain_t *in)  
//in为需要发送的chain链,上面存储的是实际要发送的数据
{//ctx为&u->output, in为u->request_bufs这里nginx filter的主要逻辑都在这个函数里面,
//将in参数链表的缓冲块拷贝到
//ctx->in,然后将ctx->in的数据拷贝到out,然后调用output_filter发送出去。

//如果读取后端数据发往客户端,默认流程是
//ngx_event_pipe->ngx_event_pipe_write_to_downstream->p->output_filter(p->output_ctx, p->out);走到这里
    off_t         bsize;
    ngx_int_t     rc, last;
    ngx_chain_t  *cl, *out, **last_out;

    ngx_uint_t sendfile = ctx->sendfile;
    ngx_uint_t aio = ctx->aio;
    ngx_uint_t directio = ctx->directio;
    
    ngx_log_debugall(ctx->pool->log, 0, "ctx->sendfile:%ui, ctx->aio:%ui, ctx->directio:%ui", sendfile, aio, directio);
    if (ctx->in == NULL && ctx->busy == NULL) //in是待发送的数据,busy是已经调用ngx_chain_writer但还没有发送完毕。
    {
        /*
         * the short path for the case when the ctx->in and ctx->busy chains
         * are empty, the incoming chain is empty too or has the single buf
         * that does not require the copy
         */
// 要发送的 buf 只有一个,不需要复制
        if (in == NULL) { //如果要发送的数据为空,也就是啥也不用发送。那就直接调用output_filter的了。
            ngx_log_debugall(ctx->pool->log, 0, "ngx output chain, in = NULL");
            return ctx->output_filter(ctx->filter_ctx, in);
        }

        if (in->next == NULL //说明发送buf只有一个
#if (NGX_SENDFILE_LIMIT)
            && !(in->buf->in_file && in->buf->file_last > NGX_SENDFILE_LIMIT)
#endif
            && ngx_output_chain_as_is(ctx, in->buf)) //这个函数主要用来判断是否需要复制buf。返回1,表示不需要拷贝,否则为需要拷贝 
        {
            ngx_log_debugall(ctx->pool->log, 0, "only one chain buf to output_filter");
            return ctx->output_filter(ctx->filter_ctx, in);
        }
    }

    /* add the incoming buf to the chain ctx->in */
   
    if (in) {//拷贝一份数据到ctx->in里面,需要老老实实的进行数据拷贝了。将in参数里面的数据拷贝到ctx->in里面。换了个in
        if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    /* out为最终需要传输的chain,也就是交给剩下的filter处理的chain */  
    out = NULL;
    last_out = &out; //下面遍历ctx->in链中的数据并且添加到该last_out中,也就是添加到out链中
    last = NGX_NONE;
    //到现在了,in参数的缓冲链表已经放在了ctx->in里面了。下面准备发送吧。

    for ( ;; ) { //循环读取缓存中或者内存中的数据发送
        //结合ngx_http_xxx_create_request(ngx_http_fastcgi_create_request)阅读,ctx->in中的数据实际上是从ngx_http_xxx_create_request组成ngx_chain_t来的,数据来源在ngx_http_xxx_create_request
        while (ctx->in) {//遍历所有待发送的数据。将他们一个个拷贝到out指向的链表中
// 遍历 ctx->in chain 列表,处理 in,只会处理一次,如果发送不完数据,下次再进入函数,ctx->in 就是空
            /*
             * cycle while there are the ctx->in bufs
             * and there are the free output bufs to copy in
             */

            bsize = ngx_buf_size(ctx->in->buf);
            //这块内存大小为0,然后又不是special 可能有问题。 如果是special的buf,应该是从ngx_http_send_special过来的
            if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) {
                ngx_debug_point();
                ctx->in = ctx->in->next;
                continue;
            }
            /* 判断是否需要复制buf */    
            if (ngx_output_chain_as_is(ctx, ctx->in->buf)) {
                //把ctx->in->buf从ctx->in上面取下来,然后加入到lst_out链表中
                /* move the chain link to the output chain */
                /* 如果不需要复制,则直接链接chain到out,然后继续循环 */ 
                cl = ctx->in;
                ctx->in = cl->next; //已经赋值的会从ctx->in上面摘掉

                *last_out = cl;
                last_out = &cl->next;
                cl->next = NULL;

                continue;
            }


//注意从后端接收的数据到缓存文件中后,在filter模块中,有可能是新的buf数据指针了,因为ngx_http_copy_filter->ngx_output_chain中会重新分配内存读取缓存文件内容

            //如果是需要赋值buf(一般都是sendfile的时候),用户空间内存里面没有数据,所以需要开辟空间来把文件中的内容赋值一份出来
            
            /* 到达这里,说明我们需要拷贝buf,这里buf最终都会被拷贝进ctx->buf中, 因此这里先判断ctx->buf是否为空 */ 
            if (ctx->buf == NULL) { //每次拷贝数据前,先给ctx->buf分配空间,在下面的ngx_output_chain_get_buf函数中

                /* 如果为空,则取得buf,这里要注意,一般来说如果没有开启directio的话,这个函数都会返回NGX_DECLINED */  
                rc = ngx_output_chain_align_file_buf(ctx, bsize);

                if (rc == NGX_ERROR) {
                    return NGX_ERROR;
                }

                if (rc != NGX_OK) {

                    if (ctx->free) {

                        /* get the free buf */

                        cl = ctx->free;
                        /* 得到free buf */    
                        ctx->buf = cl->buf;
                        ctx->free = cl->next;
                        /* 将要重用的chain链接到ctx->poll中,以便于chain的重用 */  
                        ngx_free_chain(ctx->pool, cl);

                    } else if (out || ctx->allocated == ctx->bufs.num) {//output_buffers 1 32768都用完了
                   /* 
                        如果已经等于buf的个数限制,则跳出循环,发送已经存在的buf。 这里可以看到如果out存在的话,nginx会跳出循环,然后发送out,
                        等发送完会再次处理,这里很好的体现了nginx的流式处理 
                        */ 
                        break;

                    } else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) {/* 上面这个函数也比较关键,它用来取得buf。接下来会详细看这个函数 */
                        //该函数获取到的内存保存到ctx->buf中
                        return NGX_ERROR;
                    }
                }
            }
            
            /* 从原来的buf中拷贝内容或者从原来的文件中读取内容 
            创建新的 chain 对象追加到 ctx->in 列表中,这些对象指向输入 in 中的 buf 对象
            */  //注意如果是aio on或者aio thread=poll方式返回的是NGX_AGAIN    
            rc = ngx_output_chain_copy_buf(ctx); //把ctx->in->buf中的内容赋值给ctx->buf
//ngx_output_chain_copy_bufc中tx->in中的内存数据或者缓存文件数据会拷贝到dst中,也就是ctx->buf,然后在ngx_output_chain_copy_buf函数
//外层会重新把ctx->buf赋值给新的chain,然后write出去 ,见下面的创建新chain

            if (rc == NGX_ERROR) {
                return rc;
            }

            if (rc == NGX_AGAIN) { 
            //AIO是异步方式,由内核自行发送出去,应用层不用管,读取文件中数据完毕后epoll会触发执行ngx_file_aio_event_handler中执行ngx_http_copy_aio_event_handler,表示内核已经读取完毕
                if (out) {
                    break;
                }

                return rc;
            }

            /* delete the completed buf from the ctx->in chain */

            if (ngx_buf_size(ctx->in->buf) == 0) {//这个节点大小为0,移动到下一个节点。
                ctx->in = ctx->in->next;
            }

            cl = ngx_alloc_chain_link(ctx->pool);
            if (cl == NULL) {
                return NGX_ERROR;
            }
            //把ngx_output_chain_copy_buf中从原src拷贝的内容赋值给cl->buf,然后添加到lst_out的头部  也就是添加到out后面
            cl->buf = ctx->buf;
            cl->next = NULL;
            *last_out = cl;
            last_out = &cl->next;
            ctx->buf = NULL;

            //注意这里没有continue;直接往后走
        }

        if (out == NULL && last != NGX_NONE) {

            if (ctx->in) {
                return NGX_AGAIN;
            }

            return last;
        }

        last = ctx->output_filter(ctx->filter_ctx, out); //  ngx_chain_writer

        if (last == NGX_ERROR || last == NGX_DONE) {
            return last;
        }

        ngx_chain_update_chains(ctx->pool, &ctx->free, &ctx->busy, &out,
                                ctx->tag);
        last_out = &out;
    }
}

对发送缓存的发送处理:

//向后端发送请求的调用过程ngx_http_upstream_send_request_body->ngx_output_chain->ngx_chain_writer
ngx_int_t
ngx_chain_writer(void *data, ngx_chain_t *in)
{
    ngx_chain_writer_ctx_t *ctx = data;

    off_t              size;
    ngx_chain_t       *cl, *ln, *chain;
    ngx_connection_t  *c;

    c = ctx->connection;
    /*下面的循环,将in里面的每一个链接节点,添加到ctx->filter_ctx所指的链表中。并记录这些in的链表的大小。*/
    for (size = 0; in; in = in->next) {
#if 1
        if (ngx_buf_size(in->buf) == 0 && !ngx_buf_special(in->buf)) {
            ngx_debug_point();
            continue;
        }
#endif

        size += ngx_buf_size(in->buf);
        ngx_log_debug2(NGX_LOG_DEBUG_CORE, c->log, 0,
                       "chain writer buf fl:%d s:%uO",
                       in->buf->flush, ngx_buf_size(in->buf));

        cl = ngx_alloc_chain_link(ctx->pool);
        if (cl == NULL) {
            return NGX_ERROR;
        }

        cl->buf = in->buf; //把in->buf赋值给新的cl->buf,
        cl->next = NULL;
        //下面这两句实际上就是把cl添加到ctx->out链表头中,
        *ctx->last = cl; 
        ctx->last = &cl->next; //向后移动last指针,指向新的最后一个节点的next变量地址。再次循环走到这里的时候,调用ctx->last=cl会把新的cl添加到out的尾部
    }
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0,
                   "chain writer in: %p", ctx->out);
                   
    //遍历刚刚准备的链表,并统计其大小,这是啥意思?ctx->out为链表头,所以这里遍历的是所有的。
    for (cl = ctx->out; cl; cl = cl->next) {
#if 1
        if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
            ngx_debug_point();

            continue;
        }
#endif
        size += ngx_buf_size(cl->buf);
    }

    if (size == 0 && !c->buffered) {//啥数据都么有,不用发了都
        return NGX_OK;
    }

    //调用writev将ctx->out的数据全部发送出去。如果没法送完,则返回没发送完毕的部分。记录到out里面
    //在ngx_event_connect_peer连接上游服务器的时候设置的发送链接函数ngx_send_chain = ngx_writev_chain      。
    chain = c->send_chain(c, ctx->out, ctx->limit); //ngx_send_chain->ngx_writev_chain  到后端的请求报文是不会走filter过滤模块的,而是直接调用ngx_writev_chain->ngx_writev发送到后端
    ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0,
                   "chain writer out: %p", chain);
    if (chain == NGX_CHAIN_ERROR) {
        return NGX_ERROR;
    }

    for (cl = ctx->out; cl && cl != chain; /* void */) { //把ctx->out中已经全部发送出去的in节点从out链表摘除放入free中,重复利用
        ln = cl;
        cl = cl->next;
        ngx_free_chain(ctx->pool, ln);
    }

    ctx->out = chain; //ctx->out上面现在只剩下还没有发送出去的in节点了

    if (ctx->out == NULL) { //说明已经ctx->out链中的所有数据已经全部发送完成
        ctx->last = &ctx->out;

        if (!c->buffered) { 
        //发送到后端的请求报文之前buffered一直都没有操作过为0,如果是应答给客户端的响应,则buffered可能在进入ngx_http_write_filter调用
        //c->send_chain()之前已经有赋值给,发送给客户端包体的时候会经过所有的filter模块走到这里
            return NGX_OK;
        }
    }

    return NGX_AGAIN; //如果上面的chain = c->send_chain(c, ctx->out, ctx->limit)后,out中还有数据则返回NGX_AGAIN等待再次事件触发调度
}


ngx_chain_t *
ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
{//调用writev一次发送多个缓冲区,如果没有发送完毕,则返回剩下的链接结构头部。
//ngx_chain_writer调用这里,调用方式为 ctx->out = c->send_chain(c, ctx->out, ctx->limit);
//第二个参数为要发送的数据
    ssize_t        n, sent;
    off_t          send, prev_send;
    ngx_chain_t   *cl;
    ngx_event_t   *wev;
    ngx_iovec_t    vec;
    struct iovec   iovs[NGX_IOVS_PREALLOCATE];

    wev = c->write;//拿到这个连接的写事件结构

    if (!wev->ready) {//连接还没准备好,返回当前的节点。
        return in;
    }
    /* the maximum limit size is the maximum size_t value - the page size */
    if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) {
        limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize;//够大了,最大的整数
    }

    send = 0;

    vec.iovs = iovs;
    vec.nalloc = NGX_IOVS_PREALLOCATE;

    for ( ;; ) {
        prev_send = send; //prev_send为上一次调用ngx_writev发送出去的字节数
        /* create the iovec and coalesce the neighbouring bufs */
        //把in链中的buf拷贝到vec->iovs[n++]中,注意只会拷贝内存中的数据到iovec中,不会拷贝文件中的
        cl = ngx_output_chain_to_iovec(&vec, in, limit - send, c->log);
        if (cl == NGX_CHAIN_ERROR) {
            return NGX_CHAIN_ERROR;
        }
            ngx_debug_point();
            return NGX_CHAIN_ERROR;
        }

        send += vec.size; //为ngx_output_chain_to_iovec中组包的in链中所有数据长度和

        n = ngx_writev(c, &vec); 
        //我期望发送vec->size字节数据,但是实际上内核发送出去的很可能比vec->size小,n为实际发送出去的字节数,因此需要继续发送

        if (n == NGX_ERROR) {
            return NGX_CHAIN_ERROR;
        }
        sent = (n == NGX_AGAIN) ? 0 : n;//记录发送的数据大小。
        c->sent += sent;//递增统计数据,这个链接上发送的数据大小
        in = ngx_chain_update_sent(in, sent); //send是此次调用ngx_wrtev发送成功的字节数
        //ngx_chain_update_sent返回后的in链已经不包括之前发送成功的in节点了,这上面只包含剩余的数据       
        if (send - prev_send != sent) { //这里说明最多调用ngx_writev两次成功发送后,这里就会返回
            wev->ready = 0; //标记暂时不能发送数据了,必须重新epoll_add写事件
            return in;
        }
        if (send >= limit || in == NULL) { //数据发送完毕,或者本次发送成功的字节数比limit还多,则返回出去
            return in; //
        }
    }
}
原文地址:https://www.cnblogs.com/codestack/p/13897700.html