【RabbitMQ+Python入门经典】兔子和兔子窝 笔记

RabbitMQ工业级的消息队列服务器。

兔子和兔子窝

动机来源:从生产环境的电子邮件处理流程当中分支出一个特定的离线分析流程。

解决方案1:

开始使用MySQL处理,将要处理的东西放在表里面,另一个程序从中读取。

需要多个程序从一个队列中取数据来处理?硬编码程序的个数。

如果能够允许程序动态地增加和减少的时候动态进行压力分配?

解决方案2:

消息队列

现有的Queue

1、Apache ActiveMQ 可能会造成消息丢失,不可接受

2、ZeroMQ和RabbitMQ都支持一个开源的消息协议,成为AMQP.

AMQP的一个优点是它是一个灵活和开放的协议,以便和另外两个商业化的MessageQueue竞争。

3、ZeroMQ不支持消息持久化和崩溃恢复,不太好,但是如果不在意消息持久化和崩溃恢复,可以试试,延迟很低,而且支持灵活的拓扑

4、RabbitMQ可以支持消息持久化以及崩溃恢复

RabbitMQ

RabbitMQ是基于Erlang写的。在Erlang中,充斥着大量轻量进程,它们之间使用消息传递通信。和消息队列的思路一致。

RabbitMQ支持持久化。如果RabbitMQ死掉,消息并不会丢失,当队列重启,一切都会回来。

可以和Python无缝结合,提供对应的库http://barryp.org/software/py-amqplib/

AMQP

AMQP当中有四个概念非常重要:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。

一个虚拟主机持有一组交换机、队列和绑定。

为什么需要多个虚拟主机?RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机"/"。

交换机、队列以及绑定

队列

队列是消息的重点,可以理解伪装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。

队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行攻击的。

如果一个消费者试图创建一个已经存在的队列,RabbitMQ会忽略这个请求。因此可以将消息队列的配置写在应用程序的代码里面。

当已经创建并且连接到队列,消费者程序等待消息到来。当然需要把一个消息放进队列中,这里需要一个交换机(Exchange)

交换机

交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes)。例如指明具有路由键的X消息要到名为X_Queue队列中。

消费者程序要负责创建交换机们(Exchanges)?为什么是可以有多个交换机?

原因:每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便到达更高的效率。

在RabbitMQ的集群当中,可以使用类似的思路来扩展交换机以便获取更高的吞吐量。

已经有了消息,不知道往哪个队列中发送,需要指定路由规则,即绑定。

绑定(binding)

一个绑定就是一个类似这样的规则:将交换机desert当中具有路由键“阿里巴巴”的消息送到队列hideout里面去。

一个绑定就是一个基于路由键routing_key将交换机和队列连接起来的路由规则。

如果需要将同一个消息发送到两个不同的队列中,就需要创建两个绑定,每个都连接一个交换机和一个队列。

在这种情况下,交换机会复制一份消息并且把它们分别发送到两个队列当中。交换机就是一个由绑定构成的路由表。

交换机有多种类型,都是做路由的,不过接受不同类型的绑定。

为什么不创建一种交换机来处理所有类型的路由规则呢?因为每种规则用来做匹配分子的CPU开销是不同的。

Fanout Exchange:不处理路由键,只需要简单地将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。Fanout交换转发消息是最快的,因为不需要任何中间处理。

Direct Exchange:处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。

Topic Exchange:将路由键和某种模式进行匹配。此时队列需要绑定在一个模式上。

符号"#"匹配一个或多个词,符号"*"匹配不多不少一个词。例如“audit.#”能够匹配"audit.irs.corporate",但是"audit.*"只会匹配到"audit.irs"

 Rabbitmq Exchange 类型 - zhanghua.1199 - 郁,加没

其他类型的图,参照http://www.cnblogs.com/gsblog/p/3823426.html

持久化

如果使用默认参数构造的话,在RabbitMQ服务器程序挂了后,队列、交换机和绑定都没有了。

RabbitMQ重启后会清空一切,如何避免这一切。

队列和交换机有一个创建时候指定的标志durable。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。

那么如何才能做到不只是队列和交换机,还有消息都是持久化的呢?

首先一个问题是,真的需要消息是持久化的吗?对于一个需要在重启之后回复的消息来说,需要被写入到磁盘上,而即使最简单的磁盘操作也是要消耗时间的。如果和消息的内容相比,更看重的是消息处理速度,那么不要使用持久化的消息。对于其他服务来说,持久化可能很重要。

当将消息发布到交换机的时候,可以指定一个标志"delivery mode"(投递模式)。各个AMQP库不同,指定这个标志的方法可能不一样。Python中是将deliver mode设置为2,表示持久化。一般的AMQP默认是将deliver mode设置成1,就是非持久化的。

持久化消息的步骤如下:

1、将交换机设置成durable

2、将队列设置称durable

3、将消息的delivery mode设置成2

绑定怎么办?

无法在创建绑定的时候设置durable。没有问题,如果绑定一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似,如果删除某个队列或交换机(无论是否durable),依赖它的绑定都会自动删除。

注意两点:

1、RabbitMQ不允许绑定一个non-durable的交换机和一个durable的队列。反之亦然,要想成功必须队列和交换机都是durable的。

2、一旦创建队列和交换机,就不能修改其标志的。例如创建一个nondurable的队列,然后想把它修改称durable的,唯一的办法就是删除这个队列然后重现创建。

例子

 需要一个python的AMQP库

1、py-amqlib——通用的AMQP

2、txAMQP——使用Twisted框架的AMQP,因此允许异步IO

3、pika

参照之前的前几篇博客

参考链接

1、http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/

2、http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/

3、http://www.ituring.com.cn/article/4669

原文地址:https://www.cnblogs.com/gsblog/p/3826503.html