Redis5设计与源码分析 (第14章 集合相关命令的实现)

14.1 相关命令介绍

Redis的set实现了无序集合,集合成员唯一。set底层基于dict和intset,在学习集合命令前,需要先了解dict和intset的结构;

集合元素为字符串和数字,分别用dict和intset存储。对于单个集合,Redis实现了元素的新增、删除、遍历等操作;对于多个集合,Redis实现了集合间求交集、并集和差集等操作。

1.添加成员

sadd命令的作用是为集合添加新成员。

格式:sadd key member [member ...]

说明: 将一个或多个元素加入到集合key当中,返回加入成功的个数。

示例:>sadd testKey a b

源码分析:

函数入口在saddCommand。

{"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},

saddCommond基本的校验后,先得到一个有效的set,并将待添加元素依次添加进set:

void saddCommand(client *c) {

robj *set;

int j, added = 0;

set = lookupKeyWrite(c->db,c->argv[1]);

if (set == NULL) {

set = setTypeCreate(c->argv[2]->ptr);

dbAdd(c->db,c->argv[1],set);

} else {

if (set->type != OBJ_SET) {

addReply(c,shared.wrongtypeerr);

return;

}

}

for (j = 2; j < c->argc; j++) {

if (setTypeAdd(set,c->argv[j]->ptr)) added++;

}

...

}

 

源码的核心函数是setTypeAdd,由于集合底层有两种实现方式(dict和intset)'

int setTypeAdd(robj *subject, sds value) {

long long llval;

if (subject->encoding == OBJ_ENCODING_HT) { //类型

//当encoding方式为OBJ_ENCODING_HT时,set的底层用的是字典,将key直接添加进dict。注意的是,用dict存储集合元素时,元素值存储于字典的key中,字典的value值为null

dict *ht = subject->ptr;

dictEntry *de = dictAddRaw(ht,value,NULL);

if (de) {

dictSetKey(ht,de,sdsdup(value));

dictSetVal(ht,de,NULL); //字典的value值为null

return 1;

}

} else if (subject->encoding == OBJ_ENCODING_INTSET) { //INTSET类型

// ②当encoding方式为OBJ_ENCODING_INTSET时,又有两种情况:若新增的元素本身非数字(value转long long失败),需要通过setTypeConvert转化后再存储;若新增的元素本身是数字,则用intsetAdd新增元素。且当新增成功,但intset的元素个数过多(个数大于server.set_max_intset_entries时。该参数可配置,默认为512),同样会触发setType-Convert转 化:

if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {

uint8_t success = 0;

subject->ptr = intsetAdd(subject->ptr,llval,&success);

if (success) {

//intset转dict.

if (intsetLen(subject->ptr) > server.set_max_intset_entries)

setTypeConvert(subject,OBJ_ENCODING_HT);

return 1;

}

} else {

//value转long long失败,触发转化 intset转dict

setTypeConvert(subject,OBJ_ENCODING_HT);

     //intset转dict

serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);

return 1;

}

} else {

serverPanic("Unknown set encoding");

}

return 0;

}

 

setTypeConvert将集合的编码方式由OBJ_ENCODING_INTSET改为OBJ_ENCODING_HT。其思路是新建一个dict,遍历旧intset中的所有元素,并添加至新dict中

void setTypeConvert(robj *setobj, int enc) {

dict *d = dictCreate(&setDictType,NULL);

dictExpand(d,intsetLen(setobj->ptr));

si = setTypeInitIterator(setobj);

while (setTypeNext(si,&element,&intele) != -1) {

element = sdsfromlonglong(intele);

serverAssert(dictAdd(d,element,NULL) == DICT_OK);

}

...

}

为了避免转化过程中发生字典的rehash操作,代码中用dictExpand主动扩容;

2.删除成员

srem命令的作用是删除集合中的指定成员。

格式:srem key member [member ...]

说明: 删除集合key中的一个或多个元素,返回删除成功的元素数量。

示例:>srem testKey testValue

 

源码分析: 函数入口在sremCommand。

sremCommand函数与saddCommand函数几乎一致:同样完成基本的校验后,得到一个有效的set,调用setTypeRemove方法,将待删除元素依次从set中移除

