Redis5设计与源码分析 (第13章 列表相关命令的实现)

13.1 相关命令介绍

Redis列表对象的底层数据结构是quicklist;

13.1.1 命令列表

表13-1 列表命令

13.1.2 栈和队列命令列表

栈与队列是操作受限制的线性表,

栈只允许在线性表的同一侧执行插入或删除操作,具有先进后出的特性;

队列只允许在一侧插入另一侧删除,具有先进先出的特性。

如表13-2所示,通过这些列表命令可以很容易实现栈与队列。

表13-2 栈和队列相关命令

1.实现栈

如图13-1所示,列表节点为(1,3,4),我们对它进行了两次右侧入栈操作,列表节点变为了(1,3,4,10,9)。然后我们对该列表进行了一次出栈操作,列表节点最终为(1,3,4,10)。

图13-1 栈的操作

栈要求操作在同一侧进行,可以通过lpushlpop命令组合实现,也可以通过rpush和lpop命令组合实现。(左进左出或者右进右出)

如果需要在pop时等待数据产生,可以使用阻塞的pop,即通过lpush和blpop命令组合实现阻塞的栈,或者通过rpushbrpop命令组合实现阻塞的栈。

2.实现队列

如图13-2所示,列表的节点为(1,3,4)。在右侧进行了入队列操作,节点变为(1,3,4,10)。在左侧进行了出队列操作,列表节点变为了(3,4,10)。

图13-2 队列的操作

队列要求在一侧插入、另一侧删除,可以通过lpushrpop命令组合实现,或者通过rpushlpop命令组合实现。(左进右出或者右进左出)

如果需要在pop时等待数据产生,可以使用阻塞的pop,即通过lpush和brpop命令组合实现阻塞的队列,或者通过rpush和blpop命令组合实现阻塞的队列。

13.2 push/pop相关命令

13.2.1 push类命令的实现

lpush命令作用是在列表头部插入元素,并返回列表的总长度

格式: lpush key value [value...]

入口函数为lpushCommand,内部调用pushGenericCommand实现,参数LIST_HEAD表示列表的头部。

void pushGenericCommand(client *c, int where) {

int j, pushed = 0;

//根据key查找列表对象,如果没有查找到,则创建新的列表对象

robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

for (j = 2; j < c->argc; j++) {

if (!lobj) {

lobj = createQuicklistObject(); //创建新的列表对象, 底层为qucklist

quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,

server.list_compress_depth);

dbAdd(c->db,c->argv[1],lobj);

}

listTypePush(lobj,c->argv[j],where); //将值value插入列表

pushed++;

}

...

}

函数listTypePush首先会校验列表对象的编码,如果不是OBJ_ENCODING_quicklist则直接返回错误,即列表对象的底层数据结构只有quicklist。函数quicklistPush是数据结构quicklist提供的API,用于将value插入到quicklist指定位置,在第7章已经详细介绍了 。

void listTypePush(robj *subject, robj *value, int where) {

if (subject->encoding == OBJ_ENCODING_QUICKLIST) {

int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

value = getDecodedObject(value);

size_t len = sdslen(value->ptr);

quicklistPush(subject->ptr, value->ptr, len, pos);

decrRefCount(value);

} else {

serverPanic("Unknown list encoding");

}

}

lpushx、rpush、rpushx命令实现与LPUSH非常类似;

13.2.2 pop类命令的实现

rpop命令的作用是从列表尾部弹出元素,并返回给客户端。

格式:rpop key

示例:

127.0.0.1:6379> rpush mylist 1 2 3 4

(integer) 4

127.0.0.1:6379> rpop mylist

"4"

rpop命令的入口函数为rpopCommand,内部调用popGenericCommand实现,参数LIST_TAIL表示列表的尾部。

void popGenericCommand(client *c, int where) {

robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]);

if (o == NULL || checkType(c,o,OBJ_LIST)) return;

//函数listTypePop内部主要调用quicklistPopCustom实现quicklist元素的获取,这是数据结构quicklist提供的API

robj *value = listTypePop(o,where);

if (value == NULL) {

addReplyNull(c);

} else {

char *event = (where == LIST_HEAD) ? "lpop" : "rpop";

addReplyBulk(c,value);

decrRefCount(value);

if (listTypeLength(o) == 0) {

dbDelete(c->db,c->argv[1]);

}

}

}

命令lpop的实现与rpop非常类似,都是调用popGenericCommand函数完成的,只是第2个参数传递的是quicklist_HEAD;

