ceilometer-polling Source Code Analysis

Strength does not come from winning; only struggle builds strength. When you go through hardship and refuse to give up, that is strength.

This walkthrough uses the community Newton (N) release code as an example.

I. Startup Command

```bash
exec ceilometer-polling --polling-namespaces compute --config-file /etc/ceilometer/ceilometer.conf
```

II. Code Entry Point

Ceilometer's packaging is managed with setuptools via pbr; for background, see:

Python packaging with setuptools

setuptools and pbr package management in OpenStack

The entry point is the main function in ceilometer/cmd/polling.py. The mapping from the ceilometer-polling command to that function is declared through the console_scripts entry point in setup.cfg:
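Here is a minimal excerpt of that entry point (surrounding sections of setup.cfg omitted):

```ini
[entry_points]
console_scripts =
    ceilometer-polling = ceilometer.cmd.polling:main
```

And here is main itself: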

```python
def create_polling_service(worker_id):
    return manager.AgentManager(CONF.polling_namespaces,
                                CONF.pollster_list,
                                worker_id)


def main():
    service.prepare_service()
    sm = cotyledon.ServiceManager()
    sm.add(create_polling_service)
    sm.run()

'''
Key points:
1 cotyledon.Service(worker_id)
  Purpose: create a new service.
  Parameter: worker_id (int) -- identifier of the service instance.
2 ServiceManager()
  2.1 Purpose
      Acts like the master process and manages the life cycle of services.
      It controls the life cycle of the child processes and restarts them
      if they die unexpectedly. Each child ServiceWorker runs one instance
      of a service. An application must create one ServiceManager and use
      ServiceManager.run() as its main loop.
      Signature:
      class cotyledon.ServiceManager(wait_interval=0.01, graceful_shutdown_timeout=60)

  2.2 cotyledon.ServiceManager.add
      cotyledon.ServiceManager.add(service, workers=1, args=None, kwargs=None)
      Purpose: create child processes to run the AgentManager service.
      Parameters:
          service (callable) -- callable that returns an instance of Service
          workers (int) -- number of processes/workers for this service
          args (tuple) -- additional positional arguments for this service
          kwargs (dict) -- additional keyword arguments for this service
      Returns:
          a service id

  2.3 cotyledon.ServiceManager.run()
      Start and supervise the service workers.
      This method starts and supervises all child processes until the
      master process is asked to shut down.

Reference:
https://blog.csdn.net/qingyuanluofeng/article/details/95533476
'''
```

This part is implemented on top of cotyledon's multi-process service framework.
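To make the cotyledon flow concrete, here is a minimal, self-contained sketch of the same pattern (the PrinterService name and its behavior are invented for illustration; it closely follows cotyledon's documented usage):

```python
import threading
import time

import cotyledon


class PrinterService(cotyledon.Service):
    """A toy service: each worker prints until asked to terminate."""

    def __init__(self, worker_id):
        super(PrinterService, self).__init__(worker_id)
        self._worker_id = worker_id
        self._shutdown = threading.Event()

    def run(self):
        while not self._shutdown.is_set():
            print('worker %s doing stuff' % self._worker_id)
            time.sleep(1)

    def terminate(self):
        self._shutdown.set()


sm = cotyledon.ServiceManager()
sm.add(PrinterService, workers=2)  # fork two child processes
sm.run()                           # master loop: supervise, restart on death
```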

To instantiate the child process, cotyledon calls create_polling_service, which in turn instantiates the AgentManager class with three arguments:

  • CONF.polling_namespaces # the namespaces passed on the command line: compute or central, each standing for one of the two services
  • CONF.pollster_list # defaults to []
  • worker_id # the identifier of the service instance inside cotyledon

The compute service starts from here: AgentManager's __init__ method initializes the agent, and its run method then performs the actual data collection.

Let's see what is inside AgentManager.

III. Initializing AgentManager

```python
class AgentManager(service_base.PipelineBasedService):

    def __init__(self, namespaces=None, pollster_list=None, worker_id=0):
        namespaces = namespaces or ['compute', 'central']
        pollster_list = pollster_list or []
        group_prefix = cfg.CONF.polling.partitioning_group_prefix
        self._inspector = virt_inspector.get_hypervisor_inspector()
        self.nv = nova_client.Client()

        # features of using coordination and pollster-list are exclusive, and
        # cannot be used at one moment to avoid both samples duplication and
        # samples being lost
        ......
        # we'll have default ['compute', 'central'] here if no namespaces will
        # be passed
        '''
        (Pdb) p self.extensions[0].__dict__
        {
            'obj': <ceilometer.compute.pollsters.cpu.CPUL3CachePollster object at 0x7f392837acd0>,
            'entry_point': EntryPoint.parse('cpu_l3_cache = ceilometer.compute.pollsters.cpu:CPUL3CachePollster'),
            'name': 'cpu_l3_cache',
            'plugin': <class 'ceilometer.compute.pollsters.cpu.CPUL3CachePollster'>
        }
        (Pdb) p self.extensions[1].__dict__
        {
            'obj': <ceilometer.compute.pollsters.disk.WriteRequestsRatePollster object at 0x7f3928385650>,
            'entry_point': EntryPoint.parse('disk.write.requests.rate = ceilometer.compute.pollsters.disk:WriteRequestsRatePollster'),
            'name': 'disk.write.requests.rate',
            'plugin': <class 'ceilometer.compute.pollsters.disk.WriteRequestsRatePollster'>
        }
        (Pdb) p self.extensions[2].__dict__
        {
            'obj': <ceilometer.compute.pollsters.disk.DisksTotalPoller object at 0x7f39283973d0>,
            'entry_point': EntryPoint.parse('disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller'),
            'name': 'disks.total',
            'plugin': <class 'ceilometer.compute.pollsters.disk.DisksTotalPoller'>
        }
        '''
        extensions = (self._extensions('poll', namespace).extensions
                      for namespace in namespaces)
        # get the extensions from pollster builder
        extensions_fb = (self._extensions_from_builder('poll', namespace)
                         for namespace in namespaces)
        if pollster_list:
            extensions = (moves.filter(_match, exts)
                          for exts in extensions)
            extensions_fb = (moves.filter(_match, exts)
                             for exts in extensions_fb)

        self.extensions = list(itertools.chain(*list(extensions))) + list(
            itertools.chain(*list(extensions_fb)))

        if self.extensions == []:
            raise EmptyPollstersList()

        """
        (Pdb) p self.discoveries[0].__dict__
        {
            'obj': <ceilometer.compute.discovery.InstanceDiscovery object at 0x7f392864d390>,
            'entry_point': EntryPoint.parse('local_instances = ceilometer.compute.discovery:InstanceDiscovery'),
            'name': 'local_instances',
            'plugin': <class 'ceilometer.compute.discovery.InstanceDiscovery'>
        }
        """
        discoveries = (self._extensions('discover', namespace).extensions
                       for namespace in namespaces)
        self.discoveries = list(itertools.chain(*list(discoveries)))
        self.polling_periodics = None

        """
        (Pdb) p self.heartbeat_timer.__dict__
        {
            '_dead': <threading._Event object at 0x7f3928654c10>,
            '_waiter': <Condition(<_RLock owner=None count=0>, 0)>,
            '_now_func': <function monotonic at 0x18d51b8>,
            '_schedule': <futurist.periodics._Schedule object at 0x7f3928654d10>,
            '_active': <threading._Event object at 0x7f3928654c90>,
            '_initial_schedule_strategy': <function _now_plus_periodicity at 0x1bccc80>,
            '_watchers': [({
                'successes': 0,
                'failures': 0,
                'runs': 0,
                'elapsed': 0,
                'elapsed_waiting': 0
            }, <Watcher object at 0x7f3928654d90(runs=0, successes=0, failures=0, elapsed=0.00, elapsed_waiting=0.00)>)],
            '_on_failure': <functools.partial object at 0x7f3928649b50>,
            '_executor_factory': <function <lambda> at 0x7f39283809b0>,
            '_schedule_strategy': <function _last_started_strategy at 0x1bccb90>,
            '_log': <logging.Logger object at 0x1bbfa90>,
            '_callables': [(<function <lambda> at 0x7f39286581b8>, 'ceilometer.utils.<lambda>', (), {})],
            '_cond_cls': <function Condition at 0x7f39364b2938>,
            '_tombstone': <threading._Event object at 0x7f3928654b10>,
            '_immediates': deque([0])
        }
        (Pdb) p self.partition_coordinator.__dict__
        {'_groups': set([]), '_my_id': 'c6ee6425-1bac-4e9c-a026-2022b133c825', '_coordinator': None}
        """
        self.partition_coordinator = coordination.PartitionCoordinator()
        self.heartbeat_timer = utils.create_periodic(
            target=self.partition_coordinator.heartbeat,
            spacing=cfg.CONF.coordination.heartbeat,
            run_immediately=True)

        # Compose coordination group prefix.
        # We'll use namespaces as the basement for this partitioning.
        namespace_prefix = '-'.join(sorted(namespaces))
        self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
                             if group_prefix else namespace_prefix)

        """
        (Pdb) p self.notifier.__dict__
        {
            '_serializer': <oslo_messaging.serializer.NoOpSerializer object at 0x7f1cc9aa3710>,
            '_driver_mgr': <stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>,
            'retry': -1,
            '_driver_names': ['messagingv2'],
            '_topics': ['notifications'],
            'publisher_id': 'ceilometer.polling',
            'transport': <oslo_messaging.transport.NotificationTransport object at 0x7f1cc9b26550>
        }

        (Pdb) p self.notifier._driver_mgr
        <stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>

        (Pdb) p self.notifier._driver_mgr.__dict__
        {
            '_names': ['messagingv2'],
            'namespace': 'oslo.messaging.notify.drivers',
            '_on_load_failure_callback': None,
            'extensions': [<stevedore.extension.Extension object at 0x7fddfc80e990>],
            'propagate_map_exceptions': False,
            '_extensions_by_name': None,
            '_name_order': False,
            '_missing_names': set([])
        }

        (Pdb) p self.notifier._driver_mgr.__dict__.get('extensions')[0].__dict__
        {
            'obj': <oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7fddfc80e750>,
            'entry_point': EntryPoint.parse('messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver'),
            'name': 'messagingv2',
            'plugin': <class 'oslo_messaging.notify.messaging.MessagingV2Driver'>
        }
        """
        self.notifier = oslo_messaging.Notifier(
            messaging.get_transport(),
            driver=cfg.CONF.publisher_notifier.telemetry_driver,
            publisher_id="ceilometer.polling")

        self._keystone = None
        self._keystone_last_exception = None
```

I printed several of the objects above with pdb so you can see them directly. Let's walk through this initialization step by step:

  1. The first five lines need little explanation: basic parameter handling. The __init__ arguments were already covered at the code entry point.

  2. The group_prefix line reads the option used for workload partitioning coordination.

  3. get_hypervisor_inspector is called to load the hypervisor (libvirt) inspector driver:

```python
def get_hypervisor_inspector():
    try:
        namespace = 'ceilometer.compute.virt'
        mgr = driver.DriverManager(namespace,
                                   cfg.CONF.hypervisor_inspector,
                                   invoke_on_load=True)
        return mgr.driver
    except ImportError as e:
        LOG.error(_("Unable to load the hypervisor inspector: %s") % e)
        return Inspector()
```

This uses the stevedore library, which is not covered in depth in this article; see: Python plugins with stevedore

  4. Next, a nova client is initialized.

  5. The extensions/extensions_fb block is what loads the pollster plugins, again via stevedore; the namespace arguments are the ones you passed in. Let's look at the implementation of _extensions:

```python
def _extensions(self, category, agent_ns=None):
    namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
                 else 'ceilometer.%s' % category)
    return self._get_ext_mgr(namespace)
```

This loads the plugins registered under the ceilometer.poll.compute namespace (for the central service, it loads the ceilometer.poll.central namespace instead). The plugins are declared in setup.cfg:

```ini
ceilometer.poll.compute =
    disks.util = ceilometer.compute.pollsters.disk:DisksUtilPoller
    disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller
    disks.used = ceilometer.compute.pollsters.disk:DisksUsedPoller
    ...

ceilometer.poll.central =
    account = ceilometer.chakra.chakra:AccountPollster
    ip.floating = ceilometer.network.floatingip:FloatingIPPollster
    image = ceilometer.image.glance:ImagePollster
    ...
```

After loading, the extensions object is a list holding every plugin object in the namespace:

```
(Pdb) p self.extensions[0].__dict__
{
    'obj': <ceilometer.compute.pollsters.cpu.CPUL3CachePollster object at 0x7f392837acd0>,
    'entry_point': EntryPoint.parse('cpu_l3_cache = ceilometer.compute.pollsters.cpu:CPUL3CachePollster'),
    'name': 'cpu_l3_cache',
    'plugin': <class 'ceilometer.compute.pollsters.cpu.CPUL3CachePollster'>
}
(Pdb) p self.extensions[1].__dict__
{
    'obj': <ceilometer.compute.pollsters.disk.WriteRequestsRatePollster object at 0x7f3928385650>,
    'entry_point': EntryPoint.parse('disk.write.requests.rate = ceilometer.compute.pollsters.disk:WriteRequestsRatePollster'),
    'name': 'disk.write.requests.rate',
    'plugin': <class 'ceilometer.compute.pollsters.disk.WriteRequestsRatePollster'>
}
(Pdb) p self.extensions[2].__dict__
{
    'obj': <ceilometer.compute.pollsters.disk.DisksTotalPoller object at 0x7f39283973d0>,
    'entry_point': EntryPoint.parse('disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller'),
    'name': 'disks.total',
    'plugin': <class 'ceilometer.compute.pollsters.disk.DisksTotalPoller'>
}
```

Loading the extensions_fb (pollster builder) plugins works the same way, so it is not repeated. A minimal sketch of loading such a namespace with stevedore directly is shown below.
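The only assumption in this sketch is the real entry-point namespace from setup.cfg above; everything else is plain stevedore usage:

```python
from stevedore import extension

# load every pollster registered under ceilometer.poll.compute;
# invoke_on_load=True instantiates each plugin class immediately
mgr = extension.ExtensionManager(
    namespace='ceilometer.poll.compute',
    invoke_on_load=True)

for ext in mgr.extensions:
    # each Extension carries the entry-point name, the plugin class and
    # the instantiated pollster object, as in the pdb dumps above
    print(ext.name, ext.plugin, ext.obj)
```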

  6. The PartitionCoordinator and heartbeat_timer assignments obtain a workload-partitioning coordinator instance, used to divide the work when multiple polling workers run. The relevant objects are already printed in the code above for the curious. This relies on the tooz library; a brief introduction:

```
1 Tooz basics

Purpose:
1) provides a distributed coordination API to manage groups and group membership
2) provides distributed locks, allowing distributed nodes to acquire and release locks for synchronization
Problem solved: synchronization among multiple distributed processes

2 Tooz architecture

Essence: Tooz is an abstraction over solutions such as ZooKeeper, the Raft consensus algorithm, and Redis,
exposing the backends through drivers.
Driver types:
zookeeper, Zake, memcached, redis,
SysV IPC (distributed locks only), PostgreSQL (distributed locks only), MySQL (distributed locks only)
Driver characteristics:
all drivers support distributed processes; the Tooz API is fully asynchronous, which makes it more efficient.

3 Tooz features

3.1 Group management

Manages group membership.
Operations: create a group, join a group, leave a group, list group members, get notified when members join or leave
Use case:
the ceilometer-notification service uses group management to achieve load balancing and genuine horizontal scaling.

3.2 Leader election

Every group has a leader, and each node can decide whether to take part in the election;
if the leader disappears, a new leader is elected;
other members can be notified when a leader is elected;
every node can query the current group leader at any time.
Thought:
tooz could be used to implement your own leader election algorithm and service high availability.

3.3 Distributed locks

Use case:
ceilometer originally used RPC to detect whether alarm evaluator processes were alive;
later it coordinated multiple alarm evaluator processes through tooz group management.
Use case 2:
gnocchi uses distributed locks when operating on metrics and measures

Reference: https://blog.csdn.net/qingyuanluofeng/article/details/90349185
```
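As a concrete illustration, here is a hedged sketch of the tooz group APIs that PartitionCoordinator builds on; the memcached URL and the group name are assumptions for demonstration only:

```python
import uuid

from tooz import coordination

# connect to a coordination backend (memcached URL assumed for the demo)
member_id = uuid.uuid4().hex.encode()
coordinator = coordination.get_coordinator(
    'memcached://127.0.0.1:11211', member_id)
coordinator.start()

group = b'ceilometer-polling-demo'  # hypothetical group name
try:
    coordinator.create_group(group).get()
except coordination.GroupAlreadyExist:
    pass
coordinator.join_group(group).get()

print(coordinator.get_members(group).get())

coordinator.heartbeat()  # must be called periodically to stay in the group
coordinator.leave_group(group).get()
coordinator.stop()
```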
  7. The oslo_messaging.Notifier initialization is important: it creates the notifier used to send the collected data to the notifications.sample queue. In short:

```
self.notifier._topics = ['notifications']
self.notifier._driver_names = ['messagingv2'] (from the telemetry_driver option defined in ceilometer/publisher/messaging.py); this is the driver type ceilometer uses when sending messages to the message queue
self.notifier.publisher_id = 'ceilometer.polling'
self.notifier._driver_mgr loads the plugin named messagingv2 from the oslo.messaging.notify.drivers namespace
self.notifier.transport loads the plugin named rabbit from oslo.messaging.drivers, because ceilometer.conf defines transport_url=rabbit://guest:guest@192.168.2.120:5672/
```
  8. Initialization is now complete; what remains is the data collection itself. Executing sm.run() eventually calls the run method of the AgentManager instance.

IV. Collecting Data

Entry point: ceilometer/agent/manager.py:AgentManager.run

```python
def run(self):
    super(AgentManager, self).run()

    self.polling_manager = pipeline.setup_polling()
    self.join_partitioning_groups()
    self.start_polling_tasks()
    self.init_pipeline_refresh()
```

The run method looks fairly simple; let's analyze it:

  1. It first calls the setup_polling method:

```python
def setup_polling():
    """Setup polling manager according to yaml config file."""
    cfg_file = cfg.CONF.pipeline_cfg_file
    return PollingManager(cfg_file)
```

This loads the polling entries you configured in pipeline.yaml. Let's see what happens inside:

```python
class PollingManager(ConfigManagerBase):
    """Polling Manager

    Polling manager sets up polling according to config file.
    """

    def __init__(self, cfg_info):
        """Setup the polling according to config.

        The configuration is the sources half of the Pipeline Config.
        """
        super(PollingManager, self).__init__()
        # load the contents of the config file
        cfg = self.load_config(cfg_info)
        self.sources = []
        if not ('sources' in cfg and 'sinks' in cfg):
            raise PipelineException("Both sources & sinks are required",
                                    cfg)
        LOG.info(_LI('detected decoupled pipeline config format'))

        unique_names = set()
        # initialize and record the 'sources' entries from the config file
        for s in cfg.get('sources'):
            name = s.get('name')
            if name in unique_names:
                raise PipelineException("Duplicated source names: %s" %
                                        name, self)
            else:
                unique_names.add(name)
            # put every source from the config file into self.sources
            self.sources.append(SampleSource(s))
        unique_names.clear()
```

Once initialized, the result is assigned to self.polling_manager; let's look at what this object holds:

```
(Pdb) p self.polling_manager.__dict__

{'cfg_mtime': 1592535895.477891, 'cfg_hash': 'ba1311798d634022e4b684b72bf7b42a', 'cfg_loc': '/etc/ceilometer/pipeline.yaml', 'sources': [<ceilometer.pipeline.SampleSource object at 0x7f0eec042110>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0423d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042050>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0422d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042290>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042390>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042350>]}

(Pdb) p self.polling_manager.sources

[<ceilometer.pipeline.SampleSource object at 0x7f0eec042110>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0423d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042050>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0422d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042290>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042390>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042350>]

(Pdb) p self.polling_manager.sources[0].__dict__

{
    'name': 'notification_source',
    'cfg': {
        'interval': 3600,
        'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
        'name': 'notification_source',
        'sinks': ['notification_sink']
    },
    'interval': 3600,
    'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
    'discovery': [],
    'resources': [],
    'sinks': ['notification_sink']
}

(Pdb) p self.polling_manager.sources[1].__dict__

{
    'name': 'meter_source',
    'cfg': {
        'interval': 300,
        'meters': ['poll.*', 'memory.usage', 'memory.util'],
        'name': 'meter_source',
        'sinks': ['meter_sink']
    },
    'interval': 300,
    'meters': ['poll.*', 'memory.usage', 'memory.util'],
    'discovery': [],
    'resources': [],
    'sinks': ['meter_sink']
}
```

As you can see, the point of this line is to load the polling entries defined in the configuration file. For reference, a pipeline.yaml fragment that would produce the meter_source object above is sketched below.
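This is a hedged reconstruction from the dumped SampleSource, not a verbatim copy of the deployment's actual file:

```yaml
sources:
    - name: meter_source
      interval: 300
      meters:
          - "poll.*"
          - "memory.usage"
          - "memory.util"
      sinks:
          - meter_sink
sinks:
    - name: meter_sink
      transformers:
      publishers:
          - notifier://
```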

  2. Next, run calls the start_polling_tasks method:

```python
def start_polling_tasks(self):
    # allow time for coordination if necessary
    delay_start = self.partition_coordinator.is_active()

    # set shuffle time before polling task if necessary
    delay_polling_time = random.randint(
        0, cfg.CONF.shuffle_time_before_polling_task)

    data = self.setup_polling_tasks()

    # One thread per polling tasks is enough
    # build a thread pool keyed by interval: one thread per distinct interval
    self.polling_periodics = periodics.PeriodicWorker.create(
        [], executor_factory=lambda:
        futures.ThreadPoolExecutor(max_workers=len(data)))

    for interval, polling_task in data.items():
        delay_time = (interval + delay_polling_time if delay_start
                      else delay_polling_time)

        @periodics.periodic(spacing=interval, run_immediately=False)
        def task(running_task):
            self.interval_task(running_task)

        utils.spawn_thread(utils.delayed, delay_time,
                           self.polling_periodics.add, task, polling_task)

    if data:
        # Don't start useless threads if no task will run
        utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
```
    1. The first few lines are simple setup: check whether coordination is active and compute a random start delay. The important part begins with the call to setup_polling_tasks; let's analyze it.

    2. data = self.setup_polling_tasks() builds the polling tasks:

```python
def setup_polling_tasks(self):
    """
    (Pdb) p self.polling_manager.sources[1].__dict__
    {
        'name': 'meter_source',
        'cfg': {
            'interval': 300,
            'meters': ['poll.*', 'memory.usage', 'memory.util'],
            'name': 'meter_source',
            'sinks': ['meter_sink']
        },
        'interval': 300,
        'meters': ['poll.*', 'memory.usage', 'memory.util'],
        'discovery': [],
        'resources': [],
        'sinks': ['meter_sink']
    }

    (Pdb) p self.extensions[0].__dict__
    {
        'obj': <ceilometer.compute.pollsters.memory.MemoryUtilizationPollster object at 0x7ff12062e590>,
        'entry_point': EntryPoint.parse('memory.util = ceilometer.compute.pollsters.memory:MemoryUtilizationPollster'),
        'name': 'memory.util',
        'plugin': <class 'ceilometer.compute.pollsters.memory.MemoryUtilizationPollster'>
    }
    """
    polling_tasks = {}
    for source in self.polling_manager.sources:
        polling_task = None
        for pollster in self.extensions:
            # match each source's meters against the name of every
            # extension; on a match, add to polling_tasks keyed by the
            # polling interval, with a PollingTask object as the value
            if source.support_meter(pollster.name):
                polling_task = polling_tasks.get(source.get_interval())
                if not polling_task:
                    polling_task = self.create_polling_task()
                    polling_tasks[source.get_interval()] = polling_task
                polling_task.add(pollster, source)
    return polling_tasks
```

This iterates over every source in self.polling_manager.sources and, for each, over self.extensions. If a plugin's name matches an entry in the source's meters list, the plugin needs to run periodically: the source's polling interval is looked up and the plugin is added to polling_tasks under that interval. Note that meters entries may be glob patterns; a small sketch of that matching idea follows.
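A hedged sketch of the wildcard matching behind source.support_meter (the helper name support_meter below is local to this sketch; ceilometer's actual implementation lives in its pipeline code and also handles '!' negation patterns):

```python
import fnmatch

# meters entries from pipeline.yaml may be glob patterns such as 'poll.*'
meters = ['poll.*', 'memory.usage', 'memory.util']


def support_meter(meter_name):
    return any(fnmatch.fnmatch(meter_name, pattern) for pattern in meters)


print(support_meter('memory.usage'))  # True
print(support_meter('poll.cpu'))      # True, matches 'poll.*'
print(support_meter('disk.read'))     # False
```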

Let's look at the value objects stored in polling_tasks. A PollingTask just initializes a few attributes; its add method records the pollster and sets up its resources:

```python
class PollingTask(object):
    """Polling task for polling samples and notifying.

    A polling task can be invoked periodically or only once.
    """

    def __init__(self, agent_manager):
        self.manager = agent_manager

        # elements of the Cartesian product of sources X pollsters
        # with a common interval
        self.pollster_matches = collections.defaultdict(set)

        # we relate the static resources and per-source discovery to
        # each combination of pollster and matching source
        resource_factory = lambda: Resources(agent_manager)
        self.resources = collections.defaultdict(resource_factory)

        self._batch = cfg.CONF.batch_polled_samples
        self._telemetry_secret = cfg.CONF.publisher.telemetry_secret

    def add(self, pollster, source):
        self.pollster_matches[source.name].add(pollster)
        key = Resources.key(source.name, pollster)
        self.resources[key].setup(source)
```

So the data finally returned looks like this:

```
(Pdb) p data
{
    3600: <ceilometer.agent.manager.PollingTask object at 0x7fe6a4043bd0>,
    300: <ceilometer.agent.manager.PollingTask object at 0x7fe6a4043d10>
}
(Pdb) p data[3600].__dict__
{
    'manager': <ceilometer.agent.manager.AgentManager object at 0x3230b50>,
    '_telemetry_secret': 'change this for valid signing',
    'pollster_matches': defaultdict(<type 'set'>, {
        'notification_source': set([<stevedore.extension.Extension object at 0x7fe6b062ec50>])
    }),
    'resources': defaultdict(<function <lambda> at 0x7fe6a40319b0>, {
        'notification_source-instance': <ceilometer.agent.manager.Resources object at 0x7fe6a4043b90>
    }),
    '_batch': True
}
```
    3. The rest of start_polling_tasks builds the thread pool keyed by polling interval: one thread per distinct interval. It relies on the futurist library (not covered in depth here) to implement a periodic task that fires every sampling period defined in the config file; each tick calls self.interval_task, which in turn calls task.poll_and_notify().
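A hedged sketch of the futurist pattern used above (the poll_every_5s function is invented for illustration):

```python
import threading
import time
from concurrent import futures

from futurist import periodics


# one decorated callable, run every 5 seconds inside a thread pool
@periodics.periodic(spacing=5, run_immediately=False)
def poll_every_5s():
    print('polling...', time.time())


worker = periodics.PeriodicWorker.create(
    [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=1))
worker.add(poll_every_5s)

# start() blocks, which is why ceilometer spawns it in its own thread
threading.Thread(target=worker.start).start()
```

With that pattern in mind, here is the implementation of task.poll_and_notify():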

```python
def poll_and_notify(self):
    """Polling sample and notify."""
    cache = {}
    discovery_cache = {}
    poll_history = {}
    """
    (Pdb) self.pollster_matches
    defaultdict(<type 'set'>, {
        'meter_source': set([<stevedore.extension.Extension object at 0x7f9a645de8d0>, <stevedore.extension.Extension object at 0x7f9a645d40d0>]),
        'cpu_source': set([<stevedore.extension.Extension object at 0x7f9a645d4cd0>]),
        'cpu_util_source': set([<stevedore.extension.Extension object at 0x7f9a645dec10>]),
        'disk_source': set([<stevedore.extension.Extension object at 0x7f9a64399c90>, <stevedore.extension.Extension object at 0x7f9a645d4ad0>, <stevedore.extension.Extension object at 0x7f9a645d49d0>, <stevedore.extension.Extension object at 0x7f9a645d4950>, <stevedore.extension.Extension object at 0x7f9a645dea90>, <stevedore.extension.Extension object at 0x7f9a64389fd0>, <stevedore.extension.Extension object at 0x7f9a645d43d0>, <stevedore.extension.Extension object at 0x7f9a645d4d50>]),
        'network_source': set([<stevedore.extension.Extension object at 0x7f9a645def10>, <stevedore.extension.Extension object at 0x7f9a645d4650>, <stevedore.extension.Extension object at 0x7f9a645d4450>, <stevedore.extension.Extension object at 0x7f9a645d4550>]),
        'volume_source': set([<stevedore.extension.Extension object at 0x7f9a645d4b50>, <stevedore.extension.Extension object at 0x7f9a645d4c50>])
    })
    """
    # self.pollster_matches maps each source name defined in the config
    # file to the set of pollster extension objects configured for it;
    # the keys of self.resources are source.name + the pollster name
    for source_name in self.pollster_matches:
        for pollster in self.pollster_matches[source_name]:

            """
            first
            (Pdb) p key
            'meter_source-memory.util'
            (Pdb) p candidate_res
            []
            (Pdb) p pollster.obj.default_discovery
            'local_instances'
            """
            key = Resources.key(source_name, pollster)
            candidate_res = list(
                self.resources[key].get(discovery_cache))
            if not candidate_res and pollster.obj.default_discovery:
                candidate_res = self.manager.discover(
                    [pollster.obj.default_discovery], discovery_cache)
            """
            (Pdb) p candidate_res
            [<Server: jy-2>]
            """

            # Remove duplicated resources and black resources. Using
            # set() requires well defined __hash__ for each resource.
            # Since __eq__ is defined, 'not in' is safe here.
            polling_resources = []
            black_res = self.resources[key].blacklist
            history = poll_history.get(pollster.name, [])
            for x in candidate_res:
                if x not in history:
                    history.append(x)
                    if x not in black_res:
                        polling_resources.append(x)
            """
            first
            (Pdb) p poll_history
            {'memory.util': [<Server: jy-2>]}
            second
            (Pdb) p poll_history
            {'memory.util': [<Server: jy-2>], 'memory.usage': [<Server: jy-2>]}
            """
            poll_history[pollster.name] = history

            # If no resources, skip for this pollster
            if not polling_resources:
                p_context = 'new ' if history else ''
                LOG.info(_("Skip pollster %(name)s, no %(p_context)s"
                           "resources found this cycle"),
                         {'name': pollster.name, 'p_context': p_context})
                continue

            LOG.info(_("Polling pollster %(poll)s in the context of "
                       "%(src)s"),
                     dict(poll=pollster.name, src=source_name))
            try:
                polling_timestamp = timeutils.utcnow().isoformat()
                samples = pollster.obj.get_samples(
                    manager=self.manager,
                    cache=cache,
                    resources=polling_resources
                )
                sample_batch = []

                # filter None in samples
                """
                first
                (Pdb) samples
                [<name: memory.util, volume: 18.58, resource_id: 32eb16e0-c8af-4b41-b18c-b63152f0f8fc, timestamp: None>]
                second
                (Pdb) samples
                [<name: memory.usage, volume: 91, resource_id: 32eb16e0-c8af-4b41-b18c-b63152f0f8fc, timestamp: None>]
                """
                samples = [s for s in samples if s is not None]

                for sample in samples:
                    # Note(yuywz): Unify the timestamp of polled samples
                    sample.set_timestamp(polling_timestamp)
                    """
                    (Pdb) sample_dict
                    {
                        'counter_name': 'memory.util',
                        'resource_id': u'32eb16e0-c8af-4b41-b18c-b63152f0f8fc',
                        'timestamp': '2020-06-19T06:21:32.542893',
                        'counter_volume': 18.58,
                        'user_id': u'7431e07e49de2703f1b9e703daf5aff158e32028506f77e05a62e9eb3892dcde',
                        'message_signature': '76e6cbd944963855306b829cdf2c49bba9f0b20220c8c738b61b5048be51b24f',
                        'resource_metadata': {
                            'status': u'active',
                            'ephemeral_gb': 0,
                            'disk_gb': 0,
                            'instance_host': u'node-3.domain.tld',
                            'kernel_id': None,
                            'image': None,
                            'ramdisk_id': None,
                            'host': u'619a474e64dbedcd55508bda51aea8a611fd15f3f3e8fa39ce0d0552',
                            'flavor': {
                                'name': u'1C-0.5G',
                                u'links': [{
                                    u'href': u'http://nova-api.openstack.svc.cluster.local:8774/ca80e5ccd445438580c4b128296d1936/flavors/211',
                                    u'rel': u'bookmark'
                                }],
                                'ram': 512,
                                'ephemeral': 0,
                                'vcpus': 1,
                                'disk': 0,
                                u'id': u'211'
                            },
                            'task_state': None,
                            'image_ref_url': None,
                            'memory_mb': 512,
                            'root_gb': 0,
                            'display_name': u'jy-2',
                            'name': u'instance-00000002',
                            'vcpus': 1,
                            'instance_id': u'32eb16e0-c8af-4b41-b18c-b63152f0f8fc',
                            'instance_type': u'1C-0.5G',
                            'state': u'active',
                            'image_ref': None,
                            'OS-EXT-AZ:availability_zone': u'default-az'
                        },
                        'source': 'openstack',
                        'counter_unit': '%',
                        'project_id': u'34ecb9aca8454157bd5c3c64708990bf',
                        'message_id': '6fbcdc0e-b1f5-11ea-a901-be1696955049',
                        'monotonic_time': None,
                        'counter_type': 'gauge'
                    }
                    """
                    sample_dict = (
                        publisher_utils.meter_message_from_counter(
                            sample, self._telemetry_secret
                        ))
                    if self._batch:
                        sample_batch.append(sample_dict)
                    else:
                        self._send_notification([sample_dict])

                if sample_batch:
                    self._send_notification(sample_batch)

            except plugin_base.PollsterPermanentError as err:
                LOG.error(_(
                    'Prevent pollster %(name)s for '
                    'polling source %(source)s anymore!')
                    % ({'name': pollster.name, 'source': source_name}))
                self.resources[key].blacklist.extend(err.fail_res_list)
            except Exception as err:
                LOG.warning(_(
                    'Continue after error from %(name)s: %(error)s')
                    % ({'name': pollster.name, 'error': err}),
                    exc_info=True)


def _send_notification(self, samples):
    self.manager.notifier.sample(
        {},
        'telemetry.polling',
        {'samples': samples}
    )
```
      1. The first few lines just initialize the caches.

      2. The two nested for loops walk the resources and plugins recorded in self.pollster_matches; everything from the blacklist filtering onward converts and ships the collected data. The important intermediate values are printed inline above and are fairly clear, so I'll pick only a few to discuss. (I still cannot fully explain the candidate_res object; if anyone understands it well, please leave a comment, many thanks.)

      3. The call to pollster.obj.get_samples is where the data is actually collected: it dispatches into the plugin. For example, when pollster.name is memory.usage, the get_samples method of the MemoryUsagePollster class is invoked; you can see that after collecting the data, the method yields each sample:

```python
class MemoryUsagePollster(pollsters.BaseComputePollster):

    def get_samples(self, manager, cache, resources):
        self._inspection_duration = self._record_poll_time()
        for instance in resources:
            LOG.debug('Checking memory usage for instance %s', instance.id)
            try:
                memory_info = self.inspector.inspect_memory_usage(
                    instance, self._inspection_duration)
                LOG.debug("MEMORY USAGE: %(instance)s %(usage)f",
                          {'instance': instance,
                           'usage': memory_info.usage})
                yield util.make_sample_from_instance(
                    instance,
                    name='memory.usage',
                    type=sample.TYPE_GAUGE,
                    unit='MB',
                    volume=memory_info.usage,
                )
            except Exception as err:
                # NOTE: the real pollster first catches several specific
                # virt inspector exceptions; this excerpt abbreviates them
                # to a generic handler
                LOG.exception(_('Obtaining memory usage failed: %s'), err)
```
      4. Finally, the messages are sent by calling self._send_notification(sample_batch):

```python
def _send_notification(self, samples):
    self.manager.notifier.sample(
        {},
        'telemetry.polling',
        {'samples': samples}
    )
```

This uses the oslo_messaging library. Digging deeper, sample here is the sample method of the Notifier class in oslo_messaging/notify/notifier.py:

```python
def sample(self, ctxt, event_type, payload):
    self._notify(ctxt, event_type, payload, 'SAMPLE')
```

which goes on to call the _notify method:

```python
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
            retry=None):
    payload = self._serializer.serialize_entity(ctxt, payload)
    ctxt = self._serializer.serialize_context(ctxt)

    msg = dict(message_id=six.text_type(uuid.uuid4()),
               publisher_id=publisher_id or self.publisher_id,
               event_type=event_type,
               priority=priority,
               payload=payload,
               timestamp=six.text_type(timeutils.utcnow()))

    def do_notify(ext):
        try:
            ext.obj.notify(ctxt, msg, priority, retry or self.retry)
        except Exception as e:
            _LOG.exception("Problem '%(e)s' attempting to send to "
                           "notification system. Payload=%(payload)s",
                           {'e': e, 'payload': payload})

    if self._driver_mgr.extensions:
        self._driver_mgr.map(do_notify)
```

The key call is ext.obj.notify(...) inside do_notify; let's look at that object:

```
(Pdb) p ext.obj
<oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f72f876d510>
(Pdb) p ext.obj.__dict__
{'topics': ['notifications'], 'version': 2.0, 'transport': <oslo_messaging.transport.Transport object at 0x7f72f85dd1d0>, 'conf': <oslo_config.cfg.ConfigOpts object at 0x13dc990>}
```

It then calls the notify method (of the MessagingDriver class in oslo_messaging/notify/messaging.py):

```python
def notify(self, ctxt, message, priority, retry):
    priority = priority.lower()
    for topic in self.topics:
        target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))
        try:
            self.transport._send_notification(target, ctxt, message,
                                              version=self.version,
                                              retry=retry)
        except Exception:
            LOG.exception("Could not send notification to %(topic)s. "
                          "Payload=%(message)s",
                          {'topic': topic, 'message': message})
```

You can see that the target here is:

```
(Pdb) p target
<Target topic=notifications.sample>
```

So the final call to self.transport._send_notification sends the message to the notifications.sample queue, and with that the whole collection cycle of the ceilometer-compute service is complete. To see the other side of the queue, a hedged consumer sketch follows.
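This is not part of ceilometer itself; it is a sketch, assuming the same rabbit transport_url as above, of how a SAMPLE-priority listener would consume what ceilometer-polling publishes:

```python
import oslo_messaging
from oslo_config import cfg

transport = oslo_messaging.get_notification_transport(
    cfg.CONF, url='rabbit://guest:guest@192.168.2.120:5672/')
targets = [oslo_messaging.Target(topic='notifications')]


class SampleEndpoint(object):
    # the method name 'sample' matches the SAMPLE priority, i.e. the
    # notifications.sample queue written to above
    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        print(publisher_id, event_type, len(payload.get('samples', [])))


listener = oslo_messaging.get_notification_listener(
    transport, targets, [SampleEndpoint()], executor='threading')
listener.start()
listener.wait()
```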

The ceilometer-central service works much like ceilometer-compute: it loads plugins from a different namespace, but the logic is essentially the same, so it is not repeated here.

Reference:
https://www.cnblogs.com/luohaixian/p/11145939.html
