对pipe downstream的思考&分析

     回到ngx_http_upstream_send_response,如果是buffering,就会进入后面的处理过程,准备一个ngx_event_pipe_t结构的数据,这个结构可以通过upstream的u->pipe进行索引找到。首先设置p->output_filter输出过滤函数为ngx_http_output_filter,用来进行输出过滤并发送数据;将p->upstream 设置为跟后端 u->peer.connection; p->downstream设置为跟客户端的连接;

  之后便是拷贝了u->buffer到p->preread_bufs,这个u->buf是读取上游返回的数据的缓冲区,也就是proxy;这里面有http头部,也可能有body部分;然后便是设置upstream的读回调函数read_event_handler为read_event_handler,跟客户端的连接的写回调函数write_event_handler为ngx_http_upstream_process_downstream;这2个回调函数一个处理跟upstream的读取数据,一个处理跟客户端连接的发送数据,这是重点。设置完后就调用ngx_http_upstream_process_upstream尝试读取upstream的数据。

  p = u->pipe;
    //设置filter,可以看到就是http的输出filter
    p->output_filter = ngx_http_upstream_output_filter;
    p->output_ctx = r;
    p->tag = u->output.tag;
    p->bufs = u->conf->bufs;//设置bufs,它就是upstream中设置的bufs.u == &flcf->upstream;
    p->busy_size = u->conf->busy_buffers_size;
    p->upstream = u->peer.connection;//赋值跟后端upstream的连接。
    p->downstream = c;//赋值跟客户端的连接。
    p->pool = r->pool;
    p->log = c->log;
    p->limit_rate = u->conf->limit_rate;
    p->start_sec = ngx_time();

    p->cacheable = u->cacheable || u->store;
-------------------------------------------------------
 //下面申请一个缓冲链接节点,来存储刚才我们再读取后端的包,为了得到HTTP headers的时候不小心多读取到的数据。
        //其实只要FCGI等后端发给后端的包中,有一个包的前半部分是header,后一部分是body,就会有预读数据。

    p->preread_bufs = ngx_alloc_chain_link(r->pool);
    if (p->preread_bufs == NULL) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }
    p->preread_bufs->buf = &u->buffer;
    p->preread_bufs->next = NULL;
    u->buffer.recycled = 1;
    p->preread_size = u->buffer.last - u->buffer.pos;
------------------------------------------
ngx_http_upstream_process_upstream

ngx_http_upstream_process_upstream 用来读取后端cgi等数据,然后调用pipe转发到客户端;下面来看看ngx_event_pipe。

/*在有buffering的时候,使用event_pipe进行数据的转发,调用 ngx_event_pipe_*
函数读取数据,或者发送数据给客户端。
ngx_event_pipe将upstream响应发送回客户端。do_write代表是否要往客户端发送,写数据。
如果设置了,那么会先发给客户端,再读upstream数据,当然,如果读取了数据,也会调用这里的。
*/
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
    ngx_int_t     rc;
    ngx_uint_t    flags;
    ngx_event_t  *rev, *wev;
//不断的用ngx_event_pipe_read_upstream读取客户端数据,然后调用ngx_event_pipe_write_to_downstream
    for ( ;; ) {
        if (do_write) {// do_write为1,向下游发送响应包体,并检查其返回值
            p->log->action = "sending to client";

            rc = ngx_event_pipe_write_to_downstream(p);
        // 返回NGX_OK时继续读取上游的响应事件,返回其他值需要终止ngx_event_pipe函数
            if (rc == NGX_ABORT) {
                return NGX_ABORT;
            }

            if (rc == NGX_BUSY) {
                return NGX_OK;
            }
        }

        p->read = 0;
        p->upstream_blocked = 0;

        p->log->action = "reading upstream";
        //从upstream读取数据到chain的链表里面,然后整块整块的调用input_filter进行协议的解析,
        //并将HTTP结果存放在p->in,p->last_in的链表里面。// 从上游读取响应数据
        if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
            return NGX_ABORT;
        }
        //upstream_blocked是在ngx_event_pipe_read_upstream里面设置的变量,代表是否有数据已经从upstream读取了