13.2.3 阻塞push/pop类命令的实现

blpop是阻塞版本的POP:即当列表对象不存在,会阻塞客户端,直到列表对象不为空或者阻塞时间超过timeout为止,timeout为0表示阻塞时间无限长

格式:blpop key [key ...] timeout

示例:127.0.0.1:6379> blpop list1 list2 list3 0

blpop命令的入口函数为blpopCommand,内部调用blockingPopGenericCommand实现,参数LIST_HEAD表示列表的尾部。

 

函数blockingPopGenericCommand遍历所有列表key,当列表不为空时,则从列表中弹出元素并返回给客户端,注意这里同时会返回列表key与弹出的元素value。从代码实现中同样可以看出,只要这多个列表中至少有一个列表不为空,命令就会直接返回而不会阻塞客户端。阻塞客户端的逻辑由函数blockForKeys实现。另外需要注意的一点是,这里解析的timeout超时时间是一个绝对时间。

/* Blocking RPOP/LPOP */

void blockingPopGenericCommand(client *c, int where) {

//遍历所有key,只要有一个列表不为空,就从该列表中弹出元素并返回

for (j = 1; j < c->argc-1; j++) {

o = lookupKeyWrite(c->db,c->argv[j]);

if (o != NULL) {

if (o->type != OBJ_LIST) {

addReply(c,shared.wrongtypeerr);

return;

} else {

if (listTypeLength(o) != 0) {

//弹出元素并返回给客户端

char *event = (where == LIST_HEAD) ? "lpop" : "rpop";

robj *value = listTypePop(o,where);

serverAssert(value != NULL);

//注意返回的是列表的key和value

addReplyArrayLen(c,2);

addReplyBulk(c,c->argv[j]);

addReplyBulk(c,value);

decrRefCount(value);

notifyKeyspaceEvent(NOTIFY_LIST,event,

c->argv[j],c->db->id);

if (listTypeLength(o) == 0) {

dbDelete(c->db,c->argv[j]);

}

return; //命令结束

}

}

}

}

/* 所有列表都为空,阻塞客户端k */

blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);

}

 

redisDb结构体,其中有一个字段blocking_keys,类型为字典,所有因为某个键阻塞的客户端都会被添加到该字典中,其结构如图13-3所示

图13-3 阻塞键结构

函数blockForKeys主要实现逻辑如下:

void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {

//c->bpop类型为结构体blockingState,

c->bpop.timeout = timeout; //c->bpop.timeout记录客户端阻塞超时时间

c->bpop.target = target;

if (target != NULL) incrRefCount(target);

for (j = 0; j < numkeys; j++) {

bkinfo *bki = zmalloc(sizeof(*bki));

if (btype == BLOCKED_STREAM)

bki->stream_id = ids[j];

/* c->bpop.keys是一个字典,记录客户端因为哪些key而阻塞. */

if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {

zfree(bki);

continue;

}

incrRefCount(keys[j]);

//将客户端添加到数据库对象redisDb的blocking_keys字典

de = dictFind(c->db->blocking_keys,keys[j]);

if (de == NULL) {

int retval;

//没有则需要创建链表,并添加到字典

l = listCreate();

//同时可以看到客户端对象被添加到c->db->blocking_keys字典中,key为阻塞的键,值为客户端链表

retval = dictAdd(c->db->blocking_keys,keys[j],l);

incrRefCount(keys[j]);

serverAssertWithInfo(c,keys[j],retval == DICT_OK);

} else {

l = dictGetVal(de);

}

listAddNodeTail(l,c); //将客户端追加到链表尾部

bki->listnode = listLast(l);

}

blockClient(c,btype); // 阻塞客户端

}

函数blockClient主要设置客户端标志位为阻塞状态,记录阻塞类型,同时统计阻塞的客户端数目。

void blockClient(client *c, int btype) {

c->flags |= CLIENT_BLOCKED;

c->btype = btype;

server.blocked_clients++;

server.blocked_clients_by_type[btype]++;

addClientToTimeoutTable(c);

}

 

有两个触发条件会解除客户端的阻塞状态,下面我们详细介绍。

(1)进行push操作

进行push操作时,如果该列表对象不存在,会创建一个空的列表并添加到数据库字典,此时会检测是否有某个客户端因为该列表键阻塞。函数如下:

