python-RabbitMQ

简单队列方式

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading


#建立队列长度为10
message = Queue.Queue(10)

#生产者
def producer(i):
        #往队列中扔消息
        message.put(i)
        print('queue_size=%s'%message.queue)
#消费者
def consumer(i):
        #取队列消息,先进先出
        msg = message.get()


#生产线程
for i in range(10):
    t = threading.Thread(target=producer, args=(i,))
    t.start()
#消费线程
for i in range(5):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()
    
#打印剩余的数据
print('queue_size=%s'%message.queue)

结果:

生产队列增长过程过程

producer_queue=deque([0])
producer_queue=deque([0, 1])
producer_queue=deque([0, 1, 2])
producer_queue=deque([0, 1, 2, 3])
producer_queue=deque([0, 1, 2, 3, 4])
producer_queue=deque([0, 1, 2, 3, 4, 5])
producer_queue=deque([0, 1, 2, 3, 4, 5, 6])
producer_queue=deque([0, 1, 2, 3, 4, 5, 6, 7])
producer_queue=deque([0, 1, 2, 3, 4, 5, 6, 7, 8])
producer_queue=deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

剩余数据
queue=deque([4, 5, 6, 7, 8, 9])

以下是RabbitMQ 主要模式

1简单队列

2exchange{

    1、fanout

    2、direct

    3 、topic

}

安装组件

pip install  pika

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

1、简单队列模型、消息不丢失持久化参数durable 

 生产者

 1 # -*- coding:utf-8 -*-
 2 import pika
 3 #认证的用户密码
 4 credentials = pika.PlainCredentials('openstack', 'RABBIT_PASS')
 5 #远程主机配置
 6 parameters = pika.ConnectionParameters('172.16.21.5',5673,'/',credentials)
 7 
 8 #封装socket逻辑部分,拿到操作句柄
 9 connection = pika.BlockingConnection(parameters)
10 channel = connection.channel()
11 
12 
13 #创建队列名字为hello,这个队列的名字一般是唯一的.消费者连接的信道名称也是这个
14 channel.queue_declare(queue='hello')
15 
16 # 第一种工作状态
17 #basic_publish设置队列
18 # exchange='' 就是简单队列的意思,这里exchange不工作
19 # routing_key='hello' ,exchange不工作了那么这里如何处理,就靠routing_key来找对应队列
20 #bodybody='Hello World!' 这就是传递的数据
21 channel.basic_publish(exchange='',
22                       routing_key='hello',
23                       body='Hello World!')
24 channel.basic_qos(prefetch_count=1)
25 print(" [x] Sent 'Hello World!'")
26 connection.close()
View Code

消费者

 1 # -*- coding:utf-8 -*-
 2 import pika
 3 #认证的用户密码
 4 credentials = pika.PlainCredentials('openstack', 'RABBIT_PASS')
 5 #远程主机配置
 6 parameters = pika.ConnectionParameters('172.16.21.5',5673,'/',credentials)
 7 connection = pika.BlockingConnection(parameters)
 8 channel = connection.channel()
 9 
10 
11 #消费者这里也创建一个队列,实际中生产者或消费者不一定谁先启动,如果消费者直接去没有创建的队列拿数据会直接报错
12 channel.queue_declare(queue='hello')
13 
14 #回调函数
15 def callback(ch, method, properties, body):
16     print(" [x] Received %r" % body)
17     #no_ack=false 必须在回调函数中加入下面的方法
18     #ch.basic_ack(delivery_tag = method.delivery_tag)
19 
20 #获取队列中
21 #queue='hello'获取hello队列的消息
22 #no_ack=True(无应答)false(有应答).如果callback函数会执行很长时间如果期间消费者机器出问题那么(无应答)模式消息取从队列
23 #中取走后就会删除掉的,无法找回。(有应答)队列必须等待callback正确执行完在删除队列消息
24 channel.basic_consume(callback,
25                       queue='hello',
26                       no_ack=True)
27 
28 print(' [*] Waiting for messages. To exit press CTRL+C')
29 channel.start_consuming()
View Code

消息获取顺序

默认队列中有12消息,三个消费者,A四个消息、B四个消息、C四个消息。BC处理块A处理慢就会变成A继续处理消息,BC闲置

 channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

1 #消费者增加下面的方法
2 channel.basic_qos(prefetch_count=1)
3 
4 channel.basic_consume(callback,
5                       queue='hello',
6                       no_ack=False)
消费者

2、exchange工作模型(fanout,direct,topic)

     fanout模式

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import pika
 4 import sys
 5 credentials = pika.PlainCredentials('openstack', 'RABBIT_PASS')
 6 parameters = pika.ConnectionParameters('172.16.21.5',5673,'/',credentials)
 7 connection = pika.BlockingConnection(parameters)
 8 channel = connection.channel()
 9 
10 #exchange创建一个交换机名为logs,并且设置类型为fanout
11 channel.exchange_declare(exchange='logs',
12                          type='fanout')
13 
14 message = 'exchange :type=fanout'
15 #指定名为logs的交换机,这里使用exchange,routing_key就不需要了。
16 channel.basic_publish(exchange='logs',
17                       routing_key='',
18                       body=message)
19 print(" [x] Sent %r" % message)
20 connection.close()
生产者
 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import pika
 4 
 5 credentials = pika.PlainCredentials('openstack', 'RABBIT_PASS')
 6 parameters = pika.ConnectionParameters('172.16.21.5',5673,'/',credentials)
 7 connection = pika.BlockingConnection(parameters)
 8 channel = connection.channel()
 9 
