Redis 学习-Redis 的其他功能

一、慢查询

找到 系统中瓶颈的命令

1. 客户端请求的生命周期:

①. 慢查询通常发生在第三阶段。

②. 客户端超时不一定是慢查询,但慢查询是客户端超时的一个可能因素。

2. 相关配置

慢查询命令会存放在一个先进先出的队列

查询队列的长度:

config get slowlog-max-len

默认值是 128,我们通常建议设置为 1000

config set slowlog-max-len=1000

查询慢查询的定义时长:

config get slowlog-log-slower-than

默认值是 10000 微秒= 10 毫秒,我们建议设置为 1 毫秒

config set slowlog-log-slower-than=1000

3. 相关命令

slowlog get [n] # 查询慢查询队列的 n 条
slowlog len # 获取慢查询队列长度
slowlog reset # 清空慢查询队列

二、pipeline 流水线

流水线是一个类似于 mget / mset 的一个批量操作。

区别在于 m 操作是 redis 原生的命令,他在执行队列中作为一个整体在排队。

流水线是 Java 客户端的命令,在排队时会跟其他命令杂乱在一起排队,非原子性的。但返回时会一起返回。

1. Jedis 客户端直连:

Jedis jedis = new Jedis("127.0.0.1", 6379);
for (int i = 0; i < 100; i++) {
    Pipeline pipeline  = jedis.pipelined();
    for (int j = i * 100; j < (i + 1) * 100; j++) {
        pipeline.hset("hashkey:" + j, "field" + j, "value" + j);
    }
    pipeline.syncAndReturnAll();  
}

2.  SpringBoot 提供的 RedisTemplate 客户端:

// 1 重写入参 RedisCallback 类的 doInRedis 方法
List<Object> list = redisTemplate.executePipelined((RedisConnection connection) -> {

    // 2 打开连接
    connection.openPipeline();

    // 3 要一次性执行的命令

    // 3.1 一个 set 操作
    connection.set("key1".getBytes(), "value1".getBytes());

    // 3.2 一个 mSet 操作
    Map<byte[], byte[]> tuple = new HashMap();
    tuple.put("m_key1".getBytes(), "m_value1".getBytes());
    tuple.put("m_key2".getBytes(), "m_value2".getBytes());
    tuple.put("m_key3".getBytes(), "m_value3".getBytes());
    connection.mSet(tuple);

    // 3.3 一个 get 操作
    connection.get("m_key2".getBytes());

    // 4 返回 null 即可
    return null;

}, RedisSerializer.string());

// 5 遍历结果
for (Object obj : list) {
    System.out.println(String.valueOf(obj));
}

执行结果:

三、消息队列与发布订阅

参考博客:https://www.cnblogs.com/qlqwjy/p/9763754.html

在 Redis 中,发布订阅与消息队列属于不同的概念。

消息队列:Redis 的列表类型天然支持消息队列,并且支持阻塞式读取。多个消费者之间需要抢一个消息。

发布订阅:多个消费者都可以消费到同一条消息,但是无法订阅以往的消息。

1. 消息队列

我们重新熟悉一下 Redis 的列表类型相关 API。

①. 从左边插入,从右边取出

127.0.0.1:6379> lpush mylist a b c
(integer) 3
127.0.0.1:6379> lrange mylist 0 -1
1) "c"
2) "b"
3) "a"
127.0.0.1:6379> rpop mylist
"a"
127.0.0.1:6379> rpop mylist
"b"

在实际使用中,我们为了及时消费,需要不停的 rpop 监听是否有消息进入,这样造成资源浪费。

②. 为了解决这一问题,redis 为我们提供了阻塞命令 brpop 和 blpop。

客户端 1 消费上次剩余的消息:

127.0.0.1:6379> brpop mylist 0
1) "mylist"
2) "c"

客户端 1 继续消费:

127.0.0.1:6379> brpop mylist 0

我们发现客户端阻塞,正在等待中

客户端 2 往键 mylist 中添加消息:

127.0.0.1:6379> lpush mylist 1
(integer) 1

这时查看 客户端 1,消息已经拿到 且耗时 10 秒:

127.0.0.1:6379> brpop mylist 0
1) "mylist"
2) "1"
(10.45s)

2. 发布订阅

①. 客户端 1 发布消息

127.0.0.1:6379> publish channel:1 hi
(integer) 0

