生命的道路上永远没有捷径可言,只有脚踏实地走下去。
今天介绍一下 openstack 中关于 RabbitMq 和 oslo_messaging 库之间的前生今世
一定要弄清楚的一个问题就是:直接使用rabbitmq和使用oslo_messaging这个库间接的使用有什么区别。
olso_messaging实际上是在rabbitmq的基础上通过一些列的调用,最终暴露给用户一个简单的使用接口,用户不必关心内部的实现,只用配置好配置文件,进行简单的函数调用即可。
并且由于这个库是openstack的标准库,里面的一些函数命名和默认参数都是针对于openstack的概念来讲的。
基本结构就是:
openstack -> oslo_messaging -> kombu -> AMQP -> socket
首先看下 oslo_messaging 中对象封装:
概念简介
Transport
Transport(传输层)主要实现RPC底层的通信(比如socket)以及事件循环,多线程等其他功能.可以通过URL来获得不同transport的句柄.URL的格式为:
transport://user:password@host:port[,hostN:portN]/virtual_host
目前支持的Transport有rabbit,qpid与zmq,分别对应不同的后端消息总线.用户可以使用oslo.messaging.get_transport函数来获得transport对象实例的句柄.
1 | import oslo_messaging |
Target
Target封装了指定某一个消息最终目的地的所有信息,下表所示为其所具有的属性:
参数=默认值 | 说 明 |
---|---|
exchange = None | (字符串类型)topic所属的范围,不指定的话默认使用配置文件中的control_exchange选项 |
topic = None | (字符串类型)一个topic可以用来标识服务器所暴露的一组接口(一个接口包含多个可被远程调用的方法).允许多个服务器暴露同一组接口,消息会以轮循的方式发送给多个服务器中的某一个 |
namespace = None | (字符串类型)用来标识服务器所暴露的某个特定接口(多个可被远程调用的方法) |
version = None | (字符串类型)服务器所暴露的接口支持M.N类型的版本号.次版本号(N)的增加表示新的接口向前兼容,主版本号(M)的增加表示新接口和旧接口不兼容.RPC服务器可以实现多个不同的主版本号接口. |
server = None | (字符串类型)客户端可以指定此参数来要求消息的目的地是某个特定的服务器,而不是一组同属某个topic的服务器中的任意一台. |
fanout = None | (布尔型)当设置为真时,消息会被发送到同属某个topic的所有服务器上,而不是其中的一台. |
在不同的应用场景下,构造Target对象需要不同的参数:创建一个RPC服务器时,需要topic和server参数,exchange参数可选;指定一个endpoint时,namespace和version是可选的;客户端发送消息时,需要topic参数,其他可选.
Server
一个RPC服务器可以暴露多个endpoint,每个endpoint包含一组方法,这组方法是可以被客户端通过某种Transport对象远程调用的.创建Server对象时,需要指定Transport,Target和一组endpoint.
RPC Client
通过RPC Client,可以远程调用RPC Sever上的方法.远程调用时,需要提供一个字典对象来指明调用的上下文,调用方法的名字和传递给调用方法的参数(用字典表示).
有cast和call两种远程调用方式.通过cast方式远程调用,请求发送后就直接返回了;通过call方式调用,需要等响应从服务器返回.
Notifier
Notifier用来通过某种transport发送通知消息.通知消息遵循如下的格式:
1 | import six |
可以在不同的优先级别上发送通知,这些优先级包括sample,critical,error,warn,info,debug,audit等.
Notification Listener
Notification Listener和Server类似,一个Notification Listener对象可以暴露多个endpoint,每个endpoint包含一组方法.但是与Server对象中的endpoint不同的是,这里的endpoint中的方法对应通知消息的不同优先级.比如:
1 | import oslo_messaging |
endpoint中的方法如果返回messaging.NotificationResult.HANDLED或者None,表示这个通知消息已经确认被处理;如果返回messaging.NotificationResult.REQUEUE,表示这个通知消息要重新进入消息队列.
下面是一个利用oslo_messaging来实现远程过程调用的示例.
1 | from oslo_config import cfg |
这个例子里,定义了两个不同的endpoint:ServerControlEndpoint与TestEndpoint.这两个endpoint中的方法stop和test都可以被客户端远程调用.
创建rpc server对象之前,需要先创建transport和target对象,这里使用get_transport()函数来获得transport对象的句柄,get_transport()的参数如下表所示:
参数=默认值 | 说 明 |
---|---|
conf | (oslo.config.cfg.ConfigOpts类型)oslo.config配置项对象 |
url = None | (字符串或者oslo.messaging.Transport类型)transport URL.如果为空,采用conf配置中的transport_url项所指定的值 |
namespace = None | (字符串类型)用来标识服务器所暴露的某个特定接口(多个可被远程调用的方法) |
allowed_remote_exmods = None | (列表类型)Python模块的列表.客户端可用列表里的模块来deserialize异常 |
aliases = None | (字典类型)transport别名和transport名称之间的对应关系 |
conf对象里,除了包含transport_url项外,还可以包含control_exchange项.control_exchange用来指明topic所属的默认范围,默认为”openstack”.可以使用oslo.messaging.set_transport_defaults()函数来修改默认值.
此处构建的Target对象是用来建立RPC Server的,所以需指明topic和server参数.用户定义的endpoint对象也可以包含一个target属性,用来指明这个endpoint所支持的特定的namespace和version.
这里使用get_rpc_server()函数创建server对象,然后调用server对象的start方法开始接收远程调用.get_rpc_server()函数的参数如下表所求:
参数=默认值 | 说 明 |
---|---|
transport | (Transpor类型)transport对象 |
target | (Target类型)target对象,用来指明监听的exchange,topic和server |
endpoints | (列表类型)包含了endpoints对象实例的列表 |
executor=’blocking’ | (字符串类型)用来指明消息接收和发收的方式:目前支持两种方式: blocking:在这种方式中,用户调用start函数后,在start函数中开始请求处理循环:用户线程阻塞,处理下一个请求.直到用户调用了stop函数后,这个处理循环才会退出.消息的接收和分发处理都在调用start函数的线程中完成. eventlet:在这种方式中,会有一个协程GreenThread来处理消息的接收,然后有其他不同的GreenThread来处理不同消息的分发处理.调用start的用户线程不会被阻塞 |
serializer = None | (Serializer类型)用来序列化/反序列化消息 |
1 | #client.py 客户端 |
这里target对象构造时,必须要有的参数只有topic,创建RPCClient对象时,可以接受的参数如下表所示:
参数=默认值 | 说 明 |
---|---|
transport | (Transport类型)transport对象 |
target | (Taget类型)该client对象的默认target对象 |
timeout = None | (整数或者浮点数类型)客户端调用call方法时超时时间(秒) |
version_cap = None | (字符串类型)最大所支持的版本号.当版本号超过时,会扔出RPCVersionCapError异常 |
serializer = None | (Serializer类型)用来序列化/反序列化消息 |
retry = None | (整数类型)连接重试次数:None或者-1:一直重试0:不重试>0:重试次数 |
远程调用时,需要传入调用上下文,调用方法的名字和传给调用方法的参数.
Target对象的属性在RPCClient对象构造以后,还可以通过prepare()方法修改.可以修改的属性包括exchange,topic,namespace,version,server,fanout,timeout,version_cap和retry.
修改后的target属性只在这个prepare()方法返回的对象中有效.
下面我们再来看一个利用oslo_messaing实现通知消息处理的例子:
1 | #notification_listener.py 消息通知处理 |
通知消息处理的endpoint对象和远程过程调用的endpoint对象不同,对象定义的方法要和通知消息的优先级一一对应.我们可以为每个endpoint指定所对应的target对象.
最后调用get_notificaton_listener()函数构造notification listener对象,get_notification_listener()函数的参数如下表所示:
参数=默认值 | 说 明 |
---|---|
transport | (Transport类型)transport对象 |
target | (列表类型)target对象的列表,用来指明endpoints列表中的每一个endpoint所侦听处理的exchange和topic |
endpoints | (列表类型)包含了endpoints对象实例的列表 |
executor=’blocking’ | (字符串类型)用来指明消息接收和发收的方式:目前支持两种方式: blocking:在这种方式中,用户调用start函数后,在start函数中开始请求处理循环:用户线程阻塞,处理下一个请求.直到用户调用了stop函数后,这个处理循环才会退出.消息的接收和分发处理都在调用start函数的线程中完成. eventlet:在这种方式中,会有一个协程GreenThread来处理消息的接收,然后有其他不同的GreenThread来处理不同消息的分发处理.调用start的用户线程不会被阻塞 |
serializer=None | (Serializer类型)用来序列化/反序列化消息 |
allow_requeue=False | (布尔类型)如果为真,表示支持NotificationResult.REQUEUE |
相对应的发送消息通知的代码如下:
1 | #notifier_send.py |
1 | 发送通知消息时,首先要构造Notifier对象,此时可能需要指定的参数如下表所示: |
参数=默认值 | 说 明 |
---|---|
transport | (Transport类型)transport对象 |
target | (列表类型)target对象的列表,用来指明endpoints列表中的每一个endpoint所侦听处理的exchange和topic |
publish_id = None | (字符串类型)发送者id |
driver = None | (字符串类型)后台驱动.一般采用”messaging”.如果没有指定,会使用配置文件中的notificaton_driver的值 |
topic = None | (字符串类型)发送消息的topic.如果没有指定,会使用配置文件中的notification_topics的值 |
serializer = None | (Serializer类型)用来序列化/反序列化消息 |
初始化Notifier对象的操作比较复杂,所以可以用prepare()方法修改已创建的Notifier对象,prepare()方法返回的是新的Notifier对象的实例.它的参数如下表所示:
参数 = 默认值 | 说 明 |
---|---|
publish_id = None | (字符串类型)发送者id |
retry = None | (整数类型)连接重试次数:None或者-1:一直重试0:不重试>0:重试次数 |
最后可以调用Notifier对象的不同方法(error, critical, warn, 等等)发送不同优先级的消息通知.
源码分析
根据上个章节,我们可以看到其实这个库最终暴漏给用户的是两个概念:1.rpc,2.notification
下面我们来根据基本源码分析一下这两个概念
rpc
rpc(即远程调用)的概念被划分为调用方和被调用方
调用方称为client:rpc_client
被调用方称为server:rpc_server
使用时,被调用方server.start,等待调用方client.cast 或 clinet.call即可发起阻塞或非阻塞的远程调用。
当rpc client执行一次远程调用时实际发生了什么呢 ?
(代码在oslo_messaging/rpc/client.py文件里)
rpc client
首先构建 rpc client ,实例化 RPCClient
1 | self.rpc_client = messaging.get_rpc_client( |
1 | def get_rpc_client(transport, retry=None, **kwargs): |
1 | class RPCClient(_BaseCallContext): |
然后通过 call 或者 cast 调用
1 | self.rpc_client.prepare(topic=topic).call(cxt, method, **args) |
1 |
|
可以看到在 29 行和 54 行,两个方法都是执行了 transport._send
只有参数不同,这里最大的区别其实是wait_for_reply这个参数,顾名思义wait or no wait也就是我们说的阻塞/非阻塞。
那_send这个方法,最重要的两关键一个是transport本身,一个是target参数,这两个东西是rpc client init的时候必须要传的参数,
transport 参数是由(osllo_messaging/transport.py文件)_get_transport方法而来:
1 | def _get_transport(conf, url=None, allowed_remote_exmods=None, |
这里url是配置文件里配的,这里以rabbitmq为例
entry_point到oslo_messaging._drivers.impl_rabbit:RabbitDriver,最终获得到的是RabbitDriver的实例。
target 直接实例化即可,这里注意到两个参数exchange和topic,和rabbitmq里的exchange和routing_key的概念一致
那我们接着来看 transport._send方法,前面也说到了transport此时是RabbitDriver
RabbitDriver 继承自 AMQPDriverBase 继承自 BaseDriver
_send 方法在AMQPDriverBase中:
1 | class AMQPDriverBase(base.BaseDriver): |
我们看下 44 行到 66 行,回顾上面的cast和call函数里调用_send的时候是没有传notify找个参数的,所以第一个条件一定不成立
那看接下来的两个case,elif target.fanout/else(这里的fanout与rabbitmq本身的fanout意义是一样的)那也就是说我们在生成target或者client.prepare的时候可以通过指定fanout这个参数来决定进入哪个case,(注意第三个case里如果指定了target.server那么topic是target.topic和target.server二者相结合)那我们这里来看一下conn.fanout_send和conn.topic_send这两个方法(conn是enter exit getattr的产物,具体本文不细说了,这里只要知道最终调用到了oslo_messaging/_drivers/impl_rabbit.py里Connection这个类就可以了):
conn.fanout_send
1 | class Connection(object): |
其实到这里基本上就清楚了fanout_send就是往名叫target.topic + “_fanout”这个exchange里发送fanout模式的消息,所有bind到这个exchange的queue都会收到这条消息,如果这个exchange没有创建过,在self.publish方法里会被declare.
conn.topic_send
1 | class Connection(object): |
topic_send就是以topic做为routing_key 以exchange_name这个参数值命名的exchange里发送topic模式的消息,这里注意区别就是exchange_name是上级调用_get_exchange方法得来的
1 | class Connection(object): |
_default_exchange如果仔细看的话前面其实前面的截图里有,就是conf.control_exchange
默认是openstack(这里大概知道点为啥oslo_messaging是为openstack搞得了吧😄),关键还是取决于target,如果target里没有指定才会用配置文件的。
同样,如果这个exchange没有创建过,在self.publish方法里会被declare.
rpc server
首先要获得一个rpc server的实例
1 | def get_rpc_server(transport, target, endpoints, |
1 | class RPCServer(msg_server.MessageHandlingServer): |
RPCServer 继承自 MessageHandlingServer 继承自 ServiceBase,_OrderedTaskRunner
同样的,transport和target是必须要有的,获得 rpc sever实例后,rpc server调用start方法,最终调用到了基类的start方法
1 |
|
关键看下 20 行到 25 行,因为这里实例化的是 RPCServer,所以_create_listener 调用的是 RPCServer 的方法
1 | class RPCServer(msg_server.MessageHandlingServer): |
1 | class Transport(object): |
1 | class AMQPDriverBase(base.BaseDriver): |
listen方法实际上关键是执行了三个declare,以下称作:
declare_topic_consumer(1)
declare_topic_consumer (2)
declare_fanout_consumer
declare_topic_consumer(1)
oslo_messaging/_drivers/impl_rabbit.py
1 | class Connection(object): |
1 | def declare_consumer(self, consumer): |
1 | def declare(self, conn): |
1 |
|
1 | def _create_exchange(self, nowait=False, channel=None): |
一目了然,总结来说就是使用target的exchange(默认openstack)做为exchange_name
使用target的topic做为默认的queue_name,然后declare这个exchange和queue,然后将二者bind。
declare_topic_consumer(2)
与declare_topic_consumer(1)的唯一区别是这里使用了target.topic结合target.server做为了默认的queue_name。
declare_fanout_consumer
1 | class Connection(object): |
与上述两种的区别是,这里的queue_name变成了target.topic+”fanout“+uuid
exchange_name变成了target.topic+”_fanout”,exchange的type变成了fanout
这里也指定了routing_key ,我觉得应该是没用的。
小结
server监听
一、
1.由target.exchange或配置文件(openstack为默认值)命名的exchange(type为topic模式)
2.以target.topic做为queue_name
2.以target.topic做为routing_key进行queue和exchange的绑定
二、
1.由target.exchange或配置文件(openstack为默认值)命名的exchange(type为topic模式)
2.以target.topic结合target.server做为queue_name
2.以target.topic结合target.server做为routingkey进行queue和exchange的绑定
三、
1.由target.topic+”fanout”命名的exchange(type为fanout模式)
2.以target.topic+”fanout“+唯一uuid做为queue_name
2.将queue和exchange的绑定
client调用
1.非阻塞调用:client.cast
2.阻塞调用:client.call
通过prepare来改变client的target进行fanout或者指定server的调用
notification
notification,顾名思义,消息/通知,其概念被分为
通知方:notifier(官方也叫driver), 监听方:notification_listener
使用时,监听方listener.start, 调用方notifier.notfiy(具体暴漏给用户使用时是sample,audit,info等不同level的方法) 即可把消息发给监听方进行处理。
发送方
以 ceilometer 为例,首先需要实例化一个 Notifier 对象
1 | self.notifier = oslo_messaging.Notifier( |
在处理完数据之后需要将数据发送出去(具体中间处理数据的部分就不讲了,见我的另一篇 polling 源码分析)
1 | class Notifier(object): |
1 | def _notify(self, ctxt, event_type, payload, priority, publisher_id=None, |
1 | class ExtensionManager(object): |
1 | 分析def map: |
1 | class ExtensionManager(object): |
可以看到 response_callback 传过来是一个 append 方法,这里的 func 就是 do_notify,发生调用,即
1 | def _notify(self, ctxt, event_type, payload, priority, publisher_id=None, |
即调用 13 行方法
1 | (Pdb) p ext |
1 | class MessagingDriver(notifier.Driver): |
实际调用到第 6 行,最关键的地方:
43 -> priority = priority.lower()
44 for topic in self.topics:
45 target = oslo_messaging.Target(topic=’%s.%s’ % (topic, priority))
可以看到oslo_messaging封装得到的真正队列名称是:
样例:
notifications.sample
所以,一旦调用oslo_messaging.notifier.sample来发送消息,此时的priority就被设置为
sample了,并且该优先级被用于最终拼接生成oslo_messaging的Target,最后生成了对应的
队列notifications.sample
1 | class Transport(object): |
1 | (Pdb) p self._driver |
1 | class AMQPDriverBase(base.BaseDriver): |
1 | def _send(self, target, ctxt, message, |
到了这里就和 rpc client 那块差不多了,区别就是这里发送的时候,传了 notify=True,后面就不啰嗦了,和前面差不多
接收/监听方
还是以 ceilometer 为例
1 | urls = cfg.CONF.notification.messaging_urls or [None] |
1 | (Pdb) endpoints |
1 | def get_batch_notification_listener(transport, targets, endpoints, |
1 | def get_batch_notification_listener(transport, targets, endpoints, |
其中 BatchNotificationDispatcher 实现了 dispatch 方法,BatchNotificationServer 中实现了 _process_incoming方法
BatchNotificationDispatcher 继承自 NotificationDispatcher 继承自 DispatcherBase
BatchNotificationServer 继承自 NotificationServerBase 继承自 MessageHandlingServer 继承自 ServiceBase,_OrderedTaskRunner
因为 listen 实例化的是 BatchNotificationServer 对象,所以在调用 listener.start() 的时候,实际调用的是 MessageHandlingServer 的 start 方法
1 |
|
重点在 21 和 25 行,第 21 行调到了
1 | class NotificationServerBase(msg_server.MessageHandlingServer): |
1 | def _listen_for_notifications(self, targets_and_priorities, pool, |
1 | class AMQPDriverBase(base.BaseDriver): |
在listen_for_notification中建立连接后,创建topic.priority的queue。PollStyleListenerAdapter启动一个线程对获取到的数据进行处理,此处返回此类,所以_create_listener 就是返回了一个PollStyleListenerAdapter的实例
然后调用 self.listener.start(self._on_incoming)
PollStyleListenerAdapter 继承自 Listener
1 | class PollStyleListenerAdapter(Listener): |
NotificationAMQPListener 继承自 AMQPListener 继承自 PollStyleListener
可以看到这里的 start 方法就是调用了 _runner方法,这里的 self.poll_style_listener 就是 NotificationAMQPListener 实例,通过 poll 不断从队列中取出数据
即调用 AMQPListener.poll
1 | class NotificationAMQPListener(AMQPListener): |
在 call 方法又调用了 self.message_cls ,即 NotificationAMQPIncomingMessage
1 | class NotificationAMQPIncomingMessage(AMQPIncomingMessage): |
NotificationAMQPIncomingMessage 继承自 AMQPIncomingMessage 继承自 RpcIncomingMessage 继承自 IncomingMessage
(self.conn.consume 中有回调函数调用,call 方法,往 incoming 中塞值,poll 再从里面取值处理)括号中的部分有点疑问,理得不是很清楚,如果有清楚的大佬,欢迎指教。
NotificationAMQPIncomingMessage 是一个消息对象,包含了acknowledge和requeue方法,可以用于消息确认或再次入队。
取出之后用self.on_incoming_callback(incoming)处理,即MessageHandlingServer中的self.__on_incoming处理
1 |
|
我们看到这边实际使用的是self._process_incoming来处理,即采用BatchNotificationServer类中的self.__process_incoming处理
1 | class BatchNotificationServer(NotificationServerBase): |
可以看到该处理函数会调用dispatcher对象来分派消息,这里的self.dispatcher,就是之前使用NotificationDispatcher初始化后传过来的参数,后面就是一些数据处理的过程,详细请见我的另一篇 nogtification 源码分析
参考: