redis stream类型 常用命令

参考文档:
https://www.runoob.com/redis/redis-stream.html

创建队列

127.0.0.1:6379> xadd s1 * name czg
"1631604969523-0"
127.0.0.1:6379> xadd s1 * name zhangsan
"1631604969547-0"
127.0.0.1:6379> xadd s1 * name wangwu
"1631604969547-1"
127.0.0.1:6379> xadd s1 * name zhaoliu

查看消息个数

127.0.0.1:6379> xlen s1
(integer) 4

查看队列全部内容

127.0.0.1:6379> xrange s1 - +
1) 1) "1631604969523-0"
   2) 1) "name"
      2) "czg"
2) 1) "1631604969547-0"
   2) 1) "name"
      2) "zhangsan"
3) 1) "1631604969547-1"
   2) 1) "name"
      2) "wangwu"
4) 1) "1631604969594-0"
   2) 1) "name"
      2) "zhaoliu"

查看队列全部内容,限制输出条数

127.0.0.1:6379> xrange s1 - + count 2
1) 1) "1631604969523-0"
   2) 1) "name"
      2) "czg"
2) 1) "1631604969547-0"
   2) 1) "name"
      2) "zhangsan"

查看队列部分内容

127.0.0.1:6379> xrange s1 1631604969523-0 1631604969547-1
1) 1) "1631604969523-0"
   2) 1) "name"
      2) "czg"
2) 1) "1631604969547-0"
   2) 1) "name"
      2) "zhangsan"
3) 1) "1631604969547-1"
   2) 1) "name"
      2) "wangwu"

裁剪队列消息个数 XTRIM 命令,保留最近的两条消息

创建测试队列,添加数据
127.0.0.1:6379> xadd test_stream * name test
"1631605351625-0"
127.0.0.1:6379>  xadd test_stream * name test2
"1631605703243-0"
127.0.0.1:6379>  xadd test_stream * name test3
"1631605704377-0"
127.0.0.1:6379>  xadd test_stream * name test4
"1631605705890-0"

127.0.0.1:6379> xlen test_stream
(integer) 4
127.0.0.1:6379>  xtrim test_stream maxlen 2
(integer) 2
127.0.0.1:6379> xlen test_stream
(integer) 2
127.0.0.1:6379> xrange test_stream - +
1) 1) "1631605704377-0"
   2) 1) "name"
      2) "test3"
2) 1) "1631605705890-0"
   2) 1) "name"
      2) "test4"

XDEL删除一条消息

127.0.0.1:6379>  xdel test_stream 1631605704377-0
(integer) 1
127.0.0.1:6379> xrange test_stream - +
1) 1) "1631605705890-0"
   2) 1) "name"
      2) "test4"

Python代码连接示例:

>>> import redis
>>> r=redis.StrictRedis(host='127.0.0.1',port=6379,db=0)
>>> r.xreadgroup('czg','xiaofeizhe-name',{'s1':">"},1)
[[b's1', [(b'1631604969594-0', {b'name': b'zhaoliu'})]]]
>>> 
>>> r.xreadgroup('czg','xiaofeizhe-name',{'s1':">"},1)
[[b's1', [(b'1631607281623-0', {b'wei': b'c1'})]]]
>>> r.xreadgroup('czg','xiaofeizhe-name',{'s1':">"},1)
[[b's1', [(b'1631607283760-0', {b'wei': b'c2'})]]]
>>>   

使用 XREAD 以阻塞或非阻塞方式获取消息列表

127.0.0.1:6379> xread count 2 streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1631604969523-0"
         2) 1) "name"
            2) "czg"
      2) 1) "1631604969547-0"
         2) 1) "name"
            2) "zhangsan"
127.0.0.1:6379> xread  streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1631604969523-0"
         2) 1) "name"
            2) "czg"
      2) 1) "1631604969547-0"
         2) 1) "name"
            2) "zhangsan"
      3) 1) "1631604969547-1"
         2) 1) "name"
            2) "wangwu"
      4) 1) "1631604969594-0"
         2) 1) "name"
            2) "zhaoliu"
