Reference blogs:
MySQL
http://www.cnblogs.com/wupeiqi/articles/5699254.html
Caching
http://www.cnblogs.com/wupeiqi/articles/5132791.html
Thread pools
http://www.cnblogs.com/wupeiqi/articles/4839959.html
I. Thread pools
Context management
import contextlib

@contextlib.contextmanager
def worker_state(state_list, worker_thread):
    state_list.append(worker_thread)
    try:
        print(state_list)
        yield  # like return, but the function is not terminated: it resumes here after the with-block
    finally:
        state_list.remove(worker_thread)

free_list = []
current_thread = "alex"
with worker_state(free_list, current_thread):
    print(123)
Simple yield usage:
def fab(max):
    a, b = 0, 1
    while a < max:
        yield a          # the value of a is handed to i
        a, b = b, a + b  # then a is rebound to the next value
for i in fab(15):
    print(i)
Tearing down thread-pool resources
--- using the contextlib module and with to close a socket automatically
import contextlib
import socket

@contextlib.contextmanager
def context_socket(host, port):
    s = socket.socket()
    s.bind((host, port))  # the address is a (host, port) tuple
    s.listen(5)
    try:
        yield s  # yield the socket so "as sock" receives it
    finally:
        s.close()

with context_socket("127.0.0.1", 8888) as sock:
    print(sock)
II. Redis publish/subscribe
Connection pool
import redis

pool = redis.ConnectionPool(host='192.168.61.131', port=6379)  # connect to the redis server
r = redis.Redis(connection_pool=pool)  # use the connection pool
r.set('foo', 'bar')
print(r.get('foo'))
Custom list operations
Transactions:
Atomic operations
** Publish/subscribe
import redis

class RedisHelper:
    def __init__(self):
        self.__conn = redis.Redis(host="192.168.61.131")  # redis server

    def public(self, msg, chan):
        self.__conn.publish(chan, msg)  # publisher: send a message to a channel

    def subscribe(self, chan):
        pub = self.__conn.pubsub()  # subscriber handle
        pub.subscribe(chan)         # channel to receive from
        pub.parse_response()        # consume the subscribe confirmation
        return pub


# module imported by both the publisher and the subscriber
import demo

obj = demo.RedisHelper()
obj.public('alex db', 'fm111.7')
import demo

obj = demo.RedisHelper()
data = obj.subscribe('fm111.7')
print(data.parse_response())
# [b'message', b'fm111.7', b'alex db']
Run the subscriber first so it completes its subscription; then, when the publisher publishes a message, the subscriber receives it.
III. RabbitMQ
1. Basics
Installation guide (reference): http://yidao620c.iteye.com/blog/1947335
A producer-consumer model built on Queue:
import queue
import threading
import time

q = queue.Queue(20)

def producer(arg):
    '''producer: keeps putting items on the queue'''
    while True:
        q.put(str(arg) + "-包子")
        time.sleep(1)

def consumer(arg):
    while True:
        print(arg, q.get())
        time.sleep(1)

for i in range(3):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for j in range(20):
    t = threading.Thread(target=consumer, args=(j,))
    t.start()
With RabbitMQ, producers and consumers no longer operate on an in-memory Queue object, but on a message queue implemented by a RabbitMQ server running on some host.
import pika

# consumer

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.queue_declare(queue='hello34')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello34',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
import pika

# producer

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.queue_declare(queue='hello34')

channel.basic_publish(exchange='',
                      routing_key='hello34',
                      body='hello world')
print(" [x] Sent 'Hello World!'")
connection.close()
1. acknowledgment: no message loss
With no_ack=False, if the consumer dies (its channel is closed, the connection is closed, or the TCP connection is lost), RabbitMQ re-adds the task to the queue.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)  # tell the queue the task is done

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
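The requeue semantics can also be modelled with a plain in-memory queue (a toy standard-library sketch, not pika code): a message is only gone for good once the handler finishes; if the handler raises, the message goes back on the queue, just as RabbitMQ requeues an unacknowledged message.

```python
import queue

def consume_one(q, handler):
    """Take one message; requeue it if the handler fails (ack-style semantics)."""
    msg = q.get()
    try:
        handler(msg)   # the "ack" is implicit: a finished handler means no requeue
    except Exception:
        q.put(msg)     # handler died: put the message back, like RabbitMQ does

q = queue.Queue()
q.put('task-1')

def flaky(msg):
    raise RuntimeError("worker crashed")

consume_one(q, flaky)
print(q.qsize())  # → 1, the message was requeued
```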
2. durable: no message loss (persistence)
channel.queue_declare(queue='hello1', durable=True) must be set on both the consumer and the producer.
Message persistence is enabled with delivery_mode=2.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
channel = connection.channel()

# make the queue durable
channel.queue_declare(queue='hello1', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello1',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

# consumer
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
channel = connection.channel()

# make the queue durable
channel.queue_declare(queue='hello1', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello1',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make the message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()

# producer
3. Message fetch order
By default, messages are handed out in queue order; e.g. consumer 1 takes the odd-numbered tasks from the queue and consumer 2 takes the even-numbered ones.
channel.basic_qos(prefetch_count=1) means whoever is free takes the next message, instead of the fixed odd/even rotation.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
channel = connection.channel()

channel.queue_declare(queue='hello1', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print('ok')
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # at most one unacked message per consumer: whoever is free takes the next

channel.basic_consume(callback,
                      queue='hello1',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

# consumer
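The difference between the default rotation and prefetch_count=1 dispatch can be contrasted with a toy scheduler in plain Python (a local model of the semantics, not pika code):

```python
from itertools import cycle
import heapq

def round_robin(tasks, workers):
    """Default dispatch: tasks go to workers in strict rotation, ignoring load."""
    out = {w: [] for w in workers}
    for task, w in zip(tasks, cycle(workers)):
        out[w].append(task)
    return out

def fair_dispatch(tasks, durations, workers):
    """prefetch_count=1 style: each task goes to whichever worker is free first."""
    out = {w: [] for w in workers}
    free_at = [(0, w) for w in workers]   # (time the worker becomes free, worker name)
    heapq.heapify(free_at)
    for task, d in zip(tasks, durations):
        t, w = heapq.heappop(free_at)     # worker that frees up earliest
        out[w].append(task)
        heapq.heappush(free_at, (t + d, w))
    return out

print(round_robin([1, 2, 3, 4], ['A', 'B']))
# → {'A': [1, 3], 'B': [2, 4]}: the fixed odd/even split
print(fair_dispatch([1, 2, 3, 4], [10, 1, 1, 1], ['A', 'B']))
# → {'A': [1], 'B': [2, 3, 4]}: the slow task pins A, so B takes the rest
```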
Key points
a. One benefit of a work queue is that the queue can be processed in parallel: if tasks pile up, just add more workers.
b. With multiple workers, RabbitMQ delivers messages to the consumers in turn; this dispatch mode is round-robin.
c. Message acknowledgement: if a worker dies, the code above makes the message go to another worker instead of being dropped.
That is what the acknowledgement mechanism is for: when a worker finishes a task it sends an ack, telling the RabbitMQ server that the message has been received and processed, and the server then releases and deletes it.
d. Message acks have no notion of timeout, so nothing goes wrong when a task takes a very long time to process.
e. Acks are enabled by default and disabled with the no_ack=True flag; the ack itself is sent with basic_ack inside the callback.
f. If you forget to call basic_ack, messages are redelivered after the program exits, and the unacknowledged messages pile up on the RabbitMQ server, consuming more and more memory. Confirm with the following command:
[root@localhost sbin]# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
hello1 3 0
hello34 0 0
...done.
Here hello1 has three backlogged tasks.
g. On queue size: if all the workers are busy, the queue can fill up. Keep an eye on this, and either add more workers or use another strategy, such as limiting the queue size.
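The queue-size concern in (g) can be felt locally with Python's own bounded queue.Queue (an in-process analogy; on the RabbitMQ side a limit would be configured on the server instead):

```python
import queue

q = queue.Queue(maxsize=2)   # a queue that holds at most two pending tasks
q.put('task-1')
q.put('task-2')              # the queue is now full

try:
    q.put_nowait('task-3')   # no worker has drained the queue: this overflows
except queue.Full:
    print('queue full - add workers or apply backpressure')
```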
4. Publish/subscribe
Publish/subscribe differs from a simple message queue in that a published message is delivered to all subscribers, whereas a queued message disappears once it has been consumed. So when RabbitMQ implements publish/subscribe, it creates one queue per subscriber, and when the publisher publishes a message, it is placed into every bound queue.
exchange type = fanout
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Author: Aaron Shen

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

# publisher
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # temporary queue, deleted when the connection closes
queue_name = result.method.queue  # use the queue name assigned by the server

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

# subscriber
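The fanout behaviour of the two snippets above (one private queue per subscriber, every message copied to all of them) can be modelled in-process with plain queue.Queue objects (a toy sketch, not pika code):

```python
import queue

class ToyExchange:
    """Fanout-style exchange: every bound queue receives a copy of each message."""
    def __init__(self):
        self.queues = []

    def bind(self):
        q = queue.Queue()        # each subscriber gets its own temporary queue
        self.queues.append(q)
        return q

    def publish(self, msg):
        for q in self.queues:    # fanout: copy the message to all bound queues
            q.put(msg)

ex = ToyExchange()
sub1 = ex.bind()
sub2 = ex.bind()
ex.publish('info: Hello World!')
print(sub1.get())  # → 'info: Hello World!'
print(sub2.get())  # → 'info: Hello World!' (both subscribers see the same message)
```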
5. Routing by keyword
exchange type = direct
In the earlier examples, a message was sent by explicitly naming a specific queue. RabbitMQ also supports routing by keyword: queues are bound with keywords, the sender publishes a message to an exchange together with a keyword, and the exchange uses that keyword to decide which queue the message should go to.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Author: Aaron Shen

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs_test',
                         type='direct')

severity = 'error'
message = 'qwe'
channel.basic_publish(exchange='direct_logs_test',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

# producer
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Author: Aaron Shen

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs_test',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = ['info']  # consume only 'info' messages
if not severities:
    sys.stderr.write("Usage: %s [info] [error] " % severities)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs_test',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

# consumer
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Author: Aaron Shen

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs_test',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = ['error', 'info']  # consume both 'error' and 'info' messages
if not severities:
    sys.stderr.write("Usage: %s [error] " % severities)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs_test',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

# consumer
6. Pattern matching
exchange type = topic
With a topic exchange, a queue can be bound with wildcard keywords; when the sender publishes to the exchange, the exchange matches the incoming routing key against the binding keys, and on a match the message is delivered to the bound queue.
- # matches zero or more words
- * matches exactly one word
Sender's routing key    Queue's binding key
old.boy.python          old.*   -- no match
old.boy.python          old.#   -- match
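The matching rules above can be illustrated with a small matcher in plain Python (a toy reimplementation of the semantics, not how RabbitMQ itself does it):

```python
def topic_match(binding, routing):
    """Return True if a topic binding key matches a routing key.
    '*' matches exactly one word; '#' matches zero or more words."""
    b, r = binding.split('.'), routing.split('.')

    def rec(i, j):
        if i == len(b):
            return j == len(r)               # binding consumed: match iff routing is too
        if b[i] == '#':                      # '#': try swallowing 0..n remaining words
            return any(rec(i + 1, k) for k in range(j, len(r) + 1))
        if j == len(r):
            return False                     # routing exhausted but binding is not
        if b[i] == '*' or b[i] == r[j]:      # '*' eats exactly one word; else literal match
            return rec(i + 1, j + 1)
        return False

    return rec(0, 0)

print(topic_match('old.*', 'old.boy.python'))   # → False: '*' is one word only
print(topic_match('old.#', 'old.boy.python'))   # → True:  '#' covers 'boy.python'
print(topic_match('*.info', 'anonymous.info'))  # → True:  the consumer example below matches
```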
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Author: Aaron Shen

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

# producer
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Author: Aaron Shen

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.61.131'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = ['*.info', ]  # wildcard binding
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

# consumer