向命名为 channel:1 的频道发布一个 hi。

结果返回 0,表示接收到这条消息的订阅者数量。发出去的消息不会被持久化,也就是说后续的订阅者是不会收到这条消息的。

②. 客户端 2 订阅频道

127.0.0.1:6379> subscribe channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:1"
3) (integer) 1

执行上面这条命令会进入订阅状态。

在订阅状态客户端可能会收到 3 种类型的回复。每种类型包含 3 个值。第 1 个值是消息的类型,根据消息类型的不同,第二个和第三个参数的含义可能不同。

消息类型分为:

  subscribe。表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。

  message。表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。

  unsubscribe。表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。

③. 客户端 1 再次发布消息

127.0.0.1:6379> publish channel:1 hi
(integer) 1

返回值表示有 1 个订阅者收到消息。

④. 客户端 2 已看到内容为 hi 的 message 类型的消息

127.0.0.1:6379> subscribe channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:1"
3) (integer) 1
1) "message"
2) "channel:1"
3) "hi"

频道可以不用具体的名字,而使用通配符命名:

  ? 表示 1 个占位符

  * 表示任意个占位符,包含 0 个

  ?*  表示最少 1 个占位符

当我们向命名为 channel* 的频道发送消息时。

订阅命名为 channel1 channel2 channel_a 这 3 个频道的订阅者都会收到消息。

3. 在 SpringBoot 中实现订阅

①. 声明配置 Bean,订阅 channel:1 频道

@Configuration
public class RedisListener {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("channel:1"));
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
        return new MessageListenerAdapter(redisReceiver, "receiveMessage");
    }

}

@Service
class RedisReceiver {
    public void receiveMessage(String message) {

        System.out.println("订阅者:" + message);
    }
}

②. 使用 Junit 实现向频道 channel:1 发布消息

@Test
public void publish() {
     redisTemplate.convertAndSend("channel:1", "Java客户端向你示好");
}

③. 执行单元测试后,控制台输出

订阅者:"Java客户端向你示好"

四、bitmap 位图

减少内存的方案

比如统计每日用户的登录数

1. API

getbit key offset # 获取指定 key 的指定位置上的值
setbit key offset value # 对指定 key 的指定位置上设置值,只能设置为 0 或 1 
bitcount key [start end] # 获取位图指定范围内值为 1 的个数。
bitop op  destKey key1 [key2] # 做多个 bitmap 的 and (交集)、or (并集)、not(非)、xor(异或) 操作并将结果保存到 destKey 中。
bitpos key tartgetBit [start end] # 获取指定范围内第一个等于 tartgetBit 的值的位置,找不到返回 -1

2. 演示

127.0.0.1:6379> set hello big
OK
127.0.0.1:6379> getbit hello 0
(integer) 0
127.0.0.1:6379> getbit hello 1
(integer) 1
127.0.0.1:6379> setbit hello 7 1
(integer) 0
127.0.0.1:6379> get hello
"cig"
127.0.0.1:6379> bitcount hello
(integer) 13
127.0.0.1:6379> bitcount hello 1 3
(integer) 9
127.0.0.1:6379> set world small
OK
127.0.0.1:6379> bitop and helloWorld hello world
(integer) 5
127.0.0.1:6379> bitcount helloWorld (integer) 11 127.0.0.1:6379> bitpos hello 1 (integer) 1 127.0.0.1:6379> bitpos hello 0 1 2 (integer) 8

五、hyperloglog

极端的减少内存的方案 / 数据结构 

可以用来做独立用户统计,缺陷是有错误率,并且只能查询去重后的总数而不是查看具体元素。

1. API

pfadd key element [element ...] # 添加元素
pfcount  key # 计算去重后的总数
pfmerge destKey sourceKey # 合并多个 key 到 destKey 

2. 演示

127.0.0.1:6379> pfadd 14:user:list "user1" "user2" "user3" "user4"
(integer) 1
127.0.0.1:6379> pfcount 14:user:list
(integer) 4
127.0.0.1:6379> pfadd 14:user:list "user1" "user2" "user3" "user5"
(integer) 1
127.0.0.1:6379> pfcount 14:user:list
(integer) 5

六、geo(地理信息定位)

存储经纬度,计算两地距离,范围计算等

原文地址:https://www.cnblogs.com/libra0920/p/12027962.html