【Python之路Day11】网络篇之Python操作Memcached、Redis

一. Memcached

1. 概念性

memcached是一套分布式的高速缓存系统,由LiveJournal的Brad Fitzpatrick开发,目前被许多网站使用,如mixi、hatena、Facebook、Vox、LiveJournal等。这是一套开放源代码软件,以BSD license授权发布。

memcached缺乏认证以及安全管制,这代表应该将memcached服务器放置在防火墙后。

memcached的API使用三十二比特的循环冗余校验(CRC-32)计算键值后,将数据分散在不同的机器上。当表格满了以后,接下来新增的数据会以LRU机制替换掉。由于memcached通常只是当作缓存系统使用,所以使用memcached的应用程序在写回较慢的系统时(像是后端的数据库)需要额外的代码更新memcached内的数据。

memcached常用来加速应用程序的处理,最吸引人的地方主要是它的分布式。     By- 维基百科

分布式对于互联网应用来讲,按照用途可划分为三种方式:

  • 分布式计算
  • 分布式存储
  • 两者兼而有之

memcached属于分布式存储。

memcached,旨在利用多个服务器内多余的RAM来充当一个可存放经常被访问信息的基于内存的缓存服务,有了它,程序无需从缓慢的资源,如磁盘或者后端数据库加载信息了。因为memcached是基于内存的缓存服务,从内存里读取数据比从磁盘上读取数据,即便是固态磁盘,也快的不是一星半点!

存储方法:

memcached的存储方法是一个简单的key/Value键值对的hashmap,类似于很多语言内的三列或关联数组。通过提供key和value来将信息存储到memcached内,通过按特定的键请求信息来恢复信息,如Python中的dict。

保存键值对发生变动的情况:

  • 为缓存分配的内存耗尽。这种情况下,memcached使用LRU(最近最少使用)算法从此缓存中删除条目。最近未曾使用的条目会从此缓存中先删除,最旧的最先访问。
  • 条目被确定删除, 如程序中删除。
  • 条目过期失效

2. 何时使用?

在使用memcached改进应用程序性能时,可以对一些关键的过程和步骤进行修改。

一般情况下:

  • 执行一个或多个查询来从数据库加载
  • 格式化适合于显示(或者进一步处理)的信息
  • 使用或显示格式化了的数据

使用Memcached,修改:

  • 尽量从缓存加载
  • 如果存在,使用信息的被缓存版本
  • 如果不存在(从后端数据库加载,并将信息缓存到缓存内)

假设,以博客站点为例,在博客系统更新数据库内的类别列表时,更新应遵循下列顺序:

  1. 更新博客数据库的类别列表
  2. 格式化信息
  3. 将信息存储到memcached
  4. 将信息返回给客户端

memcached内的存储操作是具有原子性的。获得的或许是老版本,或许是新版本的数据,但不可能会只获取到部分的数据!

3. 安装memcached

Ubuntu/debian平台

#1. 安装服务
# apt-get install memcached -y

#2. 修改配置文件
# sed -i 's@(^-l).*@1 172.17.0.1@g' /etc/memcached.conf 
# PS: 默认memcached监听在127.0.0.1, 如果本机使用的话,此步骤可以略去,建议将memcached监听在私有网段网卡上,并配合相应的安全访问控制,稍后续.


#重启服务:
# /etc/init.d/memcached restart

#查看启动状态:
# ps -ef|grep memcached
memcache  6783     1  0 10:55 ?        00:00:00 /usr/bin/memcached -m 64 -p 11211 -u memcache -l 172.17.0.1

#查看端口监听:
# ss -tnlp|grep 11211
LISTEN     0      128              172.17.0.1:11211                    *:*      users:(("memcached",6783,26))

编译安装:

#从官网下载压缩包:https://memcached.org/  

#解压缩:
tar xf memcached-1.4.15.tar.gz && cd memcached-1.4.15

#编译
./configure --prefix=/usr/local/memcached --with-libevent=/usr/local/libevent

make && make install
#!/bin/bash
#
# Init file for memcached
#
# chkconfig: - 86 14
# description: Distributed memory caching daemon
#
# processname: memcached
# config: /etc/sysconfig/memcached

. /etc/rc.d/init.d/functions

## Default variables
PORT="11211"
USER="nobody"
MAXCONN="1024"
CACHESIZE="64"
OPTIONS=""

RETVAL=0
prog="/usr/local/memcached/bin/memcached"
desc="Distributed memory caching"
lockfile="/var/lock/subsys/memcached"

