[rabbitmq] python版本(二) 工作队列

贴一个写的比较好的rabbitmq小结,部分函数有疑惑可以查这个博客里面:https://www.jianshu.com/p/18ffa93fe5d2

  • p:生产者
  • 中间task queue:消息队列
  • c1,c2:多个消费者

worker.py

#!/usr/bin/env python
import pika
import time

#连接操作
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

#声明task_queue队列
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

'''
回调函数:
关于str前缀u--unicode b--byte r--raw等可以参考这篇文章:https://blog.csdn.net/anlian523/article/details/80504699?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1
根据body.count()数出来的'.'来决定time.sleep()睡几秒--模拟1秒钟的操作
为了防止一个worker挂掉后丢失消息,消费者通过该一个ack,告诉rabbitmq已经收到并处理信息,之后rabbitmq释放删除此消息
'''
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    #显示发送确认信息,告诉服务器消息被处理了
    ch.basic_ack(delivery_tag=method.delivery_tag)

#每次只为消费者分配一条信息,处理完之后才分配第二条信息
channel.basic_qos(prefetch_count=1)
#channel.basic_consume(callback, queue='hello', no_ack=True) 使用no_ack=True可以关闭消息响应
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

new_task.py

#!/usr/bin/env python
import pika
import sys


#建立连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

#声明一个叫task_queue的队列
channel.queue_declare(queue='task_queue', durable=True)

'''
第一句是说可以输入命令行参数作为发送的随意的信息
默认交换机,选定task_queue队列
'''
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

运行结果:
最右边是作为生产者,左边2个(其实是三个,有一个没有拆分到这个界面),可以看到worker采用的是轮询(round-robin)的方式,每个worker依次“执行一次操作”

日积月累,水滴石穿
原文地址:https://www.cnblogs.com/lonelyisland/p/12747137.html