Prometheus 告警规则生成和发送分析

每个人真正强大起来都要度过一段没人帮忙,没人支持的日子。所有事情都是自己一个人撑,所有情绪都是只有自己知道。但只要咬牙撑过去,一切都不一样了。

rules 配置和使用

简介

prometheus不仅可以提供数据采集功能,而且还可以做告警服务,通过匹配的性能参数,发出告警;然后把产生的警报发给Alertmanager进行处理。

但是这需要在Prometheus使用的配置文件中添加关联Alertmanager的组件的对应配置信息

1
2
3
4
5
6
alerting:
alert_relabel_configs:
[ - <relabel_config> ... ]
alertmanagers:
[ - <alertmanager_config> ... ]
# alertmanagers 为 alertmanager_config 数组,

配置范例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
alerting:
alert_relabel_configs: # 动态修改 alert 属性的规则配置。
- source_labels: [dc]
regex: (.+)\d+
target_label: dc1
alertmanagers:
- static_configs:
- targets: ['127.0.0.1:9093'] # 单实例配置
#- targets: ['172.31.10.167:19093','172.31.10.167:29093','172.31.10.167:39093'] # 集群配置
- job_name: 'Alertmanager'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['localhost:19093']

上面的配置中的 alert_relabel_configs是指警报重新标记在发送到Alertmanager之前应用于警报。 它具有与目标重新标记相同的配置格式和操作,外部标签标记后应用警报重新标记,主要是针对集群配置。

这个设置的用途是确保具有不同外部label的HA对Prometheus服务端发送相同的警报信息。

Alertmanager 可以通过 static_configs 参数静态配置,也可以使用其中一种支持的服务发现机制动态发现,我们上面的配置是静态的单实例,针对集群HA配置,后面会讲。

此外,relabel_configs 允许从发现的实体中选择 Alertmanager,并对使用的API路径提供高级修改,该路径通过 __alerts_path__ 标签公开。

完成以上配置后,重启Prometheus服务,用以加载生效,也可以使用前文说过的热加载功能,使其配置生效。然后通过浏览器,访问 http://192.168.1.220:19090/alerts 就可以看 inactive pending firing 三个状态,没有警报信息是因为我们还没有配置警报规则 rules

告警规则

警报规则 rules 使用的是 yaml 格式进行定义,在Prometheus中通过我们前面讲过的 PromQL 配置实际警报触发条件,Prometheus 会根据设置的警告规则 Ruels 以及配置间隔时间进行周期性计算,当满足触发条件规则会发送警报通知。

警报规则加载的是在 prometheus.yml 文件中进行配置,默认的警报规则进行周期运行计算的时间是1分钟,可以使用 global 中的 evaluation_interval 来决定时间间隔。

样例:

1
2
global:
evaluation_interval: 15s

警报规则可以指定多个文件,也可以自定到自定义的目录下面,为了管理更为便捷,方便阅读,可以把警报规则拆成多份,用以区分环境,系统,服务等,如:prod,test,dev 等等,并且支持以正则表达式定义。

样例:

1
2
3
4
5
6
rule_files:
#- "/data/prometheus/rules/*.yml" # 正则表达式,会加在此目录下所有警报规则配置文件
- "/data/prometheus/rules/ops.yml" # 仅加载ops.yml警报规则文件
#- "/data/prometheus/rules/prod-*.yml"
#- "/data/prometheus/rules/test-*.yml"
#- "/data/prometheus/rules/dev-*.yml"

现在开始讲警报规则 Rules 的定义,格式为YAML。

