Redis5设计与源码分析 (第18章 数据流相关命令的实现)

18.1 相关命令介绍

Redis的Stream命令都以x开头。

1.xadd命令

作用: 将指定消息数据追加到指定的Stream队列中或裁减列中数据长度。

格式:xadd key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ...

说明: 每条消息由一或多个阈值对组成,消息插入队列中后会返回唯一的消息ID。

xadd是唯一可以向Stream队列添加数据的命令。

1) MAXLEN:当Stream中数据量过大时,通过此关键字裁剪长度,删除旧数据至指定的值;当 数据量小于等于指定值时,不进行剪切。其中裁剪模式有两种。

·~:模糊裁剪,优化精确裁剪,一般用此模式,效率更高。

·=:精确裁剪,数据存储的listpack结构体中,裁剪长度的所有阈值是依照数据从老到新的方式,依次把listpack释放掉,此模式下删除最后一个listpack中的数据比较费时,所以推荐用模糊裁剪。

2)ID:添加消息可指定具体值或用"*"代替,指定的值必须大于当前队列中最大的消息ID,为*时则默认生成一个最新的ID,ID值取的是当前时间+序列号。

示例:

①添加一条数据,不指定ID值。 xadd mytopic * name tom age 20

②添加一条数据,指定ID值: xadd mytopic 1547127055889-0 name jim age 21

③修改长度,如果发现添加新元素后的超过100万条消息,则删除旧消息使长度大约缩减至100万个元素。

 

2.xrange命令

用于读取给定ID范围内的消息数据,并可以设置返回数据的条数。

格式:xrange key start end [COUNT count]

说明: 将返回两个ID之间(闭区间)的所有消息,消息排序为ID递增排序。

·start:开始消息ID,指定具体值或通过"-"特殊符号来表示最小ID。

·end:结束消息ID,指定具体值或通过"+"特殊符号来表示最大ID。

·COUNT:设定返回的消息数量。

示例 : xrange mytopic - + COUNT 2

3.xrevrange命令

说明: 与xrange用法唯一区别是返回数据的顺序为消息ID的递减序,正好与xrange返回的数据顺序相反。

4.xdel命令

用于删除Stream队列中指定的一或多个消息ID对应的数据。

格式:

xdel key ID [ID ...]

说明

·key:类型必须为OBJ_STREAM,否则报错。

·ID:为指定的一或多个消息ID。注意:ID不可为特殊符号"-"和"+",不支持范围删除

示例: xdel mytopic 1547127055879-0

 

 

5.xgroup命令

用于队列的消费组管理,包含对消费组的创建、删除、修改等操作。

格式:xgroup [CREATE key groupname id-or-$]

         [SETID key id-or-$]

[DESTROY key groupname]

[DELCONSUMER key groupname consumername]

[HELP]

说明:S tream队列可以被多个消费组订阅,每个消费组都会记录最近一次消费的消息last_id,一个消费组可以拥有多个消费者去消费,组内消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_id往前移动,每个消费者有一个组内唯一名称。而xgroup命令就是用于消费组管理。

·CREATE:创建一个新消费组。该选项末尾设置了MKSTREAM参数,当创建消费组的键值对不存在时,则会创建一个新的消费组。

·SETID:修改某个消费组消费的消息last_id。

·DESTROY:删除指定消费组。

·DELCONSUMER:删除指定消费组中某个消费者。

·HELP:查看使用帮助。

示例

1)创建一个消费组:xgroup CREATE mytopic cg1 1547127055879-0

//创建一个消费组cg1,从消息id为1547127055879-0的消息开始消费

最后一个参数是指定该消费组开始消费的消息ID,其中"0"或"0-

0",表示从头开始消费,如果使用特殊符"$",则表示队列中最后一

项ID,只读取消息队列中新到的消息。

2)修改消费组的last_id:

xgroup SETID mytopic cg1 1547127055888-0

//修改消费组cg1,从消息id为1547127055888-0的消息开始消费

6.xreadgroup命令

用于从消费组中可靠地消费n条消息,如果指定的消费者不存在,则创建之。

格式

xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

说明

·group:消费组名称。

·consumer:消费者名称。

·COUNT:消费多少条数据。

·BLOCK:是否为阻塞模式,milliseconds为阻塞多少毫秒。

·STREAMS:Stream队列名称,可指定多个。若指定多个,则ID也要对应指定相同个数。

·ID:读取只大于指定消息ID后未确认的消息;特殊符号">",读取未传递给其他任何消费者的消息,也就是新消息。

·NOACK:该消息不需要确认。

示例:

从Stream队列的消费组cg1中新建一个消费者c1,并消费一条数据。

XREADGROUP GROUP cg1 c1 COUNT 1 STREAMS mytopic >

7.xread命令

用于从Stream队列中读取N条消息,一般用作遍历队列中的消息。

格式

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

说明: 此命令读取消息后无须通过XACK确认,也不需要强制指定消费组名称与消费者名称。

·COUNT:读取多少条数据;

·BLOCK:是否为阻塞模式,milliseconds为阻塞多少毫秒;

·STREAMS:Stream队列名称;

·ID:指定从消息ID开始读取,即消息ID大于指定的ID的消息,可为"$"特殊符号,代表从最后一条开始读取。

示例: xread COUNT 10 STREAMS mytopic 0

8.xack命令

用于确认一或多个指定ID的消息,使其从待确认列表中删除

格式:xack key group ID [ID ...]

说明: 为了确保每个消息能被消费者消费到,通过xreadgroup消费的消息会存储在该消费组的未确认列表中,直到客户端确认该消息,才会从未确认列表中删除。

·group:消费组名称;

·ID:确认的消息ID。

示例:xack mytopic cg1 1547127055889-0

9.xpending命令

读取某消费组或者某个消费者的未确认消息,返回未确认的消息ID、空闲时间、被读取次数。

格式:xpending key group [start end count] [consumer]

说明

·group:指定的消费组;

·start:范围开始ID,可以为特殊符"-"表示开始或指定ID;

·end:范围结束ID,可以为特殊符"+"标识结尾或指定ID;

·count:读取条数;

·consumer:指定的消费者。

示例:

