RabbitMQie消息列队整理

使用方法过程,这儿只做了windows平台教程  

先安装Erlang 编程软件,然后设置环境变量,在安装RabbimMQ  ,这儿我下载了一个版本不行,后来换了最新版就好了,以后在使用过程 中如果有问题 ,可以换版本试一下,这是个坑。。然后 pip install pipk   

在编程器中粘上下代码测试  先是服务端:

import pika

#连接队列服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

#创建队列。有就不管,没有就自动创建
channel.queue_declare(queue='hello')

#使用默认的交换机发送消息。exchange为空就使用默认的
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello 201!')
print(" [x] Sent 'Hello World!'")
connection.close()
View Code

在另一个文件粘上客服端:

import pika
import time

# 连接服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# rabbitmq消费端仍然使用此方法创建队列。这样做的意思是:若是没有就创建。和发送端道理道理。目的是为了保证队列一定会有
channel.queue_declare(queue='hello')


# 收到消息后的回调
def callback(ch, method, properties, body):
    print("我收到了,正在等。。")
    time.sleep(10)
    print(" [x] Received %r" % body)


channel.basic_consume(callback,
                      queue='hello',
                      #no_ack=True
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code
------------------------------------------------------------------------------
以下是注释:
channel.basic_consume(callback,
queue='hello',
#no_ack=True 如果取消注释就会不等反应,如果客
#服端开闭,没有外理的消息会丢失
)
在安装rabbitmq文件夹下有个叫rebbitmqctl的命令可以管里查看消息队列

消息持久化,持久化客户端,就是客户端关机数据没有处理,就会丢失,默认有机制解决这个问题,客户端关机,函数没有处理完会
发给另一个客户端,不想发给另一台,数据不重要直接写上:
channel.basic_consume(callback,
queue='hello',
no_ack=True,
#默认上面红色这句
)

如果是防止服务端死机:
需要在二个地方修改,

channel.queue_declare(queue='hello'#durable = True,客户端和服务端在申明队列时都要写上,表示这个列队持 久
。。但是要消失都持久,还要在服各务端上:
hannel.basic_publish(exchange='',
routing_key='hello',
body="wwwwwwwwwww",
properties=pika.BasicProperties(
delivery_mode= 2
)
)

)
如果客户端处理不过来,,就先不要发消息过的解决方法,只需要在客户端的发送消息前加入:
channel.basic_qos(prefetch_count=1)
代码片断如下:
 收到消息后的回调
def callback(ch, method, properties, body):
print("我收到了,正在等。。")
time.sleep(10)
print(" [x] Received %r" % body)
#ch.basic_ack(delivery_tag=method.delivery_tag)#这句在我的版本内没有实际用


channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='hello',
no_ack=True,
)
广播效果要用到到exchange,可以设规测,转发器:
  有三个参数:
fanout 所有
direct 指定
topic规则
发送方,
channel.exchange_declare(exchange='direct_logs',
type='direct') #py3中type是关健字,需要加成, exchange_type=
  然后发送那儿:
channel.basic_publish(exchange='direct_logs',
routing_key="" #为空
body=message)
服务端完成代码:
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
View Code


消费方要改的多一点:第一个地方
channel.exchange_declare(exchange='direct_logs',
type='direct') #py3中type是关健字,需要加成, exchange_type=
第二个地方
在下面加上:
result = channel.queue_declare(exclusive = True)
queur_name = result.method.queue
channel.queue_bind(exchange = 'loags'
queue = queue_name

)
完成代码:
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]
" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

首先我写博客其实就是学习笔记,写到这儿赖了,,多对多,双向通信有空在补上。。。。。。待续

















原文地址:https://www.cnblogs.com/fgxwan/p/9661371.html