1
2
3
4
5
6
7
8
9
10
groups:
- name: <string>
rules:
- alert: <string>
expr: <string>
for: [ <duration> | default 0 ]
labels:
[ <lable_name>: <label_value> ]
annotations:
[ <lable_name>: <tmpl_string> ]
参数 描述
- name: <string> 警报规则组的名称
- alert: <string> 警报规则的名称
expr: <string 使用PromQL表达式完成的警报触发条件,用于计算是否有满足触发条件
<lable_name>: <label_value> 自定义标签,允许自行定义标签附加在警报上,比如high warning
annotations: <lable_name>: <tmpl_string> 用来设置有关警报的一组描述信息,其中包括自定义的标签,以及expr计算后的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
groups:
- name: operations
rules:
- alert: node-down
expr: up{env="operations"} != 1
for: 5m
labels:
status: High
team: operations
annotations:
description: "Environment: {{ $labels.env }} Instance: {{ $labels.instance }} is Down ! ! !"
value: '{{ $value }}'
summary: "The host node was down 20 minutes ago"

以上就是一个完整 Rules 的配置,如果Prometheus 在周期检测中使用PromQ以env=operations为维度查询,如果当前查询结果中具有标签operations,且返回值都不等于1的时候,发送警报。
对于写好的 Rules 可以是常用 promtoolcheck ruls.yml的书写格式是否正确。

1
2
3
/usr/local/bin/promtool check rules /data/prometheus/rules/ops.yml
Checking /data/prometheus/rules/ops.yml
SUCCESS: 7 rules found

对于修改好的rules文件,保存以后,经过检测没有问题,直接重新热加载 Prometheus就可以在页面看到了。对于触发警报规则,比较简单了,直接修改运算值或者去停掉 node-exporter 服务,便可在界面看到警报信息。一个警报在生命周期会有三种状态

状态 描述
Inactive 正常状态,未激活警报
Pending 已满足触发条件,但没有满足发送时间条件,此条件就是上面rules范例中的 for 5m 子句中定义的持续时间
Firing 满足条件,且超过了 for 子句中的的指定持续时间5m

带有for子句的警报触发以后首先会先转换成 Pending 状态,然后在转换为 Firing 状态。这里需要俩个周期才能触发警报条件,如果没有设置 for 子句,会直接 Inactive 状态转换成 Firing状态,然后触发警报,发送给 Receiver 设置的通知人。

在运行过程中,Prometheus会把Pending或Firing状态的每一个警报创建一个 Alerts指标名称,这个可以通过Rules来触发警报测试,直接在UI中Graph查看指标 ALERTS,格式如下:

1
ALERTS{alertname="alert name",alertstate="pending|firing",<additional alert label>}

img

当警报处于激活状态 Pending 或者 Firing时候,如上图所示,样本值为1。其他状态为0。则不显示。上图已经触发警报,其警报已经被转发给Alertmanager组件,此时可以在浏览器上通过可以用过9093端口访问,查看警报状态。

img

现在我们来说一下整理下Prometheus从收集监控指标信息到触发警报的过程:

状态 描述
1.定义规则 在Prometheus配置中,scrape_interval: 15s,默认是1分钟,这个定义是收集监控指标信息的采集周期,同时配置对应的警报规则,可以是全局,也可以单独为某一个metrics定义
2.周期计算 对于表达式进行计算时,Prometheus中的配置中配置了 evaluation_interval: 15s,默认也是一分钟,为警报规则的计算周期,evaluation_interval 只是全局计算周期值。
3.1警报状态转换(pending) 当首次触发警报规则条件成立,表达式为 true,并且没有满足警报规则中的for子句中的持续时间时,警报状态切换为 Pending
3.2警报状态转换(firing) 若下一个计算周期中,表达式仍为 true,并且满足警报规则中的for子句的持续时间时,警报状态转换为 Firing,即为 active,警报会被Prometheus推送到ALertmanager组件
3.3警报状态转换(period) 如果在 evaluation_interval 的计算周期内,表达式还是为 true,同时满足 for子句的持续时间,持续转发到Alertmanager,这里只是转发状态到Alertmanager,并不是直接发送通知到指定通知源
3.4警报状态转换(resolve) 只到某个周期,表达式 为 false,警报状态会变成 inactive ,并且会有一个 resolve 被发送到Alertmanager,用于说明警报故障依解决,发送resolve信息需要自己单独在Alertmanager中定义

Rules 类型

