消息队列rabbitMQ

》》》》》基础rabbitMQ数据交互

 1 import pika
 2 connection = pika.BlockingConnection(
 3     pika.ConnectionParameters('localhost')
 4     )
 5 channel = connection.channel()#声明一个管道
 6 
 7 channel.queue_declare(queue='hello')
 8 
 9 channel.basic_publish(exchange='',
10                       routing_key='hello',
11                       body='hellow world')
12 
13 print("[x] sendee 'hello world'")
14 connection.close()
服务器端
 1 import pika
 2 connection = pika.BlockingConnection(
 3     pika.ConnectionParameters('localhost')
 4     )
 5 channel = connection.channel()#声明一个管道
 6 
 7 channel.queue_declare(queue='hello')
 8 def callback(ch, method, properties,body):
 9     print(">>>",ch, method, properties)
10     print("[x] recved %s"%body)
11 
12 channel.basic_consume(callback,
13                       queue='hello',
14                       no_ack=True)
15 print("waitting for message")
16 channel.start_consuming()
客户机

 》》》》》》消息持久化

 1 import pika
 2 connection = pika.BlockingConnection(
 3     pika.ConnectionParameters('localhost')
 4     )
 5 channel = connection.channel()#声明一个管道
 6 
 7 channel.queue_declare(queue='hello2',durable=True)#durable 只是把队列持久化
 8 
 9 channel.basic_publish(exchange='',
10                       routing_key='hello2',
11                       body='hellow world23',
12                       properties = pika.BasicProperties(delivery_mode=2))#消息持久化
13 
14 print("[x] sendee 'hello world'")
15 connection.close()
消息持久化(服务器端)
 1 import pika
 2 connection = pika.BlockingConnection(
 3     pika.ConnectionParameters('localhost')
 4     )
 5 channel = connection.channel()#声明一个管道
 6 
 7 channel.queue_declare(queue='hello2',durable=True)
 8 
 9 def callback(ch, method, properties,body):#回调函数
10     print(">>>",ch, method, properties)
11     print("[x] recved %s"%body)
12     ch.basic_ack(delivery_tag= method.delivery_tag)
13 
14 channel.basic_consume(callback,
15                       queue='hello2')
16                       #no_ack=True)#确认服务器端收到消息,true表示不确认
17 
18 print("waitting for message")
19 
20 channel.start_consuming()
消息持久化(客户机端)

》》》》》能者多劳

 1 import pika
 2 connection = pika.BlockingConnection(
 3     pika.ConnectionParameters('localhost')
 4     )
 5 channel = connection.channel()#声明一个管道
 6 
 7 channel.queue_declare(queue='hello3',durable=True)#durable 只是把队列持久化
 8 
 9 channel.basic_publish(exchange='',
10                       routing_key='hello3',
11                       body='hellow world25',
12                       properties = pika.BasicProperties(delivery_mode=2))#消息持久化
13 
14 print("[x] sendee 'hello world'")
15 connection.close()
客户机(能者多劳)
1 channel.basic_qos(prefetch_count=1)

 》》》》》广播发送/订阅模式direct

 1 import pika,sys
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 
 5 channel.exchange_declare(exchange='logs1',exchange_type='direct')
 6 result = channel.queue_declare(exclusive=True)
 7 queue_name = result.method.queue
 8 
 9 severities = sys.argv[1:]#获取所有执行脚本的参数,获取列表
10 if not  severities:#循环
11     sys.stderr.write("usage %s [info] [wearing] [error]
"%sys.argv[0])
12     sys.exit(1)
13 
14 for severity in severities:#循环列表绑定
15     channel.queue_bind(exchange='logs1',queue=queue_name,routing_key=severity)
16 
17 print('waitting for logs')
18 
19 def callback(ch, method, properties,body):#回调函数
20     print("[x] recved %s"%body)#打印收到的消息的主体
21 
22 channel.basic_consume(callback,
23                       queue=queue_name,
24                       no_ack=True)#确认服务器端收到消息,true表示不确认
25 
26 channel.start_consuming()
订阅模式(客户机端)
 1 import pika,sys
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host= 'localhost'))
 3 channel = connection.channel()
 4 
 5 channel.exchange_declare(exchange='logs1',exchange_type='direct')
 6 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'#级别
 7 message = ''.join(sys.argv[2:]) or 'hellow world'#消息
 8 
 9 channel.basic_publish(exchange='logs1',routing_key=severity,body=message)
