Redis5设计与源码分析 (第21章 主从复制)

用户执行slaveof命令或者在配置文件中设置slaveof选项来开启复制功能。

例如,现在有两台服务器 , 服务器127.0.0.1:6379发送下面命令:

127.0.0.1:6379>slaveof 127.0.0.1 7000

此时服务器127.0.0.1:6379会成为服务器127.0.0.1:7000的从服务器(slaver),服务器127.0.0.1:7000会成为服务器127.0.0.1:6379的主服务器(master);

通过复制功能,从服务器数据可以和主服务器数据保持同步。

21.1 主从复制功能实现

主要作用

1)读写分离,单台服务器能支撑的QPS是有上限的,我们可以部署一台主服务器、多台从服务器,主服务器只处理写请求从服务器通过复制功能同步主服务器数据,只处理读请求,以此提升Redis服务能力;另外我们还可以通过复制功能来让主服务器免于执行持久化操作:只要关闭主服务器的持久化功能,然后由从服务器去执行持久化操作即可。

2)数据容灾,任何服务器都有宕机的可能,我们同样可以通过主从复制功能提升Redis服务的可靠性;由于从服务器与主服务器数据保持同步,一旦主服务器宕机,可以立即将请求切换到从服务器,从而避免Redis服务中断。

slaveof命令的主要流程

Redis 2.8以前实现:

1)服务器127.0.0.1:6379向服务器127.0.0.1:7000发送sync命令,请求同步数据。

2)主服务器接收到sync命令请求,开始执行bgsave命令持久化数据到RDB文件,并且在持久化数据期间会将所有新执行的写入命令保存到一个缓冲区

3)当持久化数据执行完毕,主服务器将该RDB文件发送给从服务器,从服务器接收并将文件中的数据加载到内存

4)主服务器将缓冲区中的命令请求发送给从服务器。

5)每当主服务器接收到写命令请求时,都会将命令按照Redis协议格式发送给从服务器,从服务器接收并处理主服务器发送过来的命令请求。

步骤2中持久化操作(bgsave),非常耗费资源的操作;

新的主从复制解决方案:

从服务器会记录已经从主服务器接收到的数据量复制偏移量);

而主服务器会维护一个复制缓冲区,记录自己已执行且待发送给从服务器的命令请求,

同时还需要记录复制缓冲区第一个字节的复制偏移量。

从服务器请求同步主服务器的命令也改为了psync

当从服务器连接到主服务器时,会向主服务器发送psync命令请求同步数据,同时告诉已经接收到的复制偏移量

主服务器判断该复制偏移量是否还包含在复制缓冲区

如果包含,不需要执行持久化操作,直接向从服务器发送复制缓冲区中命令请求,这称为部分重同步

如果不包含,执行持久化操作,同时将所有新执行的写命令缓存在复制缓冲区中,并重置复制缓冲区第一个字节的复制偏移量,这称为完整重同步

另外每台Redis服务器都有一个运行ID,从服务器每次发送psync请求同步数据时,会携带自己需要同步主服务器的运行ID。主服务器接收到psync命令时,需要判断命令参数运行ID自己运行ID是否相等,只有相等才有可能执行部分重同步。而当从服务器首次请求主服务器同步数据时,从服务器不知道主服务器的运行ID,此时运行ID以"?"填充,同时复制偏移量初始为-1。

psync命令格式为 : "psync<MASTER_RUN_ID><OFFSET>",

主从复制初始化流程如图21-1所示。

图21-1 主从复制初始化流程图

从图21-1可以看到,当主服务器判断可以执行部分重同步时向从服务器返回"+CONTINUE";需要执行完整重同步时向从服务器返回"+FULLRESYNC RUN_ID OFFSET",其中RUN_ID为主服务器自己的运行ID,OFFSET为复制偏移量。

执行部分重同步的要求比较严格的

1)RUN_ID必须相等;

2)复制偏移量必须包含在复制缓冲区中。

在生产环境中,经常会出现以下两种情况:

·从服务器重启(复制信息丢失);

·主服务器故障导致主从切换(从多个从服务器重新选举出一台机器作为主服务器,主服务器运行ID发生改变)。

这时候无法执行部分重同步的,而这两种情况又很常见,因此Redis 4.0针对主从复制又提出了两点优化,提出了psync2协议

方案1:持久化主从复制信息。

Redis服务器关闭时,将主从复制信息(复制的主服务器RUN_ID与复制偏移量)作为辅助字段存储在RDB文件中;

Redis启动加载RDB文件,恢复主从复制信息,重新同步主服务器时携带持久化主从复制信息 ;

if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)

== -1) return -1;

if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)

== -1) return -1;

方案2:存储上一个主服务器复制信息。

/* Replication (master) */

char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */

char replid2[CONFIG_RUN_ID_SIZE+1]; /* 初始化replid2为空字符串*/

long long master_repl_offset; /* My current replication offset */

