第五章

目录

  1. 消息队列
  2. redis

1.消息队列

RabbitMQ安装

       http://www.cnblogs.com/shanyou/p/4067250.html

 1 #!/usr/bin/env python
 2 import pika
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel = connection.channel()
 5 channel.queue_declare(queue='hello')
 6 channel.basic_publish(exchange='',
 7                       routing_key='hello',
 8                       body='Hello World'
 9                       )
10 print("[x] sent 'Hello World!'")
11 connection.close()
生产者
 1 #!/usr/bin/env python
 2 __author__ = 'han'
 3 import pika
 4 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 5 channel = connection.channel()
 6 channel.queue_declare(queue='hello')
 7 def callback(ch,method,properties,body):
 8     print("[x]Received %r"% body)
 9 channel.basic_consume(callback,
10                       queue='hello',
11                       )   #no_ack=True 不和服务端确认 
12 print('[*]Waiting for messages.to exit press CTRL+C')
13 channel.start_consuming()
消费者
  • 持久化

 1 import pika
 2 #建立连接
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))  #建立连接
 4 #建立管道
 5 channel = connection.channel()   
 6 #建立消息队列,durable=True 开启持久化
 7 channel.queue_declare(queue='hello1',durable=True)   
 8 channel.basic_publish(exchange='',
 9                       routing_key='hello1',   #指定消息队列
10                       body='Hello World',     #发送消息
11                       properties=pika.BasicProperties(   #消息持久化
12                           delivery_mode=2, 
13                       )
14                       )
15 print("[x] sent 'Hello World!'")
16 connection.close()
生产者
 1 import pika
 2 #建立连接
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
 4 #建立管道
 5 channel = connection.channel()
 6 #建立消息队列,durable=True开启持久化
 7 channel.queue_declare(queue='hello1',durable=True)
 8 def callback(ch,method,properties,body):
 9     print("[x]Received %r"% body)
10     ch.basic_ack(delivery_tag=method.delivery_tag)  #收到消息回复生产者
11 channel.basic_qos(prefetch_count=1)   #负载均衡(处理完一个消息在发送下一个)
12 channel.basic_consume(callback,
13                       queue='hello1',
14                       )   #no_ack=True  不回复生产者
15 print('[*]Waiting for messages.to exit press CTRL+C')
16 channel.start_consuming()
消费者

查看消息持久化

cmd 
D:Program FilesRabbitMQ Server
abbitmq_server-3.6.11sbin>rabbitmqctl.bat list_queues 
Listing queues
  • 广播

  1. fanout: 所有bind到此exchange的queue都可以接收消息
  2. direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
  3. topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

fanout

 1 #!/usr/bin/env python
 2 # _*_ encoding:utf-8 _*_
 3 import pika
 4 import sys
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
 6 channel = connection.channel()
 7 channel.exchange_declare(exchange='logs',
 8                          exchange_type="fanout")  #指定广播模式,不同版本可能是type="fanout"
 9 message = ' '.join(sys.argv[1:]) or "info:Hello World!"  #发送的内容 argv1 或者 "info:Hello World!"
10 channel.basic_publish(exchange='logs',
11                       routing_key='', 
12                       body=message)
13 print("[x] Sent %r" % message)
14 connection.close() 
publisher
 1 #!/usr/bin/env python
 2 # _*_ encoding:utf-8 _*_
 3 import pika
 4 __author__ ='han'
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
 6 channel = connection.channel()
 7 channel.exchange_declare(exchange='logs',
 8                          exchange_type='fanout')   #指定广播模式,不同版本可能是type="fanout"
 9 result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
10 queue_name = result.method.queue  #消息队列的名称
11 channel.queue_bind(exchange='logs',
12                    queue=queue_name)    #制定消息队列的名称
13 print(' [*] Waiting for logs. To exit press CTRL+C')
14 def callback(ch, method, properties,body):
15     print(" [x] %r" % body)
16 channel.basic_consume(callback,
17                       queue=queue_name,
18                       no_ack=True)
19 channel.start_consuming() 
subscriber