start() {
        echo -n $"Starting $desc (memcached): "
        daemon $prog -d -p $PORT -u $USER -c $MAXCONN -m $CACHESIZE -o "$OPTIONS"
        RETVAL=$?
        echo
        [ $RETVAL -eq 0 ] && touch $lockfile
        return $RETVAL
}

stop() {
        echo -n $"Shutting down $desc (memcached): "
        killproc $prog
        RETVAL=$?
        echo
        [ $RETVAL -eq 0 ] && rm -f $lockfile
        return $RETVAL
}

restart() {
        stop
        start
}

reload() {
        echo -n $"Reloading $desc ($prog): "
        killproc $prog -HUP
        RETVAL=$?
        echo
        return $RETVAL
}

case "$1" in
  start)
        start
        ;;
  stop)
        stop
        ;;
  restart)
        restart
        ;;
  condrestart)
        [ -e $lockfile ] && restart
        RETVAL=$?
        ;;       
  reload)
        reload
        ;;
  status)
        status $prog
        RETVAL=$?
        ;;
   *)
        echo $"Usage: $0 {start|stop|restart|condrestart|status}"
        RETVAL=1
esac

exit $RETVAL
memcached sysvinit启动脚本

4. Python操作memcached

安装API

python操作Memcached使用Python-memcached模块,下载地址: https://pypi.python.org/pypi/python-memcached

#解压缩:
tar xf python-memcached-1.58.tar.gz && cd python-memcached-1.58

python3 setup.py install

操作Memcached

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.

mc.set('k1','v1')   #设置一个k1, 值为v1

result = mc.get('k1')  #获取k1值
print(result)

默认支持集群

Python-memcached模块原声支持集群操作,其原理是在内部维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现次数成正比

主机:   权重

1.1.1.1   1
1.1.1.2   1
1.1.1.3   2



#在内存中主机列表为:
li = ['1.1.1.1', '1.1.1.2','1.1.1.3','1.1.1.3',]

如果用户如果要在内存中创建一个键值对( 如: k1 = 'v1') 那么要执行以下步骤:

  1. 根据算法将 k1 转换成一个数字
  2. 将数字和主机列表长度求余,得到一个值 N ( 0 <= N < 列表长度)
  3. 将主机列表中根据第二步得到的值为索引主机,如 li[N]
  4. 连接将第三步中获取的主机, 将k1 = 'v1' 放置在该服务器的memcached中

代码:

mc = memcache.Client([('172.16.30.162:11211',2),('172.16.30.163:11211',1),('172.16.30.164:11211',1)],debug=1)
mc.set('k1','v1')   #设置一个k1, 值为v1

result = mc.get('k1')  #获取k1值
print(result)

PS: memcache 虽然提供了原生支持集群功能,并没有提供后端主机健康状态检测的情况。SO, 在生产中使用的时候,需要自行检测后端主机的存活状态,如果后端memcached失效,应该主动从集群中剔除。

操作:

add

添加一条键值对,如果已经纯在key,重复执行add操作会抛异常

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.

mc.add('k','v')
result = mc.get('k')  #获取k1值
print(result)

#执行结果:
v

#如果再次执行,由于k已经存在,抛异常:
MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'
v

replace

修改某个key的值,如果key不存在,则抛出异常

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.

mc.replace('k','v1')
result = mc.get('k')  #获取k1值
print(result)

#执行结果
v1

#如果修改一个不存在的key,则抛异常:
mc.replace('k3','v1')

MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'

set 和 set_multi

set  设置一个键值对,如果key不存在,则创建,如果key已经存在,则更新key值

set_multi  设置多个键值对,如果key不存在,则创建,如果key已经存在,则更新key值

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.

mc.set('k','v1')
result = mc.get('k')  #获取k1值
print(result)

#执行结果:
v1

#设置多个key
mc.set_multi({'k1':'v1','k2':'v2','k3':'v3'})
result = mc.get_multi(['k','k1','k2','k3'])  #获取k1值
print(result)

#结果如下:
{'k1': 'v1', 'k2': 'v2', 'k': 'v1', 'k3': 'v3'}

delete和delete_multi

delete   删除指定的一个key/value

delete_multi  删除指定的多个键值对

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.

mc.delete('k')   #删除指定的k
result = mc.get_multi(['k1','k2','k3'])  #获取k1值
print(result)


#删除指定的多个键值对
mc.delete_multi(['k1','k2'])

如果删除的时候不存在键值对,则抛出异常:

MemCached: while expecting 'DELETED', got unexpected response 'NOT_FOUND'
MemCached: while expecting 'DELETED', got unexpected response 'NOT_FOUND'

get 和 get_multi

get     获取一个key/value

get_multi   获取多个键值对

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.