//当没有读取到响应数据,并且也不需要暂停读取响应的读取时,跳出当前循环,即不对do_write进行设置
        if (!p->read && !p->upstream_blocked) {
            break;
        }
        //还要转发到后端。 因为当读到的响应数据,或者需要暂停读取数据,先给客户端发送响应以释放缓冲区时,设置do_write进行响应的发送
        do_write = 1;
    }
    if (p->upstream->fd != (ngx_socket_t) -1) {
        rev = p->upstream->read;
        flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
          // 将上游读事件添加到epoll中
        if (ngx_handle_read_event(rev, flags) != NGX_OK) {
            return NGX_ABORT;
        }
        if (!rev->delayed) { // 同时设置读事件的超时定时器
            if (rev->active && !rev->ready) {
                ngx_add_timer(rev, p->read_timeout);

            } else if (rev->timer_set) {
                ngx_del_timer(rev);
            }
        }
    }
 // 将下游的写事件添加到epoll中,并且设置写事件的定时器
    if (p->downstream->fd != (ngx_socket_t) -1
        && p->downstream->data == p->output_ctx)
    {
        wev = p->downstream->write;
        if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
            return NGX_ABORT;
        }

        if (!wev->delayed) {
            if (wev->active && !wev->ready) {
                ngx_add_timer(wev, p->send_timeout);

            } else if (wev->timer_set) {
                ngx_del_timer(wev);
            }
        }
    }

    return NGX_OK;
}

整体流程是:

ngx_process_cycle循环调用ngx_process_events_and_timers,后者调用ngx_epoll_process_events处理读写事件;
1.c->read->handler = ngx_http_upstream_handler();SOCK连接最基础的读写回调handler,
2.u->read_event_handler = ngx_http_upstream_process_upstream();
3.ngx_event_pipe();
4.ngx_event_pipe_read_upstream() 进入主要读取处理函数。

ngx_event_pipe_read_upstream函数完成下面几个功能:

0.从preread_bufs,free_raw_bufs或者ngx_create_temp_buf寻找一块空闲的或部分空闲的内存;
1.调用p->upstream->recv_chain==ngx_readv_chain,用writev的方式读取FCGI的数据,填充chain。
2.对于整块buf都满了的chain节点调用input_filter(ngx_http_fastcgi_input_filter)进行upstream协议解析,比如FCGI协议,解析后的结果放入p->in里面;
3.对于没有填充满的buffer节点,放入free_raw_bufs以待下次进入时从后面进行追加。
4.当然了,如果对端发送完数据FIN了,那就直接调用input_filter处理free_raw_bufs这块数据

简单来说就是:设置fd 回调,事件被唤醒,循环进行数据读取、解析、保存、转发;然后根据业务场景需求 处理异常逻辑

ngx_event_pipe_t结构,这个结构维护着上下游间转发的响应包体,用于解决内存复制的问题

struct ngx_event_pipe_s {
    ngx_connection_t *upstream;                        // 与上游服务器间的连接
    ngx_connection_t *downstream;                      // 与下游客户端间的连接                        

    ngx_chain_t *free_raw_bufs;                        // 用于接收上游服务器响应的缓冲区链表,新收到的响应向链表头部插入
    ngx_chain_t *in;                                   // 接收到上游响应的缓冲区,ngx_event_pipe_copy_input_filter将buffer中的数据设置到in中
    ngx_chain_t **last_in;                             // 指向刚刚接收到的缓冲区

    ngx_chain_t *out;                                  // 将要发给客户端的缓冲区链表,
    ngx_chain_t *free;                                 // 等待释放的缓冲区
    ngx_chain_t *busy;                                 // 表示上次发送响应时未发完的缓冲区链表,下一次发送时会合并到out链表中

    /*
     * the input filter i.e. that moves HTTP/1.1 chunks
     * from the raw bufs to an incoming chain
     */

