memcached源代码阅读(2)main函数

Memcached源代码阅读笔记
采用单步跟踪的方式对源代码进行阅读

调试参数 start


/* Winsock has to be started (version 2.0 is requested here) before any
 * socket call is made; give up if that fails. */
if(WSAStartup(MAKEWORD(2,0), &wsaData) != 0) {
        fprintf(stderr, "Socket Initialization Error. Program  aborted\n");
        return;
    }

 /* init settings: load the built-in defaults before the command-line
  * options are parsed */
    settings_init();

初始化设置,这里主要是设置一些默认的启动参数

/*
 * Fills the global `settings` structure with the built-in defaults.
 * Takes no arguments and returns nothing; every field listed below is
 * overwritten unconditionally.
 */
static void settings_init(void) {
    /* --- listening endpoints --- */
    settings.port = 11211;
    settings.udpport = 0;
    settings.interf.s_addr = htonl(INADDR_ANY);
    settings.socketpath = NULL;       /* by default, not using a unix socket */
    settings.access = 0700;           /* presumably permission bits for the unix socket -- TODO confirm */
    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */

    /* --- memory and item storage --- */
    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
    settings.factor = 1.25;           /* passed to slabs_init() as the chunk size growth factor */
    settings.chunk_size = 48;         /* space for a modest key and value */
    settings.oldest_live = 0;

    /* --- behaviour and diagnostics --- */
    settings.verbose = 0;
    settings.managed = false;
    settings.prefix_delimiter = ':';
    settings.detail_enabled = 0;

    /* --- worker threads: four when built with USE_THREADS, otherwise one --- */
#ifdef USE_THREADS
    settings.num_threads = 4;
#else
    settings.num_threads = 1;
#endif
}

setbuf(stderr, NULL); // make stderr unbuffered so that error messages are shown immediately


获取选项,非主要流程,暂时略过

while ((c = getopt(argc, argv, "a:bp:s:U:m:Mc:khirvd:l:u:P:f:s:n:t:D:")) != -1) {

。。。

}
 /* create the listening socket and bind it */
    /* Only done when no unix socket path is configured; the second
     * argument (is_udp) is 0, so this is the TCP listener. */
    if (settings.socketpath == NULL) {
        l_socket = server_socket(settings.port, 0);
        if (l_socket == -1) {
            fprintf(stderr, "failed to listen\n");
            exit(EXIT_FAILURE);
        }
    }
static int server_socket(const int port, const bool is_udp) {
    int sfd;
    struct linger ling = {0, 0};
    struct sockaddr_in addr;
    int flags =1;

    if ((sfd = new_socket(is_udp)) == -1) {
        return -1;
    }

    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));//  SO_REUSEADDR BOOL 允许套接口和一个已在使用中的地址捆绑(参见bind())
    if (is_udp) {
        maximize_sndbuf(sfd);
    } else {
        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));//SO_KEEPALIVE BOOL 发送“保持活动”包。
        setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));//不要因为数据未发送就阻塞关闭操作。设置本选项相当于将SO_LINGER的l_onoff元素置为零。
        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));//禁止发送合并的Nagle算法。
    }

    /*
     * the memset call clears nonstandard fields in some impementations
     * that otherwise mess things up.
     */
    memset(&addr, 0, sizeof(addr));

    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr = settings.interf;
    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {//绑定端口
        perror("bind()");
        close(sfd);
        return -1;
    }
    if (!is_udp && listen(sfd, 1024) == -1) {//监听端口
        perror("listen()");
        close(sfd);
        return -1;
    }
    return sfd;
}
/*
 * Opens an AF_INET socket and switches it to non-blocking mode.
 *
 * is_udp - true selects SOCK_DGRAM, false selects SOCK_STREAM
 *
 * Returns the descriptor, or -1 (after reporting through perror) if the
 * socket cannot be created or O_NONBLOCK cannot be set.
 */
static int new_socket(const bool is_udp) {
    int fd;
    int cur_flags;

    fd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    /* switch to non-blocking: read the current flags, then add O_NONBLOCK */
    cur_flags = fcntl(fd, F_GETFL, 0);
    if (cur_flags >= 0 && fcntl(fd, F_SETFL, cur_flags | O_NONBLOCK) >= 0) {
        return fd;
    }

    perror("setting O_NONBLOCK");
    close(fd);
    return -1;
}
/*
 * Sets a socket's send buffer size to the maximum allowed by the system.
 *
 * Reads the current SO_SNDBUF value, then binary-searches between that
 * value and MAX_SENDBUF_SIZE for the largest size setsockopt() accepts.
 *
 * sfd - socket whose send buffer is enlarged
 *
 * Returns nothing. If the current size cannot be read the socket is left
 * untouched (the error is reported only when settings.verbose > 0).
 */
