python之路11:RabbitMQ消息队列

RabbitMQ

简介安装 https://www.cnblogs.com/BillyLV/articles/11028146.html

安装Python的RabbitMQ客户端模块pika:

pip install pika

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

简单消息队列:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 
 7 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
 8 # 连接rabbitmq
 9 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
10 channel = connection.channel()  # 声明管道
11 channel.queue_declare(queue='Hello')  # 声明队列
12 
13 # 消息发送方,exchange交换机,routing_key指定投到的队列
14 channel.basic_publish(exchange='', routing_key='Hello', body='Hello World!')
15 print("[x] Sent 'Hello World!'")
16 connection.close()  # 关闭rabbitmq连接
Publisher
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 
 7 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
 8 # 连接rabbitmq
 9 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
10 channel = connection.channel()  # 声明管道
11 channel.queue_declare(queue='Hello')  # 声明队列
12 
13 
14 def callback(ch, method, properties, body):
15     # print('---', ch, method, properties)
16     print("[x] Received %r" % body)
17 
18 
19 # 消息接收方,从Hello队列取消息
20 channel.basic_consume(
21     queue='Hello', on_message_callback=callback, auto_ack=True)
22 
23 print('[*] Waiting for messages. To exit press CTRL+C')
24 channel.start_consuming()  # 处理IO事件并调度计时器和`basic_consume`的callback,直到所有消费者都被取消。
Consumer

 工作队列:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import time
 7 import sys
 8 
 9 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
10 # 连接rabbitmq
11 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
12 channel = connection.channel()  # 声明管道
13 channel.queue_declare(queue='task_queue', durable=True)
14 
15 message = ' '.join(sys.argv[1:]) or "Hello World!"
16 channel.basic_publish(
17     exchange='',
18     routing_key='task_queue',
19     body=message,
20     properties=pika.BasicProperties(
21         delivery_mode=2,  # make message persistent
22     ))
23 print(" [x] Sent %r" % message)
24 connection.close()
发送方
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import time
 7 
 8 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
 9 # 连接rabbitmq
10 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
11 channel = connection.channel()  # 声明管道
12 
13 channel.queue_declare(queue='task_queue', durable=True)
14 print(' [*] Waiting for messages. To exit press CTRL+C')
15 
16 
17 def callback(ch, method, properties, body):
18     print(" [x] Received %r" % body)
19     time.sleep(body.count(b'.'))
20     print(" [x] Done")
21     ch.basic_ack(delivery_tag=method.delivery_tag)
22 
23 
24 channel.basic_qos(prefetch_count=1)
25 channel.basic_consume(queue='task_queue', on_message_callback=callback)
26 
27 channel.start_consuming()
接收方

发布订阅:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import time
 7 import sys
 8 
 9 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
10 # 连接rabbitmq
11 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
12 channel = connection.channel()  # 声明管道
13 
14 channel.exchange_declare(exchange='logs', exchange_type='fanout')
15 
16 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
17 channel.basic_publish(exchange='logs', routing_key='', body=message)
18 print(" [x] Sent %r" % message)
19 connection.close()
发布消息
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import time
 7 
 8 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
 9 # 连接rabbitmq
10 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
11 channel = connection.channel()  # 声明管道
12 
13 channel.exchange_declare(exchange='logs', exchange_type='fanout')
14 
15 result = channel.queue_declare(queue='', exclusive=True)
16 queue_name = result.method.queue
17 
18 channel.queue_bind(exchange='logs', queue=queue_name)
19 
20 print(' [*] Waiting for logs. To exit press CTRL+C')
21 
22 
23 def callback(ch, method, properties, body):
24     print(" [x] %r" % body)
25 
26 channel.basic_consume(
27     queue=queue_name, on_message_callback=callback, auto_ack=True)
28 
29 channel.start_consuming()
接收消息

路由:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import sys
 7 
 8 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
 9 # 连接rabbitmq
10 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
11 channel = connection.channel()  # 声明管道
12 
13 channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
14 
15 result = channel.queue_declare(queue='', exclusive=True)
16 queue_name = result.method.queue
17 
18 severities = sys.argv[1:]
19 if not severities:
20     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
21     sys.exit(1)
22 
23 for severity in severities:
24     channel.queue_bind(
25         exchange='direct_logs', queue=queue_name, routing_key=severity)
26 
27 print(' [*] Waiting for logs. To exit press CTRL+C')
28 
29 
30 def callback(ch, method, properties, body):
31     print(" [x] %r:%r" % (method.routing_key, body))
32 
33 
34 channel.basic_consume(
35     queue=queue_name, on_message_callback=callback, auto_ack=True)
36 
37 channel.start_consuming()
direct消息接收方
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import time
 7 import sys
 8 
 9 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
10 # 连接rabbitmq
11 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
12 channel = connection.channel()  # 声明管道
13 
14 channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
15 
16 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
17 message = ' '.join(sys.argv[2:]) or 'Hello World!'
18 channel.basic_publish(
19     exchange='direct_logs', routing_key=severity, body=message)
20 print(" [x] Sent %r:%r" % (severity, message))
21 connection.close()
direct消息发送方

命令行上执行:

python receive_logs_direct.py warning error > logs_from_rabbit.log 只接收warning和error级别的消息,并将输出重定向到日志文件

python receive_logs_direct.py info warning error 匹配info warning error输出打印到屏幕

python emit_log_direct.py error "Run. Run. Or it will explode." 只发送error信息

python emit_log_direct.py warning "Run. Run. Or it will explode." 只发送warning信息

python emit_log_direct.py info "Run. Run. Or it will explode." 只发送info信息

 

话题:

 

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import time
 7 import sys
 8 
 9 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
10 # 连接rabbitmq
11 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
12 channel = connection.channel()  # 声明管道
13 
14 channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
15 
16 routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
17 message = ' '.join(sys.argv[2:]) or 'Hello World!'
18 channel.basic_publish(
19     exchange='topic_logs', routing_key=routing_key, body=message)
20 print(" [x] Sent %r:%r" % (routing_key, message))
21 connection.close()
topic发送方
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import sys
 7 
 8 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
 9 # 连接rabbitmq
10 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
11 channel = connection.channel()  # 声明管道
12 
13 channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
14 
15 result = channel.queue_declare('', exclusive=True)
16 queue_name = result.method.queue
17 
18 binding_keys = sys.argv[1:]
19 if not binding_keys:
20     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
21     sys.exit(1)
22 
23 for binding_key in binding_keys:
24     channel.queue_bind(
25         exchange='topic_logs', queue=queue_name, routing_key=binding_key)
26 
27 print(' [*] Waiting for logs. To exit press CTRL+C')
28 
29 
30 def callback(ch, method, properties, body):
31     print(" [x] %r:%r" % (method.routing_key, body))
32 
33 
34 channel.basic_consume(
35     queue=queue_name, on_message_callback=callback, auto_ack=True)
36 
37 channel.start_consuming()
topic接收方

python receive_logs_topic.py "#" 绑定键"#"匹配任意路由键,接收所有消息

python receive_logs_topic.py "kern.*" 只接收路由键以"kern."开头的消息("*"恰好匹配一个单词,例如kern.info、kern.critical)

python receive_logs_topic.py "kern.*" "*.critical" 接收路由键匹配"kern.*"或"*.critical"的消息(同一队列可绑定多个绑定键)

python emit_log_topic.py "kern.critical" "A critical kernel error" 发送kern.critical类型的消息

 

RPC:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import time
 7 import sys
 8 
 9 credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
10 # 连接rabbitmq
11 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
12 channel = connection.channel()  # 声明管道
13 
14 channel.queue_declare(queue='rpc_queue')
15 
16 def fib(n):
17     if n == 0:
18         return 0
19     elif n == 1:
20         return 1
21     else:
22         return fib(n - 1) + fib(n - 2)
23 
24 def on_request(ch, method, props, body):
25     n = int(body)
26 
27     print(" [.] fib(%s)" % n)
28     response = fib(n)
29 
30     ch.basic_publish(exchange='',
31                      routing_key=props.reply_to,
32                      properties=pika.BasicProperties(correlation_id = 
33                                                          props.correlation_id),
34                      body=str(response))
35     ch.basic_ack(delivery_tag=method.delivery_tag)
36 
37 channel.basic_qos(prefetch_count=1)
38 channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
39 
40 print(" [x] Awaiting RPC requests")
41 channel.start_consuming()
rpc_server
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 __author__ = 'BillyLV'
 4 
 5 import pika  # 导入rabbitmq客户端
 6 import uuid
 7 
 8 
 9 class FibonacciRpcClient(object):
10     def __init__(self):
11         credentials = pika.PlainCredentials('root', '123456')  # 连接rabbitmq用户凭证
12         self.connection = pika.BlockingConnection(
13             pika.ConnectionParameters('192.168.199.176', 5672, '/', credentials))
14 
15         self.channel = self.connection.channel()
16 
17         result = self.channel.queue_declare(queue='', exclusive=True)
18         self.callback_queue = result.method.queue
19 
20         self.channel.basic_consume(
21             queue=self.callback_queue,
22             on_message_callback=self.on_response,
23             auto_ack=True)
24 
25     def on_response(self, ch, method, props, body):
26         if self.corr_id == props.correlation_id:
27             self.response = body
28 
29     def call(self, n):
30         self.response = None
31         self.corr_id = str(uuid.uuid4())
32         self.channel.basic_publish(
33             exchange='',
34             routing_key='rpc_queue',
35             properties=pika.BasicProperties(
36                 reply_to=self.callback_queue,
37                 correlation_id=self.corr_id,
38             ),
39             body=str(n))
40         while self.response is None:
41             self.connection.process_data_events()
42         return int(self.response)
43 
44 
45 fibonacci_rpc = FibonacciRpcClient()
46 
47 print(" [x] Requesting fib(30)")
48 response = fibonacci_rpc.call(30)
49 print(" [.] Got %r" % response)
rpc_client

参考:

https://www.rabbitmq.com/getstarted.html

http://www.cnblogs.com/alex3714

http://www.cnblogs.com/wupeiqi

internet&python books

PS:如侵权,联我删。

原文地址:https://www.cnblogs.com/BillyLV/p/11025730.html