    ngx_event_pipe_input_filter_pt input_filter;       // 处理接收到的来自上游服务器的缓冲区,接收响应的处理方法
    void *input_ctx;                                   // input_filter函数的参数,通常设置为ngx_http_request_t

    ngx_event_pipe_output_filter_pt output_filter;     // 向下游发送响应的方法,默认为ngx_http_output_filter
    void *output_ctx;                                  // output_filter函数的参数,通常设置为ngx_http_request_t

    unsigned read:1;                                   // 为1表示当前已经读到来自上游的响应
    unsigned cacheable:1;                              // 为1时表示启用文件缓存
    unsigned single_buf:1;                             // 为1时表示接收上游的响应时一次只能接收一个ngx_buf_t缓冲区
    unsigned free_bufs:1;                              // 为1时表示当不再接收上游的响应包体时,尽可能快的释放缓冲区
    unsigned upstream_done:1;                          // input_filter中用到的标识位,表示Nginx与上游间的交互已经结束
    unsigned upstream_error:1;                         // 与上游连接出现错误时,将该标识为置为1,比如超时,解析错误等
    unsigned upstream_eof:1;                           // 与上游的连接已经关闭时,该标志位置为1
    unsigned upstream_blocked:1;                       // 表示暂时阻塞读取上游响应的流程,先发送响应,再用释放的缓冲区接收响应
    unsigned downstream_done:1;                        // 为1时表示与下游的交互已经结束
    unsigned downstream_error:1;                       // 与下游连接出现错误时,设置为1
    unsigned cyclic_temp_file:1;                       // 为1时会试图复用临时文件中曾用过的空间

    ngx_int_t allocated;                               // 表示已经分配的缓冲区的数目,其受bufs.num成员的限制
    ngx_bufs_t bufs;                                   // 记录了接收上游响应的内存缓冲区的大小,bufs.size记录每个缓冲区大小,bufs.num记录缓冲区个数
    ngx_buf_tag_t tag;                                 // 用于设置、比较缓冲区链表中ngx_buf_t结构体的tag标志位

    ssize_t busy_size;

    off_t read_length;                                 // 已经接收到上游响应包体长度
    off_t length;                                      // 表示临时文件的最大长度

    off_t max_temp_file_size;                          // 表示临时文件的最大长度
    ssize_t temp_file_write_size;                      // 表示一次写入文件时的最大长度

    ngx_msec_t read_timeout;                           // 读取上游响应的超时时间
    ngx_msec_t send_timeout;                           // 向下游发送响应的超时时间
    ssize_t send_lowat;                                // 向下游发送响应时,TCP连接中设置的参数

    ngx_pool_t *pool;                                  // 用于分配内存缓冲区的连接池对象
    ngx_log_t *log;                                    // 用于记录日志的ngx_log_t对象

    ngx_chain_t *preread_bufs;                         // 表示接收上游服务器响应头部的阶段,已经读到的响应包体
    size_t preread_size;                               // 表示接收上游服务器响应头部的阶段,已经读到的响应包体长度
    ngx_buf_t *buf_to_file;                            // 

    size_t limit_rate;                                 // 发送速率的限制
    time_t start_sec;                                  // 连接的启动时间

    ngx_temp_file_t *temp_file;                        // 存放上游响应的临时文件

    /* STUB */ int num;                                // 已经使用的ngx_buf_t的数目
}