long long second_replid_offset; /*初始化 -1. */

当主服务器发生故障,自己成为新的主服务器时,便使用replid2second_replid_offset存储之前主服务器的运行ID与复制偏移量;

void shiftReplicationId(void) {

memcpy(server.replid2,server.replid,sizeof(server.replid));

server.second_replid_offset = server.master_repl_offset+1;

changeReplicationId();

}

判断是否能执行部分重同步的条件也改变为:

if (strcasecmp(master_replid, server.replid) &&

(strcasecmp(master_replid, server.replid2) ||

psync_offset > server.second_replid_offset))

{ ...

goto need_full_resync;

}

假设m为主服务器(运行ID为M_ID),A、B和C为三个从服务器;某一时刻主服务器m发生故障,从服务器A升级为主服务器(同时会记录replid2=M_ID),从服务器B和C重新向主服务器A发送"psync M_ID psync_offset"请求;显然根据上面条件,只要psync_offset满足条件,就可以执行部分重同步。

21.2 主从复制源码基础

主从复制相关变量大部分都定义在redisServer结构体中:

struct redisServer {

/* Replication (master) */

char replid[CONFIG_RUN_ID_SIZE+1]; /* */

Redis服务器运行ID,长度为(40)的随机字符串 . 对于主服务器,replid表示当前服务器的运行ID;对于从服务器,replid表示其复制的主服务器运行ID。生成方法:

void changeReplicationId(void) {

getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);

server.replid[CONFIG_RUN_ID_SIZE] = '';

}

 

int repl_ping_slave_period; /* Master pings the slave every N seconds */

主 从 之间通过TCP长连接交互数据的,需要周期性地发送心跳包来检测连接有效性,该字段表示发送心跳包的周期,主服务器周期向所有从服务器发送心跳包。可通过配置参数repl-ping-replica-period或者repl-ping-slave-period设置,默认为10。

if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&

listLength(server.slaves))

{

ping_argv[0] = createStringObject("PING",4);

replicationFeedSlaves(server.slaves, server.slaveseldb,

ping_argv, 1);

}

}

 

char *repl_backlog; /* Replication backlog for partial syncs */

复制缓冲区,用于缓存主服务器已执行且待发送给从服务器的命令请求;缓冲区大小由字段 repl_backlog_size指定,其可通过配置参数repl-backlog-size设置,默认为1MB。

long long repl_backlog_size; /* Backlog circular buffer size */

long long repl_backlog_histlen; /* 复制缓冲区中存储的命令请求数据长度。*/

long long repl_backlog_idx;

复制缓冲区中存储的命令请求最后一个字节索引位置,即向复制缓冲区写入数据时会从该索引位置开始。

例如,函数feedReplicationBacklog用于向缓冲区中写入数据,实现如下:

void feedReplicationBacklog(void *ptr, size_t len) {

unsigned char *p = ptr;

//缓冲区最后一个字节的复制偏移量

server.master_repl_offset += len;

//复制缓冲区为先进先出的循环队列

while(len) {

size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;

if (thislen > len) thislen = len;

memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);

server.repl_backlog_idx += thislen;

//repl_backlog_idx索引已经到缓冲区最大位置,需要移动到缓冲区首部

if (server.repl_backlog_idx == server.repl_backlog_size)

server.repl_backlog_idx = 0;

len -= thislen;

p += thislen;

//记录缓冲区中存储的命令请求数据长度

server.repl_backlog_histlen += thislen;

}

//缓冲区中数据量最大为缓冲区大小

if (server.repl_backlog_histlen > server.repl_backlog_size)

server.repl_backlog_histlen = server.repl_backlog_size;

//设置缓冲区中数据第一个字节的复制偏移量.

server.repl_backlog_off = server.master_repl_offset -

server.repl_backlog_histlen + 1;

}

复制缓冲区是一个先进先出的循环队列,当写入数据量超过缓冲区大小时,旧的数据会被覆盖。

因此随着每次数据的写入,需要更新缓冲区中数据第一个字节的复制偏移量repl_backlog_off,同时记录下次写入数据时的索引位置repl_backlog_idx,以及当前缓冲区中有效数据长度repl_backlog_histlen

 

long long repl_backlog_off; /* Replication "master offset" of first

复制缓冲区中第一个字节的复制偏移量。

list *slaves, 记录所有的从服务器,是一个链表,节点值类型为client。

int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */

当前有效从服务器的数目。

什么样的从服务器是有效的呢?

主服务器会记录每个从服务器上次心跳检测成功的时间repl_ack_time,并且定时检测当前时间距离repl_ack_time是否超过一定超时门限,如果超过则认为从服务器处于失效状态。

字段repl_min_slaves_max_lag存储的就是该超时门限,可通过配置参数min-slaves-max-lag或者min-replicas-max-lag设置,默认为10,单位秒。