①读取消费组cg1中消费者c1的所有待确认消息

xpending mytopic cg1 - + 2 c1

1) 1) "1547127055889-0" //消息ID

2) "c1" //消费者名称

3) (integer) 653752 //间隔多久未确认

4) (integer) 11 //已被读取次数

②读取消费组cg1的所有待确认消息。

127.0.0.1:6379> xpending mytopic CG1 - + 10

10.xclaim命令

用于改变一或多个未确认消息的所有权,新的所有者是在命令参数中指定。

格式:

xclaim key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]

说明:

·consumer:指定新的消费者

·min-idle-time:指定消息最小空闲数;

·ID:指定消息ID;

·IDLE:将该消息空闲时间设置为指定毫秒数,默认IDLE值为0;

·TIME:将该消息空闲时间设置为指定UNIX时间;

·RETRYCOUNT:被读取次数重置为指定次数,默认不去修改,避免丢失真实被读取次数;

·force:在待处理条目列表(PEL)中创建待处理消息条目,即使某些指定的ID尚未在分配给不同客户端的待处理条目列表(PEL)中;

·justid:只返回成功认领的消息ID数组。

示例: 认领ID为"1547294557195-0"的消息,仅当消息闲置至少1小时时,将所有权分配给消费者c2,并将该消息的空闲时间置为0,被交付读取次数也改为0。

xclaim mytopic1 cg1 c2 3600000 1547294557195-0 IDLE 0 RETRY-COUNT 0

11.xinfo命令

xinfo命令用于读取消息队列、消费组、消费者等的信息。

格式

xinfo [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

说明

·CONSUMERS:用于查看某个消费组下的消费者信息;

·GROUPS:用于查看某个Stream队列下的消费组信息;

·STREAM:用于查看某个Stream队列的整体组信息。

示例

1)查看消费组c1中消费者消费信息:

xinfo CONSUMERS mytopic cg1

2)查看Stream队列信息:

xinfo STREAM mytopic

3)查看Stream队列中消费组信息:

xinfo GROUPS mytopic

12.xtrim命令

作用是缩减消息队列。

格式:xtrim key MAXLEN [~] count

说明: 当Stream中数据量过大时,可通过此命令字来缩减Stream队列长度,删除Stream中旧数据直到长度减少至指定的值;当数据量小于等于指定值时,不做剪切,此命令与xadd中通过MAXLEN字段实现裁剪的逻辑是一致的。其中count为指定的长度。

其中裁剪模式有两种:

·~:模糊裁剪,优化精确裁剪一般用此模式,效率更高。

·=:精确裁剪。

示例: xadd mytopic * name tom age 20

13.xlen命令

xlen命令用于获取Stream队列的数据长度。

格式:xlen key ID [ID ...]

示例: xlen mytopic

18.2 基本操作命令原理分析

添消息、删除消息、范围查找、遍历消息、获取队列信息、长度统计、裁剪消息;

18.2.1 添加消息

xaddCommand函数。

void xaddCommand(client *c) {

streamID id;

int id_given = 0; /* Was an ID different than "*" specified? */

long long maxlen = -1; /* If left to -1 no trimming is performed. */

int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so

the maxium length is not applied verbatim. */

int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */

 

/* 1)解析参数,主要对MAXLEN/ID/[field value]对的解析,参数是否合法,并赋值给不同的变量 */

int i = 2;

for (; i < c->argc; i++) {

int moreargs = (c->argc-1) - i; /*添加的[field value]对参数长度 */

char *opt = c->argv[i]->ptr;

if (opt[0] == '*' && opt[1] == '') { //遇到符号"*" 则跳出循环

break;

} else if (!strcasecmp(opt,"maxlen") && moreargs) {

// 解析maxlen参数

if (moreargs >= 2 && next[0] == '~' && next[1] == '') {

approx_maxlen = 1; /*maxlen后的"~"approx_maxlen = 1标识出来*/

i++;

//之后读取maxlen的值,并转换成long long类型

if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)

!= C_OK) return;

 

} else {

/* 指定ID时读取id值,并用id_given = 1; 标识*/ */

if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;

id_given = 1;

break;

}

}

int field_pos = i+1;

 

/* 校验[field value]对的数据是否合法 */

if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {

addReplyError(c,"wrong number of arguments for XADD");

return;

}

 

if (id_given && id.ms == 0 && id.seq == 0) {

addReplyError(c,"The ID specified in XADD must be greater than 0-0");

return;

}

 

2)校验key对应的值是否为Stream类型:如果存在且类型为Stream,则获取对应的值。如果值存在,但不为Stream类型,则报错。如果不存对应键值对,则调用createStream-Object函数初始化一个空的Stream类型对象,写入db的字典中

robj *o;

stream *s;

if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;

s = o->ptr;

 

/* Return ASAP if the stream has reached the last possible ID */

if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {

addReplyError(c,"The stream has exhausted the last possible ID, "

"unable to add more items");

return;

}

 

3)调用streamAppendItem,往Stream中写入消息ID及内容数据。其中消息ID会当作key存储在Rax树中,每个ID并非独占一个节点,插入时会找到Rax树的最大节点,判断该节点中存储数据的data字段是否达极限

if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,

&id, id_given ? &id : NULL)

== C_ERR)

{

addReplyError(c,"The ID specified in XADD is equal or smaller than the "

"target stream top item");

return;

}

 

4)消息添加完后,则把新插入的消息ID返回给客户端

addReplyStreamID(c,&id);

 

signalModifiedKey(c,c->db,c->argv[1]);

 

5)如果传入了maxlen参数,则会调用streamTrimByLength函数剪切队列中数据,实现见18.2.6

notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);

server.dirty++;

 

if (maxlen >= 0) {

/* Notify xtrim event if needed. */

if (streamTrimByLength(s,maxlen,approx_maxlen)) {

notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);

}

if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);

}

 

/* Let's rewrite the ID argument with the one actually generated for

* AOF/replication propagation. */

robj *idarg = createObjectFromStreamID(&id);

rewriteClientCommandArgument(c,i,idarg);

decrRefCount(idarg);

 

/* We need to signal to blocked clients that there is new data on this

* stream. */