接收上游响应函数处理

ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{
    off_t         limit;
    ssize_t       n, size;
    ngx_int_t     rc;
    ngx_buf_t    *b;
    ngx_msec_t    delay;
    ngx_chain_t  *chain, *cl, *ln;

    if (p->upstream_eof || p->upstream_error || p->upstream_done) {
        return NGX_OK;
    }
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                   "pipe read upstream: %d", p->upstream->read->ready);

    for ( ;; ) {
      // 检查上游连接是否结束,如果已经结束,不再接收新的响应,跳出循环
        if (p->upstream_eof || p->upstream_error || p->upstream_done) {
            break;
        }
  // 如果preread_bufs为NULL代表读包头时没有读到包体信息或者已经处理完成,ready为0表示没有上游响应可以接收,跳出循环
        if (p->preread_bufs == NULL && !p->upstream->read->ready) {
            break;
        }
    // preread_bufs存放着接收包头时可能读取到的包体信息,如果不为空,则先要优先处理这部分包体信息
        if (p->preread_bufs) {
            /* use the pre-read bufs if they exist  已经读取的数据 */
            chain = p->preread_bufs;// 用chain保存待处理的缓冲区,重置preread_bufs,下次循环则不会再走到该逻辑
            p->preread_bufs = NULL;
            n = p->preread_size;if (n) {
                p->read = 1;  // 有待处理的包体信息,将read设置为1,表示接收到的包体待处理
            }
        } else {
            if (p->limit_rate) {
                if (p->upstream->read->delayed) {
                    break;
                }

                limit = (off_t) p->limit_rate * (ngx_time() - p->start_sec + 1)
                        - p->read_length;

                if (limit <= 0) {
                    p->upstream->read->delayed = 1;
                    delay = (ngx_msec_t) (- limit * 1000 / p->limit_rate + 1);
                    ngx_add_timer(p->upstream->read, delay);
                    break;
                }

            } else {
                limit = 0;
            }
// free_raw_bufs用于表示一次ngx_event_pipe_read_upstream方法调用过程中接收到的上游响应
            if (p->free_raw_bufs) {
                /* use the free bufs if they exist */
                chain = p->free_raw_bufs;
                if (p->single_buf) {
                    p->free_raw_bufs = p->free_raw_bufs->next;
                    chain->next = NULL;
                } else {
                    p->free_raw_bufs = NULL;
                }// 判断当前已分配的缓冲区的数量是否超过了bufs.num,没有超过时可以继续分配
            } else if (p->allocated < p->bufs.num) {
                /* allocate a new buf if it's still allowed */
                b = ngx_create_temp_buf(p->pool, p->bufs.size);
                if (b == NULL) {
                    return NGX_ABORT;
                }
                p->allocated++;
                chain = ngx_alloc_chain_link(p->pool);
                if (chain == NULL) {
                    return NGX_ABORT;
                }
                chain->buf = b;
                chain->next = NULL;
            } else if (/*---- 缓冲区已经达到上限,如果写事件的ready为1时表示可以向下游发送响应,而delay为0代表并不是由于限速的原因导致写事件就 
当ready为1,且delay为0时,可以向下游发送响应来释放缓冲区了           -----------                 * if the bufs are not needed to be saved in a cache and
                 * a downstream is ready then write the bufs to a downstream
                 */
            }
            

            n = p->upstream->recv_chain(p->upstream, chain, limit);

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe recv chain: %z", n);

            if (p->free_raw_bufs) { // 将新接收到的缓冲区放置到free_raw_bufs链表的最后
                chain->next = p->free_raw_bufs;
            }
            p->free_raw_bufs = chain;

            if (n == NGX_ERROR) {
                p->upstream_error = 1;
                break;
            }
            if (n == NGX_AGAIN) {
                if (p->single_buf) {
                    ngx_event_pipe_remove_shadow_links(chain->buf);
                }
                break;
            }
            p->read = 1;
            if (n == 0) {//没有读到数据,肯定upstream发送了FIN包,那就读取完成了。
                p->upstream_eof = 1;
                break;
            }
        }
        delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;
        p->read_length += n;
        cl = chain;
        p->free_raw_bufs = NULL;
/*
首先cl = chain;用cl指向刚刚读取的或预读的数据,n代表数据的大小,
于是循环的将cl指向的链表里面的数据(总大小为n)进行协议解析,我们知道,
这个协议解析会调用FCGI协议或者其他协议的回调的,那就是:input_filter,
对于FCGI是ngx_http_fastcgi_input_filter,其他可能为ngx_event_pipe_copy_input_filter。
FCGI这个回调是在ngx_http_fastcgi_handler函数里面初始化时设置的。
*/
        while (cl && n > 0) {/*如果还有链表数据并且长度不为0,也就是这次的还没有处理完
        那如果之前保留有一部分数据呢?
不会的,如果之前预读了数据,那么上面的大if语句else里面进不去,
就是此时的n肯定等于preread_bufs的长度preread_size。
        //如果之前没有预读数据,但free_raw_bufs不为空,那也没关系,
        free_raw_bufs里面的数据肯定已经在下面几行处理过了。
*/
            ngx_event_pipe_remove_shadow_links(cl->buf);

            size = cl->buf->end - cl->buf->last;

            if (n >= size) {//缓冲区已满,需要调用input_filter函数处理
                cl->buf->last = cl->buf->end;
                /* STUB */ cl->buf->num = p->num++;
// 当前缓冲区已满,需要处理,
//下面的input_filter方法是ngx_event_pipe_copy_input_filter函数,
//其主要在in链表中增加这个缓冲区

                if (p->input_filter(p, cl->buf) == NGX_ERROR) {

                    return NGX_ABORT;
                } // 更新待处理的包体的长度,释放已经处理的缓冲区
                n -= size;
                ln = cl;
                cl = cl->next;
                //继续处理下一块,并释放这个节点。
                ngx_free_chain(p->pool, ln);

            } else {//如果这个节点的空闲内存数目大于剩下要处理的,就将剩下的存放在这里。
                cl->buf->last += n;
                n = 0;
            }
        }

        if (cl) { //将上面没有填满一块内存块的数据链接放到free_raw_bufs的前面。
        //注意上面修改了cl->buf->last,后续的读入数据不会覆盖这些数据的
            for (ln = cl; ln->next; ln = ln->next) { 
                /* void foreach last */  
            }
 // 走到这里时cl的链表中一定有缓冲区没有用满(最后一个?),此时cl不为NULL;或者cl的所有缓冲区都已经被处理回收了,此时cl为NULL
            ln->next = p->free_raw_bufs;
            p->free_raw_bufs = cl;
        }

        if (delay > 0) {
            p->upstream->read->delayed = 1;
            ngx_add_timer(p->upstream->read, delay);
            break;
        }
    }