# mc.delete('k')
# mc.delete_multi(['k1','k2'])
res = mc.get('k1')    #获取一个key,如果不存在则返回None
print(res)

result = mc.get_multi(['k1','k2','k3'])  #获取多个key
print(result)


执行结果:
None
{'k3': 'v3'}

append 和 prepend

append   修改指定的key的值,在value后面追加内容

prepend  修改key的值,在value前面 插入内容

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.
mc.append('k1','back')      #在原来的v1值后面追加back,结果为 v1back
mc.prepend('k1','before')   #在原来的v1值前面追加before,结果为:  beforev1back
result = mc.get_multi(['k1','k2','k3'])  #获取k1值
print(result)

decr 和 incr

incr    自增,将某一个值自增 N(N默认为1)

decr   自减,将某一个值减少N(N默认为1) 

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.

mc.set('k4',100)    #先设置原始值为100
mc.incr('k4',2)     #自增2, 结果为102

res = mc.get('k4')
print(res)     #打印结果为102

mc.decr('k4')     #自减1,默认为1,可以不写

res = mc.get('k4')
print(res)     #打印结果为101

gets 和 cas

gets  获取带有CAS令牌存储的value(数据值) , 如果key 不存在,则返回空

语法:
gets key
多个key使用空格分开:
gets key1 key2 key3 ..

cas      (compare-And-Swap) 用于执行一个"检查并设置" 的操作。它仅在当前客户端最后一次取值后,该key对应的值没有被其他客户端修改的情况下,才能够将值写入。检查是通过cas_token参数进行的,这个参数是memcached指定给已经存在的元素的一个唯一的64位的值。

语法: 
cas key flags exptime bytes unique_cas_token [noreply]
value
  • key:键值 key-value 结构中的 key,用于查找缓存值。
  • flags:可以包括键值对的整型参数,客户机使用它存储关于键值对的额外信息 。
  • exptime:在缓存中保存键值对的时间长度(以秒为单位,0 表示永远)
  • bytes:在缓存中存储的字节数
  • unique_cas_token通过 gets 命令获取的一个唯一的64位值。
  • noreply(可选): 该参数告知服务器不需要返回数据
  • value:存储的值(始终位于第二行)(可直接理解为key-value结构中的value)

举个栗子:

如商城商品的剩余数量,假设改制保存在memcached中, product_count = 900

  • A用户刷新页面读取到 product_count = 900
  • B用户刷新页面读取到 product_count = 900

如果A和B用户同时购买了商品:

A用户修改剩余个数  product_count=899

B用户修改商品剩余个数  product_count=899

如此一来缓存内的数据就不对了,两个用户购买后,应该剩余898

如果使用Python的set和get来操作上面过程,那么程序就会出现上面描述的问题。

想要避免此类情况,就需要用到gets和cas了,如下:

import memcache

mc = memcache.Client(['172.16.30.162:11211'],debug=True)   #debug=True 表示出现错误时,显示错误信息,线上环境应移除该参数.

v = mc.gets('product_count')

#如果有人gets之后和cas之前修改了 product_count, 那么,下面的设置将会执行失败, 抛出异常, 从而避免非正常的数据的产生
mc.cas('product_count',"899")

本质上每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值,会携带之前获取的自增值和memcached中的自增值进行比较,如果相等,则可以提交,如果不等,那表示在gets和cas之间,又有其他人执行了gets(获取了缓冲的指定值), 如此一来,有可能出现非正常数据,则不允许修改。

5. memcached统计命令:

stats

用于返回统计信息,如PID、版本号、连接数等:

root@test2-ubunut:~/.aliyuncli# telnet localhost 11211
Trying ::1...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
stats
STAT pid 9720
STAT uptime 346412
STAT time 1469082312
STAT version 1.4.14 (Ubuntu)
STAT libevent 2.0.21-stable
STAT pointer_size 64
STAT rusage_user 9.024617
STAT rusage_system 5.018162
STAT curr_connections 7
STAT total_connections 41
STAT connection_structures 8
STAT reserved_fds 20
STAT cmd_get 51
STAT cmd_set 34
STAT cmd_flush 0
STAT cmd_touch 0
STAT get_hits 39
STAT get_misses 12
STAT delete_misses 2
STAT delete_hits 3
STAT incr_misses 0
STAT incr_hits 6
STAT decr_misses 0
STAT decr_hits 5
STAT cas_misses 0
STAT cas_hits 0
STAT cas_badval 0
STAT touch_hits 0
STAT touch_misses 0
STAT auth_cmds 0
STAT auth_errors 0
STAT bytes_read 1431
STAT bytes_written 2535
STAT limit_maxbytes 67108864
STAT accepting_conns 1
STAT listen_disabled_num 0
STAT threads 4
STAT conn_yields 0
STAT hash_power_level 16
STAT hash_bytes 524288
STAT hash_is_expanding 0
STAT expired_unfetched 0
STAT evicted_unfetched 0
STAT bytes 304
STAT curr_items 4
STAT total_items 38
STAT evictions 0
STAT reclaimed 0
END
View Code