void dbAdd(redisDb *db, robj *key, robj *val) {

sds copy = sdsdup(key->ptr);

int retval = dictAdd(db->dict, copy, val);

serverAssertWithInfo(NULL,key,retval == DICT_OK);

if (val->type == OBJ_LIST ||

val->type == OBJ_ZSET ||

val->type == OBJ_STREAM)

signalKeyAsReady(db, key);

if (server.cluster_enabled) slotToKeyAdd(key->ptr);

}

 

函数signalKeyAsReady会将该键添加到server.ready_keys链表中,节点类型为ready-List

void signalKeyAsReady(redisDb *db, robj *key) {

readyList *rl;

/* 没有客户端因为该键而阻塞 */

if (dictFind(db->blocking_keys,key) == NULL) return;

/* 已经处理过了 */

if (dictFind(db->ready_keys,key) != NULL) return;

/* 把该键添加到ready_keys字典*/

rl = zmalloc(sizeof(*rl));

rl->key = key;

rl->db = db;

incrRefCount(key);

listAddNodeTail(server.ready_keys,rl);

incrRefCount(key);

serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);

}

redis的命令处理入口processCommand,在调用完命令后,如果检测到server.ready_keys链表不为空,则说明有客户端可以解除阻塞状态了:

int processCommand(client *c) {

...

call(c,CMD_CALL_FULL);

c->woff = server.master_repl_offset;

//处理阻塞键

if (listLength(server.ready_keys))

handleClientsBlockedOnKeys();

}

 

如下代码所示函数handleClientsBlockedOnKeys首先遍历server.ready_keys链表,获取到多个列表键,同时在db->blocking_keys字典中查找是否存在某个因为该列表键阻塞的客户端,如果有则解除客户端阻塞状态。函数unblockClient主要用于去除客户端阻塞状态并向客户端返回数据。

void handleClientsBlockedOnKeys(void) {

while(listLength(server.ready_keys) != 0) {

list *l;

 

l = server.ready_keys;

server.ready_keys = listCreate();

//遍历server.ready_keys链表

while(listLength(l) != 0) {

listNode *ln = listFirst(l);

readyList *rl = ln->value;

//删除db->ready_keys,signalKeyAsReady处有校验

dictDelete(rl->db->ready_keys,rl->key);

server.fixed_time_expire++;

updateCachedTime(0);

//查找列表对象 .

robj *o = lookupKeyWrite(rl->db,rl->key);

if (o != NULL) {

if (o->type == OBJ_LIST)

serveClientsBlockedOnListKey(o,rl);

else if (o->type == OBJ_ZSET)

serveClientsBlockedOnSortedSetKey(o,rl);

else if (o->type == OBJ_STREAM)

serveClientsBlockedOnStreamKey(o,rl);

serveClientsBlockedOnKeyByModule(rl);

}

server.fixed_time_expire--;

 

/* Free this item. */

decrRefCount(rl->key);

zfree(rl);

listDelNode(l,ln);

}

listRelease(l); /* We have the new list on place at this point. */

}

}

 

void serveClientsBlockedOnListKey(robj *o, readyList *rl) {

//获取到因为该列表键阻塞的所有客户端 */

dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);

if (de) {

list *clients = dictGetVal(de);

int numclients = listLength(clients);

 

while(numclients--) {

listNode *clientnode = listFirst(clients);

client *receiver = clientnode->value;

robj *value = listTypePop(o,where);

if (value) { //遍历客户端链表,解除其阻塞状态

if (dstkey) incrRefCount(dstkey);

unblockClient(receiver);

...

} else {

break;

}

}

}

if (listTypeLength(o) == 0) {

dbDelete(rl->db,rl->key);

notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);

}

}

 

(2)阻塞超时

阻塞超时同样会解除客户端阻塞状态,同时向客户端返回nil。服务器如何检测客户端通过时间事件处理函数serverCron检测客户端阻塞超时。入口函数为clientsCron,处理客户端所有定时任务,它会遍历所有客户端,并调用函数clientsCronHandleTimeout实现阻塞超时处理逻辑,注意第2个参数为当前时间:

void clientsCron(void) {

int numclients = listLength(server.clients);

int iterations = numclients/server.hz;

mstime_t now = mstime();

if (iterations < CLIENTS_CRON_MIN_ITERATIONS)

iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?

numclients : CLIENTS_CRON_MIN_ITERATIONS;

while(listLength(server.clients) && iterations--) {

client *c;

listNode *head;

listRotateTailToHead(server.clients);

head = listFirst(server.clients);

c = listNodeValue(head);

if (clientsCronHandleTimeout(c,now)) continue;

if (clientsCronResizeQueryBuffer(c)) continue;

if (clientsCronTrackExpansiveClients(c)) continue;

if (clientsCronTrackClientsMemUsage(c)) continue;

}

}