/*upstream数据发送完毕了,那么upstream_eof会被设置为1,在函数最后会进行扫尾工作,
把半满的free_raw_bufs数据进行解析。这里我们可以看到buffering的含义就在这里:
nginx会尽量读取upstream的数据,直到填满一块buffer,由fastcgi_buffers等参数决定的大小,
才会发送给客户端。千万别误解为读取完所有的数据才发送,而是读取了一块buffe*/
    if (p->free_raw_bufs && p->length != -1) {
        cl = p->free_raw_bufs;

        if (cl->buf->last - cl->buf->pos >= p->length) {

            p->free_raw_bufs = cl->next;

            /* STUB */ cl->buf->num = p->num++;

            if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                return NGX_ABORT;
            }

            ngx_free_chain(p->pool, cl);
        }
    }

    if (p->length == 0) {
        p->upstream_done = 1;
        p->read = 1;
    }
// upstream_eof为1时表示上游服务器关闭了连接,upstream_error表示处理过程中出现了错误,而free_raw_bufs不为空代表还有需要处理的包体信息
    if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {

        /* STUB */ p->free_raw_bufs->buf->num = p->num++;
 // 调用input_filter处理剩余的包体信息
        if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
            return NGX_ABORT;
        }
        p->free_raw_bufs = p->free_raw_bufs->next;
 // free_bufs为1时代表需要尽快释放缓冲区中用到内存,此时应该调用ngx_pfree尽快释放shadow域为空的缓冲区
        if (p->free_bufs && p->buf_to_file == NULL) {
            for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                if (cl->buf->shadow == NULL) {
                    ngx_pfree(p->pool, cl->buf->start);
                }
            }
        }
    }

    if (p->cacheable && (p->in || p->buf_to_file)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write chain");

        rc = ngx_event_pipe_write_chain_to_temp_file(p);

        if (rc != NGX_OK) {
            return rc;
        }
    }

    return NGX_OK;
}