函数refreshGoodSlavesCount实现了从服务器有效性的检测,逻辑如下:

void refreshGoodSlavesCount(void) {

//没有配置repl_min_slaves_to_write与repl_min_slaves_max_lag,

函数会直接返回没有必要检测,

if (!server.repl_min_slaves_to_write ||

!server.repl_min_slaves_max_lag) return;

listRewind(server.slaves,&li);

while((ln = listNext(&li))) {

client *slave = ln->value;

time_t lag = server.unixtime - slave->repl_ack_time;

//上次心跳成功时间小于repl_min_slaves_max_lag认为从服务器有效

if (slave->replstate == SLAVE_STATE_ONLINE &&

lag <= server.repl_min_slaves_max_lag) good++;

}

server.repl_good_slaves_count = good;

}

字段repl_min_slaves_to_write表示当有效从服务器的数目小于该值时,主服务器会拒绝执行写命令回顾8.3.2节命令调用,处理命令请求之前 就会校验从服务器数目,如下:

if (server.masterhost == NULL &&

server.repl_min_slaves_to_write &&

server.repl_min_slaves_max_lag &&

is_write_command &&

server.repl_good_slaves_count < server.repl_min_slaves_to_write)

{

rejectCommand(c, shared.noreplicaserr);

return C_OK;

}

 

int repl_min_slaves_to_write; /*主 写 需要最小的有效 从 数量. */

int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */

 

/* Replication (slave) */

char *masteruser; /* AUTH with this user and masterauth with master */

char *masterauth;

当主服务器配置了"requirepass password"时,即表示从服务器必须通过密码认证才能同步主服务器数据。同样的需要在从服务器配置"masterauth<master-password>",用于设置请求同步主服务器时的认证密码。

client *master; master即为主服务器,类型为client。

char *masterhost; /* 主服务器IP地址,masterport主服务器端口*/

int repl_serve_stale_data; /* Serve stale data when link is down? */

当主从服务器断开连接时,该变量表示从服务器是否继续处理命令请求,可通过配置参数slave-serve-stale-data或者replica-serve-stale-data设置,默认为1,即可以继续处理命令请求。该校验同样在8.3.2节命令调用处完成,如下:

if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&

server.repl_serve_stale_data == 0 &&

is_denystale_command)

{

rejectCommand(c, shared.masterdownerr);

return C_OK;

}

 

int repl_slave_ro; /* 从服务器是否只读(不处理写命令) */

可通过配置参数slave-read-only或者replica-read-only设置,默认为1,即从服务器不处理写命令请求,除非该命令是主服务器发送过来的。该校验同样在8.3.2节命令调用处完成,如下:

if (server.masterhost && server.repl_slave_ro &&

!(c->flags & CLIENT_MASTER) &&

is_write_command)

{

rejectCommand(c, shared.roslaveerr);

return C_OK;

}

}

21.3 slaver源码分析

用户可以通过执行slaveof命令开启主从复制功能。当Redis服务器接收到slaveof命令时,需要主动连接主服务器请求同步数据。

slaveof命令的处理函数为replicaofCommand,这是slaver源码的入口,主要实现如下:

void replicaofCommand(client *c) {

/* slaveof no one命令可以取消复制功能 */

if (!strcasecmp(c->argv[1]->ptr,"no") &&

!strcasecmp(c->argv[2]->ptr,"one")) {

    ...

}

} else { ...

replicationSetMaster(c->argv[1]->ptr, port);

}

}

/* Set replication to the specified master address and port. */

void replicationSetMaster(char *ip, int port) {

sdsfree(server.masterhost);

server.masterhost = sdsnew(ip);

server.masterport = port;

    ...

server.repl_state = REPL_STATE_CONNECT;

}

用户可以通过命令"slaveof no one"取消主从复制功能,此时主从服务器之间会断开连接,从服务器成为普通的Redis实例。

两个问题:

  1. replicaofCommand函数只是记录主服务器IP地址与端口,什么时候连接主服务器呢?
  2. 变量repl_state有什么作用?

第一个问题 (什么时候连接主服务器)

replicaofCommand函数实现并没有向主服务器发起连接请求,说明该操作应该是一个异步操作,那么很有可能是在时间事件中执行,搜索时间事件处理函数serverCron会发现,以一秒为周期执行主从复制相关操作

run_with_period(1000) replicationCron();

在函数replicationCron中,从服务器向主服务器发起了连接请求:

if (server.repl_state == REPL_STATE_CONNECTING) {

connSetReadHandler(conn, syncWithMaster);

connSetWriteHandler(conn, NULL);

server.repl_state = REPL_STATE_RECEIVE_PONG;

err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);

if (err) goto write_error;

return;

}

待从服务器成功连接到主服务器时,还会创建对应的文件事件:

aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL)

 

另外,replicationCron函数还用于检测主从连接是否超时,定时向主服务器发送心跳包,定时报告自己的复制偏移量等。

