每次客户端有可读数据触发时,优先检测是否还有数据没有发送,如果有则发送数据,然后在读取client数据
//向后端发送请求的调用过程
//ngx_http_upstream_send_request_body->ngx_output_chain->ngx_chain_writer
static ngx_int_t
ngx_http_upstream_send_request_body(ngx_http_request_t *r,
ngx_http_upstream_t *u, ngx_uint_t do_write)
{
int tcp_nodelay;
ngx_int_t rc;
ngx_chain_t *out, *cl, *ln;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream send request body");
if (!r->request_body_no_buffering) {
/* buffered request body */
/* request_sent 标志位为 1 表示是否已经传递了 request_bufs 缓冲区 */
if (!u->request_sent) {
/* 在第一次以 request_bufs 作为参数调用 ngx_output_chain
* 方法后,request_sent 会置为 1 */
u->request_sent = 1;
out = u->request_bufs; //如果是fastcgi这里面为实际发往后端的数据(包括fastcgi格式头部+客户端包体等)
} else {
out = NULL;
}
/* 1. 调用 ngx_out_chain 方法向上游服务器发送 ngx_http_upstream_t 结构体
* 中的 request_bufs 链表,这个方法对于发送缓冲区构成的 ngx_chain_t 链表
* 非常有用,它会把未发送完成的链表缓冲区保存下来,这样就不用每次调用时
* 都携带上 request_bufs 链表。*/
return ngx_output_chain(&u->output, out);
}
if (!u->request_sent) {
u->request_sent = 1;
out = u->request_bufs;
if (r->request_body->bufs) {
for (cl = out; cl->next; cl = out->next) { /* void */ }
cl->next = r->request_body->bufs;
r->request_body->bufs = NULL;
}
c = u->peer.connection;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
tcp_nodelay = 1;
if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
(const void *) &tcp_nodelay, sizeof(int)) == -1)
{
ngx_connection_error(c, ngx_socket_errno,
"setsockopt(TCP_NODELAY) failed");
return NGX_ERROR;
}
c->tcp_nodelay = NGX_TCP_NODELAY_SET;
}
r->read_event_handler = ngx_http_upstream_read_request_handler;// 每次有请求数据来时,先发送后读取
} else {
/* 进入到这里表示之前的请求数据还未发送完全,现在是再次触发
* 写事件时,将未发送完全的数据发送该上游服务器,这里将 out
* 置为 NULL,表示下面的 ngx_output_chain 函数不需要在传递参数 */
out = NULL;
}
for ( ;; ) {
if (do_write) {
rc = ngx_output_chain(&u->output, out);
if (rc == NGX_ERROR) {
return NGX_ERROR;//send error return
}
while (out) { // free 归还 free mem
ln = out;
out = out->next;
ngx_free_chain(r->pool, ln);
}
if (rc == NGX_OK && !r->reading_body) {
break;
}
}
if (r->reading_body) {
/* read client request body 也就是 请求缓存是否已经读玩 */
rc = ngx_http_read_unbuffered_request_body(r);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
out = r->request_body->bufs;
r->request_body->bufs = NULL;
}
/* stop if there is nothing to send */
if (out == NULL) {
rc = NGX_AGAIN;
break;
}
//out != NULL 继续向上游发送数据
do_write = 1;
}
if (!r->reading_body) {// 不懂------------不明白
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
r->read_event_handler =
ngx_http_upstream_rd_check_broken_connection;
}
}
return rc;
}
发送状态保存 :发送需要多次才能完成,所以发送时需要有缓存以及状态机记录发送进展
/*
函数目的是发送 in 中的数据,ctx 用来保存发送的上下文,因为发送通常情况下,不能一次完成。
nginx 因为使用了 ET 模式,在网络编程事件管理上简单了,但是编程中处理事件复杂了,
需要不停的循环做处理;事件的函数回调,次数也不确定,因此需要使用 context 上下文对象来保存发送到什么环节了。
注意:这里的in实际上是已经指向数据内容部分,或者如果发送的数据需要从文件中读取,
in中也会指定文件file_pos和file_last已经文件fd等,
可以参考ngx_http_cache_send ngx_http_send_header ngx_http_output_filter */
ngx_output_chain(ngx_output_chain_ctx_t *ctx, ngx_chain_t *in)
//in为需要发送的chain链,上面存储的是实际要发送的数据
{//ctx为&u->output, in为u->request_bufs这里nginx filter的主要逻辑都在这个函数里面,
//将in参数链表的缓冲块拷贝到
//ctx->in,然后将ctx->in的数据拷贝到out,然后调用output_filter发送出去。
//如果读取后端数据发往客户端,默认流程是
//ngx_event_pipe->ngx_event_pipe_write_to_downstream->p->output_filter(p->output_ctx, p->out);走到这里
off_t bsize;
ngx_int_t rc, last;
ngx_chain_t *cl, *out, **last_out;
ngx_uint_t sendfile = ctx->sendfile;
ngx_uint_t aio = ctx->aio;
ngx_uint_t directio = ctx->directio;
ngx_log_debugall(ctx->pool->log, 0, "ctx->sendfile:%ui, ctx->aio:%ui, ctx->directio:%ui", sendfile, aio, directio);
if (ctx->in == NULL && ctx->busy == NULL) //in是待发送的数据,busy是已经调用ngx_chain_writer但还没有发送完毕。
{
/*
* the short path for the case when the ctx->in and ctx->busy chains
* are empty, the incoming chain is empty too or has the single buf
* that does not require the copy
*/
// 要发送的 buf 只有一个,不需要复制
if (in == NULL) { //如果要发送的数据为空,也就是啥也不用发送。那就直接调用output_filter的了。
ngx_log_debugall(ctx->pool->log, 0, "ngx output chain, in = NULL");
return ctx->output_filter(ctx->filter_ctx, in);
}
if (in->next == NULL //说明发送buf只有一个
#if (NGX_SENDFILE_LIMIT)
&& !(in->buf->in_file && in->buf->file_last > NGX_SENDFILE_LIMIT)
#endif
&& ngx_output_chain_as_is(ctx, in->buf)) //这个函数主要用来判断是否需要复制buf。返回1,表示不需要拷贝,否则为需要拷贝
{
ngx_log_debugall(ctx->pool->log, 0, "only one chain buf to output_filter");
return ctx->output_filter(ctx->filter_ctx, in);
}
}
/* add the incoming buf to the chain ctx->in */
if (in) {//拷贝一份数据到ctx->in里面,需要老老实实的进行数据拷贝了。将in参数里面的数据拷贝到ctx->in里面。换了个in
if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) {
return NGX_ERROR;
}
}
/* out为最终需要传输的chain,也就是交给剩下的filter处理的chain */
out = NULL;
last_out = &out; //下面遍历ctx->in链中的数据并且添加到该last_out中,也就是添加到out链中
last = NGX_NONE;
//到现在了,in参数的缓冲链表已经放在了ctx->in里面了。下面准备发送吧。
for ( ;; ) { //循环读取缓存中或者内存中的数据发送
//结合ngx_http_xxx_create_request(ngx_http_fastcgi_create_request)阅读,ctx->in中的数据实际上是从ngx_http_xxx_create_request组成ngx_chain_t来的,数据来源在ngx_http_xxx_create_request
while (ctx->in) {//遍历所有待发送的数据。将他们一个个拷贝到out指向的链表中
// 遍历 ctx->in chain 列表,处理 in,只会处理一次,如果发送不完数据,下次再进入函数,ctx->in 就是空
/*
* cycle while there are the ctx->in bufs
* and there are the free output bufs to copy in
*/
bsize = ngx_buf_size(ctx->in->buf);
//这块内存大小为0,然后又不是special 可能有问题。 如果是special的buf,应该是从ngx_http_send_special过来的
if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) {
ngx_debug_point();
ctx->in = ctx->in->next;
continue;
}
/* 判断是否需要复制buf */
if (ngx_output_chain_as_is(ctx, ctx->in->buf)) {
//把ctx->in->buf从ctx->in上面取下来,然后加入到lst_out链表中
/* move the chain link to the output chain */
/* 如果不需要复制,则直接链接chain到out,然后继续循环 */
cl = ctx->in;
ctx->in = cl->next; //已经赋值的会从ctx->in上面摘掉
*last_out = cl;
last_out = &cl->next;
cl->next = NULL;
continue;
}
//注意从后端接收的数据到缓存文件中后,在filter模块中,有可能是新的buf数据指针了,因为ngx_http_copy_filter->ngx_output_chain中会重新分配内存读取缓存文件内容
//如果是需要赋值buf(一般都是sendfile的时候),用户空间内存里面没有数据,所以需要开辟空间来把文件中的内容赋值一份出来
/* 到达这里,说明我们需要拷贝buf,这里buf最终都会被拷贝进ctx->buf中, 因此这里先判断ctx->buf是否为空 */
if (ctx->buf == NULL) { //每次拷贝数据前,先给ctx->buf分配空间,在下面的ngx_output_chain_get_buf函数中
/* 如果为空,则取得buf,这里要注意,一般来说如果没有开启directio的话,这个函数都会返回NGX_DECLINED */
rc = ngx_output_chain_align_file_buf(ctx, bsize);
if (rc == NGX_ERROR) {
return NGX_ERROR;
}
if (rc != NGX_OK) {
if (ctx->free) {
/* get the free buf */
cl = ctx->free;
/* 得到free buf */
ctx->buf = cl->buf;
ctx->free = cl->next;
/* 将要重用的chain链接到ctx->poll中,以便于chain的重用 */
ngx_free_chain(ctx->pool, cl);
} else if (out || ctx->allocated == ctx->bufs.num) {//output_buffers 1 32768都用完了
/*
如果已经等于buf的个数限制,则跳出循环,发送已经存在的buf。 这里可以看到如果out存在的话,nginx会跳出循环,然后发送out,
等发送完会再次处理,这里很好的体现了nginx的流式处理
*/
break;
} else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) {/* 上面这个函数也比较关键,它用来取得buf。接下来会详细看这个函数 */
//该函数获取到的内存保存到ctx->buf中
return NGX_ERROR;
}
}
}
/* 从原来的buf中拷贝内容或者从原来的文件中读取内容
创建新的 chain 对象追加到 ctx->in 列表中,这些对象指向输入 in 中的 buf 对象
*/ //注意如果是aio on或者aio thread=poll方式返回的是NGX_AGAIN
rc = ngx_output_chain_copy_buf(ctx); //把ctx->in->buf中的内容赋值给ctx->buf
//ngx_output_chain_copy_bufc中tx->in中的内存数据或者缓存文件数据会拷贝到dst中,也就是ctx->buf,然后在ngx_output_chain_copy_buf函数
//外层会重新把ctx->buf赋值给新的chain,然后write出去 ,见下面的创建新chain
if (rc == NGX_ERROR) {
return rc;
}
if (rc == NGX_AGAIN) {
//AIO是异步方式,由内核自行发送出去,应用层不用管,读取文件中数据完毕后epoll会触发执行ngx_file_aio_event_handler中执行ngx_http_copy_aio_event_handler,表示内核已经读取完毕
if (out) {
break;
}
return rc;
}
/* delete the completed buf from the ctx->in chain */
if (ngx_buf_size(ctx->in->buf) == 0) {//这个节点大小为0,移动到下一个节点。
ctx->in = ctx->in->next;
}
cl = ngx_alloc_chain_link(ctx->pool);
if (cl == NULL) {
return NGX_ERROR;
}
//把ngx_output_chain_copy_buf中从原src拷贝的内容赋值给cl->buf,然后添加到lst_out的头部 也就是添加到out后面
cl->buf = ctx->buf;
cl->next = NULL;
*last_out = cl;
last_out = &cl->next;
ctx->buf = NULL;
//注意这里没有continue;直接往后走
}
if (out == NULL && last != NGX_NONE) {
if (ctx->in) {
return NGX_AGAIN;
}
return last;
}
last = ctx->output_filter(ctx->filter_ctx, out); // ngx_chain_writer
if (last == NGX_ERROR || last == NGX_DONE) {
return last;
}
ngx_chain_update_chains(ctx->pool, &ctx->free, &ctx->busy, &out,
ctx->tag);
last_out = &out;
}
}
对发送缓存的发送处理:
//向后端发送请求的调用过程ngx_http_upstream_send_request_body->ngx_output_chain->ngx_chain_writer
ngx_int_t
ngx_chain_writer(void *data, ngx_chain_t *in)
{
ngx_chain_writer_ctx_t *ctx = data;
off_t size;
ngx_chain_t *cl, *ln, *chain;
ngx_connection_t *c;
c = ctx->connection;
/*下面的循环,将in里面的每一个链接节点,添加到ctx->filter_ctx所指的链表中。并记录这些in的链表的大小。*/
for (size = 0; in; in = in->next) {
#if 1
if (ngx_buf_size(in->buf) == 0 && !ngx_buf_special(in->buf)) {
ngx_debug_point();
continue;
}
#endif
size += ngx_buf_size(in->buf);
ngx_log_debug2(NGX_LOG_DEBUG_CORE, c->log, 0,
"chain writer buf fl:%d s:%uO",
in->buf->flush, ngx_buf_size(in->buf));
cl = ngx_alloc_chain_link(ctx->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = in->buf; //把in->buf赋值给新的cl->buf,
cl->next = NULL;
//下面这两句实际上就是把cl添加到ctx->out链表头中,
*ctx->last = cl;
ctx->last = &cl->next; //向后移动last指针,指向新的最后一个节点的next变量地址。再次循环走到这里的时候,调用ctx->last=cl会把新的cl添加到out的尾部
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0,
"chain writer in: %p", ctx->out);
//遍历刚刚准备的链表,并统计其大小,这是啥意思?ctx->out为链表头,所以这里遍历的是所有的。
for (cl = ctx->out; cl; cl = cl->next) {
#if 1
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
ngx_debug_point();
continue;
}
#endif
size += ngx_buf_size(cl->buf);
}
if (size == 0 && !c->buffered) {//啥数据都么有,不用发了都
return NGX_OK;
}
//调用writev将ctx->out的数据全部发送出去。如果没法送完,则返回没发送完毕的部分。记录到out里面
//在ngx_event_connect_peer连接上游服务器的时候设置的发送链接函数ngx_send_chain = ngx_writev_chain 。
chain = c->send_chain(c, ctx->out, ctx->limit); //ngx_send_chain->ngx_writev_chain 到后端的请求报文是不会走filter过滤模块的,而是直接调用ngx_writev_chain->ngx_writev发送到后端
ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0,
"chain writer out: %p", chain);
if (chain == NGX_CHAIN_ERROR) {
return NGX_ERROR;
}
for (cl = ctx->out; cl && cl != chain; /* void */) { //把ctx->out中已经全部发送出去的in节点从out链表摘除放入free中,重复利用
ln = cl;
cl = cl->next;
ngx_free_chain(ctx->pool, ln);
}
ctx->out = chain; //ctx->out上面现在只剩下还没有发送出去的in节点了
if (ctx->out == NULL) { //说明已经ctx->out链中的所有数据已经全部发送完成
ctx->last = &ctx->out;
if (!c->buffered) {
//发送到后端的请求报文之前buffered一直都没有操作过为0,如果是应答给客户端的响应,则buffered可能在进入ngx_http_write_filter调用
//c->send_chain()之前已经有赋值给,发送给客户端包体的时候会经过所有的filter模块走到这里
return NGX_OK;
}
}
return NGX_AGAIN; //如果上面的chain = c->send_chain(c, ctx->out, ctx->limit)后,out中还有数据则返回NGX_AGAIN等待再次事件触发调度
}
ngx_chain_t *
ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
{//调用writev一次发送多个缓冲区,如果没有发送完毕,则返回剩下的链接结构头部。
//ngx_chain_writer调用这里,调用方式为 ctx->out = c->send_chain(c, ctx->out, ctx->limit);
//第二个参数为要发送的数据
ssize_t n, sent;
off_t send, prev_send;
ngx_chain_t *cl;
ngx_event_t *wev;
ngx_iovec_t vec;
struct iovec iovs[NGX_IOVS_PREALLOCATE];
wev = c->write;//拿到这个连接的写事件结构
if (!wev->ready) {//连接还没准备好,返回当前的节点。
return in;
}
/* the maximum limit size is the maximum size_t value - the page size */
if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) {
limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize;//够大了,最大的整数
}
send = 0;
vec.iovs = iovs;
vec.nalloc = NGX_IOVS_PREALLOCATE;
for ( ;; ) {
prev_send = send; //prev_send为上一次调用ngx_writev发送出去的字节数
/* create the iovec and coalesce the neighbouring bufs */
//把in链中的buf拷贝到vec->iovs[n++]中,注意只会拷贝内存中的数据到iovec中,不会拷贝文件中的
cl = ngx_output_chain_to_iovec(&vec, in, limit - send, c->log);
if (cl == NGX_CHAIN_ERROR) {
return NGX_CHAIN_ERROR;
}
ngx_debug_point();
return NGX_CHAIN_ERROR;
}
send += vec.size; //为ngx_output_chain_to_iovec中组包的in链中所有数据长度和
n = ngx_writev(c, &vec);
//我期望发送vec->size字节数据,但是实际上内核发送出去的很可能比vec->size小,n为实际发送出去的字节数,因此需要继续发送
if (n == NGX_ERROR) {
return NGX_CHAIN_ERROR;
}
sent = (n == NGX_AGAIN) ? 0 : n;//记录发送的数据大小。
c->sent += sent;//递增统计数据,这个链接上发送的数据大小
in = ngx_chain_update_sent(in, sent); //send是此次调用ngx_wrtev发送成功的字节数
//ngx_chain_update_sent返回后的in链已经不包括之前发送成功的in节点了,这上面只包含剩余的数据
if (send - prev_send != sent) { //这里说明最多调用ngx_writev两次成功发送后,这里就会返回
wev->ready = 0; //标记暂时不能发送数据了,必须重新epoll_add写事件
return in;
}
if (send >= limit || in == NULL) { //数据发送完毕,或者本次发送成功的字节数比limit还多,则返回出去
return in; //
}
}
}