接收响应的处理完后,立即就是发送响应处理流程 :

ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
    downstream = p->downstream;
    flushed = 0;

    for ( ;; ) {
        if (p->downstream_error) { //往请求端发送出错
        //busy, out, in 三个缓冲chain释放 同时释放shadow缓冲并将空闲的buffer加入到pipe中
            return ngx_event_pipe_drain_chains(p);
        }
        // 检查与上游的连接是否结束
        if (p->upstream_eof || p->upstream_error || p->upstream_done) {

            /* pass the p->out and p->in chains to the output filter */

            for (cl = p->busy; cl; cl = cl->next) {
                cl->buf->recycled = 0;
            }
                 // 发送out链表中的缓冲区给客户端
            if (p->out) {
                    -------------------
                for (cl = p->out; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }
                rc = p->output_filter(p->output_ctx, p->out);
                    ------------------------------
            }

            if (p->writing) {//还有往请求端写入的缓冲链
                break;
            }

            if (p->in) { // 发送in链表中的缓冲区给客户端
               
                for (cl = p->in; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }
                rc = p->output_filter(p->output_ctx, p->in);
            --------------------------------------------
            }
            /* TODO: free unused bufs */
            // 标识需要向下游发送的响应已经完成
            p->downstream_done = 1;
            break;
        }
-------------------------
        /* bsize is the size of the busy recycled bufs */
        prev = NULL;
        bsize = 0; // 计算busy缓冲区中待发送的响应长度
        for (cl = p->busy; cl; cl = cl->next) {

            if (cl->buf->recycled) {
                if (prev == cl->buf->start) {
                    continue;
                }
                bsize += cl->buf->end - cl->buf->start;
                prev = cl->buf->start;
            }
        }

       
        out = NULL;
        // 检查是否超过了busy_size的配置,当超过配置值时跳转至flush处检查和发送out缓冲区
        if (bsize >= (size_t) p->busy_size) {
            flush = 1;
            goto flush;
        }

     --------------------------

        for ( ;; ) {
            if (p->out) {  // 先检查out链表是否为NULL,不为空则先发送out链表的缓冲区
                cl = p->out;

                if (cl->buf->recycled) {
                    ngx_log_error(NGX_LOG_ALERT, p->log, 0,
                                  "recycled buffer in pipe out chain");
                }

                p->out = p->out->next;

            } else if (!p->cacheable && !p->writing && p->in) {
                cl = p->in;// 当out链表中的数据被处理完成后,开始处理in链表中的数据

                if (cl->buf->recycled && prev_last_shadow) {
                    if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
                        flush = 1;
                        break;
                    }
                    bsize += cl->buf->end - cl->buf->start;
                }
                prev_last_shadow = cl->buf->last_shadow;
                p->in = p->in->next;
            } else {
                break;
            }

            cl->next = NULL;

            if (out) {
                *ll = cl;
            } else {
                out = cl;
            }
            ll = &cl->next;
        }

    flush:

       --------------------------------------
         // 发送响应给客户端
        rc = p->output_filter(p->output_ctx, out);
        // 更新free、busy和out缓冲区
        ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);

        if (rc == NGX_ERROR) {
            p->downstream_error = 1;
            return ngx_event_pipe_drain_chains(p);
        }

        for (cl = p->free; cl; cl = cl->next) {
        // 遍历free链表中的缓冲区,释放缓冲区中shadow域
            ----------------------------

            /* TODO: free buf if p->free_bufs && upstream done */
            /* add the free shadow raw buf to p->free_raw_bufs */
            if (cl->buf->last_shadow) {
                if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
                    return NGX_ABORT;
                }
                cl->buf->last_shadow = 0;
            }
            cl->buf->shadow = NULL;
        }
    }

    return NGX_OK;
}

看完pipe down 发现其主要是解决读取数据包 数据如何缓存拷贝问题。主要是要分清楚 上游读 下游写 交错经行时的数据报文拷贝

原文地址:https://www.cnblogs.com/codestack/p/13947147.html