static void maximize_sndbuf(const int sfd) {
    socklen_t intsize = sizeof(int);
    int last_good = 0;
    int min, max, avg;
    /* Must be an int: getsockopt() is told the buffer is intsize
       (sizeof(int)) bytes long, so a narrower object would be overrun
       and the starting point of the search would be truncated. */
    int old_size;

    /* Start with the default size. */
    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
        if (settings.verbose > 0)
            perror("getsockopt(SO_SNDBUF)");
        return;
    }

    /* Binary-search for the real maximum. */
    min = old_size;
    max = MAX_SENDBUF_SIZE; /* search ceiling; the original annotation gives it as 256 MB */

    while (min <= max) {
        avg = ((unsigned int)(min + max)) / 2;
        /* Try the midpoint: on success remember it and search higher,
           on failure search lower. */
        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
            last_good = avg;
            min = avg + 1;
        } else {
            max = avg - 1;
        }
    }

    if (settings.verbose > 1)
        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
}

好继续,

  main_base = event_init();// call the libevent initialisation function; the result is kept in main_base
   /* initialize other stuff */
    item_init();// item initialisation
    stats_init();
    assoc_init();
    conn_init();

/* Freelist of conn objects available for reuse. */
static conn **freeconns;
/* Number of slots allocated in freeconns[]. */
static int freetotal;
/* Number of entries currently stored in freeconns[]. */
static int freecurr;


/*
 * Sets up an empty connection freelist with room for 200 entries.
 * An allocation failure is only reported through perror(); freeconns is
 * then left NULL.
 */
static void conn_init(void) {
    freecurr = 0;
    freetotal = 200;

    freeconns = (conn **)malloc(sizeof(conn *) * freetotal);
    if (freeconns == NULL) {
        perror("malloc()");
    }
}

    /* Hacky suffix buffers. */
    suffix_init();
    /* Build the slab class table from the configured memory limit and
     * chunk size growth factor. */
    slabs_init(settings.maxbytes, settings.factor);

/**
 * Determines the chunk sizes and initializes the slab class descriptors
 * accordingly.
 */


void slabs_init(const size_t limit, const double factor) {
    int i = POWER_SMALLEST - 1;
    unsigned int size = sizeof(item) + settings.chunk_size;

    /* Factor of 2.0 means use the default memcached behavior */
    if (factor == 2.0 && size < 128)
        size = 128;

    mem_limit = limit;
    memset(slabclass, 0, sizeof(slabclass));

    while (++i < POWER_LARGEST && size <= POWER_BLOCK / 2) {
        /* Make sure items are always n-byte aligned */
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);//字节对齐 ,32位系统是4字节对齐,64位系统是8字节对齐

        slabclass[i].size = size;
        slabclass[i].perslab = POWER_BLOCK / slabclass[i].size;
        size *= factor;
        if (settings.verbose > 1) {
            fprintf(stderr, "slab class %3d: chunk size %6u perslab %5u\n",
                    i, slabclass[i].size, slabclass[i].perslab);
        }
    }

    power_largest = i;
    slabclass[power_largest].size = POWER_BLOCK;
    slabclass[power_largest].perslab = 1;

    /* for the test suite:  faking of how much we've already malloc'd */
    {
        char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
        if (t_initial_malloc) {
            mem_malloced = (size_t)atol(t_initial_malloc);
        }

    }

}


用到的几个常数
/* Index of the first slab class filled in by slabs_init(). */
#define POWER_SMALLEST 1
/* Upper bound on the slab class index reached by slabs_init()'s loop. */
#define POWER_LARGEST  200
#define POWER_BLOCK 1048576 // 2 to the power of 20
/* Chunk sizes are rounded up to a multiple of the pointer size. */
#define CHUNK_ALIGN_BYTES (sizeof(void *))