if (server.blocked_clients_by_type[BLOCKED_STREAM])

signalKeyAsReady(c->db, c->argv[1]);

}

 

//上面几个步骤的内部函数

robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {

// 检查key ,查找db中是否已经存在该键值对

robj *o = lookupKeyWrite(c->db,key);

if (o == NULL) { // 不存在则初始化一个空的stream类型的对象

o = createStreamObject();

dbAdd(c->db,key,o); // 写入db

} else {

if (o->type != OBJ_STREAM) { //db中已存在,类型不对则报错

addReply(c,shared.wrongtypeerr);

return NULL;

}

}

return o; //返回key对应Stream类型的值

}

 

 

int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {

...

if (server.stream_node_max_bytes &&

lp_bytes >= server.stream_node_max_bytes)

{

lp = NULL;

} else if (server.stream_node_max_entries) {

int64_t count = lpGetInteger(lpFirst(lp));

if (count >= server.stream_node_max_entries) lp = NULL;

}

}

其默认配置:

stream-node-max-bytes 4096

stream-node-max-entries 100

 

如节点中data为空或达到存储上限,则重新创建一个新节点,把对应的消息ID及内容(field-value对)插入。

data字段存储数据用的是listpack表,会把不同的消息分成不同的entryentry中存储偏移量+消息内容(field value对)。

entry节点消息存储也分为两种:

1 是消息内容的fieldvalue两个值都存储,

2 是只存储field-value对中的value值。

区分用哪种方式存储的办法是和该队列中第1条消息做对比,如果结构一致,则采用第2种方式,更省内存,结构不一致则用第一种方式存储消息内容。所以在创建消息队列首次添加的数据时,一定要采用更通用的结构,避免浪费内存

 

18.2.2 删除消息

void xdelCommand(client *c) {

robj *o;

//1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错

if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL

|| checkType(c,o,OBJ_STREAM)) return;

stream *s = o->ptr;

 

/* 2)参数检验,循环校验参数,判断ID值格式是否正确,一个格式不正确则报错*/*/

streamID id;

for (int j = 2; j < c->argc; j++) { //检验id格式是否正确

if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;

}

 

/* 3)删除消息,根据消息ID先从Rax树中查找到其内容所属的节点,从节点遍历找到该消息对应的地址,把头部的flag标识为删除态,直到该listpack删除到最后一个消息时才真正释放整块内存,并从Rax树中摘除该节点*/

int deleted = 0;

for (int j = 2; j < c->argc; j++) {

streamParseStrictIDOrReply(c,c->argv[j],&id,0); /* Retval already checked. */

deleted += streamDeleteItem(s,&id); //内部调用 streamIteratorRemoveEntry

}

 

/* 4)返回消息,调用addReplyLongLong函数,把删除的消息数量写入输出缓冲 */

...

addReplyLongLong(c,deleted);

}

18.2.3 范围查找

xrange与xrevrange两个命令的分别调用xrangeCommand与xrevrangeCommand函数,底层统一再调用的是xrangeGenericCommand函数。会通过参数rev区分,对应的值分别为0和1;

void xrangeGenericCommand(client *c, int rev) {

robj *o;

stream *s;

streamID startid, endid;

long long count = -1;

robj *startarg = rev ? c->argv[3] : c->argv[2];

robj *endarg = rev ? c->argv[2] : c->argv[3];

1)解析参数与参数校验

//判断startarg格式是否正确

if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;

// 判断endarg格式是否正确

if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;

 

/* 传入了COUNT,则校验之后的参数是否正确. */

if (c->argc > 4) {

for (int j = 4; j < c->argc; j++) {

int additional = c->argc-j-1;

if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {

//读取count的值,并转换为longlong类型

if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL)

!= C_OK) return;

if (count < 0) count = 0;

j++; /* Consume additional arg. */

} else {

addReply(c,shared.syntaxerr);

return;

}

}

}

 

/* 2)校验key对应的值是否为Stream类型:如果存在且类型为Stream,则获取对应的值。

如果值存在但不为stream类型,则报错。 */

if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL ||

checkType(c,o,OBJ_STREAM)) return;

 

s = o->ptr;

 

if (count == 0) {

addReplyNullArray(c);

} else {

if (count == -1) count = 0;

// 3)调用streamReplyWithRange函数进行范围匹配查找

streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);

}

}

 

正序范围查找主要分如下两步。

反序范围查找:和正序查找类似,根据end指定的消息ID找到位置后,遍历顺序相反即可。

size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {

...

①根据start参数中指定的ID,从Rax树中查找到最后一个比该ID小的节点并往后遍历,直到找到该ID为止

if (!(flags & STREAM_RWR_RAWENTRIES))

arraylen_ptr = addReplyDeferredLen(c);

streamIteratorStart(&si,s,start,end,rev);

while(streamIteratorGetID(&si,&id,&numfields)) {

②从这个位置往后遍历,每读取一条消息则往输出缓冲区写入一条,当本节点遍历完则继续遍历Rax树的下一个节点,直到所有节点遍历完或者遇到一个比end参数所指定的消息ID大的值则结束。

/* Update the group last_id if needed. */

if (group && streamCompareID(&id,&group->last_id) > 0) {

group->last_id = id;

if (noack) propagate_last_id = 1;

}

 

/* Emit a two elements array for each item. The first is

* the ID, the second is an array of field-value pairs. */

addReplyArrayLen(c,2);

addReplyStreamID(c,&id);

addReplyArrayLen(c,numfields*2);

 

/* Emit the field-value pairs. */

while(numfields--) {

unsigned char *key, *value;

int64_t key_len, value_len;

streamIteratorGetField(&si,&key,&value,&key_len,&value_len);

addReplyBulkCBuffer(c,key,key_len);

addReplyBulkCBuffer(c,value,value_len);

}

 

if (group && !noack) {

unsigned char buf[sizeof(streamID)];

streamEncodeID(buf,&id);

 

streamNACK *nack = streamCreateNACK(consumer);

int group_inserted =

raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);

int consumer_inserted =

raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);

 

if (group_inserted == 0) {

streamFreeNACK(nack);

nack = raxFind(group->pel,buf,sizeof(buf));

serverAssert(nack != raxNotFound);

raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);

/* Update the consumer and NACK metadata. */

nack->consumer = consumer;

nack->delivery_time = mstime();

nack->delivery_count = 1;

/* Add the entry in the new consumer local PEL. */

raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);

} else if (group_inserted == 1 && consumer_inserted == 0) {

serverPanic("NACK half-created. Should not be possible.");

}

 

