redis 轻松入门

Redis

  • 学习笔记来源于“狂神说课程”

NoSQL概述

为什么使用 NoSql?

大数据时代,一般的数据库无法进行分析处理了

单机MySql 年代,单个数据库完全足够,更多是静态网页,服务器没有太大压力

整个网站瓶颈?

  1. 数据量太大,一个机器放不下

  2. 数据索引(B+Tree),一个机器内存放不下

  3. 访问量(读写混合),一个服务器承受不了

    三种情况之一,就需要升级

Memcached(缓存)+MySql+垂直拆分(读写分离)

发展过程:优化数据结构和索引—>文件缓存(IO)—>Memcached(当时最热门的技术)

缓存能解决读的问题

分库分表,水平拆分(Myswl 集群)

本质解决数据库(读、写问题)

早些年 MylSAM:表锁:查一个数据,整个表锁起来,十分影响效率,高并发出现严重问题

转战Innodb:行锁,只锁这一行

慢慢的就开始做分库分表解决写的问题,推出 MySql 集群,但没有解决问题

最近年代

2010–2020年(定位也是一种数据),MySql 等关系型数据库就不够用了!数据量很大。MySql 存储一些较大文件,博客,图片,数据库压力大,效率低。需要一种数据库来专门处理这种数据。

为什么用 NoSql?

用户个人信息,社交网络,地理位置。用户日志等等爆发增长无法使用关系型数据库。这需要使用 NoSql 数据库,能很好处理以上情况。

什么是 NoSql

NoSql

泛指非关系型数据库(表格、行、列),传统关系数据库很难对付 web2.0时代!

很多数据类型用户的个人信息,社交网络,地理位置。这些数据类型存储不需要一个固定格式。

NoSql 特点

  1. 方便扩展(数据之间没有关系,很好扩展)
  2. 大数据量高性能(Redis 一秒写 8 万次,读取 11 万,NoSql 的缓存记录级,性能比较高)
  3. 数据类型是多样型的(不需要事先设计数据库!随取随用!如果数据库量十分大的表,就比较很难设计了)
  4. 传统 RDBMS 和 NoSqL
传统的 RDBMS
- 组织化组织
- SQL
- 数据和关系都存在单独的表中 (row column)
- 严重的一致性
NoSql
- 不仅仅是数据
- 没有固定的查询语言
- kv 存储,列存储,文档春初,图形数据库(社交关系)
- 最终一致性
- CAP 定理 Consistency(一致性)、Availability(可用性) 、Partition tolerance(分区容错) 和 BASE(异地多活)
- 高性能,高可用,高可扩展性

不同业务使用不同业务源

# 1.商品信息
		名称、价格、商家信息:
		关系型数据库就可以解决了 MySql/Oracle
# 2.商品描述、评论(文字比较多的)
		文档型数据库,MongoDB
# 3.图片
		分布式文件系统 FastDFS
		- 阿里云 oss
		- Hadoop HDFS
# 4.商品关键字
		- 搜索引擎 solr elasticsearch
		- Iserach 多隆阿里巴巴
# 5.商品人的波段信息
		- 内存数据库
		Redis Memache...
# 6.商品的交易,外部的支付接口
		- 三方应用

大型互联网应用问题:

  • 数据类型多
  • 数据源繁多,经常重构
  • 数据要改造,大面积修改

NoSql四大分类

KV 键值对

  • 常用 Redis

文档型数据库(bson 格式和 json 一样)

  • 常用:MongoDB
    • MonhoDB 是一个基于分布式文件存储的数据库,主要处理大量的文档
    • MongoDB 是一个介于关系型数据库和非关系型数据库中中间产品!功能最丰富,最像关系型数据库

列存储数据库

  • HBase
  • 分布式文件系统

图关系数据库

  • Neo4J
  • 社交网络 推荐系统

Redis入门

概述

Redis 是什么?

Redis(Remote Dictionary Server ),即远程字典服务。也被称为结构化数据库,最热门的 NoSql 之一。

是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。

redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis 能干嘛?

  1. 内存存储、持久化,内存中是断电即失,所以说持久化很重要(rdb、aof)
  2. 效率高,可以用于高速缓存
  3. 发布订阅系统
  4. 地图信息分析
  5. 计时器、计数器(浏览量)

特性

  1. 多样的数据类型
  2. 持久化
  3. 集群
  4. 事务

····

Liunx 安装

  1. 下载安装包 redis-6.0.10

  2. 解压 reidis

  3. 进入解压文件,进入 redis 配置文件

  4. 基本的环境安装

    yum install gcc-c++
    make
    make install
    
  5. Reids 默认安装路径 /usr/local/bin

  6. 将 redis 配置文件复制到当前目录下

  7. 默认不是后台启动,修改配置文件

  8. 启动 redis,并连接测试

  9. 查看 reids 进程是否开启

    ps -ef|grep redis
    
  10. 关闭 redis 服务

Redis 测试性能

redis-benchmark 自带测试工具

# 测试性能命令
测试本机:100 个并发,10000 次请求
redis-benchmark -h localhost -p 6379 -c 100 -n 10000

如何查看性能分析

基础知识

  • 一共有 16 个数据库,默认使用第 1 个数据库,每个数据库是独立的

    127.0.0.1:6379> select 2    # 切换数据库
    OK
    127.0.0.1:6379[2]> dbsize   # 查看 2 号数据库的大小
    (integer) 0
    
    
    127.0.0.1:6379[2]> set name redis #设置 kv
    OK
    127.0.0.1:6379[2]> get name # 获取 value
    "redis"
    127.0.0.1:6379[2]> set r redis 
    OK
    127.0.0.1:6379[2]> get r
    "redis"
    127.0.0.1:6379[2]> keys * # 获取当前数据库的所有 key
    1) "r"
    2) "name"
    127.0.0.1:6379[2]> flushdb  # 清除当前数据库
    
    127.0.0.1:6379> flushall  # 清除全部数据库
    OK
    127.0.0.1:6379> keys *
    (empty array)
    

    Redis 是单线程的!

Redis 是基于内存操作的,CPU 不是 Redis 性能瓶颈,Redis 的瓶颈是根据机器内存和网络带宽,既然就可以使用单线程实现。

Redis 是 C 语言写的,官方提供数据100000+的 QPS,完全不比使用 kv 的 Memecache 差

Redis 为什么单线程这么快?

  1. 误区1:高性能的服务器一定是多线程的
  2. 误区 2:多线程(CPU 上下文切换消耗资源)一定比单线程效率高。CPU>内存>硬盘

核心:redis 是将所有数据全部放到内存中的,所以说使用单线程操作就是最高的,多线程(CPU 上下文切换消耗资源,是耗时操作),

对于内存系统来说,没有上下文切换效率就是最高的!多次读写都是在一个 CPU 上的,在内存情况下,就是最佳方案!

五大基本数据类型

Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings)散列(hashes)列表(lists)集合(sets)有序集合(sorted sets) 与范围查询, bitmapshyperloglogs地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication)LUA脚本(Lua scripting)LRU驱动事件(LRU eviction)事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。

Redis-Key