void sremCommand(client *c) {

... //参数校验

for (j = 2; j < c->argc; j++) {

if (setTypeRemove(set,c->argv[j]->ptr)) {

deleted++;

if (setTypeSize(set) == 0) {

dbDelete(c->db,c->argv[1]);

//keyremoved标识,用于最后通知notifyKeyspaceEvent。

keyremoved = 1;

break;

}

}

} ....

}

 

主要的移除逻辑在setTypeRemove中完成。移除时同样分OBJ_ENCODING_HT和OBJ_ENCODING_INTSET两种情况处理

int setTypeRemove(robj *setobj, sds value) {

long long llval;

if (setobj->encoding == OBJ_ENCODING_HT) {

//若encoding为OBJ_ENCODING_HT时,则调用dictDelete处理删除元素时,会检查字典容量,字典容量不足也会触发扩容操作

if (dictDelete(setobj->ptr,value) == DICT_OK) {

if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr);

return 1;

}

} else if (setobj->encoding == OBJ_ENCODING_INTSET) {

//当encoding为OBJ_ENCODING_INTSET时,调用intsetRemove处理

if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {

int success;

setobj->ptr = intsetRemove(setobj->ptr,llval,&success);

if (success) return 1;

}

} .....

return 0;

}

3.(随机)获取成员

随机获取集合一个或多个成员的命令。

(1)spop/srandmember命令

spop与srandmember命令都会返回一个随机元素,故统一讲解。

不同之处,spop会将该返回的元素从集合中移除,而srandmember不会。

spop命令:

说明: 删除并返回集合中的一个或多个随机元素。

格式:spop key [count]

示例:> smembers test

源码分析: 函数入口在spopCommand

处理移除并返回一个元素的逻辑

先校验arg中的set,之后通过setTypeRandomElement返回一个随机元素,最后将该元素从集合中删除。删除时会检查集合编码,若集合encoding为OBJ_ENCODING_INTSET,则调用intsetRemove函数删除;否则调用setTypeRemove函数删除。

void spopCommand(client *c) {

robj *set, *ele, *aux;

sds sdsele;

int64_t llele;

int encoding;

//参数校验

if (c->argc == 3) {

spopWithCountCommand(c); //移除并返回多个元素的函数入口

return;

} else if (c->argc > 3) {

addReply(c,shared.syntaxerr);

return;

}

/* set不存在或类型错误则返回空 */

if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))

== NULL || checkType(c,set,OBJ_SET)) return;

/* 随机获取一个元素。当元素是数字时,返回值保存在*llele中;

当元素是字符串时,返回值保存在*sdsele中 ;

集合有intset和字典两种实现,setTypeRandomElement调用不同的随机获取函数: */

encoding = setTypeRandomElement(set,&sdsele,&llele);

/* 删除元素,获取到随机元素后,重新创建了变量ele来保存它 */

if (encoding == OBJ_ENCODING_INTSET) {

ele = createStringObjectFromLongLong(llele);

set->ptr = intsetRemove(set->ptr,llele,NULL);

} else {

ele = createStringObject(sdsele,sdslen(sdsele));

setTypeRemove(set,ele->ptr);

}

...

}

 

字典和inset的随机函数分别为dictGetRandomKey与intsetRandom。其中dictGetRan-domKey做了两次随机第1次随机找到非空的bucket,bucket存放着一个链表,第2次随机返回bucket链表中的元素。

处理返回多个元素的逻辑

函数入口在spopWithCountCommand,Redis分4种情况讨论:

随机元素数量 =0 ,返回空("empty list or set"):

if (count == 0) {

addReply(c,shared.emptyset[c->resp]);

return;

}

随机元素数量 >= 集合基数 时,返回整个集合,并删除集合:

if (count >= size) {

/* 调用求并集函数,返回本集合所有元素 */

sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);

/* 删除集合 */

dbDelete(c->db,c->argv[1]);

notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);

}

0< 随机元素数量< 某个临界值 , Redis循环调用random及remove函数查询并删除元素:

if (remaining*SPOP_MOVE_STRATEGY_MUL > count) {

while(count--) {

/*找出和删除*/

encoding = setTypeRandomElement(set,&sdsele,&llele);

if (encoding == OBJ_ENCODING_INTSET) {

addReplyBulkLongLong(c,llele);

objele = createStringObjectFromLongLong(llele);

set->ptr = intsetRemove(set->ptr,llele,NULL); //删除

} else {

addReplyBulkCBuffer(c,sdsele,sdslen(sdsele));

objele = createStringObject(sdsele,sdslen(sdsele));

setTypeRemove(set,sdsele); //删除

}

...

}

}

某个临界值< 随机元素数量< 集合基数, Redis将"反向处理"——创建一个新的set,将"剩余"元素加入该集合,返回原集合并在删除原集合后,将新集合转化为原集合: (看上去更省时省力)

while(remaining--) {

/* 将剩余元素从原集合中删除,并写入新集合 */

encoding = setTypeRandomElement(set,&sdsele,&llele);

if (encoding == OBJ_ENCODING_INTSET) {

sdsele = sdsfromlonglong(llele);

} else {

sdsele = sdsdup(sdsele);

}

if (!newset) newset = setTypeCreate(sdsele);

setTypeAdd(newset,sdsele); //写入新集合

setTypeRemove(set,sdsele); //从原集合删除

sdsfree(sdsele);

}

/* 迭代原集合,将集合元素写入client */

setTypeIterator *si;

si = setTypeInitIterator(set);

while((encoding = setTypeNext(si,&sdsele,&llele)) != -1) {

if (encoding == OBJ_ENCODING_INTSET) {

addReplyBulkLongLong(c,llele);

objele = createStringObjectFromLongLong(llele);

} else {

addReplyBulkCBuffer(c,sdsele,sdslen(sdsele));

objele = createStringObject(sdsele,sdslen(sdsele));

} ...

}

setTypeReleaseIterator(si);

/* 新集合覆盖原集合 */

dbOverwrite(c->db,c->argv[1],newset);

}

临界值的计算公式如下 :

unsigned long remaining = size-count; //剩余元素数量 = 集合容量-返回的随机元素数量Elements left after SPOP.

if (remaining*SPOP_MOVE_STRATEGY_MUL > count) //SPOP_MOVE_STRATEGY_MUL是系数,值为5

简单来说,当返回的随机元素数量大于集合剩余元素数量的5倍时,Redis选择"反向处理"。

 

srandmember命令:

格式:srandmember key [count]

说明: 返回集合中的一个或多个随机元素。

示例:> smembers test

>srandmember test

源码分析:

srandmember与spop几乎一致,调用setTypeRandomElement函数,去掉了删除逻辑。

 

(2)smembers命令

格式:smembers key

说明: 返回集合key中的所有成员。不存在的key被视为空集合。

示例:> smembers test

源码分析: 函数入口在sinterCommand。

该操作是"多集合取交集"的一种特殊情况,即"对一个集合取交集时返回它本身"。Redis底层调用sinterGenericCommand统一处理,在14.2.1节中一并介绍;

 

4.查找成员

查找成员的命令是sismember,命令的作用是判断元素是否在指定集合中

格式:sismember key member

说明: 判断member元素是否是集合key的成员,如果是返回1,否则返回0。

示例:> sadd test a b c

源码分析: 函数入口在sismemberCommand

集合有intset和字典两种实现,对于OBJ_ENCODING_HT和OBJ_ENCODING_INTSET,分别调用dictFindintsetFind两函数判断即可。

sismemberCommand核心调用的函数是setTypeIsMember;

int setTypeIsMember(robj *subject, sds value) {

long long llval;

if (subject->encoding == OBJ_ENCODING_HT) {

return dictFind((dict*)subject->ptr,value) != NULL;

} else if (subject->encoding == OBJ_ENCODING_INTSET) {

if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { //string2ll,转成整型

return intsetFind((intset*)subject->ptr,llval);

}

} else {

serverPanic("Unknown set encoding");

}

return 0;

}

5.移动成员

smove命令的作用是移动元素至指定集合。

格式:smove source destination member

说明: 将member元素从source集合移动到destination集合

示例:> smove test1set test2set "hello"

源码分析: 函数入口在smoveCommand。

函数逻辑分两部分。