函数生成如下序列(factor在1.25的情况)
-  slabclass 0x0046eb80 slabclass {size=0 perslab=0 slots=0x00000000 ...} slabclass_t [201]
+  [0] {size=0 perslab=0 slots=0x00000000 ...} slabclass_t
+  [1] {size=88 perslab=11915 slots=0x00000000 ...} slabclass_t
+  [2] {size=112 perslab=9362 slots=0x00000000 ...} slabclass_t
+  [3] {size=140 perslab=7489 slots=0x00000000 ...} slabclass_t
+  [4] {size=176 perslab=5957 slots=0x00000000 ...} slabclass_t
+  [5] {size=220 perslab=4766 slots=0x00000000 ...} slabclass_t
+  [6] {size=276 perslab=3799 slots=0x00000000 ...} slabclass_t
+  [7] {size=348 perslab=3013 slots=0x00000000 ...} slabclass_t
+  [8] {size=436 perslab=2404 slots=0x00000000 ...} slabclass_t
+  [9] {size=548 perslab=1913 slots=0x00000000 ...} slabclass_t
+  [10] {size=688 perslab=1524 slots=0x00000000 ...} slabclass_t
+  [11] {size=860 perslab=1219 slots=0x00000000 ...} slabclass_t
+  [12] {size=1076 perslab=974 slots=0x00000000 ...} slabclass_t
+  [13] {size=1348 perslab=777 slots=0x00000000 ...} slabclass_t
+  [14] {size=1688 perslab=621 slots=0x00000000 ...} slabclass_t
+  [15] {size=2112 perslab=496 slots=0x00000000 ...} slabclass_t
+  [16] {size=2640 perslab=397 slots=0x00000000 ...} slabclass_t
+  [17] {size=3300 perslab=317 slots=0x00000000 ...} slabclass_t
+  [18] {size=4128 perslab=254 slots=0x00000000 ...} slabclass_t
+  [19] {size=5160 perslab=203 slots=0x00000000 ...} slabclass_t
+  [20] {size=6452 perslab=162 slots=0x00000000 ...} slabclass_t
+  [21] {size=8068 perslab=129 slots=0x00000000 ...} slabclass_t
+  [22] {size=10088 perslab=103 slots=0x00000000 ...} slabclass_t
+  [23] {size=12612 perslab=83 slots=0x00000000 ...} slabclass_t
+  [24] {size=15768 perslab=66 slots=0x00000000 ...} slabclass_t
+  [25] {size=19712 perslab=53 slots=0x00000000 ...} slabclass_t
+  [26] {size=24640 perslab=42 slots=0x00000000 ...} slabclass_t
+  [27] {size=30800 perslab=34 slots=0x00000000 ...} slabclass_t
+  [28] {size=38500 perslab=27 slots=0x00000000 ...} slabclass_t
+  [29] {size=48128 perslab=21 slots=0x00000000 ...} slabclass_t
+  [30] {size=60160 perslab=17 slots=0x00000000 ...} slabclass_t
+  [31] {size=75200 perslab=13 slots=0x00000000 ...} slabclass_t
+  [32] {size=94000 perslab=11 slots=0x00000000 ...} slabclass_t
+  [33] {size=117500 perslab=8 slots=0x00000000 ...} slabclass_t
+  [34] {size=146876 perslab=7 slots=0x00000000 ...} slabclass_t
+  [35] {size=183596 perslab=5 slots=0x00000000 ...} slabclass_t
+  [36] {size=229496 perslab=4 slots=0x00000000 ...} slabclass_t
+  [37] {size=286872 perslab=3 slots=0x00000000 ...} slabclass_t
+  [38] {size=358592 perslab=2 slots=0x00000000 ...} slabclass_t
+  [39] {size=448240 perslab=2 slots=0x00000000 ...} slabclass_t
+  [40] {size=1048576 perslab=1 slots=0x00000000 ...} slabclass_t


 /* create the initial listening connection */
    /* Wrap the listening socket in a conn in state conn_listening and
     * register it on main_base for persistent read events. */
    if (!(listen_conn = conn_new(l_socket, conn_listening,
                                 EV_READ | EV_PERSIST, 1, false, main_base))) {
        fprintf(stderr, "failed to create listening connection");
        exit(EXIT_FAILURE);
    }


/*
 * Wraps socket sfd in a conn object and registers its event with libevent.
 *
 * sfd              - socket descriptor to wrap
 * init_state       - initial value of c->state
 * event_flags      - libevent flags handed to event_set()
 * read_buffer_size - size of the read buffer; only applied when a brand-new
 *                    conn is allocated, not when one is reused from the freelist
 * is_udp           - stored in c->udp
 * base             - event base the connection's event is attached to
 *
 * Returns the connection, or NULL if an allocation or event_add() fails.
 */
