RabbitMQ and Oslo.messaging

The OpenStack documentation also describes how to use it: http://docs.openstack.org/developer/oslo.messaging/

I suggest reading through this tutorial first: https://www.rabbitmq.com/tutorials/tutorial-one-python.html

Roles
producer, exchange, queue, message, consumer

Producer
For RPC-style usage: in order to receive a response, the client needs to send a 'callback' queue address with the request, e.g. by specifying properties=pika.BasicProperties(reply_to=callback_queue_name) when publishing. Creating a callback queue for every RPC request is inefficient; instead, we can use a correlation_id to share a single callback queue per client.
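A condensed sketch of that pattern, assuming pika 1.x (the queue name and on_response handler are mine, not from the tutorial):

    import uuid
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # One exclusive, server-named callback queue per client, reused across requests.
    result = channel.queue_declare(queue='', exclusive=True)
    callback_queue = result.method.queue

    corr_id = str(uuid.uuid4())

    def on_response(ch, method, props, body):
        # Replies to other requests arrive on the same queue; match on correlation_id.
        if props.correlation_id == corr_id:
            print('got reply:', body)

    channel.basic_consume(queue=callback_queue,
                          on_message_callback=on_response,
                          auto_ack=True)

    channel.basic_publish(exchange='',
                          routing_key='rpc_queue',
                          properties=pika.BasicProperties(
                              reply_to=callback_queue,
                              correlation_id=corr_id),
                          body='request payload')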

If a producer needs to know whether a message reached at least one queue, it can set the mandatory flag on basic.publish, ensuring that a basic.return will be sent back to the client if no queues were appropriately bound.

Exchange
In RabbitMQ a message can never be sent directly to a queue. Instead, the producer can only send messages to an exchange. There are a few exchange types available: direct, topic, headers, and fanout. The routing key's value is ignored for fanout exchanges.

A topic exchange is powerful and can behave like the other exchanges: a queue bound with the '#' binding key receives all messages, like fanout, while a binding key without the '*' and '#' wildcards behaves just like direct.
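A quick sketch of both bindings, continuing the pika channel from the sketch above (exchange and queue names are illustrative):

    channel.exchange_declare(exchange='events', exchange_type='topic')
    for q in ('all_events', 'kern_errors'):
        channel.queue_declare(queue=q)

    # '#' matches any routing key: this queue sees everything, like fanout.
    channel.queue_bind(exchange='events', queue='all_events', routing_key='#')

    # A binding key with no wildcards matches exactly, like a direct exchange.
    channel.queue_bind(exchange='events', queue='kern_errors',
                       routing_key='kern.error')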

Queue
Queues can optionally be made mirrored across multiple nodes. Each mirrored queue consists of one master and one or more slaves, with the oldest slave being promoted to the new master if the old master disappears for any reason.

Exclusive queues will be deleted when the connection that declared them is closed.
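For example, continuing the pika sketch (passing an empty queue name asks the server to generate one):

    # exclusive=True ties the queue's lifetime to this connection.
    result = channel.queue_declare(queue='', exclusive=True)
    print('temporary queue:', result.method.queue)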

Consumer
By default, RabbitMQ will send each message to the next consumer listening to the same queue, in sequence. This way of distributing messages is called round-robin.
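Related to this dispatch behavior, the tutorial's "fair dispatch" tweak is worth knowing: a prefetch count stops RabbitMQ from blindly round-robining to an already busy consumer. A sketch, continuing the pika channel from above (on_message is a hypothetical callback):

    # Don't give a consumer a new message until it has acked the previous one.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='task_queue', on_message_callback=on_message)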

Consumer applications will need to perform deduplication or handle incoming messages in an idempotent manner.

If a message is delivered to a consumer and then requeued (because it was not acknowledged before the consumer connection dropped, for example) then RabbitMQ will set the redelivered flag on it when it is delivered again (whether to the same consumer or a different one).

If a consumer determines that it cannot handle a message then it can reject it using basic.reject (or basic.nack), either asking the server to requeue it, or not.
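In pika, inside a consumer callback, that is roughly:

    # Reject and ask the server to requeue for another consumer;
    # requeue=False would discard (or dead-letter) the message instead.
    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)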

Acknowledgment
Message acknowledgments are turned on by default. If a consumer dies without sending an ack, RabbitMQ will understand that the message wasn't fully processed and will redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
(kong) This requires the consumer code to send the ack explicitly, e.g. ch.basic_ack(delivery_tag=method.delivery_tag); the oslo.messaging code in OpenStack does the same.
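A minimal manually-acknowledging consumer, assuming pika 1.x, the channel from the earlier sketches, and a hypothetical handle_work function:

    def on_message(ch, method, properties, body):
        handle_work(body)
        # Ack only after the work is done; if we die before this line,
        # RabbitMQ will redeliver the message to another consumer.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue='task_queue', on_message_callback=on_message)
    channel.start_consuming()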

There aren’t any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It’s fine even if processing a message takes a very, very long time.

Please refer to http://stackoverflow.com/questions/7063224/how-can-i-recover-unacknowledged-amqp-messages-from-other-channels-than-my-conne if you’re interested in the ack mechanism.

Confirmation
Two things are required to make sure that messages aren't lost if the RabbitMQ server stops: we need to mark both the exchange/queue and the messages as durable. But there is still a short time window in which RabbitMQ has accepted a message but hasn't saved it yet. If you need a stronger guarantee, you can use publisher confirms.

For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.
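Continuing the pika sketch (1.x API), publisher confirms plus the mandatory flag look roughly like this; the queue name is illustrative:

    channel.confirm_delivery()   # put the channel into confirm mode
    try:
        channel.basic_publish(exchange='',
                              routing_key='task_queue',
                              body='payload',
                              properties=pika.BasicProperties(delivery_mode=2),
                              mandatory=True)
    except pika.exceptions.UnroutableError:
        # mandatory=True: the message came back because no queue was bound.
        print('message could not be routed')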

Distributed Broker
Sometimes, however, it is necessary or desirable to make the RabbitMQ broker itself distributed. There are three ways to accomplish that: clustering, federation, and the shovel.

Heartbeat
Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered unreachable. Different clients manifest this differently, but the TCP connection will be closed. When a client detects that the RabbitMQ node is unreachable because of missed heartbeats, it needs to reconnect.
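In pika the heartbeat timeout is requested at connection time (pika 1.x keyword; the final value is negotiated with the server):

    params = pika.ConnectionParameters('localhost', heartbeat=60)
    connection = pika.BlockingConnection(params)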

Some useful rabbitmqctl commands for inspecting broker state:

    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    rabbitmqctl list_exchanges
    rabbitmqctl list_bindings

Now, on to oslo.messaging.

I won't dwell on why oslo.messaging came about. RPC calls exist in every OpenStack project, and each project used to maintain its own pile of similar code. To simplify the work and ease packaging, the community extracted the RPC functionality into a shared OpenStack library. This also laid the groundwork for later supporting non-AMQP messaging middleware such as ZeroMQ.

Essentially, the oslo.messaging library wraps the RabbitMQ Python library, taking into account programming ergonomics, performance, reliability, exception handling, and more. It lets developers of each project focus on business code without thinking about how messages are sent and received. That is great for project developers, but for the operators of an OpenStack deployment, wrapping means many details are hidden: to troubleshoot problems in message delivery, they must spend extra time and effort understanding oslo.messaging's internals, which, on top of the already intricate OpenStack core, only makes things harder.

There are plenty of articles online covering oslo.messaging's code, and I don't want to spend time summarizing it again (the code itself is documentation; just read it carefully). But code by itself is dry, so here I'll walk through oslo.messaging's flow with an example.

When nova-compute restarts, during service initialization, if it finds that instances on the host need to be rebooted, it makes the following call (the RPC invocation style in this example was actually a bug, since fixed by a colleague of mine: https://review.openstack.org/#/c/170110/):

    self.compute_rpcapi.reboot_instance(context, instance, block_device_info=None, reboot_type=reboot_type)

Here compute_rpcapi is a client built on the oslo.messaging library:

    VERSION_ALIASES = {
        'icehouse': '3.23',
        'juno': '3.35',
    }

    def __init__(self):
        super(ComputeAPI, self).__init__()
        target = messaging.Target(topic=CONF.compute_topic, version='3.0')
        version_cap = self.VERSION_ALIASES.get(CONF.upgrade_levels.compute,
                                               CONF.upgrade_levels.compute)
        serializer = objects_base.NovaObjectSerializer()
        self.client = self.get_client(target, version_cap, serializer)

    def get_client(self, target, version_cap, serializer):
        return rpc.get_client(target,
                              version_cap=version_cap,
                              serializer=serializer)

    # ...

    def get_client(target, version_cap=None, serializer=None):
        assert TRANSPORT is not None
        serializer = RequestContextSerializer(serializer)
        return messaging.RPCClient(TRANSPORT,
                                   target,
                                   version_cap=version_cap,
                                   serializer=serializer)

A few concepts here:
target: as the message sender, the target specifies things like the exchange, binding key, and consumer the message should go to (these concepts don't necessarily map one-to-one onto target attributes).
serializer: handles message serialization, i.e. turning Nova objects into a format that can be sent over the network.
TRANSPORT: the abstraction layer that handles message sending. The rpc_backend config option decides which driver actually sends the messages; normally we use rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver. For RabbitDriver, the related config options live in /oslo_messaging/_drivers/impl_rabbit.py, and it maintains an internal connection pool that manages Connection objects.
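To make these pieces concrete, here is a minimal sketch of wiring up a client with the public oslo.messaging API of that era; the topic and version values are illustrative, not Nova's:

    import oslo_messaging as messaging
    from oslo_config import cfg

    # get_transport reads rpc_backend (e.g. 'rabbit') and the driver's
    # options from the configuration and builds the TRANSPORT object.
    transport = messaging.get_transport(cfg.CONF)
    target = messaging.Target(topic='my_topic', version='3.0')
    client = messaging.RPCClient(transport, target, version_cap='3.35')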

At this point we know that initializing a messaging client settles several questions: where does the message go? Who sends it? How is it serialized? But we are still missing the most important one: where is the message itself?

Be patient; this was only the initialization of an RPC client.

Now let's watch nova-compute reboot an instance over RPC:

    def reboot_instance(self, ctxt, instance, block_device_info,
                        reboot_type):
        version = '3.0'
        cctxt = self.client.prepare(server=_compute_host(None, instance),
                                    version=version)
        cctxt.cast(ctxt, 'reboot_instance',
                   instance=instance,
                   block_device_info=block_device_info,
                   reboot_type=reboot_type)

To explain:
1. RPCClient's prepare method re-configures the target for the needs of this particular call; here it updates the client's server and version attributes (server corresponds to a consumer in RabbitMQ, i.e. one compute node). That means the request is aimed at one specific consumer, rather than thrown into a queue for an arbitrary consumer to pick up. The method returns a wrapper object around RPCClient (_CallContext).
2. cast. Anyone familiar with the OpenStack message queue knows RPC messages are sent either synchronously or asynchronously; I won't repeat that background. A cast sends an asynchronous message.

    def cast(self, ctxt, method, **kwargs):
        """Invoke a method and return immediately. See RPCClient.cast()."""
        msg = self._make_message(ctxt, method, kwargs)
        ctxt = self.serializer.serialize_context(ctxt)
        if self.version_cap:
            self._check_version_cap(msg.get('version'))
        try:
            self.transport._send(self.target, ctxt, msg, retry=self.retry)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)

3. Message assembly. msg is the message body to be sent; what's inside?
method: the name of the function on the receiving side;
args: the arguments for that function, after serialization;
namespace: the namespace attribute of the target;
version: the version from the target. It must be compatible with the version_cap set earlier: the major version must match and the minor version must be less than or equal to version_cap's. In other words, the version of a function in rpcapi cannot exceed the version the client's version_cap specifies. This versioning largely exists for compatibility during upgrades; a teammate of mine covered it in a blog post on upgrades.

4. Message sending. This calls the _send method of the TRANSPORT described above:

    def _send(self, target, ctxt, message,
              wait_for_reply=None, timeout=None,
              envelope=True, notify=False, retry=None):

        class Context(object):
            def __init__(self, d):
                self.d = d

            def to_dict(self):
                return self.d

        context = Context(ctxt)
        msg = message

        if wait_for_reply:
            msg_id = uuid.uuid4().hex
            msg.update({'_msg_id': msg_id})
            LOG.debug('MSG_ID is %s', msg_id)
            msg.update({'_reply_q': self._get_reply_q()})

        rpc_amqp._add_unique_id(msg)
        rpc_amqp.pack_context(msg, context)

        if envelope:
            msg = rpc_common.serialize_msg(msg)

        if wait_for_reply:
            self._waiter.listen(msg_id)

        try:
            with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
                if notify:
                    conn.notify_send(self._get_exchange(target),
                                     target.topic, msg, retry=retry)
                elif target.fanout:
                    conn.fanout_send(target.topic, msg, retry=retry)
                else:
                    topic = target.topic
                    if target.server:
                        topic = '%s.%s' % (target.topic, target.server)
                    conn.topic_send(exchange_name=self._get_exchange(target),
                                    topic=topic, msg=msg, timeout=timeout,
                                    retry=retry)

            if wait_for_reply:
                result = self._waiter.wait(msg_id, timeout)
                if isinstance(result, Exception):
                    raise result
                return result
        finally:
            if wait_for_reply:
                self._waiter.unlisten(msg_id)

That's a fair amount of code; a few notes:
1. For a cast there is no wait_for_reply, no timeout, and no notify, confirming that a cast does not wait for the consumer to process the message or return a value.
2. A random unique identifier, _unique_id, is generated for the msg, and the information from the context is folded into the msg. The msg is then re-wrapped in the following format:

    {
        'oslo.version': '2.0',      # the current envelope version
        'oslo.message': <the original msg>
    }

This is the message body that is ultimately carried through RabbitMQ.

3. A Connection object is taken from the connection pool mentioned above and its topic_send method is called. Note the parameters: they specify the exchange name, the binding key, the message body, and so on.
4. The Connection object was mentioned earlier but not explained: it encapsulates the connection to RabbitMQ.
5. topic_send instantiates a TopicPublisher object and publishes the message to the RabbitMQ broker. RabbitDriver has several classes like TopicPublisher, such as DirectPublisher and FanoutPublisher. They all inherit from Publisher (whose methods call the RabbitMQ Python library directly) but represent different publishing behaviors (as the names suggest); there is no essential difference beyond the type and attributes of the exchange used. topic_send also shows the two things oslo.messaging adds on top of the RabbitMQ Python library's own publishing: a retry mechanism and exception capture, exactly what the end of the RPC chapter in the RabbitMQ tutorial recommends.
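Just to illustrate the idea (this is not oslo.messaging's actual code), the retry-plus-exception-capture pattern layered over a raw publish looks roughly like:

    import time
    import pika

    def publish_with_retry(get_channel, exchange, routing_key, body, retries=3):
        # get_channel: a caller-supplied function returning a (re)connected
        # channel, so each retry can run on a fresh connection if needed.
        for attempt in range(retries):
            try:
                get_channel().basic_publish(exchange=exchange,
                                            routing_key=routing_key,
                                            body=body)
                return
            except pika.exceptions.AMQPConnectionError:
                if attempt == retries - 1:
                    raise
                time.sleep(2 ** attempt)   # back off, then retry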

OK, that completes the journey of one cast message. I said I wouldn't walk through oslo.messaging's implementation, but explaining the flow while dodging the code felt like cheating, so I ended up pasting quite a lot of it. Still, when studying oslo.messaging (or any project), don't burrow too deeply into the code: at every key step it pays to step back and re-examine the whole flow from a global perspective; the mechanism will make much more sense.

Having covered cast, let's go over the call flow. call is very similar to cast; the differences are (see the sketch after this list):
1. call waits for the consumer to finish processing and collects the return value;
2. call has to handle timeouts and catch exceptions;
3. the returned message is deserialized.
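As a usage-level sketch (the server name and the method names below are illustrative):

    cctxt = self.client.prepare(server='compute-1', version='3.0')

    # cast: fire and forget; returns immediately, no result.
    cctxt.cast(ctxt, 'reboot_instance', instance=instance,
               block_device_info=None, reboot_type='SOFT')

    # call: blocks until the reply arrives (or rpc_response_timeout expires);
    # the reply is deserialized before being returned.
    diags = cctxt.call(ctxt, 'get_instance_diagnostics', instance=instance)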

In the implementation, the differences are:
1. Before the message is sent, two attributes are added to the msg: _msg_id and _reply_q. The former is randomly generated (note: distinct from the unique id above) and distinguishes messages from different senders; the latter tells the consumer where the result should go. It refers to a temporary exchange and queue maintained by the sender for receiving replies.
2. The sender creates a global ReplyWaiter object (shared within each sender) that listens on this temporary queue and processes incoming messages (it is itself the callback invoked for received messages).

    def __call__(self, message):
        message.acknowledge()
        incoming_msg_id = message.pop('_msg_id', None)
        self.waiters.put(incoming_msg_id, message)

As you can see, it first sends the ack and then stores the message. The global ReplyWaiter object keeps track of every msg's id and its result; each sender only cares about the msgs relevant to itself. A reply may contain the fields failure, ending, and result. Note that every sender also handles timeouts and duplicate messages.

There is a classic diagram of the call flow (not reproduced here).

Everything above describes the client side of RPC. For a client call to work, an RPC server has to offer the interface. In nova-compute's service initialization (other services that act as receivers are similar):

    target = messaging.Target(topic=self.topic, server=self.host)
    endpoints = [
        self.manager,
        baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
    ]
    endpoints.extend(self.manager.additional_endpoints)
    serializer = objects_base.NovaObjectSerializer()
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    self.rpcserver.start()

    # ...

    def get_server(target, endpoints, serializer=None):
        assert TRANSPORT is not None
        serializer = RequestContextSerializer(serializer)
        return messaging.get_rpc_server(TRANSPORT,
                                        target,
                                        endpoints,
                                        executor='eventlet',
                                        serializer=serializer)

A few notes:
1. On the server side the target must define both topic and server, and a serializer, TRANSPORT, and target are needed just as on the client side.
2. endpoints: when a message arrives, who handles it? The endpoints are the final handlers of the message. An endpoint is callable, and nova-compute's manager is itself an endpoint.
3. executor: described below.

What's the difference between transport, dispatcher, and executor? To quote a code comment:

Connect a transport to a dispatcher that knows how to process the message using an executor that knows how the app wants to create new tasks.

In plainer terms: the executor determines the threading model used to receive messages, the transport receives messages at the messaging-middleware level, and the dispatcher performs the final message handling. All three are abstractions introduced to organize the code.

The executor we normally use is eventlet. The executor does just two things: 1. fetch messages; 2. process messages.

Fetching messages

Before any message can be fetched, the transport performs a listen operation:

    def listen(self, target):
        conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
        listener = AMQPListener(self, conn)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                     target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)
        return listener

As you can see, three consumers and their bound queues are created (two on a topic exchange, one on a fanout exchange), and the queues are watched for messages; the handling is done by the AMQPListener object. AMQPListener's job is simple: keep fetching messages from the queues, deduplicate each one, unwrap its envelope (extracting certain attributes), and finally build an AMQPIncomingMessage that is kept in memory waiting to be processed.

Processing messages

When the dispatcher processes a message, it first sends the ack, then uses the method, args, namespace, version, etc. from the msg (all things we're familiar with by now) to pick a suitable endpoint. What counts as a suitable endpoint?

  • the namespace and version in the endpoint's target attribute are compatible with the message;
  • the endpoint has a method with the corresponding name;

The endpoint itself is callable; what follows is deserializing the message, invoking the method, and serializing the result (to be returned to the caller).
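A minimal sketch of what an endpoint looks like (the namespace, version, and method here are illustrative):

    import oslo_messaging as messaging

    class ComputeEndpoint(object):
        # The dispatcher checks this target's namespace/version against the
        # incoming msg before looking for a method matching msg['method'].
        target = messaging.Target(namespace='compute', version='3.0')

        def reboot_instance(self, ctxt, instance, block_device_info, reboot_type):
            # kwargs come from msg['args'] after deserialization; the return
            # value is serialized and sent back for call invocations.
            return 'ok'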

OK, one last piece remains: how the result gets back to the caller (for call operations). From what we covered earlier, the callers using call listen on a reply queue, and the msg carries enough hints (_msg_id and _reply_q), so for the receiver, returning the result is simple:

    def _send_reply(self, conn, reply=None, failure=None,
                    ending=False, log_failure=True):
        if failure:
            failure = rpc_common.serialize_remote_exception(failure,
                                                            log_failure)
        msg = {'result': reply, 'failure': failure}
        if ending:
            msg['ending'] = True
        rpc_amqp._add_unique_id(msg)
        # If a reply_q exists, add the msg_id to the reply and pass the
        # reply_q to direct_send() to use it as the response queue.
        # Otherwise use the msg_id for backward compatibility.
        if self.reply_q:
            msg['_msg_id'] = self.msg_id
            conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
        else:
            conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))

http://docs.openstack.org/developer/oslo.messaging/
https://wiki.openstack.org/wiki/Oslo/Messaging
http://bingotree.cn/?p=207

Original post: https://www.cnblogs.com/double12gzh/p/10166172.html