direct

 1 #接收指定的内容
 2 
 3 #!/usr/bin/ebv python
 4 # _*_ coding:utf-8 _*_
 5 import pika
 6 import sys
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))
 8 channel = connection.channel()
 9 channel.exchange_declare(exchange='direct_logs',   #设置direct模式
10                          exchange_type='direct')
11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' #设置默认info
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='direct_logs',   #调用direct模式
14                       routing_key=severity,      #调用severity
15                       body=message)
16 print(" [x] Sent %r:%r" % (severity,message))
17 connection.close()
publisher
 1 #!/usr/bin/env python
 2 # _*_ encoding:utf-8 _*_
 3 import pika
 4 import sys
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6       host='127.0.0.1'))
 7 channel = connection.channel()
 8 channel.exchange_declare(exchange='direct_logs',  #设置direct模式
 9                          exchange_type='direct')
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 severities = sys.argv[1:]
13 if not severities:
14     sys.stderr.write("usege: %s [info] [warning][reeor]
"%sys.argv[0])
15     sys.exit(1)
16 for severity in severities:
17     channel.queue_bind(exchange='direct_logs', #调用direct模式
18                        queue=queue_name,
19                        routing_key=severity)  #调用severities
20 print(' [*] Waiting for logs. To exit press CTRL+C')
21 def callback(ch,method,properties,body):
22     print(" [x] %r:%r" % (method.routing_key, body))
23 channel.basic_consume(callback,
24                       queue=queue_name,
25                       no_ack=True)
26 channel.start_consuming()
subscriber

topic

 1 #过滤模式
 2 
 3  import pika
 4   import sys
 5   connection = pika.BlockingConnection(pika.ConnectionParameters(
 6       host='localhost'))
 7   channel = connection.channel()
 8   channel.exchange_declare(exchange='topic_logs',
 9                            exchange_type='topic')
10   routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
11   message = ' '.join(sys.argv[2:]) or 'Hello World!'
12   channel.basic_publish(exchange='topic_logs',
13                         routing_key=routing_key,
14                         body=message)
15   print(" [x] Sent %r:%r" % (routing_key, message))
16   connection.close()
publisher
 1   import pika
 2   import sys
 3   connection = pika.BlockingConnection(pika.ConnectionParameters(
 4       host='localhost'))
 5   channel = connection.channel()
 6   channel.exchange_declare(exchange='topic_logs',
 7                            exchange_type='topic')
 8   result = channel.queue_declare(exclusive=True)
 9   queue_name = result.method.queue
10   binding_keys = sys.argv[1:]
11   if not binding_keys:
12       sys.stderr.write("Usage: %s [binding_key]...
" % sys.argv[0])
13       sys.exit(1)
14   for binding_key in binding_keys:
15       channel.queue_bind(exchange='topic_logs',
16                          queue=queue_name,
17                          routing_key=binding_key)
18   print(' [*] Waiting for logs. To exit press CTRL+C')
19   def callback(ch, method, properties, body):
20       print(" [x] %r:%r" % (method.routing_key, body))
21   channel.basic_consume(callback,
22                         queue=queue_name,
23                         no_ack=True)
24   channel.start_consuming()
subscriber
  1. To receive all the logs run:
  python receive_logs_topic.py "#"

  2. To receive all logs from the facility "kern":
  python receive_logs_topic.py "kern.*"

  3. Or if you want to hear only about "critical" logs:
  python receive_logs_topic.py "*.critical"

  4. You can create multiple bindings:
  python receive_logs_topic.py "kern.*" "*.critical"

  5. And to emit a log with a routing key "kern.critical" type:
  python emit_log_topic.py "kern.critical" "A critical kernel error" 

rpc

 1 #!/usr/bin/env python
 2 # _*_ encoding:utf-8 _*_
 3 import pika
 4 import sys
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6                                      host='127.0.0.1'))
 7 channel = connection.channel()
 8 channel.queue_declare(queue='rpc_queue') #设置rpc模式
 9 #斐波那契
