Redis(四):独立功能的实现

发布与订阅

Redis 的发布与订阅功能有PUBLISH命令,SUBSCRIBE命令,PSUBSCRIBE命令,PUBSUB命令等组成。
客户端可以通过SUBSCRIBE命令订阅一个或多个频道,当其它客户端向被订阅的频道发送消息时,频道所有的订阅者都会收到这消息。

频道的订阅与退订

Redis会在redisServer中用pubsub_channels字典来记录订阅的客户端和频道的关系。其中字典的键是被订阅的频道,而字典的值是一个客户端链表,保存了订阅这个频道的所有客户端。
比如有一个客户端执行了SUBSCRIBE HEllO,另一个客户端执行SUBSCRIBE HELLO WORLD,那么此时redisServerpubsub_channels的结构如下:

订阅频道

当客户端执行SUBSCRIBE <channel1> <channel2...>命令时,服务器会现在pubsub_channels字典中查询是否有对应的键,如果存在,则将客户端添加到键对应的链表的末端,如果不存在,则在字典中添加键,并关联新的链表,然后将客户端加入链表。

退订频道

当客户端执行UNSUBSCRIBE命令时,服务器会在pubsub_channels的字典中找到对应的键,然后遍历链表,找到客户端未自身的节点移除。如果移除完节点后,链表为空,那么会在字典中删除该键。

模式的订阅与退订

Redis会在redisServer中用pubsub_patterns链表保存客户端模式订阅的关系。其中链表的一个节点是一个pubsubPattern

typedef struct pubsubPattern {
    client *client;
    robj *pattern;
} pubsubPattern;

其中client指向客户端,而pattern代表订阅的模式。
其结构大概如下:

订阅模式

当客户端执行PSUBSCRIBE命令时,服务器会把客户端创建一个新的pubsubPattern结构,用来记录客户端和模式,并添加到链表的末端。

退订模式

当客户端执行PUNSUBCRIBE命令时,服务器会在pubsub_patterns链表中遍历查找客户端和模式都符合的pubsubPattern节点,并从链表中移除。

消息发送

当服务器收到来自客户端的PUBLISH <channel> <message>命令时,

  • 首先服务器会先从pubsub_channels字典中找对应的键,然后遍历链表中的客户端,发送message消息。
  • 之后服务器再遍历pubsub_patterns链表,对符合channel的模式的客户端发送消息。
查询订阅信息