Prometheus 支持两种类型的 Rules ,可以对其进行配置,然后定期进行运算:recording rules 记录规则 与 alerting rules 警报规则,规则文件的计算频率与警报规则计算频率一致,都是通过全局配置中的 evaluation_interval 定义。

alerting rules

要在Prometheus中使用Rules规则,就必须创建一个包含必要规则语句的文件,并让Prometheus通过Prometheus配置中的rule_files字段加载该文件,前面我们已经讲过了。

其实语法都一样,除了 recording rules 中的收集的指标名称 record: <string> 字段配置方式略有不同,其他都是一样的。

样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- alert: ServiceDown
expr: avg_over_time(up[5m]) * 100 < 50
annotations:
description: The service {{ $labels.job }} instance {{ $labels.instance }} is
not responding for more than 50% of the time for 5 minutes.
summary: The service {{ $labels.job }} is not responding
- alert: RedisDown
expr: avg_over_time(redis_up[5m]) * 100 < 50
annotations:
description: The Redis service {{ $labels.job }} instance {{ $labels.instance
}} is not responding for more than 50% of the time for 5 minutes.
summary: The Redis service {{ $labels.job }} is not responding
- alert: PostgresDown
expr: avg_over_time(pg_up[5m]) * 100 < 50
annotations:
description: The Postgres service {{ $labels.job }} instance {{ $labels.instance
}} is not responding for more than 50% of the time for 5 minutes.
summary: The Postgres service {{ $labels.job }} is not responding

recording rules

recording rules 是提前设置好一个比较花费大量时间运算或经常运算的表达式,其结果保存成一组新的时间序列数据。当需要查询的时候直接会返回已经计算好的结果,这样会比直接查询快,同时也减轻了PromQl的计算压力,同时对可视化查询的时候也很有用,可视化展示每次只需要刷新重复查询相同的表达式即可。

在配置的时候,除却 record: <string> 需要注意,其他的基本上是一样的,一个 groups 下可以包含多条规则 rulesRecordingRules 保存在 group 内,Group 中的规则以规则的配置时间间隔顺序运算,也就是全局中的 evaluation_interval 设置。

样例:

1
2
3
4
5
6
7
8
9
10
11
groups:
- name: http_requests_total
rules:
- record: job:http_requests_total:rate10m
expr: sum by (job)(rate(http_requests_total[10m]))
lables:
team: operations
- record: job:http_requests_total:rate30m
expr: sum by (job)(rate(http_requests_total[30m]))
lables:
team: operations

上面的规则其实就是根据 record 规则中的定义,Prometheus 会在后台完成 expr 中定义的 PromQL 表达式周期性运算,以 job 为维度使用 sum 聚合运算符 计算 函数ratehttp_requests_total 指标区间 10m 内的增长率,并且将计算结果保存到新的时间序列 job:http_requests_total:rate10m 中,

同时还可以通过 labels 为样本数据添加额外的自定义标签,但是要注意的是这个 Lables 一定存在当前表达式 Metrics 中。

使用模板

模板是在警报中使用时间序列标签和值展示的一种方法,可以用于警报规则中的注释(annotation)与标签(lable)。模板其实使用的go语言的标准模板语法,并公开一些包含时间序列标签和值的变量。这样查询的时候,更具有可读性,也可以执行其他PromQL查询来向警报添加额外内容,ALertmanager Web UI中会根据标签值显示器警报信息。

1
2
3
{{ $lable.<lablename>}} 可以获取当前警报实例中的指定标签值

{{ $value }} 变量可以获取当前PromQL表达式的计算样本值。
1
2
3
4
5
6
7
8
9
10
11
12
13
groups:
- name: operations
rules:
# monitor node memory usage
- alert: node-memory-usage
expr: (1 - (node_memory_MemAvailable_bytes{env="operations",job!='atlassian'} / (node_memory_MemTotal_bytes{env="operations"})))* 100 > 90
for: 1m
labels:
status: Warning
team: operations
annotations:
description: "Environment: {{ $labels.env }} Instance: {{ $labels.instance }} memory usage above {{ $value }} ! ! !"
summary: "node os memory usage status"