①校验,考虑各种无法移动的情况。包括源集合不存在、集合类型错误、源集合与目标集合相同等情况;

②从源集合中删除元素,并将元素插入目标集合。若目标集合不存在,创建新集合后再插入。

void smoveCommand(client *c) {

robj *srcset, *dstset, *ele;

srcset = lookupKeyWrite(c->db,c->argv[1]);

dstset = lookupKeyWrite(c->db,c->argv[2]);

ele = c->argv[3];

/*源集合不存在 return 0 */

if (srcset == NULL) {

addReply(c,shared.czero);

return;

}

/* 集合类型错误. */

if (checkType(c,srcset,OBJ_SET) ||

(dstset && checkType(c,dstset,OBJ_SET))) return;

/*源集合与目标集合相同 */

if (srcset == dstset) {

addReply(c,setTypeIsMember(srcset,ele->ptr) ?

shared.cone : shared.czero);

return;

}

/* 元素不能从原集合中删除 . */

if (!setTypeRemove(srcset,ele->ptr)) {

addReply(c,shared.czero);

return;

}

notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);

/*空时从数据库中删除src集*/

if (setTypeSize(srcset) == 0) {

dbDelete(c->db,c->argv[1]);

notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);

}

/*目标集合不存在,创建新集合*/

if (!dstset) {

dstset = setTypeCreate(ele->ptr);

dbAdd(c->db,c->argv[2],dstset);

}

...

/* ele成功添加到dstset时,一个额外的键已更改 */

if (setTypeAdd(dstset,ele->ptr)) {

server.dirty++;

notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id);

}

addReply(c,shared.cone);

}

6.获取基数

scard命令的作用是获取集合中的元素数量(集合基数)。

格式:scard key

说明: 返回集合key的基数(集合中元素的数量)。

示例:>scard test1

源码分析: 函数入口在scardCommand。

scardCommand函数先做基本的校验,最终调用setTypeSize函数查询基数。由于集合底层通过字典和intset实现,setTypeSize也分情况调用不同的查询函数:

unsigned long setTypeSize(const robj *subject) {

if (subject->encoding == OBJ_ENCODING_HT) {

return dictSize((const dict*)subject->ptr); //查询dict大小 h[0]和h[1] used数量之和

} else if (subject->encoding == OBJ_ENCODING_INTSET) {

return intsetLen((const intset*)subject->ptr); //查询intset大小,直接返回统计成员变量

} else {

serverPanic("Unknown set encoding");

}

}

 

uint32_t intsetLen(const intset *is) {

return intrev32ifbe(is->length);

}

 

7.遍历成员

sscan命令用于增量遍历集合元素。

格式:sscan cursor [match pattern] [count count]

说明: sscan每次执行返回少量元素和一个新的游标,该游标用于下次遍历时延续之前的遍历过程,指定游标为0时,表示开始一轮新的迭代,而当服务器返回游标值为0时,表示本轮遍历结束。

示例:> sscan test1 0

源码分析:

函数入口在sscanCommand,最终调用scanGenericCommand:

scanGenericCommand(c,set,cursor);

scanGenericCommand的函数逻辑主要分为4步:

①参数校验;

②获取本次遍历的随机元素;

③根据pattern过滤元素;

④返回相应元素。

void scanGenericCommand(client *c, robj *o, unsigned long cursor) {

/* Sep 1: Parse options. */

    .....

/* Step 2: Iterate the collection. 如果是ziplist则由少量元素组成 . */

if (ht) {//hashtable类型

void *privdata[2];

/* 最大迭代数量为10倍count ,防止哈希表稀疏填充, 避免浪费时间不返回. */

long maxiterations = count*10;

privdata[0] = keys; //存放结果元素的链表

privdata[1] = o;

do { //dictscan将随机返回一个字典元素

cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);

} while (cursor &&

maxiterations-- &&

listLength(keys) < (unsigned long)count);

} else if (o->type == OBJ_SET) {

    //编码是inset类型

int pos = 0;

int64_t ll;

while(intsetGet(o->ptr,pos++,&ll))

//将intset的所有元素全部加入结果集,并将curosr置为0(代表本次遍历结束)

listAddNodeTail(keys,createStringObjectFromLongLong(ll));

cursor = 0;

} else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {

unsigned char *p = ziplistIndex(o->ptr,0);

unsigned char *vstr;

unsigned int vlen;

long long vll;

while(p) {

ziplistGet(p,&vstr,&vlen,&vll);

listAddNodeTail(keys,

(vstr != NULL) ? createStringObject((char*)vstr,vlen) :

createStringObjectFromLongLong(vll));

p = ziplistNext(o->ptr,p);

}

cursor = 0;

} else {

serverPanic("Not handled encoding in SCAN.");

}

