redis 订阅发布

  第一次接触订阅&&发布模型的时候是在openvswitch里面,其使用ovsdb-nosql数据库处理盒子产品的数据库控制平面;

目前看redis 的时候又看到了订阅&&发布,所以来看看源码以及其使用的数据结构!

Redis中是如何实现此中模式的:???

  • 1.在RedisClient 内部维护了一个pubsub_channels的Channel列表,记录了此客户端所订阅的频道
  • 2.在Server服务端,同样维护着一个类似的变量叫做,pubsub_channels,这是一个dict字典变量,每一个Channel对应着一批订阅了此频道的Client,也就是Channel-->list of Clients
  • 3.当一个Client publish一个message的时候,会先去服务端的pubsub_channels找相应的Channel,遍历里面的Client,然后发送通知,即完成了整个发布订阅模式。
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. 
 *
 * 设置客户端 c 订阅频道 channel 。
 *
 * 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。
 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);

        // 关联示意图
        // {
        //  频道名        订阅频道的客户端
        //  'channel-a' : [c1, c2, c3],
        //  'channel-b' : [c5, c2, c1],
        //  'channel-c' : [c10, c2, c1]
        // }
        /* Add the client to the channel -> list of clients hash table */
        // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
        // 如果 channel 不存在于字典,那么添加进去
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }

        // before:
        // 'channel' : [c1, c2]
        // after:
        // 'channel' : [c1, c2, c3]
        // 将客户端添加到链表的末尾
        listAddNodeTail(clients,c);
    }

    /* Notify the client */
    // 回复客户端。
    // 示例:
    // redis 127.0.0.1:6379> SUBSCRIBE xxx
    // Reading messages... (press Ctrl-C to quit)
    // 1) "subscribe"
    // 2) "xxx"
    // 3) (integer) 1
    addReply(c,shared.mbulkhdr[3]);
    // "subscribe
" 字符串
    addReply(c,shared.subscribebulk);
    // 被订阅的客户端
    addReplyBulk(c,channel);
    // 客户端订阅的频道和模式总数
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
    return retval;
}
/* -----------------------------------------------------------------------------
 * Higher level functions to queue data on the client output buffer.
 * The following functions are the ones that commands implementations will call.
 * -------------------------------------------------------------------------- */
//sendReplyToClient为实际的数据write的地方,并且会现在一次性最多发送多少,避免阻塞
void addReply(redisClient *c, robj *obj) {

    // 为客户端安装写处理器到事件循环
    if (prepareClientToWrite(c) != REDIS_OK) return;

    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it's not needed.
     *
     * 如果在使用子进程,那么尽可能地避免修改对象的 refcount 域。
     *
     * If the encoding is RAW and there is room in the static buffer
     * we'll be able to send the object to the client without
     * messing with its page. 
     *
     * 如果对象的编码为 RAW ,并且静态缓冲区中有空间
     * 那么就可以在不弄乱内存页的情况下,将对象发送给客户端。
     */
    if (sdsEncodedObject(obj)) {
        // 首先尝试复制内容到 c->buf 中,这样可以避免内存分配
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            // 如果 c->buf 中的空间不够,就复制到 c->reply 链表中
            // 可能会引起内存分配
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == REDIS_ENCODING_INT) {
        /* Optimization: if there is room in the static buffer for 32 bytes
         * (more than the max chars a 64 bit integer can take as string) we
         * avoid decoding the object and go for the lower level approach. */
        // 优化,如果 c->buf 中有等于或多于 32 个字节的空间
        // 那么将整数直接以字符串的形式复制到 c->buf 中
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
            char buf[32];
            int len;

            len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) == REDIS_OK)
                return;
            /* else... continue with the normal code path, but should never
             * happen actually since we verified there is room. */
        }
        // 执行到这里,代表对象是整数,并且长度大于 32 位
        // 将它转换为字符串
        obj = getDecodedObject(obj);
        // 保存到缓存中
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    } else {
        redisPanic("Wrong obj->encoding in addReply()");
    }
}
/* Add a Redis Object as a bulk reply 
 *
 * 返回一个 Redis 对象作为回复
 */
void addReplyBulk(redisClient *c, robj *obj) {
    addReplyBulkLen(c,obj);
    addReply(c,obj);
    addReply(c,shared.crlf);
}

如上就是单个channel的订阅方式了,总结如下:

    1. 客户端自行管理需要订阅的channel, 放到 c->pubsub_channels 中;
    2. redis使用的一个统一的 server->pubsub_channels dict容器进行管理所有的channel;
    3. 对于多个客户端订阅一个channel, redis 使用list进行管理追加;

http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子
原文地址:https://www.cnblogs.com/codestack/p/15469296.html