10 def fib(n):
11     if n == 0:
12         return 0
13     elif n == 1:
14         return 1
15     else:
16         return fib(n-1) + fib(n-2)
17 #2.
18 def on_request(ch,method,props,body):
19     n = int(body) #斐波那契
20     print("[.]fib(%s)"%n)
21     response = fib(n)
22     #3.发送
23     ch.basic_publish(exchange='',
24                      routing_key=props.reply_to,  #接收到的随机queue
25                      properties=pika.BasicProperties(correlation_id=    #接收到的uuid
26                                                      props.correlation_id),
27                      body = str(response)) #发送消息
28     ch.basic_ack(delivery_tag = method.delivery_tag)  #等待确认
29 #1.开始接收
30 channel.basic_consume(on_request,queue='rpc_queue')
31 print("[x] Awaiting RPC requests")
32 channel.start_consuming()
server
 1 #!/usr/bin/env python
 2 # _*_ encoding:utf-8 _*_
 3 import pika
 4 import sys
 5 import uuid
 6 class FibonacCiRpcClient(object):
 7     def __init__(self):
 8         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
 9                                                   host = '127.0.0.1'))
10         self.channel= self.connection.channel()
11         result = self.channel.queue_declare(exclusive=True)   #设置随机queue
12         self.callback_queue = result.method.queue
13         self.channel.basic_consume(self.on_response,no_ack=True,   #设置接收
14                                    queue=self.callback_queue)
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:   #确认server和clinet的uuid是否相等
17             self.response = body
18     #1.发送
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',
23                                    routing_key='rpc_queue',     #设置rpc模式
24                                    properties=pika.BasicProperties(
25                                          reply_to = self.callback_queue,  #指定随机queue
26                                           correlation_id= self.corr_id), #uuid
27                                    body=str(n))                          #发送消息
28         #2.没有数据开始接收
29         while self.response is None:
30             self.connection.process_data_events()  #设置非堵塞模式
31         return int(self.response)                 #返回结果
32 fibonacci_rpc =  FibonacCiRpcClient()
33 print(" [x] Requesting fib(5)")
34 response = fibonacci_rpc.call(5)
35 print('[.]Got%r'%response)
clinet

2.redis

http://www.cnblogs.com/wupeiqi/articles/5132791.html

redis安装

yum install epel-release 
yum install redis 
redis-server /etc/redis.conf
redis-cli -h 192.168.80.11
  • python执行rerdis命令

redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。

#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
__author__ = 'han'
import redis
r = redis.Redis(host='192.168.80.12',port=6379)
r.set('foo','Bar')  #设置一个字典
print(r.get('foo'))  
  • 链接池

redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
__author__ = 'han'
import redis
pool = redis.ConnectionPool(host='192.168.80.12',port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo','bar')
print(r.get('foo'))
  • 字典

查看修改等..

set name han  #设置一个字典

mset naem8 zhang name9 li  #设置多个 


192.168.80.12:6379> keys *  #查看所有 
1) "naem" 
2) "aeg" 
3) "name" 
4) "foo" 


set test2 100 ex 5  #设置5秒  

setex name4 3 alex  #设置alex3秒


set 2b alex nx         #不存在设置

set 2b abc xx   #存在设置 

getset name9 jack  #设置新值并获取原来的值

getrange name9 0 2 #切片 

setrange name9 0 l  #修改

 

 1 1.set naem11 alex
 2  ord('a')
 3 97
 4 
 5  bin(97)
 6 '0b1100001'
 7 
 8    127 64 32 16 8 4 2 1 
 9    0  97 1  1  0  0 0 0 1
10 
11  192.168.80.12:6379> setbit naem11 6 1
12 (integer) 0
13 
14 192.168.80.12:6379> get naem11
15 "clex"
通过二进制修改

setbit

模拟用户在线
192.168.80.12:6379> setbit n5 1000 1
(integer) 0
192.168.80.12:6379> setbit n5 55 1
(integer) 0
192.168.80.12:6379> setbit n5 600 1
(integer) 0
192.168.80.12:6379> bitcount n5
(integer) 3                         共3 
192.168.80.12:6379> getbit n5 55  查看
(integer) 1

incrbyfloat

1 #支持小数
2 
3 192.168.80.12:6379> set n6 1.0    
4 OK
5 192.168.80.12:6379> incrbyfloat n6 1.2
6 "2.2"
7 192.168.80.12:6379> incrbyfloat n6 1.4
8 "3.6"
View Code

incr

