redis集群源码阅读 之 集群设置主从

在集群节点之间握手及分配好槽点之后,可以为主节点设置一个或多个从节点,以更好的应对故障,保证集群的稳定性。

本文主要介绍一下redis执行replicate命令的过程。

  • 处理cluster replicate命令

  同样,处理cluster命令的api为clusterCommand,其中处理replicate命令过程如下:

if (nodeIsMaster(myself) &&(myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
     addReplyError(c,"To set a master the node must be empty and without assigned slots.");
      return;
  }

/* Set the master. */
// 将节点 n 设为本节点的主节点 clusterSetMaster(n); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_C ONFIG); addReply(c,shared.ok);
  • 将服务器设为指定地址的从服务器

  

void replicationSetMaster(char *ip, int port) {

    // 清除原有的主服务器地址(如果有的话)
    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;
    // 如果之前有其他地址,那么释放它
    if (server.master) freeClient(server.master);
    // 断开所有从服务器的连接,强制所有从服务器执行重同步
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    // 清空可能有的 master 缓存,因为已经不会执行 PSYNC 了
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    // 释放 backlog ,同理, PSYNC 目前已经不会执行了
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    // 取消之前的复制进程(如果有的话)
    cancelReplicationHandshake();

    // 进入连接状态(重点)
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
    server.repl_down_since = 0;
}

  此时从节点(其实还不是)的repl_state = REDIS_REPL_CONNECT。在replicationCron中会对该状态向目标主节点建立一个连接。

if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
}
int connectWithMaster(void) {
    int fd;

    // 连接主服务器
    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }
    // 监听主服务器 fd 的读和写事件,并绑定文件事件处理器
    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }
}

  此时使用的是ipfd,也就是说从节点是主节点的一个redisclient。而主节点创建一个redisclient包括了一下步骤:

acceptCommonHandler -> createClient -> aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) -> processInputBuffer  ->  processCommand

   并注册了从服务器用于同步主服务器的回调函数syncWithMaster。在syncWithMaster中会处理一系列主从复制的操作,比如发送ping命令,身份验证,发送从节点端口信息。

  之后进行同步请求

// 根据返回的结果决定是执行部分 resync ,还是 full-resync
psync_result = slaveTryPartialResynchronization(fd);

  发送的是“psync”命令。主节点接受之后会将该redisclient加入到slaves

  

int masterTryPartialResynchronization(redisClient *c) {
    ...
    listAddNodeTail(server.slaves,c);
    ...
}

  之后主节点在replicationCron中会主动向从节点同步数据(包括全量复制和部分复制)。值得注意的是,在同步过程中,主节点会开辟一个命令缓冲区,缓存同步时的写命令。在同步完成之后还会进行命令同步。

原文地址:https://www.cnblogs.com/coderht/p/10706839.html