python之day12(线程池,redis,rabbitMQ)

参考博客:

mysql
http://www.cnblogs.com/wupeiqi/articles/5699254.html   

缓存
http://www.cnblogs.com/wupeiqi/articles/5132791.html

线程池
http://www.cnblogs.com/wupeiqi/articles/4839959.html

一,线程池:

  上下文管理

import contextlib

@contextlib.contextmanager
def worker_state(state_list, worker_thread):
    state_list.append(worker_thread)
    try:
        print(state_list)
        yield                       #相当于return 返回值功能,但是不终止整个函数,只是跳出后重新执行函数
    finally:
        state_list.remove(worker_thread)

free_list = []
current_thread = "alex"

with worker_state(free_list, current_thread):
    print(123)        

  yield简单用法:

def fab(max):
    a,b = 0,1
    while a < max:
        yield a                    #a的值返回给i
        a, b = b, a+b           #然后a被重新赋值

for i in fab(15):
    print(i,',',)    

   

  终止线程池操作

  ---利用contextlib模块以及with完成socket的自动关闭

import contextlib
import socket

@contextlib.contextmanager
def context_socket(host, port):
    s = socket.socket()
    s.bind( (host, port) )   #元组格式
    s.listen(5)
    try:
        yield                   #返回值为None ==sock
    finally:
        s.close()

with context_socket("127.0.0.1", 8888) as sock:
    print(sock)

二 redis 发布订阅
    连接池

import redis

pool = redis.ConnectionPool(host='192.168.61.131', port=6379)       #连接服务器端
r = redis.Redis(connection_pool=pool)       #使用线程池
r.set('foo', 'bar')
print(r.get('foo'))


    自定列表操作
    事务:
        原子性操作
    **发布订阅

 1 import redis
 2 
 3 class RedisHelper:
 4     def __init__(self):
 5         self.__conn = redis.Redis(host="192.168.61.131")     #redis 服务器
 6 
 7     def public(self, msg, chan):
 8         self.__conn.publish(chan, msg)          #发布者 发布频道和信息
 9 
10     def subscribe(self, chan):
11         pub = self.__conn.pubsub()               #订阅者接收
12         pub.subscribe(chan)                         #接收的频道
13         pub.parse_response()                       #接收消息
14         return pub        
15 
16 
17 #等待发布者以及订阅者调用的模块
demo
1 import demo
2 
3 obj = demo.RedisHelper()
4 obj.public('alex db', 'fm111.7')
发布者
1 import demo
2 
3 obj = demo.RedisHelper()
4 data = obj.subscribe('fm111.7')
5 print(data.parse_response())      
6 #[b'message', b'fm111.7', b'alex db']
订阅者

  先执行订阅者完成订阅,然后发布者发布消息,订阅者会接收到发布者发布的消息。

三 rabbitMQ
    1,基础

  安装方法(参考):http://yidao620c.iteye.com/blog/1947335

  基于Queue实现生产者消费者模型:

  

import queue
import threading
import time

q = queue.Queue(20)
def productor(arg):
    '''
    生产者
    :param arg:
    :return:
    '''
    while True:
        q.put(str(arg) + "-包子")
        time.sleep(1)

def consumer(arg):
    while True:
        print(arg , q.get())
        time.sleep(1)

for i in range(3):
    t = threading.Thread(target = productor, args = (i, ))
    t.start()

for j in range(20):
    t = threading.Thread(target = consumer, args = (j, ))
    t.start()

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

 1 import pika
 2 
 3 #消费者
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host='192.168.61.131'))
 7 channel = connection.channel()
 8 
 9 channel.queue_declare(queue='hello34')
10 def callback(ch, method, properties, body):
11     print(" [x] Received %r" % body)
12 
13 channel.basic_consume(callback,
14                       queue='hello34',
15                       no_ack=True)
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()
消费者
 1 import pika
 2 
 3 #生产者
 4 
 5 connection= pika.BlockingConnection(pika.ConnectionParameters(
 6     host='192.168.61.131'
 7 ))
 8 
 9 channel = connection.channel()
10 
11 channel.queue_declare(queue='hello34')
12 
13 channel.basic_publish(exchange='',
14                       routing_key='hello34',
15                       body='hello world')
16 print(" [x] Sent 'Hello World!'")
17 connection.close()
生产者

1、acknowledgment 消息不丢失

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4         host='192.168.61.131'))
 5 channel = connection.channel()
 6 
 7 channel.queue_declare(queue='hello')
 8 
 9 def callback(ch, method, properties, body):
10     print(" [x] Received %r" % body)
11     import time
12     time.sleep(10)
13     print('ok')
14     ch.basic_ack(delivery_tag = method.delivery_tag)    #回复队列确认任务已完成
15 
16 channel.basic_consume(callback,
17                       queue='hello',
18                       no_ack=False)
19 
20 print(' [*] Waiting for messages. To exit press CTRL+C')
21 channel.start_consuming()
消费者

2、durable   消息不丢失(持久化)