而函数clientsCronHandleTimeout实现就需要比较客户端的超时时间timeout与当前时间即可。

至于命令brpop、brpoplpush都和blpop大同小异;

13.3 获取列表数据

对于列表对象,可以获取指定索引的元素,获取列表的长度,查询指定索引范围内的所有元素。

13.3.1 获取单个元素

lindex命令的作用是获取索引为index的元素。

格式:lindex key

示例:127.0.0.1:6379> lindex mylist 1

命令的入口函数为lindexCommand。函数quicklistIndex是数据结构Quicklist提供的API,用于查找指定索引位置的元素,在第7章已经详细介绍,这里不再赘述。如果查找到元素则返回该元素值,否则返回"$-1 "。

void lindexCommand(client *c) {

robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);

if (o == NULL || checkType(c,o,OBJ_LIST)) return;

long index;

robj *value = NULL;

if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))

return;

if (o->encoding == OBJ_ENCODING_QUICKLIST) {

quicklistEntry entry;

if (quicklistIndex(o->ptr, index, &entry)) {

if (entry.value) {

value = createStringObject((char*)entry.value,entry.sz);

} else {

value = createStringObjectFromLongLong(entry.longval);

}

addReplyBulk(c,value);

decrRefCount(value);

} else {

addReplyNull(c);

}

} else {

serverPanic("Unknown list encoding");

}

}

13.3.2 获取多个元素

lrange命令的作用是获取指定索引范围内的所有元素:0表示第1个元素,1表示第2个元素,以此类推;-1表示最后一个元素,-2表示倒数第2个元素,以此类推。

格式:lrange key start end

示例:127.0.0.1:6379> lrange mylist 0 -1

入口函数为lrangeCommand

首先需要计算得到正确的索引起始和终止范围。这里生成了一个迭代器list-TypeIterator,并调用方法listTypeNext迭代指定数目的元素。

迭代器底层主要通过数据结构quicklist的迭代器quicklistIter实现。

void lrangeCommand(client *c) {

...

llen = listTypeLength(o);

/*处理范围 转换负索引 */

if (start < 0) start = llen+start;

if (end < 0) end = llen+end;

if (start < 0) start = 0;

if (end >= llen) end = llen-1; //超过长度则设置为全长

rangelen = (end-start)+1; //范围长度

/* Return the result in form of a multi-bulk reply */

addReplyArrayLen(c,rangelen);

if (o->encoding == OBJ_ENCODING_QUICKLIST) {

//初始化迭代器

listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);

//迭代指定数目元素并返回给客户端

while(rangelen--) {

listTypeEntry entry;

listTypeNext(iter, &entry);

quicklistEntry *qe = &entry.entry;

if (qe->value) {

addReplyBulkCBuffer(c,qe->value,qe->sz);

} else {

addReplyBulkLongLong(c,qe->longval);

}

}

listTypeReleaseIterator(iter);

} else {

serverPanic("List encoding is not QUICKLIST!");

}

}

13.3.3 获取列表长度

llen命令的作用是获取列表长度(元素数目)。

格式: llen key

示例:127.0.0.1:6379> llen mylist

入口函数为llenCommand。列表的长度即quicklist列表中元素的数目,通过函数quicklistCount很容易实现。

void llenCommand(client *c) {

robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

if (o == NULL || checkType(c,o,OBJ_LIST)) return;

//listTypeLength调用 quicklistCount(subject->ptr);

addReplyLongLong(c,listTypeLength(o));

}

13.4 操作列表

以向指定位置插入元素,也可以删除指定位置的元素等

13.4.1 设置元素

lset命令的作用是设置指定索引位置的元素值。

格式:lset key index value

示例: 127.0.0.1:6379> lrange mylist2 0 -1

 

入口函数为lsetCommand。

函数quicklistReplaceAtIndex是数据结构quick-list提供的API,用于修改指定索引位置的值;修改失败会返回错误提示"-ERR index out of range "。

void lsetCommand(client *c) {

robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);

if (o == NULL || checkType(c,o,OBJ_LIST)) return;

long index;

robj *value = c->argv[3]; //第三个参数是修改后的value

if (o->encoding == OBJ_ENCODING_QUICKLIST) {

quicklist *ql = o->ptr;

//指定位置设置元素,返回结果

int replaced = quicklistReplaceAtIndex(ql, index,

value->ptr, sdslen(value->ptr));

if (!replaced) { //修改失败返回错误提示:-ERR index out of range

addReply(c,shared.outofrangeerr);

} else {

addReply(c,shared.ok);

}

}

}