10 
11 #用意和之前一样
12 channel.exchange_declare(exchange='logs',
13                          type='fanout')
14 
15 #创建一个队列
16 result = channel.queue_declare(exclusive=True)
17 #给队列随机命名
18 queue_name = result.method.queue
19 
20 #将消费者队列和交换机(exchange='logs')进行绑定
21 channel.queue_bind(exchange='logs',
22                    queue=queue_name)
23 
24 
25 print(' [*] Waiting for logs. To exit press CTRL+C')
26 
27 #回调函数
28 def callback(ch, method, properties, body):
29     print(" [x] %r" % body)
30 
31 channel.basic_consume(callback,
32                       queue=queue_name,
33                       no_ack=True)
34 #阻塞函数
35 channel.start_consuming()
消费者

原理:

这种模式就是广播形式,生产者的消息会同时发送到所有消费者上

关键之匹配(direct模式)

这个模式上在上图增加了route key 。生产者发送消息的时候会绑定一个route key,消费者订阅消息也必须绑定route key 。这样一来就可以使用同一个交换机但可以接受自己关心的消息
原理:

 生产者

 1 #!/usr/bin/env python
 2 import pika
 3 import sys
 4 
 5 credentials = pika.PlainCredentials('openstack', 'RABBIT_PASS')
 6 parameters = pika.ConnectionParameters('172.16.21.5',5673,'/',credentials)
 7 connection = pika.BlockingConnection(parameters)
 8 channel = connection.channel()
 9 
10 #建立交换机和类型
11 channel.exchange_declare(exchange='direct_logs',
12                          type='direct')
13 #route key
14 severity = 'info'
15 message = 'exchange:type direct'
16 #发送消息
17 channel.basic_publish(exchange='direct_logs',
18                       routing_key=severity,
19                       body=message)
20 print(" [x] Sent %r:%r" % (severity, message))
21 connection.close()
生产者

消费者

 1 #!/usr/bin/env python
 2 import pika
 3 import sys
 4 
 5 credentials = pika.PlainCredentials('openstack', 'RABBIT_PASS')
 6 parameters = pika.ConnectionParameters('172.16.21.5',5673,'/',credentials)
 7 connection = pika.BlockingConnection(parameters)
 8 channel = connection.channel()
 9 
10 #创建交换机和类型
11 channel.exchange_declare(exchange='direct_logs',
12                          type='direct')
13 
14 #创建队列
15 result = channel.queue_declare(exclusive=True)
16 #设定随机队列名
17 queue_name = result.method.queue
18 
19 #消费者一个队列可以绑定多个routekey
20 severities = ['info','error']
21 
22 #循环多个routekey
23 for severity in severities:
24     #绑定叫交换机、队列、routekey
25     channel.queue_bind(exchange='direct_logs',
26                        queue=queue_name,
27                        routing_key=severity)
28 
29 print(' [*] Waiting for logs. To exit press CTRL+C')
30 
31 #回调函数
32 def callback(ch, method, properties, body):
33     print(" [x] %r:%r" % (method.routing_key, body))
34 
35 #指定回调函数
36 channel.basic_consume(callback,
37                       queue=queue_name,
38                       no_ack=True)
39 #阻塞等待
40 channel.start_consuming()
消费者

topic(模糊匹配)

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

#匹配模式写在消费者中
abc.123.abc    abc.* -- 不匹配 ,* 表示只能匹配 一个 单词
abc.123.abc    abc.# -- 匹配 ,# 表示可以匹配 0 个 或 多个 单词

消费者

 1  #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import pika
 4 import sys
 5 
 6 credentials = pika.PlainCredentials('openstack', 'RABBIT_PASS')
 7 parameters = pika.ConnectionParameters('172.16.21.5',5673,'/',credentials)
 8 connection = pika.BlockingConnection(parameters)
 9 channel = connection.channel()
10 
11 #创建交换机和类型
12 channel.exchange_declare(exchange='topic_logs1',
13                          type='topic')
14 
15 #创建队列
16 result = channel.queue_declare(exclusive=True)
17 #设定随机队列名
18 queue_name = result.method.queue
19 
20 #消费者一个队列可以绑定多个routekey
21 #消费者中用模糊匹配abc.*  这个只能匹配一个字母
22 severities = ['abc.*']
23 
24 #循环多个routekey
25 for severity in severities:
26     #绑定叫交换机、队列、routekey
27     channel.queue_bind(exchange='topic_logs1',
28                        queue=queue_name,
29                        routing_key=severity)
30 
31 print(' [*] Waiting for logs. To exit press CTRL+C')
32 
33 #回调函数
34 def callback(ch, method, properties, body):
35     print(" [x] %r:%r" % (method.routing_key, body))
36 
37 #指定回调函数
38 channel.basic_consume(callback,
39                       queue=queue_name,
40                       no_ack=True)
41 #阻塞等待
42 channel.start_consuming()
View Code

生产者

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import pika
 4 import sys
 5 
 6 credentials = pika.PlainCredentials('openstack', 'RABBIT_PASS')
 7 parameters = pika.ConnectionParameters('172.16.21.5',5673,'/',credentials)
 8 connection = pika.BlockingConnection(parameters)
 9 channel = connection.channel()
10 
11 #建立交换机和类型
12 channel.exchange_declare(exchange='topic_logs1',
13                          type='topic')
14 #route key'abc.1'
15 severity = 'abc.1'
16 message = 'exchange:type topic route_key=abc.#'
17 #发送消息
18 channel.basic_publish(exchange='topic_logs1',
19                       routing_key=severity,
20                       body=message)
21 print(" [x] Sent %r:%r" % (severity, message))
22 connection.close()
View Code

结果:

[*] Waiting for logs. To exit press CTRL+C
[x] 'abc.1':'exchange:type topic route_key=abc.1'

原文地址:https://www.cnblogs.com/menkeyi/p/6971581.html