Redis文件事件处理器

一、Redis 文件事件处理器由四个部分组成:套接字、I/O多路复用程序、文件时间分派器(dispatcher)、事件处理器。

文件事件是对套接字操作的抽象,每当一个套接字准备好执行连接应答(accept)、写入(write)、读取(read)、关闭(close)等操作时,就会相应产生一个文件事件。

I/O多路复用器负责通过loop循环监听多个套接字,同时将一系列套接字按循序存储到一个队列中,由队列向文件事件分派器传送队列中套接字。这个队列中套接字是有序的,它会当一个套接字事件被处理完毕后,会立马向文件事件分配器传送下一个套接字。

文件事件分配器接受队列中的套接字并根据套接字产生的事件类型,相应调用不同的事件处理器。

图1  Redis 文件事件处理器过程 

 

 图2  I/O多路复用程序通过队列向文件事件分派器传送套接字 

 

 

图3 Redis I/O 多路复用调用的多路复用库

二、在 Redis 的事件处理器中,服务器中最常用有:

(1)连接应答处理器      (2)命令请求处理器     (3)命令恢复处理器

(1)在 networking.c 文件中 acceptTcpHandler 函数实现了 Redis 的连接应答处理器,用于对连接服务器进行监听,对套接字的客户端进行应答相应。

#define MAX_ACCEPTS_PER_CALL 1000
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata);
  //循环处理连接应答
while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); } }

在Redis 服务器进行初始化的时候,程序会将这个连接应答处理器和服务器监听套接字的 AE_READABLE 事件关联起来。当有客户端用 sys/socket.h/connect 函数连接服务器监听套接字的时候,套接字就会产生 AE_READABLE 事件,触发连接应答处理器执行相应的套接字应答操作。

// 在套接字fd 上打开一个连接,并连接到 const的sockaddr,socklen_t_len 代表着字节长度。
// 对于无连接套接字类型的,只需设置默认地址发送以及接受地址,成功会返回0,错误会返回-1。
extern int connect (int __fd, __CONST_SOCKADDR_ARG __addr, socklen_t __len);

 图4 客户端和服务器之间进行连接请求应答

(2)通过 networking.c/readQueryFromClient 这个函数,命令请求处理器负责将套接字读入客户端并发送指令内容,相关实现代码在 unistd.d/read 函数中。

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    if (postponeClientRead(c)) return;

    readlen = PROTO_IOBUF_LEN;
  
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    
nread
= connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); freeClientAsync(c); return; } else if (c->flags & CLIENT_MASTER) { c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread); }
sdsIncrLen(c
->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->read_reploff += nread; server.stat_net_input_bytes += nread; if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClientAsync(c); return; } processInputBuffer(c); }
extern ssize_t read (int __fd, void *__buf, size_t __nbytes) __wur;

 

当一个客户端通过连接应答处理器成功连接服务器之后,服务器会将客户端套接字 AE_READABLE 事件和命令请求处理器关联起来,当客户端向服务器发送命令请求的时候,套接字就会产生 AE_READABLE 事件,引发命令请求处理器进行处理。

 图5 服务器接收来自客户端的命令请求

(3)networking.c/sendReplyToClient 函数是 Redis 的命令恢复处理器,这个处理器负责将服务器执行命令后得到的命令回复通过套接字返回给客户端,具体时限为 unistd.h/write 函数的包装。

void sendReplyToClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    writeToClient(c,1);
}
extern ssize_t write (int __fd, const void *__buf, size_t __n) __wur;

当服务器又命令回复需要传送给客户端的时候,服务器会将客户端套接字的 AE_WRITABLE 事件和命令回复处理器关联起来,当客户端准备好接受服务器传回的命令回复时,就会产生 AE_WRITABLE 事件,引发命令回复处理器执行,并执行相应套接字的写入操作。

 图6 服务器向客户端发送命令回复

 

 图7 服务器与客户端之间的通信过程

 

 

原文地址:https://www.cnblogs.com/xiaowei123/p/13118176.html