/* Propagate as XCLAIM. */

if (spi) {

robj *idarg = createObjectFromStreamID(&id);

streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);

decrRefCount(idarg);

}

}

 

arraylen++;

if (count && count == arraylen) break; //遍历结束

}

 

if (spi && propagate_last_id)

streamPropagateGroupID(c,spi->keyname,group,spi->groupname);

 

streamIteratorStop(&si);

if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);

return arraylen;

}

18.2.4 获取队列信息

读取队列、消费组、消费者信息的xinfo命令,对应的函数是xinfoCommand函数,xinfoCommand函数查询基本信息主要分为如下几步;

void xinfoCommand(client *c) {

...

opt = c->argv[1]->ptr;

key = c->argv[2];

 

/* 1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错. */

robj *o = lookupKeyWriteOrReply(c,key,shared.nokeyerr);

if (o == NULL || checkType(c,o,OBJ_STREAM)) return;

s = o->ptr;

 

/*2)根据传入的第2个参数做判断,区分去获取哪些信息,

如果为"CONSUMERS",则用于查看某个消费组下的消费者信息 */

if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {

/* XINFO CONSUMERS <key> <group>. */

streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);

if (cg == NULL) {

addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "

"for key name '%s'",

(char*)c->argv[3]->ptr, (char*)key->ptr);

return;

}

 

addReplyArrayLen(c,raxSize(cg->consumers));

raxIterator ri;

raxStart(&ri,cg->consumers);

raxSeek(&ri,"^",NULL,0);

mstime_t now = mstime();

while(raxNext(&ri)) {

streamConsumer *consumer = ri.data;

mstime_t idle = now - consumer->seen_time;

if (idle < 0) idle = 0;

 

addReplyMapLen(c,3);

addReplyBulkCString(c,"name");

addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));

addReplyBulkCString(c,"pending");

addReplyLongLong(c,raxSize(consumer->pel));

addReplyBulkCString(c,"idle");

addReplyLongLong(c,idle);

}

raxStop(&ri);

//·GROUPS:用于查看某个Stream队列下的消费组信息;

} else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {

if (s->cgroups == NULL) {

addReplyArrayLen(c,0);

return;

}

 

addReplyArrayLen(c,raxSize(s->cgroups));

raxIterator ri;

raxStart(&ri,s->cgroups);

raxSeek(&ri,"^",NULL,0);

while(raxNext(&ri)) {

streamCG *cg = ri.data;

addReplyMapLen(c,4);

addReplyBulkCString(c,"name");

addReplyBulkCBuffer(c,ri.key,ri.key_len);

addReplyBulkCString(c,"consumers");

addReplyLongLong(c,raxSize(cg->consumers));

addReplyBulkCString(c,"pending");

addReplyLongLong(c,raxSize(cg->pel));

addReplyBulkCString(c,"last-delivered-id");

addReplyStreamID(c,&cg->last_id);

}

raxStop(&ri);

// STREAM:用于查看某个Stream队列的整体组信息

} else if (!strcasecmp(opt,"STREAM")) {

/* XINFO STREAM <key> [FULL [COUNT <count>]]. */

xinfoReplyWithStreamInfo(c,s);

} else {

addReplySubcommandSyntaxError(c);

}

}

18.2.5 长度统计

读取队列长度的xlen命令,底层调用xlenCommand函数;

void xlenCommand(client *c) {

robj *o;

// 1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错

if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL

|| checkType(c,o,OBJ_STREAM)) return;

stream *s = o->ptr;

//2)返回值:根据读取的Stream,直接调用addReplyLongLong输出其长度

addReplyLongLong(c,s->length);

}

18.2.6 剪切消息

底层调用对应的函数是xtrimCommand函数;

void xtrimCommand(client *c) {

robj *o;

/*1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错. */

if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL

|| checkType(c,o,OBJ_STREAM)) return;

stream *s = o->ptr;

 

/* 2)解析与校验参数。maxlen必传,如果模糊缩减则把参数approx_maxlen标识为1*/

int trim_strategy = TRIM_STRATEGY_NONE;

long long maxlen = -1;

int approx_maxlen = 0;

int maxlen_arg_idx = 0;

/* Parse options. */

int i = 2; /* Start of options. */

for (; i < c->argc; i++) {

int moreargs = (c->argc-1) - i; /* Number of additional arguments. */

char *opt = c->argv[i]->ptr;

if (!strcasecmp(opt,"maxlen") && moreargs) {

approx_maxlen = 0;

trim_strategy = TRIM_STRATEGY_MAXLEN;

char *next = c->argv[i+1]->ptr;

/* Check for the form MAXLEN ~ <count>. */

if (moreargs >= 2 && next[0] == '~' && next[1] == '') {

approx_maxlen = 1;

i++;

} else if (moreargs >= 2 && next[0] == '=' && next[1] == '') {

i++;

}

if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)

!= C_OK) return;

 

if (maxlen < 0) {

addReplyError(c,"The MAXLEN argument must be >= 0.");

return;

}

i++;

maxlen_arg_idx = i;

} else {

addReply(c,shared.syntaxerr);

return;

}

}

 

/* 3)调用streamTrimByLength函数缩减队列*/

int64_t deleted = 0;

if (trim_strategy == TRIM_STRATEGY_MAXLEN) {

deleted = streamTrimByLength(s,maxlen,approx_maxlen);

} else {

addReplyError(c,"XTRIM called without an option to trim the stream");

return;

}

 

...

addReplyLongLong(c,deleted);

}

 