stats items

用于显示各个slab中item的数目和存储时长(最后一次访问距离到现在的秒数).

语法:

stats items
stats items
STAT items:1:number 4
STAT items:1:age 4105
STAT items:1:evicted 0
STAT items:1:evicted_nonzero 0
STAT items:1:evicted_time 0
STAT items:1:outofmemory 0
STAT items:1:tailrepairs 0
STAT items:1:reclaimed 0
STAT items:1:expired_unfetched 0
STAT items:1:evicted_unfetched 0
END
View Code

stats slabs

显示各个slab的信息,包括chunk的大小、数目、使用情况等。

语法:

stats slab
stats slabs
STAT 1:chunk_size 96
STAT 1:chunks_per_page 10922
STAT 1:total_pages 1
STAT 1:total_chunks 10922
STAT 1:used_chunks 5
STAT 1:free_chunks 10917
STAT 1:free_chunks_end 0
STAT 1:mem_requested 384
STAT 1:get_hits 39
STAT 1:cmd_set 34
STAT 1:delete_hits 3
STAT 1:incr_hits 6
STAT 1:decr_hits 5
STAT 1:cas_hits 0
STAT 1:cas_badval 0
STAT 1:touch_hits 0
STAT active_slabs 1
STAT total_malloced 1048512
END
View Code

stats sizes 

显示所有item和的大小和个数

返回两列,第一列是item大小,第二列是item的个数

stats sizes
STAT 96 4
END

flush_all  

该命令用于清空缓存中的所有 key/value, 提供了一个可选参数 time, 用于在指定的时间后执行清理缓存操作。

语法:

flush_all [time] [noreply]
flush_all
OK

二. Redis

1. 概念

Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。 

Redis和memcached一样,为了保证效率,数据都是缓存在内存中。不过区别是,redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了Master-slave(主从)。

官方站点: http://redis.io/ 

更多介绍>>

2. 安装redis

Ubuntu/Debian:

#安装服务
apt-get install redis-server

#修改配置文件:
sed -i 's@(^bind 127.0.0.1)@#1@g' /etc/redis/redis.conf 
#默认redis监听在127.0.0.1上,如果要对局域网其他主机提供服务,需要修改监听地址,此处修改监听为全0地址,生产环境不建议这么做,应该监听到内网卡地址上。

#重启服务:
/etc/init.d/redis-server restart

编译安装:

#下载解压并make,此处以3.0.5为版本.
  # wget http://download.redis.io/releases/redis-3.0.5.tar.gz
  # tar xzf redis-3.0.5.tar.gz
  # cd redis-3.0.5
  # make
#添加环境变量:
  # vim /etc/profile.d/redis.sh
    export PATH=/usr/local/redis-3.0.5/src:$PATH
  # . /etc/profile.d/redis.sh
#编辑配置文件:
  # cp redis.conf /etc/redis/
#启动redis:
[root@localhost redis-3.0.5]# redis-server /etc/redis/redis.conf &

连接客户端:

可以使用redis的客户端工具,也可以telnet上去(默认无身份安全认证)

#默认客户端工具连接
root@test2-ubunut:~# redis-cli 
127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> get k1
"v1"


#telnet连接
Daniel-Mac:~ daniel$ telnet 172.16.30.162 6379   #默认监听6379端口
Trying 172.16.30.162...
Connected to test2.
Escape character is '^]'.
set k2 v2
+OK
get k2
$2
v2
quit
+OK

3. Python操作Redis

安装API

root@test2-ubunut:~# pip3 install redis
Downloading/unpacking redis
  Downloading redis-2.10.5-py2.py3-none-any.whl (60kB): 60kB downloaded
Installing collected packages: redis
Successfully installed redis
Cleaning up...


#源码安装,过程略,和memcached的API安装方式一样:
https://github.com/WoLpH/redis-py

API使用

redis-py的API的使用可以分为:

  • 连接方式
  • 连接池
  • 操作:
    • String操作
    • Hash操作
    • List操作
    • Set操作
    • Sort Set操作
  • 管道
  • 发布订阅

(1)操作模式:

redis-py提供了两个类: Redis 和StrictRedis用于实现Redis的命令与Redis实现交互。

  • StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令
  • Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py