conn *conn_new(const int sfd, const int init_state, const int event_flags,
                const int read_buffer_size, const bool is_udp, struct event_base *base) {
    /* Reuse a pooled conn if one is available. */
    conn *c = conn_from_freelist();

    if (NULL == c) {
        /* Nothing pooled: allocate a fresh conn and its buffers. */
        if (!(c = (conn *)malloc(sizeof(conn)))) {
            perror("malloc()");
            return NULL;
        }
        /* Clear the pointers first so the failure path below can tell
           which allocations succeeded. */
        c->rbuf = c->wbuf = 0;
        c->ilist = 0;
        c->suffixlist = 0;
        c->iov = 0;
        c->msglist = 0;
        c->hdrbuf = 0;

        c->rsize = read_buffer_size;
        c->wsize = DATA_BUFFER_SIZE;
        c->isize = ITEM_LIST_INITIAL;
        c->suffixsize = SUFFIX_LIST_INITIAL;
        c->iovsize = IOV_LIST_INITIAL;
        c->msgsize = MSG_LIST_INITIAL;
        c->hdrsize = 0;

        c->rbuf = (char *)malloc((size_t)c->rsize);
        c->wbuf = (char *)malloc((size_t)c->wsize);
        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);

        /* If any allocation failed, release whatever was obtained and bail out. */
        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
                c->msglist == 0 || c->suffixlist == 0) {
            if (c->rbuf != 0) free(c->rbuf);
            if (c->wbuf != 0) free(c->wbuf);
            if (c->ilist !=0) free(c->ilist);
            if (c->suffixlist != 0) free(c->suffixlist);
            if (c->iov != 0) free(c->iov);
            if (c->msglist != 0) free(c->msglist);
            free(c);
            /* NOTE(review): perror() runs after the free() calls, so errno
               may no longer describe the malloc failure -- confirm. */
            perror("malloc()");
            return NULL;
        }

        STATS_LOCK();
        stats.conn_structs++;
        STATS_UNLOCK();
    }

    if (settings.verbose > 1) {
        if (init_state == conn_listening)
            fprintf(stderr, "<%d server listening\n", sfd);
        else if (is_udp)
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        else
            fprintf(stderr, "<%d new client connection\n", sfd);
    }

    /* Per-connection state, (re)initialised for new and reused conns alike. */
    c->sfd = sfd;
    c->udp = is_udp;
    c->state = init_state;
    c->rlbytes = 0;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;

    c->write_and_go = conn_read;
    c->write_and_free = 0;
    c->item = 0;
    c->bucket = -1;
    c->gen = 0;

    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);// libevent event_set: install event_handler as the callback, with c as its argument
    event_base_set(base, &c->event); // libevent: attach the event to the given event base
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {// libevent: add the event so the event loop starts watching it
        /* Registration failed: hand the conn back to the freelist, and
           free it if conn_add_to_freelist() returns non-zero (presumably
           meaning the freelist could not take it -- confirm). */
        if (conn_add_to_freelist(c)) {
            conn_free(c);
        }
        return NULL;
    }

    STATS_LOCK();
    stats.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    return c;
}
/* conn_from_freelist() simply forwards to do_conn_from_freelist(). */
# define conn_from_freelist()        do_conn_from_freelist()
/*
 * Takes the most recently stored conn off the freelist.
 * Returns that conn, or NULL when the freelist is empty.
 */
conn *do_conn_from_freelist() {
    if (freecurr <= 0) {
        return NULL;
    }

    freecurr--;
    return freeconns[freecurr];
}
conn定义如下
/*
 * Per-connection state: the socket, its libevent event, the read/write
 * buffers and the bookkeeping for the command in progress. Instances are
 * allocated and initialised by conn_new().
 */
typedef struct {
    int    sfd;     /* socket descriptor wrapped by this connection */
    int    state;   /* current state, one of enum conn_states */
    struct event event; /* libevent event registered for sfd */
    short  ev_flags;    /* event flags the event was set up with */
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcurr, do we have unparsed */

    char   *wbuf;   /* write buffer, wsize bytes */
    char   *wcurr;  /* position within wbuf; conn_new() starts it at wbuf */
    int    wsize;   /* total allocated size of wbuf */
    int    wbytes;  /* presumably the number of bytes still to write from wcurr -- TODO confirm */
    int    write_and_go; /** which state to go into after finishing current write */
    void   *write_and_free; /** free this memory after finishing writing */

    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes; /* presumably the number of value bytes still to read into ritem -- TODO confirm */

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */
    int    item_comm; /* which one is it: set/add/replace */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    item   **ilist;   /* list of items to write out */
    int    isize;     /* number of elements allocated in ilist[] */
    item   **icurr;   /* position within ilist; conn_new() starts it at ilist */
    int    ileft;     /* presumably the number of ilist entries still pending -- TODO confirm */

    char   **suffixlist; /* list of suffix buffers, suffixsize entries allocated */
    int    suffixsize;   /* number of elements allocated in suffixlist[] */
    char   **suffixcurr; /* position within suffixlist; conn_new() starts it at suffixlist */
    int    suffixleft;   /* presumably the number of suffixlist entries still pending -- TODO confirm */

    /* data for UDP clients */
    bool   udp;       /* is this a UDP "connection" */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr request_addr; /* Who sent the most recent request */
    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int    hdrsize;   /* number of headers' worth of space is allocated */

    int    binary;    /* are we in binary mode */
    int    bucket;    /* bucket number for the next command, if running as
                         a managed instance. -1 (_not_ 0) means invalid. */
    int    gen;       /* generation requested for the bucket */
} conn;

