Strength does not come from winning. Only struggle builds strength. When you go through hardship and decide not to give in, that is strength.
This walkthrough uses the community N release code as its example.
1. Startup Command
```shell
exec ceilometer-polling --polling-namespaces compute --config-file /etc/ceilometer/ceilometer.conf
```
2. Code Entry Point
The ceilometer code is packaged with setuptools via pbr; for background on that, see: Openstack中setuptools和pbr软件打包管理 (setuptools and pbr package management in OpenStack).
The entry point is the main function in /ceilometer/cmd/polling.py:
```python
def create_polling_service(worker_id):
    return manager.AgentManager(CONF.polling_namespaces,
                                CONF.pollster_list,
                                worker_id)
```
This part is built on cotyledon's multi-process framework.
When a child process is spawned it calls create_polling_service, which in turn instantiates the AgentManager class with three arguments:
- CONF.polling_namespaces # the value you passed on the command line, compute or central, one per service
- CONF.pollster_list # defaults to []
- worker_id # the identifier of the service instance inside cotyledon
The compute service starts from here: AgentManager's __init__ method initializes it, and its run method then performs the data collection. A minimal cotyledon sketch follows below.
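For readers new to cotyledon, here is a minimal, self-contained sketch of the same pattern; PollingService is a hypothetical stand-in, not ceilometer's actual class:

```python
import cotyledon


class PollingService(cotyledon.Service):
    """Hypothetical stand-in for the polling service built around AgentManager."""

    def __init__(self, worker_id):
        super(PollingService, self).__init__(worker_id)
        self.worker_id = worker_id

    def run(self):
        # the long-running polling loop would live here
        print('worker %d polling...' % self.worker_id)


# the parent process forks one child per worker and supervises it
sm = cotyledon.ServiceManager()
sm.add(PollingService, workers=1)
sm.run()
```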
Now let's look at what is inside AgentManager.
3. Initializing AgentManager
```python
class AgentManager(service_base.PipelineBasedService):
```
I printed a number of the parameters in the code above with pdb, so they are easier to follow. Let's walk through the initialization line by line:
The first 5 lines need little explanation: basic parameter passing, and the __init__ arguments were already covered in the entry-point section.
Line 6 sets up workload partition coordination.
Line 7 calls the get_hypervisor_inspector method to load the libvirt driver:
```python
def get_hypervisor_inspector():
    try:
        namespace = 'ceilometer.compute.virt'
        mgr = driver.DriverManager(namespace,
                                   cfg.CONF.hypervisor_inspector,
                                   invoke_on_load=True)
        return mgr.driver
    except ImportError as e:
        LOG.error(_("Unable to load the hypervisor inspector: %s") % e)
        return Inspector()
```

This uses the stevedore library, which this article does not cover in depth; see: Python插件之stevedore (Python plugins with stevedore).
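For reference, the inspector that DriverManager resolves is itself registered as an entry point; a sketch of the corresponding setup.cfg section (the exact module path is an assumption based on the namespace above):

```ini
[entry_points]
ceilometer.compute.virt =
    libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector
```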
Line 8 initializes a nova client.
Lines 40-69 mainly load the pollster plugins, again via stevedore; the namespaces built on lines 40 and 43 come from the polling namespaces you passed in. Let's look at the implementation of _extensions:
```python
def _extensions(self, category, agent_ns=None):
    namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
                 else 'ceilometer.%s' % category)
    return self._get_ext_mgr(namespace)
```

This loads the plugins registered under the ceilometer.poll.compute namespace (or, if the central service was started, under ceilometer.poll.central). The plugins are declared in setup.cfg:
```ini
ceilometer.poll.compute =
    disks.util = ceilometer.compute.pollsters.disk:DisksUtilPoller
    disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller
    disks.used = ceilometer.compute.pollsters.disk:DisksUsedPoller
    ...
ceilometer.poll.central =
    account = ceilometer.chakra.chakra:AccountPollster
    ip.floating = ceilometer.network.floatingip:FloatingIPPollster
    image = ceilometer.image.glance:ImagePollster
    ...
```

After loading, the extensions object looks like this: a list holding every plugin object in the namespace:
```python
(Pdb) p self.extensions[0].__dict__
{
    'obj': <ceilometer.compute.pollsters.cpu.CPUL3CachePollster object at 0x7f392837acd0>,
    'entry_point': EntryPoint.parse('cpu_l3_cache = ceilometer.compute.pollsters.cpu:CPUL3CachePollster'),
    'name': 'cpu_l3_cache',
    'plugin': <class 'ceilometer.compute.pollsters.cpu.CPUL3CachePollster'>
}
(Pdb) p self.extensions[1].__dict__
{
    'obj': <ceilometer.compute.pollsters.disk.WriteRequestsRatePollster object at 0x7f3928385650>,
    'entry_point': EntryPoint.parse('disk.write.requests.rate = ceilometer.compute.pollsters.disk:WriteRequestsRatePollster'),
    'name': 'disk.write.requests.rate',
    'plugin': <class 'ceilometer.compute.pollsters.disk.WriteRequestsRatePollster'>
}
(Pdb) p self.extensions[2].__dict__
{
    'obj': <ceilometer.compute.pollsters.disk.DisksTotalPoller object at 0x7f39283973d0>,
    'entry_point': EntryPoint.parse('disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller'),
    'name': 'disks.total',
    'plugin': <class 'ceilometer.compute.pollsters.disk.DisksTotalPoller'>
}
```

The plugin loading on lines 43-69 follows the same principle, so I won't repeat it.
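A minimal sketch of the loading step itself, assuming only the ceilometer.poll.compute namespace from setup.cfg above (this mirrors what _extensions/_get_ext_mgr do, not their exact code):

```python
from stevedore import extension

# load every pollster registered under the compute namespace and
# instantiate each one immediately
mgr = extension.ExtensionManager(
    namespace='ceilometer.poll.compute',
    invoke_on_load=True,
)
for ext in mgr:
    # each ext carries .name, .entry_point, .plugin and the instance .obj
    print(ext.name, ext.obj)
```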
Lines 99-103 obtain a workload partition coordinator instance, used to divide the work when several polling workers run at once. Each relevant object is printed in the code for anyone interested. This relies on the tooz library; a short introduction:
```
1 Tooz basics
    Purpose:
        1) provides a distributed coordination API to manage groups and group membership
        2) provides distributed locks, letting distributed nodes acquire and release locks for synchronization
    Problem solved: synchronization among multiple distributed processes

2 Tooz architecture
    Essence: Tooz is an abstraction over zookeeper, the Raft consensus algorithm, Redis and similar
        backends, exposing each through a driver
    Driver categories:
        zookeeper, Zake, memcached, redis,
        SysV IPC (distributed locks only), PostgreSQL (distributed locks only), MySQL (distributed locks only)
    Driver characteristics:
        all drivers support distributed processes; the Tooz API is fully asynchronous, which is more efficient

3 Tooz features
    3.1 Group management
        Manages group members.
        Operations: create a group, join a group, leave a group, list group members,
        and be notified when members join or leave the group
        Use case:
            the ceilometer-notification service uses group management for load balancing
            and true horizontal scaling.
    3.2 Leader election
        Every group has a leader; each node can decide whether to take part in the election;
        if the leader disappears, a new leader node is elected;
        other members can be notified when a leader is elected;
        any node can fetch the group's current leader at any time.
        Takeaway:
            tooz could be used to build your own leader election and service high availability.
    3.3 Distributed locks
        Use case:
            ceilometer originally used RPC to check whether alarm evaluator processes were alive;
            later it coordinated multiple alarm evaluator processes through Tooz group management.
        Use case 2:
            gnocchi uses distributed locks when operating on metrics and measures
```
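A minimal runnable sketch of the two tooz features above (the backend URL, group and member IDs are made-up examples):

```python
from tooz import coordination

# get_coordinator(backend_url, member_id); memcached is one of the drivers listed above
coordinator = coordination.get_coordinator(
    'memcached://127.0.0.1:11211', b'agent-1')
coordinator.start()

# group management: create the group if needed, then join it
request = coordinator.create_group(b'central-agents')
try:
    request.get()  # the API is asynchronous; .get() waits for the result
except coordination.GroupAlreadyExist:
    pass
coordinator.join_group(b'central-agents').get()

# distributed lock: only one member at a time runs the critical section
lock = coordinator.get_lock(b'pollster-lock')
with lock:
    pass  # critical section

coordinator.stop()
```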
Reference: https://blog.csdn.net/qingyuanluofeng/article/details/90349185
Lines 154-157 are fairly important: they initialize an oslo_messaging.Notifier instance, used to send the collected data to the notifications.sample queue. In outline:
```python
"""
self.notifier.topics = ['notifications']
self.notifier._driver_names = 'messagingv2'
    # defined by the telemetry_driver variable in ceilometer/publisher/messaging.py;
    # the driver type ceilometer uses when sending messages to the message queue
self.notifier.publisher_id = 'ceilometer.polling'
self.notifier._driver_mgr
    # loads the plugin named messagingv2 from oslo.messaging.notify.drivers
self.notifier.transport
    # loads the plugin named rabbit from oslo.messaging.drivers, because
    # ceilometer.conf defines transport_url=rabbit://guest:guest@192.168.2.120:5672/
"""
```
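A hedged sketch of how an equivalent Notifier could be constructed directly with oslo.messaging; this is illustrative, not ceilometer's exact setup code:

```python
import oslo_messaging
from oslo_config import cfg

# the transport_url quoted from ceilometer.conf above
transport = oslo_messaging.get_notification_transport(
    cfg.CONF, url='rabbit://guest:guest@192.168.2.120:5672/')

# topics, driver and publisher_id match the attributes printed above
notifier = oslo_messaging.Notifier(transport,
                                   publisher_id='ceilometer.polling',
                                   driver='messagingv2',
                                   topics=['notifications'])

# sending one SAMPLE-priority notification, as _send_notification does later
notifier.sample({}, 'telemetry.polling', {'samples': []})
```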
"""此时就初始化完毕了,下面就是采集数据了,执行的是sm.run()代码,则是调用到了AgentManager类实例的run方法
4. Collecting the Data
Entry point: ceilometer/agent/manager.py:AgentManager.run
```python
def run(self):
```
The run method looks fairly simple; let's analyze it.
Line 4 calls the setup_polling method:
```python
def setup_polling():
    """Setup polling manager according to yaml config file."""
    cfg_file = cfg.CONF.pipeline_cfg_file
    return PollingManager(cfg_file)
```

This loads the meters you configured in the pipeline.yaml config file. Let's see what happens inside:
```python
class PollingManager(ConfigManagerBase):
    """Polling Manager

    Polling manager sets up polling according to config file.
    """

    def __init__(self, cfg_info):
        """Setup the polling according to config.

        The configuration is the sources half of the Pipeline Config.
        """
        super(PollingManager, self).__init__()
        # load the contents of the config file
        cfg = self.load_config(cfg_info)
        self.sources = []
        if not ('sources' in cfg and 'sinks' in cfg):
            raise PipelineException("Both sources & sinks are required",
                                    cfg)
        LOG.info(_LI('detected decoupled pipeline config format'))
        unique_names = set()
        # initialize and record the sources defined in the config file
        for s in cfg.get('sources'):
            name = s.get('name')
            if name in unique_names:
                raise PipelineException("Duplicated source names: %s" %
                                        name, self)
            else:
                unique_names.add(name)
            # put each source's config into self.sources
            self.sources.append(SampleSource(s))
        unique_names.clear()
```

Once initialized, this returns the self.polling_manager object. Let's look at what that object holds:
```python
(Pdb) p self.polling_manager.__dict__
{'cfg_mtime': 1592535895.477891, 'cfg_hash': 'ba1311798d634022e4b684b72bf7b42a', 'cfg_loc': '/etc/ceilometer/pipeline.yaml', 'sources': [<ceilometer.pipeline.SampleSource object at 0x7f0eec042110>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0423d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042050>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0422d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042290>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042390>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042350>]}
(Pdb) p self.polling_manager.sources
[<ceilometer.pipeline.SampleSource object at 0x7f0eec042110>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0423d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042050>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0422d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042290>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042390>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042350>]
(Pdb) p self.polling_manager.sources[0].__dict__
{
    'name': 'notification_source',
    'cfg': {
        'interval': 3600,
        'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
        'name': 'notification_source',
        'sinks': ['notification_sink']
    },
    'interval': 3600,
    'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
    'discovery': [],
    'resources': [],
    'sinks': ['notification_sink']
}
(Pdb) p self.polling_manager.sources[1].__dict__
{
    'name': 'meter_source',
    'cfg': {
        'interval': 300,
        'meters': ['poll.*', 'memory.usage', 'memory.util'],
        'name': 'meter_source',
        'sinks': ['meter_sink']
    },
    'interval': 300,
    'meters': ['poll.*', 'memory.usage', 'memory.util'],
    'discovery': [],
    'resources': [],
    'sinks': ['meter_sink']
}
```

As you can see, the point of this line of code is to load the meters defined in the config file.
Line 6 calls the start_polling_tasks method:
```python
def start_polling_tasks(self):
    # allow time for coordination if necessary
    delay_start = self.partition_coordinator.is_active()

    # set shuffle time before polling task if necessary
    delay_polling_time = random.randint(
        0, cfg.CONF.shuffle_time_before_polling_task)

    data = self.setup_polling_tasks()

    # One thread per polling tasks is enough
    # one thread pool is created per distinct interval; tasks that share
    # an interval share a thread
    self.polling_periodics = periodics.PeriodicWorker.create(
        [], executor_factory=lambda:
        futures.ThreadPoolExecutor(max_workers=len(data)))

    for interval, polling_task in data.items():
        delay_time = (interval + delay_polling_time if delay_start
                      else delay_polling_time)

        def task(running_task):
            self.interval_task(running_task)

        utils.spawn_thread(utils.delayed, delay_time,
                           self.polling_periodics.add, task, polling_task)

    if data:
        # Don't start useless threads if no task will run
        utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
```

The opening lines are simple setup: check whether partition coordination is active and compute a random start delay. The important code comes after that; let's analyze it.
First, the setup_polling_tasks method is called:
```python
def setup_polling_tasks(self):
    """
    (Pdb) p self.polling_manager.sources[1].__dict__
    {
        'name': 'meter_source',
        'cfg': {
            'interval': 300,
            'meters': ['poll.*', 'memory.usage', 'memory.util'],
            'name': 'meter_source',
            'sinks': ['meter_sink']
        },
        'interval': 300,
        'meters': ['poll.*', 'memory.usage', 'memory.util'],
        'discovery': [],
        'resources': [],
        'sinks': ['meter_sink']
    }
    (Pdb) p self.extensions[0].__dict__
    {
        'obj': <ceilometer.compute.pollsters.memory.MemoryUtilizationPollster object at 0x7ff12062e590>,
        'entry_point': EntryPoint.parse('memory.util = ceilometer.compute.pollsters.memory:MemoryUtilizationPollster'),
        'name': 'memory.util',
        'plugin': <class 'ceilometer.compute.pollsters.memory.MemoryUtilizationPollster'>
    }
    """
    polling_tasks = {}
    for source in self.polling_manager.sources:
        polling_task = None
        for pollster in self.extensions:
            # match each source's meters against each extension's name; on a
            # match, use the polling interval as the key and a PollingTask
            # object as the value in polling_tasks
            if source.support_meter(pollster.name):
                polling_task = polling_tasks.get(source.get_interval())
                if not polling_task:
                    polling_task = self.create_polling_task()
                    polling_tasks[source.get_interval()] = polling_task
                polling_task.add(pollster, source)
    return polling_tasks
```

This iterates over each source in self.polling_manager.sources while also iterating over self.extensions. If a plugin's name appears in a source's meters list, that plugin must run periodically, so it is added to polling_tasks under the polling interval that source defines; a small matching sketch follows.
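Note that meters entries may be glob patterns such as 'poll.*'; a minimal sketch of this kind of matching (fnmatch-style globbing is an assumption here, in the spirit of support_meter rather than its exact code):

```python
import fnmatch

meters = ['poll.*', 'memory.usage', 'memory.util']  # from meter_source above

def support_meter(meter_name):
    # True if the meter name matches any configured entry, glob-style
    return any(fnmatch.fnmatch(meter_name, m) for m in meters)

print(support_meter('poll.cpu'))      # True, matches 'poll.*'
print(support_meter('memory.usage'))  # True, exact match
print(support_meter('disk.read'))     # False
```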
Now let's look at the value objects inside polling_tasks: a PollingTask just initializes a few fields, and its add method is then called:
```python
class PollingTask(object):
    """Polling task for polling samples and notifying.

    A polling task can be invoked periodically or only once.
    """

    def __init__(self, agent_manager):
        self.manager = agent_manager

        # elements of the Cartesian product of sources X pollsters
        # with a common interval
        self.pollster_matches = collections.defaultdict(set)

        # we relate the static resources and per-source discovery to
        # each combination of pollster and matching source
        resource_factory = lambda: Resources(agent_manager)
        self.resources = collections.defaultdict(resource_factory)

        self._batch = cfg.CONF.batch_polled_samples
        self._telemetry_secret = cfg.CONF.publisher.telemetry_secret

    def add(self, pollster, source):
        self.pollster_matches[source.name].add(pollster)
        key = Resources.key(source.name, pollster)
        self.resources[key].setup(source)
```

So the data finally returned is:
```python
(Pdb) p data
{
    3600: <ceilometer.agent.manager.PollingTask object at 0x7fe6a4043bd0>,
    300: <ceilometer.agent.manager.PollingTask object at 0x7fe6a4043d10>
}
(Pdb) p data[3600].__dict__
{
    'manager': <ceilometer.agent.manager.AgentManager object at 0x3230b50>,
    '_telemetry_secret': 'change this for valid signing',
    'pollster_matches': defaultdict(<type 'set'>, {
        'notification_source': set([<stevedore.extension.Extension object at 0x7fe6b062ec50>])
    }),
    'resources': defaultdict(<function <lambda> at 0x7fe6a40319b0>, {
        'notification_source-instance': <ceilometer.agent.manager.Resources object at 0x7fe6a4043b90>
    }),
    '_batch': True
}
```

The remainder of start_polling_tasks creates the thread pools by interval: one thread per distinct interval, shared by all tasks with that interval. This is built on the futurist library (not covered in detail here) and implements a periodic task that calls self.interval_task once per sampling interval defined in the config file; interval_task in turn calls task.poll_and_notify().
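A minimal sketch of the futurist periodics pattern in use (the 300-second spacing and the task body are made-up examples):

```python
import futurist
from futurist import periodics

@periodics.periodic(300)  # spacing in seconds, like the 300s interval above
def poll_task():
    print('polling...')

# one (callable, args, kwargs) tuple per periodic task
worker = periodics.PeriodicWorker([(poll_task, (), {})],
                                  executor_factory=lambda:
                                  futurist.ThreadPoolExecutor(max_workers=1))
worker.start()  # blocks; ceilometer runs this in a spawned thread
```

With that in mind, here is the implementation of task.poll_and_notify():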
```python
def poll_and_notify(self):
    """Polling sample and notify."""
    cache = {}
    discovery_cache = {}
    poll_history = {}
    """
    (Pdb) self.pollster_matches
    defaultdict(<type 'set'>, {
        'meter_source': set([<stevedore.extension.Extension object at 0x7f9a645de8d0>, <stevedore.extension.Extension object at 0x7f9a645d40d0>]),
        'cpu_source': set([<stevedore.extension.Extension object at 0x7f9a645d4cd0>]),
        'cpu_util_source': set([<stevedore.extension.Extension object at 0x7f9a645dec10>]),
        'disk_source': set([<stevedore.extension.Extension object at 0x7f9a64399c90>, <stevedore.extension.Extension object at 0x7f9a645d4ad0>, <stevedore.extension.Extension object at 0x7f9a645d49d0>, <stevedore.extension.Extension object at 0x7f9a645d4950>, <stevedore.extension.Extension object at 0x7f9a645dea90>, <stevedore.extension.Extension object at 0x7f9a64389fd0>, <stevedore.extension.Extension object at 0x7f9a645d43d0>, <stevedore.extension.Extension object at 0x7f9a645d4d50>]),
        'network_source': set([<stevedore.extension.Extension object at 0x7f9a645def10>, <stevedore.extension.Extension object at 0x7f9a645d4650>, <stevedore.extension.Extension object at 0x7f9a645d4450>, <stevedore.extension.Extension object at 0x7f9a645d4550>]),
        'volume_source': set([<stevedore.extension.Extension object at 0x7f9a645d4b50>, <stevedore.extension.Extension object at 0x7f9a645d4c50>])
    })
    """
    # self.pollster_matches maps each source name defined in the config file
    # to the set of pollster plugin objects defined under that source
    # the keys in Resources are source.name + the pollster name
    for source_name in self.pollster_matches:
        for pollster in self.pollster_matches[source_name]:
            """
            first
            (Pdb) p key
            'meter_source-memory.util'
            (Pdb) p candidate_res
            []
            (Pdb) p pollster.obj.default_discovery
            'local_instances'
            """
            key = Resources.key(source_name, pollster)
            candidate_res = list(
                self.resources[key].get(discovery_cache))
            if not candidate_res and pollster.obj.default_discovery:
                candidate_res = self.manager.discover(
                    [pollster.obj.default_discovery], discovery_cache)
            """
            (Pdb) p candidate_res
            [<Server: jy-2>]
            """
            # Remove duplicated resources and black resources. Using
            # set() requires well defined __hash__ for each resource.
            # Since __eq__ is defined, 'not in' is safe here.
            polling_resources = []
            black_res = self.resources[key].blacklist
            history = poll_history.get(pollster.name, [])
            for x in candidate_res:
                if x not in history:
                    history.append(x)
                    if x not in black_res:
                        polling_resources.append(x)
            """
            first
            (Pdb) p poll_history
            {'memory.util': [<Server: jy-2>]}
            second
            (Pdb) p poll_history
            {'memory.util': [<Server: jy-2>], 'memory.usage': [<Server: jy-2>]}
            """
            poll_history[pollster.name] = history

            # If no resources, skip for this pollster
            if not polling_resources:
                p_context = 'new ' if history else ''
                LOG.info(_("Skip pollster %(name)s, no %(p_context)s"
                           "resources found this cycle"),
                         {'name': pollster.name, 'p_context': p_context})
                continue

            LOG.info(_("Polling pollster %(poll)s in the context of "
                       "%(src)s"),
                     dict(poll=pollster.name, src=source_name))
            try:
                polling_timestamp = timeutils.utcnow().isoformat()
                samples = pollster.obj.get_samples(
                    manager=self.manager,
                    cache=cache,
                    resources=polling_resources
                )
                sample_batch = []

                # filter None in samples
                """
                first
                (Pdb) samples
                [<name: memory.util, volume: 18.58, resource_id: 32eb16e0-c8af-4b41-b18c-b63152f0f8fc, timestamp: None>]
                second
                (Pdb) samples
                [<name: memory.usage, volume: 91, resource_id: 32eb16e0-c8af-4b41-b18c-b63152f0f8fc, timestamp: None>]
                """
                samples = [s for s in samples if s is not None]
                for sample in samples:
                    # Note(yuywz): Unify the timestamp of polled samples
                    sample.set_timestamp(polling_timestamp)
                    """
                    (Pdb) sample_dict
                    {
                        'counter_name': 'memory.util',
                        'resource_id': u'32eb16e0-c8af-4b41-b18c-b63152f0f8fc',
                        'timestamp': '2020-06-19T06:21:32.542893',
                        'counter_volume': 18.58,
                        'user_id': u'7431e07e49de2703f1b9e703daf5aff158e32028506f77e05a62e9eb3892dcde',
                        'message_signature': '76e6cbd944963855306b829cdf2c49bba9f0b20220c8c738b61b5048be51b24f',
                        'resource_metadata': {
                            'status': u'active',
                            'ephemeral_gb': 0,
                            'disk_gb': 0,
                            'instance_host': u'node-3.domain.tld',
                            'kernel_id': None,
                            'image': None,
                            'ramdisk_id': None,
                            'host': u'619a474e64dbedcd55508bda51aea8a611fd15f3f3e8fa39ce0d0552',
                            'flavor': {
                                'name': u'1C-0.5G',
                                u'links': [{
                                    u'href': u'http://nova-api.openstack.svc.cluster.local:8774/ca80e5ccd445438580c4b128296d1936/flavors/211',
                                    u'rel': u'bookmark'
                                }],
                                'ram': 512,
                                'ephemeral': 0,
                                'vcpus': 1,
                                'disk': 0,
                                u'id': u'211'
                            },
                            'task_state': None,
                            'image_ref_url': None,
                            'memory_mb': 512,
                            'root_gb': 0,
                            'display_name': u'jy-2',
                            'name': u'instance-00000002',
                            'vcpus': 1,
                            'instance_id': u'32eb16e0-c8af-4b41-b18c-b63152f0f8fc',
                            'instance_type': u'1C-0.5G',
                            'state': u'active',
                            'image_ref': None,
                            'OS-EXT-AZ:availability_zone': u'default-az'
                        },
                        'source': 'openstack',
                        'counter_unit': '%',
                        'project_id': u'34ecb9aca8454157bd5c3c64708990bf',
                        'message_id': '6fbcdc0e-b1f5-11ea-a901-be1696955049',
                        'monotonic_time': None,
                        'counter_type': 'gauge'
                    }
                    """
                    sample_dict = (
                        publisher_utils.meter_message_from_counter(
                            sample, self._telemetry_secret
                        ))
                    if self._batch:
                        sample_batch.append(sample_dict)
                    else:
                        self._send_notification([sample_dict])

                if sample_batch:
                    self._send_notification(sample_batch)

            except plugin_base.PollsterPermanentError as err:
                LOG.error(_(
                    'Prevent pollster %(name)s for '
                    'polling source %(source)s anymore!')
                    % ({'name': pollster.name, 'source': source_name}))
                self.resources[key].blacklist.extend(err.fail_res_list)
            except Exception as err:
                LOG.warning(_(
                    'Continue after error from %(name)s: %(error)s')
                    % ({'name': pollster.name, 'error': err}),
                    exc_info=True)

def _send_notification(self, samples):
    self.manager.notifier.sample(
        {},
        'telemetry.polling',
        {'samples': samples}
    )
```
The first few lines are simple bookkeeping. The two nested loops then walk the sources and pollsters recorded in self.pollster_matches, and everything from the get_samples call onward converts the collected data. The important intermediate values are all printed above, so it's fairly clear; I'll pick out a few points. (I admit I still can't claim to fully understand the candidate_res object; if anyone does, please leave a comment, many thanks.)
The actual collection happens at the pollster.obj.get_samples(...) call, which invokes the plugin's own method. For example, with pollster.name = memory.usage it calls the get_samples method of the MemoryUsagePollster class; you can see that after collecting the data, the method returns samples via yield:
```python
class MemoryUsagePollster(pollsters.BaseComputePollster):

    def get_samples(self, manager, cache, resources):
        self._inspection_duration = self._record_poll_time()
        for instance in resources:
            LOG.debug('Checking memory usage for instance %s', instance.id)
            try:
                memory_info = self.inspector.inspect_memory_usage(
                    instance, self._inspection_duration)
                LOG.debug("MEMORY USAGE: %(instance)s %(usage)f",
                          {'instance': instance,
                           'usage': memory_info.usage})
                yield util.make_sample_from_instance(
                    instance,
                    name='memory.usage',
                    type=sample.TYPE_GAUGE,
                    unit='MB',
                    volume=memory_info.usage,
                )
```

Next, look at where the message is actually sent: the call to self._send_notification(sample_batch):
```python
def _send_notification(self, samples):
    self.manager.notifier.sample(
        {},
        'telemetry.polling',
        {'samples': samples}
    )
```

This uses the oslo_messaging library. Following the call inward, sample here is the sample method of the Notifier class in oslo_messaging/notify/notifier.py:
```python
def sample(self, ctxt, event_type, payload):
    self._notify(ctxt, event_type, payload, 'SAMPLE')
```

which calls further into the _notify method:
```python
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
            retry=None):
    payload = self._serializer.serialize_entity(ctxt, payload)
    ctxt = self._serializer.serialize_context(ctxt)
    msg = dict(message_id=six.text_type(uuid.uuid4()),
               publisher_id=publisher_id or self.publisher_id,
               event_type=event_type,
               priority=priority,
               payload=payload,
               timestamp=six.text_type(timeutils.utcnow()))

    def do_notify(ext):
        try:
            ext.obj.notify(ctxt, msg, priority, retry or self.retry)
        except Exception as e:
            _LOG.exception("Problem '%(e)s' attempting to send to "
                           "notification system. Payload=%(payload)s",
                           {'e': e, 'payload': payload})

    if self._driver_mgr.extensions:
        self._driver_mgr.map(do_notify)
```

The key call is ext.obj.notify(...) inside do_notify; let's look at that object:
```python
(Pdb) p ext.obj
<oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f72f876d510>
(Pdb) p ext.obj.__dict__
{'topics': ['notifications'], 'version': 2.0, 'transport': <oslo_messaging.transport.Transport object at 0x7f72f85dd1d0>, 'conf': <oslo_config.cfg.ConfigOpts object at 0x13dc990>}
```

This calls further into the notify method (the MessagingDriver class in oslo_messaging/notify/messaging.py):
```python
def notify(self, ctxt, message, priority, retry):
    priority = priority.lower()
    for topic in self.topics:
        target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))
        try:
            self.transport._send_notification(target, ctxt, message,
                                              version=self.version,
                                              retry=retry)
        except Exception:
            LOG.exception("Could not send notification to %(topic)s. "
                          "Payload=%(message)s",
                          {'topic': topic, 'message': message})
```

You can see that the target here is:
```python
(Pdb) p target
<Target topic=notifications.sample>
```

So the final call to self.transport._send_notification sends the message to the notifications.sample queue, and with that the whole collection task of the ceilometer-compute service is complete.
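To see where those messages go, here is a hedged sketch of a consumer listening on that queue with oslo.messaging (the endpoint class and URL are made-up examples, not ceilometer's actual notification-agent code):

```python
import oslo_messaging
from oslo_config import cfg


class SampleEndpoint(object):
    # the method name 'sample' matches the SAMPLE priority used by the poller
    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        print(event_type, len(payload.get('samples', [])))


transport = oslo_messaging.get_notification_transport(
    cfg.CONF, url='rabbit://guest:guest@192.168.2.120:5672/')
# topic 'notifications' + the sample priority yields the notifications.sample queue
targets = [oslo_messaging.Target(topic='notifications')]
listener = oslo_messaging.get_notification_listener(
    transport, targets, [SampleEndpoint()])
listener.start()
# ... run until done ...
listener.stop()
listener.wait()
```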
The ceilometer-central service works much like ceilometer-compute: it loads plugins from a different namespace, but the logic is essentially identical, so I won't repeat it.