RabbitMQ

简单模式 

The Python code based on pika==1.0.0 version

producer:
  channel.basic_publish(
    exchange = ``,
    routing_key = 'hello',
    body = “世界你好!”
  )
	- 这种交换是特殊的‒它使我们可以准确地指定消息应进入的队列。队列名称需要在routing_key参数中指定.

consumer:
  channel.basic_consume(
    queue='hello', 
    on_message_callback=callback,
    auto_ack=True
  )

  

Putting it all together

send.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')  # 队列的声明是幂等的

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive.py

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

  

 消息确认机制

 铺垫:同生产者消费者模型,消费者在处理消息后要向队列发送消息回执,表明该消息已被消息掉。

 官网解释:bool auto_ack: if set to True, automatic acknowledgement mode will be used

 该参数默认为False,意味着RabbitMQ会消耗越来越多的内存,因为它无法释放那些未被确认的消息。

 当设置为True时: 如果一个消费者在处理消息过程中挂掉了,那么这个消息就丢失了,并且还丢失了发送给此消费者所有尚未处理的消息。

 这种情况肯定不是我们想要的,当一个消费者挂掉,如果能重新把消息发给其他消费者,这样,我们的消息就不会丢失任何消息了。要如何做呢? (消息持久化+工作队列+任务派遣)

消息持久化

当RabbitMQ服务挂掉时,队列和消息都会丢失,要想不丢失,需要做持久化。

# 队列持久化
channel.queue_declare(queue='hello', durable=True)  

# 消息持久化
发送端:
  channel.basic_publish(
	  exchange='',
	  routing_key='task_queue',
	  body=message,
	  properties=pika.BasicProperties(
	  delivery_mode=2, # make message persistent
	  ))

消费端:
def callback(ch, method, properties, body):
	print(" [x] Received %r" % body.decode())
	ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

工作队列

工作队列又叫任务队列

使用场景:当一个消费者处理一条消息很耗时,我又不想等待它完成,想把消息分发给其他消费者时

准备:一个生产者、多个消费者

MQ将按顺序地将每个消息发送给下一个消费者,平均而言,每个消费者都会收到相同数量的消息,这种分发方式称为循环。

以循环方式分发消息仍然不能解决上述场景的问题,理想情况是公平的分发消息(在消费者忙碌时就把消息发给不忙的消费者)

公平派遣/公平分发

为了克服这个问题,我们可以将 Channel#basic_qos通道方法与 prefetch_count = 1设置一起使用。

使用basic.qos协议方法来告诉RabbitMQ一次不向消费者发送多条消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给消费者。而是将其分派给不忙的下一个消费者。

channel.basic_qos(prefetch_count = 1)

关于队列大小的注意事项

如果所有工作人员都忙,您的队列就满了。您将需要注意这一点,并可能增加更多的工作人员,或使用消息TTL

Putting it all together

 send.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

receive.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

 

  

原文地址:https://www.cnblogs.com/liuwei0824/p/14685017.html