127.0.0.1:6379> set name zs # 设置 key
OK
127.0.0.1:6379> set age 18
OK
127.0.0.1:6379> keys * # 查看当前所有 key
1) "age"
2) "name"
127.0.0.1:6379> exists zs  # 判断当前 key 是否存在
(integer) 0
127.0.0.1:6379> move name 1 # 移动到第 1 个数据库
(integer) 1
127.0.0.1:6379> select 1 # 切换数据库
OK
127.0.0.1:6379[1]> expire name 10 # 设置 key 的过期时间(秒)
(integer) 1
127.0.0.1:6379[1]> ttl name  # 查看当前 key 的剩余时间
(integer) 3
127.0.0.1:6379[1]> get name
(nil)
27.0.0.1:6379[1]> set name redis
OK
127.0.0.1:6379[1]> type name  # c 查看当前 key 的类型
string
127.0.0.1:6379[1]> type age
string
127.0.0.1:6379[1]> del name  # 删除 key
(integer) 1
127.0.0.1:6379[1]> keys *  
1) "age"


---更多命令查看 redis 官网查看

String

127.0.0.1:6379> append name world   # 某个 key 后追加字符串,若该 key 不存在则相当于 set key
(integer) 10
127.0.0.1:6379> get name
"helloworld"
127.0.0.1:6379> strlen name  # 获取 字符串长度
(integer) 10
127.0.0.1:6379> exists name  # 判断某个 key 是否存在
(integer) 1


127.0.0.1:6379> set views 0 
OK
127.0.0.1:6379> incr views    # 自增 1 
(integer) 1
127.0.0.1:6379> get views
"1"
127.0.0.1:6379> incr views
(integer) 2
127.0.0.1:6379> get views
"2"
127.0.0.1:6379> decr views   # 自减 1
(integer) 1
127.0.0.1:6379> decr views
(integer) 0
127.0.0.1:6379> decr views
(integer) -1
127.0.0.1:6379> get views
"-1"


127.0.0.1:6379> incrby view 10  # 可以设置步长,指定增量这里是增加 10
(integer) 10
127.0.0.1:6379> decrby view 5  # 设置减少量减少 5
(integer) 5
127.0.0.1:6379> get view
"5"


# 字符串范围
127.0.0.1:6379> set name "hello,world" # 设置 name
OK
127.0.0.1:6379> get name
"hello,world"
127.0.0.1:6379> getrange name 0 5  # 截取字符串 
"hello,"
127.0.0.1:6379> getrange name 0 -1 # 获取全部的字符串,等同于 get key
"hello,world"
127.0.0.1:6379>

# 替换
27.0.0.1:6379> set key abcdefg
OK
127.0.0.1:6379> setrange key 1 xxx  # 替换指定位置开始的字符串
(integer) 7
127.0.0.1:6379> get key
"axxxefg"

# setex(set with expire) 设置过期时间
# setnx (set if no exist) 不存在设置,存在则设置失败
127.0.0.1:6379> setex key 30 hello  # 设置 key 30s 过期
OK
127.0.0.1:6379> ttl key
(integer) 25
127.0.0.1:6379> setnx name redis # 如果不存在 name 创建
(integer) 1
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> setnx name nosql # 如果存在 name 创建失败
(integer) 0
127.0.0.1:6379> get name
"redis"

#一次设置获取多个值
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3  # 同时设置多个值
OK
127.0.0.1:6379> keys *
1) "k2"
2) "k3"
3) "k1"
127.0.0.1:6379> mget k1 k2 k3 #同时获得多个值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k4 v4  # msetnx 是一个原子性操作,要么一起成功,要么一起失败
(integer) 0
127.0.0.1:6379> get k4
(nil)

set user:1{name:zs,age:18} #设置一个 user:1 对象 值为 json 字符串保存一个对象

# 这里的 key 是巧妙设计:user:{id}:{属性},这样设计在 reids 中完全可以
set user:1{name:zs,age:18}
127.0.0.1:6379> set user:1 {name:zs,age:18}
OK
127.0.0.1:6379> get user:1
"{name:zs,age:18}"


127.0.0.1:6379> mset user:1:name zhangsang user:1:age 2
OK
127.0.0.1:6379> keys *
1) "user:1:age"
2) "user:1:name"
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsang"
2) "2"

getset #先 get 后 set
127.0.0.1:6379> getset db redis #如果不存在值,则返回 null,并设置这个值
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mongodb #如果存在值,获取原来值,并将原来值设置成新的值
"redis"
127.0.0.1:6379> get db
"mongodb"

String类型使用场景:value 除了字符串还可以是我们的数字

  • 计数器
  • 统计关注量(粉丝数,对象存储)

List

所有 list 命令都是 l 开头的

# 插入元素
127.0.0.1:6379> lpush name 1 2 3 #将一个或多个值插入到列表头部(左)
(integer) 3
127.0.0.1:6379> lrange name 0 2
1) "3"
2) "2"
3) "1"
127.0.0.1:6379> rpush name 4 5 6  # 将一个或多个值插入尾部(右)
(integer) 6
127.0.0.1:6379> lrange name 0 -1
1) "3"
2) "2"
3) "1"
4) "4"
5) "5"
6) "6"


# 移除元素
127.0.0.1:6379> lpop name # 移除 list 的第一个元素
"3"
127.0.0.1:6379> rpop name # 移除 list 的最后一个元素
"6"
127.0.0.1:6379> lrange name 0 -1
1) "2"
2) "1"
3) "4"
4) "5"
#通过下标获取值
127.0.0.1:6379> lindex name 1 #下标从 0 开始
"1"
# 查询 list 的长度
127.0.0.1:6379> llen name
(integer) 4

# 移除指定值
127.0.0.1:6379> lrem name 1 5 # 移除指定 value 移除一个元素 ,指定值5的数 
(integer) 1
127.0.0.1:6379> lrange name 0 -1
1) "2"
2) "1"
3) "4"

#trim 修剪
127.0.0.1:6379> lrange name 0 -1
1) "2"
2) "1"
3) "4"
127.0.0.1:6379> ltrim name 1 2  #通过下标截取指定的长度。list 已经改变,只剩下截取的元素
OK
127.0.0.1:6379> lrange name 0 -1
1) "1"
2) "4"

rpoplpush #移除list 最后一个元素,并移动到新的list中
127.0.0.1:6379> lrange name 0 -1
1) "1"
2) "4"
127.0.0.1:6379> rpoplpush name  key  # 移除 name 中的最右端元素,添加到新的列表中(最左端)
"4" 
127.0.0.1:6379> lrange name 0 -1
1) "1"
127.0.0.1:6379> lrange key 0 -1
1) "4"

#lset 将 list 中指定下标的值替换成另一个值,更新操作
127.0.0.1:6379> exists list  #判断列表是否存在
(integer) 0
127.0.0.1:6379> lpush list value1
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "value1"
127.0.0.1:6379> lset name  0 value  #不存在列表会报错
(error) ERR no such key
127.0.0.1:6379> lrange list 0 0
1) "value1"
127.0.0.1:6379> lset list  0 value  # 存在列表进行替换
OK
127.0.0.1:6379> lrange list 0 0
1) "value"
127.0.0.1:6379>