调整好rules以后,我们可以使用 curl -XPOST http://localhost:9090/-/reload 或者 对Prometheus服务重启,让警报规则生效。

这个时候,我们可以把阈值调整为 50 来进行故障模拟操作,这时在去访问UI的时候,当持续1分钟满足警报条件,实际警报状态已转换为 Firing,可以在 Annotations中看到模板信息 summarydescription 已经成功显示。

img

需要注意的是,一个稳定健壮的Prometheus监控系统中,要尽量使用模板化,这样会降低性能开销(Debug调试信息等),同时也易于维护。

下面网站收录了当前大部分的rules规则,大家可以对应自己的环境,配置相关服务的Rules。

Prometheus警报规则收集(https://awesome-prometheus-alerts.grep.to/)

rules 源码分析

入口还是在 prometheus 的 main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// notifier 是用于向 alertmanager 发送告警的
notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))

// 声明 ruleManager
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Queryable: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
// 若触发告警规则,则通过sendAlerts发送告警信息
NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
})

主要看下这里两个 manager 的初始化,ruleManager 主要是根据定义的告警规则进行定时的计算,notifierManager 主要是在生成告警之后会告警的内容进行发送,与 alertmanager 进行交互,将告警信息交给 alertmanager 进行处理之后再根据配置的发送方式,将内容发送给 receivers。下面分别看下:

ruleManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
name: "rules",
reloader: func(cfg *config.Config) error {
// Get all rule files matching the configuration paths.
var files []string
// 列出配置的所有rules文件,比如 /tmp/*.rules
for _, pat := range cfg.RuleFiles {
fs, err := filepath.Glob(pat)
if err != nil {
// The only error can be a bad pattern.
return errors.Wrapf(err, "error retrieving rule files for %s", pat)
}
files = append(files, fs...)
}
// files["/tmp/kubelet.rules","nodes.rules"]
return ruleManager.Update(
time.Duration(cfg.GlobalConfig.EvaluationInterval),
files,
cfg.GlobalConfig.ExternalLabels,
)
},
},

遍历配置的苏欧欧 rules 文件,进行解析,并将其加入 files 切片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Update the rule manager's state as the config requires. If
// loading the new rules failed the old rule set is restored.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
m.mtx.Lock()
defer m.mtx.Unlock()

groups, errs := m.LoadGroups(interval, externalLabels, files...)
if errs != nil {
for _, e := range errs {
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true

var wg sync.WaitGroup
for _, newg := range groups {
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
gn := GroupKey(newg.file, newg.name)
oldg, ok := m.groups[gn]
delete(m.groups, gn)

if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}

wg.Add(1)
// 每一个 group 开一个协程去 run
go func(newg *Group) {
if ok {
oldg.stop()
newg.CopyState(oldg)
}
wg.Done()
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
newg.run(m.opts.Context)
}(newg)
}

// Stop remaining old groups.
wg.Add(len(m.groups))
for n, oldg := range m.groups {
go func(n string, g *Group) {
g.markStale = true
g.stop()
if m := g.metrics; m != nil {
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
m.GroupRules.DeleteLabelValues(n)
m.GroupSamples.DeleteLabelValues((n))
}
wg.Done()
}(n, oldg)
}

wg.Wait()
m.groups = groups

return nil
}

结合上面的初始化看,到这里为止主要做了如下几件事情:

  1. 列出配置的所有rules文件
  2. 解析rules文件
  3. 按groupName分组
  4. group.Run(),此处会启动定时任务,按照配置的频率evaluation_interval执行告警或者汇总规则

看下 run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
func (g *Group) run(ctx context.Context) {
defer close(g.terminated)

// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
select {
case <-time.After(time.Until(evalTimestamp)):
case <-g.done:
return
}

ctx = promql.NewOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"file": g.File(),
"name": g.Name(),
},
})

iter := func() {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()

start := time.Now()
// 将循环当前group的所有rule,并执行eval
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)

