ceilometer-notification Source Code Analysis

In the eyes of the strong there is no "best", only "better".

This analysis is based on the community Newton (N) release code.

1. Startup Command

exec ceilometer-agent-notification --config-file /etc/ceilometer/ceilometer.conf

2. Code Entry Point

The ceilometer codebase is packaged with setuptools and pbr; for background on that, see:

Python Packaging with setuptools

setuptools and pbr Package Management in OpenStack

The entry point is the main function in /ceilometer/cmd/agent_notification.py:

def main():
    service.prepare_service()

    sm = cotyledon.ServiceManager()
    sm.add(notification.NotificationService,
           workers=CONF.notification.workers)
    sm.run()

This part is implemented with cotyledon's multi-process framework.
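For readers new to cotyledon, here is a minimal, self-contained sketch of the pattern (EchoService is a toy name, not part of ceilometer): ServiceManager forks worker processes, and each worker executes the service's run method, which is exactly the role NotificationService.run plays.

import cotyledon

class EchoService(cotyledon.Service):
    def __init__(self, worker_id):
        super(EchoService, self).__init__(worker_id)
        self.worker_id = worker_id

    def run(self):
        # executed inside each forked worker process
        print('worker %d started' % self.worker_id)

sm = cotyledon.ServiceManager()
sm.add(EchoService, workers=2)  # fork two worker processes
sm.run()                        # block: supervise workers, respawn on crash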

Let's go straight to the run method of the NotificationService class.

3. Service Startup

Service startup happens in NotificationService.run:

def run(self):
    super(NotificationService, self).run()
    self.shutdown = False
    self.periodic = None
    self.partition_coordinator = None
    self.coord_lock = threading.Lock()

    self.listeners = []

    # NOTE(kbespalov): for the pipeline queues used a single amqp host
    # hence only one listener is required
    self.pipeline_listener = None

    """
    (Pdb) self.pipeline_manager.__dict__
    {
        'cfg_mtime': 1592551325.2732353,
        'cfg_hash': 'ba1311798d634022e4b684b72bf7b42a',
        'cfg_loc': '/etc/ceilometer/pipeline.yaml',
        'pipelines': [<ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3ad0>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3290>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3b90>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e39d0>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3950>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3a10>,
                      <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e37d0>]
    }

    (Pdb) self.pipeline_manager.pipelines[0].__dict__
    {
        'source': <ceilometer.pipeline.SampleSource object at 0x7fa17405e050>,
        'sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e350>,
        'name': 'notification_source:notification_sink'
    }

    (Pdb) self.pipeline_manager.pipelines[0].source.__dict__
    {
        'name': 'notification_source',
        'cfg': {
            'interval': 3600,
            'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
            'name': 'notification_source',
            'sinks': ['notification_sink']
        },
        'interval': 3600,
        'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
        'discovery': [],
        'resources': [],
        'sinks': ['notification_sink']
    }

    (Pdb) self.pipeline_manager.pipelines[0].sink.__dict__
    {
        'publishers': [<ceilometer.publisher.messaging.SampleNotifierPublisher object at 0x7fa17405e410>],
        'transformers': [],
        'name': 'notification_sink',
        'cfg': {
            'publishers': ['notifier://'],
            'transformers': None,
            'name': 'notification_sink'
        },
        'multi_publish': False,
        'transformer_cfg': []
    }
    """
    self.pipeline_manager = pipeline.setup_pipeline()
    self.event_pipeline_manager = pipeline.setup_event_pipeline()

    self.transport = messaging.get_transport()

    if cfg.CONF.notification.workload_partitioning:
        self.group_id = self.NOTIFICATION_NAMESPACE
        self.partition_coordinator = coordination.PartitionCoordinator()
        self.partition_coordinator.start()
    else:
        # FIXME(sileht): endpoint uses the notification_topics option
        # and it should not because this is an oslo_messaging option
        # not a ceilometer. Until we have something to get the
        # notification_topics in another way, we must create a transport
        # to ensure the option has been registered by oslo_messaging.
        messaging.get_notifier(self.transport, '')
        self.group_id = None

    # _get_pipe_manager checks whether workload partitioning is enabled.
    # If it is, a SamplePipelineTransportManager instance is returned in
    # place of the PipelineManager instance; otherwise the PipelineManager
    # is kept. The two differ in their publisher() implementations:
    # SamplePipelineTransportManager's publisher first re-publishes samples
    # to the message queue, from which they are later taken out, processed
    # and forwarded to gnocchi-api, while PipelineManager's publisher sends
    # directly to the gnocchi-api service.
    self.pipe_manager = self._get_pipe_manager(self.transport,
                                               self.pipeline_manager)
    self.event_pipe_manager = self._get_event_pipeline_manager(
        self.transport)

    self._configure_main_queue_listeners(self.pipe_manager,
                                         self.event_pipe_manager)

    if cfg.CONF.notification.workload_partitioning:
        ......

    if not cfg.CONF.notification.disable_non_metric_meters:
        LOG.warning(_LW('Non-metric meters may be collected. It is highly '
                        'advisable to disable these meters using '
                        'ceilometer.conf or the pipeline.yaml'))

    self.init_pipeline_refresh()

I have printed some of the parameters in the code above with pdb so the structures are easier to see. Let's walk through the startup process step by step:

  1. The opening block is just attribute definition and initialization (shutdown, periodic, partition_coordinator, coord_lock, listeners, pipeline_listener).

  2. Next look at the two calls pipeline.setup_pipeline() and pipeline.setup_event_pipeline(); they implement the same logic, so let's analyze setup_pipeline():

    def setup_pipeline(transformer_manager=None):
        """Setup pipeline manager according to yaml config file."""
        default = extension.ExtensionManager('ceilometer.transformer')
        cfg_file = cfg.CONF.pipeline_cfg_file
        return PipelineManager(cfg_file, transformer_manager or default,
                               SAMPLE_TYPE)
    1. The extension.ExtensionManager call loads the plugins registered under the ceilometer.transformer namespace:

      ceilometer.transformer =
          accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
          delta = ceilometer.transformer.conversions:DeltaTransformer
          unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
          rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
          aggregator = ceilometer.transformer.conversions:AggregatorTransformer
          arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
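      At runtime these entry points are loaded through stevedore, which is what extension.ExtensionManager comes from. A small illustrative sketch (it assumes ceilometer is installed so the namespace resolves):

      from stevedore import extension

      mgr = extension.ExtensionManager(namespace='ceilometer.transformer')
      # each extension exposes .name, .entry_point and .plugin (the loaded class)
      for ext in mgr:
          print('%s -> %s' % (ext.name, ext.plugin))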
    2. cfg_file is pipeline.yaml (read from cfg.CONF.pipeline_cfg_file), and it is handed to the PipelineManager class for initialization.

      Let's look at what PipelineManager's initialization does; as before, I have added pdb output to make it concrete:

      class PipelineManager(ConfigManagerBase):
          def __init__(self, cfg_info, transformer_manager, p_type=SAMPLE_TYPE):
              super(PipelineManager, self).__init__()
              cfg = self.load_config(cfg_info)

              self.pipelines = []
              if not ('sources' in cfg and 'sinks' in cfg):
                  raise PipelineException("Both sources & sinks are required",
                                          cfg)
              LOG.info(_LI('detected decoupled pipeline config format'))

              unique_names = set()
              sources = []
              """
              pipeline.yaml
              (Pdb) cfg.get('sources')
              [{
                  'interval': 3600,
                  'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                  'name': 'notification_source',
                  'sinks': ['notification_sink']
              }, {
                  'interval': 300,
                  'meters': ['poll.*', 'memory.usage', 'memory.util'],
                  'name': 'meter_source',
                  'sinks': ['meter_sink']
              }]

              event_pipeline.yaml
              (Pdb) cfg.get('sources')
              [{
                  'sinks': ['event_sink'],
                  'name': 'event_source',
                  'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists']
              }]
              """
              for s in cfg.get('sources'):
                  name = s.get('name')
                  if name in unique_names:
                      raise PipelineException("Duplicated source names: %s" %
                                              name, self)
                  else:
                      unique_names.add(name)
                  """
                  pipeline.yaml
                  (Pdb) sources
                  [<ceilometer.pipeline.SampleSource object at 0x7fa17405e050>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e110>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e090>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e190>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e250>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e210>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e310>]
                  (Pdb) sources[0].__dict__
                  {
                      'name': 'notification_source',
                      'cfg': {
                          'interval': 3600,
                          'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                          'name': 'notification_source',
                          'sinks': ['notification_sink']
                      },
                      'interval': 3600,
                      'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                      'discovery': [],
                      'resources': [],
                      'sinks': ['notification_sink']
                  }
                  (Pdb) sources[1].__dict__
                  {'name': 'meter_source', 'cfg': {'interval': 300, 'meters': ['poll.*', 'memory.usage', 'memory.util'], 'name': 'meter_source', 'sinks': ['meter_sink']}, 'interval': 300, 'meters': ['poll.*', 'memory.usage', 'memory.util'], 'discovery': [], 'resources': [], 'sinks': ['meter_sink']}

                  event_pipeline.yaml
                  {
                      'cfg': {
                          'sinks': ['event_sink'],
                          'name': 'event_source',
                          'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists']
                      },
                      'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists'],
                      'name': 'event_source',
                      'sinks': ['event_sink']
                  }
                  """
                  sources.append(p_type['source'](s))
              unique_names.clear()

              sinks = {}
              """
              pipeline.yaml
              (Pdb) cfg.get('sinks')
              [{
                  'publishers': ['notifier://'],
                  'transformers': None,
                  'name': 'notification_sink'
              }, {
                  'publishers': ['notifier://'],
                  'transformers': None,
                  'name': 'meter_sink'
              }]

              event_pipeline.yaml
              (Pdb) cfg.get('sinks')
              [{'publishers': ['notifier://'], 'transformers': None, 'name': 'event_sink'}]
              """
              for s in cfg.get('sinks'):
                  name = s.get('name')
                  if name in unique_names:
                      raise PipelineException("Duplicated sink names: %s" %
                                              name, self)
                  else:
                      unique_names.add(name)
                  """
                  (Pdb) sinks
                  {'network_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741e3350>, 'volume_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741dbfd0>, 'disk_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741dbc50>, 'cpu_delta_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e290>, 'cpu_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e390>, 'meter_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e3d0>, 'notification_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e350>}
                  (Pdb) sinks.get('network_sink').__dict__
                  {
                      'publishers': [<ceilometer.publisher.messaging.SampleNotifierPublisher object at 0x7fa1741e3890>],
                      'transformers': [<ceilometer.transformer.conversions.RateOfChangeTransformer object at 0x7fa1741e3710>],
                      'name': 'network_sink',
                      'cfg': {
                          'publishers': ['notifier://'],
                          'transformers': [{
                              'name': 'rate_of_change',
                              'parameters': {
                                  'source': {
                                      'map_from': {
                                          'name': 'network\\.(incoming|outgoing)\\.(bytes|packets)',
                                          'unit': '(B|packet)'
                                      }
                                  },
                                  'target': {
                                      'map_to': {
                                          'name': 'network.\\1.\\2.rate',
                                          'unit': '\\1/s'
                                      },
                                      'type': 'gauge'
                                  }
                              }
                          }],
                          'name': 'network_sink'
                      },
                      'multi_publish': False,
                      'transformer_cfg': [{
                          'name': 'rate_of_change',
                          'parameters': {
                              'source': {
                                  'map_from': {
                                      'name': 'network\\.(incoming|outgoing)\\.(bytes|packets)',
                                      'unit': '(B|packet)'
                                  }
                              },
                              'target': {
                                  'map_to': {
                                      'name': 'network.\\1.\\2.rate',
                                      'unit': '\\1/s'
                                  },
                                  'type': 'gauge'
                              }
                          }
                      }]
                  }
                  """
                  sinks[s['name']] = p_type['sink'](s, transformer_manager)
              unique_names.clear()

              for source in sources:
                  source.check_sinks(sinks)
                  for target in source.sinks:
                      pipe = p_type['pipeline'](source, sinks[target])
                      if pipe.name in unique_names:
                          raise PipelineException(
                              "Duplicate pipeline name: %s. Ensure pipeline"
                              " names are unique. (name is the source and sink"
                              " names combined)" % pipe.name, cfg)
                      else:
                          unique_names.add(pipe.name)
                          self.pipelines.append(pipe)
              unique_names.clear()

          def publisher(self):
              """Build a new Publisher for these manager pipelines.

              :param context: The context.
              """
              return PublishContext(self.pipelines)
      1. The first for loop takes the sources out of pipeline.yaml, initializes each one through p_type['source'], and appends the result to the sources list. So what is p_type['source']? As shown below, it is the SampleSource class, which parses each source's collection interval and meter list out of the config file:

        SAMPLE_TYPE = {'pipeline': SamplePipeline,
                       'source': SampleSource,
                       'sink': SampleSink}

        EVENT_TYPE = {'pipeline': EventPipeline,
                      'source': EventSource,
                      'sink': EventSink}
      2. Similarly, the second for loop takes the sinks out of the config file and initializes each with the SampleSink class. That class defines no __init__ of its own, so its base class, Sink, does the initialization:

        class Sink(object):
            def __init__(self, cfg, transformer_manager):
                self.cfg = cfg

                try:
                    self.name = cfg['name']
                    # It's legal to have no transformer specified
                    self.transformer_cfg = cfg.get('transformers') or []
                except KeyError as err:
                    raise PipelineException(
                        "Required field %s not specified" % err.args[0], cfg)

                if not cfg.get('publishers'):
                    raise PipelineException("No publisher specified", cfg)

                self.publishers = []
                for p in cfg['publishers']:
                    if '://' not in p:
                        # Support old format without URL
                        p = p + "://"
                    try:
                        self.publishers.append(publisher.get_publisher(
                            p, self.NAMESPACE))
                    except Exception:
                        LOG.exception(_("Unable to load publisher %s"), p)

                self.multi_publish = True if len(self.publishers) > 1 else False
                self.transformers = self._setup_transformers(cfg,
                                                             transformer_manager)

        The two calls that matter here are get_publisher and _setup_transformers:

        1. get_publisher loads the notifier plugin from the ceilometer.publisher namespace.
        2. _setup_transformers receives transformer_manager, which holds all of the plugins in the ceilometer.transformer namespace. It walks the sink's transformer configuration and, for each entry, instantiates the plugin whose name matches the transformer's name; the name rate_of_change, for example, produces the rate_of_change plugin from ceilometer.transformer. Each instance is appended to the transformers list (a rough sketch of this lookup follows).
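        Here is that lookup logic rewritten as a free function for illustration only (the real _setup_transformers is a Sink method and raises PipelineException for unknown names):

        def setup_transformers(transformer_cfg, transformer_manager):
            """transformer_manager is a stevedore ExtensionManager, indexable by name."""
            transformers = []
            for spec in transformer_cfg:
                parameters = spec.get('parameters') or {}
                ext = transformer_manager[spec['name']]        # e.g. 'rate_of_change'
                transformers.append(ext.plugin(**parameters))  # instantiate plugin class
            return transformers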
      3. Back to PipelineManager: the final double loop walks the sources and sinks collected above. For each source, and for each sink that source references, it builds a SamplePipeline and appends it to self.pipelines. SamplePipeline has no __init__ of its own either, so the base class Pipeline is invoked:

        @six.add_metaclass(abc.ABCMeta)
        class Pipeline(object):
            """Represents a coupling between a sink and a corresponding source."""

            def __init__(self, source, sink):
                self.source = source
                self.sink = sink
                self.name = str(self)
      4. The data finally stored on the manager object:

        pipeline.yaml
        (Pdb) self.pipelines
        [<ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3ad0>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3290>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3b90>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e39d0>]
        (Pdb) self.pipelines[0].__dict__
        {'source': <ceilometer.pipeline.SampleSource object at 0x7fc20c082650>, 'sink': <ceilometer.pipeline.SampleSink object at 0x7fc20c082050>, 'name': 'notification_source:notification_sink'}

        event_pipeline.yaml
        (Pdb) self.pipelines
        [<ceilometer.pipeline.EventPipeline object at 0x7fa1741e3d10>]
        (Pdb) self.pipelines[0].__dict__
        {'source': <ceilometer.pipeline.EventSource object at 0x7fa1741e3dd0>, 'sink': <ceilometer.pipeline.EventSink object at 0x7fa1741e3ed0>, 'name': 'event:event_source:event_sink'}
    3. Continuing with NotificationService.run: setup_pipeline() is now covered, and setup_event_pipeline() follows the same logic.

    4. The _get_pipe_manager / _get_event_pipeline_manager step is covered by the comment block added in the code above.

    5. The last important step is the call to the _configure_main_queue_listeners method:

      def _configure_main_queue_listeners(self, pipe_manager,
                                          event_pipe_manager):
          """
          (Pdb) notification_manager.extensions[0].__dict__
          {
              'obj': <ceilometer.network.notifications.Firewall object at 0x7fa1741e3d50>,
              'entry_point': EntryPoint.parse('network.services.firewall = ceilometer.network.notifications:Firewall'),
              'name': 'network.services.firewall',
              'plugin': <class 'ceilometer.network.notifications.Firewall'>
          }
          """
          notification_manager = self._get_notifications_manager(pipe_manager)
          if not list(notification_manager):
              LOG.warning(_('Failed to load any notification handlers for %s'),
                          self.NOTIFICATION_NAMESPACE)

          ack_on_error = cfg.CONF.notification.ack_on_event_error

          endpoints = []
          """
          (Pdb) endpoints
          [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>]
          (Pdb) endpoints[0].__dict__
          {
              'event_converter': <ceilometer.event.converter.NotificationEventsConverter object at 0x7fa17467c050>,
              'manager': <ceilometer.pipeline.PipelineManager object at 0x2f6d550>
          }
          (Pdb) endpoints[0].manager.__dict__
          {
              'cfg_mtime': 1592551325.2732353,
              'cfg_hash': 'c01e8ee1c40b66314628536afcd48a39',
              'cfg_loc': '/etc/ceilometer/event_pipeline.yaml',
              'pipelines': [<ceilometer.pipeline.EventPipeline object at 0x7fa1741e3d10>]
          }
          """
          endpoints.append(
              event_endpoint.EventsNotificationEndpoint(event_pipe_manager))

          targets = []
          for ext in notification_manager:
              handler = ext.obj
              """
              (Pdb) cfg.CONF.oslo_messaging_notifications.topics
              ['notifications']
              (Pdb) cfg.CONF.notification.disable_non_metric_meters
              True
              """
              if (cfg.CONF.notification.disable_non_metric_meters and
                      isinstance(handler, base.NonMetricNotificationBase)):
                  continue
              LOG.debug('Event types from %(name)s: %(type)s'
                        ' (ack_on_error=%(error)s)',
                        {'name': ext.name,
                         'type': ', '.join(handler.event_types),
                         'error': ack_on_error})
              # NOTE(gordc): this could be a set check but oslo_messaging issue
              # https://bugs.launchpad.net/oslo.messaging/+bug/1398511
              # This ensures we don't create multiple duplicate consumers.
              for new_tar in handler.get_targets(cfg.CONF):
                  if new_tar not in targets:
                      targets.append(new_tar)
              endpoints.append(handler)

          """
          (Pdb) endpoints
          [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>,
           <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fa1741f6810>,
           <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fa17424d410>,
           <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fa17424d8d0>,
           <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fa1742461d0>,
           <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fa17424dc90>,
           <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fa17467ddd0>]
          (Pdb) cfg.CONF.notification.messaging_urls
          ['rabbit://rabbitmq:I5dZs2KN@rabbitmq.openstack.svc.cluster.local:5672/']
          """
          urls = cfg.CONF.notification.messaging_urls or [None]
          for url in urls:
              transport = messaging.get_transport(url)
              # NOTE(gordc): ignore batching as we want pull
              # to maintain sequencing as much as possible.
              listener = messaging.get_batch_notification_listener(
                  transport, targets, endpoints)
              listener.start()
              self.listeners.append(listener)
      1. _get_notifications_manager loads the plugins in the ceilometer.notification namespace, passing pipe_manager as the argument handed to each extension as it is loaded.

      2. The first endpoint appended, EventsNotificationEndpoint, handles the events other components emit; we will analyze it later.

      3. The for loop iterates over the plugins just loaded. If a handler is an instance of NonMetricNotificationBase (and non-metric meters are disabled), it is skipped; otherwise its get_targets method yields oslo_messaging.Target instances with topic notifications, and the handler itself is appended to endpoints (a sketch of get_targets follows).
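        As a sketch, a typical handler's get_targets looks roughly like this (the exchange name varies per plugin; 'nova' is only an example here):

        import oslo_messaging

        def get_targets(conf):
            # one Target per configured notification topic, e.g. 'notifications'
            return [oslo_messaging.Target(topic=topic, exchange='nova')
                    for topic in conf.oslo_messaging_notifications.topics]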

      4. The final loop actually brings the service up. First, what are these urls?

        (Pdb) cfg.CONF.notification.messaging_urls
        ['rabbit://rabbitmq:I5dZs2KN@rabbitmq.openstack.svc.cluster.local:5672/']

        So communication runs over RabbitMQ. For each URL, get_batch_notification_listener is called with the targets to consume from and the endpoints to dispatch to once messages arrive. As we saw, endpoints holds two kinds of handlers: collectors for metering samples and collectors for events. Let's see what happens inside:

        def get_batch_notification_listener(transport, targets, endpoints,
                                            allow_requeue=False,
                                            batch_size=1, batch_timeout=None):
            """
            (Pdb) a
            transport = <oslo_messaging.transport.Transport object at 0x7fab5c6a9190>
            targets = [<Target exchange=ironic, topic=notifications>, <Target exchange=ceilometer, topic=notifications>, <Target exchange=nova, topic=notifications>, <Target exchange=cinder, topic=notifications>, <Target exchange=glance, topic=notifications>, <Target exchange=neutron, topic=notifications>, <Target exchange=heat, topic=notifications>, <Target exchange=keystone, topic=notifications>, <Target exchange=sahara, topic=notifications>, <Target exchange=trove, topic=notifications>, <Target exchange=zaqar, topic=notifications>, <Target exchange=swift, topic=notifications>, <Target exchange=magnum, topic=notifications>, <Target exchange=central, topic=notifications>]
            endpoints = [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x7fab5c258c90>, <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fab5c2a9610>, <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fab5c2b8110>, <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fab5c2b8510>, <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fab5c2b8810>, <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fab5c2b8a90>, <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fab5c6be210>]
            allow_requeue = False
            batch_size = 1
            batch_timeout = None
            """
            return oslo_messaging.get_batch_notification_listener(
                transport, targets, endpoints, executor='threading',
                allow_requeue=allow_requeue,
                batch_size=batch_size, batch_timeout=batch_timeout)

        The real work is delegated to oslo_messaging.get_batch_notification_listener, so let's keep going:

        def get_batch_notification_listener(transport, targets, endpoints,
                                            executor='blocking', serializer=None,
                                            allow_requeue=False, pool=None,
                                            batch_size=None, batch_timeout=None):
            dispatcher = notify_dispatcher.BatchNotificationDispatcher(
                endpoints, serializer)
            return BatchNotificationServer(
                transport, targets, dispatcher, executor, allow_requeue, pool,
                batch_size, batch_timeout
            )

        BatchNotificationDispatcher defines no __init__ of its own, so the base class NotificationDispatcher's __init__ runs:

        PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']


        class NotificationDispatcher(dispatcher.DispatcherBase):
            def __init__(self, endpoints, serializer):
                self.endpoints = endpoints
                self.serializer = serializer or msg_serializer.NoOpSerializer()

                self._callbacks_by_priority = {}
                for endpoint, prio in itertools.product(endpoints, PRIORITIES):
                    if hasattr(endpoint, prio):
                        method = getattr(endpoint, prio)
                        screen = getattr(endpoint, 'filter_rule', None)
                        self._callbacks_by_priority.setdefault(prio, []).append(
                            (screen, method))

        itertools.product(A, B) yields tuples covering the Cartesian product of A and B (a tiny demo follows the dump below). The loop walks every (endpoint, priority) pair; whenever an endpoint defines an attribute named after a priority, the pair is stored under that priority in _callbacks_by_priority, in roughly this shape:

        'sample': [
            (<oslo_messaging.notify.filter.NotificationFilter object at 0x7ff14c254590>, <bound method TemperatureSensorNotification.sample of <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7ff14c254550>>),
            (<oslo_messaging.notify.filter.NotificationFilter object at 0x7ff14c260590>, <bound method TelemetryIpc.sample of <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7ff14c260150>>)]
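        As a quick aside, this is all itertools.product contributes here; it pairs every endpoint with every priority:

        import itertools

        endpoints = ['endpoint_a', 'endpoint_b']
        PRIORITIES = ['info', 'sample']
        print(list(itertools.product(endpoints, PRIORITIES)))
        # [('endpoint_a', 'info'), ('endpoint_a', 'sample'),
        #  ('endpoint_b', 'info'), ('endpoint_b', 'sample')]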

        In the dump above, each list entry is a (screen, method) tuple, where screen is the plugin class's filter_rule attribute. Take TelemetryIpc as an example; NotificationBase is its base class:

        @six.add_metaclass(abc.ABCMeta)
        class NotificationBase(PluginBase):
            """Base class for plugins that support the notification API."""

            def __init__(self, manager):
                super(NotificationBase, self).__init__()
                # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
                # messages to an endpoint.
                if self.event_types:
                    self.filter_rule = oslo_messaging.NotificationFilter(
                        event_type='|'.join(self.event_types))
                self.manager = manager

        NotificationFilter is used to filter the messages the notification service receives; it can match on context, publisher_id, event_type, metadata, and payload.
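        A small demonstration of such a filter (the event-type regex is a toy value; match takes the same five arguments the dispatcher passes below):

        import oslo_messaging

        rule = oslo_messaging.NotificationFilter(event_type=r'^compute\.instance\..*')
        # mirrors screen.match(ctxt, publisher_id, event_type, metadata, payload)
        print(rule.match({}, 'compute.host1', 'compute.instance.create.end', {}, {}))
        # True
        print(rule.match({}, 'compute.host1', 'volume.create.end', {}, {}))
        # False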

        Back in get_batch_notification_listener: with the dispatcher analyzed, what remains is the BatchNotificationServer instance.

        Inheritance chain: BatchNotificationServer -> NotificationServerBase -> MessageHandlingServer

        Its initialization simply records a few parameters, and the listener object is returned.

        Then _configure_main_queue_listeners calls listener.start() on each listener, which resolves to MessageHandlingServer's start method:

        @ordered(reset_after='stop')
        def start(self, override_pool_size=None):
            if self._started:
                LOG.warning('The server has already been started. Ignoring '
                            'the redundant call to start().')
                return

            self._started = True

            executor_opts = {}

            if self.executor_type in ("threading", "eventlet"):
                executor_opts["max_workers"] = (
                    override_pool_size or self.conf.executor_thread_pool_size
                )
            self._work_executor = self._executor_cls(**executor_opts)

            try:
                self.listener = self._create_listener()
            except driver_base.TransportDriverError as ex:
                raise ServerListenError(self.target, ex)

            self.listener.start(self._on_incoming)

        Two lines matter here: the _create_listener call and the final self.listener.start(self._on_incoming). _create_listener actually resolves to NotificationServerBase's implementation:

        def _create_listener(self):
            return self.transport._listen_for_notifications(
                self._targets_priorities, self._pool, self._batch_size,
                self._batch_timeout
            )

        which in turn calls _listen_for_notifications:

        def _listen_for_notifications(self, targets_and_priorities, pool,
                                      batch_size, batch_timeout):
            for target, priority in targets_and_priorities:
                if not target.topic:
                    raise exceptions.InvalidTarget('A target must have '
                                                   'topic specified',
                                                   target)
            return self._driver.listen_for_notifications(
                targets_and_priorities, pool, batch_size, batch_timeout
            )

        Going one level deeper:

        def listen_for_notifications(self, targets_and_priorities, pool,
                                     batch_size, batch_timeout):
            conn = self._get_connection(rpc_common.PURPOSE_LISTEN)

            listener = NotificationAMQPListener(self, conn)
            for target, priority in targets_and_priorities:
                conn.declare_topic_consumer(
                    exchange_name=self._get_exchange(target),
                    topic='%s.%s' % (target.topic, priority),
                    callback=listener, queue_name=pool)
            return base.PollStyleListenerAdapter(listener, batch_size,
                                                 batch_timeout)

        This code declares a consumer on each queue to be monitored, then builds and returns a PollStyleListenerAdapter instance.

        When a PollStyleListenerAdapter instance is initialized, it creates a thread object:

        File: oslo_messaging/_drivers/base.py, PollStyleListenerAdapter.__init__

        self._listen_thread = threading.Thread(target=self._runner)

        After start() is called, that thread runs the _runner function, which keeps polling for messages and processes each batch via the _process_incoming path:

        class PollStyleListenerAdapter(Listener):
            """A Listener that uses a PollStyleListener for message transfer. A
            dedicated thread is created to do message polling.
            """

            def __init__(self, poll_style_listener, batch_size, batch_timeout):
                super(PollStyleListenerAdapter, self).__init__(
                    batch_size, batch_timeout, poll_style_listener.prefetch_size
                )
                self._poll_style_listener = poll_style_listener
                self._listen_thread = threading.Thread(target=self._runner)
                self._listen_thread.daemon = True
                self._started = False

            def start(self, on_incoming_callback):
                super(PollStyleListenerAdapter, self).start(on_incoming_callback)
                self._started = True
                self._listen_thread.start()

            @excutils.forever_retry_uncaught_exceptions
            def _runner(self):
                while self._started:
                    incoming = self._poll_style_listener.poll(
                        batch_size=self.batch_size,
                        batch_timeout=self.batch_timeout)

                    if incoming:
                        self.on_incoming_callback(incoming)

                # listener is stopped but we need to process all already consumed
                # messages
                while True:
                    incoming = self._poll_style_listener.poll(
                        batch_size=self.batch_size,
                        batch_timeout=self.batch_timeout)

                    if not incoming:
                        return
                    self.on_incoming_callback(incoming)

        The last line of MessageHandlingServer's start method is self.listener.start(self._on_incoming), which invokes the start shown above. Once started, _runner keeps pulling data off the queue via poll and hands each batch to self.on_incoming_callback(incoming), i.e. MessageHandlingServer's _on_incoming:

        def _on_incoming(self, incoming):
            """Handles on_incoming event

            :param incoming: incoming request.
            """
            self._work_executor.submit(self._process_incoming, incoming)

        So the actual handling happens in self._process_incoming, i.e. the _process_incoming of the BatchNotificationServer class:

        class BatchNotificationServer(NotificationServerBase):

            def _process_incoming(self, incoming):
                try:
                    not_processed_messages = self.dispatcher.dispatch(incoming)
                except Exception:
                    ......

        This handler calls the dispatcher object to dispatch the messages. self.dispatcher here is the BatchNotificationDispatcher built during initialization, so the call lands in:

        class BatchNotificationDispatcher(NotificationDispatcher):
            """A message dispatcher which understands Notification messages.

            A MessageHandlingServer is constructed by passing a callable dispatcher
            which is invoked with a list of message dictionaries each time 'batch_size'
            messages are received or 'batch_timeout' seconds is reached.
            """

            def dispatch(self, incoming):
                """Dispatch notification messages to the appropriate endpoint method.
                """
                messages_grouped = itertools.groupby(sorted(
                    (self._extract_user_message(m) for m in incoming),
                    key=operator.itemgetter(0)), operator.itemgetter(0))

                requeues = set()
                for priority, messages in messages_grouped:
                    __, raw_messages, messages = six.moves.zip(*messages)
                    if priority not in PRIORITIES:
                        LOG.warning('Unknown priority "%s"', priority)
                        continue
                    for screen, callback in self._callbacks_by_priority.get(
                            priority, []):
                        if screen:
                            filtered_messages = [message for message in messages
                                                 if screen.match(
                                                     message["ctxt"],
                                                     message["publisher_id"],
                                                     message["event_type"],
                                                     message["metadata"],
                                                     message["payload"])]
                        else:
                            filtered_messages = list(messages)

                        if not filtered_messages:
                            continue

                        ret = self._exec_callback(callback, filtered_messages)
                        if ret == NotificationResult.REQUEUE:
                            requeues.update(raw_messages)
                            break
                return requeues

            def _exec_callback(self, callback, messages):
                try:
                    return callback(messages)
                except Exception:
                    LOG.exception("Callback raised an exception.")
                    return NotificationResult.REQUEUE

        The key step is looking up the message's priority field in the _callbacks_by_priority dict to find the matching plugin methods, i.e. the callback functions, and invoking them. A message on notifications.sample, for example, is matched to the sample method of the TelemetryIpc class:

        _sample = ceilometer.telemetry.notifications:TelemetryIpc

        But the instance inherits its sample method from its parent class: ceilometer/agent/plugin_base.py:NotificationBase.sample

        def sample(self, notifications):
            self._process_notifications('sample', notifications)

        def _process_notifications(self, priority, notifications):
            for notification in notifications:
                try:
                    notification = messaging.convert_to_old_notification_format(
                        priority, notification)
                    self.to_samples_and_publish(notification)
                except Exception:
                    LOG.error(_LE('Fail to process notification'), exc_info=True)

        def to_samples_and_publish(self, notification):
            with self.manager.publisher() as p:
                p(list(self.process_notification(notification)))

        The chain runs _process_notifications -> to_samples_and_publish. The important part is to_samples_and_publish: it calls self.manager.publisher(), where manager is the PipelineManager instance passed in when the plugin was initialized, so let's look at PipelineManager.publisher:

        def publisher(self):
            return PublishContext(self.pipelines)

        The self.pipelines argument was built during __init__; now look at PublishContext:

        class PublishContext(object):

            def __init__(self, pipelines=None):
                pipelines = pipelines or []
                self.pipelines = set(pipelines)

            def add_pipelines(self, pipelines):
                self.pipelines.update(pipelines)

            def __enter__(self):
                def p(data):
                    for p in self.pipelines:
                        p.publish_data(data)
                return p

            def __exit__(self, exc_type, exc_value, traceback):
                for p in self.pipelines:
                    p.flush()

        Because this is used in a with statement, look at __enter__: its return value is bound to the name after as, and the data argument is list(self.process_notification(notification)). Each p here can be one of two types, SamplePipeline or EventPipeline.
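        A toy usage of the class above, with a hypothetical Pipe class standing in for a pipeline, just to make the with-binding concrete:

        class Pipe(object):
            def publish_data(self, data):
                print('publish %s' % data)

            def flush(self):
                print('flush')

        with PublishContext([Pipe(), Pipe()]) as p:  # __enter__ returns the inner p()
            p(['sample'])  # fans publish_data() out to every pipeline
        # leaving the block runs __exit__, which flushes every pipeline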

        Taking SamplePipeline as the example:

        def publish_data(self, samples):
            if not isinstance(samples, list):
                samples = [samples]
            supported = [s for s in samples if self.source.support_meter(s.name)
                         and self._validate_volume(s)]
            self.sink.publish_samples(supported)
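        support_meter boils down to wildcard-matching the sample name against the source's meters list. A rough sketch with fnmatch (simplified: the real ceilometer code also honors '!'-prefixed exclusion entries):

        import fnmatch

        def support_meter(meters, name):
            return any(fnmatch.fnmatch(name, pattern) for pattern in meters)

        print(support_meter(['poll.*', 'memory.usage'], 'poll.cpu_util'))  # True
        print(support_meter(['instance', 'volume'], 'cpu'))                # False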

        publish_data then calls SampleSink's publish_samples method:

        def _transform_sample(self, start, sample):
            try:
                for transformer in self.transformers[start:]:
                    # e.g. this invokes RateOfChangeTransformer's handle_sample
                    sample = transformer.handle_sample(sample)
                    if not sample:
                        LOG.debug(
                            "Pipeline %(pipeline)s: Sample dropped by "
                            "transformer %(trans)s", {'pipeline': self,
                                                      'trans': transformer})
                        return
                return sample
            except Exception as err:
                # TODO(gordc): only use one log level.
                LOG.warning(_("Pipeline %(pipeline)s: "
                              "Exit after error from transformer "
                              "%(trans)s for %(smp)s") % ({'pipeline': self,
                                                           'trans': transformer,
                                                           'smp': sample}))
                LOG.exception(err)

        def _publish_samples(self, start, samples):
            """Push samples into pipeline for publishing.

            :param start: The first transformer that the sample will be injected.
                          This is mainly for flush() invocation that transformer
                          may emit samples.
            :param samples: Sample list.

            """
            transformed_samples = []
            if not self.transformers:
                transformed_samples = samples
            else:
                for sample in samples:
                    LOG.debug(
                        "Pipeline %(pipeline)s: Transform sample "
                        "%(smp)s from %(trans)s transformer", {'pipeline': self,
                                                               'smp': sample,
                                                               'trans': start})
                    sample = self._transform_sample(start, sample)
                    if sample:
                        transformed_samples.append(sample)

            if transformed_samples:
                for p in self.publishers:
                    try:
                        p.publish_samples(transformed_samples)
                    except Exception:
                        LOG.exception(_(
                            "Pipeline %(pipeline)s: Continue after error "
                            "from publisher %(pub)s") % ({'pipeline': self,
                                                          'pub': p}))

        def publish_samples(self, samples):
            self._publish_samples(0, samples)

        So self._publish_samples runs: it assembles transformed_samples, then iterates self.publishers, calling each publisher's publish_samples to handle the data. We saw earlier that the publisher list holds the notifier plugin, i.e. the SampleNotifierPublisher class, which inherits NotifierPublisher -> MessagingPublisher, so the call lands in MessagingPublisher's publish_samples:

        def publish_samples(self, samples):
            """Publish samples on RPC.

            :param samples: Samples from pipeline after transformation.

            """
            meters = [
                utils.meter_message_from_counter(
                    sample, cfg.CONF.publisher.telemetry_secret)
                for sample in samples
            ]
            topic = cfg.CONF.publisher_notifier.metering_topic
            self.local_queue.append((topic, meters))

            if self.per_meter_topic:
                for meter_name, meter_list in itertools.groupby(
                        sorted(meters, key=operator.itemgetter('counter_name')),
                        operator.itemgetter('counter_name')):
                    meter_list = list(meter_list)
                    topic_name = topic + '.' + meter_name
                    LOG.debug('Publishing %(m)d samples on %(n)s',
                              {'m': len(meter_list), 'n': topic_name})
                    self.local_queue.append((topic_name, meter_list))

            self.flush()

        The topic read from cfg.CONF.publisher_notifier.metering_topic is metering. At the end flush() runs, which goes through _process_queue, whose _send method pushes the data out:

        def _send(self, event_type, data):
            try:
                self.notifier.sample({}, event_type=event_type,
                                     payload=data)
            except oslo_messaging.MessageDeliveryFailure as e:
                raise_delivery_failure(e)
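        For context, flush() drains self.local_queue through _process_queue, calling _send once per buffered (topic, meters) tuple. A simplified sketch of the draining loop, not the exact implementation (the real method also consults the publisher's configured delivery-failure policy):

        def process_queue(queue, send):
            # queue holds (topic, meters) tuples in publish order
            while queue:
                topic, data = queue[0]
                try:
                    send(topic, data)
                except Exception:
                    # the real code applies the configured policy here and may
                    # keep the rest of the queue for a later retry
                    return queue
                queue.pop(0)
            return []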

        The data is published to the metering.sample queue; from there the message flow is the same as in the polling-compute service. For details see: polling-compute

References:
https://www.cnblogs.com/luohaixian/p/11145939.html