import redis

rd = redis.Redis(host='172.16.30.162',port=6379) #连接redis-server
rd.set('k1','v1')  #设置一个k1,值为v1

res = rd.get('k1') #获取k1
print(res)

#执行结果:
b'v1'

(2) 连接池

redis-py使用connection pool来管理一个redis server的所有连接,避免每次都建立、释放连接的开销。

默认情况下,每个Redis实例都会维护一个自己的连接池,可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)

rd.set('k1','v1')

res = rd.get('k1')
print(res)

(3) 操作

String操作

Redis中的String在内存中按照一个 key对应一个 value来存储。

set(key, value, ex=None, px=None, nx=False, xx=False)

设置值,默认不存在则创建,存在的话,则更新改值

参数:
     ex,过期时间(秒)
     px,过期时间(毫秒)
     nx,如果设置为True,则只有key不存在时,当前set操作才执行
     xx,如果设置为True,则只有key存在时,当前set操作才执行
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)

rd.set('k1','v1',ex=10)   #设置超时时间为10秒,十秒之内可以get到值,10秒之后get到的值就是 None

res = rd.get('k1')
print(res)


#
rd.set('k1','v',ex=10)   #设置超时时间为10秒,十秒之内可以get到值,10秒之后get到的值就是 None
rd.set('k1','v1',nx=True)  #设置nx为true,如果不存在k1的话,才会set,现在是已经存在了,因此打印出的结果的值,还是上一步设置的:

b'v'

setnx(key, value)

设置值,只有key不存在时,执行设置操作(添加)
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)

rd.setnx('k1','v1')
rd.setnx('k1','v2')   #只有在k1不存在时才会设置成功,所以这里的值还是v1
res = rd.get('k1')
print(res)

b'v1'

setex(key, value, time)

设置值

# 设置值
# 参数:
    # time,过期时间(数字秒 或 timedelta对象)
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.setex('k1','v1',10) #设置超时时间为10秒,十秒之内可以get到值,10秒之后get到的值就是 None

res = rd.get('k1')
print(res)

psetex(key, time_ms, value)

设置值

# 参数:
    # time_ms,过期时间(数字毫秒 或 timedelta对象)
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.psetex('k1',100,'v1') #设置超时时间为100毫秒, 100毫秒之内可以get到值,100毫秒之后get到的值就是 None

res = rd.get('k1')
print(res)

mset(*args, **kwargs)

批量设置值

import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.mset({'k1':'v1','k2':'v2','k3':'v3'})    #批量设置值

res = rd.get('k1')
print(res)

get(key)

获取值:

res = rd.get('k1')
print(res)

mget(keys,*args)

批量获取值

import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.mset({'k1':'v1','k2':'v2','k3':'v3'})    #批量设置值

res = rd.mget(['k1','k2','k3'])
print(res)

#结果:
[b'v1', b'v2', b'v3']

getset(key, value)

设置新值并获取原来的值

import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
res = rd.getset('k1','Nv1')   #原来的值是v1, 设置一个新的值 Nv1
print(res)

#执行结果:
b'v1'
View Code

getrange(key, start, end)

获取子序列(根据字节获取,非字符)

# 参数:
    # key,Redis 的 name
    # start,起始位置(字节)
    # end,结束位置(字节)
# 如: "欧阳锋" ,0-3表示 "欧"
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.set('k1','欧阳锋')
res = rd.getrange('k1',0,3)
print(res)
View Code

setrange(key, offset, value)

修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)

# 参数:
    # offset,字符串的索引,字节(一个汉字三个字节)
    # value,要设置的值
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.set('k1','hello')

rd.setrange('k1',3,'F')   #原有的值为hello,将第三个索引位改为F

result = rd.get('k1')
print(result)

#执行结果:

b'helFo'
View Code

setbit(key, offset, value)

key对应值的二进制表示的位进行操作

# 参数:
    # key,redis的name
    # offset,位的索引(将值变换成二进制后再进行索引)
    # value,值只能是 1 或 0
rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.set('k1','hello')  #k1的值默认为hello
rd.setbit('k1',3,1)   #将索引3的位置改为1

result = rd.get('k1')
print(result)

#执行结果:
b'xello'


#扩展
source = '欧阳锋'

for i in source:
    num = ord(i)
    print(bin(num).replace('b',''))
#特别的,如果source是汉字,欧阳锋怎么办?
答:对于utf-8,每一个汉字占3个字节,那么’欧阳锋‘ 则有9个字节
对汉字,for循环的时候会按照字节来迭代,每一个字节传唤十进制数,然后再讲二进制数转换为二进制
0110101100100111     01001011000110011        01001010100001011
    欧                        阳                        锋

