rabbitmq 之基础概念

Helloworld

首先先上一段 Helloworld 代码

分为两个角色, 1. producer(生产者)  2.consumer(消费者)   

同样我们可以理解为 发送者  和 接收者

上图 P是 producer     红色的是队列    C是consumer

 send.py (生产者)

import pika

connection =
pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))      #建立socket通信
channel = connection.channel()                                    #声明管道
     
channel.queue_declare(queue='hello')                                     #声明一个队列

#向队列发送消息 channel.basic_publish(exchange
='', #exchange后面会重点说,这里不需要知道,设置为空就行 routing_key='hello', #routing_key 是 向哪个队列发送消息 body='Hello World!') #消息内容 print(" [x] Sent 'Hello World!'") connection.close()

receive.py (消费者)

import pika

connection =
pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')          

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,          #收到消息后的回调函数
                      queue='hello',     #监听着 hello 队列
                      no_ack=True)       #no_ack (no_acknowledge) 不返回消息

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

至此  通过rabbitmq 实现了简单的 生产者 消费者 的通信

名词的介绍

1.循环调度

按照上面的helloword程序,  我们开启多个consumer(消费者), 他们会同时监听着 hello这个队列。 当producer(生产者) 向队列中发送了很多条消息时,

这些消费者会依次排序从该队列中取消息。 平均每个消费者都会收到同等数量的消息。  这种发送消息的方式叫做-----round-robin(轮询)

2.消息确认

场景: 当consumer(消费者) 从队列中拿到了一个耗时的任务,但是在他执行这个任务的时候 突然挂掉了。此时,这个正在处理的

任务就会丢失,以及发送到这个消费者的其他任务也都会丢失。   这是一个非常严重的问题。

为了解决这个问题: 在rabbitmq中有解决步骤。

  1.  消息响应    auto_ack(现在的版本) / no_ack(旧版的)        默认是开启的。当auto_ack= True 标识把他关闭。      
  2.  回调语句    在callback回调函数中   加上一句  
    ch.basic_ack(delivery_tag = method.delivery_tag)

3.消息持久化

上面的案例是consumer(消费者)突然挂掉了。  现在如果producer(生产者)突然崩溃了,那么所有的队列以及队列中的消息也全部会丢失。

因此我们必须把 队列  和  消息 设为持久化

队列持久化(durable) 

在声明队列的时候,加上参数 durable=True

channel.queue_declare(queue='hello', durable=True)

仅仅是队列持久了没有用,队列中的消息不会被持久,同样也会存在很大问题。下面就是消息持久化的方法

消息持久化

在发送消息时,加上参数  properties=pika.BasicProperties(delivery_mode = 2, # make message persistent)

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

注意:  将消息持久化并不能完全保证不会丢失。 上面的代码只是告诉rabbitmq要把消息存到硬盘中去。但从rabbitmq收到消息到保存之间

还是会有一个很小的间隔时间。 (在这很短的时间producer挂掉了 同样消息会丢失)  因此并不能保证真正的持久化。

如果一定要保证持久化,需要改写代码来支持transaction(事务)

4.公平调度

在不做任何处理的情况下, producer是按照顺序依次向consumer 分发任务,如果有的producer的任务执行时间非常长,那么他会囤积很多

任务。 这样会存在消费者不公平的情况。

我们可以使用 basic.qos方法,并设置prefetch_count=1。 这样是告诉rabbitmq,在同一时刻,不要发送超过1条

消息给一个消费者,直到他已经处理了上一条消息并且做出了相应。这样 rabbitmq 就会把消息分发给下一个空闲的消费者

channel.basic_qos(prefetch_count=1)

注意: 如果所有的消费者都处于繁忙状态,那么队列很有可能会被填满。这时候需要 增加消费者,或者其他方法来处理。

原文地址:https://www.cnblogs.com/s686zhou/p/12895594.html