channel.queue_declare(queue='hello1', durable=True) 在消费者和生产者都需要设置
通过delivery_mode = 2, 实现消息持久化
 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
 4 channel = connection.channel()
 5 
 6 # make message persistent
 7 channel.queue_declare(queue='hello1', durable=True)
 8 
 9 
10 def callback(ch, method, properties, body):
11     print(" [x] Received %r" % body)
12     import time
13     time.sleep(10)
14     print('ok')
15     ch.basic_ack(delivery_tag = method.delivery_tag)
16 
17 channel.basic_consume(callback,
18                       queue='hello1',
19                       no_ack=False)
20 
21 print(' [*] Waiting for messages. To exit press CTRL+C')
22 channel.start_consuming()
23 
24 #消费者
消费者
 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
 4 channel = connection.channel()
 5 
 6 # make message persistent
 7 channel.queue_declare(queue='hello1', durable=True)
 8 
 9 channel.basic_publish(exchange='',
10                       routing_key='hello1',
11                       body='Hello World!',
12                       properties=pika.BasicProperties(
13                           delivery_mode=2, # make message persistent
14                       ))
15 print(" [x] Sent 'Hello World!'")
16 connection.close()
17 
18 #生产者
生产者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
 4 channel = connection.channel()
 5 
 6 # make message persistent
 7 channel.queue_declare(queue='hello1', durable=True)
 8 
 9 
10 def callback(ch, method, properties, body):
11     print(" [x] Received %r" % body)
12     import time
13     time.sleep(10)
14     print('ok')
15     ch.basic_ack(delivery_tag = method.delivery_tag)
16 
17 channel.basic_qos(prefetch_count=1)    #实现随来随取
18 
19 channel.basic_consume(callback,
20                       queue='hello1',
21                       no_ack=False)
22 
23 print(' [*] Waiting for messages. To exit press CTRL+C')
24 channel.start_consuming()
25 
26 #消费者
消费者

重点说明

a、使用工作队列的一个好处就是能够并行的处理队列。如果任务堆积,只需要添加更多的工作者work即可

b、对于多个work,RabbitMQ会按照顺序把消息发送给每个消费者,这种方式为轮询(round-robin)

c、消息响应:如果一个work挂掉,上面代码实现将这个消息发送给其他work,而不是丢弃。

     因此需要消息响应机制,每个work处理完成任务的时候,会发送一个ack,告诉RabbitMQ-server已经收到并处理某条消息,然后RabbitMQ-server释放并删除这条消息。

d、消息ack没有超时的概念,这样在处理一个非常耗时的消息任务时候就不会出现问题

e、消息ack默认是开启的,通过no_ack=True标识关闭,在回调函数中basic_ack中

f、如果忘记调用basic_ack的话,这样消息在程序退出后重新发送,会导致RabbitMQ-server中消息堆积,占用越来越多的内存。通过如下命令进行确认:

[root@localhost sbin]# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello   0       0
hello1  3       0
hello34 0       0
...done.

存在三个堆积的任务

g、关于队列大小:如果所有的工作者都在处理任务,队列就会被填满。需要留意这个问题,要么添加更多的工作者,要么使用其他策略,例如设置队列大小等。

4、发布订阅   

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

 exchange type = fanout

 1 #!/usr/bin/env  python
 2 # -*- coding: UTF-8 -*-
 3 # Author: Aaron Shen
 4 
 5 import pika
 6 import sys
 7 
 8 connection = pika.BlockingConnection(pika.ConnectionParameters(
 9         host='192.168.61.131'))
10 channel = connection.channel()
11 
12 channel.exchange_declare(exchange='logs',
13                          type='fanout')
14 
15 message = "info: Hello World!"
16 channel.basic_publish(exchange='logs',
17                       routing_key='',
18                       body=message)
19 print(" [x] Sent %r" % message)
20 connection.close()
21 
22 # 发布者
发布者
 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4         host='192.168.61.131'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='logs',
 8                          type='fanout')
 9 
10 result = channel.queue_declare(exclusive=True) #队列断开后自动删除临时队列
11 queue_name = result.method.queue            # 队列名采用服务端分配的临时队列
12 
13 channel.queue_bind(exchange='logs',
14                    queue=queue_name)
15 
16 print(' [*] Waiting for logs. To exit press CTRL+C')
17 
18 def callback(ch, method, properties, body):
19     print(" [x] %r" % body)
20 
21 channel.basic_consume(callback,
22                       queue=queue_name,
23                       no_ack=True)
24 
25 channel.start_consuming()
订阅者