getbit(key, offset)

获取key对应的值的二进制表示中的某位的值 (0或1)

import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)

rd.set('k1','hello')

result = rd.getbit('k1',3)
print(result)


#执行结果:
0
View Code

bitcount(key, start=None, end=None)

获取key对应的值的二进制表示中 1 的个数

# 参数:
    # key,Redis的key
    # start,位起始位置
    # end,位结束位置
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)

result = rd.bitcount('k1',0,5)
# print(rd.get('k1'))
print(result)

#执行结果:
21
View Code

bitop(operation, dest, *keys)

 获取多个值,并将值做位运算,将最后的结果保存至新的name对应的值

# 参数:
    # operation,AND(并) 、 OR(或) 、 NOT(非) 、 XOR(异或)
    # dest, 新的Redis的name
    # *keys,要查找的Redis的name
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)

rd.mset({'k1':[11,22,33],'k2':[22,33,44],'k3':[44,55,66]})

result = rd.bitop('AND','and_value','k1','k2')
#获取k1,k2,k3的值,而后做位运算求并集,然后将结果保存在and_value中
res = rd.get('and_value')
print(res)

#执行结果:
b'[00, 22, 00]'


#或:
result = rd.bitop('OR','and_value','k1','k2')
#结果:
b'[33, 33, 77]'
View Code

strlen(key)

返回name对应值的字节长度(一个汉字3个字节)

import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)


res = rd.strlen('k1')
print(res)

#结果:
12
View Code

incr(self, key, amount=1)

自增 key对应的值,当key不存在时,则创建key=amount,否则,则自增。

# 参数:
    # name,Redis的name
    # amount,自增数(必须是整数)
 
# 注:同incrby
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)

rd.incr('k6',amount=3)
res = rd.get('k6')
print(res)


#执行结果:
b'3'

#因为k6事先不存在,如果存在的话,再次基础上自增
View Code

incrbyfloat(self, name, amount=1.0)

自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。

# 参数:
    # name,Redis的name
    # amount,自增数(浮点型)
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)

rd.incrbyfloat('k6',amount=2.3)
res = rd.get('k6')
print(res)

#执行结果:
b'5.3'    #原值是3
View Code

decr(self, name, amount=1)

自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。

# 参数:
    # name,Redis的name
    # amount,自减数(整数)
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.decr('k6')
res = rd.get('k6')
print(res)

#执行结果:
b'2'    #原值为3

append(key, value)

在redis name对应的值后面追加内容

# 参数:
    #key, redis的name
    #value, 要追加的字符串
import redis

rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
rd = redis.Redis(connection_pool=rd_pool)
rd.append('k6','hehe')
res = rd.get('k6')
print(res)

#结果:
b'2hehe'
View Code

Hash操作

redis中Hash在内存中的存储格式如下图:

hset(name, key, value)

# name对应的hash中设置一个键值对(不存在,则创建;否则,修改)
 
# 参数:
    # name,redis的name
    # key,name对应的hash中的key
    # value,name对应的hash中的value
 
# 注:
    # hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)

hmset(name, mapping)

在name对应的hash中批量设置键值对

# 参数:
    # name,redis的name
    # mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
 
# 如:
    # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

hget(name,key)

在name对应的hash中获取根据key获取value

hmget(name, keys, *args)

在name对应的hash中获取多个key的值

# 参数:
    # name,reids对应的name
    # keys,要获取key集合,如:['k1', 'k2', 'k3']
    # *args,要获取的key,如:k1,k2,k3
 
# 如:
    # r.mget('xx', ['k1', 'k2'])
    #
    # print r.hmget('xx', 'k1', 'k2')

hgetall(name)

获取name对应hash的所有键值

hlen(name)

获取name对应的hash中键值对的个数

hkeys(name)

获取name对应的hash中所有的key的值

hvals(name)

获取name对应的hash中所有的value的值

hexists(name, key)

检查name对应的hash是否存在当前传入的key

hdel(name,*keys)

将name对应的hash中指定key的键值对删除

hincrby(name, key, amount=1)

自增name对应的hash中的指定key的值,不存在则创建key=amount

# 参数:
    # name,redis中的name
    # key, hash对应的key
    # amount,自增数(整数)

hincrbyfloat(name, key, amount=1.0)

自增name对应的hash中的指定key的值,不存在则创建key=amount

# 参数:
    # name,redis中的name
    # key, hash对应的key
    # amount,自增数(浮点数)
 
# 自增name对应的hash中的指定key的值,不存在则创建key=amount