time(NULL)-server.repl_transfer_lastio > server.repl_timeout ;

repl_transfer_lastio 存储的是主从服务器上次交互时间,

repl_timeout 表示主从服务器超时时间,可通过参数repltimeout配置,默认为60,单位秒,超过此时间则认为主从服务器之间的连接出现故障,从服务器会主动断开连接。

 

void replicationSendAck(void) {

client *c = server.master;

if (c != NULL) {

c->flags |= CLIENT_MASTER_FORCE_REPLY;

addReplyArrayLen(c,3);

addReplyBulkCString(c,"REPLCONF");

addReplyBulkCString(c,"ACK");

addReplyBulkLongLong(c,c->reploff);

c->flags &= ~CLIENT_MASTER_FORCE_REPLY;

}

}

从服务器通过命令"REPLCONF ACK<reploff>"定时向主服务器汇报自己的复制偏移量,主服务器使用变量repl_ack_time存储接收到该命令的时间,以此作为检测从服务器是否有效的标准。

 

第二个问题 (变量repl_state作用):

slaveof命令执行过程

当从服务器接收到slaveof命令时,主动连接主服务器请求同步数据,这需要若干个步骤交互:

1)连接Socket;

2)发送PING请求包确认连接是否正确;

3)发起密码认证(如果需要);

4)信息同步;

5)发送PSYNC命令;

6)接收RDB文件并载入;

7)连接建立完成,等待主服务器同步命令请求。

变量repl_state表示的就是主从复制流程的进展(从服务器状态),定义了以下状态:

#define REPL_STATE_NONE 0 /* 未开启主从复制功能,当前服务器是普通的Redis实例 */

#define REPL_STATE_CONNECT 1 /* 待发起Socket连接主服务器*/

#define REPL_STATE_CONNECTING 2 /*Socket连接成功 */

/* --- 握手状态,必须有序 --- */

#define REPL_STATE_RECEIVE_PONG 3 /* 已发送PING请求包,等待主服务器PONG回复 */

#define REPL_STATE_SEND_AUTH 4 /* 待发起密码认证 */

#define REPL_STATE_RECEIVE_AUTH 5 /* 已经发起了密码认证请求"AUTH<password>",等待接收主服务器回复; */

#define REPL_STATE_SEND_PORT 6 /* 待发送端口号 */

#define REPL_STATE_RECEIVE_PORT 7 /*已发送端口号"REPLCONFlistening-port<port>",等待接收主服务器回复; */

#define REPL_STATE_SEND_IP 8 /* 待发送IP地址 */

#define REPL_STATE_RECEIVE_IP 9 /* 已发送IP地址"REPLCONF ip-address<ip>",等待接收主服务器回复;该IP地址与端口号用于主服务器主动建立Socket连接,并向从服务器同步数据; */

#define REPL_STATE_SEND_CAPA 10 /* 主从复制功能进行过优化升级,从服务器需要告诉主服务器自己支持的主从复制能力,通过命令"REPLCONFcapa<capability>"实现; */

#define REPL_STATE_RECEIVE_CAPA 11 /*等待接收主服务器回复 */

#define REPL_STATE_SEND_PSYNC 12 /* 待发送PSYNC命令*/

#define REPL_STATE_RECEIVE_PSYNC 13 /* 等待接收主服务器PSYNC命令的回复结果; */

/* --- 握手状态结束 --- */

#define REPL_STATE_TRANSFER 14 /* 正在接收RDB文件 */

#define REPL_STATE_CONNECTED 15 /* RDB文件接收并载入完毕,主从复制连接建立成功。此时从服务器只需要等待接收主服务器同步数据即可。*/

 

待从服务器成功连接到主服务器时,还会创建对应的文件事件,处理函数syncWithMaster(当Socket可读或者可写时调用执行),主要实现从服务器与主服务器的交互流程,即完成从服务器的状态转换。下面分析从服务器状态转换源码实现,其中符号"→"表示状态转换。

/* 当非阻塞连接能够*与主机建立连接时,此处理程序将触发. */

void syncWithMaster(connection *conn) {

char tmpfile[256], *err = NULL;

int dfd = -1, maxtries = 5;

int psync_result;

/* 用户将实例转换为SLAVEOF NO ONE的实例之后触发此事件,直接返回。/

if (server.repl_state == REPL_STATE_NONE) {

connClose(conn);

return;

}

......

/* 1) REPL_STATE_CONNECTING → REPL_STATE_RECEIVE_PONG */

// 当检测到当前状态为REPL_STATE_CONNECTING,从服务器发送PING命令请求,并修改状态为REPL_STATE_RECEIVE_PONG,函数直接返回。

if (server.repl_state == REPL_STATE_CONNECTING) {

serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");

/* 删除可写事件,以使可读事件保持状态,等待PONG答复. */

connSetReadHandler(conn, syncWithMaster);

connSetWriteHandler(conn, NULL);

server.repl_state = REPL_STATE_RECEIVE_PONG;

/* 发送PING,不检查任何错误,有超时可以解决此问题 */

err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);

if (err) goto write_error;

return;

}

 