10 print("send %s>>> %s"%(severity,message))
订阅模式(服务器端)

》》》》》fanout

 1 import pika
 2 connection = pika.BlockingConnection(
 3     pika.ConnectionParameters('localhost')
 4     )#生成一个连接实例
 5 channel = connection.channel()#声明一个管道
 6 #channel.exchange_declare(exchange='logs',type ='fanout')#转发器类型
 7 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 8 result = channel.queue_declare(exclusive= True)#声明管道的唯一性
 9 queue_name = result.method.queue#返回随机生成的queue名字
10 print("queue name>>>>",queue_name)#打印临时生成的队列名字
11 channel.queue_bind(exchange='logs',queue = queue_name)#转发器和管道绑定
12 # channel.queue_declare(queue='hello3',durable=True)
13 print("waitting for message")
14 def callback(ch, method, properties,body):#回调函数
15     print("[x] recved %s"%body)#打印收到的消息的主体
16     #ch.basic_ack(delivery_tag= method.delivery_tag)#手动确认客户机收到了服务器发送的消息
17 
18 #channel.basic_qos(prefetch_count=1)#检验当前程序等待执行的命令的个数,在callback之前进行判断
19 channel.basic_consume(callback,
20                       queue=queue_name,
21                       no_ack=True)#确认服务器端收到消息,true表示不确认
22 
23 channel.start_consuming()
fanout(客户机)
 1 import pika
 2 connection = pika.BlockingConnection(
 3     pika.ConnectionParameters('localhost')
 4     )
 5 channel = connection.channel()#声明一个管道
 6 channel.exchange_declare(exchange='logs',exchange_type = 'fanout')#声明一个转发器,并声明转发类型为fanout
 7 message = "info : hellow world4"
 8 #channel.queue_declare(queue='hello3',durable=True)#durable 只是把队列持久化
 9 channel.basic_publish(exchange='logs',
10                       routing_key='',
11                       body=message)
12                       #properties = pika.BasicProperties(delivery_mode=2))#消息持久化
13 print("[x] send %s"%message)
14 connection.close()
fanout(服务器)

》》》》》topic

 1 import pika,sys
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host= 'localhost'))
 3 channel = connection.channel()
 4 
 5 channel.exchange_declare(exchange='logs2',exchange_type='topic')
 6 #severity = sys.argv[1] if len(sys.argv) > 1 else 'info'#级别
 7 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
 8 message = ''.join(sys.argv[2:]) or 'hellow world'#消息
 9 
10 channel.basic_publish(exchange='logs2',routing_key=routing_key,body=message)
11 print("send %s>>> %s"%(routing_key,message))
服务器(topic)
 1 import pika,sys
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 
 5 channel.exchange_declare(exchange='logs2',exchange_type='topic')
 6 result = channel.queue_declare(exclusive=1)
 7 queue_name = result.method.queue
 8 
 9 binding_keys = sys.argv[1:]#获取所有执行脚本的参数,获取列表
10 if not  binding_keys:#循环
11     sys.stderr.write("usage %s [binding_keys]
"%sys.argv[0])
12     sys.exit(1)
13 
14 for binding_key in binding_keys:#循环列表绑定
15     channel.queue_bind(exchange='logs2',queue=queue_name,routing_key=binding_key)
16 
17 print('waitting for logs')
18 
19 def callback(ch, method, properties,body):#回调函数
20     print("[x] %s recved %s"%(method.routing_key,body))
21 
22 channel.basic_consume(callback,
23                       queue=queue_name,
24                       no_ack=True)#确认服务器端收到消息,true表示不确认
25 
26 channel.start_consuming()
客户机(topic)
原文地址:https://www.cnblogs.com/cerofang/p/8035012.html