模拟用户在线
192.168.80.12:6379> incr name8 
(integer) 1
192.168.80.12:6379> incr name8 
(integer) 2
192.168.80.12:6379> incr name8  加
(integer) 3
192.168.80.12:6379> decr name8  减
(integer) 2
192.168.80.12:6379> decr name8 
(integer) 1

append

#追加

192.168.80.12:6379> set name9 zhang 设置字典
OK
192.168.80.12:6379> append name9 san   追加为zhangsan
(integer) 8
192.168.80.12:6379> get name9
"zhangsan"
  • Hash

#设置
127.0.0.1:6379> hset ni name li 
(integer) 1

#长度
127.0.0.1:6379> hlen ni   (integer) 1
192.168.80.12:6379> hset info id 999  
(integer) 1

#设置
192.168.80.12:6379> hset info name alex
(integer) 1
192.168.80.12:6379> hget info name
"alex"

#查看keys
192.168.80.12:6379> hkeys info 
1) "id"
2) "name"

 查看values
192.168.80.12:6379> hvals info
1) "999"
2) "alex"

hmset

#设置多个
192.168.80.12:6379> hmset info2 k1 1 k2 2  
OK

#查看keys
192.168.80.12:6379> keys info2  
1) "info2"

#查看所有
192.168.80.12:6379> hgetall info2 
1) "k1"
2) "1"

hmget

#查看多个
1) "1"
192.168.80.12:6379> hmget info2 k1 k2 
2) "2"

hexists

#判断一个keys是否存在
192.168.80.12:6379> hexists info2 k3 
(integer) 0

192.168.80.12:6379> hexists info2 k2
(integer) 1

hscan

#过滤查看
192.168.80.12:6379> hscan info2 0 match k* 
1) "0"
2) 1) "k1"
   2) "1"
   3) "k2"
   4) "2"
  • 列表

设置列表

192.168.80.12:6379> lpush names alex hanyan zhuangyang
(integer) 3

#从右查看
192.168.80.12:6379> lrange names 0 -1  
1) "zhuangyang"
2) "hanyan"
3) "alex"


192.168.80.12:6379> rpush names wangsen zouyi
(integer) 5

#从左查看
192.168.80.12:6379> lrange names 0 -1
1) "zhuangyang"
2) "hanyan"
3) "alex"
4) "wangsen"
5) "zouyi"

#查看长度
127.0.0.1:6379> lpush n1 zhang
(integer) 1
127.0.0.1:6379> llen n1
(integer) 1

lrem

#删除
127.0.0.1:6379> lrem n1 1 zhang  
(integer) 1

#从左删除
127.0.0.1:6379> lpop n1   
"han"

#从右删除
127.0.0.1:6379> rpop n1  
"zhang"

index

#获取index
127.0.0.1:6379> lindex n1 0
"wang"

插入

#插入前面
192.168.80.12:6379> linsert names before alex TEST
(integer) 6
192.168.80.12:6379> lrange names 0 -1
1) "zhuangyang"
2) "hanyan"
3) "TEST"
4) "alex"
5) "wangsen"
6) "zouyi"

#插入后面
192.168.80.12:6379> linsert names after alex TEST
(integer) 7
192.168.80.12:6379> lrange names 0 -1
1) "zhuangyang"
2) "hanyan"
3) "TEST"
4) "alex"
5) "TEST"
6) "wangsen"
7) "zouyi"

修改

修改
192.168.80.12:6379> lset names 3 ALEX
OK
192.168.80.12:6379> lrange names 0 -1 
1) "zhuangyang"
2) "hanyan"
3) "TEST"
4) "ALEX"
5) "TEST"
6) "wangsen"
7) "zouyi"

删除多个

192.168.80.12:6379> lrem names 2 TEST
(integer) 2
192.168.80.12:6379> lrange names 0 -1
1) "zhuangyang"
2) "hanyan"
3) "ALEX"
4) "wangsen"
5) "zouyi"

rpoplpush

#从name2 中删除一个值 如果name2为空就等待10秒
192.168.80.12:6379> blpop names2 10
1) "names2"
2) "wangsen"
192.168.80.12:6379> blpop names2 10
(nil) 
(10.02s)
  • 集合