hscan(name, cursor=0, match=None, count=None)

增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆

# 参数:
    # name,redis的name
    # cursor,游标(基于游标分批取获取数据)
    # match,匹配指定key,默认None 表示所有的key
    # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
 
# 如:
    # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
    # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
    # ...
    # 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕

hscan_iter(name, match=None, count=None)

利用yield封装hscan创建生成器,实现分批去redis中获取数据

# 参数:
    # match,匹配指定key,默认None 表示所有的key
    # count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
 
# 如:
    # for item in r.hscan_iter('xx'):
    #     print item

List操作

redis中的List在在内存中按照一个name对应一个List来存储。如图

lpush(name,values)

在name对应的list中添加元素,每个新的元素都添加到列表的最左边

# 如:
    # r.lpush('oo', 11,22,33)
    # 保存顺序为: 33,22,11
 
# 扩展:
    # rpush(name, values) 表示从右向左操作

lpushx(name,value)

在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边

# 更多:
    # rpushx(name, value) 表示从右向左操作

llen(name)

name对应的list元素的个数

linsert(name, where, refvalue, value))

在name对应的列表的某一个值前或后插入一个新值

# 参数:
    # name,redis的name
    # where,BEFORE或AFTER
    # refvalue,标杆值,即:在它前后插入数据
    # value,要插入的数据

r.lset(name, index, value)

对name对应的list中的某一个索引位置重新赋值

# 参数:
    # name,redis的name
    # index,list的索引位置
    # value,要设置的值

r.lrem(name, value, num)

在name对应的list中删除指定的值

# 参数:
    # name,redis的name
    # value,要删除的值
    # num,  num=0,删除列表中所有的指定值;
           # num=2,从前到后,删除2个;
           # num=-2,从后向前,删除2个

lpop(name)

在name对应的列表的左侧获取第一个元素并在列表中移除,返回值则是第一个元素

# 更多:
    # rpop(name) 表示从右向左操作

lindex(name, index)

在name对应的列表中根据索引获取列表元素

lrange(name, start, end)

# 在name对应的列表分片获取数据

# 参数:
    # name,redis的name
    # start,索引的起始位置
    # end,索引结束位置

ltrim(name, start, end)

在name对应的列表中移除没有在start-end索引之间的值

# 参数:
    # name,redis的name
    # start,索引的起始位置
    # end,索引结束位置

rpoplpush(src, dst)

从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边

# 参数:
    # src,要取数据的列表的name
    # dst,要添加数据的列表的name

blpop(keys, timeout)

将多个列表排列,按照从左到右去pop对应列表的元素

# 参数:
    # keys,redis的name的集合
    # timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞
 
# 更多:
    # r.brpop(keys, timeout),从右向左获取数据

brpoplpush(src, dst, timeout=0)

从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧

# 参数:
    # src,取出并要移除元素的列表对应的name
    # dst,要插入元素的列表对应的name
    # timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞

自定义增量迭代

# 由于redis类库中没有提供对列表元素的增量迭代,如果想要循环name对应的列表的所有元素,那么就需要:
    # 1、获取name对应的所有列表
    # 2、循环列表
# 但是,如果列表非常大,那么就有可能在第一步时就将程序的内容撑爆,所有有必要自定义一个增量迭代的功能:

import redis
rd_pool = redis.ConnectionPool(host='172.16.30.162',port=6379)
r = redis.Redis(connection_pool=rd_pool)

def list_iter(name):
    """
    自定义redis列表增量迭代
    :param name: redis中的name,即:迭代name对应的列表
    :return: yield 返回 列表元素
    """
    list_count = r.llen(name)
    for index in range(list_count):
        yield r.lindex(name, index)

# 使用
for item in list_iter('pp'):
    print(item)

Set操作

Set集合就是不允许重复的列表

sadd(name,values)

# name对应的集合中添加元素

scard(name)

#获取name对应的集合中元素个数

sdiff(keys, *args)

#在第一个name对应的集合中且不在其他name对应的集合的元素集合

sdiffstore(dest, keys, *args)

# 获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中

sinter(keys, *args)

# 获取多一个name对应集合的并集

sinterstore(dest, keys, *args)

# 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中

sismember(name, value)

# 检查value是否是name对应的集合的成员

smembers(name)

# 获取name对应的集合的所有成员

smove(src, dst, value)

# 将某个成员从一个集合中移动到另外一个集合

spop(name)

# 从集合的右侧(尾部)移除一个成员,并将其返回

srandmember(name, numbers)

# 从name对应的集合中随机获取 numbers 个元素

srem(name, values)

# 在name对应的集合中删除某些值