#linsert 插入值,将某个值插入都某个元素前面或后面
127.0.0.1:6379> lrange name 0 -1
1) "hello"
2) "world"
127.0.0.1:6379> linsert name before hello you  #在 hello 前面插入值
(integer) 3
127.0.0.1:6379> lrange name 0 -1
1) "you"
2) "hello"
3) "world"
127.0.0.1:6379> linsert name after world china  #在 world 后面插入值
(integer) 4
127.0.0.1:6379> lrange name 0 -1
1) "you"
2) "hello"
3) "world"
4) "china"

小结

  • list 实际上是一个链表。before Node after,left,right 都可以插入值
  • 如果 key 不存在,创建新的链表
  • 如果 key 存在,新增内容
  • 如果移除所有值,空链表,也代表不存在
  • 在两边插入或改动值,效率高,中间插入效率低一些

Set(无序、不重复)

127.0.0.1:6379> sadd name zs  # set 集合中添加元素
(integer) 1
127.0.0.1:6379> sadd name ls
(integer) 1
127.0.0.1:6379> sadd name ww
(integer) 1
127.0.0.1:6379> smembers name # 查看指定 set 的所有值
1) "ls"
2) "ww"
3) "zs"
127.0.0.1:6379> sismember name zs # 判断一个值是不是在 set 集合中存在
(integer) 1
127.0.0.1:6379> sismember name wl
(integer) 0
127.0.0.1:6379> scard name #获取 set 集合中元素个数
(integer) 3

127.0.0.1:6379> srem name ww # 移除元素
(integer) 1
127.0.0.1:6379> scard name
(integer) 2
127.0.0.1:6379> smembers name
1) "ls"
2) "zs"

#随机抽取元素
127.0.0.1:6379> srandmember name  #随机抽选出一个元素
"ls"
127.0.0.1:6379> srandmember name
"ww"
127.0.0.1:6379> srandmember name 2 #随机抽选出指定个数元素
1) "ww"
2) "zs"
127.0.0.1:6379> srandmember name 2
1) "ls"
2) "zs"

#随机删除元素
127.0.0.1:6379> spop name
"ww"
127.0.0.1:6379> spop name
"zs"
127.0.0.1:6379> smembers name
1) "ls"

#移动指定元素
127.0.0.1:6379> sadd name1 ww
(integer) 1
127.0.0.1:6379> sadd name1 zs
(integer) 1
127.0.0.1:6379> smove name1 name ww  # 将一个指定元素的值,移动到另一个 set 集合
(integer) 1
127.0.0.1:6379> smembers name
1) "ls"
2) "ww"

#数字集合
127.0.0.1:6379> sadd key1 a b c d
(integer) 4
127.0.0.1:6379> smembers key1
1) "d"
2) "b"
3) "a"
4) "c"
127.0.0.1:6379> sadd key2 c d e f
(integer) 4
127.0.0.1:6379> smembers key2
1) "d"
2) "f"
3) "e"
4) "c"
127.0.0.1:6379> sdiff key1 key2 #差集
1) "b"
2) "a"
127.0.0.1:6379> sinter key1 key2 #交集 共同好友就可以这样实现
1) "d"
2) "c"
127.0.0.1:6379> sunion key1 key2 #并集
1) "a"
2) "c"
3) "e"
4) "d"
5) "b"
6) "f"

比如:微博,A 用户将所有关注和的人放在 set 集合中!将它的粉丝也放在一个集合(id 是不同的)
			共同关注,推荐好友

Hash

Map 集合,key-map<>,值是 map 集合,本质和 Syring 类型没有太大区别,只是 value 是由两个值组成

127.0.0.1:6379> hset hash key value  #set 一个具体的 kv
(integer) 1
127.0.0.1:6379> hget hash key
"value"
127.0.0.1:6379> hmset hash key hello key1 world #同时 set 多个 kv,key 相同时原来值会替换
(integer) 1
127.0.0.1:6379> hmget hash key key1  #同时 get 多个 value
"hello"
"world"
127.0.0.1:6379> hgetall hash #获取所有 kv
1) "key"
2) "hello"
3) "key1"
4) "world"

127.0.0.1:6379> hdel  hash key #删除 hash 指定的 key,则对应 value 也会删除
(integer) 1

127.0.0.1:6379> hlen hash   # 获取哈希的长度
(integer) 1

127.0.0.1:6379> hexists hash key  # 判断 hash 指定字段是否存在
(integer) 1
127.0.0.1:6379> hexists hash key2
(integer) 0

#只获得所有的 key
127.0.0.1:6379> hkeys hash
1) "key1"
2) "key"
#获得所有的值
127.0.0.1:6379> hvals hash
1) "world1"
2) "hello"

#自增自减
127.0.0.1:6379> hincrby hash key2 1
(integer) 11
127.0.0.1:6379> hincrby hash key2 -1
(integer) 10
127.0.0.1:6379> hsetnx hash key3 5 	#如果不存在可以设置
(integer) 1
127.0.0.1:6379> hset hash key3  6 	#如果存在不可以设置
(integer) 0

#应用变更数据 user name 和 age,尤其是用户信息之类的,经常变动信息,更适合用 hash 存储 ,String 更加适合字符串
127.0.0.1:6379> hset user:1 name zs age 18
(integer) 2
127.0.0.1:6379> hget user:1 age
"18"
127.0.0.1:6379> hget user:1 name
"zs"
127.0.0.1:6379> hgetall user:1
1) "name"
2) "zs"
3) "age"
4) "18"

Zset(有序集合)

在 set 的基础上增加了一个值

127.0.0.1:6379> zadd name 1 one  # 添加一个值
(integer) 1
127.0.0.1:6379> zadd name 2 two 3 three # 添加多个值
(integer) 2
127.0.0.1:6379> zrange name 0 -1
1) "one"
2) "two"
3) "three"

# 排序
27.0.0.1:6379> zadd salary 8000 zs
(integer) 1
127.0.0.1:6379> zadd salary 800 ls
(integer) 1
127.0.0.1:6379> zadd salary 1600 ww
(integer) 1
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf  #显示全部用户从小到大
1) "ls"
2) "ww"
3) "zs"
127.0.0.1:6379> ZrevRANGE salary 0 -1  #从大到小进行排序
1) "ww"
2) "ls"
127.0.0.1:6379> ZrevRANGE salary 0 -1 withscores
1) "ww"
2) "1600"
3) "ls"
4) "800"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores #显示全部用户从小到大并且附带有序集合值
1) "ls"
2) "800"
3) "ww"
4) "1600"
5) "zs"
6) "8000"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 1600 withscores #显示工资小于 2500 的升序排列
1) "ls"
2) "800"
3) "ww"
4) "1600"

#移除元素
127.0.0.1:6379> zrem salary zs  #移除指定元素
(integer) 1
127.0.0.1:6379> zrange salary 0 -1 
1) "ls"
2) "ww"

#查看有多少元素
127.0.0.1:6379> zcard salary #获取集合中的个数
(integer) 2

#获取制定区间成员数量
127.0.0.1:6379> zcount salary 800 8000
(integer) 2

案例思路:

  • set 排序 存储班级成绩,工资表排序
  • 排行榜应用实现,权重排名

三种特殊数据类型

geospatial 地理位置