Redis 提供了PUBSUB命令用来查询订阅信息。PUBSUB一共有三个子命令,PUBSUB CHANNELS <pattern>PUBSUB NUMSUB <channels>PUBSUB NUMPAT(即pubsub_channelssize)。

  • PUBSUB CHANNELS <pattern>:用来查询服务器当前有哪些符合模式(``pattern)的频道,如果不加pattern,则列出所有的channel(即pubsub_channels中每个键对应链表的size`)
  • PUBSUB NUMSUB <channels>:用来统计有多少客户端在订阅指定的频道,如果不加channels,则统计所有的channel
  • PUBSUB NUMPAT:用来统计有多少客户端在订阅模式(既pubsub_patternssize

事务

Redis 通过MULTIEXECWATCHDISCARD命令来实现事物的功能。一个事务会将多个命令打包,一次性,顺序的执行这些命令,且中间不会执行其他客户端请求的命令。

事务的实现

一个事务从开始到结束一般分为三个阶段:

  • 事务开始
  • 命令入队
  • 事物执行/丢弃
事务开始

一个事务的开始是通过MULTI命令来实现的,当客户端请求MULTI命令,那么服务器会打开该客户端的flags属性中的REDIS_MULTI标识,表示该客户端由非事务状态切换为事务状态。

命令入队

当事务状态中的客户端向服务器发送命令时,如果发送的命令不为WATCHMUTLIEXECDISCARD,那么服务器会将命令入队,并向客户端放回QUEUED

redisClient结构中,有一个multiState,其结构定义如下:

typedef struct multiState {
    //multiCmd数组
    multiCmd *commands;     /* Array of MULTI commands */
    //命令数量
    int count;              /* Total number of MULTI commands */
    int cmd_flags;          /* The accumulated command flags OR-ed together.
                               So if at least a command has a given flag, it
                               will be set in this field. */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

其中multiCmd结构如下:

typedef struct multiCmd {
    //用来保存命令参数
    robj **argv;
    //参数个数
    int argc;
    //指向对应的命令函数
    struct redisCommand *cmd;
} multiCmd;  

假设有一个客户端在发送了如下命令:

192.168.1.102:6379> MULTI
OK
192.168.1.102:6379> SET TEST HAHAH
QUEUED
192.168.1.102:6379> GET TEST
QUEUED
192.168.1.102:6379> SET TEST HEIHEIHEI
QUEUED
192.168.1.102:6379>

那么该客户端对应的结构显示为:

事务结束

当客户端向服务器发送DISCARD命令时,服务器会清楚客户端的事务状态,并且丢掉任务队列中入队的命令。

执行事务

当处于事务的客户端向服务器发送EXEC命令时,EXEC会立即被服务器执行,并且服务器会遍历命令队列中的命令,依次执行,然后将全部结果返回给客户端。

例如,针对上述的命令输入,当客户端执行了EXEC后,获得的回复如下:

192.168.1.102:6379> EXEC
1) OK
2) "HAHAH"
3) OK
192.168.1.102:6379>
WATCH命令

WATCH命令可以在执行EXEC命令之前,监视某些数据库键,如果在EXEC命令执行时,被监视的键被其他客户端修改,那么服务器将拒绝事务的执行。

WATCH命令的实现

当客户端提交WATCH命令时,客户端会将监视的键和客户端保存在redisDbwatched_keys中,watched_keys是一个字典,其中键表示被监视的数据库键,而值则是一个链表,其中的每个节点都指向监视该键的客户端。
当服务器在执行完某些修改数据库的命令后,不如SETSADD等,会触发一次multi.c/touchWatchKey函数,该函数会在数据库中watched_keys中查询对应的键是否存在,如果存在,则修改对应链表中的客户端,打开REDIS_DIRTY_CAS标志
当服务器收到一个客户端发的EXEC命令时,会先检查客户端的REDIS_DIRTY_CAS标志是否打开。如果打开则拒绝执行事务。

Redis事务的ACID性质

传统的数据库中,用 ACID 表示事务的可靠性和安全性。ACID是指原子性(Atomicity),一致性(Consistency),隔离性(lsolcation)和耐久性(Durablity)。

原子性:

是指事务要么全部执行,要么全部不执行。由于 Redis 事务会将命令打包,统一执行,因此Redis事物具有原子性。

一致性:

是指数据库在执行数据前后,数据库并不会存在非法或是错误的数据。
Redis 通过入队命令检测拦截非法命令,在执行时,即使遇到错误命令,也会继续执行之后的命令。

隔离性:

是指事务执行过程中,其他事务或操作的执行不会互相收到影响。由于 Redis 通过单线程执行命令,因此保证了事务与事务执行的顺序一定是串行的,由此确保了隔离性。

耐久性

是指事物的执行结构能够得到保存。Redis 事务是否具备耐久性和持久化策略相关。只有在开启AOF模式,并且appendfsync值为true时,才具备耐久性。


Lua脚本

Redis客户端可以使用Lua脚本原子的执行多个任务(比如,用在分布式锁原子性的释放上)。

Lua环境的创建过程

在 Redis 服务器启动的过程中,initServer方法会调用scriptingInit方法。
scripting.c/scriptingInit的代码如下:

void scriptingInit(void) {
    //创建Lua环境
    lua_State *lua = lua_open();
    
    //载入函数库,移除其中不支持的函数
    luaLoadLibraries(lua);
    luaRemoveUnsupportedFunctions(lua);

    
    //创建lua_scripts字典,用来保存执行或载入过的脚本
    server.lua_scripts = dictCreate(&shaScriptObjectDictType,NULL);

    //创建全局表格,并添加函数
    lua_newtable(lua);

    /* redis.call */
    lua_pushstring(lua,"call");
    lua_pushcfunction(lua,luaRedisCallCommand);
    lua_settable(lua,-3);

    /* redis.pcall */
    lua_pushstring(lua,"pcall");
    lua_pushcfunction(lua,luaRedisPCallCommand);
    lua_settable(lua,-3);

    /* redis.log and log levels. */
    lua_pushstring(lua,"log");
    lua_pushcfunction(lua,luaLogCommand);
    lua_settable(lua,-3);

    lua_pushstring(lua,"LOG_DEBUG");
    lua_pushnumber(lua,REDIS_DEBUG);
    lua_settable(lua,-3);

    lua_pushstring(lua,"LOG_VERBOSE");
    lua_pushnumber(lua,REDIS_VERBOSE);
    lua_settable(lua,-3);

    lua_pushstring(lua,"LOG_NOTICE");
    lua_pushnumber(lua,REDIS_NOTICE);
    lua_settable(lua,-3);

    lua_pushstring(lua,"LOG_WARNING");
    lua_pushnumber(lua,REDIS_WARNING);
    lua_settable(lua,-3);

    /* redis.sha1hex */
    lua_pushstring(lua, "sha1hex");
    lua_pushcfunction(lua, luaRedisSha1hexCommand);
    lua_settable(lua, -3);

    /* redis.error_reply and redis.status_reply */
    lua_pushstring(lua, "error_reply");
    lua_pushcfunction(lua, luaRedisErrorReplyCommand);
    lua_settable(lua, -3);
    lua_pushstring(lua, "status_reply");
    lua_pushcfunction(lua, luaRedisStatusReplyCommand);
    lua_settable(lua, -3);

    /* Finally set the table as 'redis' global var. */
    lua_setglobal(lua,"redis");

    //替换部分函数
    lua_getglobal(lua,"math");

    lua_pushstring(lua,"random");
    lua_pushcfunction(lua,redis_math_random);
    lua_settable(lua,-3);

    lua_pushstring(lua,"randomseed");
    lua_pushcfunction(lua,redis_math_randomseed);
    lua_settable(lua,-3);

    lua_setglobal(lua,"math");

    //创建辅助函数
    {
        char *compare_func =    "function __redis__compare_helper(a,b)
"
                                "  if a == false then a = '' end
"
                                "  if b == false then b = '' end
"
                                "  return a<b
"
                                "end
";
        luaL_loadbuffer(lua,compare_func,strlen(compare_func),"@cmp_func_def");
        lua_pcall(lua,0,0,0);
    }

    
    {
        char *errh_func =       "function __redis__err__handler(err)
"
                                "  local i = debug.getinfo(2,'nSl')
"
                                "  if i and i.what == 'C' then
"
                                "    i = debug.getinfo(3,'nSl')
"
                                "  end
"
                                "  if i then
"
                                "    return i.source .. ':' .. i.currentline .. ': ' .. err
"
                                "  else
"
                                "    return err
"
                                "  end
"
                                "end
";
        luaL_loadbuffer(lua,errh_func,strlen(errh_func),"@err_handler_def");
        lua_pcall(lua,0,0,0);
    }

    //创建伪客户端
    if (server.lua_client == NULL) {
        server.lua_client = createClient(-1);
        server.lua_client->flags |= REDIS_LUA_CLIENT;
    }

    //全局变量保护
    scriptingEnableGlobalsProtection(lua);
    
    //将lua环境变量保存到服务器中
    server.lua = lua;
}

对应上述过程总结如下:

  1. 创建 Lua 环境(lua_open)
  2. 载入 Lua 的函数库
  3. 在服务器中创建lua_scripts字典,其中键为脚本的 SHA1 值,值为执行或载入过的 Lua 脚本
  4. 创建全局表格用来保存基本函数(如callpcall等,可以通过 Lua 执行 Redis 的命令)
  5. 替换 Lua 的随机函数(保持数据库的一致性)
  6. 创建辅助函数
  7. 创建执行Redis命令的伪客户端
  8. 设置全局变量保护(避免执行脚本时,攻击全局变量)
  9. 将Lua环境保存到redisServer
环境协作组件
伪客户端

Lua 环境中创建了伪客户端(没有TCP连接的客户端,和启动时载入AOF文件的客户端相似),用来执行Redis命令。

当Lua调用redis.call或是redis.pcall函数时,函数中需要执行的 Redis 命令将传给伪客户端,伪客户端又将命令交给命令执行器执行,并向 Lua 环境返回结果。

lua_scripts 字典

redisServer.lua_scripts字典是用来保存该服务器执行或是载入过的Lua脚本的。其中键是脚本的SHA1校验和,而值是Lua脚本。

EVAL命令的实现
EVAL命令格式
EVAL script numskey <key> <key...> <arg> <arg...> 

其中script是我们要执行的脚本,numskey表示键名参数的个数,key表示键名参数,arg表示附加参数,这些参数都可以在script中通过KEYS[]ARGV[]被引用(其中基准下标为1)。
例如我们执行如下Lua脚本等同于执行了SET HELLO WORLD命令

192.168.1.102:6379> EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 HELLO WORLD
OK
EVAL命令执行过程
  • 定义脚本函数:服务器为客户端发送的脚本创建一个对应的函数,其中函数名为f_开头,并加上脚本的SHA1校验和,函数体就是脚本本身。
  • 将脚本保存至lua_scripts字典
  • 执行脚本函数
EVALSHA命令的实现

EVALSHA命令就是通过SHA1值从lua_scripts查找对应的脚本是否存在,如果存在,则执行通过f_SHA1校验和确认函数名,直接执行函数。

脚本管理命令
SCRIPT FLUSH

可以清楚lua_scripts字典保存的脚本,并重新创建 lua 环境。

SCRIPT EXISTS

通过脚本的SHA1校验和确认脚本是否存在于服务器

SCRIPT LOAD

上传脚本,但是不执行(只进行EVAL过程的前两步)

SCRIPT KILL

当脚本处理超时时,可以通过该命令关闭脚本

脚本复制

当服务器处于复制模式下时,具有写性质的脚本命令(EVALEVALSHASCRIPT FLUSHSCRIPT )也需要被复制到从服务器。

EVALSCRIPT FLUSHSCRIPT LOAD的复制

这三种命令复制不会存在主从服务器执行结果不一致的情况,因此复制时只需要简单的命令传播即可实现。

EVALSHA命令

由于可能存在主从服务器lua_scrips中保存的脚本不一致的问题,会发生主服务器执行EVALSHA时,脚本确实存在,而从服务却不存在,EVALSHA执行失败,导致主从数据不一致的问题。

例如,主服务器A先执行了一个EVAL LOAD命令,载入了一个脚本,而后,服务器B上线,并成为A的从服务器。 此时A在执行EVALSHA命令,运行刚载入的脚本,并将命令传播给从服务器B,由于B不存在该脚本,EVALSHA命令就会执行失败。

为了避免这种情况,redisServer服务器会通过repl_scriptcache_dict字典保存已经复制给全部从服务器的命令(也就是说,当出现一个新的从服务器,字典需要清空),其中键为脚本的SHA1,而值为NULL。当执行EVALSHA命令的复制过程时,如果repl_scriptcache_dict中可以找到该脚本,那么直接命令传播,如果找不到,那么服务器将根据lua_scripts中脚本的内容,将EVALSHA转换成等价的EVAL命令,再传播,并添加到repl_scriptcache_dict字典中。


排序

Redis 通过SORT命令实现对给定列表,集合,有序集合key中的元素进行排序的功能。
排序默认以数组为权重,值被解释为双精度浮点数,然后进行排序。

SORT命令格式如下:

SORT key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC | DESC] [ALPHA] [STORE destination]

上述命令中小写的均为参数,每个[]表示一个选项,[]内大写的部分为具体选项,后面小写的部分为选项参数,GET选项可以同时使用多个。

验证适合用SORT命令的数据类型

在客户端中做如下测试:

192.168.1.102:6379[15]> set stringtest 1
OK
192.168.1.102:6379[15]> sort stringtest
(error) WRONGTYPE Operation against a key holding the wrong kind of value
192.168.1.102:6379[15]> lpush listtest 1 2 3
(integer) 3
192.168.1.102:6379[15]> sort listtest
1) "1"
2) "2"
3) "3"
192.168.1.102:6379[15]> hmset hashtest key1 1 key2 2 key3 3
OK
192.168.1.102:6379[15]> sort hashtest
(error) WRONGTYPE Operation against a key holding the wrong kind of value
192.168.1.102:6379[15]> sadd settest 1 2 3
(integer) 3
192.168.1.102:6379[15]> sort settest
1) "1"
2) "2"
3) "3"
192.168.1.102:6379[15]> zadd zsettest 1 1 2 2 3 3
(integer) 3
192.168.1.102:6379[15]> sort zsettest
1) "1"
2) "2"
3) "3"

可以看到,字符串和哈希表无法使用SORT命令,而链表,集合和有序集合都可以使用SORT命令。

各选项的说明
ALPHA

ALPHA选项可以让SORT命令从默认的以数字为权重的排序改成以字母为权重。

假设有一个保存字符串的链表:

192.168.1.102:6379[15]> lpush alphatest a b c
(integer) 3
192.168.1.102:6379[15]>

当我们不添加ALPHA选项直接执行SORT命令时,会报错:

192.168.1.102:6379[15]> sort alphatest
(error) ERR One or more scores can't be converted into double
192.168.1.102:6379[15]>

说明SORT命令默认以双精度浮点数做权重进行排序,针对不能转换为双精度的浮点数的值,会运行出错。

当我们带上ALPHA选项后在运行SORT命令:

192.168.1.102:6379[15]> sort alphatest ALPHA
1) "a"
2) "b"
3) "c"

命令可以正常执行,且是以字母顺序排序。

LIMIT

LIMIT选项可以控制排序后的结果输出个数,接受offsetcount两个参数,其中offset表示跳过几个结果,count表示输出几个结果。

同样以上面的例子,我们增加LIMIT选项,让其输出第二和第三个结果:

192.168.1.102:6379[15]> sort alphatest ALPHA LIMIT 1 2
1) "b"
2) "c"
ASC | DESC

ASC表示以升序排列,DESC表示以降序排列,SORT命令默认以升序排列,当需要降序结果时,可以添加DESC选项。

上面的例子再增加DESC选项后,观察下输出的结果:

192.168.1.102:6379[15]> sort alphatest ALPHA LIMIT 1 2 DESC
1) "b"
2) "a"

从输出结果中,可以确认DESC能够降序输出,而且LIMIT是在排序完之后再控制输出个数。

BY

BY选项可以外部的KEY作为权重,代替默认以键值为权重的排序方式。

首先我们增加一些键值对作为辅助的排序权重:

192.168.1.102:6379[15]> mset a_weight 1 b_weight 2 c_weight 3
OK

然后针对一开始的链表,我们增加BY选项,在进行排序:

192.168.1.102:6379[15]> sort alphatest BY *_weight
1) "a"
2) "b"
3) "c"

可以发现,虽然我们并没有加ALPHA选项,但是通过BY选项,我们实际的权重是*_weight的键值,能够被正常转为双精度浮点型,因此也可以正常排序。

GET选项

GET选项是通过排序结果在去查询键值。
比如我们继续增加一些键值对:

192.168.1.102:6379[15]> mset a_toUpperSize A b_toUpperSize B c_toUpperSize C
OK

然后通过GET选项将输出的结果,作为键去查询:

192.168.1.102:6379[15]> sort alphatest ALPHA GET *_toUpperSize
1) "A"
2) "B"
3) "C"

再做个测试,假设此时我们删除了c_toUpperSize,然后再去SORTGET,会发生什么?

192.168.1.102:6379[15]> del c_toUpperSize
(integer) 1
192.168.1.102:6379[15]> sort alphatest ALPHA GET *_toUpperSize
1) "A"
2) "B"
3) (nil)

发现c对应输出结果变成了nil,结果等价于直接mget三个 key:

192.168.1.102:6379[15]> mget a_toUpperSize b_toUpperSize c_toUpperSize
1) "A"
2) "B"
3) (nil)
STORE

STORE选项可以将排序后的结果集存在一个新的键中。

192.168.1.102:6379[15]> sort alphatest ALPHA STORE sorted_result
(integer) 3
192.168.1.102:6379[15]> TYPE sorted_result
list
192.168.1.102:6379[15]> lrange sorted_result 0 -1
1) "a"
2) "b"
3) "c"

可以发现结果集被存在了新的键中。而且键的类型是链表。

SORT命令的实现

SORT命令相关的数据结构是redis.h/_redisSortObject

typedef struct _redisSortObject {
    //*obj指针指向被排序的数据库键
    robj *obj;
    //u用来记录分值,默认用score记录双精度浮点型,而使用BY或是ALPHA的情况下会使用cmpobj
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

SORT执行步骤:

  1. 当服务器执行SORT命令时,首先会根据被执行的数据库键的大小创建一个同等长度的_redisSortObject数组,然后遍历数组,将*obj指针指向数据库键中的每一项,
  2. 然后根据SORT命令的选项,确定u.score或是u.cmpobj。在根据u进行快速排序。
  3. 根据LIMITASC|DESC选项确定输出数组中哪些结果。
  4. 如果还有GET选项,服务器将根据排序的结果去数据库中查找对应数据库键。
  5. 如果存在STORE选项,则保存结果集。

二进制位数组

位数组的表示

Redis 中使用SDS对象表示数组,其中sdshdr.len表示保存了几个字节长度的数组(最后一个字节是)。

GETBIT命令

GETBIT命令用来获取某个二进制位数组中指定位的二进制值。

SETBIT命令

SETBIT命令用来设置某个二进制位数组中指定位的二进制值。

BITCOUNT命令

BITCOUNT命令用来统计二进制位数组总一共存在多少个1的位。实现方式参考查表法和汉明重量。

BITTOP命令

BITTOP命令可以用来对多个二进制位数组计算按位与,按位或,按位异或运算。或者可以对某个二进制进行取反。


慢日志查询

Redis 慢日志查询功能用来记录执行超过给定时长的命令请求,命令请求会被保存在一个链表中。有两个相关的配置:slowlog-log-slower-than(时长阈值)和slow-log-max-len(链表长度,先进先出)。

SLOWLOG GET命令可以获取保存在服务器上的慢查询日志。


监视器

普通Redis客户端可以通过发送MONITOR命令,成为服务器的监视器,当服务器在收到命令后,会向监视器发送命令信息。

原文地址:https://www.cnblogs.com/insaneXs/p/11913786.html