/* 2)REPL_STATE_RECEIVE_PONG → REPL_STATE_SEND_AUTH → REPL_STATE_RECEIVE_AUTH(或REPL_STATE_SEND_PORT)

当前状态为REPL_STATE_RECEIVE_PONG,会从socket中读取主服务器PONG回复,并修改状态为REPL_STATE_SEND_AUTH;

这里函数没有返回,下面的if语句依然会执行。如果用户配置了参数"masterauth<master-password>",从服务器会向主服务器发送密码认证请求,同时修改状态为REPL_STATE_RECEIVE_AUTH。否则,修改状态为REPL_STATE_SEND_PORT,

同样,这里函数也没有返回,会继续执行 4)中状态转换逻辑。

/* 收到PONG回复. */

if (server.repl_state == REPL_STATE_RECEIVE_PONG) {

err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

server.repl_state = REPL_STATE_SEND_AUTH;

...

}

/* AUTH with the master if required. */

if (server.repl_state == REPL_STATE_SEND_AUTH) {

if (server.masteruser && server.masterauth) {

err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",

server.masteruser,server.masterauth,NULL);

server.repl_state = REPL_STATE_RECEIVE_AUTH;

return;

} else if (server.masterauth) {

err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL);

server.repl_state = REPL_STATE_RECEIVE_AUTH;

return;

} else {

server.repl_state = REPL_STATE_SEND_PORT;

}

}

 

/* 3)REPL_STATE_RECEIVE_AUTH → REPL_STATE_SEND_PORT

当前状态REPL_STATE_RECEIVE_AUTH,会从Socket中读取主服务器回复结果,并修改状态为REPL_STATE_SEND_PORT,继续执行4)中状态转换逻辑。 */

/* Receive AUTH reply. */

if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {

err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

server.repl_state = REPL_STATE_SEND_PORT;

}

 

/* 4)REPL_STATE_SEND_PORT → REPL_STATE_RECEIVE_PORT

当检测到当前状态为REPL_STATE_SEND_PORT,从服务器向主服务器发送端口号,并修改状态为REPL_STATE_RECEIVE_PORT,函数直接返回。 */

/* 设置从端口,以便Master的INFO命令可以正确列出从的监听端口。 */

if (server.repl_state == REPL_STATE_SEND_PORT) {

err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",

"listening-port",portstr, NULL);

server.repl_state = REPL_STATE_RECEIVE_PORT;

return;

}

 

/* 5)REPL_STATE_RECEIVE_PORT → EPL_STATE_SEND_IP → REPL_STATE_REC

当前状态为REPL_STATE_RECEIVE_PORT,会从Socket中读取主服务器回复结果,并修改状态为REPL_STATE_SEND_IP。会继续执行下面的if语句;向主服务器发送IP地址,并修改状态为REPL_STATE_RECEIVE_IP,函数返回。 */

if (server.repl_state == REPL_STATE_RECEIVE_PORT) {

err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

server.repl_state = REPL_STATE_SEND_IP;

}

/* 如果未设置slave-announce-ip选项,则跳过REPLCONF ip-address */

if (server.repl_state == REPL_STATE_SEND_IP &&

server.slave_announce_ip == NULL) {

server.repl_state = REPL_STATE_SEND_CAPA;

}

if (server.repl_state == REPL_STATE_SEND_IP) {

err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",

"ip-address",server.slave_announce_ip, NULL);

server.repl_state = REPL_STATE_RECEIVE_IP;

return;

}

 

/* 6)REPL_STATE_RECEIVE_IP → REPL_STATE_SEND_CAPA → REPL_STATE_RECEIVE_CAPA

当前状态为REPL_STATE_RECEIVE_IP时,会从Socket中读取主服务器回复结果,并修改状态为REPL_STATE_SEND_CAPA。继续执行下面的if语句;

这里向主服务器发送"REPLCONF capa eof capapsync2",capa为单词capability的简写,表示的是从服务器支持的主从复制功能。Redis主从复制经历过优化升级,高版本的Redis服务器可能支持更多的功能,因此这里从服务器需要向主服务器同步自身具备的功能。 */

if (server.repl_state == REPL_STATE_RECEIVE_IP) {

err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

server.repl_state = REPL_STATE_SEND_CAPA;

}

 

/* 根据主从复制功能实现,主服务器在接收到psync命令时,如果必须执行完整重同步,会持久化数据库到RDB文件,完成后将RDB文件发送给从服务器。而当从服务器支持"eof"功能时,主服务器便可以直接将数据库中的数据以RDB协议格式通过Socket发送给从服务器,免去了本地磁盘文件不必要的读写操作。

Redis 4.0针对主从复制提出了psync2协议,使得主服务器故障导致主从切换后,依然有可能执行部分重同步。而这时候当主服务器接收到psync命令时,向客户端回复的是"+CONTINUE<new_repl_id>"。参数"psync2"表明从服务器支持psync2协议。

最后从服务器修改状态为REPL_STATE_RECEIVE_CAPA,函数返回。*/

