Redis5设计与源码分析 (第7章 quicklist的实现)

7.1 quicklist简介

Redis对外提供的6种基本数据结构List的底层实现;

quicklist由List和ziplist结合而成;

  1. List简介

各对象按线性顺序排列

  1. quicklist简介

quicklist是双向链表,链表中的每个节点是一个ziplist结构;

当ziplist节点个数过多,quicklist退化为双向链表;

当ziplist元素个数过少时,quicklist可退化为ziplist;

7.2 数据存储

quicklist是一个由ziplist充当节点的双向链表。核心结构:

typedef struct quicklist {

quicklistNode *head;

quicklistNode *tail;

unsigned long count; /* total count of all entries in all ziplists */

unsigned long len; /* number of quicklistNodes */

int fill : QL_FILL_BITS; /* fill factor for individual nodes */

unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */

unsigned int bookmark_count: QL_BM_BITS;

quicklistBookmark bookmarks[];

} quicklist;

 

其中head、tail指向quicklist的首尾节点;

count为quicklist中元素总数;

len为quicklist Node(节点)个数;

fill用来指明每个quicklistNode中ziplist长度,当fill为正数时,表明每个ziplist最多含有的数据项数,当fill为负数时,含义如表7-1所示。

 

 

图7-3 带数据压缩的quicklist

quicklistNode

quicklistNode是quicklist中的一个节点,其结构如下:

typedef struct quicklistNode {

struct quicklistNode *prev;

struct quicklistNode *next;

unsigned char *zl;

unsigned int sz; /* ziplist size in bytes */

unsigned int count : 16; /* count of items in ziplist */

unsigned int encoding : 2; /* RAW==1 or LZF==2 */

unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */

unsigned int recompress : 1; /* was this node previous compressed? */

unsigned int attempted_compress : 1; /* node can't compress; too small */

unsigned int extra : 10; /* more bits to steal for future usage */

} quicklistNode;

 

其中,prev、next指向该节点的前后节点;

zl指向该节点对应的ziplist结构;

sz代表整个ziplist结构的大小;

encoding代表采用的编码方式:1代表是原生的,2代表使用LZF进行压缩;

container为quicklistNode节点zl指向的容器类型:1代表none,2代表使用ziplist存储数据;

recompress代表这个节点之前是否是压缩节点,若是,则在使用压缩节点前先进行解压缩,使用后需要重新压缩,此外为1,代表是压缩节点;

attempted_compress测试时使用;

extra为预留。

 

quicklistLZF结构

typedef struct quicklistLZF {

unsigned int sz; /* LZF size in bytes,表示compressed所占字节大小。*/

char compressed[];

} quicklistLZF;

当我们对ziplist利用LZF算法进行压缩时,quicklistNode节点指向的结构为quicklistLZF。

 

quicklistEntry结构

typedef struct quicklistEntry {

const quicklist *quicklist;

quicklistNode *node;

unsigned char *zi;

unsigned char *value;

long long longval;

unsigned int sz;

int offset;

} quicklistEntry;

quicklist指向当前元素所在的quicklist;

node指向当前元素所在的quicklistNode结构;

zi指向当前元素所在的ziplist;

value指向该节点的字符串内容;

longval为该节点的整型值;

sz代表该节点的大小,与value配合使用;

offset表明该节点相对于整个ziplist的偏移量,即该节点是ziplist第多少个entry。

 

quicklistIter

quicklistIter是quicklist中用于遍历的迭代器,结构如下:

typedef struct quicklistIter {

const quicklist *quicklist;

quicklistNode *current;

unsigned char *zi;

long offset; /* offset in current ziplist */

int direction;

} quicklistIter;

quicklist指向当前元素所处的quicklist;

current指向元素所在quicklistNode;

zi指向元素所在的ziplist;

offset表明节点在所在的ziplist中的偏移量;

direction表明迭代器的方向。

 

7.3 数据压缩

