How Redis pipelining works

Using pipelining from the command line (commands are separated by newlines):

(printf "PING\r\nPING\r\nPING\r\n"; sleep 1) | nc localhost 6379

When the Redis server receives input from a client, the call stack is:

ae.c/aeProcessEvents
networking.c/readQueryFromClient
networking.c/processInputBuffer

The client struct in Redis (abridged to the relevant fields):

typedef struct client {
    // Input buffer holding the commands sent by the client
    sds querybuf;
    // Array of argument objects for the command to execute, e.g. PING
    robj **argv;
    // Number of elements in argv
    int argc;
} client;
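
To make the relationship between querybuf, argv and argc concrete, here is a minimal sketch that splits one inline command line into arguments. The names mini_client, MAX_ARGS and split_inline_args are hypothetical and exist only for this example; Redis itself stores arguments as robj pointers and splits inline input with sdssplitargs, which also handles quoting.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_ARGS 8  /* hypothetical limit, only for this sketch */

/* Simplified stand-in for the client struct: argv points into the line. */
typedef struct {
    char *argv[MAX_ARGS];
    int argc;
} mini_client;

/* Split `line` in place on spaces and tabs, filling argv and argc.
 * Unlike Redis, this sketch does not handle quotes or escapes, and it
 * silently ignores arguments beyond MAX_ARGS. */
void split_inline_args(mini_client *c, char *line) {
    assert(c != NULL && line != NULL);
    c->argc = 0;
    char *p = line;
    while (*p != '\0' && c->argc < MAX_ARGS) {
        while (*p == ' ' || *p == '\t') p++;       /* skip separators */
        if (*p == '\0') break;
        c->argv[c->argc++] = p;                     /* start of an argument */
        while (*p != '\0' && *p != ' ' && *p != '\t') p++;
        if (*p != '\0') *p++ = '\0';                /* terminate the argument */
    }
}
```

Calling split_inline_args on a writable buffer holding "SET key value" leaves argc at 3 with argv[0] pointing at "SET"; a single "PING" gives argc 1.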

Tracing how the pipelined input is executed: the server splits the buffer on newlines and runs PING three times, once per line.

void processInputBuffer(client *c) {
    server.current_client = c;
    /* Keep processing while there is something in the input buffer */
    // querybuf initially holds "PING\r\nPING\r\nPING\r\n"
    // Each call to processInlineBuffer(c) consumes one PING:
    // "PING\r\nPING\r\nPING\r\n" -> "PING\r\nPING\r\n" -> "PING\r\n" -> ""
    while(sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
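        // Requests in the regular protocol format start with '*' and get
        // PROTO_REQ_MULTIBULK; anything else, such as the plain-text PING
        // lines piped through nc above, is treated as PROTO_REQ_INLINE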
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            if (processCommand(c) == C_OK)
                resetClient(c);
            /* freeMemoryIfNeeded may flush slave output buffers. This may result
             * into a slave, that may be the active client, to be freed. */
            if (server.current_client == NULL) break;
        }
    }
    server.current_client = NULL;
}
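
The loop above can be modeled with a much smaller sketch that keeps only the inline path: take one newline-terminated line off the front of the buffer, "execute" it, and repeat until the buffer is empty or holds only a partial line. Everything here (toy_client, feed, take_inline_command, process_input, the fixed buffer sizes) is hypothetical and stands in for the sds-based code in networking.c.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for client: just the unread input bytes. */
typedef struct {
    char querybuf[256];
    size_t len;
} toy_client;

/* Append newly received bytes to the input buffer. */
void feed(toy_client *c, const char *data) {
    size_t n = strlen(data);
    assert(c->len + n <= sizeof(c->querybuf));
    memcpy(c->querybuf + c->len, data, n);
    c->len += n;
}

/* Move one line from the front of querybuf into `line`, without its
 * trailing "\r\n" or "\n". Returns 0 on success, -1 if no complete line
 * is buffered yet or the line does not fit in `line`. */
int take_inline_command(toy_client *c, char *line, size_t cap) {
    char *nl = memchr(c->querybuf, '\n', c->len);
    if (nl == NULL) return -1;                     /* wait for more data */
    size_t linelen = (size_t)(nl - c->querybuf);
    size_t consumed = linelen + 1;                 /* include the '\n' */
    if (linelen > 0 && c->querybuf[linelen - 1] == '\r') linelen--;
    if (linelen >= cap) return -1;
    memcpy(line, c->querybuf, linelen);
    line[linelen] = '\0';
    memmove(c->querybuf, c->querybuf + consumed, c->len - consumed);
    c->len -= consumed;
    return 0;
}

/* Drain every complete command; returns how many were "executed". */
int process_input(toy_client *c) {
    int executed = 0;
    char line[64];
    while (c->len > 0) {
        if (take_inline_command(c, line, sizeof(line)) != 0) break;
        if (line[0] == '\0') continue;   /* empty line, like argc == 0 */
        executed++;                      /* the server would run the command here */
    }
    return executed;
}
```

Feeding "PING\r\nPING\r\nPING\r\n" and calling process_input returns 3 and leaves the buffer empty. Feeding "PING\r\nPI" returns 1 and leaves the two bytes "PI" buffered until the rest of the line arrives, which is why a pipeline can be split across reads without losing commands.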

Executing the actual command:

server.c/processCommand
server.c/call
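
As a rough picture of what these two steps do, the sketch below looks up argv[0] in a command table, checks the argument count, and then invokes the handler. The table, the toy_command type and dispatch are hypothetical simplifications; the arity convention (positive means an exact argc, negative means at least that many) follows the one Redis uses, and the error strings are shortened.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>

typedef const char *(*command_proc)(int argc, char **argv);

typedef struct {
    const char *name;
    command_proc proc;
    int arity;  /* > 0: exact argc; < 0: argc must be at least -arity */
} toy_command;

static const char *ping_proc(int argc, char **argv) {
    return argc > 1 ? argv[1] : "PONG";
}

static const char *echo_proc(int argc, char **argv) {
    (void)argc;
    return argv[1];
}

static const toy_command command_table[] = {
    {"ping", ping_proc, -1},
    {"echo", echo_proc, 2},
};

/* Case-insensitive name comparison; returns 1 when the names match. */
static int same_name(const char *a, const char *b) {
    while (*a != '\0' && *b != '\0') {
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) return 0;
        a++;
        b++;
    }
    return *a == *b;
}

/* Look up the command, validate arity, then run it (the "call" step). */
const char *dispatch(int argc, char **argv) {
    assert(argc > 0 && argv != NULL);
    size_t n = sizeof(command_table) / sizeof(command_table[0]);
    for (size_t i = 0; i < n; i++) {
        const toy_command *cmd = &command_table[i];
        if (!same_name(argv[0], cmd->name)) continue;
        if ((cmd->arity > 0 && argc != cmd->arity) || argc < -cmd->arity)
            return "ERR wrong number of arguments";
        return cmd->proc(argc, argv);
    }
    return "ERR unknown command";
}
```

With this table, dispatching {"PING"} yields "PONG", and {"ECHO"} with no argument is rejected before its handler runs.
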
Original article: https://www.cnblogs.com/allenwas3/p/9283226.html