if (server.repl_state == REPL_STATE_SEND_CAPA) {

err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",

"capa","eof","capa","psync2",NULL);

server.repl_state = REPL_STATE_RECEIVE_CAPA;

return;

}

 

/* 7)REPL_STATE_RECEIVE_CAPA → REPL_STATE_SEND_PSYNC → REPL_STATE_RECEIVE_PSYNC:

函数slaveTryPartialResynchronization主要执行两个操作:

  1. 尝试获取主服务器运行ID以及复制偏移量,并向主服务器发送psync命令请求;
  2. 读取并解析psync命令回复,判断执行完整重同步还是部分重同步。函数第二个参数表明执行操作1还是操作2。 */

if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {

err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

server.repl_state = REPL_STATE_SEND_PSYNC;

}

 

/* 尝试部分重新同步。如果没有缓存的master * slaveTryPartialResynchronization()将至少尝试使用PSYNC 开始完全重新同步,以便获得master run id 和全局偏移量,在下一次*重新连接时尝试部分重新同步尝试。 */

if (server.repl_state == REPL_STATE_SEND_PSYNC) {

if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {

}

server.repl_state = REPL_STATE_RECEIVE_PSYNC;

return;

}

if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {

goto error;

}

 

/* 7)REPL_STATE_RECEIVE_CAPA → REPL_STATE_SEND_PSYNC → REPL_STATE_RECEIVE_PSYNC:

调用函数slaveTryPartialResynchronization读取并解析psync命令回复时,如果返回的是PSYNC_CONTINUE,表明可以执行部分重同步(函数slaveTryPartialResynchronization内部会修改状态为REPL_STATE_CONNECTED)。否则说明需要执行完整重同步,从服务器需要准备接收主服务器发送的RDB文件,可以看到这里创建了文件事件,处理函数为readSyncBulkPayload,并修改状态为REPL_STATE_TRANSFER。*/

psync_result = slaveTryPartialResynchronization(conn,1);

if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

if (psync_result == PSYNC_TRY_LATER) goto error;

if (psync_result == PSYNC_CONTINUE) {

return;

}

disconnectSlaves(); /* Force our slaves to resync with us as well. */

freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

 

if (psync_result == PSYNC_NOT_SUPPORTED) {

goto error;

}

}

 

/* Prepare a suitable temp file for bulk transfer */

if (!useDisklessLoad()) {

while(maxtries--) {

snprintf(tmpfile,256,

"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());

dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);

if (dfd != -1) break;

sleep(1);

}

if (dfd == -1) {

serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));

goto error;

}

server.repl_transfer_tmpfile = zstrdup(tmpfile);

server.repl_transfer_fd = dfd;

}

 

/* 函数readSyncBulkPayload实现了RDB文件的接收与加载,加载完成后同时会修改状态为REPL_STATE_CONNECTED。

当从 状态成为REPL_STATE_CONNECTED时,表明从-主服务器建立连接,从 只需要接收并执行主 同步过来的命令请求即可,与执行普通客户端命令请求差别不大。*/

if (connSetReadHandler(conn, readSyncBulkPayload)

== C_ERR)

{

char conninfo[CONN_INFO_LEN];

serverLog(LL_WARNING,

"Can't create readable event for SYNC: %s (%s)",

strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));

goto error;

}

server.repl_state = REPL_STATE_TRANSFER;

server.repl_transfer_size = -1;

server.repl_transfer_read = 0;

server.repl_transfer_last_fsync_off = 0;

server.repl_transfer_lastio = server.unixtime;

return;

 

error:

...

return;

 

write_error:

...

goto error;

}

21.4 master源码分析

前面讲过,从服 收到slaveof命令会主动连接主服 请求同步数据,主要流程有:

①连接Socket;

②发送PING请求包确认连接是否正确;

③发起密码认证(如果需要);

④通过REPLCONF命令同步信息;

⑤发送PSYNC命令;

⑥接收RDB文件并载入;

⑦连接建立完成,等待主服务器同步命令请求。

主服 针对流程中①~③的处理比较简单,本节主要介绍主服针对④~⑦的处理。

主服务器处理命令REPLCONF命令入口函数

void replconfCommand(client *c) {

/* 处理每个选项-值对. */

for (j = 1; j < c->argc; j+=2) {

if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {

c->slave_listening_port = port;

} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {

memcpy(c->slave_ip,ip,sdslen(ip)+1);

} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {

/*忽略此主机不理解的功能 */

if (!strcasecmp(c->argv[j+1]->ptr,"eof"))

c->slave_capa |= SLAVE_CAPA_EOF;

else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))

c->slave_capa |= SLAVE_CAPA_PSYNC2;

} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {

if (offset > c->repl_ack_off)

c->repl_ack_off = offset;

c->repl_ack_time = server.unixtime;

} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {

if (server.masterhost && server.master) replicationSendAck();

return;

}

}