state字段取值如下枚举
/*
 * States a connection can be in. conn_new() stores its init_state argument
 * in conn.state and sets conn.write_and_go to conn_read.
 */
enum conn_states {
    conn_listening,  /** the socket which listens for connections */
    conn_read,       /** reading in a command line */
    conn_write,      /** writing out a simple response */
    conn_nread,      /** reading in a fixed number of bytes */
    conn_swallow,    /** swallowing unnecessary bytes w/o storing */
    conn_closing,    /** closing this connection */
    conn_mwrite,     /** writing out many items sequentially */
};

conn_new这个函数负责将原始套接字封装成为一个conn对象,同时会注册与该conn对象相关的IO事件,并指定该连接(conn)的初始状态。
listen_conn = conn_new(l_socket, conn_listening,
                                 EV_READ | EV_PERSIST, 1, false, main_base)
这个连接的初始状态为conn_listening,
监听的是EV_READ(可读)事件;EV_PERSIST表明这是一个持久事件,触发之后不会被自动移除,无需重新注册。

/* initialise clock event */
    /* Calling the handler directly once arms the timer; from then on the
     * handler re-adds itself on every run. */
    clock_handler(0, 0, 0);

/*
 * Timer callback that refreshes the cached current time.
 *
 * Each call re-arms clockevent on main_base with a one-second timeout and
 * then calls set_current_time(). The first call is made directly from
 * main() to arm the timer.
 *
 * fd, which, arg - required by the libevent callback signature; unused.
 */
static void clock_handler(const int fd, const short which, void *arg) {
    struct timeval t;
    static bool initialized = false;

    /* Assign by member name: a positional initializer would depend on
       tv_sec being the first member of struct timeval. */
    t.tv_sec = 1;
    t.tv_usec = 0;

    if (initialized) {
        /* only delete the event if it's actually there. */
        evtimer_del(&clockevent);
    } else {
        initialized = true;
    }

    evtimer_set(&clockevent, clock_handler, 0);
    event_base_set(main_base, &clockevent);
    evtimer_add(&clockevent, &t);

    set_current_time();
}
设置时钟事件。fd、which、arg这几个参数在函数体内都没有用上——它们之所以存在,是因为libevent要求回调函数的签名固定为 void (*)(int, short, void *)。
时钟事件每秒触发一次(超时时间为 tv_sec = 1, tv_usec = 0)。
delete_handler(0, 0, 0); /* sets up the event: this first direct call arms the timer, and the handler re-adds itself afterwards */
delete_handler每5秒触发一次,通过run_deferred_deletes()清理被延迟删除的item项。
/*
 * Timer callback that processes deferred deletes.
 *
 * Each call re-arms deleteevent on main_base with a five-second timeout
 * and then calls run_deferred_deletes(). The first call is made directly
 * from main() to arm the timer.
 *
 * fd, which, arg - required by the libevent callback signature; unused.
 */
static void delete_handler(const int fd, const short which, void *arg) {
    struct timeval t;
    static bool initialized = false;

    /* Assign by member name: a positional initializer would depend on
       tv_sec being the first member of struct timeval. */
    t.tv_sec = 5;
    t.tv_usec = 0;

    if (initialized) {
        /* some versions of libevent don't like deleting events that don't exist,
           so only delete once we know this event has been added. */
        evtimer_del(&deleteevent);
    } else {
        initialized = true;
    }

    evtimer_set(&deleteevent, delete_handler, 0);
    event_base_set(main_base, &deleteevent);
    evtimer_add(&deleteevent, &t);
    run_deferred_deletes();
}

/* Enter the libevent loop on main_base; the events registered above are
 * dispatched from here. */
event_base_loop(main_base, 0);
开始事件循环

main()函数分析完毕。

原文地址:https://www.cnblogs.com/yanzhenan/p/2278402.html