Redis 的 Geo 在 3.2 版本推出

getadd

# 添加地理位置
#地球两极:无法直接添加,我们一般会下载城市数据,直接通过 java 程序程序一次性导入
# 参数 key 值(经度(-180,+180)、纬度(-85,+85)、名称)
127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 106.50 29.53  chongqing
(integer) 1
127.0.0.1:6379> geoadd china:city  114.05 22.52 shenzhen 120.16 30.24 hangzhou 108.96 34.26 xian
(integer) 3

Geopos

127.0.0.1:6379> geopos china:city beijing #获取制定的城市经度和维度
1) 1) "116.39999896287918091"
   2) "39.90000009167092543"
127.0.0.1:6379> geopos china:city shanghai
1) 1) "121.47000163793563843"
   2) "31.22999903975783553"

Geodist

#返回两个指定位置的直线距离
127.0.0.1:6379> geodist china:city beijing shanghai #查看北京到上海的直线距离
"1067378.7564"
127.0.0.1:6379> geodist china:city beijing shanghai km
"1067.3788"
127.0.0.1:6379>

Georadius

附近的人?(获得所有附近的人的地址,定位)通过半径来查询

所有的数据都录入,才能使结果更加清晰准确

127.0.0.1:6379> georadius china:city 110 30 1000 km  #以经度 110 和维度 30为中心,1000km 的城市
1) "chongqing"
2) "xian"
3) "shenzhen"
4) "hangzhou"
127.0.0.1:6379> georadius china:city 110 30 500 km
1) "chongqing"
2) "xian"
127.0.0.1:6379> georadius china:city 110 30 100 km
(empty array)
127.0.0.1:6379> georadius china:city 110 30 500 km withcoord withdist #显示到中心的经度维度和距离
1) 1) "chongqing"
   2) "341.9374"
   3) 1) "106.49999767541885376"
      2) "29.52999957900659211"
2) 1) "xian"
   2) "483.8340"
   3) 1) "108.96000176668167114"
      2) "34.25999964418929977"
127.0.0.1:6379> georadius china:city 110 30 500 km withcoord withdist count 1 #筛选指定的结果
1) 1) "chongqing"
   2) "341.9374"
   3) 1) "106.49999767541885376"
      2) "29.52999957900659211"

Georadiusbymember

#找出位于指定元素周围的其他元素
127.0.0.1:6379> georadiusbymember china:city beijing 1000 km
1) "beijing"
2) "xian"
127.0.0.1:6379> georadiusbymember china:city shanghai 400 km
1) "hangzhou"
2) "shanghai"

Geohash

#将二维的经纬度转换为一维字符串,如两个字符串越接近,name 距离越近
127.0.0.1:6379> geohash china:city beijing chongqing
1) "wx4fbxxfke0"
2) "wm5xzrybty0"

Geo 底层实现原理:其实就是 Zset,我们可以用 Zset 操作 geo

127.0.0.1:6379> zrange china:city 0 -1 #查看地图所有元素
1) "chongqing"
2) "xian"
3) "shenzhen"
4) "hangzhou"
5) "shanghai"
6) "beijing"
127.0.0.1:6379> zrem china:city xian #移除指定元素
(integer) 1
127.0.0.1:6379> zrange china:city 0 -1 
1) "chongqing"
2) "shenzhen"
3) "hangzhou"
4) "shanghai"
5) "beijing"

Hyperloglog

什么是基数?

基数:一个集合中不重复元素的个数

A {1,3,5,7,8,7}

B {1,3,5,7,8}

基数=5

简介

2.8.9版本就更新Hyperloglog数据结构

Redis Hyperloglog 基数统计的算法

优点:占用内存是固定的,2^64不同元素的基数,只需要使用 12kb 内存,要从内存角度比较的话Hyperloglog会是首选,错误率0.81%,统计uv 任务,可以忽略。若不许容错不能使用

网页 UV(一个人访问一个网站多次,但是只算做一个 UV)

传统方式,set 保存用户的 id,然后就可以统计 set 元素数量作为标准判断!

这个分数如果保存大量的 id 会比较麻烦,我们的目的不是为了保存 id,而是基数

测试使用

127.0.0.1:6379> pfadd key a a b c d e f g h i j #创建第一组元素
(integer) 1
127.0.0.1:6379> PFCOUNT key  #统计 key 的基数数量
(integer) 11
127.0.0.1:6379> pfadd key1  h j z k a b u o   
(integer) 1
127.0.0.1:6379> pfcount key1
(integer) 8
127.0.0.1:6379> pfmerge key2 key key1 # 合并两组到 key2(并集)
OK
127.0.0.1:6379> pfcount key2 #查看并集数量
(integer) 14

Bitmap

位存储

统计用户信息,活跃,不活跃。打卡,两个状态都可以使用 Bitmaps!

Bitmaps 位图,数据结构!都是操作二进制位来进行记录,就只有 0 和 1 两个状态!

365 天=365bit, 1 字节=8 bit ,46 个字节左右

使用 bitmap 记录一周打卡情况,1 代表打卡
127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 0
(integer) 0
127.0.0.1:6379> setbit sign 2 0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 1
(integer) 0
127.0.0.1:6379> setbit sign 5 0
(integer) 0
127.0.0.1:6379> setbit sign 6 0
(integer) 0
127.0.0.1:6379> getbit sign 3  # 查看某一天是否打卡
(integer) 1
127.0.0.1:6379> bitcount sign  # 统计打卡天数
(integer) 3

事务

Redis 事务本质:一组命令的集合!一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行(谁先在队里先执行)!

一次性、顺序性、排他性(事务在执行过程中不能被干扰),来执行一系列命令

-----队列 set set set 执行----

Redis事务没有隔离级别的概念

所有的命令在事务中,并没有执行!只有发起执行命令的时候才会执行!Exec

Redis 单条命令是保证原子性的,但是事务不保证原子性

redis 事务:

  • 开启事务(multi)
  • 命令入队(...)
  • 执行事务(exec)

正常执行事务

127.0.0.1:6379> multi  #开启事务
OK
# 命令入队
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec #执行事务
1) OK
2) OK
3) "v2"
4) OK
#队列结束

放弃事务

27.0.0.1:6379> set k1 v1  # 开启事务 
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> discard  # 取消事务
OK
127.0.0.1:6379> get k4   # 事务队列中命令都不会执行
(nil)

编译型异常(代码有问题,命令有错误,事务中所有命令都不会执行)

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> getset k1 #错误命令
(error) ERR wrong number of arguments for 'getset' command
127.0.0.1:6379> set k5 v5
QUEUED
127.0.0.1:6379> exec  #执行报错
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k5 #所有命令都没有执行
(nil)

运行时异常(1/0),如果事务队列存在语法性错误,那么执行命令时,其他命令可以正常执行,错误命令抛出异常

127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr k1  #会执行失败
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> get k3
QUEUED
127.0.0.1:6379> exec
1) (error) ERR value is not an integer or out of range  #虽然第一条命令报错了,但是其他依旧正常执行了成了
2) OK
3) OK
4) "v3"
127.0.0.1:6379> get k2
"v2"

监控!watch(面试常问)

悲观锁:

  • 很悲观,认为什么时候都会出问题,无论做什么都加锁!影响性能