addReply(c,shared.ok);

}

此函数主要解析客户端请求参数并存储在客户端对象client中,主要需要记录以下信息。

·从服 监听IP地址与端口,主服务器以此连接从服务器并同步数据。

·客户端能力标识eof标识主服务器可以直接将数据库中数据以RDB协议格式通过socket发送 给从服务器,免去了本地磁盘文件不必要的读写操作;psync2表明从服务器支持psync2议, 即从服务器可以识别主服务器回复的"+CONTINUE<new_repl_id>"。

· 从服务器的复制偏移量以及交互时间

主服务器处理psync命令的入口函数

接下来从服 将向主服 发送psync命令请求同步数据,主服务器处理psync命令的入口函数为syncCommand。主服务器首先判断是否可以执行部分重同步,如果可以则向客户端返回 "+CONTINUE",并返回复制缓冲区中的命令请求,同时更新有效从服务器数目。

void syncCommand(client *c) {

...

/* 如果这是PSYNC命令,尝试部分重新同步。 如果失败,继续进行通常的完全重新同步, */

if (!strcasecmp(c->argv[0]->ptr,"psync")) {

//内部函数下面具体分析

if (masterTryPartialResynchronization(c) == C_OK) {

server.stat_sync_partial_ok++;

return; /* No full resync needed, return. */

} else {

char *master_replid = c->argv[1]->ptr;

if (master_replid[0] != '?') server.stat_sync_partial_err++;

}

} else {

c->flags |= CLIENT_PRE_PSYNC;

}

server.stat_sync_full++;

 

/* Setup the slave as one waiting for BGSAVE to start. The following code

* paths will change the state if we handle the slave differently. */

c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;

if (server.repl_disable_tcp_nodelay)

connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */

c->repldbfd = -1;

c->flags |= CLIENT_SLAVE;

listAddNodeTail(server.slaves,c);

 

/* Create the replication backlog if needed. */

if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {

/* When we create the backlog from scratch, we always use a new

* replication ID and clear the ID2, since there is no valid

* past history. */

changeReplicationId();

clearReplicationId2();

createReplicationBacklog();

serverLog(LL_NOTICE,"Replication backlog created, my new "

"replication IDs are '%s' and '%s'",

server.replid, server.replid2);

}

 

/* 情况1:正在进行BGSAVE,目标为磁盘 */

if (server.rdb_child_pid != -1 &&

server.rdb_child_type == RDB_CHILD_TYPE_DISK)

{

/* Ok a background save is in progress. Let's check if it is a good

* one for replication, i.e. if there is another slave that is

* registering differences since the server forked to save. */

client *slave;

listNode *ln;

listIter li;

 

listRewind(server.slaves,&li);

while((ln = listNext(&li))) {

slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;

}

/* To attach this slave, we check that it has at least all the

* capabilities of the slave that triggered the current BGSAVE. */

if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {

/* Perfect, the server is already registering differences for

* another slave. Set the right state, and copy the buffer. */

copyClientOutputBuffer(c,slave);

replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);

serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");

}

 

/* 情况2:BGSAVE正在进行中,具有套接字目标 */

} else if (server.rdb_child_pid != -1 &&

server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)

{

serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

/* 情况三 没有bgsave */

} else {

if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {

if (server.repl_diskless_sync_delay)

serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");

} else {

if (!hasActiveChildProcess()) {

//内部函数

startBgsaveForReplication(c->slave_capa);

} else {

serverLog(LL_NOTICE,

"No BGSAVE in progress, but another BG operation is active. "

"BGSAVE for replication delayed");

}

}

}

return;

}

 

内部函数masterTryPartialResynchronization

int masterTryPartialResynchronization(client *c) {

/* 判断服务器运行ID是否匹配,复制偏移量是否合法. */

if (strcasecmp(master_replid, server.replid) &&

(strcasecmp(master_replid, server.replid2) ||

psync_offset > server.second_replid_offset))

{

if (master_replid[0] != '?') { ... /* 全同步. */

goto need_full_resync;

}

/* 判断复制偏移量是否包含在复制缓冲区? */

if (!server.repl_backlog ||

psync_offset < server.repl_backlog_off ||

psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))

{ ...

goto need_full_resync;

}

//部分重同步,标识从服务器

c->flags |= CLIENT_SLAVE;

c->replstate = SLAVE_STATE_ONLINE;

c->repl_ack_time = server.unixtime;

c->repl_put_online_on_ack = 0;

//将该客户端添加到从服务器链表slaves

listAddNodeTail(server.slaves,c);

//根据从服务器能力返回+CONTINUE

if (c->slave_capa & SLAVE_CAPA_PSYNC2) {

buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s ", server.replid);

} else {

buflen = snprintf(buf,sizeof(buf),"+CONTINUE ");

}

if (connWrite(c->conn,buf,buflen) != buflen) {

freeClientAsync(c);

return C_OK;

}

//向客户端发送复制缓冲区中的命令请求

psync_len = addReplyReplicationBacklog(c,psync_offset);

/ /更新有效从服务器数目

refreshGoodSlavesCount();

return C_OK; /* The caller can return, no full resync needed. */

 

need_full_resync:

/* 如果需要完全同步,现在不能回复PSYNC。答复必须包含生成传输的RDB文件时的主偏移量,因此需要将答复延迟到该时刻。 */

return C_ERR;

}