int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {

if (s->length <= maxlen) return 0; //只能缩减长度

 

raxIterator ri;

raxStart(&ri,s->rax); //初始化rax迭代器

raxSeek(&ri,"^",NULL,0); //找到rax首个节点

 

int64_t deleted = 0;

while(s->length > maxlen && raxNext(&ri)) {

unsigned char *lp = ri.data, *p = lpFirst(lp);

int64_t entries = lpGetInteger(p);

//从首个节点开始,循环删除Rax树中每个节点,直到删除数量足够,会把队列中老数据按节点逐个删除

/*如果可以删除整个节点*/

if (s->length - entries >= maxlen) {

lpFree(lp);

raxRemove(s->rax,ri.key,ri.key_len,NULL);

raxSeek(&ri,">=",ri.key,ri.key_len);

s->length -= entries;

deleted += entries;

continue;

}

//当执行的是模糊删除时,默认会多保留一些数据,也就是说,当传入的第3个参数为符号"~"时,会把最后一个需要删除的消息ID所在的节点数据保留,而精确删除则会把最后一个需要删除的消息ID之前的数据都删掉

 

if (approx) break;

 

/*我们必须将listpack中的单个条目标记为已删除。我们首先更新条目/删除的计数器*/

int64_t to_delete = s->length - maxlen;

serverAssert(to_delete < entries);

lp = lpReplaceInteger(lp,&p,entries-to_delete);

p = lpNext(lp,p); /* Seek deleted field. */

int64_t marked_deleted = lpGetInteger(p);

lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);

p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */

 

/* Skip all the master fields. */

int64_t master_fields_count = lpGetInteger(p);

p = lpNext(lp,p); /* Seek the first field. */

for (int64_t j = 0; j < master_fields_count; j++)

p = lpNext(lp,p); /* Skip all master fields. */

p = lpNext(lp,p); /* Skip the zero master entry terminator. */

 

/* 'p' is now pointing to the first entry inside the listpack.

* We have to run entry after entry, marking entries as deleted

* if they are already not deleted. */

while(p) {

int flags = lpGetInteger(p);

int to_skip;

 

/* 标记entry删除 */

if (!(flags & STREAM_ITEM_FLAG_DELETED)) {

flags |= STREAM_ITEM_FLAG_DELETED;

lp = lpReplaceInteger(lp,&p,flags);

deleted++;

s->length--;

if (s->length <= maxlen) break; /* Enough entries deleted. */

}

p = lpNext(lp,p); /* Skip ID ms delta. */

p = lpNext(lp,p); /* Skip ID seq delta. */

p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */

if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {

to_skip = master_fields_count;

} else {

to_skip = lpGetInteger(p);

to_skip = 1+(to_skip*2);

}

 

while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */

p = lpNext(lp,p); /* Skip the final lp-count field. */

}

 

/* 垃圾回收*/

entries -= to_delete;

marked_deleted += to_delete;

if (entries + marked_deleted > 10 && marked_deleted > entries/2) {

/* TODO: perform a garbage collection. */

}

/* Update the listpack with the new pointer. */

raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);

break; /* 到这里表示删除数量足够. */

}

raxStop(&ri);

return deleted;

}

18.3 分组命令原理分析

三个数据结构

streamCG 用于存储消费组相关信息、

streamConsumer 用于存储消费者相关信息、

streamNACK 用于存储未确认消息;

18.3.1 分组管理

消费组管理的xgroup命令,调用xgroupCommand函数;

void xgroupCommand(client *c) {

stream *s = NULL;

sds grpname = NULL;

streamCG *cg = NULL;

char *opt = c->argv[1]->ptr; /* Subcommand name. */

int mkstream = 0;

robj *o;

 

/* 1)校验,xgroup命令第2个参数共有5个选项,当选项不为HELP时,则需根据key读取对应的value值,并判断其类型是否为OBJ_STREAM,如果不是则报错。

当选项为CREATE且带了MKSTREAM参数时,则不判断value值是否存在与是否为Stream类型。

当选项为SETID DELCONSUMER时,则需要判断指定的组是否存在,不存在组信息则报错 */

if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {

if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {

addReplySubcommandSyntaxError(c);

return;

}

mkstream = 1;

grpname = c->argv[3]->ptr;

}

 

/* 当选项不为HELP时,根据key读取对应的value值,并判断是否为OBJ_STREAM,不是则报错。 */

if (c->argc >= 4) {

o = lookupKeyWrite(c->db,c->argv[2]);

if (o) {

if (checkType(c,o,OBJ_STREAM)) return;

s = o->ptr;

}

grpname = c->argv[3]->ptr;

}

 

/* Check for missing key/group. */

if (c->argc >= 4 && !mkstream) {

/* key必须存在,否则error */

if (s == NULL) {

error...

}

 

/* group不存在报错 */

if ((cg = streamLookupCG(s,grpname)) == NULL &&

(!strcasecmp(opt,"SETID") ||

!strcasecmp(opt,"DELCONSUMER")))

{

error....

return;

}

}

 

/* 2)根据不同参数做不同的处理。*/

if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {

     //①create 参数,创建一个新消费组

streamID id; //消息ID如为特殊符号"$",是则把该队列中最大的个id赋值给它

if (!strcmp(c->argv[4]->ptr,"$")) {

if (s) {

id = s->last_id;

}

} else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {

        //不为特殊符号,则检验传入id的合法性,不合法则报错。

return;

}

 

//指定了MKSTREAM参数,且db中该键值对不存在,创建一个类型为stream新对象存入db */

if (s == NULL) {

serverAssert(mkstream);

o = createStreamObject();

dbAdd(c->db,c->argv[2],o);

s = o->ptr;

signalModifiedKey(c,c->db,c->argv[2]);

}

//创建一个新的消费组,但此处逻辑有漏洞,使用s前需要校验其所属对象是否为stream

如不是则可能把内存写坏,导致redis-server直接挂掉

streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);

...

} else if (!strcasecmp(opt,"SETID") && c->argc == 5) {

//②setid 参数,修改某个消费组消费的last_id:

// 根据消费组名从s.cgroups这个Rax树中查找出streamCG,

修改字段last_id的值为参数指定的ID值即可。

streamID id;

if (!strcmp(c->argv[4]->ptr,"$")) {

id = s->last_id;

} else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK) {

return;

}

cg->last_id = id;