#添加集合
192.168.80.12:6379> sadd names3 alex alex jack jack 3 3
(integer) 3
#查看集合
192.168.80.12:6379> smembers names3
1) "alex"
2) "3"
3) "jack"

并集

192.168.80.12:6379> sinter names3 names4  
1) "alex"
2) "jack"

差集

#a 中存在b中不存在的
127.0.0.1:6379> sdiff names3 names4 
1) "jack"

sdiffstore

#把差集加入names2中
127.0.0.1:6379> sdiffstore names2 names3 names4
(integer) 1

查看集合

127.0.0.1:6379> smembers names4
1) "zhang"
2) "san"
3) "han"
4) "wei"
5) "alex"

移动,删除

#把names4的han移到names2
smove names4 names2 han
#删除
srem names4 abc

并集

sunion names4 names3

sismember

#查看一个name是否是集合的成员
192.168.80.12:6379> sismember names3 alex
(integer) 1
192.168.80.12:6379> sismember names3 ale
(integer) 0
  • 有序集合

#添加一个有序集合
192.168.80.12:6379> zadd z1 10 alex 5 jack 88 hanyang
(integer) 3

#查看集合
192.168.80.12:6379> zrange z1 0 -1
1) "jack"
2) "alex"
3) "hanyang"

zrank

#查看一个vlaues是否存在
192.168.80.12:6379> zrank z1 alex 
(integer) 1 
192.168.80.12:6379> zrank z1 ajan 
(nil)

zcount

#查看从1-9 
zcount z1 1 9

zrank

获取一个值index
127.0.0.1:6379> zrank z1 han 
(integer) 1

删除

127.0.0.1:6379> zrank z1 han 
(integer) 1

查看并显示编号

127.0.0.1:6379> zrange z9 0 -1 withscores 
1) "han" 
2) "8" 
3) "zhang" 
4) "9" 
5) "alex" 
6) "10" 
7) "zxc" 
8) "88" 
9) "abc" 
10) "100"
  • 其他

包括 字典 列表 集合

筛选

127.0.0.1:6379> keys na* 
1) "name" 
2) "names3" 
3) "names2" 
4) "name2" 
5) "names4"

删除

del name4 

#删除设置时间2秒
expire z8 2 

select

切换列表 
select 3

move

#移动位置
127.0.0.1:6379[1]> move name6 2 
(integer) 1 

rename

#改名
127.0.0.1:6379[2]> rename name6 name 
OK 
  • 管道

redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。

#!/usr/bin/env python
# _*_ encoding:utf-8 _*_
import redis,time

pool = redis.ConnectionPool(host='192.168.1.82',port=6379)

r = redis.Redis(connection_pool=pool)

pipe = r.pipeline()

pipe.set('name','alex')
time.sleep(20)
pipe.set('role','abc')

pipe.execute()


#centos查看
192.168.1.82:6379> get role
"abc"


192.168.1.82:6379> get name
"alex"
  • 发布订阅

发布服务器

 1 #!/usr/bin/env python
 2 # _*_ encoding:utf-8 _*_
 3 
 4 import redis
 5 
 6 
 7 class RedisHelper(object):
 8     def __init__(self):
 9         self.__conn = redis.Redis(host='192.168.1.82')
10         self.chan_sub = 'fm104.5'
11         self.chan_pub = 'fm104.5'
12 
13     def public(self, msg):
14         self.__conn.publish(self.chan_pub,msg)
15         return True
16 
17     def subscribe(self):
18         pub = self.__conn.pubsub()
19         pub.subscribe(self.chan_sub)
20         pub.parse_response()
21         return pub
22 
23 
24 obj = RedisHelper()
25 obj.public('hello')
View Code

订阅服务器

 1 # _*_coding:utf-8_*_
 2 __author__ = 'Alex Li'
 3 
 4 import sys,os
 5 
 6 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 7 sys.path.append(BASE_DIR)
 8 
 9 from day2.monitor import RedisHelper
10 
11 obj = RedisHelper()
12 redis_sub = obj.subscribe()
13 
14 while True:
15     msg = redis_sub.parse_response()
16     print(msg)
View Code
原文地址:https://www.cnblogs.com/hanwei999/p/7534992.html