quicklist每个节点的实际数据存储结构为ziplist,主要优势在于节省存储空间。Redis允许对ziplist进一步压缩,Redis采用的压缩算法是LZF,压缩过后的数据可以分成多个片段,每个片段有2部分:一部分是解释字段,另一部分是存放具体的数据字段。解释字段可以占用1~3个字节,数据字段可能不存在。结构如图7-4所示。

图7-4 LZF压缩后的数据结构图

1)字面型,解释字段占用1个字节,数据字段长度由解释字段后5位决定。示例如图7-5所示,图中L是数据长度字段,数据长度是长度字段组成的字面值加1。

2)简短重复型,解释字段占用2个字节,没有数据字段,数据内容与前面数据内容重复,重复长度小于8,示例如图7-6所示,图中L是长度字段,数据长度为长度字段的字面值加2,o是偏移量字段,位置偏移量是偏移字段组成的字面值加1。

3)批量重复型,解释字段占3个字节,没有数据字段,数据内容与前面内容重复。示例如图7-7所示,图中L是长度字段,数据长度为长度字段的字面值加9,o是偏移量字段,位置偏移量是偏移字段组成的字面值加1。

7.3.1 压缩

LZF数据压缩的基本思想是:数据与前面重复的,记录重复位置以及重复长度否则直接记录原始数据内容

压缩算法的流程如下:遍历输入字符串,对当前字符及其后面2个字符进行散列运算,如果在Hash表中找到曾经出现的记录,则计算重复字节的长度以及位置,反之直接输出数据。下面给出了LZF源码的核心部分。

define IDX(h) (((h >> 8) - h*5) & ((1 << 16) - 1))

 

//in_data, in_len待压缩数据以及长度; out_data, out_len压缩后数据及长度

unsigned int

lzf_compress (const void *const in_data, unsigned int in_len,

void *out_data, unsigned int out_len )

{

//htab用于散列运算,进而获取上次重复点的位置

int htab[1 << 16] = {0};

unsigned int hval = ((ip[0] << 8) | ip[1]);

while (ip < in_end - 2)

{

//计算该元素以及其后面2个元素的Hash值,计算在Hash表中的位置

hval = ((hval << 8) | ip[2]);

unsigned int *hslot = htab + IDX (hval);

ref = *hslot;

if (...){ //之前出现过

//统计重复长度, ip为输入数据当前处理位置指针, ref为数据之前出现的位置

do

len++;

while (len < maxlen && ref[len] == ip[len]);

//将重复长度,重复位置的偏移量写入, op为当前输出位置指针, off为偏移位置, len为重复长度

if (len < 7) *op++ = (off >> 8) + (len << 5);

else{

*op++ = (off >> 8) + (7 << 5);

*op++ = len - 7;

}

//更新Hash表

}else{

//直接输出当前字符

}

}

//将剩余数据写入输出数组, 返回压缩后的数据长度

}

7.3.2 解压缩

可能存在重复数据与当前位置重叠的情况,例如在当前位置前的15个字节处,重复了20个字节,此时需要按位逐个复制。源码实现的核心部分如下:

unsigned int

lzf_decompress (const void *const in_data, unsigned int in_len,

void *out_data, unsigned int out_len)

