Python之RabbitMQ操作

RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

实现的协议:AMQP。
 
术语(Jargon)
 
P,Producing,制造和发送信息的一方。
Queue,消息队列。
C,Consuming,接收消息的一方。
 
RabbitMQ安装
1 安装配置epel源
2    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3  
4 安装erlang
5    $ yum -y install erlang
6  
7 安装RabbitMQ
8    $ yum -y install rabbitmq-server

安装rabbitmq API

1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6  
7 https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue实现生产者消费者模型

 1 #!/usr/bin/env python
 2 import pika
 3  
 4 # ######################### 生产者 #########################
 5  
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7         host='localhost'))
 8 channel = connection.channel()
 9  
10 channel.queue_declare(queue='hello')
11  
12 channel.basic_publish(exchange='',
13                       routing_key='hello',
14                       body='Hello World!')
15 print(" [x] Sent 'Hello World!'")
16 connection.close()
 1 #!/usr/bin/env python
 2 import pika
 3  
 4 # ########################## 消费者 ##########################
 5  
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7         host='localhost'))
 8 channel = connection.channel()
 9  
10 channel.queue_declare(queue='hello')
11  
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14  
15 channel.basic_consume(callback,
16                       queue='hello',
17                       no_ack=True)
18  
19 print(' [*] Waiting for messages. To exit press CTRL+C')
20 channel.start_consuming()

1、acknowledgment 消息不丢失(订阅端消息不丢失)

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
 8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错
 9 
10 def callback(ch,method,properties,body):
11     print("[x] Received %r" %body)#打印获得消息的内容
12     ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
13 
14 channel.basic_consume(callback,queue='hai',no_ack=False)
15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
18 
19 print('[*]Waiting for messages to exit press CTRL+C')
20 channel.start_consuming()
消费者
 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
 8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai
 9 channel.basic_publish(exchange='',routing_key='hai',body='hello world')
10 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
11 print("[x] Sent 'hello world' ")
12 connection.close()
生产者

2、durable   消息不丢失(服务端消息不丢失)

 1 # time:
 2 # Auto:PANpan
 3 # func:
 4 import pika
 5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
 6 channel=connection.channel()#创建频道,通过频道操作rabbitmq
 7 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化
 8 
 9 def callback(ch,method,properties,body):
