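RabbitMQ_direct: direct-exchange example with a producer script (myproductor_director.py) followed by a consumer script (mycustomer_director.py)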

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@version: 
@author: morgana
@license: Apache Licence 
@contact: vipmorgana@gmail.com
@site: 
@software: PyCharm
@file: myproductor_director.py
@time: 2018/4/15 上午12:24
"""
import pika
import sys

# Publish a single log message to the 'direct_log' direct exchange.
# Usage: myproductor_director.py [log_level] [message words ...]
credentials = pika.PlainCredentials('morgana', '123456')

parameters = pika.ConnectionParameters(host='127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(parameters)

try:
    channel = connection.channel()  # channel used for all broker operations

    channel.exchange_declare(exchange='direct_log', exchange_type='direct')

    # The first command-line argument is the routing key; default to 'info'.
    log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'

    # Only the arguments AFTER the log level form the message body, so the
    # routing key is not duplicated inside the payload. The fallback text
    # carries no level prefix because the level travels as the routing key.
    message = ' '.join(sys.argv[2:]) or "Hello World!"

    channel.basic_publish(exchange='direct_log',
                          routing_key=log_level,
                          body=message)
    print(" [x] Sent %r" % message)
finally:
    # Close the connection even when declaring or publishing fails.
    connection.close()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@version: 
@author: morgana
@license: Apache Licence 
@contact: vipmorgana@gmail.com
@site: 
@software: PyCharm
@file: mycustomer_director.py
@time: 2018/4/15 上午12:25
"""
__author__ = 'Administrator'
import pika,sys
import pika
import sys

# Bind a temporary, exclusive queue to the requested log levels on the
# 'direct_log' exchange.
# Usage: mycustomer_director.py [info] [warning] [error]
log_levels = sys.argv[1:]  # e.g. info warning error

# Validate the arguments before connecting, so a usage error does not leave
# an open broker connection behind when the script exits.
if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

credentials = pika.PlainCredentials('morgana', '123456')

parameters = pika.ConnectionParameters(host='127.0.0.1', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # channel used for all broker operations

# Declare the exchange here as well (same arguments as the producer), so the
# consumer can be started first without queue_bind failing on a missing
# exchange.
channel.exchange_declare(exchange='direct_log', exchange_type='direct')

# No queue name is given, so RabbitMQ assigns a random one; exclusive=True
# makes the broker delete the queue once the consumer using it disconnects.
# queue='' is passed explicitly because newer pika releases require the
# argument.
queue_obj = channel.queue_declare(queue='', exclusive=True)
queue_name = queue_obj.method.queue
print('queue name', queue_name, queue_obj)

for level in log_levels:
    # Bind the queue to the exchange once per requested routing key.
    channel.queue_bind(exchange='direct_log',
                       queue=queue_name,
                       routing_key=level)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """Print the received message body; the other arguments are unused."""
    line = " [x] %r" % body
    print(line)

# Register `callback` as the consumer for the temporary queue, with no_ack
# enabled.
# NOTE(review): this is the pika 0.x call form; pika >= 1.0 expects
# basic_consume(queue, on_message_callback, auto_ack=...) - confirm the
# installed pika version before changing it.
channel.basic_consume(callback,queue=queue_name, no_ack=True)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to quit: stop consuming and close the
    # connection instead of ending with a traceback.
    channel.stop_consuming()
    connection.close()

  

  

原文地址:https://www.cnblogs.com/morgana/p/8852032.html