/* Step 3: 过滤元素 . */

node = listFirst(keys);

while (node) {

robj *kobj = listNodeValue(node);

nextnode = listNextNode(node);

int filter = 0;

/* 过滤不匹配pattern的元素 . */

if (!filter && use_pattern) {

if (sdsEncodedObject(kobj)) {

if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))

filter = 1; //不匹配,过滤

} else {

char buf[LONG_STR_SIZE];

int len;

 

serverAssert(kobj->encoding == OBJ_ENCODING_INT);

len = ll2string(buf,sizeof(buf),(long)kobj->ptr);

if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;

}

}

/* 过滤不想要的元素. */

if (!filter && o == NULL && typename){

robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH);

char* type = getObjectTypeName(typecheck);

if (strcasecmp((char*) typename, type)) filter = 1;

}

/*过滤已过期的元素(key). */

if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;

 

/* 移除被过滤的元素 */

if (filter) {

decrRefCount(kobj);

listDelNode(keys, node);

}

...

/* Step 4: 返回客户端 */

...

}

14.2 集合运算

Redis实现了集合求交集、求差集、求并集三种基本运算。

14.2.1 交集

1.sinter

sinter命令的作用是求多集合的交集。

格式:sinter key [key ...]

说明: sinter命令返回一个集合的全部成员,该集合是所有给定集合的交集。不存在的key被视为空集。当给定集合中有一个空集时,结果也为空集(根据集合运算定律)。

示例:

> sadd testset1 a b c

> sadd testset2 a b d

> sadd testset3 a e f

> sinter testset1 testset2 testset3

源码分析: 函数入口在sinterCommand。

 

图14-1 求交集流程图

如图14-1所示,集合求交集的基本逻辑是:

先将集合按基数大小排序,以一个集合(基数最小)为标准,遍历该集合中的所有元素,依次判断该元素是否在其余所有集合中:如果不在任一集合,舍弃该元素,否则加入结果集。

排序的意义在于减小遍历元素的个数交集不可能大于任一集合,故遍历最小集合是效率最高的。

void sinterGenericCommand(client *c, robj **setkeys,

unsigned long setnum, robj *dstkey) {

robj **sets = zmalloc(sizeof(robj*)*setnum); /所有待计算的集合数组

...

// 1)参数校验,排除空集合。函数首先校验集合是否存在或为空(为空删除),将存在且不为空的集合依次放入sets数组中:

for (j = 0; j < setnum; j++) {

robj *setobj = dstkey ?

lookupKeyWrite(c->db,setkeys[j]) :

lookupKeyRead(c->db,setkeys[j]);

if (!setobj) {

zfree(sets); //释放并删除空的key

... return;

} ...

sets[j] = setobj; //将集合放入待处理集合数组中

}

/* 2)将集合按基数从小到大排序,提升性能,以最小的为标尺,方便接下来依次比较: */

qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);

...

/* 3)遍历最小集合中的所有元素,依次判断是否在其余集合中,只要元素不在任一集合中,就不满足交集的条件,舍弃*/

si = setTypeInitIterator(sets[0]);