10     print("[x] Received %r" %body)#打印获得消息的内容
11     ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
12 
13 channel.basic_consume(callback,queue='hai',no_ack=True)
14 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
15 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
16 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
17 
18 print('[*]Waiting for messages to exit press CTRL+C')
19 channel.start_consuming()
消费者
 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
 8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
 9 channel.basic_publish(exchange='',routing_key='hai',body='hello world',
10                       properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
12 print("[x] Sent 'hello world' ")
13 connection.close()
生产者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列

 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
 8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化
 9 
10 def callback(ch,method,properties,body):
11     print("[x] Received %r" %body)#打印获得消息的内容
12     ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
13 channel.basic_qos(prefetch_count=1)#客户端按顺序去取,默认为奇偶数
14 channel.basic_consume(callback,queue='hai',no_ack=True)
15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
18 
19 print('[*]Waiting for messages to exit press CTRL+C')
20 channel.start_consuming()
消费者
 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
 7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
 8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
 9 channel.basic_publish(exchange='',routing_key='hai',body='hello world',
10                       properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
12 print("[x] Sent 'hello world' ")
13 connection.close()
生产者

4、发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

 exchange type = fanout

 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 import sys
 7 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接消息队列服务器
 8 channel=connection.channel()#创建频道,通过频道对rabbitmq进行操作
 9 
10 channel.exchange_declare(exchange='logs',type='fanout')#创建exchange,名称为logs,若果该消息队列已经创建,可以省略
11 message=''.join(sys.argv[1:]) or "info: Hello Wrold"
12 
13 channel.basic_publish(exchange='logs',routing_key='',body=message)#将消息添加今年队列
14 print('[x] sent %r'%message)
15 connection.close()
发布者
 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8')) #连接消息队列服务器
 7 channel = connection.channel()#创建频道,通过频道对rabbitmq进行操作
 8 
 9 channel.exchange_declare(exchange='logs',#创建exchange,名称为logs
10                          type='fanout')#type='fanout'作用为凡是和exchange相关联的队列,在用户给exchange发消息时,所有关联队列都会受到消息
11 
12 result=channel.queue_declare(exclusive=True)#不指定队列名,有系统随机创建
13 queue_name=result.method.queue
14 
15 channel.queue_bind(exchange='logs',queue=queue_name)#将exchange和当前的消息队列做一个绑定
16 print(' [*] Waiting for logs. To exit press CTRL+C')
17 
18 
19 def callback(ch,method,properties,body):
20     print('[x] %r' %body)
21 
22 channel.basic_consume(callback,queue=queue_name,no_ack=True)#在队列中获取消息
23 
24 channel.start_consuming()
订阅者

5、关键字发送

 exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 import sys
 7 
 8 connection = pika.BlockingConnection(pika.ConnectionParameters(
 9         host='192.168.11.138'))
10 channel = connection.channel()
11 
12 channel.exchange_declare(exchange='direct_logs',
13                          type='direct')
14 
15 #severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
16 #message = ' '.join(sys.argv[2:]) or 'Hello World!'
17 severity='info'
18 message='test'
19 channel.basic_publish(exchange='direct_logs',
20                       routing_key=severity,
21                       body=message)
22 print(" [x] Sent %r:%r" % (severity, message))
23 connection.close()
发布者
 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 import pika
 6 import sys
 7 
 8 connection = pika.BlockingConnection(pika.ConnectionParameters(
 9         host='10.0.0.8'))
10 channel = connection.channel()
11 
12 channel.exchange_declare(exchange='direct_logs',
13                          type='direct')#设置exchange类型为direct
14 
15 result = channel.queue_declare(exclusive=True)  #创建随机队列
16 queue_name = result.method.queue
17 
18 # severities = sys.argv[1:]
19 # if not severities:
20 #     sys.stderr.write("Usage: %s [info] [warning] [error]
" % sys.argv[0])
21 #     sys.exit(1)
22 severities=['error']
23 for severity in severities:
24     channel.queue_bind(exchange='direct_logs',
25                        queue=queue_name,
26                        routing_key=severity)#绑定关键字
27 
28 print(' [*] Waiting for logs. To exit press CTRL+C')
29 
30 def callback(ch, method, properties, body):
31     print(" [x] %r:%r" % (method.routing_key, body))
32 
33 channel.basic_consume(callback,
34                       queue=queue_name,
35                       no_ack=True)
36 
37 channel.start_consuming()
订阅在1
 1 #!/usr/bin/env python
 2 # time:
 3 # Auto:PANpan
 4 # func:
 5 #!/usr/bin/env python
 6 # time:
 7 # Auto:PANpan
 8 # func:
 9 import pika
10 import sys
11 
12 connection = pika.BlockingConnection(pika.ConnectionParameters(
13         host='10.0.0.8'))
14 channel = connection.channel()
15 
16 channel.exchange_declare(exchange='direct_logs',
17                          type='direct')
18 
19 result = channel.queue_declare( )
20 #声明queue,确认要从中接收message的queue
21 #queue_declare函数是幂等的,可运行多次,但只会创建一次
22 #若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
23 #但在producer和consumer中重复声明queue是一个好的习惯
24 #例如:  channel.queue_declare(queue='hello')
25 queue_name = result.method.queue
26 
27 # severities = sys.argv[1:]
28 # if not severities:
29 #     sys.stderr.write("Usage: %s [info] [warning] [error]
" % sys.argv[0])
30 #     sys.exit(1)
31 severities=['error','info']
32 for severity in severities:
33     channel.queue_bind(exchange='direct_logs',
34                        queue=queue_name,
35                        routing_key=severity)
36 
37 print(' [*] Waiting for logs. To exit press CTRL+C')
38 
39 def callback(ch, method, properties, body):
40     print(" [x] %r:%r" % (method.routing_key, body))
41 
42 channel.basic_consume(callback,
43                       queue=queue_name,
44                       no_ack=True)
45 
46 channel.start_consuming()
订阅在2

6、模糊匹配

 exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
 1 #!/usr/bin/env python
 2 import pika
 3 import sys
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8 
 9 channel.exchange_declare(exchange='topic_logs',
10                          type='topic')
11 
12 result = channel.queue_declare(exclusive=True)
13 queue_name = result.method.queue
14 
15 binding_keys = sys.argv[1:]
16 if not binding_keys:
17     sys.stderr.write("Usage: %s [binding_key]...
" % sys.argv[0])
18     sys.exit(1)
19 
20 for binding_key in binding_keys:
21     channel.queue_bind(exchange='topic_logs',
22                        queue=queue_name,
23                        routing_key=binding_key)
24 
25 print(' [*] Waiting for logs. To exit press CTRL+C')
26 
27 def callback(ch, method, properties, body):
28     print(" [x] %r:%r" % (method.routing_key, body))
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()
订阅者
 1 #!/usr/bin/env python
 2 import pika
 3 import sys
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8 
 9 channel.exchange_declare(exchange='topic_logs',
10                          type='topic')
11 
12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
13 message = ' '.join(sys.argv[2:]) or 'Hello World!'
14 channel.basic_publish(exchange='topic_logs',
15                       routing_key=routing_key,
16                       body=message)
17 print(" [x] Sent %r:%r" % (routing_key, message))
18 connection.close()
发布者

注:

订阅/发布Demo
 
发送消息给多个订阅者
核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。
 
Exchanges
 
在RabbitMQ完整的模型中,消息只能发送给一个exchange。
exchange一方面接收消息,另一方面push给queues。
 
exchange类型
> rabbitmqctl list_exchanges
direct
topic
headers
fanout 广播消息给已知队列
 
原文地址:https://www.cnblogs.com/panwenbin-logs/p/5715669.html