5、关键字发送

 exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

 1 #!/usr/bin/env  python
 2 # -*- coding: UTF-8 -*-
 3 # Author: Aaron Shen
 4 
 5 #!/usr/bin/env python
 6 import pika
 7 import sys
 8 
 9 connection = pika.BlockingConnection(pika.ConnectionParameters(
10         host='192.168.61.131'))
11 channel = connection.channel()
12 
13 channel.exchange_declare(exchange='direct_logs_test',
14                          type='direct')
15 
16 severity = 'error'
17 message = 'qwe'
18 channel.basic_publish(exchange='direct_logs_test',
19                       routing_key=severity,
20                       body=message)
21 print(" [x] Sent %r:%r" % (severity, message))
22 connection.close()
23 
24 #生产者
生产者
 1 #!/usr/bin/env  python
 2 # -*- coding: UTF-8 -*-
 3 # Author: Aaron Shen
 4 
 5 #!/usr/bin/env python
 6 import pika
 7 import sys
 8 
 9 connection = pika.BlockingConnection(pika.ConnectionParameters(
10         host='192.168.61.131'))
11 channel = connection.channel()
12 
13 channel.exchange_declare(exchange='direct_logs_test',
14                          type='direct')
15 
16 result = channel.queue_declare(exclusive=True)
17 queue_name = result.method.queue
18 
19 severities = ['info']      #只消费info
20 if not severities:
21     sys.stderr.write("Usage: %s [info] [error]
" % severities)
22     sys.exit(1)
23 
24 for severity in severities:
25     channel.queue_bind(exchange='direct_logs_test',
26                        queue=queue_name,
27                        routing_key=severity)
28 
29 print(' [*] Waiting for logs. To exit press CTRL+C')
30 
31 def callback(ch, method, properties, body):
32     print(" [x] %r:%r" % (method.routing_key, body))
33 
34 channel.basic_consume(callback,
35                       queue=queue_name,
36                       no_ack=True)
37 
38 channel.start_consuming()
39 
40 #消费者
消费者1
 1 #!/usr/bin/env  python
 2 # -*- coding: UTF-8 -*-
 3 # Author: Aaron Shen
 4 
 5 #!/usr/bin/env python
 6 import pika
 7 import sys
 8 
 9 connection = pika.BlockingConnection(pika.ConnectionParameters(
10         host='192.168.61.131'))
11 channel = connection.channel()
12 
13 channel.exchange_declare(exchange='direct_logs_test',
14                          type='direct')
15 
16 result = channel.queue_declare(exclusive=True)
17 queue_name = result.method.queue
18 
19 severities = ['error', 'info']                  #可以消费error 和info的
20 if not severities:
21     sys.stderr.write("Usage: %s [error]
" % severities)
22     sys.exit(1)
23 
24 for severity in severities:
25     channel.queue_bind(exchange='direct_logs_test',
26                        queue=queue_name,
27                        routing_key=severity)
28 
29 print(' [*] Waiting for logs. To exit press CTRL+C')
30 
31 def callback(ch, method, properties, body):
32     print(" [x] %r:%r" % (method.routing_key, body))
33 
34 channel.basic_consume(callback,
35                       queue=queue_name,
36                       no_ack=True)
37 
38 channel.start_consuming()
39 
40 #消费者
消费者2

6、模糊匹配

 exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
发送者路由值              队列中

old.boy.python          old.*  -- 不匹配

old.boy.python          old.#  -- 匹配
 1 #!/usr/bin/env  python
 2 # -*- coding: UTF-8 -*-
 3 # Author: Aaron Shen
 4 
 5 import pika
 6 import sys
 7 
 8 connection = pika.BlockingConnection(pika.ConnectionParameters(
 9         host='192.168.61.131'))
10 channel = connection.channel()
11 
12 channel.exchange_declare(exchange='topic_logs',
13                          type='topic')
14 
15 routing_key = 'anonymous.info'
16 message = ' '.join(sys.argv[2:]) or 'Hello World!'
17 channel.basic_publish(exchange='topic_logs',
18                       routing_key=routing_key,
19                       body=message)
20 print(" [x] Sent %r:%r" % (routing_key, message))
21 connection.close()
22 
23 # 生产者
生产者
 1 #!/usr/bin/env  python
 2 # -*- coding: UTF-8 -*-
 3 # Author: Aaron Shen
 4 
 5 import pika
 6 import sys
 7 
 8 connection = pika.BlockingConnection(pika.ConnectionParameters(
 9         host='192.168.61.131'))
10 channel = connection.channel()
11 
12 channel.exchange_declare(exchange='topic_logs',
13                          type='topic')
14 
15 result = channel.queue_declare(exclusive=True)
16 queue_name = result.method.queue
17 
18 binding_keys = ['*.info', ]                  #模糊查询
19 if not binding_keys:
20     sys.stderr.write("Usage: %s [binding_key]...
" % sys.argv[0])
21     sys.exit(1)
22 
23 for binding_key in binding_keys:
24     channel.queue_bind(exchange='topic_logs',
25                        queue=queue_name,
26                        routing_key=binding_key)
27 
28 print(' [*] Waiting for logs. To exit press CTRL+C')
29 
30 def callback(ch, method, properties, body):
31     print(" [x] %r:%r" % (method.routing_key, body))
32 
33 channel.basic_consume(callback,
34                       queue=queue_name,
35                       no_ack=True)
36 
37 channel.start_consuming()
38 
39 # 消费者
消费者

  

原文地址:https://www.cnblogs.com/aaron-shen/p/5703402.html