{

u8 const *ip = (const u8 *)in_data;

u8 *op = (u8 *)out_data;

u8 const *const in_end = ip + in_len;

u8 *const out_end = op + out_len;

 

do { /* ip指向当前待处理的输入数据 */

unsigned int ctrl = *ip++;

if (ctrl < (1 << 5)) /* literal run */

{ ctrl++;

else{ switch (len)

..... //直接读取后面的数据

{

default:

len += 2;

if (op >= ref + len){

memcpy (op, ref, len); /*直接复制重复的数据*/

op += len;

}

else { /* 重复数据与当前位置产生重叠,按字节顺序复制 */

do

*op++ = *ref++;

while (--len);

}

break;

case 9: *op++ = *ref++; /* fall-thru */

.....

}

} while (ip < in_end);

return op - (u8 *)out_data;

 

 

7.4 基本操作

quicklist是一种数据结构,重点对其源码实现中的边界条件进行分析。由于quicklist利用ziplist结构进行实际的数据存储,所以quicklist的大部分操作实际是利用ziplist的函数接口实现的,对于ziplist结构。

7.4.1 初始化

quicklistCreate函数完成,该函数的主要功能就是初始化quicklist结构。默认初始化的quicklist结构如图7-8所示。

/* Create a new quicklist.

* Free with quicklistRelease(). */

quicklist *quicklistCreate(void) {

struct quicklist *quicklist; //声明quicklist变量

quicklist = zmalloc(sizeof(*quicklist)); //为quicklist申请空间

quicklist->head = quicklist->tail = NULL; //初始化quicklist结构体变量

quicklist->len = 0;

quicklist->count = 0;

quicklist->compress = 0;

quicklist->fill = -2; //从表7-1中可知, ziplist大小限制是8KB

quicklist->bookmark_count = 0;

return quicklist;

}

Redis默认quicklistNode每个ziplist的大小限制是8KB,并且不对节点进行压缩。在初始化完成后,也可以设置其属性,接口有:

void quicklistSetCompressDepth(quicklist *quicklist, int depth);

void quicklistSetFill(quicklist *quicklist, int fill);

void quicklistSetOptions(quicklist *quicklist, int fill, int depth);

 

除了上述的默认初始化方法外,还可以在初始化时设置quicklist

的属性。相关接口的定义如下:

quicklist *quicklistNew(int fill, int compress);

//根据fill, compress先构造新的quicklist, 之后将zl添加进去

quicklist *quicklistCreateFromZiplist(int fill, int compress, unsigned char *zl);

 

7.4.2 添加元素

接口为quicklistPush,可以在头部或者尾部进行插入,具体的操作函数为quicklistPushHeadquicklistPushTail。两者的思路基本一致 。

quicklistPushHead的基本思路是

查看quicklist原有的head节点是否可以插入,如果可以就直接利用ziplist的接口进行插入,否则新建quicklistNode节点进行插入。

函数的入参 为待插入的quicklist,需要插入的数据value及其大小sz;

函数返回值 代表是否新建了head节点,0代表没有新建,1代表新建了head。

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {

quicklistNode *orig_head = quicklist->head;

if (likely(

//_quicklistNodeAllowInsert用来判断quicklist的某个quicklistNode节点是否允许继续插入。

_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {

//头部节点仍然可以插入

quicklist->head->zl =

ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);

quicklistNodeUpdateSz(quicklist->head);

} else {

//头部节点不可以继续插入, 新建quicklistNode, ziplist

quicklistNode *node = quicklistCreateNode();

node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

//将新建的quicklistNode插入到quicklist结构体中

quicklistNodeUpdateSz(node);

//_quicklistInsertNodeBefore用于在quicklist的某个节点之前插入新的quicklistNode节点

_quicklistInsertNodeBefore(quicklist, quicklist->head, node);

}

//update quicklist count info

quicklist->count++;

quicklist->head->count++;

return (orig_head != quicklist->head);

}

 

任意位置插入

除了push操作外,quicklist还提供了在任意位置插入的方法。对外接口为quicklistInsert Before与quicklistInsertAfter,二者的底层实现都是_quicklistInsert。quicklist的一般插入操作如图7-9所示。

 

对于quicklist的一般插入可以分为可以继续插入和不能继续插入。

1)当前插入位置所在的quicklistNode仍然可以继续插入,此时可以直接插入。

2)当前插入位置所在的quicklistNode不能继续插入,此时可以分为如下几种情况。

①需要向当前quicklistNode第一个元素entry1)前面插入元素,当前ziplist所在的quicklistNode的前一个quicklistNode可以插入,则将数据插入到前一个quicklistNode。如果前一个quicklistNode不能插入(不包含前一个节点为空的情况),则新建一个quicklistNode插

入到当前quicklistNode前面。

②需要向当前quicklistNode的最后一个元素entryN)后面插入元素,当前ziplist所在的quicklistNode的后一个quicklistNode可以插入,则直接将数据插入到后一个quicklistNode。如果后一个quicklistNode不能插入(不包含为后一个节点为空的情况),则新建一个quicklistNode插入到当前quicklistNode的后面。

