RabbitMQ

RabbitMQ

好处: 解耦,异步,流量削峰

阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

简单命令

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl list_queues

默认端口号 15672

模式

简单模式(最广泛)

参数

交换机模式

-- 发布订阅

-- 关键字模式

-- 模糊匹配模式

简单模式

  • 连接rabbitmq
  • 创建队列
  • 向指定的队列插入数据

**生产者 **basic_publish

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='',  # 简单模式 交换机为空
                      routing_key='hello',  # 指定队列
                      body='Hello World!')  # 向指定队列插入内容
 
print(" [x] Sent 'Hello World!'")

**消费者 **basic_consume

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
 
 
channel.basic_consume(queue='hello',
                      auto_ack=True , #自动应答
                      on_message_callback=callback)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
 

参数使用

应答参数

模拟消费者出问题

生产者生产数据给消费者,消费者取到数据则队列无数据,若消费者出bug ,再次启动,则取不到数据了

auto_ack=False  #改默认应答为手动应答
ch.basic_ack(delivery_tag=method.delivery_tag)   # 给信号

原理

生产者生产数据放入队列,消费者改手动应答,消费者取数据。队列还会保留一份数据,当消费者发出信号后ch.basic_ack(delivery_tag=method.delivery_tag) ,队列则删除数据

手动应答牺牲效率

注重效率则默认应答

持久化参数

模拟队列(rabbitmq)出问题

#声明queue
channel.queue_declare(queue='hello2', durable=True)  # durable=True 声明可持久化的队列
                                                    # 但是放数据的时候还得指定是否持久化
                                                    properties=pika.BasicProperties(
                                                              delivery_mode=2,  #持久化参数2
                                                      )
 
channel.basic_publish(exchange='',
                      routing_key='hSSello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                          )
                      )                              # 但是放数据的时候还得指定是否持久化
                                                    #properties=pika.BasicProperties(
                                                      #        delivery_mode=2,  # make                                                         #    message persistent
                                                        #      )

分发参数

改为手动应应答并且加上 channel.basic_qos(prefetch_count=1) # 消费者 加上这一句话

默认轮询分发,一人一个

轮询分发(round-robin)不管谁忙,都不会多给消息,总是你一个我一个。想要做到公平分发(fair dispatch),必须关闭自动应答ack,改成手动应答。使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。

 channel.basic_qos(prefetch_count=1)  # 消费者 加上这一句话
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    import time
    time.sleep(4)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)  ########
channel.basic_consume(queue='hello',
                      auto_ack=False,
                      on_message_callback=callback)


print(' [*] Waiting for messages. To exit press CTRL+C')
while True:
    channel.start_consuming()

交换机模式(routingkey和exchange同时满足)

基于交换机通信(容器) 由生产者创建,向交换机插入数据

消费者创建队列 队列绑定交换机

发布订阅

# 生产者
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',          ## 交换机模式 名字任意
                         exchange_type='fanout') ## fanout 发布订阅模式
 
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
 
#####################################################################
# 消费者
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',       ## 交换机模式 名字任意
                         exchange_type='fanout') ## fanout 发布订阅模式
 
# 消费者创建队列
result = channel.queue_declare("",exclusive=True) # 随机名字  "" , exclusive=True
queue_name = result.method.queue  # 拿到随机名字
 
# 绑定到交换机上
channel.queue_bind(exchange='logs',
                   queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
 
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)
 
channel.start_consuming()

关键字

生产者放一个关键字 消费者放一个关键字

匹配成功后 则给消费者

# 生产者
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',          ## 交换机模式 名字任意
                         exchange_type='direct') ## direct 关键字模式
 
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='xxx', # 绑定关键字
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
 
#####################################################################
# 消费者
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',       ## 交换机模式 名字任意
                         exchange_type='direct') ## direct 关键字模式
 
# 消费者创建队列
result = channel.queue_declare("",exclusive=True) # 随机名字  "" , exclusive=True
queue_name = result.method.queue  # 拿到随机名字
 
# 绑定到交换机上
channel.queue_bind(exchange='logs',
                   queue=queue_name,routing_key='xxx') # routing_key 绑定关键字
[ 可以绑定多个关键字                 
channel.queue_bind(exchange='logs',
                   queue=queue_name,routing_key='xxx') # channel.queue_bind(exchange='logs',
                   queue=queue_name,routing_key='xxx') #
 
                   ]
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
 
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)
 
channel.start_consuming()

通配符

同关键字相比,模糊匹配

#  一个多个单词
* 一个单词
 
uss.#
#.news
#.weather

 
# 生产者
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs3',
                         exchange_type='topic')
 
message = "info: Hello ERU!"
channel.basic_publish(exchange='logs3',
                      routing_key='europe.weather',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
 
# 消费者
 
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs3',
                         exchange_type='topic')
 
result = channel.queue_declare("",exclusive=True)
queue_name = result.method.queue
 
 
 
channel.queue_bind(exchange='logs3',
                   queue=queue_name,
                   routing_key="#.news")
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
 
channel.basic_consume(queue=queue_name,
                      auto_ack=True,
                      on_message_callback=callback)
 
channel.start_consuming()
原文地址:https://www.cnblogs.com/tangshuo/p/12744501.html