sunion(keys, *args)

# 获取多一个name对应的集合的并集

sunionstore(dest,keys, *args)

# 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

#同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大

有序集合

在集合的基础上,为每元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。

zadd(name, *args, **kwargs)

# 在name对应的有序集合中添加元素
# 如:
     # zadd('zz', 'n1', 1, 'n2', 2)
     #
     # zadd('zz', n1=11, n2=22)

zcard(name)

# 获取name对应的有序集合元素的数量

zcount(name, min, max)

# 获取name对应的有序集合中分数 在 [min,max] 之间的个数

zincrby(name, value, amount)

# 自增name对应的有序集合的 name 对应的分数

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

# 按照索引范围获取name对应的有序集合的元素
 
# 参数:
    # name,redis的name
    # start,有序集合索引起始位置(非分数)
    # end,有序集合索引结束位置(非分数)
    # desc,排序规则,默认按照分数从小到大排序
    # withscores,是否获取元素的分数,默认只获取元素的值
    # score_cast_func,对分数进行数据转换的函数
 
# 更多:
    # 从大到小排序
    # zrevrange(name, start, end, withscores=False, score_cast_func=float)
 
    # 按照分数范围获取name对应的有序集合的元素
    # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
    # 从大到小排序
    # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)

# 获取某个值在 name对应的有序集合中的排行(从 0 开始)
 
# 更多:
    # zrevrank(name, value),从大到小排序

zrangebylex(name, min, max, start=None, num=None)

# 当有序集合的所有成员都具有相同的分值时,有序集合的元素会根据成员的 值 (lexicographical ordering)来进行排序,而这个命令则可以返回给定的有序集合键 key 中, 元素的值介于 min 和 max 之间的成员
# 对集合中的每个成员进行逐个字节的对比(byte-by-byte compare), 并按照从低到高的顺序, 返回排序后的集合成员。 如果两个字符串有一部分内容是相同的话, 那么命令会认为较长的字符串比较短的字符串要大
 
# 参数:
    # name,redis的name
    # min,左区间(值)。 + 表示正无限; - 表示负无限; ( 表示开区间; [ 则表示闭区间
    # min,右区间(值)
    # start,对结果进行分片处理,索引位置
    # num,对结果进行分片处理,索引后面的num个元素
 
# 如:
    # ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga
    # r.zrangebylex('myzset', "-", "[ca") 结果为:['aa', 'ba', 'ca']
 
# 更多:
    # 从大到小排序
    # zrevrangebylex(name, max, min, start=None, num=None)

zrem(name, values)

# 删除name对应的有序集合中值是values的成员
 
# 如:zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

# 根据排行范围删除

zremrangebyscore(name, min, max)

# 根据分数范围删除

zremrangebylex(name, min, max)

# 根据值返回删除

zscore(name, value)

# 获取name对应有序集合中 value 对应的分数

zinterstore(dest, keys, aggregate=None)

# 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为:  SUM  MIN  MAX

zunionstore(dest, keys, aggregate=None)

# 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为:  SUM  MIN  MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

# 同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作

其他常用操作

delete(*names)

# 根据删除redis中的任意数据类型

exists(name)

# 检测redis的name是否存在

keys(pattern='*')

# 根据模型获取redis的name
 
# 更多:
    # KEYS * 匹配数据库中所有 key 。
    # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
    # KEYS h*llo 匹配 hllo 和 heeeeello 等。
    # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

expire(name ,time)

# 为某个redis的某个name设置超时时间

rename(src, dst)

# 对redis的name重命名为

move(name, db))

# 将redis的某个值移动到指定的db下

randomkey()

# 随机获取一个redis的name(不删除)

type(name)

# 获取name对应值的类型

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

# 同字符串操作,用于增量迭代获取key

(4) 管道

redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)
 
import redis
 
pool = redis.ConnectionPool(host='172.16.30.162', port=6379)
 
r = redis.Redis(connection_pool=pool)
 
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
 
r.set('name', 'alex')
r.set('role', 'sb')
 
pipe.execute()

(5) 发布/订阅

 

发布者:服务器

订阅者:Dashboad和数据处理

Demo如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: DBQ(Du Baoqiang)

import redis


class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='172.16.30.162')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.chan_sub)
        pub.parse_response()
        return pub
View Code

订阅者:

from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
redis_sub = obj.subscribe()
 
while True:
    msg= redis_sub.parse_response()
    print msg

发布者:

from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
obj.public('hello')

更多请参见:

https://github.com/andymccurdy/redis-py/

http://doc.redisfans.com/

原文地址:https://www.cnblogs.com/dubq/p/5688948.html