Prometheus Metrics Scraping: Source Code Analysis

Life is like a butterfly: without the courage to break out of the cocoon, there is no beauty of flight. Life is like a bee: without diligence and effort, you never taste the sweetness of the nectar. The harder you work, the luckier you get!

The scrape module is responsible for collecting the actual metrics in Prometheus and writing them to the backend storage; it can be considered one of Prometheus's most core functional modules.

This is a long read, so please be patient.

Overview of metrics collection

To obtain the monitored targets from service discovery (serviceDiscover) in real time, the scrape manager (scrapeManager) runs a goroutine that stores the targets received over a channel (chan) into a map of type map[string][]*targetgroup.Group. The map's key is the job_name and its value is a slice of targetgroup.Group structs; each Group holds the Targets, Labels and Source for that job_name.

The target changes seen by the scrape manager (scrapeManager) fall into several cases. Taking additions as an example: if a new job is added, the scrape manager reloads, creates a scrapePool for the new job, and creates a scrapeLoop for every target of that job. If the job itself is unchanged and only new targets were added under it, only scrapeLoops for those new targets need to be created. The Group struct that carries the targets is shown below.
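For reference, the Group struct delivered over that channel is defined in prometheus/discovery/targetgroup/targetgroup.go roughly as follows:

prometheus/discovery/targetgroup/targetgroup.go

// Group is a set of targets that share a common label set.
type Group struct {
	// Targets is a list of targets identified by a label set. Each target is
	// uniquely identifiable in the group by its address label.
	Targets []model.LabelSet
	// Labels is a set of labels that is common across all targets in the group.
	Labels model.LabelSet
	// Source is an identifier that describes the group of targets.
	Source string
}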

Metrics collection flow

Overall flow

[Figure: overall flow of metrics scraping]

Static structure

[Figure: static structure of scrapeManager and its scrapePools]

Within one manager (scrapeManager), every initialization (or reload) creates one scrape pool (scrapePool) per configured job; inside each pool, one scrape loop (scrapeLoop) is created per monitored target. The scrape loop can be regarded as the smallest unit of work; the next figure breaks down its static structure.

[Figure: static structure of a scrapeLoop]

The main collection flow is the scrapeAndReport function in scrape.go: after the scraper interface has fetched the data, it first calls the append method to write the samples into the scrape cache layer (scrapeCache), and finally calls the appender's Commit method to persist them to the backend storage.
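As a minimal, self-contained sketch of that write path (writeOneSample is an illustrative name, not part of the real code; the Append/Commit/Rollback signatures and import paths assume the Prometheus version quoted in this article):

package example

import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// writeOneSample shows the append-then-commit shape of scrapeAndReport:
// samples are first buffered through Append and only persisted on Commit.
func writeOneSample(app storage.Appender, lset labels.Labels, ts int64, val float64) error {
	if _, err := app.Append(0, lset, ts, val); err != nil {
		_ = app.Rollback() // discard the partially appended batch
		return err
	}
	return app.Commit() // flush the batch to the backend storage
}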

Metrics collection configuration

The scrape manager (scrapeManager) calls scrapeManager.ApplyConfig to initialize and apply the configuration:

prometheus/scrape/manager.go

// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()

// Build a map whose key is the job_name and whose value is the *config.ScrapeConfig for that job
c := make(map[string]*config.ScrapeConfig)
for _, scfg := range cfg.ScrapeConfigs {
c[scfg.JobName] = scfg
}
m.scrapeConfigs = c

if err := m.setJitterSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
}

// Not executed on the first start, since scrapePools is still empty
// Cleanup and reload pool if the configuration has changed.
var failed bool
for name, sp := range m.scrapePools {
// If a job_name exists in scrapePools but no longer in scrapeConfigs, the job has been removed: stop its scrapePool
if cfg, ok := m.scrapeConfigs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
} else if !reflect.DeepEqual(sp.config, cfg) {
// If the job_name is in both scrapePools and scrapeConfigs but the configuration changed (e.g. targets were added or removed), the pool must be reloaded
err := sp.reload(cfg)
if err != nil {
level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
failed = true
}
}
}

if failed {
return errors.New("failed to apply the new configuration")
}
return nil
}

The reload method is then called to reload the scrape pool with the new configuration:

prometheus/scrape/scrape.go

// reload the scrape pool with the given scrape configuration. The target state is preserved
// but all scrape loops are restarted with the new scrape configuration.
// This method returns after all scrape loops that were stopped have stopped scraping.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.mtx.Lock()
defer sp.mtx.Unlock()
targetScrapePoolReloads.Inc()
start := time.Now()

// Create the HTTP client used to fetch the metrics
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, config_util.WithHTTP2Disabled())
if err != nil {
targetScrapePoolReloadsFailed.Inc()
return errors.Wrap(err, "error creating HTTP client")
}

reuseCache := reusableCache(sp.config, cfg)
sp.config = cfg
oldClient := sp.client
sp.client = client

targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))

var (
wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
sampleLimit = int(sp.config.SampleLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
}
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
)

sp.targetMtx.Lock()

forcedErr := sp.refreshTargetLimitErr()
// Stop every oldLoop in this scrapePool, create the newLoops according to the new configuration, and start them in goroutines
for fp, oldLoop := range sp.loops {
var cache *scrapeCache
if oc := oldLoop.getCache(); reuseCache && oc != nil {
oldLoop.disableEndOfRunStalenessMarkers()
cache = oc
} else {
cache = newScrapeCache()
}
var (
t = sp.activeTargets[fp]
s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
newLoop = sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
sampleLimit: sampleLimit,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
mrc: mrc,
cache: cache,
})
)
wg.Add(1)

go func(oldLoop, newLoop loop) {
oldLoop.stop()
wg.Done()

newLoop.setForcedError(forcedErr)
newLoop.run(interval, timeout, nil)
}(oldLoop, newLoop)

sp.loops[fp] = newLoop
}

sp.targetMtx.Unlock()

wg.Wait()
oldClient.CloseIdleConnections()
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(start).Seconds(),
)
return nil
}

Every configuration reload re-parses the scrape configuration into the ScrapeConfig struct defined in config/config.go:

// ScrapeConfig configures a scraping unit for Prometheus.
type ScrapeConfig struct {
// The job name to which the job label is set by default.
JobName string `yaml:"job_name"`
// Indicator whether the scraped metrics should remain unmodified.
HonorLabels bool `yaml:"honor_labels,omitempty"`
// Indicator whether the scraped timestamps should be respected.
HonorTimestamps bool `yaml:"honor_timestamps"`
// A set of query parameters with which the target is scraped.
Params url.Values `yaml:"params,omitempty"`
// How frequently to scrape the targets of this scrape config.
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
// The timeout for scraping targets of this config.
ScrapeTimeout model.Duration `yaml:"scrape_timeout,omitempty"`
// The HTTP resource path on which to fetch metrics from targets.
MetricsPath string `yaml:"metrics_path,omitempty"`
// The URL scheme with which to fetch metrics from targets.
Scheme string `yaml:"scheme,omitempty"`
// More than this many samples post metric-relabeling will cause the scrape to
// fail.
SampleLimit uint `yaml:"sample_limit,omitempty"`
// More than this many targets after the target relabeling will cause the
// scrapes to fail.
TargetLimit uint `yaml:"target_limit,omitempty"`
// More than this many labels post metric-relabeling will cause the scrape to
// fail.
LabelLimit uint `yaml:"label_limit,omitempty"`
// More than this label name length post metric-relabeling will cause the
// scrape to fail.
LabelNameLengthLimit uint `yaml:"label_name_length_limit,omitempty"`
// More than this label value length post metric-relabeling will cause the
// scrape to fail.
LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"`

// We cannot do proper Go type embedding below as the parser will then parse
// values arbitrarily into the overflow maps of further-down types.

ServiceDiscoveryConfigs discovery.Configs `yaml:"-"`
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`

// List of target relabel configurations.
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
// List of metric relabel configurations.
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty"`
}

Starting metrics collection

  1. The main function initializes the scrapeManager instance
prometheus/cmd/prometheus/main.go

// Initialize the scrapeManager; fanoutStorage is a proxy that reads from and writes to multiple underlying storages
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)

fanoutStorage is a proxy for reading and writing multiple underlying storages; it implements the storage.Appendable interface.
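For context, the Appendable interface the scrape manager expects is tiny; in the version of prometheus/scrape/manager.go quoted here it is essentially:

// Appendable returns an Appender.
type Appendable interface {
	Appender(ctx context.Context) storage.Appender
}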

The NewManager function instantiates the Manager struct:

prometheus/scrape/manager.go

// NewManager is the Manager constructor
func NewManager(logger log.Logger, app Appendable) *Manager {
if logger == nil {
logger = log.NewNopLogger()
}
return &Manager{
append: app,
logger: logger,
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}),
triggerReload: make(chan struct{}, 1),
}
}

The Manager struct maintains two maps, scrapePools and targetSets. Both are keyed by job_name, but the values of scrapePools are scrapePool structs, whereas the values of targetSets are slices of Group structs containing the job's Targets, Labels and Source. Example outputs for both are shown below.

prometheus/scrape/manager.go

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups form the discovery manager.
type Manager struct {
logger log.Logger // system logger
append Appendable // storage for the scraped metrics
graceShut chan struct{} // shutdown signal

mtxScrape sync.Mutex // Guards the fields below.
scrapeConfigs map[string]*config.ScrapeConfig // the scrape_configs section of prometheus.yml; key is job_name, value is that job's configuration
scrapePools map[string]*scrapePool // key is job_name, value is the scrapePool holding all targets of that job
targetSets map[string][]*targetgroup.Group // key is job_name, value is the Group slice containing the job's Targets, Labels and Source

triggerReload chan struct{} // when service discovery delivers new targets, a value is sent here to trigger a configuration reload (covered below)
}


Example targetSets output for job_name "node":
(dlv) p m.targetSets["node"]
[]*github.com/prometheus/prometheus/discovery/targetgroup.Group len: 1, cap: 1, [
*{
Targets: []github.com/prometheus/common/model.LabelSet len: 1, cap: 1, [
[
"__address__": "localhost:9100",
],
],
Labels: github.com/prometheus/common/model.LabelSet nil,
Source: "0",},
]

Example scrapePools output for job_name "node":
(dlv) p m.scrapePools
map[string]*github.com/prometheus/prometheus/scrape.scrapePool [
"node": *{
appendable: github.com/prometheus/prometheus/scrape.Appendable(*github.com/prometheus/prometheus/storage.fanout) ...,
logger: github.com/go-kit/kit/log.Logger(*github.com/go-kit/kit/log.context) ...,
mtx: (*sync.RWMutex)(0xc001be0020),
config: *(*"github.com/prometheus/prometheus/config.ScrapeConfig")(0xc00048ab40),
client: *(*"net/http.Client")(0xc000d303c0),
activeTargets: map[uint64]*github.com/prometheus/prometheus/scrape.Target [],
droppedTargets: []*github.com/prometheus/prometheus/scrape.Target len: 0, cap: 0, nil,
loops: map[uint64]github.com/prometheus/prometheus/scrape.loop [],
cancel: context.WithCancel.func1,
newLoop: github.com/prometheus/prometheus/scrape.newScrapePool.func2,},
]
  2. The entry point through which the scrape manager (scrapeManager) receives the monitored targets in real time:

scrapeManager.Run(discoveryManagerScrape.SyncCh())

prometheus/cmd/prometheus/main.go

// Scrape manager.
g.Add(
func() error {
// When the scrape manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager so
// we wait until the config is fully loaded.
<-reloadReady.C

err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)


// ts is the map[string][]*targetgroup.Group received from the channel
(dlv) p ts["prometheus"]
[]*github.com/prometheus/prometheus/discovery/targetgroup.Group len: 1, cap: 1, [
*{
Targets: []github.com/prometheus/common/model.LabelSet len: 1, cap: 1, [
[...],
],
Labels: github.com/prometheus/common/model.LabelSet nil,
Source: "0",},
]

// For example, given this scrape_configs section in the configuration file:
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090', '192.168.1.2:9091']
labels:
cluster: es
env: prod
// the static_configs entry above is parsed into:
targetgroup.Group{
Targets: []model.LabelSet{
model.LabelSet{"__address__": "localhost:9090"},
model.LabelSet{"__address__": "192.168.1.2:9091"}
},
Labels: model.LabelSet{
"cluster": "es",
"env": "prod"
},
Source: ""

A goroutine runs the Run method here, which receives the monitored targets from service discovery (serviceDiscover) in real time:

prometheus/scrape/manager.go

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
// reloader updates the targets on a 5s timer; it works together with triggerReload, checking every 5s whether triggerReload has fired
go m.reloader()
for {
select {
// receive the monitored targets from the channel
case ts := <-tsets:
m.updateTsets(ts)

select {
// if service discovery (serviceDiscover) reports a target change, send a value on triggerReload so that reloader() updates the targets
case m.triggerReload <- struct{}{}:
default:
}

case <-m.graceShut:
return nil
}
}
}

The flow above is fairly clear: whenever service discovery (serviceDiscovery) reports a target change, Run sends a value into the triggerReload channel (m.triggerReload <- struct{}{}). A separate goroutine runs the reloader method, which periodically refreshes the targets; running it in its own goroutine keeps the reload from blocking the reception of target updates from service discovery (serviceDiscover).

The reloader method starts a ticker and, in an endless loop, checks the triggerReload channel every 5 seconds; if a value is present it executes the reload method:

prometheus/scrape/manager.go

func (m *Manager) reloader() {
// 5-second ticker
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-m.graceShut:
return
// when service discovery (serviceDiscovery) reports a target change, a value is written to triggerReload; the ticker checks the channel every 5s and, if a value is present, triggers reload
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}

The reload method checks the consistency of targetSets, scrapePools and scrapeConfigs by job_name, and hands each job_name's []*targetgroup.Group slice to sp.Sync in its own goroutine to increase concurrency:

prometheus/scrape/manager.go

func (m *Manager) reload() {
m.mtxScrape.Lock()
var wg sync.WaitGroup
// setName is the job_name; groups holds that job's Targets, Labels and Source.
// Iterate over the target groups of every job in the latest set of scrape targets.
for setName, groups := range m.targetSets {

// If the job_name is not yet in scrapePools there are two cases:
// (1) the job_name is not in scrapeConfigs either: skip it;
// (2) the job_name is in scrapeConfigs: create a scrapePool for the job and add it to scrapePools.
if _, ok := m.scrapePools[setName]; !ok {
scrapeConfig, ok := m.scrapeConfigs[setName]
// Check whether m.scrapeConfigs has a configuration for this job;
// parsing may have failed and skipped the job, so verify it again here.
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
continue
}
// create the scrapePool for this job
sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName))
if err != nil {
level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName)
continue
}
m.scrapePools[setName] = sp
}

wg.Add(1)
// Run the sync in parallel as these take a while and at high load can't catch up.
// Run scrapePool.Sync() concurrently and wait for all calls to finish.
go func(sp *scrapePool, groups []*targetgroup.Group) {
// convert the groups into Target objects
sp.Sync(groups)
wg.Done()
}(m.scrapePools[setName], groups) // the pool may already exist for this job, so use m.scrapePools[setName] rather than sp

}
m.mtxScrape.Unlock()
wg.Wait()
}

// newScrapePool creates the scrapePool for a job
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) {
// target scrape pool counter, one pool per job
targetScrapePools.Inc()
if logger == nil {
logger = log.NewNopLogger()
}

// create the HTTP client from the configuration
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, config_util.WithHTTP2Disabled())
if err != nil {
targetScrapePoolsFailed.Inc()
return nil, errors.Wrap(err, "error creating HTTP client")
}

// pool.New returns a bucketed sync.Pool
buffers := pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })

ctx, cancel := context.WithCancel(context.Background())

sp := &scrapePool{
cancel: cancel,
appendable: app,
config: cfg,
client: client,
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
}
sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache.
cache := opts.cache
if cache == nil {
cache = newScrapeCache()
}
opts.target.SetMetadataStore(cache)

return newScrapeLoop(
ctx,
opts.scraper,
log.With(logger, "target", opts.target),
buffers,
func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
},
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
func(ctx context.Context) storage.Appender { return appender(app.Appender(ctx), opts.sampleLimit) },
cache,
jitterSeed,
opts.honorTimestamps,
opts.labelLimits,
)
}

return sp, nil
}

// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
appendable Appendable
logger log.Logger

mtx sync.RWMutex
config *config.ScrapeConfig
client *http.Client
// Targets and loops must always be synchronized to have the same
// set of hashes.
targets map[uint64]*Target
droppedTargets []*Target
loops map[uint64]loop
cancel context.CancelFunc

// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(*Target, scraper, int, bool, []*config.RelabelConfig)
}
// scrapePool manages scraping for a set of targets. Both targets and loops are maps keyed by a hash; their values are Target and loop respectively, with a one-to-one correspondence between them. Target describes the object being scraped, while loop is an interface.
// A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface {
run(interval, timeout time.Duration, errc chan<- error)
stop()
}

// scrapeCache tracks the mapping from exposed metric strings to label sets and storage references; it also tracks the staleness of series between two consecutive scrapes.
// scrapeCache tracks mappings of exposed metric strings to label sets and
// storage references. Additionally, it tracks staleness of series between
// scrapes.
type scrapeCache struct {
iter uint64 // Current scrape iteration.

// How many series and metadata entries there were at the last success.
// number of series and metadata entries at the last successful scrape
successfulCount int

// Parsed string to an entry with information about the actual label set
// and its storage reference.
// parsed metric strings mapped to label information; the key is the metric string and the value is a cacheEntry struct
series map[string]*cacheEntry

// Cache of dropped metric strings and their iteration. The iteration must
// be a pointer so we can update it without setting a new entry with an unsafe
// string in addDropped().
// dropped metric strings and their iteration numbers; caches invalid metrics
droppedSeries map[string]*uint64

// seriesCur and seriesPrev store the labels of series that were seen
// in the current and previous scrape.
// We hold two maps and swap them out to save allocations.
seriesCur map[uint64]labels.Labels // metrics seen in the current scrape
seriesPrev map[uint64]labels.Labels // metrics seen in the previous scrape

metaMtx sync.Mutex // guards metadata
metadata map[string]*metaEntry // metric metadata
}

The sp.Sync method introduces the Target struct and converts the []*targetgroup.Group groups into Target objects, where each group corresponds to the multiple targets of one job_name. It then calls sp.sync to synchronize the scraped targets:

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
start := time.Now()

sp.targetMtx.Lock()
var all []*Target
sp.droppedTargets = []*Target{}
for _, tg := range tgs {
// convert the targetgroup.Group into Targets
targets, err := targetsFromGroup(tg, sp.config)
if err != nil {
level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
continue
}
for _, t := range targets {
// keep the Target if it still has labels after relabeling
if t.Labels().Len() > 0 {
all = append(all, t)
} else if t.DiscoveredLabels().Len() > 0 {
// otherwise the Target was dropped during relabeling; record it in droppedTargets
sp.droppedTargets = append(sp.droppedTargets, t)
}
}
}
sp.targetMtx.Unlock()
sp.sync(all)

targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
time.Since(start).Seconds(),
)
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

scrape.Target is the concrete object of a single scrape and carries everything needed to scrape the target and store the result. The conversion from targetgroup.Group to scrape.Target works as follows (a condensed sketch of the label merge follows this list):

  1. targetsFromGroup iterates over every target in a targetgroup.Group and merges the group's common label set (call it A) with the target's own label set (call it B) into a combined label set C.

  2. populateLabels builds the Target from C and the *config.ScrapeConfig.
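Below is a condensed, self-contained sketch of the merge in step 1 (mergeTargetLabels is an illustrative name; the real logic lives in targetsFromGroup, quoted further down). Labels set on the individual target take precedence over the group-wide labels:

package example

import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
)

// mergeTargetLabels merges a target's own label set (B) over the group's
// common label set (A) into the combined set C that populateLabels receives.
func mergeTargetLabels(tlset, groupLabels model.LabelSet) labels.Labels {
	lbls := make([]labels.Label, 0, len(tlset)+len(groupLabels))
	for ln, lv := range tlset {
		lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
	}
	for ln, lv := range groupLabels {
		if _, ok := tlset[ln]; !ok { // group labels only fill gaps, they never override
			lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
		}
	}
	return labels.New(lbls...) // labels.New sorts the result
}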

The Target struct and its method definitions:

// TargetHealth describes the health state of a target.
type TargetHealth string

// The possible health states of a target based on the last performed scrape.
const (
HealthUnknown TargetHealth = "unknown"
HealthGood TargetHealth = "up"
HealthBad TargetHealth = "down"
)

// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
// Labels before any processing.
// The raw label set obtained from discovery, before any processing.
discoveredLabels labels.Labels
// Any labels that are added to this target and its metrics.
// The label set after relabeling; these labels end up in the TSDB.
labels labels.Labels
// Additional URL parameters that are part of the target URL.
// Extra URL parameters of the target.
params url.Values

// Mutex guarding the fields below.
mtx sync.RWMutex
// Error of the last scrape.
lastError error
// Time of the last scrape.
lastScrape time.Time
// Duration of the last scrape.
lastScrapeDuration time.Duration
// Health state of the target.
health TargetHealth
// Metric metadata store.
metadata MetricMetadataStore
}

// NewTarget creates a reasonably configured target for querying.
// Constructor.
func NewTarget(labels, discoveredLabels labels.Labels, params url.Values) *Target {
return &Target{
labels: labels,
discoveredLabels: discoveredLabels,
params: params,
health: HealthUnknown,
}
}

func (t *Target) String() string {
return t.URL().String()
}

// MetricMetadataStore represents a storage for metadata.
// The MetricMetadataStore interface represents the storage of metric metadata
type MetricMetadataStore interface {
ListMetadata() []MetricMetadata
GetMetadata(metric string) (MetricMetadata, bool)
SizeMetadata() int
LengthMetadata() int
}

// MetricMetadata is a piece of metadata for a metric.
// MetricMetadata is the metadata of one metric:
// the metric name, type and help text (all three are written by client libraries when instrumenting)
// plus the metric unit.
type MetricMetadata struct {
Metric string
Type textparse.MetricType
Help string
Unit string
}

// Target has MetadataList(), MetadataSize(), MetadataLength() and Metadata() methods that expose
// metadata information; internally they take the read lock and delegate to the corresponding method of the metadata field
func (t *Target) MetadataList() []MetricMetadata {
t.mtx.RLock()
defer t.mtx.RUnlock()

if t.metadata == nil {
return nil
}
return t.metadata.ListMetadata()
}

func (t *Target) MetadataSize() int {
t.mtx.RLock()
defer t.mtx.RUnlock()

if t.metadata == nil {
return 0
}

return t.metadata.SizeMetadata()
}

func (t *Target) MetadataLength() int {
t.mtx.RLock()
defer t.mtx.RUnlock()

if t.metadata == nil {
return 0
}

return t.metadata.LengthMetadata()
}

// Metadata returns type and help metadata for the given metric.
func (t *Target) Metadata(metric string) (MetricMetadata, bool) {
t.mtx.RLock()
defer t.mtx.RUnlock()

if t.metadata == nil {
return MetricMetadata{}, false
}
return t.metadata.GetMetadata(metric)
}

// Set the metadata store; the argument is an interface, i.e. any struct implementing its methods
func (t *Target) SetMetadataStore(s MetricMetadataStore) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.metadata = s
}

// hash returns an identifying hash for the target.
// It produces a unique identifier for the target. FNV-1a is a simple non-cryptographic hash with good performance and a low collision rate.
// The method hashes the target's label-set hash together with its endpoint URL; the label-set hash itself uses the xxHash algorithm
func (t *Target) hash() uint64 {
h := fnv.New64a()
//nolint: errcheck
h.Write([]byte(fmt.Sprintf("%016d", t.labels.Hash())))
//nolint: errcheck
h.Write([]byte(t.URL().String()))

return h.Sum64()
}

// offset returns the time until the next scrape cycle for the target.
// It includes the global server jitterSeed for scrapes from multiple Prometheus to try to be at different times.
// Returns the time until the target's next scrape cycle. The jitterSeed parameter spreads scrape start times to even out Prometheus's load
func (t *Target) offset(interval time.Duration, jitterSeed uint64) time.Duration {
now := time.Now().UnixNano()

// Base is a pinned to absolute time, no matter how often offset is called.
var (
base = int64(interval) - now%int64(interval)
offset = (t.hash() ^ jitterSeed) % uint64(interval)
next = base + int64(offset)
)

if next > int64(interval) {
next -= int64(interval)
}
return time.Duration(next)
}

// Labels returns a copy of the set of all public labels of the target.
// Labels(), DiscoveredLabels() and SetDiscoveredLabels(l labels.Labels) respectively return the target's public
// (not prefixed with "__") label set, return the raw pre-relabel label set, and set the raw pre-relabel label set. Note that Labels() takes no lock
func (t *Target) Labels() labels.Labels {
lset := make(labels.Labels, 0, len(t.labels))
for _, l := range t.labels {
if !strings.HasPrefix(l.Name, model.ReservedLabelPrefix) {
lset = append(lset, l)
}
}
return lset
}

// DiscoveredLabels returns a copy of the target's labels before any processing.
func (t *Target) DiscoveredLabels() labels.Labels {
t.mtx.Lock()
defer t.mtx.Unlock()
lset := make(labels.Labels, len(t.discoveredLabels))
copy(lset, t.discoveredLabels)
return lset
}

// SetDiscoveredLabels sets new DiscoveredLabels
func (t *Target) SetDiscoveredLabels(l labels.Labels) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.discoveredLabels = l
}

// URL returns a copy of the target's URL.
// URL() assembles a net/url.URL
func (t *Target) URL() *url.URL {
params := url.Values{}

for k, v := range t.params {
params[k] = make([]string, len(v))
copy(params[k], v)
}
// add the URL-parameter labels to the query parameters
for _, l := range t.labels {
if !strings.HasPrefix(l.Name, model.ParamLabelPrefix) {
continue
}
ks := l.Name[len(model.ParamLabelPrefix):]

if len(params[ks]) > 0 {
params[ks][0] = l.Value
} else {
params[ks] = []string{l.Value}
}
}

return &url.URL{
Scheme: t.labels.Get(model.SchemeLabel),
Host: t.labels.Get(model.AddressLabel),
Path: t.labels.Get(model.MetricsPathLabel),
RawQuery: params.Encode(),
}
}

// Report sets target data about the last scrape.
// Report() records the fields describing the last scrape
func (t *Target) Report(start time.Time, dur time.Duration, err error) {
t.mtx.Lock()
defer t.mtx.Unlock()

if err == nil {
t.health = HealthGood
} else {
t.health = HealthBad
}

t.lastError = err
t.lastScrape = start
t.lastScrapeDuration = dur
}

// LastError returns the error encountered during the last scrape.
// LastError(), LastScrape(), LastScrapeDuration() and Health() take the read lock and return
// the last scrape's error, its time, its duration and the target's last known health state
func (t *Target) LastError() error {
t.mtx.RLock()
defer t.mtx.RUnlock()

return t.lastError
}

// LastScrape returns the time of the last scrape.
func (t *Target) LastScrape() time.Time {
t.mtx.RLock()
defer t.mtx.RUnlock()

return t.lastScrape
}

// LastScrapeDuration returns how long the last scrape of the target took.
func (t *Target) LastScrapeDuration() time.Duration {
t.mtx.RLock()
defer t.mtx.RUnlock()

return t.lastScrapeDuration
}

// Health returns the last known health state of the target.
func (t *Target) Health() TargetHealth {
t.mtx.RLock()
defer t.mtx.RUnlock()

return t.health
}

// Targets is a sortable list of targets.
// Targets is a slice of Target pointers implementing the sort interface, ordered by URL string
type Targets []*Target

func (ts Targets) Len() int { return len(ts) }
func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
func (ts Targets) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }

var errSampleLimit = errors.New("sample limit exceeded")

// limitAppender limits the number of total appended samples in a batch.
// The limitAppender struct limits the number of samples appended in one batch
type limitAppender struct {
storage.Appender

limit int
i int
}

func (app *limitAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
if !value.IsStaleNaN(v) {
app.i++
if app.i > app.limit {
return 0, errSampleLimit
}
}
ref, err := app.Appender.Append(ref, lset, t, v)
if err != nil {
return 0, err
}
return ref, nil
}

// The timeLimitAppender struct limits the timestamp of appended samples; a sample beyond maxTime is rejected with an error
type timeLimitAppender struct {
storage.Appender

maxTime int64
}

func (app *timeLimitAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
if t > app.maxTime {
return 0, storage.ErrOutOfBounds
}

ref, err := app.Appender.Append(ref, lset, t, v)
if err != nil {
return 0, err
}
return ref, nil
}

// populateLabels builds a label set from the given label set and scrape configuration.
// It returns a label set before relabeling was applied as the second return value.
// Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling.
// populateLabels builds a label set from the given label set and scrape configuration. The second return value is the label set before relabeling.
// If the target is dropped during relabeling, the original pre-relabel label set is returned
func populateLabels(lset labels.Labels, cfg *config.ScrapeConfig) (res, orig labels.Labels, err error) {
// Copy labels into the labelset for the target if they are not set already.
scrapeLabels := []labels.Label{
{Name: model.JobLabel, Value: cfg.JobName},
{Name: model.MetricsPathLabel, Value: cfg.MetricsPath},
{Name: model.SchemeLabel, Value: cfg.Scheme},
}
lb := labels.NewBuilder(lset)

// add the job, metrics path and scheme labels if they are not already present in lset
for _, l := range scrapeLabels {
if lv := lset.Get(l.Name); lv == "" {
lb.Set(l.Name, l.Value)
}
}
// Encode scrape query parameters as labels.
// encode scrape query parameters as labels
for k, v := range cfg.Params {
if len(v) > 0 {
lb.Set(model.ParamLabelPrefix+k, v[0])
}
}

// label set before relabeling
preRelabelLabels := lb.Labels()
// apply relabeling
lset = relabel.Process(preRelabelLabels, cfg.RelabelConfigs...)

// Check if the target was dropped.
// if relabeling dropped the target, return the pre-relabel label set
if lset == nil {
return nil, preRelabelLabels, nil
}
// if the __address__ label is gone after relabeling, return an error
if v := lset.Get(model.AddressLabel); v == "" {
return nil, nil, errors.New("no address")
}

lb = labels.NewBuilder(lset)

// addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either.
// addPort checks whether a default port should be appended to the address; if the address is invalid, no port is appended either
addPort := func(s string) bool {
// If we can split, a port exists and we don't have to add one.
// a port already exists, nothing to add
if _, _, err := net.SplitHostPort(s); err == nil {
return false
}
// If adding a port makes it valid, the previous error
// was not due to an invalid address and we can append a port.
// if the address becomes valid after appending a port, the previous error was only the missing port, so we can append one
_, _, err := net.SplitHostPort(s + ":1234")
return err == nil
}
addr := lset.Get(model.AddressLabel)
// If it's an address with no trailing port, infer it based on the used scheme.
// if the __address__ label has no port, infer a default one from the http/https scheme
if addPort(addr) {
// Addresses reaching this point are already wrapped in [] if necessary.
switch lset.Get(model.SchemeLabel) {
case "http", "":
addr = addr + ":80"
case "https":
addr = addr + ":443"
default:
return nil, nil, errors.Errorf("invalid scheme: %q", cfg.Scheme)
}
lb.Set(model.AddressLabel, addr)
}

// check that the address label value is a valid address
if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
return nil, nil, err
}

// Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set.
// after relabeling, delete labels starting with __meta_; other internal labels are kept
for _, l := range lset {
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
lb.Del(l.Name)
}
}

// Default the instance label to the target address.
// default the instance label to the address if it is empty
if v := lset.Get(model.InstanceLabel); v == "" {
lb.Set(model.InstanceLabel, addr)
}

// final label set
res = lb.Labels()
// final check: every label value must be valid UTF-8
for _, l := range res {
// Check label values are valid, drop the target if not.
if !model.LabelValue(l.Value).IsValid() {
return nil, nil, errors.Errorf("invalid label value for %q: %q", l.Name, l.Value)
}
}
return res, preRelabelLabels, nil
}

// targetsFromGroup builds targets based on the given TargetGroup and config.
// targetgroup.Group lives in prometheus/discovery/targetgroup/targetgroup.go and
// Target in prometheus/scrape/target.go; this is the conversion from service discovery to scrape targets
func targetsFromGroup(tg *targetgroup.Group, cfg *config.ScrapeConfig) ([]*Target, error) {
targets := make([]*Target, 0, len(tg.Targets))

for i, tlset := range tg.Targets {
// tlset holds this target's own labels, tg.Labels the labels common to the whole group
lbls := make([]labels.Label, 0, len(tlset)+len(tg.Labels))

for ln, lv := range tlset {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
}
for ln, lv := range tg.Labels {
if _, ok := tlset[ln]; !ok {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
}
}

lset := labels.New(lbls...)

lbls, origLabels, err := populateLabels(lset, cfg)
if err != nil {
return nil, errors.Wrapf(err, "instance %d in group %s", i, tg)
}
if lbls != nil || origLabels != nil {
targets = append(targets, NewTarget(lbls, origLabels, cfg.Params))
}
}
return targets, nil
}

The sp.sync method compares the new Target list with the existing one. For any target not yet in the existing list, it creates a scrapeLoop and starts the scrapeLoop's run method in a goroutine so that metrics are collected and stored concurrently. It then checks whether the existing list contains targets that are no longer valid and, if so, removes them:

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
var (
uniqueLoops = make(map[uint64]loop)
interval = time.Duration(sp.config.ScrapeInterval) // scrape interval
timeout = time.Duration(sp.config.ScrapeTimeout) // scrape timeout
sampleLimit = int(sp.config.SampleLimit)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
}
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
)

sp.targetMtx.Lock()
for _, t := range targets {
hash := t.hash()

// if the target is not in the existing Target list, create a new scrapeLoop for it
if _, ok := sp.activeTargets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
l := sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
sampleLimit: sampleLimit,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
mrc: mrc,
})

sp.activeTargets[hash] = t
sp.loops[hash] = l

uniqueLoops[hash] = l
} else {
// This might be a duplicated target.
if _, ok := uniqueLoops[hash]; !ok {
uniqueLoops[hash] = nil
}
// Need to keep the most updated labels information
// for displaying it in the Service Discovery web page.
sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
}
}

var wg sync.WaitGroup

// Stop and remove old targets and scraper loops.
// check whether the existing Target list contains stale targets and remove them
for hash := range sp.activeTargets {
if _, ok := uniqueLoops[hash]; !ok {
wg.Add(1)
go func(l loop) {
l.stop()
wg.Done()
}(sp.loops[hash])

delete(sp.loops, hash)
delete(sp.activeTargets, hash)
}
}

sp.targetMtx.Unlock()

targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
forcedErr := sp.refreshTargetLimitErr()
for _, l := range sp.loops {
l.setForcedError(forcedErr)
}
// start each new scrapeLoop's run method in a goroutine to collect and store the metrics
for _, l := range uniqueLoops {
if l != nil {
go l.run(interval, timeout, nil)
}
}
// Wait for all potentially stopped scrapers to terminate.
// This covers the case of flapping targets. If the server is under high load, a new scraper
// may be active and tries to insert. The old scraper that didn't terminate yet could still
// be inserting a previous sample set.
wg.Wait()
}

For each new target, sp.sync starts a goroutine that runs the scrapeLoop's run method to collect and store the metrics:

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
select {
// wait for the initial scrape offset (jitter)
case <-time.After(sl.scraper.offset(interval, sl.jitterSeed)):
// Continue after a scraping offset.
// stopped, exit
case <-sl.ctx.Done():
close(sl.stopped)
return
}

var last time.Time

alignedScrapeTime := time.Now().Round(0)
ticker := time.NewTicker(interval)
defer ticker.Stop()

mainLoop:
for {
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
default:
}

// Temporary workaround for a jitter in go timers that causes disk space
// increase in TSDB.
// See https://github.com/prometheus/prometheus/issues/7846
// Calling Round ensures the time used is the wall clock, as otherwise .Sub
// and .Add on time.Time behave differently (see time package docs).
scrapeTime := time.Now().Round(0)
if AlignScrapeTimestamps && interval > 100*scrapeTimestampTolerance {
// For some reason, a tick might have been skipped, in which case we
// would call alignedScrapeTime.Add(interval) multiple times.
for scrapeTime.Sub(alignedScrapeTime) >= interval {
alignedScrapeTime = alignedScrapeTime.Add(interval)
}
// Align the scrape time if we are in the tolerance boundaries.
if scrapeTime.Sub(alignedScrapeTime) <= scrapeTimestampTolerance {
scrapeTime = alignedScrapeTime
}
}

last = sl.scrapeAndReport(interval, timeout, last, scrapeTime, errc)

select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
case <-ticker.C:
}
}

close(sl.stopped)

if !sl.disabledEndOfRunStalenessMarkers {
sl.endOfRunStaleness(last, ticker, interval)
}
}


// scrapeAndReport performs a scrape and then appends the result to the storage
// together with reporting metrics, by using as few appenders as possible.
// In the happy scenario, a single appender is used.
// This function uses sl.parentCtx instead of sl.ctx on purpose. A scrape should
// only be cancelled on shutdown, not on reloads.
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last, appendTime time.Time, errc chan<- error) time.Time {
start := time.Now()

// Only record after the first scrape.
if !last.IsZero() {
targetIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(last).Seconds(),
)
}

// get a buffer sized by the space the previous scrape occupied
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
defer sl.buffers.Put(b)
// wrap the pooled slice in a buffer for this scrape
buf := bytes.NewBuffer(b)

var total, added, seriesAdded int
var err, appErr, scrapeErr error

app := sl.appender(sl.parentCtx)
defer func() {
...
}()

defer func() {
...
}()

if forcedErr := sl.getForcedError(); forcedErr != nil {
scrapeErr = forcedErr
// Add stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
app.Rollback()
app = sl.appender(sl.parentCtx)
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
if errc != nil {
errc <- forcedErr
}

return start
}

var contentType string
scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, timeout)
// perform the scrape
contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
cancel()

if scrapeErr == nil {
b = buf.Bytes()
// NOTE: There were issues with misbehaving clients in the past
// that occasionally returned empty results. We don't want those
// to falsely reset our buffer size.
// remember the size of this scrape's payload as the buffer hint for the next scrape
if len(b) > 0 {
sl.lastScrapeSize = len(b)
}
} else {
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr)
if errc != nil {
errc <- scrapeErr
}
}

// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
// append the samples to storage
total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
if appErr != nil {
app.Rollback()
app = sl.appender(sl.parentCtx)
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
app.Rollback()
app = sl.appender(sl.parentCtx)
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
}

if scrapeErr == nil {
scrapeErr = appErr
}

return start
}

The run method does two things: scraping the metrics and storing them. In addition, to reuse objects the scrape path relies on a sync.Pool mechanism for performance: after each scrape, a byte slice as large as the scraped payload is put back into the buffer pool so that the next scrape can use it directly.
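A minimal sketch of that buffer-reuse cycle, assuming the bucketed pool created by pool.New in newScrapePool above (reuseScrapeBuffer and its scrape callback are illustrative names, not part of the real code):

package example

import (
	"bytes"

	"github.com/prometheus/prometheus/pkg/pool"
)

// reuseScrapeBuffer mirrors what scrapeAndReport does around every scrape:
// borrow a []byte sized by the previous scrape, fill it, remember the new
// size as the hint for the next iteration, and return the slice to the pool.
func reuseScrapeBuffer(buffers *pool.Pool, lastScrapeSize int, scrape func(*bytes.Buffer) error) (int, error) {
	b := buffers.Get(lastScrapeSize).([]byte) // bucketed sync.Pool lookup
	defer buffers.Put(b)                      // hand the slice back for reuse

	buf := bytes.NewBuffer(b)
	if err := scrape(buf); err != nil {
		return lastScrapeSize, err
	}
	if n := buf.Len(); n > 0 { // an empty response must not shrink the hint
		lastScrapeSize = n
	}
	return lastScrapeSize, nil
}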

Finally, look at the scrape function itself: it simply sends an HTTP GET request and writes the response body into an io.Writer:

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
if s.req == nil {
req, err := http.NewRequest("GET", s.URL().String(), nil)
if err != nil {
return "", err
}
req.Header.Add("Accept", acceptHeader)
req.Header.Add("Accept-Encoding", "gzip")
req.Header.Set("User-Agent", userAgentHeader)
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

s.req = req
}

resp, err := s.client.Do(s.req.WithContext(ctx))
if err != nil {
return "", err
}
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return "", errors.Errorf("server returned HTTP status %s", resp.Status)
}

if resp.Header.Get("Content-Encoding") != "gzip" {
_, err = io.Copy(w, resp.Body)
if err != nil {
return "", err
}
return resp.Header.Get("Content-Type"), nil
}

if s.gzipr == nil {
s.buf = bufio.NewReader(resp.Body)
s.gzipr, err = gzip.NewReader(s.buf)
if err != nil {
return "", err
}
} else {
s.buf.Reset(resp.Body)
if err = s.gzipr.Reset(s.buf); err != nil {
return "", err
}
}

_, err = io.Copy(w, s.gzipr)
s.gzipr.Close()
if err != nil {
return "", err
}
return resp.Header.Get("Content-Type"), nil
}

This completes the metrics collection path.

References:

https://blog.csdn.net/dengxiafubi/article/details/102996336

https://blog.csdn.net/qq_35753140/article/details/117148565

https://blog.csdn.net/qq_35753140/article/details/117201128