g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}

// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.interval` occurrence.
tick := time.NewTicker(g.interval)
defer tick.Stop()

defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
// Wait for 2 intervals to give the opportunity to renamed rules
// to insert new series in the tsdb. At this point if there is a
// renamed rule, it should already be started.
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(ctx, now)
}
}(time.Now())
}()

// 第一次运行iter,先进行一次rules判断
iter()
// 如果我们需要恢复的话,我们等待另一个 Eval 完成。
// 原因是,在第一次评估期间(或之前),我们可能没有充足的数据,
// 并且记录规则没有更新某些警报可能依赖的最新值
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}

g.RestoreForState(time.Now())
g.shouldRestore = false
}

for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
// 之后按照 interval 周期循环rules判断是否发送告警
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}
}
}
}

这里面主要就是根据rules的内容进行定时的计算,判断是否需要生成告警信息,然后将其交给 notifierManager 组件进行处理

通过g.Eval(),遍历rules去匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
var samplesTotal float64
// 循环所有的rules
for i, rule := range g.rules {
select {
case <-g.done:
return
default:
}

// 循环所有的rules,将每个rule拿出来执行匿名函数
func(i int, rule Rule) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
sp.SetTag("name", rule.Name())
defer func(t time.Time) {
sp.Finish()

since := time.Since(t)
g.metrics.EvalDuration.Observe(since.Seconds())
rule.SetEvaluationDuration(since)
rule.SetEvaluationTimestamp(t)
}(time.Now())

g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

// AlertingRule.Eval(),通过Engine拉取数据,计算告警表达式,创建告警状态 pending or firing
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
if _, ok := err.(promql.ErrQueryCanceled); !ok {
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
}
return
}
samplesTotal += float64(len(vector))

// 判断是否需要发送告警给 alertmanager
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
var (
numOutOfOrder = 0
numDuplicates = 0
)

app := g.opts.Appendable.Appender(ctx)
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
if err := app.Commit(); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
return
}
g.seriesInPreviousEval[i] = seriesReturned
}()

for _, s := range vector {
if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)

switch errors.Cause(err) {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
default:
level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
}
} else {
seriesReturned[s.Metric.String()] = s.Metric
}
}
if numOutOfOrder > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
}
if numDuplicates > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
}

for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
// Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch errors.Cause(err) {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err)
}
}
}
}(i, rule)
}
if g.metrics != nil {
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
}
g.cleanupStaleSeries(ctx, ts)
}

AlertingRule.Eval(),通过Engine拉取数据,计算告警表达式,创建告警状态pending or firing,然后判断是否需要发送告警给 alertmanager

如果需要发送的话,执行 sendAlerts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, r.vector.String(), alerts...)
}

可以看到这边就是将 alert 整合一下,通过 notifyFunc 发送出去,那么这里的 notifyFunc 是什么呢?还记得我们初始化的时候定义的变量吗,回忆一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Queryable: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
// 若触发告警规则,则通过sendAlerts发送告警信息
NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
})

没错就是 sendAlerts(notifierManager, cfg.web.ExternalURL.String()) ,具体实现在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// sendAlerts implements the rules.NotifyFunc for a Notifier.
// sendAlerts方法主要作用是把规则管理(ruleManager)把告警信息转换成notifier.Alert类型
// 遍历告警信息,构造告警,告警信息长度大于0, 则发送告警
func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
var res []*notifier.Alert

// 遍历告警信息
for _, alert := range alerts {
a := &notifier.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
// 若告警结束,设置告警结束时间为ResolverdAt时间
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
} else {
// 若告警还是active状态,设置告警结束时间为当前时间
a.EndsAt = alert.ValidUntil
}
res = append(res, a)
}

// 若是有告警信息, 则发送告警
if len(alerts) > 0 {
s.Send(res...)
}
}
}

可以看到这里的 sender 就是 norifierManager ,规则管理(ruleManager)把告警信息转换成notifier.Alert类型,遍历告警信息,如果存在告警,则发送告警。

