python之上下文管理、redis的发布订阅、rabbitmq

使用with打开文件的方式,是调用了上下文管理的功能

 1 #打开文件的两种方法:
 2 
 3 f = open('a.txt','r')
 4 
 5 with open('a.txt','r') as f 
 6 
 7 实现使用with关闭socket
 8 import contextlib
 9 import socket
10 
11 @contextlib.contextmanage
12 def Sock(ip,port):
13     socket = socket.socket()
14     socket.bind((ip,port))
15     socket.listen(5)
16     try:
17         yield socket
18     finally:
19         socket.close()
20 
21 #执行Sock函数传入参数,执行到yield socket返回值给s,执行with语句体,执行finally后面的语句
22 with Sock('127.0.0.1',8000) as s:
23     print(s)

redis的发布订阅

class RedisHelper:

    def __init__(self):
        #调用类时自动连接redis
        self.__conn = redis.Redis(host='192.168.1.100')

    def public(self, msg, chan):
        self.__conn.publish(chan, msg)
        return True

    def subscribe(self, chan):
        pub = self.__conn.pubsub()
        pub.subscribe(chan)
        pub.parse_response()
        return pub

#订阅者
import s3

obj = s3.RedisHelper()
data = obj.subscribe('fm111.7')
print(data.parse_response())

#发布者
import s3

obj = s3.RedisHelper()
obj.public('alex db', 'fm111.7')

 RabbitMQ

 1 #消费者
 2 import pika
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
 5 channel = connection.channel()#创建对象
 6 
 7 channel.queue_declare(queue = 'wocao')
 8 def callback(ch,method,properties,body):
 9     print("[x] Received %r"%body)
10 
11 channel.basic_consume(callback,queue = 'wocao',no_ack = True)
12 print('[*] Waiting for messages. To exit press CTRL+C')
13 channel.start_consuming()
14 
15 #生产者
16 import pika
17 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
18 channel = connection.channel()
19 channel.queue_declare(queue = 'wocao')#指定一个队列,不存在此队列则创建
20 channel.basic_publish(exchange = '',routing_key = 'wocao',body = 'hello world!')
21 print("[x] Sent 'hello world!")
22 connection.close()

 exchange type类型

#生产者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.11.87'))
channel = connection.channel()
#fanout类型,对绑定该exchange的队列实行广播
channel.exchange_declare(exchange='logs_fanout',
                         type='fanout')

# 随机创建队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 绑定exchange
channel.queue_bind(exchange='logs_fanout',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
#消费者
import pika

#发送方
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.11.87'))
channel = connection.channel()

channel.exchange_declare(exchange='logs_fanout',
                         type='fanout')

message = "what's the fuck"
#设置exchange的名
channel.basic_publish(exchange='logs_fanout',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
 1 #根据关键字发送指定队列
 2 #生产者(发布者)
 3 import pika
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5                                      host = '127.0.0.1'))
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='direct_logs_1',
 9                          type='direct')  # 关键字发送到队列
10 #对error关键字队列发送指令
11 severity = 'error'
12 message = '123'
13 channel.basic_publish(exchange = 'direct_logs_1',
14                        routing_key = severity,
15                        body = message)
16 print('[x] Sent %r:%r'%(severity,message))
17 connection.close()
18 #消费者(订阅者)
19 import pika
20 #消费者
21 connection = pika.BlockingConnection(pika.ConnectionParameters(
22                                      host = '127.0.0.1'))
23 channel = connection.channel()
24 channel.exchange_declare(exchange='direct_logs_1',
25                          type = 'direct')#关键字发送到队列
26 
27 result = channel.queue_declare(exclusive=True)
28 queue_name = result.method.queue
29 serverities = ['error','info','warning']
30 for severity in serverities:
31     channel.queue_bind(exchange='direct_logs_1',
32                        queue = queue_name,
33                        routing_key = severity)
34 def callback(ch,method,properties,body):
35     print('[x] %r:%r'%(method.routing_key,body))
36 
37 channel.basic_consume(callback,
38                       queue = queue_name,
39                       no_ack = True)
40 channel.start_consuming()
 1 #实现消息不丢失接收方
 2 import pika
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.211.55.4'))
 4 channel = connection.channel()
 5 channel.queue_declare(queue = 'hello')
 6 
 7 def callback(ch,method,properties,body):
 8     print('redeived %s'%body)
 9     import time
10     time.sleep(10)
11     print('ok')
12     ch.basic_ack(delivery_tag= method.delivery_tag)
13 #no_ack = False接收方接受完请求后发送给对方一个接受成功的信号,如果没收到mq会重新将任务放到队列
14 channel.basic_consume(callback,queue = 'hello',no_ack=False)
15 print(' Waiting for messages.To exit press CTRL+C')
16 channel.start_consuming()
 1 #发送方
 2 #实现消息不丢失
 3 import pika
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.211.55.4'))
 5 channel = connection.channel()
 6 channel.queue_declare(queue = 'hello',durable = True)
 7 channel.basic_publish(exchange = '',routing_key = 'hello world',
 8                       properties = pika.BasicProperties(
 9                           delivery_mode=2,
10                       ))#发送方不丢失,发送方保持持久化
11 print(' Waiting for messages.To exit press CTRL+C')
12 channel.start_consuming()
 1 #接收方
 2 import pika
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100'))
 5 channel = connection.channel()
 6 
 7 
 8 channel.queue_declare(queue='hello', durable=True)
 9 def callback(ch, method, properties, body):
10     print(" [x] Received %r" % body)
11     import time
12     time.sleep(10)
13     print 'ok'
14     ch.basic_ack(delivery_tag = method.delivery_tag)
15 channel.basic_consume(callback,
16                       queue='hello',
17                       no_ack=False)
18 channel.start_consuming()

RabbitMQ队列中默认情况下,接收方从队列中获取消息是顺序的,例如:接收方1只从队列中获取奇数的任务,接收方2只从队列中获取偶数任务

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)
#表示队列不分奇偶分配,谁来取任务就给谁
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

RabbitMQ会重新将该任务添加到队列中

原文地址:https://www.cnblogs.com/liguangxu/p/5704390.html