...

} else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {

//③destroy 参数,删除指定消费组:

//根据消费组姓名,从s.cgroups这个Rax树中删除并释放内存。

if (cg) {

raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL);

streamFreeCG(cg);

...

}

} else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {

//④delconsumer 参数,删除指定消费组中某个消费者

 

long long pending = streamDelConsumer(cg,c->argv[4]->ptr);

addReplyLongLong(c,pending);

server.dirty++;

notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",

c->argv[2],c->db->id);

} else if (c->argc == 2 && !strcasecmp(opt,"HELP")) {

//⑤help 参数

addReplyHelp(c, help);

} else {

addReplySubcommandSyntaxError(c);

}

}

 

 

内部函数1:streamCreateCG

最终创建消费组调用的是streamCreateCG函数,该函数的主要实现步骤如下

streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {

if (s->cgroups == NULL) s->cgroups = raxNew();

if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)

return NULL;

//1 ,初始化streamCG结构体,设置streamCG.last_id为参数指定的ID值;

streamCG *cg = zmalloc(sizeof(*cg));

cg->pel = raxNew();

cg->consumers = raxNew();

cg->last_id = *id;

// 2 ,往s.cgroups这个Rax树中写入新初始化的streamCG结构体,

其中key为分组名称,关联的值为streamCG结构体。

raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);

return cg;

}

 

内部函数2:streamDelConsumer

uint64_t streamDelConsumer(streamCG *cg, sds name) {

//根据消费组姓名从s.cgroups这个Rax树中查找出streamCG;

streamConsumer *consumer =

streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);

if (consumer == NULL) return 0;

 

uint64_t retval = raxSize(consumer->pel);

 

/*根据消费者名从streamCG.Consumers这个Rax树中查找出消费者Consumers */

raxIterator ri;

raxStart(&ri,consumer->pel);

raxSeek(&ri,"^",NULL,0);

while(raxNext(&ri)) {

streamNACK *nack = ri.data;

// 迭代遍历Consumers.pel这个Rax树,从中删除并释放所有待确认的消息ID;

raxRemove(cg->pel,ri.key,ri.key_len,NULL);

streamFreeNACK(nack);

}

raxStop(&ri);

 

//根据消费者名把该消费者从streamCG.Consumers这个Rax树中删除。

raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL);

streamFreeConsumer(consumer);

return retval;

}

18.3.2 消费消息

有xreadgroup及xread两个命令,底层调用都是xreadCommand函数,实现主要分为如下4步。

define XREAD_BLOCKED_DEFAULT_COUNT 1000

void xreadCommand(client *c) {

1)判断执行的是哪条命令:如果传入参数中第一个字符串总长度为10,则代表的是xreadgrou命令。

int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */

robj *groupname = NULL;

robj *consumername = NULL;

/* 解析参数*/

for (int i = 1; i < c->argc; i++) {

...

}

...

2)遍历读取指定的key关联的值信息,并进行类型判断与参数校验,不为OBJ_STREAM则报错。

for (int i = streams_arg + streams_count; i < c->argc; i++) {

//遍历参数中每个key

int id_idx = i - streams_arg - streams_count;

robj *key = c->argv[i-streams_count];

robj *o = lookupKeyRead(c->db,key); //读取key关联的value

if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; //value类型判断

streamCG *group = NULL;

 

if (groupname) { // xreadgrou会指定消费组

if (o == NULL ||

(group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)

{

//消费组不存在报错,所以指定消费多个key时,需要每个key单独建立相同消费组名

error...

goto cleanup;

}

groups[id_idx] = group;

}

 

if (strcmp(c->argv[i]->ptr,"$") == 0) { //特殊符号"$"只能由xread命令使用

if (xreadgroup) {

error...

goto cleanup;

}

...

continue;

} else if (strcmp(c->argv[i]->ptr,">") == 0) {

if (!xreadgroup) { //特殊符号">"只能由xreadgroup命令使用

error...

goto cleanup;

}

...

continue;

}

//校验参数中指定ID格式是否合法

if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)

goto cleanup;

}

 

3)遍历指定的多个key,调用streamReplyWithRange函数。按照参数中指定的ID,从对应的Stream队列中读取count条数据。

/* 尝试同步服务于客户. */

size_t arraylen = 0;

void *arraylen_ptr = NULL;

for (int i = 0; i < streams_count; i++) {

robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);

if (o == NULL) continue;

stream *s = o->ptr;

streamID *gt = ids+i;

int serve_synchronously = 0;

int serve_history = 0;

 

/* 检查是否存在可以同步为client服务的条件 */

if (groups) {

if (gt->ms != UINT64_MAX ||

gt->seq != UINT64_MAX)

{

serve_synchronously = 1;

serve_history = 1;

} else if (s->length) {

streamID maxid, *last = &groups[i]->last_id;

streamLastValidID(s, &maxid);

if (streamCompareID(&maxid, last) > 0) {

serve_synchronously = 1;

*gt = *last;

}

}

} else if (s->length) {

/* 对于没有组的消费者,从流中提供至少一item,去同步服务。*/

streamID maxid;

streamLastValidID(s, &maxid);

if (streamCompareID(&maxid, gt) > 0) {

serve_synchronously = 1;

}

}

 

if (serve_synchronously) {

arraylen++;

if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);

/* start (包含在内) ID开始,调用函数处理 */

streamID start = *gt;

streamIncrID(&start);

 

if (c->resp == 2) addReplyArrayLen(c,2);

addReplyBulk(c,c->argv[streams_arg+i]);

streamConsumer *consumer = NULL;

if (groups) consumer = streamLookupConsumer(groups[i],

consumername->ptr,

SLC_NONE);

streamPropInfo spi = {c->argv[i+streams_arg],groupname};

int flags = 0;

if (noack) flags |= STREAM_RWR_NOACK;

if (serve_history) flags |= STREAM_RWR_HISTORY;

//进行范围匹配查找

streamReplyWithRange(c,s,&start,NULL,count,0,

groups ? groups[i] : NULL,

consumer, flags, &spi);

if (groups) server.dirty++;

}

}

 

/* We replied synchronously! Set the top array len and return to caller. */