while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {

for (j = 1; j < setnum; j++) {

if (sets[j] == sets[0]) continue; //略过自己

if (encoding == OBJ_ENCODING_INTSET) {

/* intset with intset is simple... and fast */

if (sets[j]->encoding == OBJ_ENCODING_INTSET &&

!intsetFind((intset*)sets[j]->ptr,intobj))

     { break; // 不在则直接退出for循环,舍弃该元素, }

else if (sets[j]->encoding == OBJ_ENCODING_HT) {

elesds = sdsfromlonglong(intobj); //将int转为sdsobj

if (!setTypeIsMember(sets[j],elesds)) {

sdsfree(elesds); break;

}

sdsfree(elesds);

}

} else if (encoding == OBJ_ENCODING_HT) {

if (!setTypeIsMember(sets[j],elesds)) { break; }

}

}

/* 有当该元素在其余所有集合中都出现时,才将其加入目标集合r */

//stset中存放了最终的结果。注意在判断元素是否在集合中时,同样区分了OBJ_ENCODING_HT和OBJ_ENCODING_INTSET两种类型,调用了不同的底层方法;

if (j == setnum) {

if (!dstkey) {

if (encoding == OBJ_ENCODING_HT)

addReplyBulkCBuffer(c,elesds,sdslen(elesds));

else addReplyBulkLongLong(c,intobj);

cardinality++;

} else {

if (encoding == OBJ_ENCODING_INTSET) {

elesds = sdsfromlonglong(intobj);

setTypeAdd(dstset,elesds);

sdsfree(elesds);

} else { setTypeAdd(dstset,elesds); }

}

}

}

setTypeReleaseIterator(si);

 

//如果需要存储到目标集合, sinterstore命令用到

if (dstkey) {

/* 如果交集不是空集,请将结果集存储到目标中. */

int deleted = dbDelete(c->db,dstkey); //若指定集合key存在,先删除

if (setTypeSize(dstset) > 0) {

dbAdd(c->db,dstkey,dstset); ... //将返回结果写入指定集合中

} else { ... }

} else {

setDeferredSetLen(c,replylen,cardinality);

}

zfree(sets);

}

 

2.sinterstore

sinterstore命令与sinter命令类似,不同之处在于它将结果保存至指定集合,而不是简单返回。

格式:sinterstore destination key [key ...]

示例:

> sadd test1 a b c

> sadd test2 a b d

> sadd test3 a e f

> sinterstore test4 test1 test2 test3

> smembers test4

源码分析: sinterstore与sinter的源码处理在同一位置,只是返回前sinterstore将结果集保存。

 

14.2.2 并集

sunion命令的作用是求多集合的并集。

格式:sunion key [key ...]

说明: 返回一个集合的全部成员,该集合是所有给定集合的并集。不存在的key被视为空集。

源码分析:

函数入口在sunionCommand,最终调用sunionDiffGenericCommand。

求并集的基本思路是

遍历所有集合,调用setTypeAdd将所有元素依次插入结果集,插入过程中会去重,自然得到并集。sunionsdiff底层调用了同一函数sunionDiff-GenericCommand。

void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,

robj *dstkey, int op) {

// 校验集合,排除空集

for (j = 0; j < setnum; j++) {

robj *setobj = dstkey ?

lookupKeyWrite(c->db,setkeys[j]) :

lookupKeyRead(c->db,setkeys[j]);

if (!setobj) {

sets[j] = NULL;

continue;

}

if (checkType(c,setobj,OBJ_SET)) {

zfree(sets);

return;

}

sets[j] = setobj;

}

/* Select what DIFF algorithm to use. //这里是差集算法

sdiff为了提高性能,给出了两种策略;

策略一: 将A集合中元素在B1……Bn集合间一一查找,将查找不到的元素加入结果集。复杂度为O(M*n)。

策略二: 先将A集合加入结果集,再将B1……Bn集合中的所有元素在结果集中一一查找,将查到的元素从结果集中删除。复杂度为O(N)。

在算法选择时,大的逻辑是:如果A集合比较小,遍历A更划算;反之遍历其余集合更划算。

 

if (op == SET_OP_DIFF && sets[0]) {

//algo_X_work代表了该策略的权重,分值越小代表效率越高

long long algo_one_work = 0, algo_two_work = 0;

for (j = 0; j < setnum; j++) {

if (sets[j] == NULL) continue;

algo_one_work += setTypeSize(sets[0]);

algo_two_work += setTypeSize(sets[j]);

}

algo_one_work /= 2; /*算法algo_one_work效率更好,给其赋予更高的权重*/

diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;

if (diff_algo == 1 && setnum > 1) {

qsort(sets+1,setnum-1,sizeof(robj*),

qsortCompareSetsByRevCardinality);

}

}

 

dstset = createIntsetObject();

 