13.4.2 插入元素

linsert命令的作用是将值value插入到列表key,且位于值pivot之前或之后。

格式:linsert key before|after pivot value

示例: linsert mylist after 3 2 //在值3后面插入2

实现函数为linsertCommand

获取列表迭代器listTypeIterator后,会从头开始遍历所有元素,直到找到与值pivot相等的元素,并插入到值pivot的前面或者后面。 (迭代查找并更新)

void linsertCommand(client *c) {

... //确定指定位置前后

if (strcasecmp(c->argv[2]->ptr,"after") == 0) {

where = LIST_TAIL;

} else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {

where = LIST_HEAD;

} else {

addReply(c,shared.syntaxerr);

return;

}

/* //遍历quicklist,根据元素找到位置索引, Seek pivot from head to tail */

iter = listTypeInitIterator(subject,0,LIST_TAIL);

while (listTypeNext(iter,&entry)) {

if (listTypeEqual(&entry,c->argv[3])) {

listTypeInsert(&entry,c->argv[4],where); //插入元素

inserted = 1;

break;

}

}

listTypeReleaseIterator(iter);

}

13.4.3 删除元素

lrem命令的作用是移除列表中与参数value相等的元素,并返回被移除的元素数目。当count大于0时,从表头开始向表尾搜索,最大删除count个元素;当count小于0时,从表尾开始向表头搜索,最大删除"count绝对值"个元素;当count等于0时,删除所有与value相等的元素。

格式:lrem key count value

示例: 127.0.0.1:6379> lrem mylist2 2 3

入口函数为lremCommand。

toremove表示count的值,负数时起始索引为-1(即最后一个元素),遍历方向为向前遍历,正数时起始索引为0(即第1个元素),遍历方向为向后遍历。获取迭代器listTypeIterator之后,调用listTypeNext依次遍历所有元素,如果元素与值obj相等则删除元素,toremove等于0时删除所有与值obj相等的元素,否则最多删除"toremove"个元素。

void lremCommand(client *c) {

...

listTypeIterator *li;

if (toremove < 0) { //toremove即为count的值

toremove = -toremove; //负数转正

li = listTypeInitIterator(subject,-1,LIST_HEAD);

} else {

li = listTypeInitIterator(subject,0,LIST_TAIL);

}

listTypeEntry entry;

while (listTypeNext(li,&entry)) {

if (listTypeEqual(&entry,obj)) {

listTypeDelete(li, &entry);

server.dirty++;

removed++;

if (toremove && removed == toremove) break; //删除元素数目控制

}

}

listTypeReleaseIterator(li);

...

}

13.4.4 裁剪列表

ltrim命令的作用是对一个列表进行裁剪,即只保留区间start与stop内的元素,其他将被删除。

格式:ltrim key start stop

示例:127.0.0.1:6379> lpush mylist4 1 2 3 4

     127.0.0.1:6379> ltrim mylist4 1 2

入口函数是ltrimCommand。

它调用函数quicklistDelRange删除指定范围内的元素来实现操作,这是数据结构quicklist提供的API。

void ltrimCommand(client *c) {

robj *o;

long start, end, llen, ltrim, rtrim;

/* 转换负数 索引 convert negative indexes */

if (start < 0) start = llen+start;

if (end < 0) end = llen+end;

if (start < 0) start = 0;

if (start > end || start >= llen) {

/* 如果超过范围怎没有删除的数据*/

ltrim = llen;

rtrim = 0;

} else {

if (end >= llen) end = llen-1; //结束位置最多到列表末尾

ltrim = start; //左边索引位置

rtrim = llen-end-1; //右边索引位置

}

/* Remove list elements to perform the trim */

if (o->encoding == OBJ_ENCODING_QUICKLIST) {

quicklistDelRange(o->ptr,0,ltrim); //删除左边索引部分

quicklistDelRange(o->ptr,-rtrim,rtrim); //删除右边索引部分

} else {

serverPanic("Unknown list encoding");

}

....'

}

13.5 本章小结

Redis中列表的命令实现,列表底层的数据结构采用的是quicklist。

栈与队列的基本概念,如何通过push/pop实现栈与队列;

列表阻塞命令的实现,通过blpop命令讲解了客户端阻塞流程,以及解除客户端阻塞流程;

常见的列表操作和查询的命令。

  

原文地址:https://www.cnblogs.com/coloz/p/13812851.html