127.0.0.1:6379> xread count 2 streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1631604969523-0"
         2) 1) "name"
            2) "czg"
      2) 1) "1631604969547-0"
         2) 1) "name"
            2) "zhangsan"
127.0.0.1:6379> xread count 2 block 100000 streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1631604969523-0"
         2) 1) "name"
            2) "czg"
      2) 1) "1631604969547-0"
         2) 1) "name"
            2) "zhangsan"

创建消费者组

XGROUP CREATE mystream consumer-group-name 0-0  
mystream:stream 名称
consumer-group-name:消费组名称
0-0:从头开始消费
如果0-0替换为 $ ,则代表从尾部开始消费

实际命令:
XGROUP CREATE s1 consumer-group-tou 0-0  
XGROUP CREATE s1 consumer-group-wei $

使用消费者组进行消费:

XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS s1 >
consumer-group-name:消费组名称, consumer-group-tou 或 consumer-group-wei
consumer-name:消费者名称
COUNT:消费几条记录
s1:stream 名称。
> 是标准语法,必须要有

实际命令: XREADGROUP GROUP consumer
-group-tou xiaofeizhe-name count 1 STREAMS s1 >
XREADGROUP GROUP consumer-group-wei xiaofeizhe-name count 1 STREAMS s1 >

使用 consumer-group-tou 消费,可以消费到数据

127.0.0.1:6379> XREADGROUP GROUP consumer-group-tou xiaofeizhe-name count 1 STREAMS s1 >
1) 1) "s1"
   2) 1) 1) "1631604969523-0"
         2) 1) "name"
            2) "czg"
127.0.0.1:6379> XREADGROUP GROUP consumer-group-tou xiaofeizhe-name count 1 STREAMS s1 >
1) 1) "s1"
   2) 1) 1) "1631604969547-0"
         2) 1) "name"
            2) "zhangsan"
127.0.0.1:6379> XREADGROUP GROUP consumer-group-tou xiaofeizhe-name count 1 STREAMS s1 >
1) 1) "s1"
   2) 1) 1) "1631604969547-1"
         2) 1) "name"
            2) "wangwu"

使用 consumer-group-wei 消费,没有数据

127.0.0.1:6379> XREADGROUP GROUP consumer-group-wei xiaofeizhe-name count 1 STREAMS s1 >
(nil)

继续往 s1 队列写几条消息,再消费可以正常消费。

127.0.0.1:6379> xadd s1 * wei c1
"1631607281623-0"
127.0.0.1:6379> xadd s1 * wei c2
"1631607283760-0"
127.0.0.1:6379> xadd s1 * wei c3
"1631607284772-0"
127.0.0.1:6379> xadd s1 * wei c4

127.0.0.1:6379> XREADGROUP GROUP consumer-group-wei xiaofeizhe-name count 1 STREAMS s1 >
1) 1) "s1"
   2) 1) 1) "1631607281623-0"
         2) 1) "wei"
            2) "c1"
127.0.0.1:6379> XREADGROUP GROUP consumer-group-wei xiaofeizhe-name count 1 STREAMS s1 >
1) 1) "s1"
   2) 1) 1) "1631607283760-0"
         2) 1) "wei"
            2) "c2"
127.0.0.1:6379> XREADGROUP GROUP consumer-group-wei xiaofeizhe-name count 1 STREAMS s1 >
1) 1) "s1"
   2) 1) 1) "1631607284772-0"
         2) 1) "wei"
            2) "c3"

xinfo stream [key名称] 

显示 stream 相关信息

127.0.0.1:6379> xinfo stream test_stream
 1) "length"
 2) (integer) 1
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1631605705890-0"
11) "first-entry"
12) 1) "1631605705890-0"
    2) 1) "name"
       2) "test4"
13) "last-entry"
14) 1) "1631605705890-0"
    2) 1) "name"
       2) "test4"

 xinfo groups [key名称]

显示消费组相关信息

127.0.0.1:6379> xinfo groups test_stream
1) 1) "name"
   2) "consumer-group-tou"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"
原文地址:https://www.cnblogs.com/nanxiang/p/15268124.html