//这里是并集

if (op == SET_OP_UNION) {

/* 将每个集合的每个元素添加到临时集合即可。. */

for (j = 0; j < setnum; j++) {

if (!sets[j]) continue; /* 集合为空跳过 */

si = setTypeInitIterator(sets[j]);

while((ele = setTypeNextObject(si)) != NULL) {

if (setTypeAdd(dstset,ele)) cardinality++; //将元素插入结果集

sdsfree(ele);

}

setTypeReleaseIterator(si);

}

} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {

/* 策略一源码 *

si = setTypeInitIterator(sets[0]);

while((ele = setTypeNextObject(si)) != NULL) {

for (j = 1; j < setnum; j++) {

if (!sets[j]) continue; /*集合为空则跳过. */

if (sets[j] == sets[0]) break; /* 比较集合是它本身,则跳过! */

if (setTypeIsMember(sets[j],ele)) break; //元素在比较集合中,抛弃该元素

}

if (j == setnum) {

/* 全部比较完成后,元素依然存在,则该元素为差集元素. */

setTypeAdd(dstset,ele);

cardinality++;

}

sdsfree(ele);

}

setTypeReleaseIterator(si);

} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {

/* 策略二源码 */

for (j = 0; j < setnum; j++) {

if (!sets[j]) continue; /*集合为空则跳过 */

si = setTypeInitIterator(sets[j]);

while((ele = setTypeNextObject(si)) != NULL) {

if (j == 0) { //j=0是A数组

if (setTypeAdd(dstset,ele)) cardinality++; //先将A集合加入结果集

} else {

//将在A集合中,且在其余集合中的元素从结果集中移除

if (setTypeRemove(dstset,ele)) cardinality--;

}

sdsfree(ele);

}

setTypeReleaseIterator(si);

if (cardinality == 0) break;

}

}

 

/* Output the content of the resulting set, if not in STORE mode */

if (!dstkey) {

addReplySetLen(c,cardinality);

si = setTypeInitIterator(dstset);

while((ele = setTypeNextObject(si)) != NULL) {

addReplyBulkCBuffer(c,ele,sdslen(ele));

sdsfree(ele);

}

setTypeReleaseIterator(si);

server.lazyfree_lazy_server_del ? freeObjAsync(dstset) :

decrRefCount(dstset);

} else {

/* If we have a target key where to store the resulting set

* create this key with the result set inside */

int deleted = dbDelete(c->db,dstkey);

if (setTypeSize(dstset) > 0) {

dbAdd(c->db,dstkey,dstset);

addReplyLongLong(c,setTypeSize(dstset));

notifyKeyspaceEvent(NOTIFY_SET,

op == SET_OP_UNION ? "sunionstore" : "sdiffstore",

dstkey,c->db->id);

} else {

decrRefCount(dstset);

addReply(c,shared.czero);

if (deleted)

notifyKeyspaceEvent(NOTIFY_GENERIC,"del",

dstkey,c->db->id);

}

signalModifiedKey(c,c->db,dstkey);

server.dirty++;

}

zfree(sets);

}

 

 

14.2.3 差集

sdiff命令的作用是求集合间的差集。

说明: 返回一个集合的全部成员,该集合是所有给定集合之间的差集。

格式:SDIFF key [key ...]

源码分析: 函数入口在sinterCommand,最终调用sunionDiffGenericCommand

求差集的逻辑跟求交集有类似之处,将待求差集的集合A作为"标尺",并将给定集合按基数大小排序,然后筛选掉不符合条件的元素。需要注意的是以下方面。

1)求交集时,按基数大小对给定集合进行升序排序:

qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);

2)求差集时,按基数大小对给定集合进行降序排序

qsort(sets+1,setnum-1,sizeof(robj*),

qsortCompareSetsByRevCardinality);

求差集时,我们从大集合开始比较(找到的概率更高,一旦匹配成功,说明该元素不是差集元素,省去了往后比较的时间)。

表14-1 筛选策略

14.3 本章小结

单集合的操作和多集合间的运算。集合底层基于dict和intset两基本数据结构插入和删除的效率也依赖dict与intset。

  

原文地址:https://www.cnblogs.com/coloz/p/13812852.html