乐观锁:

  • 很乐观,认为什么时候都不会出现问题,所以不会上锁!更新数据时候去判断一下在此期间是否有人修改过这个数据。加一个 version 字段(获取 version,更新时比较 version,正确才提交)

Redis监视测试

  • 正常执行
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money  #监控 money 对象
OK
127.0.0.1:6379> multi 
OK
127.0.0.1:6379> decrby money 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec # 事务正常结束,数据期间没有发生变动,这个时候就正常执行成功
1) (integer) 80
2) (integer) 20
  • 测试多线程修改值,使用 watch 来监视我们需要修改的值,若监视的值发生变化,则会导致执行失败
127.0.0.1:6379> watch money  #监视 money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby money 10
QUEUED
127.0.0.1:6379> incrby out 10
QUEUED
127.0.0.1:6379> exec # 执行之前,另一个线程,修改我们的值,这个时候,就会导致事务执行失败
(nil)

127.0.0.1:6379> get money
"80"
127.0.0.1:6379> set money 1000
OK
127.0.0.1:6379> unwatch  #如果发现事务执行失败,先解锁
OK
127.0.0.1:6379> watch money #获取最新的值,再次监视
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby money 10
QUEUED
127.0.0.1:6379> incrby out 10
QUEUED
127.0.0.1:6379> exec  # watch 对比监视的值是否发生变化,没有变化则执行成功,失败的话重复上述动作(解锁—监视-比较 id)
1) (integer) 990
2) (integer) 30

Jedis

什么是 Jedis?是官方推荐的 java 连接工具!使用 java 操作 Redis 中间件!,如果使用 java 操作 redis,name 一定要对 jedis 十分熟悉!

测试

  1. 导入对应依赖
<!--    导入 jedis 的包-->
    <dependencies>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>

        <!--    fastjson json 转换-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
  1. 测试 redis
import redis.clients.jedis.Jedis;

public class TestPing {
    public static void main(String[] args) {
        //创建 jedis 对象
        Jedis jedis = new Jedis("ip地址",6379);
        //jedis 所有的命令就是之前指令
        System.out.println(jedis.ping()); //输出 pong 即连接成功
    }

}

这里使用阿里云虚拟机连接需要修改几个地方

  • redis.config

    设置为: # bind127.0.0.1

    设置为: no protected-mode no

    重启 redis

  • 阿里云的防火墙,确认 redis 端口已经设置

常用 API

String

List

Set

Hash

Zset

所有命令都上面的指令一毛一样

事务

public class TestTx {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        jedis.flushDB();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("hello", "world");
        jsonObject.put("name", "redis");
        //开启事务
        Transaction multi = jedis.multi();
        String string = jsonObject.toString();
        try {
            multi.set("user1", string);
            multi.set("user2", string);

            int i = 1 / 0;//代码抛出异常,执行失败
            multi.exec();//执行事务
        } catch (Exception e) {
            multi.discard();//放弃事务
            e.printStackTrace();
        } finally {
            System.out.println(jedis.get("user1"));
            System.out.println(jedis.get("user2"));
            jedis.close();//关闭连接
        }

    }
}

SpringBoot 整合

在 SpringBoot 2.x后,原使用的 redis被替换成了lettuce

jedis:采用直连,多线程操作的话是,不安全的,如果要避免不安全的,使用 jedis pool 连接池!更像BIO(同步阻塞的)

lettuce:采用 netty,实例可以在多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据,更像 NIO 模式(同步非阻塞)

BIO(同步阻塞):每一个IO请求都会有一个线程去处理,如果数据没有准备就绪,线程会一直等待。直到数据读取完毕线程才会释放,在此期间,进程不回去做任何其他任务,这种模式会浪费一定的线程资源。

NIO(同步非阻塞):NIO的优点在于首先基于缓存读写文件,能够批量操作,然后用channel双向读写数据,减少每次打开断开流的资源消耗。引入selecore的概念,用一个线程管理多个通道,大大减少线程开销。

源码分析:

 @Bean
    @ConditionalOnMissingBean( //不存在这个 bean 才生效,我们自己配置了 redisTemplate 这个就失效
        name = {"redisTemplate"}//我们可以自己定义redisTemplate 来替换这个默认的
    )
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
      //默认 RedisTemplate 没有过多的设置,redis 对象都是需要序列化的
      //两个泛型都是 object,object 的类型,我们后面使用都需要强制类型转换<String,Object>
        RedisTemplate<Object, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

 @Bean
    @ConditionalOnMissingBean //由于 string 是 redis 中最常使用的类型,所以说单独提出来一个 bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

整合测试

  1. 导入依赖
!--        操作 redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
  1. 配置连接
#配置 redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
  1. 连接测试
@SpringBootTest
class RedisSpringbootApplicationTests {

    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    void contextLoads() {

        //操作不同数据类型
        //opsForValue 操作字符串,类似 string
        //opsForList() 类似 list ....

        //除了基本操作,常用的方法都可以直接使用 redisTemplate操作,比如基本的事务和 CRUD
        //获取 redis 连接对象
        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        connection.flushAll();

        redisTemplate.opsForValue().set("k1", "v1");
        System.out.println(redisTemplate.opsForValue().get("k1"));
    }
}

关于值的序列化

  • 默认是 jdk 序列化,当使用中文会出现转义问题,需要修改序列话器

直接传递对象的话需要序列化否则会报错

@Test
    void test() throws JsonProcessingException {
        //真实开发,一般用 json 来传递对象
        User user = new User("张三", 3);//未实现系列化会报错(如上)
        //转换成 json 对象
        String jsonUser = new ObjectMapper().writeValueAsString(user);
        redisTemplate.opsForValue().set("user",jsonUser);
        System.out.println(redisTemplate.opsForValue().get("user"));
    }

编写自己的 redisTemplate模板

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.net.UnknownHostException;

@Configuration
public class RedisConfig {

    //编写自己的 redisTemplate
    @Bean
    @SuppressWarnings("all") //告诉编译器忽略指定的警告,不用在编译完成后出现警告信息。
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        //一般使用<String, Object>
        RedisTemplate<String, Object> template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);

        //json 序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        //string序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        //key 采用 String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //hash 采用 String 的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //value序列化方式采用 json
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //Hash value序列化方式采用 json
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }
}

配置 reidsUtils

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;


/**
 *
 * @author yangaliang
 * 基于spring和redis的redisTemplate工具类
 * 针对所有的hash 都是以h开头的方法
 * 针对所有的Set 都是以s开头的方法
 * 针对所有的List 都是以l开头的方法
 */