notifier.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Manager) Send(alerts ...*Alert) {
n.mtx.Lock()
defer n.mtx.Unlock()

// Attach external labels before relabelling and sending.
// 根据配置文件prometheus.yml的alert_relabel_configs下的relabel_config对告警的label进行重置
for _, a := range alerts {
lb := labels.NewBuilder(a.Labels)

for _, l := range n.opts.ExternalLabels {
if a.Labels.Get(l.Name) == "" {
lb.Set(l.Name, l.Value)
}
}

a.Labels = lb.Labels()
}

alerts = n.relabelAlerts(alerts)
if len(alerts) == 0 {
return
}

// Queue capacity should be significantly larger than a single alert
// batch could be.
// 若待告警信息的数量大于队列总容量,则移除待告警信息中最早的告警信息, 依据的规则是先进先移除
if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
alerts = alerts[d:]

level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
n.metrics.dropped.Add(float64(d))
}

// If the queue is full, remove the oldest alerts in favor
// of newer ones.
// 若队列中已有的告警信息和待发送的告警信息大于队列的总容量,则从队列中移除最早的告警信息, 依据是先进先移除
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
n.queue = n.queue[d:]

level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
n.metrics.dropped.Add(float64(d))
}
// 把待发送的告警信息加入队列
n.queue = append(n.queue, alerts...)

// Notify sending goroutine that there are alerts to be processed.
// 告知通知管理(notifierManager)有告警信息需要处理
// setMore方法相当于一个触发器,向管道n.more发送触发信息, 告知通知管理(notifierManager)有告警信息需要处理,
// 是连接规则管理(ruleManager)和通知管理(notifierManager)的桥梁
n.setMore()
}

下面看下 notifierManager 的处理

notifierManager