if (arraylen) {

if (c->resp == 2)

setDeferredArrayLen(c,arraylen_ptr,arraylen);

else

setDeferredMapLen(c,arraylen_ptr,arraylen);

goto cleanup;

}

/* 4)如果添加了BLOCK关键字,则调用blockForKeys函数,把当前链接标识成阻塞状态,

并且记录解除阻塞时间节点,等着下一次时间事件触发看是否超时,或当新的数据写入时解除阻塞。

新数据写入时会触发handleClientsBlockedOnKeys函数,会判断此次新增的key是否为阻塞等待的key,如果是,则继续比较ID是否有更新,如有更新,则读取最新的数据回复给该客户端,并解除阻塞。*/

if (timeout != -1) {

/* 如果在MULTI / EXEC中,并且列表为空,将其视为超时(超时为0*/

if (c->flags & CLIENT_MULTI) {

addReplyNullArray(c);

goto cleanup;

}

blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,

timeout, NULL, ids);

/* 如果没有count 给一个count=1000 ,防止返回大量数据*/

c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;

 

if (groupname) {

incrRefCount(groupname);

incrRefCount(consumername);

c->bpop.xread_group = groupname;

c->bpop.xread_consumer = consumername;

c->bpop.xread_group_noack = noack;

} else {

c->bpop.xread_group = NULL;

c->bpop.xread_consumer = NULL;

}

goto cleanup;

}

 

/* 没有BLOCK,也没有可以提供的任何流。回复时带有超时 */

addReplyNullArray(c);

 

cleanup: /* Cleanup. */

preventCommandPropagation(c);

if (ids != static_ids) zfree(ids);

zfree(groups);

}

18.3.3 响应消息

确认消息xack,底层调用xackCommand函数,主要分为如下3步:

void xackCommand(client *c) {

streamCG *group = NULL;

1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错;

robj *o = lookupKeyRead(c->db,c->argv[1]);

if (o) {

if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */

2)根据读取出来的值,从s->cgroups这个Rax树中根据参数中组名查找出指定的分组信息streamCG

//streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,

sdslen(groupname));

group = streamLookupCG(o->ptr,c->argv[2]->ptr);

}

 

/* No key or group? Nothing to ack. */

if (o == NULL || group == NULL) {

addReply(c,shared.czero);

return;

}

 

for (int j = 3; j < c->argc; j++) {

streamID id;

if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;

}

 

int acknowledged = 0;

for (int j = 3; j < c->argc; j++) {

streamID id;

unsigned char buf[sizeof(streamID)];

if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)

serverPanic("StreamID invalid after check. Should not be possible.");

streamEncodeID(buf,&id);

3)从streamCG.pel这个Rax树中查找参数中指定的消息ID,如存在则删除,否则什么也不做。

streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));

if (nack != raxNotFound) {

raxRemove(group->pel,buf,sizeof(buf),NULL);

raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);

streamFreeNACK(nack);

acknowledged++;

server.dirty++;

}

}

addReplyLongLong(c,acknowledged);

}

18.3.4 获取未响应消息列表

命令xpending,底层调用xpendingCommand函数,该函数实现主要分为如下3步;

void xpendingCommand(client *c) {

int justinfo = c->argc == 3;

robj *key = c->argv[1];

robj *groupname = c->argv[2];

robj *consumername = (c->argc == 7) ? c->argv[6] : NULL;

streamID startid, endid;

long long count;

/* Start and stop, and the consumer, can be omitted(省略). */

if (c->argc != 3 && c->argc != 6 && c->argc != 7) {

addReply(c,shared.syntaxerr);

return;

}

 

/* 1)参数检验,校验参数个数,校验ID值格式是否正确。*/

if (c->argc >= 6) {

if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)

return;

if (count < 0) count = 0;

if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)

return;

if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)

return;

}

 

/* 2)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错,

类型正确则根据指定的消费组名查找出group. */

robj *o = lookupKeyRead(c->db,c->argv[1]);

streamCG *group;

if (o && checkType(c,o,OBJ_STREAM)) return;

 

/* XPENDING <key> <group> variant. */

if (justinfo) { //①参数个数为3,读取指定消费组的所有未响应消息列表

addReplyArrayLen(c,4);

/* Total number of messages in the PEL. */

addReplyLongLong(c,raxSize(group->pel));

/* First and last IDs. */

if (raxSize(group->pel) == 0) {

addReplyNull(c); /* Start. */

addReplyNull(c); /* End. */

addReplyNullArray(c); /* Clients. */

} else {

/* Start. */

raxIterator ri;

raxStart(&ri,group->pel);

raxSeek(&ri,"^",NULL,0);

raxNext(&ri);

streamDecodeID(ri.key,&startid);

addReplyStreamID(c,&startid);

 

/* End. */

raxSeek(&ri,"$",NULL,0);

raxNext(&ri);

streamDecodeID(ri.key,&endid);

addReplyStreamID(c,&endid);

raxStop(&ri);

 

/* Consumers with pending messages. */

raxStart(&ri,group->consumers);

raxSeek(&ri,"^",NULL,0);

void *arraylen_ptr = addReplyDeferredLen(c);

size_t arraylen = 0;

while(raxNext(&ri)) { // 从头到尾迭代group.pel这个Rax

streamConsumer *consumer = ri.data;

if (raxSize(consumer->pel) == 0) continue;

addReplyArrayLen(c,2);

addReplyBulkCBuffer(c,ri.key,ri.key_len);

addReplyBulkLongLong(c,raxSize(consumer->pel));

arraylen++;

}

setDeferredArrayLen(c,arraylen_ptr,arraylen);

raxStop(&ri);

}

}

/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */

else {

streamConsumer *consumer = NULL;

if (consumername) { //找出消费者

consumer = streamLookupConsumer(group,

consumername->ptr,

SLC_NOCREAT|SLC_NOREFRESH);

 

/* 如果提到了消费者名称但它不存在,返回一个空数组*/

if (consumer == NULL) {

addReplyArrayLen(c,0);

return;

}

}

//消费者存在取消费者的未确认消息列表,否则取grouppel

rax *pel = consumer ? consumer->pel : group->pel;

unsigned char startkey[sizeof(streamID)];

unsigned char endkey[sizeof(streamID)];

raxIterator ri;

mstime_t now = mstime();

//找出迭代的开始ID与结束ID

streamEncodeID(startkey,&startid);

streamEncodeID(endkey,&endid);

raxStart(&ri,pel);

raxSeek(&ri,">=",startkey,sizeof(startkey));

void *arraylen_ptr = addReplyDeferredLen(c);

size_t arraylen = 0;

//从开始ID迭代,count0或者没有下一个元素

while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {

streamNACK *nack = ri.data;

 

arraylen++;

count--; //计数递减

addReplyArrayLen(c,4);

 

/* Entry ID. */

streamID id;

streamDecodeID(ri.key,&id);

addReplyStreamID(c,&id);

 

/* Consumer name. */

addReplyBulkCBuffer(c,nack->consumer->name,

sdslen(nack->consumer->name));

 

/* Milliseconds elapsed since last delivery. */

mstime_t elapsed = now - nack->delivery_time;

if (elapsed < 0) elapsed = 0;

addReplyLongLong(c,elapsed);

 

/* Number of deliveries. */

addReplyLongLong(c,nack->delivery_count);

}

raxStop(&ri);

setDeferredArrayLen(c,arraylen_ptr,arraylen);

}

}

18.3.5 修改指定未响应消息归属

命令是xclaim,对应的函数是xclaimCommand函数;

void xclaimCommand(client *c) {

streamCG *group = NULL;

1)根据key读取对应值,并判断类型是否为OBJ_STREAM,如果不是则报错

robj *o = lookupKeyRead(c->db,c->argv[1]);

long long minidle; /* Minimum idle time argument. */

long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */

mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */

int force = 0;

int justid = 0;

 

if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */

2)根据消费组名称取出消费组信息:

group = streamLookupCG(o->ptr,c->argv[2]->ptr);

 

3)参数解析与参数校验

if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,

"Invalid min-idle-time argument for XCLAIM")

!= C_OK) return;

if (minidle < 0) minidle = 0;

 

int j;

for (j = 5; j < c->argc; j++) {

streamID id;

if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;

}

int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */

mstime_t now = mstime();

streamID last_id = {0,0};

int propagate_last_id = 0;

 

for (; j < c->argc; j++) {

int moreargs = (c->argc-1) - j; /* Number of additional arguments. */

char *opt = c->argv[j]->ptr;

if (!strcasecmp(opt,"FORCE")) {

force = 1;

} else if (!strcasecmp(opt,"JUSTID")) {

justid = 1;

} else if (!strcasecmp(opt,"IDLE") && moreargs) {

j++;

if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,

"Invalid IDLE option argument for XCLAIM")

!= C_OK) return;

deliverytime = now - deliverytime;

} else if (!strcasecmp(opt,"TIME") && moreargs) {

j++;

if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,

"Invalid TIME option argument for XCLAIM")

!= C_OK) return;

} else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {

j++;

if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,

"Invalid RETRYCOUNT option argument for XCLAIM")

!= C_OK) return;

} else if (!strcasecmp(opt,"LASTID") && moreargs) {

j++;

if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;

} else {

addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);

return;

}

}

 

if (streamCompareID(&last_id,&group->last_id) > 0) {

group->last_id = last_id;

propagate_last_id = 1;

}

 

if (deliverytime != -1) {

if (deliverytime < 0 || deliverytime > now) deliverytime = now;

} else {

deliverytime = now;

}

 

/* Do the actual claiming. */

streamConsumer *consumer = NULL;

void *arraylenptr = addReplyDeferredLen(c);

size_t arraylen = 0;

5)遍历指定的ID,依次修改其所属消费者

for (int j = 5; j <= last_id_arg; j++) {

streamID id;

unsigned char buf[sizeof(streamID)];

//校验ID是否合法

if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)

serverPanic("StreamID invalid after check. Should not be possible.");

streamEncodeID(buf,&id); //赋值给buf

 

/* group->pel中查找ID. */

streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));

 

/* 指定的ID不在待确认列表,若设置了force关键字,则继续判断是否在stream队列中存在,

     存在则插入到未确认消息列表中 */

if (force && nack == raxNotFound) {

streamIterator myiterator;

streamIteratorStart(&myiterator,o->ptr,&id,&id,0);

int64_t numfields;

int found = 0;

streamID item_id;

if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;

streamIteratorStop(&myiterator);

 

/* Item must exist for us to create a NACK for it. */

if (!found) continue;

/* Create the NACK. */

nack = streamCreateNACK(NULL);

raxInsert(group->pel,buf,sizeof(buf),nack,NULL);

}

 

if (nack != raxNotFound) {

/* 已存在待确认消息列表则把该消息从原有所属的消费者pel列表中删除,

并插入到新指定的消费者pel列表中 */

if (nack->consumer && minidle) {

mstime_t this_idle = now - nack->delivery_time;

if (this_idle < minidle) continue;

}

/* Remove the entry from the old consumer.

* Note that nack->consumer is NULL if we created the

* NACK above because of the FORCE option. */

if (nack->consumer)

raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);

/* Update the consumer and idle time. */

if (consumer == NULL)

consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);

nack->consumer = consumer;

nack->delivery_time = deliverytime;

/* Set the delivery attempts counter if given, otherwise

* autoincrement unless JUSTID option provided */

if (retrycount >= 0) {

nack->delivery_count = retrycount;

} else if (!justid) {

nack->delivery_count++;

}

/* Add the entry in the new consumer local PEL. */

raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);

/* Send the reply for this entry. */

if (justid) {

addReplyStreamID(c,&id);

} else {

size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,

NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);

if (!emitted) addReplyNull(c);

}

arraylen++;

 

/* Propagate this change. */

streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);

propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */

server.dirty++;

}

}

if (propagate_last_id) {

streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);

server.dirty++;

}

setDeferredArrayLen(c,arraylenptr,arraylen);

preventCommandPropagation(c);

}

18.4 本章小结

本章整体性讲解了Stream相关的命令的源码实现;

原文地址:https://www.cnblogs.com/coloz/p/13812858.html