@Component
public class RedisUtils{

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    //=============================common============================
    /**
     * 指定缓存失效时间
     * @param key 键
     * @param time 时间(秒)
     * @return
     */
    public boolean expire(String key,long time){
        try {
            if(time>0){
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key){
        return redisTemplate.getExpire(key,TimeUnit.SECONDS);
    }

    /**
     * 判断key是否存在
     * @param key 键
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key){
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除缓存
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String ... key){
        if(key!=null&&key.length>0){
            if(key.length==1){
                redisTemplate.delete(key[0]);
            }else{
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }

    //============================String=============================
    /**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */
    public Object get(String key){
        return key==null?null:redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     * @param key 键
     * @param value 值
     * @return true成功 false失败
     */
    public boolean set(String key,Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }

    /**
     * 普通缓存放入并设置时间
     * @param key 键
     * @param value 值
     * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public boolean set(String key,Object value,long time){
        try {
            if(time>0){
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            }else{
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 递增
     * @param key 键
     * @param delta 要增加几(大于0)
     * @return
     */
    public long incr(String key, long delta){
        if(delta<0){
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     * @param key 键
     * @param delta 要减少几(小于0)
     * @return
     */
    public long decr(String key, long delta){
        if(delta<0){
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    //================================Map=================================
    /**
     * HashGet
     * @param key 键 不能为null
     * @param item 项 不能为null
     * @return 值
     */
    public Object hget(String key,String item){
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     * @param key 键
     * @return 对应的多个键值
     */
    public Map<Object,Object> hmget(String key){
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     * @param key 键
     * @param map 对应多个键值
     * @return true 成功 false 失败
     */
    public boolean hmset(String key, Map<String,Object> map){
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * HashSet 并设置时间
     * @param key 键
     * @param map 对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    public boolean hmset(String key, Map<String,Object> map, long time){
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if(time>0){
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     * @param key 键
     * @param item 项
     * @param value 值
     * @return true 成功 false失败
     */
    public boolean hset(String key,String item,Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     * @param key 键
     * @param item 项
     * @param value 值
     * @param time 时间(秒)  注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key,String item,Object value,long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if(time>0){
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除hash表中的值
     * @param key 键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item){
        redisTemplate.opsForHash().delete(key,item);
    }

    /**
     * 判断hash表中是否有该项的值
     * @param key 键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item){
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     * @param key 键
     * @param item 项
     * @param by 要增加几(大于0)
     * @return
     */
    public double hincr(String key, String item,double by){
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * hash递减
     * @param key 键
     * @param item 项
     * @param by 要减少记(小于0)
     * @return
     */
    public double hdecr(String key, String item,double by){
        return redisTemplate.opsForHash().increment(key, item,-by);
    }

    //============================set=============================
    /**
     * 根据key获取Set中的所有值
     * @param key 键
     * @return
     */
    public Set<Object> sGet(String key){
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 根据value从一个set中查询,是否存在
     * @param key 键
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key,Object value){
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将数据放入set缓存
     * @param key 键
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object...values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 将set数据放入缓存
     * @param key 键
     * @param time 时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key,long time,Object...values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if(time>0) expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取set缓存的长度
     * @param key 键
     * @return
     */
    public long sGetSetSize(String key){
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值为value的
     * @param key 键
     * @param values 值 可以是多个
     * @return 移除的个数
     */
    public long setRemove(String key, Object ...values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
    //===============================list=================================

    /**
     * 获取list缓存的内容
     * @param key 键
     * @param start 开始
     * @param end 结束  0 到 -1代表所有值
     * @return
     */
    public List<Object> lGet(String key,long start, long end){
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     * @param key 键
     * @return
     */
    public long lGetListSize(String key){
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     * @param key 键
     * @param index 索引  index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     * @return
     */
    public Object lGetIndex(String key,long index){
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @return
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @param time 时间(秒)
     * @return
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0) expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @param time 时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     * @param key 键
     * @param value 值
     * @param time 时间(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0) expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     * @param key 键
     * @param index 索引
     * @param value 值
     * @return
     */
    public boolean lUpdateIndex(String key, long index,Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     * @param key 键
     * @param count 移除多少个
     * @param value 值
     * @return 移除的个数
     */
    public long lRemove(String key,long count,Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

Redis.conf 详解

启动时就需要通过配置文件启动的

单位

  1. 配置文件 unit 单位对大小写不敏感

包含

image-20210116132624965

就是好比

网络

bind 127.0.0.1  #绑定的 ip
protected-mode no  #保护模式(远程关闭)
port 6379  #端口设置

通用 GENERAL

daemonize yes  #以守护进程的方式运行,默认是 no,需要改为 yes,否则退出进程结束

pidfile /var/run/redis_6379.pid  #如果以后台的方式运行就要指定一个 pid 文件

# 日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing) #测试开发阶段
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)  #生产环境级别
# warning (only very important / critical messages are logged)
loglevel notice  # 默认级别

logfile "" #日志的文件位置名

datasource 16  #数据库数量,默认 16 个数据库

always-show-logo yes #是否显示 logo

快照 SNAPSHOTTING

持久化,在规定时间内,执行多少次操作,则会持久化到文件.rdb ,aof

Redis 是内存数据库,如果没有持久化,name 数据断电就丢失

# 900s ,如果至少 1 个 key 进行修改,我们就进行持久化操作
save 900 1  
# 300s ,如果至少 10 个 key 进行修改,我们就进行持久化操作
save 300 10
# 60s ,如果至少 10000 个 key 进行修改,我们就进行持久化操作
save 60 10000
# 我们之后学习持久化,也可以自己定义测试

stop-writes-on-bgsave-error yes  # 持久化如果出错,还需要继续工作

rdbcompression yes    # 是否压缩 rdb 文件(持久化文件),这时需要消耗一些 CPU 资源

rdbchecksum yes  # 保存 rdb 文件的时候,进行错误的检查校验

dir ./  # rdb文件保存目录

复制 REPLICATION

后面讲解

安全 SECURITY

可以在配置文件设置密码或者通过命令设置

image-20210116135121452

  • 通过命令设置密码
PONG
127.0.0.1:6379> config get requirepass # 获取密码
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass "密码"  #设置密码
127.0.0.1:6379> auth "密码"  #登录

限制CLIENTS

maxclients 10000  #设置能连接上 redis 的最大客户端的数量

maxmemory <bytes>  # redis配置最大的内存容量

maxmemory-policy noeviction  # 内存到达上限之后除了策略

# 内存策略
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值) 

2、allkeys-lru : 删除lru算法的key   

3、volatile-random:随机删除即将过期key   

4、allkeys-random:随机删除   

5、volatile-ttl : 删除即将过期的   

6、noeviction : 永不过期,返回错误

APPEND ONLY MODE 模式 aof 配置

appendonly no  # 默认不开腔 aof 模式,默认使用 rdb 方式持久化,大部分情况下,rdb 完全够用
appendfilename "appendonly.aof"   # 持久化文件名

# appendfsync always	 # 每次修改都会 sysc,消耗性能
appendfsync everysec   # 每秒同步一下,可能丢失这一秒数据
# appendfsync no       # 不执行 sysc,这个时候操作系统自动同步数据,速度最快

Redis 持久化

Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供持久化功能!

RDB(Redis DataBase)

什么是 RDB

在主从复制,rdb 就是备用

image-20210116175453209

RDB方式原理:当redis需要做持久化时(执行SAVA或者BGSAVA命令,或者是达到配置条件时执行),redis会fork一个子进程,子进程将数据写到磁盘上一个临时RDB文件中,当子进程完成写临时文件后,将原来的RDB替换掉(默认文件名为dump.rdb)

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是所说的 Snapshot 快照,它恢复时是将快照文件直接读到内存里。

Redis 会单独创建(fork)一个子进程来进行持久化,会将数据写入到一个临时文件中,带持久化过程都结束了,再用这个临时文件替换上次持久化的文件。整个过程中,主进程是不进行任何 IO 操作的。这就是确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,哪 RDB 方式要比 AOF 方式更加高效。RDB 的缺点是最后一次持久化的数据可能丢失。默认就是 RDB,一般情况下不需要修改配置。

有时候在生产环境我们会将这个文件进行备份

rdb 保存的文件是 dump.rdb 都是我们配置文件快照中进行配置

image-20210116172304719

RDB触发机制

  1. save 的规则满足情况下,会触发 rdb规则
  2. 执行 flushall 命令,也会触发我们的 rdb 规则
  3. 退出 redis,也会产生 rdb 文件

备份就会自动生成一个 dump.rdbweb文件

image-20210116173736321

如果恢复 rdb 文件

  1. 只需要将 rdb 文件放在 redis 启动目录就可以,redis 启动的时候会自动检查 dump.rdb 恢复其中的数据
  2. 查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin"  # 如果在这个目录下存在 dump.rdb 文件,启动就会自动恢复其中的数据

rdb 的优缺点

优点:

  1. 适合大规模的数据恢复
  2. 如果你对数据完整性不高可以使用

缺点:

  1. 需要一定时间间隔进程操作!redis 意外宕机,最后一次修改数据就没了
  2. fork 进程时,会占用一定的内存空间

AOF(Append Only File)

将所有命令都记录下来,history 文件,恢复的话都执行一遍

image-20210116211537356

AOF方式原理:AOF就可以做到全程持久化,Redis每执行一个修改数据的命令,都会把这个命令添加到AOF文件中,当Redis重启时,将会读取AOF文件进行“重放”以恢复到 Redis关闭前的最后时刻。

以日志的形式来记录每个写操作,将 Redis 执行过的所有指令记录下来(读操作不记录),只需追加文件但不可以改写文件,redis 启动之处会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后只需一次以完成数据的恢复工作

Aof 保存的是 appendonly.aof 文件

默认是不开启的需要我们手动进行配置,我们只需要将 appendonly 改为 yes 就可以开启 aof!

重启 redis 就可以生效生成了 appendonly.aof。如果 aof 文件有错误,redis 是启动不起来的,我们需要修复aof 文件

redis 给我们提供了一个工具 redis-check-aof --fix appendonly.aof

如果文件正常重启就可以直接恢复了

重写规则说明

aof 默认就是文件的无限追加,文件会越来越大

image-20210116205444688

如果 aof 文件大于 64m,太大了!fork 一个新的进程来把我们文件重写

aof 优缺点

appendonly no  # 默认不开腔 aof 模式,默认使用 rdb 方式持久化,大部分情况下,rdb 完全够用
appendfilename "appendonly.aof"   # 持久化文件名

# appendfsync always	 # 每次修改都会 sysc,消耗性能
appendfsync everysec   # 每秒同步一下,可能丢失这一秒数据
# appendfsync no       # 不执行 sysc,这个时候操作系统自动同步数据,速度最快

优点:

  1. 每一次修改都同步,文件的完整性会更加好
  2. 每秒同步一次,可能会丢失一秒数据

缺点:

  1. 相对于数据文件来说,aof 远远大于 rdb,修复速度也比 rdb 慢

  2. Aof 运行效率也要比 rdb 慢,所以redis 默认的配置是 rdb 持久化

扩展:

  1. RDB 持久化方式能够在指定的时间间隔内对数据进行存储
  2. AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始数据,AOF 命令以 redis 协议追加保存每次写的操作到文件末尾,redis 还能对 AOF 文件进行后台重写,使得 AOF 文件的体积不至于过大。
  3. 只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
  4. 同时开启两种持久化
  • 在这种情况下,当 redis 重启时会优先载入 AOF 文件来恢复原始数据,通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集完整。
  • RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF,但不建议只使用 AOF,因为 RDB 更适合用于备份数据库(AOF 在不断变化不好备份),快速重启,而且不会有 AOF可能存在的 bug。
  1. 性能建议
  • 因为 RDB 文件只用作后备用途,建议只在 slave 上持久化 RDB 文件,而且只要 15 分钟备份一次就够了,只保留 save900 1 条规则
  • 如果使用 AOF ,好处最恶劣环境下只会丢失 不超过 2 秒数据,启动脚本较简单只启动 AOF 文件就可以。代价是持续的 IO,二是 AOF 的重写过程中会产生新数据到新文件造成阻塞。只要硬盘允许,尽量减少重写。

Redis 发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。微信、微博、关注系统!

Redis 客户端可以订阅任意数量频道

订阅发布消息:

第一个:消息发送者,第二个:频道,第三个:订阅者

命令

image-20210116215305682

测试

订阅端

127.0.0.1:6379> SUBSCRIBE xiaoshuo   # 订阅一个频道 xiaoshuo
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "xiaoshuo"
3) (integer) 1
# 等待消息
1) "message"  #消息
2) "xiaoshuo" #那个频道的消息
3) "hello,world" #消息内容
1) "message"
2) "xiaoshuo"
3) "hello,redis"

发送端:

127.0.0.1:6379> publish xiaoshuo hello,world  #发布者发布消息到频道
(integer) 1
127.0.0.1:6379> publish xiaoshuo hello,redis
(integer) 1

Java代码简单实现

  1. 依赖
<dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
</dependency>
  1. 配置类
// 配置管道
public class TestChannel extends JedisPubSub {

    @Override
    public void onMessage(String channel, String message) {
        super.onMessage(channel, message);
        System.out.println(message);
    }

}
// 订阅者
public class SubscribeTest {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("121.196.111.201",6379);
        TestChannel tc = new TestChannel();
        jedis.subscribe(tc,"A");
    }
}
// 发送者
public class PublishTest {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("121.196.111.201",6379);
        jedis.publish("A", "你好");
        jedis.publish("A", "字啊么");
    }
}

原理

image-20210116214925594

使用场景:

  1. 实时消息系统
  2. 实时聊天(频道当做聊天室,将信息回显所有人即可)

主从复制

概念

主从复制,是将一台 Redis 服务器的数据,复制到其他的 Redis 服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Mater以写为主,Slave 以读为主。

默认情况下,每台 Redis 服务器都是主节点;

且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点

主从复制的作用主要包括:

  1. 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式
  2. 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余
  3. 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写 Redis 数据时应用连接主节点,读 Redis 数据时应用连接从节点),分担服务器的负载;尤其是在写少读多的场景下,通过多个节点分担负载,可以大大提高 Redis 服务器的并发量。
  4. 高可用(集群)基石:除了上述作用外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是 Redis 高可用的基础。

一般来说,要将 Redis 运用于工程项目中,只使用一台 Redis 是万万不能(宕机):

  1. 从结构来说,单个 Redis 服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大
  2. 从容量上,单个 Redis 服务器内存容量有限,一台服务器容量 256G,但单一般单台 Redis 最大内存不超 20G

image-20210116230328101

主从复制,读写分离!80%的情况下都进行读操作,减缓服务器的压力,至少一主二从!

环境配置

只配置从库,不配置主库

127.0.0.1:6379> info replication   #查看当前库信息
# Replication
role:master  # 角色 mater
connected_slaves:0   # 没有从机
master_replid:380b945eae04c35ac909d8aed62020778d0bf8ac
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

集群模拟

  1. 端口

  2. daemonize yes(后台运行)

  3. 后台运行的指定文件 pid

  4. 日志文件要命名

  5. rdb 文件名

修改完毕启动 3 个 redis 服务,这时候每一台都是主节点,因为 redis 默认是主

image-20210116232228875

一主(79)二从(80,81)

默认情况下,每台 reids 服务器都是主节点,一般情况下只用配置从机就好了

#从机配置(6380,6381)
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379  #从机配置
OK
127.0.0.1:6380> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379

#主机查询主从信息
127.0.0.1:6379> info replication
# Replication
role:master  				 # 主
connected_slaves:2   # 2 从
slave0:ip=127.0.0.1,port=6380,state=online,offset=112,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=112,lag=1

真实的主从配置应该在配置文件中配置,这样的是永久的;现在用命令是暂时的

image-20210116233305919

细节

主机负责写,从机负责读!主机中的所有信息和数据都会自动被从机保存

image-20210116233521177

image-20210116233555860

测试:主机断开连接,从机依旧连接主机,但是没有写操作,但是主机回来了,从机依旧可以直接获取主机写的信息

如果使用命令行配置的主从,这个时候如果重启了,从机就会变成主机,无法拿到数据;但是只要重新配置为从机,就能拿到主机的存储信息。

复制原理

Slave 启动成功连接 mater 后会发送一个 sync 同步命令

Master 接收到命令,启动后台的存盘进程里,同时收集所有接收到的要不高于修改数据集命令,在后台执行完毕之后,master 将传送整个数据文件到 slave,并完成一次完全同步

全量复制:而 slave 服务在接收到数据库文件后,将其存盘加载到内存中(slave 会加载所有 master 的数据)

增量复制 Master 继续将新的所有收集的修改命令一次传给 slave,完成同步(也就是新增加的数据,会同步给 slave)

但是只要重新连接 master,一次完全同步(全量复制)将自动执行,数据一定可以再从机中看到

宕机手动配置主机

层层链路

image-20210117092621547

这也是一种主从复制的方式

!当主节点宕机,我们可以手动设置主节点(“谋朝篡位”)哨兵模式出来之前

如果主机断开连接,我们可以使用slaveof no one让自己变为主机(将 80 作为主机,原是 79 的从),其他节点可以手动连接到最新的这个主节点。原来的节点恢复,也不再是之前的 master(已经没有从节点)

127.0.0.1:6380> slaveof no one 
ok

哨兵模式(自动选举 master)

概述

主从切换技术的方法是:当主服务器宕机,需要手动把一台从服务器切换为主服务器,这需要人工干预,费时费力,还会造成一段时间内服务器不可用,这不是一种推荐的方法,更多时候,我们优先考虑哨兵模式。Redis2.8 开始提供 Sentinel(哨兵)架构来解决这个问题。

“谋朝篡位自动版”,能够后台监控主机是否故障,如果故障根据投票自动将从库转换为主库

哨兵模式是一种特殊模式,首先 Redis 提供了哨兵命令,哨兵是独立进程,作为进程,它会独立运行。原理是哨兵通过发送命令,等待 Redis 服务器响应,从而监控运行的多个 Redis 实例

image-20210117113511102

测试

目前是一主二从

  1. 配置哨兵配置文件 sentinel.conf
									#被监控名称   host     port  数字 1 代表主机挂了,slave 投票看让谁接替成为主机,票数最多的,会成为主机 
sentinel monitor   myredis   127.0.0.1 6379 1
  1. 启动哨兵
3177:X 17 Jan 2021 11:17:06.536 * +slave slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6380
3177:X 17 Jan 2021 11:17:06.536 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6380

如果 master(80断开) 节点断开,这个时候会从从机中选择一个服务器,这里有一个投票算法

image-20210117112125614

6381 选举成为了主机

哨兵模式

如果主机(80)此时回来了,只能归到新的主机下,当做从机,这就是哨兵模式的规则

image-20210117112524183

优点:

  1. 哨兵集群,基于主从复制模式,所有的主从配置优点全有
  2. 主从可以切换,故障可以转移,系统可用性更好
  3. 哨兵模式就是主从模式的升级,手动到自动,更加健壮

缺点:

  1. Redis 不好在线扩容,集群容量一旦到达上限,在线扩容十分麻烦
  2. 实现哨兵模式的配置其实很麻烦

哨兵模式全部配置

Redis缓存穿透和雪崩(面试高频,工作常用)

redis 缓存使用,极大提升了应用程序的性能和效率,特别是数据查询方面。但同时,它会带来一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据线一致性要求很高,那么就不能使用缓存。

image-20210117114038114

缓存穿透(查不到数据)

概念

用户想要查询一个数据,发现 Redis 内存数据库没有,也就是缓存没有命中,于是向持久层数据库中查询,发现也没有,于是本次查询失败。当用户很多时候,缓存都没有命中,于是都去请求数据库,这就会给持久层数据库造成很大的压力,这时就相当于缓存穿透。

解决方案

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以 hash 形式存储,在控制层先校验(在 redis 前),不符合则丢弃,从而避免对底层存储系统的查询压力。比如:查询 id=-1 的数据,这样就可以丢弃。

缓存空对象

从缓存取不到的数据,在数据库中也没有取到,这时也可以将key-value对写为key-null,缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)。这样可以防止攻击用户反复用同一个id暴力攻击

  • 存在两个问题:
  1. 如果控制能够被缓存起来,意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值键
  2. 即使对空值设置过期时间,还是会存在缓存层和存储层的数据会一段时间窗口不一致,这对于需要保持一致性业务有影响

缓存击穿(量太大,缓存过期)

微博服务器宕机

概述

这里需要注意缓存穿透和缓存击穿的区别,缓存击穿,是指一个 key 非常热点,在不停的扛着大并发对这个点进行访问,当 key 失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就想一个屏幕凿开一个洞。

当某个 key 在过期瞬间,大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并回写缓存,导致数据库瞬间压力过大。

解决方案

设置热点数据永不过期

从缓存层面来看,没有设置过期时间,所以不会出现热点 key 过期后产生的问题(浪费空间)

加互斥锁

分布式锁:使用分布式锁,保证对于每个 key 同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到分布式锁,因此对分布式锁的考验很大。

缓存雪崩(不同数据同时过期)

双十一双十二(热点数据放缓存,过期即雪崩)

概念

缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至宕机。和缓存击穿不同的是, 缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

解决方案

  • redis 高可用

搭建集群,将热点数据均匀分布在不同搞得缓存数据库中

  • 限流降级(服务降级停掉一些服务)

在缓存失效之后,通过加锁或队列来控制读数据库写缓存的线程数量。比如:某个 key 只允许一个线程查询和写入缓存,其他线程等待

  • 设置过期时间

设置不同的过期时间,防止同一时间大量数据过期现象发生。设置热点数据永远不过期。

悲观者正确,乐观者成功
原文地址:https://www.cnblogs.com/freebule/p/14462552.html