redis学习笔记(五): serverCron

serverCron是redis里主要的定时处理函数,在initServer中通过调用aeCreateTimeEvent,将serverCron做为callback注册到全局的eventLoop结构当中。它在主循环中的位置:

aeMain
{
    while (!stop)
    {
        beforeSleep
        aeApiPoll
        process file events
        /* process time events */
        for each timeEntry(te) in eventLoop
        {
            retval = te->timeProc()    /* 这里面timeProc就是serverCron */
            if (NOMORE == retval)
                Delete time entry from eventLoop
            else
                aeAddMillisecondsToNow(te, retval) /* te下一次触发的时隔更新为retval */
        }
    }
}

看serverCron的实现之前先看这个run_with_period的定义:

#define run_with_period(_ms_) 
  if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

由它的定义,run_with_period(_ms_)会在两种情况下返回1:

1. _ms_ <= 1000/server.hz,就是说_ms_比serverCron的执行间隔要小。

2. 或者_ms_比serverCron的执行间隔要大并且serverCron执行的次数刚好是_ms_/(1000/server.hz)的整数倍。

server.hz的意义是serverCron在一秒内执行的次数(从redis的实现来看,这个值是以ms为最小单位来计算的),那么1000/server.hz就是serverCron的执行间隔(ms),再结合run_with_period的定义可以看出,run_with_period表示每_ms_毫秒执行一段任务。

举个例子来说,server.hz是100,也就是servreCron的执行间隔是10ms(可能不完全精确,毕竟是单线程顺序执行)。

假如有一些任务需要每500ms执行一次,就可以在serverCron中用run_with_period(500)把每500ms需要执行一次的工作控制起来。所以,serverCron每执行到第500/10次,run_with_period(500)就会返回1

serverCron的实现如下:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    /* 用SIGALRM信号触发watchdog的处理过程,具体的函数为watchdogSignalHandler */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* Update the time cache. */
    /* 更新server.unixtime和server.mstime */
    updateCachedTime();

    /* 每100ms更新一次统计量,包括这段时间内的commands, net_input_bytes, net_output_bytes */
    run_with_period(100) {
        trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(REDIS_METRIC_NET_INPUT,
                server.stat_net_input_bytes);
        trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT,
                server.stat_net_output_bytes);
    }

    /* We have just REDIS_LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed to the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define. */
     /* 根据server.lruclock的定义,getLRUClock返回的是当前时间换算成秒数的低23位 */
    server.lruclock = getLRUClock();

    /* Record the max memory used since the server was started. */
    /* 记录最大内存使用情况 */
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();

    /* Sample the RSS here since this is a relatively slow call. */
    /* 记录当前的RSS值 */
    server.resident_set_size = zmalloc_get_rss();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    /* 如果收到了SIGTERM信号,尝试退出 */
    if (server.shutdown_asap) {
        if (prepareForShutdown(0) == REDIS_OK) exit(0);
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }

    /* Show some info about non-empty databases */
    /* 每5秒输出一次非空databases的信息到log当中 */
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }

    /* Show information about connected clients */
    /* 如果不是sentinel模式,则每5秒输出一个connected的client的信息到log */
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            redisLog(REDIS_VERBOSE,
                "%lu clients connected (%lu slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* We need to do a few operations on clients asynchronously. */
    /* 清理空闲的客户端或者释放query buffer中未被使用的空间 */
    clientsCron();

    /* Handle background operations on Redis databases. */
    /* databases的处理,rehash就在这里 */
    databasesCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    /* 如果开启了aof_rewrite的调度并且当前没有在background执行rdb/aof的操作,则进行background的aof操作 */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */    
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        /* 如果有aof或者rdb在后台进行,则等待对应的退出。注意,这里用了WNOHANG,所以不会阻塞在wait3 */
        int statloc;
        pid_t pid;

        /* wait3返回非0值,要么是子进程退出,要么是出错 */
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            /* 如果是出错,在log中记录这次错误
             * 如果是rdb任务退出,调用backgroundSaveDoneHandler进行收尾工作
             * 如果是aof任务退出,调用backgroundRewriteDoneHandler进行收尾工作
             */
            if (pid == -1) {
                redisLog(LOG_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            /* 如果当前有rdb/aof任务在处理,则将dict_can_resize设置为0(表示不允许进行resize),否则,设置为1 */
            updateDictResizePolicy();
        }
    } else {
        /* 当前没有rdb/aof任务在执行,这里来判断是否要开启新的rdb/aof任务 */
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 REDIS_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == REDIS_OK))
            {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveBackground(server.rdb_filename);
                break;
            }
         }

         /* Trigger an AOF rewrite if needed */
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }
    }


    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    /* 如果开启了aof_flush_postponed_start,则在每次serverCron流程里都将server.aof_buf写入磁盘文件。
     * PS, server.aof_buf是从上一次写aof文件到目前为止所执行过的命令集合,所以是append only file
     */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    /* 每一秒检查一次上一轮aof的写入是否发生了错误,如果有错误则尝试重新写一次 */
    run_with_period(1000) {
        if (server.aof_last_write_status == REDIS_ERR)
            flushAppendOnlyFile(0);
    }

    /* Close clients that need to be closed asynchronous */
    /* server.clients_to_close链表上的元素都是待关闭的连接 */
    freeClientsInAsyncFreeQueue();

    /* Clear the paused clients flag if needed. */
    /* clients被paused时,会相应地记录一个超时的时间,如果那个时间已经到来,则给client打上REDIS_UNBLOCKED标记(slave的client不处理),并加到server.unblocked_clients上 */
    clientsArePaused(); /* Don't check return value, just use the side effect. */

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    /* 每1秒执行一次replication */
    run_with_period(1000) replicationCron();

    /* Run the Redis Cluster cron. */
    /* 每100ms执行一次clusterCron */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    /* 每100ms执行一次sentine的定时器 */
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    /* Cleanup expired MIGRATE cached sockets. */
    /* 每1秒清理一次server.migrate_cached_sockets链表上的超时sockets */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* serverCron执行次数 */
    server.cronloops++;
    /* 返回下一次执行serverCron的间隔 */
    return 1000/server.hz;
}
原文地址:https://www.cnblogs.com/flypighhblog/p/7757996.html