③不满足前面2个条件的所有其他种情况,将当前所在的quicklistNode以当前待插入位置为基准,拆分成左右两个quicklistNode,之后将需要插入的数据插入到其中一个拆分出来的

quicklistNode中。

这部分的源码实现,主要依赖于一般的链表操作以及ziplist提供的插入接口。至于quicklistNode的拆分是先复制一份ziplist,通过对新旧两个ziplist进行区域删除操作实现的。这部分源码实现较为简单,此处不再赘述。

 

 

7.4.3 删除元素

quicklist对于元素删除提供了删除单一元素以及删除区间元素2种方案。

 

删除单一元素

可以使用quicklist对外的接口quicklistDelEntry实现,也可以通过quicklistPop将头部或者尾部元素弹出。

quicklistDelEntry函数调用底层quicklistDelIndex函数,该函数可以删除quicklistNode指向的ziplist中的某个元素,其中p指向ziplist中某个entry的起始位置。

quicklistPop可以弹出头部或者尾部元素,具体实现是通过ziplist的接口获取元素值,再通过上述的quicklistDelIndex将数据删除。

//删除指定位置的元素

int quicklistDelIndex(quicklist *quicklist, quicklistNode *node, unsigned char **p)

//data, sz用于存储ziplist中的字符串数据,slong用于存储整型数据

int quicklistPop(quicklist *quicklist, int where, unsigned char **data,

unsigned int *sz, long long *slong);

 

删除区间元素

quicklistDelRange接口,该函数可以从指定位置删除指定数量的元素。

函数原型如下:int quicklistDelRange(quicklist *quicklist, const long start,const long count)

返回0代表没有删除任何元素,返回1只能代表操作成功。

图7-10 quicklist区间删除

如图7-10所示,在进行区间删除时,先找到start所在位置对应的quicklistNode,计算当前quicklistNode需要删除的元素个数,如果仍有元素待删除,则移动至下一个quicklistNode继续删除。之后,依次循环下去,直到删除了所需的元素个数或者后续数据已空,核心部分代码如下:

/* extent为剩余需要删除的元素个数, entry.offset是当前需要删除的起始位置, del表示本节点需要删除的元素. */

while (extent) {

//保存下个quicklistNode, 因为本节点可能会被删掉

quicklistNode *next = node->next;

unsigned long del;

int delete_entire_node = 0;

if (entry.offset == 0 && extent >= node->count) {

/* 需要删除整个quicklistNode. */

delete_entire_node = 1;

del = node->count;

} else if (entry.offset >= 0 && extent >= node->count) {

/* 删除本节点剩余所有元素 */

del = node->count - entry.offset;

} else if (entry.offset < 0) {

/* entry.offset < 0代表从后向前, 相反数代表这个ziplist后面剩余元素个数。*/

del = -entry.offset;

 

/* 如果正偏移量大于剩余范围,则*我们仅删除剩余范围,而不是整个偏移量。*/

if (del > extent)

del = extent;

} else {

/*删除本节点部分元素. */

del = extent;

}

 

if (delete_entire_node) {

__quicklistDelNode(quicklist, node);

} else {

quicklistDecompressNodeForUse(node);

node->zl = ziplistDeleteRange(node->zl, entry.offset, del);

quicklistNodeUpdateSz(node);

node->count -= del;

quicklist->count -= del;

quicklistDeleteIfEmpty(quicklist, node);

if (node)

quicklistRecompressOnly(quicklist, node);

}

 

extent -= del; //剩余待删除元素个数

node = next; //下个quicklistNode

entry.offset = 0; //从下个quicklistNode起始位置开始删

}

 

7.4.4 更改元素

quicklist更改元素是基于index,主要的处理函数为quicklistReplaceAtIndex。