执行部分重同步 条件的:

①服务器运行ID 与 复制偏移量 必须合法;

②复制偏移量必须包含在复制缓冲区中。

当可以执行部分重同步时,主服务器便将该客户端添加到自己的从服务器链表slaves,并标记客户端状态为SLAVE_STATE_ONLINE,客户端类型为CLIENT_SLAVE(从服务器)。

流程④中,从服务器已经通过命令请求REPLCONF向主服务器同步了自己支持的能力,主服务器根据该能力决定向从服务器返回"+CONTINUE"还是 "+CONTINUE<replid>"。

接下来主服务器还需要根据PSYNC请求参数中的复制偏移量,将复制缓冲区中的部分命令请求同步给从服务器。由于有新的从服务器连接成功,主服务器还需要更新有效从服务器数目,以此实现min_slaves功能。

 

内部函数startBgsaveForReplication

当主服务器判断需要执行完整重同步时,会fork子进程执行RDB持久化,并将持久化数据发送给从服务器。RDB持久化有两种选择:

①直接通过Socket发送给从服务器;②持久化数据到本地文件,待持久化完成后再将该文件发送给从服务器。

int startBgsaveForReplication(int mincapa) {

// 变量repl_diskless_sync可通过配置参数repl-diskless-sync进行设置,默认为0;即默认情况下,主服务器都是先持久化数据到本地文件,再将该文件发送给从服务器。

//参数mincapa为 c->slave_capa, 根据步骤④从服务器的同步信息确定。

int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);

if (rsiptr) {

if (socket_target)

retval = rdbSaveToSlavesSockets(rsiptr);

else

retval = rdbSaveBackground(server.rdb_filename,rsiptr);

...

}

 

当所有流程执行完毕后,主服务器每次接收到写命令请求时,都会将该命令请求广播给所有从服务器同时记录在复制缓冲区中。广播命令请求的实现函数为replicationFeedSlaves:

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {

...

/* 如果与上次选择的数据库不相等,需要先同步select命令*/

if (server.slaveseldb != dictid) {

robj *selectcmd;

...

/* select命令添加到复制缓冲区 */

if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

 

/* 向所有从服务器发送select命令 */

listRewind(slaves,&li);

while((ln = listNext(&li))) { ...

addReply(slave,selectcmd);

}

}

server.slaveseldb = dictid;

 

/* Write the command to the replication backlog if any. */

if (server.repl_backlog) {

//将当前命令请求添加到复制缓冲区

char aux[LONG_STR_SIZE+3];

 

/* Add the multi bulk reply length. */

aux[0] = '*';

len = ll2string(aux+1,sizeof(aux)-1,argc);

aux[len+1] = ' ';

aux[len+2] = ' ';

feedReplicationBacklog(aux,len+3);

 

for (j = 0; j < argc; j++) {

long objlen = stringObjectLen(argv[j]);

aux[0] = '$';

len = ll2string(aux+1,sizeof(aux)-1,objlen);

aux[len+1] = ' ';

aux[len+2] = ' ';

feedReplicationBacklog(aux,len+3);

feedReplicationBacklogWithObject(argv[j]);

feedReplicationBacklog(aux+len+1,2);

}

}

 

/* 向所有从服务器同步命令请求. */

listRewind(slaves,&li);

while((ln = listNext(&li))) {

client *slave = ln->value;

/* Add the multi bulk length. */

addReplyArrayLen(slave,argc);

for (j = 0; j < argc; j++)

addReplyBulk(slave,argv[j]);

}

}

当前客户端连接的数据库可能并不是上次向从服务器同步数据的数据库,因此可能需要先向从服务器同步select命令修改数据库。针对每个写命令,主服务器都需要将命令请求同步给所有从服务器,同时 向从服务器同步的每个命令请求,都会记录到复制缓冲区中。

21.5 本章小结

主从复制的功能实现,Redis针对主从复制的优化设计思路。

主从复制源码实现时,其主要数据变量的定义,

主从复制的主要7个流程的实现。

 

原文地址:https://www.cnblogs.com/coloz/p/13812861.html