Redis 的过期策略和内存淘汰机制
定期删除,Redis 默认每个 100ms 检查,有过期 Key 则删除。需要说明的是,Redis 不是每个 100ms 将所有的 Key 检查一次,而是随机抽取进行检查。如果只采用定期删除策略,会导致很多 Key 到时间没有删除。于是,惰性删除派上用场。
定时删除有三个要点:
1 每一轮的servetCron都会随机检查过期键,设置了过期时间的键都放在了expires里。默认redis有16个库,这16个库会依次检查,每个库最多随机的检查20个(默认值)
2 检查是保存状态的,也就是说上次检查到了6号库,那么这次就检查7号库
3 删除过期键是有时间限制的,默认1ms 代码 。
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ 1000微妙,也就是1毫秒
采用定期删除+惰性删除就没其他问题了么
不是的,如果定期删除没删除掉 Key。并且你也没及时去请求 Key,也就是说惰性删除也没生效。这样,Redis 的内存会越来越高。那么就应该采用内存淘汰机制。
在 redis.conf 中有一行配置:
# maxmemory-policy volatile-lru
该配置就是配内存淘汰策略的:
- noeviction:当内存不足以容纳新写入数据时,新写入操作会报错。
- allkeys-lru:当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的 Key。(推荐使用,目前项目在用这种)(最近最久使用算法)
- allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个 Key。(应该也没人用吧,你不删最少使用 Key,去随机删)
- volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的 Key。这种情况一般是把 Redis 既当缓存,又做持久化存储的时候才用。(不推荐)
- volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个 Key。(依然不推荐)
- volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的 Key 优先移除。(不推荐)
Redis 和数据库双写一致性问题
看我之前写过的延时双删
如何应对缓存穿透和缓存雪崩问题
1 穿透 业务层过滤请求,或者用布隆过滤器判断key是否存在
2 雪崩,失效时间设置为随机值,避免同一时刻失效
7、如何解决 Redis 的并发竞争 Key 问题
这个问题大致就是,同时有多个子系统去 Set 一个 Key。这个时候要注意什么呢?大家基本都是推荐用 Redis 事务机制。
但是我并不推荐使用 Redis 的事务机制。因为我们的生产环境,基本都是 Redis 集群环境,做了数据分片操作。你一个事务中有涉及到多个 Key 操作的时候,这多个 Key 不一定都存储在同一个 redis-server 上。因此,Redis 的事务机制,十分鸡肋。
如果对这个 Key 操作,不要求顺序
这种情况下,准备一个分布式锁,大家去抢锁,抢到锁就做 set 操作即可,比较简单。
如果对这个 Key 操作,要求顺序
假设有一个 key1,系统 A 需要将 key1 设置为 valueA,系统 B 需要将 key1 设置为 valueB,系统 C 需要将 key1 设置为 valueC。
期望按照 key1 的 value 值按照 valueA > valueB > valueC 的顺序变化。这种时候我们在数据写入数据库的时候,需要保存一个时间戳。
假设时间戳如下:
系统 A key 1 {valueA 3:00}
系统 B key 1 {valueB 3:05}
系统 C key 1 {valueC 3:10}
那么,假设系统 B 先抢到锁,将 key1 设置为{valueB 3:05}。接下来系统 A 抢到锁,发现自己的 valueA 的时间戳早于缓存中的时间戳,那就不做 set 操作了,以此类推。其他方法,比如利用队列,将 set 方法变成串行访问也可以。
Pipleline
pipeline并不是redis的命令,但是多种客户端都实现了pipeline,道理也很简单,就是把多次的报文不大的网络io合并成一个,减少网络开销。
比如在jedis中就有pipeline命令
public void pipeCompare() { Jedis redis = new Jedis("192.168.1.111", 6379); redis.auth("12345678");//授权密码 对应redis.conf的requirepass密码 Map<String, String> data = new HashMap<String, String>(); redis.select(8);//使用第8个库 redis.flushDB();//清空第8个库所有数据 // hmset long start = System.currentTimeMillis(); // 直接hmset for (int i = 0; i < 10000; i++) { data.clear(); //清空map data.put("k_" + i, "v_" + i); redis.hmset("key_" + i, data); //循环执行10000条数据插入redis } long end = System.currentTimeMillis(); System.out.println(" 共插入:[" + redis.dbSize() + "]条 .. "); System.out.println("1,未使用PIPE批量设值耗时" + (end - start) / 1000 + "秒.."); redis.select(8); redis.flushDB(); // 使用pipeline hmset Pipeline pipe = redis.pipelined(); start = System.currentTimeMillis(); // for (int i = 0; i < 10000; i++) { data.clear(); data.put("k_" + i, "v_" + i); pipe.hmset("key_" + i, data); //将值封装到PIPE对象,此时并未执行,还停留在客户端 } pipe.sync(); //将封装后的PIPE一次性发给redis end = System.currentTimeMillis(); System.out.println(" PIPE共插入:[" + redis.dbSize() + "]条 .. "); System.out.println("2,使用PIPE批量设值耗时" + (end - start) / 1000 + "秒 ..");