基本思路是 先删除原有元素,之后插入新的元素。

quicklist不适合直接改变原有元素,主要由于其内部是ziplist结构,ziplist在内存中是连续存储的,当改变其中一个元素时,可能会影响后续元素。故而,quicklist采用先删除后插入的方案。实现源码如下,此处不再分析。

/* 将偏移量"索引"处的快速列表条目替换为长度为" sz"的"数据"/

int quicklistReplaceAtIndex(quicklist *quicklist, long index, void *data,

int sz) {

quicklistEntry entry;

if (likely(quicklistIndex(quicklist, index, &entry))) {

/* quicklistIndex provides an uncompressed node */

entry.node->zl = ziplistDelete(entry.node->zl, &entry.zi);

entry.node->zl = ziplistInsert(entry.node->zl, entry.zi, data, sz);

quicklistNodeUpdateSz(entry.node);

quicklistCompress(quicklist, entry.node);

return 1;

} else {

return 0;

}

}

7.4.5 查找元素

quicklist查找元素主要是针对index,即通过元素在链表中的下标查找对应元素。

基本思路是,首先找到index对应的数据所在的quicklistNode节点,之后调用ziplist的接口函数ziplistGet得到index对应的数据,源码中的处理函数为quicklistIndex。

//idx为需要查找的下标, 结果写入entry, 返回0代表没有找到,1代表找到

int quicklistIndex(const quicklist *quicklist, const long long idx,

quicklistEntry *entry) {

quicklistNode *n;

unsigned long long accum = 0;

unsigned long long index;

int forward = idx < 0 ? 0 : 1; /* idx值为负数时,代表从尾部向头部的偏移量,-1代表尾部元素*/

initEntry(entry); // 初始化entry, index以及quicklistNode

entry->quicklist = quicklist;

if (!forward) {

index = (-idx) - 1;

n = quicklist->tail;

} else {

index = idx;

n = quicklist->head;

}

if (index >= quicklist->count)

return 0;

// 遍历quicklistNode节点,找到index对应的quicklistNode

while (likely(n)) {

if ((accum + n->count) > index) {

break;

} else {

accum += n->count;

n = forward ? n->next : n->prev;

}

}

 

if (!n)

return 0;

 

// 计算index所在的ziplist的偏移量

entry->node = n;

if (forward) {

/* forward = normal head-to-tail offset. */

entry->offset = index - accum;

} else {

/* reverse = need negative offset for tail-to-head, so undo

* the result of the original if (index < 0) above. */

entry->offset = (-index) - 1 + accum;

}

 

quicklistDecompressNodeForUse(entry->node);

entry->zi = ziplistIndex(entry->node->zl, entry->offset);

// 利用ziplist获取元素

ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);

return 1;

}

 

对于迭代器遍历的情况,源码实现较为简单,主要是通过quicklistIter记录当前元素的位置信息以及迭代器的前进方向,限于篇幅,此处不再进行详细分析。

 

//获取指向头部,依次向后的迭代器;或者指向尾部,依次向前的迭代器

quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction);

//获取idx位置的迭代器,可以向后或者向前遍历

quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,

int direction, const long long idx);

//获取迭代器指向的下一个元素

int quicklistNext(quicklistIter *iter, quicklistEntry *node);

 

7.4.6 常用API

本节对quicklist常用的API进行总结,并给出其操作的时间复杂度。

我们假设quicklist的节点个数为n,即quicklistNode的个数为n;每个quicklistNode指向的ziplist的元素个数为m;区间操作中区间长度为l,具体如表7-2所示。

7.5 本章小结

本章主要介绍了Redis中常用的底层数据结构quicklist,主要介绍了quicklist常规情况以及压缩情况的底层存储。除此之外,我们详细介绍了quicklist的基本操作,讲述了各种情况下数据存储的变化。最后,我们给出了quicklist对外常用API接口及其复杂度。

 

  

原文地址:https://www.cnblogs.com/coloz/p/13812838.html