RabbitMQ 笔记

1、简介

  RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成的面向消息中间件。

  Ubuntu安装RabbitMQ可参照RabbitMQ服务安装

各种MQ消息中间件对比:

2、重要概念

Broker:经纪人,简单来说就是消息队列服务器实体。它提供一种传输服务,维护一条从生产者到消费者的传输路线,保证消息数据能够按照指定的方式传输。
Exchange:消息中间件,指定消息按照什么规则路由到哪个队列。
Queue:消息队列,它作为消息的载体,每条消息都会被投递到一个或多个队列中。
Binding:绑定,用于将Exchange和Queue按照路由规则绑定起来。
Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个Broker可以开设多个vhost,用于不同用户间的权限分离。
producer:消息生产者,就是投递消息的程序,主要将消息传递到Exchange。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可以建立多个channel,每个channel代表一个会话任务。

3、Exchange类型

exchange有四种类型,分别是Direct,Fanout,Topic和Header,其中header在实际使用中用的很少,就不加说明了。

Direct:任何发送到Exchange的消息都会被转发到Routing Key指定的Queue中。该模式不需要将Exchange进行任何绑定操作,如果vhost中不存在Routing Key指定的队列名,消息将会被丢弃。

Fanout:任何发送到Exchange的消息都会被转发到与其绑定的Queue中。该模式需要将Exchange和Queue提前进行绑定,不需要Routing Key,一个Exchange可以绑定多个Queue,一个Queue也可以绑定到多个Exchange上,对没有和任何Queue绑定的Exchange,消息将会被丢弃。

Topic:任何发送到Exchange的消息都会被转发与Routing Key匹配的的所有Queue中。该模式需要提前将Exchange和Queue进行绑定,并且需要设置好Routing Key,字符"#"表示0个货多个关键字匹配,字符"*"表示一个关键字匹配,如果Exchange发现没有与Routing Key匹配的Queue,消息将会被丢弃。

整体性能上是Direct > Fanout > Topic

4、消费者订阅消息方式

  两种方式获取队列中的消息:

  1. 一种是通过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息之后,接收下一条消息。同一个channel消息处理是串行的,除非关闭channel或者取消订阅,否则客户端将会一直接收队列的消息。

        2. 另一种方式是通过basic.get命令主动获取队列中的消息,一定不要通过循环调用basic.get来代替basic.consume,这是因为basic.get在实际执行的时候,是首先consume某一个队列,然后检索第一条消息,然后再取消订阅。对于高吞吐率的消费者,建议使用basic.consume。

      如果有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA回复服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,然后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。

  消费者在接到消息以后,都需要给服务器发送一条确认命令,这个即可以在handleDelivery里显示的调用basic.ack实现,也可以在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器可以安全的删除该消息,而不是通知生产者。与RPC不同,如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传该消息给下一个订阅者,如果没有订阅者就会存储该消息。

        RabbitMQ也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,可以设置一个requeue的属性,如果为true,则消息服务器会重传该消息给下一个订阅者;如果为false,则会直接删除该消息。当然可以通过设置ack,让消息服务器直接删除该消息并且不会重传。

5、持久化

        RabbitMQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,但是这还不能使得队列中的消息持久化,这需要生产者在发送消息的时候,将delivery mode设置为2,只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,否则在绑定时,RabbitMQ都会抛错的。

6. Confirm机制

  使用事务固然可以保证只有提交的事务,才会被服务器执行。RabbitMQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。

  Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息是否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。

7、RabbitMQ使用交互流程

  1. 客户端连接到消息队列服务器,打开一个channel。
  2. 客户端声明一个exchange,并设置相关属性。
  3. 客户端声明一个queue,并设置相关属性。
  4. 客户端使用routing key,在exchange和queue之间建立好Binding关系。
  5. 生产者客户端投递消息到exchange。
  6. exchange接收到消息后,就根据消息的RoutingKey和已经设置的binding,进行消息路由(投递),将消息投递到一个或多个队列里。
  7. 消费者客户端从对应的队列中获取并处理消息。
原文地址:https://www.cnblogs.com/kingsonfu/p/10582057.html