1
2
// notifier 是用于向 alertmanager 发送告警的
notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))
1
2
3
4
{
name: "notify",
reloader: notifierManager.ApplyConfig,
{
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// ApplyConfig updates the status state as the new config requires.
func (n *Manager) ApplyConfig(conf *config.Config) error {
n.mtx.Lock()
defer n.mtx.Unlock()

// 配置文件prometheus.yml中global下的external_labels, 用于外部系统标签的,不是用于metrics数据
n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
// 配置文件prometheus.yml中alertingl下alert_relabel_configs, 动态修改 alert 属性的规则配置
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs

amSets := make(map[string]*alertmanagerSet)

// 遍历告警相关的配置,即配置文件prometheus.yml的alerting
for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() {
// 把alerting下每个配置项, 转换成结构实例:alertmanagerSet
ams, err := newAlertmanagerSet(cfg, n.logger, n.metrics)
if err != nil {
return err
}

amSets[k] = ams
}

n.alertmanagers = amSets

return nil
}

看下正式启动的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func() error {
// When the notifier manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager
// so we wait until the config is fully loaded.
<-reloadReady.C

notifierManager.Run(discoveryManagerNotify.SyncCh())
level.Info(logger).Log("msg", "Notifier manager stopped")
return nil
},
func(err error) {
notifierManager.Stop()
},
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Run dispatches notifications continuously.
/*
Run方法, 监听n.more管道. 若规则管理(ruleManager)向管道n.more发送信号,
告知通知管理(notifierManager)有告警信息需要发送,则触发接下来的告警信息处理

Run方法和指标采集(scrapeManager)的Run方法共用服务发现 (serviceDiscover)的处理逻辑,
检测目标(targets)是否有变动, 不同的是通知管理(notifierManager)只监听告警服务的变动

指标采集(scrapeManager)的tset为map类型, key为job_name, value为targetgroup.Group.
而通知管理(notifierManager)的tset也为map类型,不同的是key为AlertmanagerConfig, value为targetgroup.Group
*/

func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {

for {
select {
case <-n.ctx.Done():
return
// 发现告警服务有更新,重新加载配置. 参考服务发现(discoveryManager)
case ts := <-tsets:
n.reload(ts)
// 告警信号,这个channel在ruleManager服务在产生告警时,会发出信号
case <-n.more:
}
// 利用告警队列, 获取告警信息
alerts := n.nextBatch()

// 如果全部am都没发送成功,就记录丢弃数量指标
if !n.sendAll(alerts...) {
n.metrics.dropped.Add(float64(len(alerts)))
}
// If the queue still has items left, kick off the next iteration.
// 若告警队列中还有告警信息,则再次想n.more传入信号,发送告警信息
if n.queueLen() > 0 {
n.setMore()
}
}
}

这里就是通过 n.more channel与 ruleManager 做交互,当那边产生告警的时候,这边会接收到这个信号,进行处理,然后发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
/*
将告警发送给全部当前配置的 Alertmanager。至少成功发送给一个 Alertmanager 就返回 true。
1. 如果 alerts 为空就返回 true
2. 记录函数执行开始时间
3. 声明 v1Payload, v2Payload 两个字节数组,他们是 alerts 序列化的结果,提前声明是作为缓存,避免在循环中反复声明降低性能
4. 加锁读取 Alertmanager 集合
5. 声明一个 WaitGroup 用于同步等待每个 am 都发送完毕后退出函数
6. 循环 amSets 中的每个 ams,amSet 是不同的服务发现方式配置的am集合,每个 amSet 里面可能有多个 am
7. 根据 ams 的 API 版本对 alerts 进行序列化成 payload
8. 循环 ams 中的每个 am,启一个 goroutine,调用 sendOne() 函数将 payload 发送过去,根据成功或者失败的结果记录观测指标
*/

func (n *Manager) sendAll(alerts ...*Alert) bool {
if len(alerts) == 0 {
return true
}

begin := time.Now()

// v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API
// v1 or v2. Marshaling happens below. Reference here is for caching between
// for loop iterations.
var v1Payload, v2Payload []byte

n.mtx.RLock()
// 获取当前最新的告警服务列表
amSets := n.alertmanagers
n.mtx.RUnlock()

var (
wg sync.WaitGroup
numSuccess atomic.Uint64
)
// 遍历告警服务列表
for _, ams := range amSets {
var (
payload []byte
err error
)

ams.mtx.RLock()

switch ams.cfg.APIVersion {
case config.AlertmanagerAPIVersionV1:
{
if v1Payload == nil {
v1Payload, err = json.Marshal(alerts)
if err != nil {
level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v1 failed", "err", err)
ams.mtx.RUnlock()
return false
}
}

payload = v1Payload
}
case config.AlertmanagerAPIVersionV2:
{
if v2Payload == nil {
openAPIAlerts := alertsToOpenAPIAlerts(alerts)

v2Payload, err = json.Marshal(openAPIAlerts)
if err != nil {
level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v2 failed", "err", err)
ams.mtx.RUnlock()
return false
}
}

payload = v2Payload
}
default:
{
level.Error(n.logger).Log(
"msg", fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", ams.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions),
"err", err,
)
ams.mtx.RUnlock()
return false
}
}

for _, am := range ams.ams {
wg.Add(1)

ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout))
defer cancel()

// 起一个协程发送告警信息
go func(client *http.Client, url string) {
if err := n.sendOne(ctx, client, url, payload); err != nil {
level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
n.metrics.errors.WithLabelValues(url).Inc()
} else {
numSuccess.Inc()
}
n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))

wg.Done()
}(ams.client, am.url().String())
}

ams.mtx.RUnlock()
}

wg.Wait()

// 若至少成功发送给一个告警服务,则返回true
return numSuccess.Load() > 0
}

这里发送成功之后就到 alertmanager 服务了,如果发送失败了,就将这个指标丢弃了,若告警队列中还有告警信息,则再次想n.more传入信号,再次发送告警信息处理。

参考:

Prometheus 实战于源码分析之alert

Prometheus监控神器-Rules篇

-------------本文结束 感谢您的阅读-------------
作者Magiceses
有问题请 留言 或者私信我的 微博
满分是10分的话,这篇文章你给几分