Redis源码剖析(十一)AOF持久化

AOF持久化的实现

命令追加

服务器执行写命令后,会将执行的写指令追加到 aof_buf 缓冲区:

struct redisServer{

    // AOF 缓冲区
    sds aof_buf;  

    // ......
}

追加到 aof_buf 缓冲区的命令是按照一定的协议格式保存的,catAppendOnlyGenericCommand 函数负责将命令转换为协议格式。从这个函数的实现可以清楚的看出协议格式是如何生成的。其具体格式为:

*<count>\r\n$<length>\r\n<content>\r\n

以 SET msg "hello" 为例,生成的协议格式应为:*3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$5\r\nhello\r\n

/*
 * 根据传入的命令和命令参数,将它们还原成协议格式。
 */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    // 重建命令的个数,格式为 *<count>\r\n
    // 例如 *3\r\n
    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    // 重建命令和命令参数,格式为 $<length>\r\n<content>\r\n
    // 例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n
    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);

        // 组合 $<length>\r\n
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);

        // 组合 <content>\r\n
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);

        decrRefCount(o);
    }

    // 返回重建后的协议内容
    return dst;
}

 

AOF文件的写入和同步

Redis服务器进程是一个事件循环,在每一个事件循环中,都会调用 flushAppendOnlyFile 来决定是否将 aof_buf 缓冲区中的数据写入AOF文件中,而 flushAppendOnlyFile 函数的行为取决于服务器配置 appendfsync 选项。

appendfsync 选项 函数行为 出现故障时丢失的数据量
always       aof_buf内容写入并同步到AOF文件 一个事件循环产生的命令数据
everysec aof_buf内容写入AOF文件,每隔1秒同步AOF文件 一秒钟的命令数据
no aof_buf内容写入AOF文件,何时同步由操作系统决定 上次同步AOF文件之后的所有命令数据

在现代os中,为了提高文件的写入操作,当用户调用到write函数将数据写入文件时,os先将数据写入到一个内存缓冲区里(写入),正常是等到缓冲区满了或是规定时间到了,才真正地将缓冲区里的数据写入磁盘(同步)

AOF文件的载入

载入AOF文件时,服务器通过 createFakeClient 来创建一个伪客户端执行AOF文件中保存的写命令。

struct redisClient *createFakeClient(void) {
    struct redisClient *c = zmalloc(sizeof(*c));

    selectDb(c,0);

    c->fd = -1;
    c->name = NULL;
    c->querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->argc = 0;
    c->argv = NULL;
    c->bufpos = 0;
    c->flags = 0;
    c->btype = REDIS_BLOCKED_NONE;
    /* 
     * 将客户端设置为正在等待同步的附属节点,这样客户端就不会发送回复了。
     */
    c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    c->watched_keys = listCreate();
    c->peerid = NULL;
    listSetFreeMethod(c->reply,decrRefCountVoid);
    listSetDupMethod(c->reply,dupClientReplyValue);
    initClientMultiState(c);

    return c;
}

AOF 重写

AOF重写的实现

AOF重写的功能是为了解决AOF文件体积膨胀的问题,新的AOF文件不会包含任何浪费空间的冗余命令。AOF重写的实现原理是,通过从数据库中读取键现在的值,然后用一条命令去记录键值对代替之前记录这个键值对的多条命令。

AOF重写由 rewriteAppendOnlyFile 实现:

int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /*
     * 创建临时文件
     * 注意这里创建的文件名和 rewriteAppendOnlyFileBackground() 创建的文件名稍有不同
     */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }

    // 初始化文件 io
    rioInitWithFile(&aof,fp);

    // 设置每写入 REDIS_AOF_AUTOSYNC_BYTES 字节
    // 就执行一次 FSYNC 
    // 防止缓存中积累太多命令内容,造成 I/O 阻塞时间过长
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);

    // 遍历所有数据库
    for (j = 0; j < server.dbnum; j++) {

        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";

        redisDb *db = server.db+j;

        // 指向键空间
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;

        // 创建键空间迭代器
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /*
         * 首先写入 SELECT 命令,确保之后的数据会被插入到正确的数据库上
         */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /
         * 遍历数据库所有键,并通过命令将它们的当前状态(值)记录到新 AOF 文件中
         */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            // 取出键
            keystr = dictGetKey(de);

            // 取出值
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            // 取出过期时间
            expiretime = getExpire(db,&key);

            /*
             * 如果键已经过期,那么跳过它,不保存
             */
            if (expiretime != -1 && expiretime < now) continue;

            /* Save the key and associated value 
             *
             * 根据值的类型,选择适当的命令来保存值
             */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }

            /* Save the expire time 
             *
             * 保存键的过期时间
             */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";

                // 写入 PEXPIREAT expiretime 命令
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }

        // 释放迭代器
        dictReleaseIterator(di);
    }

    // 冲洗并关闭新 AOF 文件
    if (fflush(fp) == EOF) goto werr;
    if (aof_fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /*
     * 原子地改名,用重写后的新 AOF 文件覆盖旧 AOF 文件
     */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }

    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");

    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

AOF后台重写

  AOF的后台重写是通过创建子进程来实现的,之所以使用子进程而不是线程,是因为子进程带有服务器进程的完整数据副本,可以在避免使用同步的情况下,保证数据的安全性。但由于子进程在AOF重写时,服务器进程仍然在处理命令请求,因此在子进程完成AOF重写后,当前数据库的数据库状态和AOF文件保存的数据库状态不一致。

AOF后台重写的部分代码如下:

int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;
    // 子进程
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        // 创建临时文件,并进行 AOF 重写
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            // 发送重写成功信号
            exitFromChild(0);
        } else {
            // 发送重写失败信号
            exitFromChild(1);
        }
    } else {
           // 父进程
        // ......
    }
    return REDIS_OK; 
}

  AOF重写的操作是在服务器进程的周期操作函数 serverCron 中进行的,在 serverCron 函数中,服务器进程会接收子进程发来的信号(子进程的退出信号),当服务器进程检查到负责AOF重写的子进程退出时,会将AOF重写缓冲区的数据写入AOF文件末尾

  AOF重写缓冲区的存在是为了解决AOF重写产生的AOF文件与当前数据库状态不一致的问题。服务器在创建子进程进行AOF重写后,每执行一个写指令,不仅会将该写指令追加到 aof_buf 缓冲区,还会追加到 AOF重写缓冲区。所以当AOF重写结束后,只要将AOF重写缓冲区的数据追加到新的AOF文件中,就可以保证AOF文件保存的数据库状态和当前数据库状态一致。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ......
    // 接收子进程发来的信号,非阻塞
    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;
            
        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

        // BGSAVE 执行完毕
        if (pid == server.rdb_child_pid) {
            backgroundSaveDoneHandler(exitcode,bysignal);

        // BGREWRITEAOF 执行完毕
        } else if (pid == server.aof_child_pid) {
            backgroundRewriteDoneHandler(exitcode,bysignal);

        } else {
            redisLog(REDIS_WARNING,"Warning, detected child with unmatched pid: %ld",(long)pid);
        }
        updateDictResizePolicy();
    }
    // ......  
}

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    // ......
    // 将累积的重写缓存写入到临时文件中
    // 这个函数调用的 write 操作会阻塞主进程
    if (aofRewriteBufferWrite(newfd) == -1) {
        redisLog(REDIS_WARNING,"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
        close(newfd);
        goto cleanup;
    }
    // ......
}
原文地址:https://www.cnblogs.com/lizhimin123/p/10197431.html