Alertmanager: Usage, Principles and Source Code Analysis

True strength belongs to those who cry under the covers at night yet act as if nothing happened by day. Whoever has not wept deep into the night is not qualified to talk about life.

Overview

Let's first look at Alertmanager (hereafter abbreviated as AM) from an application point of view. The official documentation describes it as follows:

The Alertmanager handles alerts sent by client applications such as the Prometheus server. It takes care of deduplicating, grouping, and routing them to the correct receiver integrations such as email, PagerDuty, or OpsGenie. It also takes care of silencing and inhibition of alerts.

In other words, Alertmanager handles the alerts sent by clients such as Prometheus: it deduplicates and groups them, routes them to the right people, supports multiple delivery channels such as email or third-party platforms like PagerDuty, and also provides silencing and inhibition of alerts.

These features cover the main alerting pain points most companies face: repeated alerts (an alert fires and keeps firing because nobody handles it), alert storms (a release makes a large number of machines report abnormal metrics at once), and redundant alerts (a machine goes down and you then also get "network unreachable" alerts for it).

Note that the whole Prometheus family, Alertmanager included, is built around labels; we will look at this in detail at the code level later.
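Since everything that follows hinges on labels, here is a tiny Go sketch of that idea, using the real github.com/prometheus/common/model package (the label values are invented): an alert's identity is nothing but a fingerprint of its label set.

package main

import (
    "fmt"

    "github.com/prometheus/common/model"
)

func main() {
    // An alert's identity is just its label set; annotations only carry
    // human-readable detail and never affect identity.
    a := model.LabelSet{"alertname": "InstanceDown", "cluster": "A", "instance": "192.168.3.1:9090"}
    b := model.LabelSet{"alertname": "InstanceDown", "cluster": "A", "instance": "192.168.3.1:9090"}
    c := model.LabelSet{"alertname": "InstanceDown", "cluster": "B", "instance": "192.168.3.1:9090"}

    // The fingerprint (a hash over the sorted label pairs) is what routing,
    // grouping, silencing and inhibition all key off.
    fmt.Println(a.Fingerprint() == b.Fingerprint()) // true: same labels, same alert
    fmt.Println(a.Fingerprint() == c.Fingerprint()) // false: any label change means a different alert
}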

A complete monitoring system needs the following capabilities:

  • Data collection (xxx_exporter)
  • Data scraping (Prometheus)
  • Data storage (Prometheus/Cortex)
  • Rule evaluation and alert generation (Prometheus/Cortex ruler)
  • Alert processing (Alertmanager)
  • Alert delivery (usually implemented in-house to fit the organization)

Alertmanager implements alert processing (grouping, inhibition, silencing and routing).

The basic flow is as follows:

1. Prometheus Server scrapes the HTTP endpoints exposed on the monitored targets (call one of them endpoint A), collecting metrics at the interval defined by 'scrape_interval' in the Prometheus configuration.

2. When endpoint A becomes unavailable, the server keeps trying to pull data from it until 'scrape_timeout' elapses, after which the target's state is marked as DOWN.

3. In parallel, Prometheus evaluates the alerting rules at the interval configured by 'evaluation_interval' (1m by default). When an evaluation finds that endpoint A is DOWN, i.e. up == 0 is true, the alert becomes active, enters the PENDING state, and the time it became active is recorded.

4. At the next rule evaluation, if up == 0 is still true, Prometheus checks whether the alert has been active for longer than the 'for' duration in the rule. If not, it waits for the next evaluation cycle; if so, the alert transitions to FIRING and Prometheus calls the Alertmanager API to send the alert data.

5. After Alertmanager receives the alerts, it groups them and first waits for the 'group_wait' duration configured in Alertmanager before sending the initial notification.

6. While waiting, new alerts may join the same alert group. If a notification for the group has already been sent successfully, the next one is sent after 'group_interval'. With email notifications, for example, alerts belonging to the same group are aggregated into a single email.

7. If the alerts in a group have not changed and have already been sent successfully, the same notification is only repeated after 'repeat_interval'. If the previous notification failed, the situation is effectively handled as in step 6 and the notification is retried after 'group_interval'. Who ultimately receives which alerts, under what matching conditions, and at what frequency is all configured through Alertmanager's route tree.
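To make step 4 concrete, here is a minimal Go sketch that pushes an alert to Alertmanager's v1 alerts endpoint the way a client such as Prometheus does. The address localhost:9093 (the default listen port) and the labels are illustrative; real Prometheus batches, retries and also sets endsAt/generatorURL, which this sketch omits.

package main

import (
    "bytes"
    "encoding/json"
    "log"
    "net/http"
    "time"
)

// postableAlert mirrors the JSON body accepted by POST /api/v1/alerts
// (labels, annotations and a start time; EndsAt is left out here, so
// Alertmanager will apply its resolve_timeout).
type postableAlert struct {
    Labels      map[string]string `json:"labels"`
    Annotations map[string]string `json:"annotations"`
    StartsAt    time.Time         `json:"startsAt"`
}

func main() {
    alerts := []postableAlert{{
        Labels: map[string]string{
            "alertname": "InstanceDown",
            "instance":  "192.168.3.1:9090",
            "severity":  "critical",
        },
        Annotations: map[string]string{"summary": "instance is down"},
        StartsAt:    time.Now(),
    }}

    body, err := json.Marshal(alerts)
    if err != nil {
        log.Fatal(err)
    }
    resp, err := http.Post("http://localhost:9093/api/v1/alerts", "application/json", bytes.NewReader(body))
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    log.Println("alertmanager replied:", resp.Status)
}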

Alert notification flow (1):

https://note.youdao.com/yws/public/resource/08ef03888558443a04d7f3e39c2d975d/xmlnote/WEBRESOURCEe71833b5f301818c2524048ed5ed436d/24641

Alert notification flow (2):

https://note.youdao.com/yws/public/resource/08ef03888558443a04d7f3e39c2d975d/xmlnote/WEBRESOURCE643c3bb348a7477a11e20d8740e182dd/24645

Architecture

(architecture diagram)

Configuration Files

alertmanager

global:
  # The smarthost and SMTP sender used for mail notifications.
  smtp_smarthost: 'smtp.qq.com:465'
  smtp_from: '447040949@qq.com'
  smtp_auth_username: '447040949@qq.com'
  smtp_auth_password: 'nihao206206#'
  # The auth token for Hipchat.
  hipchat_auth_token: '1234556789'
  # Alternative host for Hipchat.
  hipchat_url: 'https://hipchat.foobar.org/'

# The directory from which notification templates are read.
templates:
- '/etc/alertmanager/template/*.tmpl'

# The root route on which each incoming alert enters.
route:
  # The labels by which incoming alerts are grouped together. For example,
  # multiple alerts coming in for cluster=A and alertname=LatencyHigh would
  # be batched into a single group.
  group_by: ['alertname', 'cluster', 'service']

  # When a new group of alerts is created by an incoming alert, wait at
  # least 'group_wait' to send the initial notification.
  # This way ensures that you get multiple alerts for the same group that start
  # firing shortly after another are batched together on the first
  # notification.
  group_wait: 30s

  # When the first notification was sent, wait 'group_interval' to send a batch
  # of new alerts that started firing for that group.
  group_interval: 5m

  # If an alert has successfully been sent, wait 'repeat_interval' to
  # resend them.
  repeat_interval: 3h

  # A default receiver
  receiver: team-X-mails

  # All the above attributes are inherited by all child routes and can
  # be overwritten on each.

  # The child route trees.
  routes:
  # This route performs a regular expression match on alert labels to
  # catch alerts that are related to a list of services.
  - match_re:
      service: ^(foo1|foo2|baz)$
    receiver: team-X-mails
    # The service has a sub-route for critical alerts, any alerts
    # that do not match, i.e. severity != critical, fall-back to the
    # parent node and are sent to 'team-X-mails'
    routes:
    - match:
        severity: critical
      receiver: team-X-pager
  - match:
      service: files
    receiver: team-Y-mails

    routes:
    - match:
        severity: critical
      receiver: team-Y-pager

  # This route handles all alerts coming from a database service. If there's
  # no team to handle it, it defaults to the DB team.
  - match:
      service: database
    receiver: team-DB-pager
    # Also group alerts by affected database.
    group_by: [alertname, cluster, database]
    routes:
    - match:
        owner: team-X
      receiver: team-X-pager
    - match:
        owner: team-Y
      receiver: team-Y-pager


# Inhibition rules allow to mute a set of alerts given that another alert is
# firing.
# We use this to mute any warning-level notifications if the same alert is
# already critical.
inhibit_rules:
- source_match:
    severity: 'critical'
  target_match:
    severity: 'warning'
  # Apply inhibition if the alertname is the same.
  equal: ['alertname', 'cluster', 'service']


receivers:
- name: 'team-X-mails'
  webhook_configs:
  - url: 'http://u2.kugou.net:11770/sendRtxByPost'

- name: 'team-X-pager'
  email_configs:
  - to: 'team-X+alerts-critical@example.org'
  pagerduty_configs:
  - service_key: <team-X-key>

- name: 'team-Y-mails'
  email_configs:
  - to: 'team-Y+alerts@example.org'

- name: 'team-Y-pager'
  pagerduty_configs:
  - service_key: <team-Y-key>

- name: 'team-DB-pager'
  pagerduty_configs:
  - service_key: <team-DB-key>

- name: 'team-X-hipchat'
  hipchat_configs:
  - auth_token: <auth_token>
    room_id: 85
    message_format: html
    notify: true

Parameter notes

  • global
    smtp_smarthost, smtp_from, smtp_auth_username and smtp_auth_password configure the SMTP server and sender account used for email notifications.
    hipchat_auth_token is the authentication token for HipChat.

  • templates
    Specifies the templates used to render notifications.

  • route
    group_by: the label dimensions by which alerts are grouped.
    group_wait: how long to wait before sending the first notification for a newly created group.
    group_interval: how long to wait before notifying about new alerts added to a group that has already been notified.
    repeat_interval: how long to wait before re-sending a notification whose alerts are still firing and have already been sent successfully.

  • receiver
    The default receiver for alerts.
  • routes
    match_re: matches alerts against label regular expressions to pick a receiver.
    service: in the example above, an alert matches when its service label is exactly foo1, foo2 or baz (the regex is anchored with ^ and $).
    receiver: the receiver used when the match succeeds.
    A short sketch of how this route tree is evaluated appears after this list.
  • inhibit_rules
    Defines inhibition conditions that suppress unnecessary alerts.
  • receivers
    Defines the concrete receivers, i.e. how notifications are actually delivered.
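As a quick sanity check of a routing tree like the one above, the sketch below loads a trimmed-down config and asks which receiver a sample alert would land on. It is only a sketch: it assumes the exported Load helper in github.com/prometheus/alertmanager/config and the dispatch.NewRoute/Match calls that appear in the source walkthrough later in this post, and the config string is an invented, cut-down example.

package main

import (
    "fmt"
    "log"

    "github.com/prometheus/alertmanager/config"
    "github.com/prometheus/alertmanager/dispatch"
    "github.com/prometheus/common/model"
)

// A cut-down route tree shaped like the example config above.
const conf = `
route:
  receiver: team-X-mails
  group_by: ['alertname', 'cluster', 'service']
  routes:
    - match_re:
        service: ^(foo1|foo2|baz)$
      receiver: team-X-mails
      routes:
        - match:
            severity: critical
          receiver: team-X-pager
receivers:
  - name: team-X-mails
  - name: team-X-pager
`

func main() {
    c, err := config.Load(conf)
    if err != nil {
        log.Fatal(err)
    }
    // Build the same routing tree the dispatcher builds on config reload.
    tree := dispatch.NewRoute(c.Route, nil)

    // A critical alert with service=foo1 descends to the deepest matching
    // node, so it ends up at team-X-pager.
    lset := model.LabelSet{"alertname": "LatencyHigh", "service": "foo1", "severity": "critical"}
    for _, r := range tree.Match(lset) {
        fmt.Println("matched receiver:", r.RouteOpts.Receiver)
    }
}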

prometheus

# my global config
global:
  scrape_interval: 15s     # By default, scrape targets every 15 seconds.
  evaluation_interval: 15s # By default, evaluate rules every 15 seconds.
  # scrape_timeout is set to the global default (10s).

  # Attach these labels to any time series or alerts when communicating with
  # external systems (federation, remote storage, Alertmanager).
  external_labels:
    monitor: 'codelab-monitor'

# Load and evaluate rules in this file every 'evaluation_interval' seconds.
rule_files:
  # - "first.rules"
  # - "second.rules"
  - "alert.rules"
  # - "record.rules"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'windows-test'

    # Override the global default and scrape targets from this job every second.
    scrape_interval: 1s

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
      - targets: ['192.168.3.1:9090','192.168.3.120:9090']

  - job_name: 'windows-chenx'

    # Override the global default and scrape targets from this job every 3 seconds.
    scrape_interval: 3s

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
      - targets: ['192.168.3.1:9091']

Parameter notes:

  • scrape_interval under global
    How often Prometheus scrapes its targets (exporters, pushgateway, and so on). In the config above, metrics are collected every 15 seconds.
  • evaluation_interval under global
    How often rules are evaluated. In the config above, the configured rule files are evaluated every 15 seconds.
  • external_labels under global
    Extra labels attached to all time series and alerts when talking to external systems; they are handy for telling multiple Prometheus servers apart, since several Prometheus instances can share a single Alertmanager.
  • rule_files
    The rule files to load; each file can contain one or more rules.
  • job_name under scrape_configs
    The job name; it is added as a job label to every metric scraped by this job.
  • scrape_interval under scrape_configs
    Overrides the global scrape_interval for this job.
  • targets under static_configs
    The addresses of the scrape targets, comma-separated.

Features

Alert routing

The route section controls how alerts are aggregated and how often notifications go out. It is a tree: every node carries a receiver, match conditions, and the relevant sending parameters (initial grouping wait, subsequent notification frequency, and so on). An incoming alert is matched against the tree node by node and descends into every matching child route; if a matching node has continue set to false, its later sibling nodes are not tried. The alerts that end up on a node are then split into groups according to that node's group_by labels, and that is how aggregation is achieved.

One point worth stressing: for each alert group, three fields determine how often notifications are sent: group_wait, group_interval and repeat_interval.

group_wait: when alert A arrives and no matching group exists yet, a new group is created and Alertmanager waits group_wait before sending the group's first notification. Why? To dampen alert storms: if service cluster a fires, say, 10 alerts and they all arrive within the group_wait window, they are merged into a single notification instead of 10 separate messages.

group_interval: controls the interval at which the alert groups are re-examined. A group is notified again either when new, not-yet-notified alerts have arrived in it, or when the time since the last notification exceeds repeat_interval. A simplified model of this decision follows.
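The following is not Alertmanager's code, just a simplified model of the per-group decision described above, to make the roles of the three durations concrete:

package main

import (
    "fmt"
    "time"
)

// A much-simplified model of an alert group's notification timing; the real
// logic lives in dispatch.aggrGroup and the notify pipeline.
type group struct {
    createdAt    time.Time // when the first alert created this group
    lastNotified time.Time // zero value: nothing has been sent yet
    hasNewAlerts bool      // an alert arrived that was never notified
}

func shouldNotify(g group, now time.Time, groupWait, groupInterval, repeatInterval time.Duration) bool {
    switch {
    case g.lastNotified.IsZero():
        // First notification: hold back for group_wait so alerts arriving
        // shortly after one another are batched into a single message.
        return now.Sub(g.createdAt) >= groupWait
    case g.hasNewAlerts:
        // New alerts joined an already-notified group: wait group_interval
        // since the last notification before sending an updated batch.
        return now.Sub(g.lastNotified) >= groupInterval
    default:
        // Nothing changed: only re-send after repeat_interval.
        return now.Sub(g.lastNotified) >= repeatInterval
    }
}

func main() {
    g := group{createdAt: time.Now().Add(-45 * time.Second)}
    // group_wait=30s has already elapsed since the group was created, so: true.
    fmt.Println(shouldNotify(g, time.Now(), 30*time.Second, 5*time.Minute, 3*time.Hour))
}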

Route configuration format

# The receiver for this route's notifications.
[ receiver: <string> ]

# The labels by which incoming alerts are grouped.
[ group_by: '[' <labelname>, ... ']' ]

# Whether an alert should continue matching subsequent sibling nodes.
[ continue: <boolean> | default = false ]

# A set of equality matchers an alert has to fulfill to match the node.
match:
  [ <labelname>: <labelvalue>, ... ]

# A set of regex-matchers an alert has to fulfill to match the node.
match_re:
  [ <labelname>: <regex>, ... ]

# How long to initially wait to send a notification for a group
# of alerts. Allows to wait for an inhibiting alert to arrive or collect
# more initial alerts for the same group. (Usually ~0s to few minutes.)
[ group_wait: <duration> ]

# How long to wait before sending a notification about new alerts that
# are added to a group of alerts for which an initial notification has
# already been sent. (Usually ~5min or more.)
[ group_interval: <duration> ]

# How long to wait before sending a notification again if it has already
# been sent successfully for an alert. (Usually ~3h or more).
[ repeat_interval: <duration> ]

# Zero or more child routes.
routes:
  [ - <route> ... ]

Example:

# The root route with all parameters, which are inherited by the child
# routes if they are not overwritten.
route:
  receiver: 'default-receiver'
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  group_by: [cluster, alertname]
  # All alerts that do not match the following child routes
  # will remain at the root node and be dispatched to 'default-receiver'.
  routes:
  # All alerts with service=mysql or service=cassandra
  # are dispatched to the database pager.
  - receiver: 'database-pager'
    group_wait: 10s
    match_re:
      service: mysql|cassandra
  # All alerts with the team=frontend label match this sub-route.
  # They are grouped by product and environment rather than cluster
  # and alertname.
  - receiver: 'frontend-pager'
    group_by: [product, environment]
    match:
      team: frontend

Alert grouping

Grouping is a crucial feature: good grouping dramatically reduces alert storms.

Grouping happens after routing; each route node can define its own grouping labels, for example grouping by product, cluster or team.

In the example above, if alerts are grouped only by product, every alert belonging to product tcs lands in the same group, which quickly gets messy; grouping by product plus a finer label (for example the alert type) splits alerts from different machines into separate groups. A sketch of how a group key is derived from the group_by labels follows.
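A small sketch of how a group key can be derived from an alert's labels and a route's group_by list (compare getGroupLabels and Fingerprint in dispatch.processAlert quoted later in this post); the labels are invented:

package main

import (
    "fmt"

    "github.com/prometheus/common/model"
)

// groupKey projects an alert's labels onto the route's group_by labels and
// fingerprints the result: alerts with the same projection share a group.
func groupKey(alert model.LabelSet, groupBy []model.LabelName) model.Fingerprint {
    sub := model.LabelSet{}
    for _, name := range groupBy {
        if v, ok := alert[name]; ok {
            sub[name] = v
        }
    }
    return sub.Fingerprint()
}

func main() {
    byProduct := []model.LabelName{"product"}
    byProductInstance := []model.LabelName{"product", "instance"}
    a := model.LabelSet{"alertname": "HighCPU", "product": "tcs", "instance": "10.0.0.1"}
    b := model.LabelSet{"alertname": "HighMem", "product": "tcs", "instance": "10.0.0.2"}
    // Grouping only by product: both alerts land in the same group.
    fmt.Println(groupKey(a, byProduct) == groupKey(b, byProduct)) // true
    // Adding instance to group_by splits them into separate groups.
    fmt.Println(groupKey(a, byProductInstance) == groupKey(b, byProductInstance)) // false
}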

Alert inhibition

Inhibition means that while a higher-severity alert is firing, the corresponding lower-severity alerts are automatically suppressed, and the suppression is lifted once the higher-severity alert resolves. A typical scenario: a warning notification at 80% disk usage and a critical notification at 90%.

Inhibition is also implemented on top of labels, but it is global rather than scoped to a route, and it only supports static label matchers. This design is not great, for two reasons (a simplified sketch of how source_match / target_match / equal interact appears after this list):

  1. Global rules from different users easily interfere with each other. To reduce that, each inhibition rule should be bound to a route and required to include that route's labels.
  2. Static labels put unnecessary constraints on label conventions: alerts must carry the designated inhibition labels before the feature can be used at all.
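To make the source_match / target_match / equal semantics concrete, here is a simplified, self-contained model (plain string maps instead of the real matcher types, and no regex support):

package main

import "fmt"

type labels map[string]string

// A simplified model of one inhibit_rule: if an alert matching sourceMatch is
// firing, any alert matching targetMatch is muted, provided the two agree on
// every label listed in equal. (The real implementation also supports regex
// matchers and caches source alerts; see inhibit.Inhibitor below.)
type inhibitRule struct {
    sourceMatch labels
    targetMatch labels
    equal       []string
}

func matches(m labels, alert labels) bool {
    for k, v := range m {
        if alert[k] != v {
            return false
        }
    }
    return true
}

func (r inhibitRule) inhibits(source, target labels) bool {
    if !matches(r.sourceMatch, source) || !matches(r.targetMatch, target) {
        return false
    }
    for _, l := range r.equal {
        if source[l] != target[l] {
            return false
        }
    }
    return true
}

func main() {
    rule := inhibitRule{
        sourceMatch: labels{"severity": "critical"},
        targetMatch: labels{"severity": "warning"},
        equal:       []string{"alertname", "cluster", "service"},
    }
    critical := labels{"alertname": "DiskFull", "cluster": "A", "service": "db", "severity": "critical"}
    warning := labels{"alertname": "DiskFull", "cluster": "A", "service": "db", "severity": "warning"}
    fmt.Println(rule.inhibits(critical, warning)) // true: the warning is muted while the critical alert fires
}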

Alert silencing

Silencing is similar to inhibition but not the same: a silence simply mutes matching alerts so they are not sent at all, with no relationship between different alerts (inhibition, by contrast, is a higher-severity alert suppressing lower-severity ones). Silencing is also label-based: when an alert's labels match the configured silence matchers, the alert is muted and no notification goes out.

In stock Alertmanager, silencing happens purely in memory: there is no record of which alerts were silenced, and once an alert is silenced, even its recovery produces no notification; a silence stays in effect until it expires or is deleted manually.

Recording every alert is a hard requirement for us (project 370), so we modified Alertmanager to let silenced alerts pass through with a special marker attached. That way we can record which alerts were silenced and still capture their recovery.

Inhibition rule configuration format

# Matchers that have to be fulfilled in the alerts to be muted.
target_match:
  [ <labelname>: <labelvalue>, ... ]
target_match_re:
  [ <labelname>: <regex>, ... ]

# Matchers for which one or more alerts have to exist for the
# inhibition to take effect.
source_match:
  [ <labelname>: <labelvalue>, ... ]
source_match_re:
  [ <labelname>: <regex>, ... ]

# Labels that must have an equal value in the source and target
# alert for the inhibition to take effect.
[ equal: '[' <labelname>, ... ']' ]

Alert delivery

Delivery is the final step of the alerting pipeline and the hardest one to please everyone with. Alertmanager ships with the common third-party integrations, but they are rarely a perfect fit and are hard to customize, so teams usually build a delivery service tailored to their organization on top of the webhook receiver.

Generic receiver configuration format

# The unique name of the receiver.
name: <string>

# Configurations for several notification integrations.
email_configs:
  [ - <email_config>, ... ]
pagerduty_configs:
  [ - <pagerduty_config>, ... ]
slack_configs:
  [ - <slack_config>, ... ]
opsgenie_configs:
  [ - <opsgenie_config>, ... ]
webhook_configs:
  [ - <webhook_config>, ... ]

Email receiver: email_config

# Whether or not to notify about resolved alerts.
[ send_resolved: <boolean> | default = false ]

# The email address to send notifications to.
to: <tmpl_string>
# The sender address.
[ from: <tmpl_string> | default = global.smtp_from ]
# The SMTP host through which emails are sent.
[ smarthost: <string> | default = global.smtp_smarthost ]

# The HTML body of the email notification.
[ html: <tmpl_string> | default = '{{ template "email.default.html" . }}' ]

# Further headers email header key/value pairs. Overrides any headers
# previously set by the notification implementation.
[ headers: { <string>: <tmpl_string>, ... } ]

Slack receiver: slack_config

# Whether or not to notify about resolved alerts.
[ send_resolved: <boolean> | default = true ]

# The Slack webhook URL.
[ api_url: <string> | default = global.slack_api_url ]

# The channel or user to send notifications to.
channel: <tmpl_string>

# API request data as defined by the Slack webhook API.
[ color: <tmpl_string> | default = '{{ if eq .Status "firing" }}danger{{ else }}good{{ end }}' ]
[ username: <tmpl_string> | default = '{{ template "slack.default.username" . }}' ]
[ title: <tmpl_string> | default = '{{ template "slack.default.title" . }}' ]
[ title_link: <tmpl_string> | default = '{{ template "slack.default.titlelink" . }}' ]
[ pretext: <tmpl_string> | default = '{{ template "slack.default.pretext" . }}' ]
[ text: <tmpl_string> | default = '{{ template "slack.default.text" . }}' ]
[ fallback: <tmpl_string> | default = '{{ template "slack.default.fallback" . }}' ]

Webhook receiver: webhook_config

 # Whether or not to notify about resolved alerts.
[ send_resolved: <boolean> | default = true ]

# The endpoint to send HTTP POST requests to.
url: <string>

Alertmanager sends HTTP POST requests to the configured endpoint in the following format:

{
  "version": "3",
  "groupKey": <number>,    // key identifying the group of alerts (e.g. to deduplicate)
  "status": "<resolved|firing>",
  "receiver": <string>,
  "groupLabels": <object>,
  "commonLabels": <object>,
  "commonAnnotations": <object>,
  "externalURL": <string>, // backlink to the Alertmanager.
  "alerts": [
    {
      "labels": <object>,
      "annotations": <object>,
      "startsAt": "<rfc3339>",
      "endsAt": "<rfc3339>"
    },
    ...
  ]
}
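For reference, a minimal Go webhook receiver that parses this payload might look like the sketch below. The struct fields follow the JSON above; the exact payload version and shape depend on your Alertmanager release, and the port and path are arbitrary choices for the example.

package main

import (
    "encoding/json"
    "log"
    "net/http"
    "time"
)

type webhookAlert struct {
    Labels      map[string]string `json:"labels"`
    Annotations map[string]string `json:"annotations"`
    StartsAt    time.Time         `json:"startsAt"`
    EndsAt      time.Time         `json:"endsAt"`
}

type webhookMessage struct {
    Version     string            `json:"version"`
    GroupKey    json.RawMessage   `json:"groupKey"` // a number in the v3 payload above, a string in newer releases
    Status      string            `json:"status"`
    Receiver    string            `json:"receiver"`
    GroupLabels map[string]string `json:"groupLabels"`
    Alerts      []webhookAlert    `json:"alerts"`
}

func main() {
    http.HandleFunc("/alert", func(w http.ResponseWriter, r *http.Request) {
        var msg webhookMessage
        if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        // Forward to mail / IM / an internal notification platform here.
        log.Printf("status=%s receiver=%s alerts=%d", msg.Status, msg.Receiver, len(msg.Alerts))
        w.WriteHeader(http.StatusOK)
    })
    log.Fatal(http.ListenAndServe(":11770", nil))
}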

You can also add a DingTalk webhook and alert through DingTalk. Since DingTalk expects a specific POST payload, a small forwarding script does the conversion:

from flask import Flask
from flask import request
import json

app = Flask(__name__)

@app.route('/', methods=['POST'])
def send():
    # Receive the Alertmanager webhook payload and forward it to DingTalk.
    if request.method == 'POST':
        post_data = request.get_data()
        alert_data(post_data)
    return 'ok'

def alert_data(data):
    # Python 2 style urllib2; on Python 3 use urllib.request instead.
    from urllib2 import Request, urlopen
    url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
    send_data = '{"msgtype": "text","text": {"content": %s}}' % (data)
    request = Request(url, send_data)
    request.add_header('Content-Type', 'application/json')
    return urlopen(request).read()

if __name__ == '__main__':
    app.run(host='0.0.0.0')

Alerting rules

Alerting rules let you define alert conditions using the Prometheus expression language and send the resulting alerts to an external service.

Alerting rules are defined in the following format (this is the old Prometheus 1.x syntax; Prometheus 2.x uses YAML rule files):

ALERT <alert name>
IF <expression>
[ FOR <duration> ]
[ LABELS <label set> ]
[ ANNOTATIONS <label set> ]

The optional FOR clause makes Prometheus wait for the given duration between the first time an expression output element (for example an instance with a high HTTP error rate) appears and counting the alert as firing for that element. Elements that are active but not yet firing are in the pending state.

The LABELS clause attaches a set of labels to the alert. It overwrites any existing conflicting labels, and label values can be templated.

The ANNOTATIONS clause stores longer additional information, such as an alert description or a link; annotation values can also be templated.

Templating: label and annotation values can be templated using console templates. The $labels variable holds the label key/value pairs of the alert instance, and $value holds its evaluated value.

# To insert a firing element's label values:
{{ $labels.<labelname> }}
# To insert the numeric expression value of the firing element:
{{ $value }}

Example alerting rules:

# Alert for any instance that is unreachable for >5 minutes.
ALERT InstanceDown
  IF up == 0
  FOR 5m
  LABELS { severity = "page" }
  ANNOTATIONS {
    summary = "Instance {{ $labels.instance }} down",
    description = "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes.",
  }

# Alert for any instance that has a median request latency >1s.
ALERT APIHighRequestLatency
  IF api_http_request_latencies_second{quantile="0.5"} > 1
  FOR 1m
  ANNOTATIONS {
    summary = "High request latency on {{ $labels.instance }}",
    description = "{{ $labels.instance }} has a median request latency above 1s (current value: {{ $value }}s)",
  }

Source Code Analysis

alertmanager/cmd/alertmanager/main.go

Service startup

// 程序入口main函数
func main() {
os.Exit(run())
}

// run函数,真实的程序主函数。初始化,并开启相应的服务。阻塞监听reload和关闭信号,会进行平滑退出。
func run() int {

...

// 初始化集群,可高可用
var peer *cluster.Peer
if *clusterBindAddr != "" {
peer, err = cluster.Create(
log.With(logger, "component", "cluster"),
prometheus.DefaultRegisterer,
*clusterBindAddr,
*clusterAdvertiseAddr,
*peers,
true,
*pushPullInterval,
*gossipInterval,
*tcpTimeout,
*probeTimeout,
*probeInterval,
)
if err != nil {
level.Error(logger).Log("msg", "unable to initialize gossip mesh", "err", err)
return 1
}
// 设置普罗米修斯集群指标,为已启用
clusterEnabled.Set(1)
}

// 配置 stop channel 和 等待组,确保优雅退出
stopc := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)

// TODO: 配置集群广播日志信息配置,并配置广播???
notificationLogOpts := []nflog.Option{
nflog.WithRetention(*retention),
nflog.WithSnapshot(filepath.Join(*dataDir, "nflog")),
nflog.WithMaintenance(15*time.Minute, stopc, wg.Done),
nflog.WithMetrics(prometheus.DefaultRegisterer),
nflog.WithLogger(log.With(logger, "component", "nflog")),
}

notificationLog, err := nflog.New(notificationLogOpts...)
if err != nil {
level.Error(logger).Log("err", err)
return 1
}
if peer != nil {
c := peer.AddState("nfl", notificationLog, prometheus.DefaultRegisterer)
notificationLog.SetBroadcast(c.Broadcast)
}

marker := types.NewMarker(prometheus.DefaultRegisterer)

// 静默配置,并生成静默对象,如果有配置集群,则设置集群广播。
silenceOpts := silence.Options{
SnapshotFile: filepath.Join(*dataDir, "silences"),
Retention: *retention,
Logger: log.With(logger, "component", "silences"),
Metrics: prometheus.DefaultRegisterer,
}

silences, err := silence.New(silenceOpts)
if err != nil {
level.Error(logger).Log("err", err)
return 1
}
if peer != nil {
c := peer.AddState("sil", silences, prometheus.DefaultRegisterer)
silences.SetBroadcast(c.Broadcast)
}

// Start providers before router potentially sends updates.
wg.Add(1)
// 维护静默产生的data数据,每十五分钟进行下数据删除。
go func() {
silences.Maintenance(15*time.Minute, filepath.Join(*dataDir, "silences"), stopc)
wg.Done()
}()

// defer wg.Wait 确保最后所有的任务都优雅退出之后,才会程序退出。
defer func() {
close(stopc)
wg.Wait()
}()

// 集群peer的状态监听器已经进行注册成功,现在可以进行加入集群和初始化状态。
// Peer state listeners have been registered, now we can join and get the initial state.
if peer != nil {
err = peer.Join(
*reconnectInterval,
*peerReconnectTimeout,
)
if err != nil {
level.Warn(logger).Log("msg", "unable to join gossip mesh", "err", err)
}
ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout)
defer func() {
cancel()
if err := peer.Leave(10 * time.Second); err != nil {
level.Warn(logger).Log("msg", "unable to leave gossip mesh", "err", err)
}
}()
go peer.Settle(ctx, *gossipInterval*10)
}

// 创建监控对象
alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, logger)
if err != nil {
level.Error(logger).Log("err", err)
return 1
}
defer alerts.Close()

var disp *dispatch.Dispatcher
defer disp.Stop()

// 创建分组方法
groupFn := func(routeFilter func(*dispatch.Route) bool, alertFilter func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
return disp.Groups(routeFilter, alertFilter)
}

// alertmanager api结构体,包含所有版本V1,V2的http接口。
api, err := api.New(api.Options{
Alerts: alerts,
Silences: silences,
StatusFunc: marker.Status,
Peer: peer,
Timeout: *httpTimeout,
Concurrency: *getConcurrency,
Logger: log.With(logger, "component", "api"),
Registry: prometheus.DefaultRegisterer,
GroupFunc: groupFn,
})

if err != nil {
level.Error(logger).Log("err", errors.Wrap(err, "failed to create API"))
return 1
}

amURL, err := extURL(logger, os.Hostname, *listenAddress, *externalURL)
if err != nil {
level.Error(logger).Log("msg", "failed to determine external URL", "err", err)
return 1
}
level.Debug(logger).Log("externalURL", amURL.String())

waitFunc := func() time.Duration { return 0 }
if peer != nil {
waitFunc = clusterWait(peer, *peerTimeout)
}
timeoutFunc := func(d time.Duration) time.Duration {
if d < notify.MinTimeout {
d = notify.MinTimeout
}
return d + waitFunc()
}

var (
inhibitor *inhibit.Inhibitor
tmpl *template.Template
)

// 创建普罗米修斯相关统计指标,创建配置协调者。
dispMetrics := dispatch.NewDispatcherMetrics(prometheus.DefaultRegisterer)
pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer)
configLogger := log.With(logger, "component", "configuration")
configCoordinator := config.NewCoordinator(
*configFile,
prometheus.DefaultRegisterer,
configLogger,
)
configCoordinator.Subscribe(func(conf *config.Config) error {
tmpl, err = template.FromGlobs(conf.Templates...)
if err != nil {
return errors.Wrap(err, "failed to parse templates")
}
tmpl.ExternalURL = amURL

// Build the routing tree and record which receivers are used.
// ------------------------------------------------------------
// 建立路由树和记录下被使用的接收人
routes := dispatch.NewRoute(conf.Route, nil)
activeReceivers := make(map[string]struct{})
routes.Walk(func(r *dispatch.Route) {
activeReceivers[r.RouteOpts.Receiver] = struct{}{}
})

// Build the map of receiver to integrations.
receivers := make(map[string][]notify.Integration, len(activeReceivers))
var integrationsNum int
// 循环加载所有配置中的接受消息人
for _, rcv := range conf.Receivers {
// 查看此接受消息人,没有任何route在使用,则进行记录和丢弃,然后开始下一轮。
if _, found := activeReceivers[rcv.Name]; !found {
// No need to build a receiver if no route is using it.
level.Info(configLogger).Log("msg", "skipping creation of receiver not referenced by any route", "receiver", rcv.Name)
continue
}
// 创建接收人的integration,并插入到receiver map中。
integrations, err := buildReceiverIntegrations(rcv, tmpl, logger)
if err != nil {
return err
}
// rcv.Name is guaranteed to be unique across all receivers.
receivers[rcv.Name] = integrations
integrationsNum += len(integrations)
}

// 停止抑制器和调度器
inhibitor.Stop()
disp.Stop()

// 创建静默,抑制器并放到pipeline里面,pipeline包含告警处理的每一个stage。
inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger)
silencer := silence.NewSilencer(silences, marker, logger)
pipeline := pipelineBuilder.New(
receivers,
waitFunc,
inhibitor,
silencer,
notificationLog,
peer,
)
configuredReceivers.Set(float64(len(activeReceivers)))
configuredIntegrations.Set(float64(integrationsNum))

// 更新api的抑制和静默数据
api.Update(conf, func(labels model.LabelSet) {
inhibitor.Mutes(labels)
silencer.Mutes(labels)
})

// 创建alertmanager调度器
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger, dispMetrics)
routes.Walk(func(r *dispatch.Route) {
if r.RouteOpts.RepeatInterval > *retention {
level.Warn(configLogger).Log(
"msg",
"repeat_interval is greater than the data retention period. It can lead to notifications being repeated more often than expected.",
"repeat_interval",
r.RouteOpts.RepeatInterval,
"retention",
*retention,
"route",
r.Key(),
)
}
})

// 独立协成去运行调度器和抑制器
go disp.Run()
go inhibitor.Run()

return nil
})

if err := configCoordinator.Reload(); err != nil {
return 1
}

// Make routePrefix default to externalURL path if empty string.
if *routePrefix == "" {
*routePrefix = amURL.Path
}
*routePrefix = "/" + strings.Trim(*routePrefix, "/")
level.Debug(logger).Log("routePrefix", *routePrefix)

router := route.New().WithInstrumentation(instrumentHandler)
if *routePrefix != "/" {
router.Get("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, *routePrefix, http.StatusFound)
})
router = router.WithPrefix(*routePrefix)
}

webReload := make(chan chan error)

ui.Register(router, webReload, logger)

// 从api中获取Handler mux
mux := api.Register(router, *routePrefix)

...

}

Most of the code above is commented; it is essentially initialization. A few calls are worth highlighting:

go disp.Run()                             // alert aggregation (dispatcher)
go inhibitor.Run()                        // alert inhibition
mux := api.Register(router, *routePrefix) // the alert API: receives alerts and implements silencing
// alertmanager/api/v1/api.go
func (api *API) Register(r *route.Router) {
    wrap := func(f http.HandlerFunc) http.HandlerFunc {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            setCORS(w)
            f(w, r)
        })
    }

    r.Options("/*path", wrap(func(w http.ResponseWriter, r *http.Request) {}))

    r.Get("/status", wrap(api.status))
    r.Get("/receivers", wrap(api.receivers))

    r.Get("/alerts", wrap(api.listAlerts))
    r.Post("/alerts", wrap(api.addAlerts)) // receives alerts from other services (Prometheus)

    r.Get("/silences", wrap(api.listSilences))
    r.Post("/silences", wrap(api.setSilence)) // alert silencing
    r.Get("/silence/:sid", wrap(api.getSilence))
    r.Del("/silence/:sid", wrap(api.delSilence))
}

Let's analyze each module separately. The API handlers only run when another service calls them, so we set them aside for the moment and first look at what Alertmanager does during initialization.

Dispatcher (aggregation) initialization

// go disp.Run()
func (d *Dispatcher) Run() {
    // initialize the done channel
    d.done = make(chan struct{})

    d.mtx.Lock()
    d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{}
    d.metrics.aggrGroups.Set(0)
    d.ctx, d.cancel = context.WithCancel(context.Background())
    d.mtx.Unlock()

    // run the dispatcher's worker loop, feeding it a subscription to the alert store
    d.run(d.alerts.Subscribe())
    close(d.done)
}
func (d *Dispatcher) run(it provider.AlertIterator) {
// 创建清理ticker,其负责每30秒,检查所有的告警分组。
cleanup := time.NewTicker(30 * time.Second)
defer cleanup.Stop()

defer it.Close()

for {
select {
// 收到告警事件
case alert, ok := <-it.Next():
// 如果告警的通道被关闭,而且数据已经读取完毕,则返回。
if !ok {
// Iterator exhausted for some reason.
// ------------------------------------
// 记录下alert遍历器的错误
if err := it.Err(); err != nil {
level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
}
return
}

level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)

// Log errors but keep trying.
// ----------------------------------
// 检查遍历器的错误,如果有错误则直接跳到下个循环
if err := it.Err(); err != nil {
level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
continue
}

// 根据这个告警的所有label来匹配分组,对匹配上的路由和告警进行处理
now := time.Now()
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
// 记录处理这个告警的时间到普罗米修斯指标中
d.metrics.processingDuration.Observe(time.Since(now).Seconds())

case <-cleanup.C: // 清理周期
// 锁住调度器锁
d.mtx.Lock()
// 循环分组列表,并查看每个分组下的唯一标识分组
for _, groups := range d.aggrGroups {
for _, ag := range groups {
// 如果这个唯一标识分组为空,终止并删除分组。
// 普罗米修斯计数-1
if ag.empty() {
ag.stop()
delete(groups, ag.fingerprint())
d.metrics.aggrGroups.Dec()
}
}
}

d.mtx.Unlock()

case <-d.ctx.Done():
return
}
}
}

So the dispatcher starts a run loop, passing it the result of Subscribe(), and waits for alert events at case alert, ok := <-it.Next(). Before analyzing Subscribe, let's look at inhibition.

Inhibitor initialization

// go inhibitor.Run()
func (ih *Inhibitor) Run() {
var (
g run.Group
ctx context.Context
)

// 创建context和cancel方法
ih.mtx.Lock()
ctx, ih.cancel = context.WithCancel(context.Background())
ih.mtx.Unlock()
runCtx, runCancel := context.WithCancel(ctx)

// 循环每个抑制规则,然后启动独立go routine去运行垃圾回收
for _, rule := range ih.rules {
go rule.scache.Run(runCtx, 15*time.Minute)
}

// 添加抑制方法到运行组,运行组并行运行。
// 在所有的方法退出时才会退出,
// 有错误时将返回第一个错误,一个return后,会中断其他方法。
g.Add(func() error {
ih.run(runCtx)
return nil
}, func(err error) {
runCancel()
})

if err := g.Run(); err != nil {
level.Warn(ih.logger).Log("msg", "error running inhibitor", "err", err)
}
}
// 运行抑制器,会开始订阅告警。然后开始循环,如果订阅的告警遍历器里面有告警,
// 则获得告警,并循环每个抑制规则,如果匹配到
func (ih *Inhibitor) run(ctx context.Context) {
// 开始订阅告警
it := ih.alerts.Subscribe()
defer it.Close()

for {
select {
case <-ctx.Done():
return
case a := <-it.Next():
// 得到告警,
if err := it.Err(); err != nil {
level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err)
continue
}

// Update the inhibition rules' cache.
// -----------------------------------------
// 循环每个抑制rule,假如当前告警匹配上某个source。
// 缓存这个告警到这个抑制rule的告警map里,map的key
// 为label的finger print,value为alert。
for _, r := range ih.rules {
if r.SourceMatchers.Match(a.Labels) {
if err := r.scache.Set(a); err != nil {
level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
}
}
}
}
}
}

Here too the loop blocks at case a := <-it.Next(); the logic mirrors the dispatcher's, and both are driven by the Subscribe method.

// Alerts 接口负责装载告警对象,并且可以提供告警的遍历器,而且可以设置或可以通过告警
// 指纹获得告警。全部的方法,都是协成安全。
type Alerts interface {

// Subscribe returns an iterator over active alerts that have not been
// resolved and successfully notified about.
// They are not guaranteed to be in chronological order.
// --------------------------------------------------------------------
// Subscribe 方法,返回一个告警遍历器接口。遍历器会返回还没有解决和还没有被成功
// 通知出来的告警。遍历器所返回的告警,并不能保证是按照时间顺序来进行排序的。
Subscribe() AlertIterator

// GetPending returns an iterator over all alerts that have
// pending notifications.
// --------------------------------------------------------------------
// GetPending 方法,返回一个告警遍历器接口。遍历器会返回在等待通知的告警。
GetPending() AlertIterator

// Get returns the alert for a given fingerprint.
// --------------------------------------------------------------------
// Get 方法,通过告警的Label指纹,来获得Alert对象。Alert对象包含的信息,如
// 过期时间,更新时间,标签,告警开始时间等等。
Get(model.Fingerprint) (*types.Alert, error)

// Put adds the given alert to the set.
// --------------------------------------------------------------------
// Put 方法,把零到多个告警,放入此告警集合里。
Put(...*types.Alert) error
}
// Subscribe 方法返回一个告警遍历器,里面包含全部活跃的告警(没有被解决或成功通知),
// 这个遍历器并不能保证告警是按照时间顺序排列的。
func (a *Alerts) Subscribe() provider.AlertIterator {
a.mtx.Lock()
defer a.mtx.Unlock()

var (
done = make(chan struct{})
alerts = a.alerts.List()
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
)

for _, a := range alerts {
ch <- a
}

// 把新建告警channel,放入监听器存储到map里面。
a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
a.next++

return provider.NewAlertIterator(ch, done, nil)
}

Notice the listener map being maintained here: a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}. The it.Next() we saw above is nothing more than this ch channel.
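Stripped of the Alertmanager-specific types, this listeners pattern is an ordinary channel fan-out. The sketch below is a simplified model of it, not the real mem.Alerts code:

package main

import (
    "fmt"
    "sync"
)

// A stripped-down version of the listeners pattern: every Subscribe call gets
// its own buffered channel, and every Put fans the alert out to all of them.
type broker struct {
    mtx       sync.Mutex
    next      int
    listeners map[int]chan string
}

func newBroker() *broker { return &broker{listeners: map[int]chan string{}} }

func (b *broker) Subscribe() <-chan string {
    b.mtx.Lock()
    defer b.mtx.Unlock()
    ch := make(chan string, 10)
    b.listeners[b.next] = ch
    b.next++
    return ch
}

func (b *broker) Put(alert string) {
    b.mtx.Lock()
    defer b.mtx.Unlock()
    for _, ch := range b.listeners {
        ch <- alert
    }
}

func main() {
    b := newBroker()
    dispatcher := b.Subscribe() // what Dispatcher.Run consumes
    inhibitor := b.Subscribe()  // what Inhibitor.run consumes
    b.Put("InstanceDown")
    fmt.Println(<-dispatcher, <-inhibitor) // both subscribers see the same alert
}

The actual iterator that wraps the real channel is defined as follows: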

// NewAlertIterator 返回一个 AlertIterator 接口对象,底层实现类型是通过 alertIterator 来实现。
// Golang典型的设计思维,实现类接口为私有类,通过实现接口的方式,暴露出公开方法。
func NewAlertIterator(ch <-chan *types.Alert, done chan struct{}, err error) AlertIterator {
return &alertIterator{
ch: ch,
done: done,
err: err,
}
}
// alertIterator 实现了 AlertIterator 接口。以现在来说,这个实现满足现在所有providers的需求。
// 但是假如有新的需求,可以通过创建一个新的实现类,来实现 AlertIterator 接口,来让代码变得通用。
type alertIterator struct {
ch <-chan *types.Alert // ch 元素用来遍历告警的队列。
done chan struct{} // done 是用来通知这个遍历器被关闭。
err error // err 用来存储是否有错误,当调用完 Next 方法后,需要拿到 err 判断是否有错误。
}

// Next 方法,获取下一个告警。
func (ai alertIterator) Next() <-chan *types.Alert {
return ai.ch
}

The Inhibitor and the Dispatcher both call Subscribe() during initialization and then keep listening for new alerts.

At this point initialization is complete. Since no alerts have been produced yet, both loops block in their select cases. Next, the API logic.

API

func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
var alerts []*types.Alert
if err := api.receive(r, &alerts); err != nil {
api.respondError(w, apiError{
typ: errorBadData,
err: err,
}, nil)
return
}

api.insertAlerts(w, r, alerts...)
}

func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) {
now := time.Now()

api.mtx.RLock()
resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)
api.mtx.RUnlock()

for _, alert := range alerts {
alert.UpdatedAt = now

// Ensure StartsAt is set.
if alert.StartsAt.IsZero() {
if alert.EndsAt.IsZero() {
alert.StartsAt = now
} else {
alert.StartsAt = alert.EndsAt
}
}
// If no end time is defined, set a timeout after which an alert
// is marked resolved if it is not updated.
if alert.EndsAt.IsZero() {
alert.Timeout = true
alert.EndsAt = now.Add(resolveTimeout)
}
if alert.EndsAt.After(time.Now()) {
api.m.Firing().Inc()
} else {
api.m.Resolved().Inc()
}
}

// Make a best effort to insert all alerts that are valid.
var (
validAlerts = make([]*types.Alert, 0, len(alerts))
validationErrs = &types.MultiError{}
)
for _, a := range alerts {
removeEmptyLabels(a.Labels)

if err := a.Validate(); err != nil {
validationErrs.Add(err)
api.m.Invalid().Inc()
continue
}
validAlerts = append(validAlerts, a)
}
if err := api.alerts.Put(validAlerts...); err != nil {
api.respondError(w, apiError{
typ: errorInternal,
err: err,
}, nil)
return
}

if validationErrs.Len() > 0 {
api.respondError(w, apiError{
typ: errorBadData,
err: validationErrs,
}, nil)
return
}

api.respond(w, nil)
}

Note the api.alerts.Put(validAlerts...) call near the end of insertAlerts:

// Put 添加一到多个告警到告警集合里。并且通知到所有监听器里。
func (a *Alerts) Put(alerts ...*types.Alert) error {

for _, alert := range alerts {
fp := alert.Fingerprint()

// Check that there's an alert existing within the store before
// trying to merge.
// ---------------------------------------------------------------------
// 检查是否已经有旧的告警了,如果有旧的告警,需要判断是否需要合并。
if old, err := a.alerts.Get(fp); err == nil {

// Merge alerts if there is an overlap in activity range.
// -----------------------------------------------------------------
// 合并条件:判断是否新旧告警有重叠时间,如果有的话,就进行合并告警。
//
// 条件1:新告警的结束时间大于旧的告警的开始时间,并且新告警结束时间在旧告警结束时间之前。
// 新告警 |----------|
// 旧告警 |-----------| -> 时间线
//
// 条件2:新告警的开始时间大于旧的告警的开始时间,并且新告警开始时间在旧告警结束时间之前。
// 新告警 |----------|
// 旧告警 |-----------| -> 时间线
//
if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
alert = old.Merge(alert)
}
}

// 设置告警到集合。
if err := a.alerts.Set(alert); err != nil {
level.Error(a.logger).Log("msg", "error on set alert", "err", err)
continue
}

// 循环塞到每个监听器,去通知到每一个监听者。
a.mtx.Lock()
for _, l := range a.listeners {
select {
case l.alerts <- alert:
case <-l.done:
}
}
a.mtx.Unlock()
}

return nil
}

So the API receives alerts and pushes each of them onto every listener's channel, notifying every subscriber; in other words, the Inhibitor and the Dispatcher each process every alert that comes in.

Now for the processing flows.

Dispatcher (aggregation) processing flow

Following the analysis above, once initialization is done the dispatcher ends up calling processAlert for every matched route:

// 处理告警,得到相应分组,并对相应的分组插入这个告警。
// @param alert 告警结构体
// @param route 已经匹配上的分组路由
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// 根据分组路由的信息,获得此分组下的匹配中的labels。
// 并根据所得labels得到唯一id(指纹 finger print)。
groupLabels := getGroupLabels(alert, route)
fp := groupLabels.Fingerprint()

// 加锁进行hashmap操作
d.mtx.Lock()
defer d.mtx.Unlock()

// 通过分组路由获得分组map,如果分组列表hashmap不存在这个分组,
// 则进行创建。分组map里面key为分组finger print,value为具
// 体唯一标识的分组。
group, ok := d.aggrGroups[route]
if !ok {
group = map[model.Fingerprint]*aggrGroup{}
d.aggrGroups[route] = group
}

// If the group does not exist, create it.
// ----------------------------------------------------
// 假如当前告警的group labels的指纹在这个告警分组map里找不到,
// 则进行分组的创建。
ag, ok := group[fp]
if !ok {
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
group[fp] = ag
// 普罗米修斯的分组数量指标进行加一
d.metrics.aggrGroups.Inc()

// 开启新的协成,运行此告警指纹的分组
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
// 根据当前context的状态,来进行告警的处理。
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
lvl := level.Error(d.logger)
if ctx.Err() == context.Canceled {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
// ---------------------------------------------------
// 假如错误是因为reload或者关闭而导致的,那样日志等级为debug
lvl = level.Debug(d.logger)
}
lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
}
return err == nil
})
}

// 插入alert到这个唯一标识的分组里。
ag.insert(alert)
}

The go ag.run(...) callback above, which invokes d.stage.Exec, is where sending actually happens; in the end everything goes through a Stage's Exec method:

// Stage processes alerts under the constraints of the given context.
type Stage interface {
    Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
}
// createReceiverStage 为一个接口人创建一个扇出的阶段管道。这里循环这个接受方式的
// 每一个接收人,然后为每个接收人创建一个MultiStage,用来装载多个阶段。每个Multistage
// 包含一个等待阶段,去重阶段,重试阶段和设置通知信息阶段。最后把每个MultiStage都添加
// 到扇出阶段,并进行返回。
func createReceiverStage(
name string,
integrations []Integration,
wait func() time.Duration,
notificationLog NotificationLog,
metrics *metrics,
) Stage {
var fs FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
GroupName: name,
Integration: integrations[i].Name(),
Idx: uint32(integrations[i].Index()),
}
var s MultiStage
// 等待阶段
s = append(s, NewWaitStage(wait))
// 去重阶段
s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
// 重试阶段
s = append(s, NewRetryStage(integrations[i], name, metrics))
// 设置通知信息阶段
s = append(s, NewSetNotifiesStage(notificationLog, recv))

fs = append(fs, s)
}
return fs
}

When sending, the stages run in order: wait, dedup, retry and set-notifies. Their constructors and Exec methods are the following:

// NewWaitStage 返回一个新的等待阶段。设置wait方法到阶段之中。
func NewWaitStage(wait func() time.Duration) *WaitStage {
return &WaitStage{
wait: wait,
}
}

// Exec implements the Stage interface.
// ------------------------------------------------------------------------------
// Exec 实现了 Stage 接口,等待特定的一段时间,或者ctx.Done。取决于哪个事件先完成。
// 如果ctx.Done 先发生,则返回上下文里的错误。
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
...
}

// NewDedupStage 包裹一个 DedupStage, 这个。
func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
return &DedupStage{
rs: rs,
nflog: l,
recv: recv,
now: utcNow,
hash: hashAlert,
}
}

// Exec 实现了 Stage 接口,将进行告警的去重,并获取消息日志,查看是否需要更新日志状态。
func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
...
}

... the remaining stages can be read in the same way in the source.
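Before going through the stages one by one, it helps to see how they compose. The sketch below mirrors the shape of notify.MultiStage in simplified form (the real Exec also takes a logger and works on *types.Alert): each stage receives the alerts the previous stage let through, so a stage can delay, filter, or abort the whole pipeline.

package main

import (
    "context"
    "fmt"
)

// A pared-down version of the Stage idea: context in, alerts in, alerts out.
type Stage interface {
    Exec(ctx context.Context, alerts ...string) (context.Context, []string, error)
}

// MultiStage chains stages: the alerts one stage returns become the input of
// the next, so filtering in an early stage short-circuits the rest.
type MultiStage []Stage

func (ms MultiStage) Exec(ctx context.Context, alerts ...string) (context.Context, []string, error) {
    var err error
    for _, s := range ms {
        if len(alerts) == 0 {
            return ctx, nil, nil // everything filtered out, nothing left to send
        }
        ctx, alerts, err = s.Exec(ctx, alerts...)
        if err != nil {
            return ctx, alerts, err
        }
    }
    return ctx, alerts, nil
}

// dropResolved stands in for a filtering stage such as Dedup.
type dropResolved struct{}

func (dropResolved) Exec(ctx context.Context, alerts ...string) (context.Context, []string, error) {
    var firing []string
    for _, a := range alerts {
        if a != "resolved" {
            firing = append(firing, a)
        }
    }
    return ctx, firing, nil
}

// logStage stands in for the final sending/logging stages.
type logStage struct{}

func (logStage) Exec(ctx context.Context, alerts ...string) (context.Context, []string, error) {
    fmt.Println("notify:", alerts)
    return ctx, alerts, nil
}

func main() {
    pipeline := MultiStage{dropResolved{}, logStage{}}
    _, _, _ = pipeline.Exec(context.Background(), "firing-a", "resolved", "firing-b")
    // Output: notify: [firing-a firing-b]
}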

WaitStage

The wait interval sets how long to hold a notification before sending it. In cluster mode, each peer derives a different timeout from its position, so that normally only one instance ends up notifying; with a single server the wait is 0.

// clusterWait returns a function that inspects the current peer state and returns
// a duration of one base timeout for each peer with a higher ID than ourselves.
func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration {
    return func() time.Duration {
        return time.Duration(p.Position()) * timeout
    }
}
Concretely, a timer delivers the signal: only once it fires are the alerts returned. Because the stages run serially, message delivery simply pauses for that duration.

// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
    select {
    case <-time.After(ws.wait()):
    case <-ctx.Done():
        return ctx, nil, ctx.Err()
    }
    return ctx, alerts, nil
}

DedupStage

DedupStage manages alert deduplication. One of its parameters is a NotificationLog, which stores the record of sent notifications. When several instances form a cluster, the NotificationLog is gossiped between peers and merged: if instance A in the cluster has sent a notification, that record reaches instance B, and B skips the notification once it sees it has already gone out. The NotificationLog implementation (nflog) lives in nflog/nflog.go.

// DedupStage filters alerts.
// Filtering happens based on a notification log.
type DedupStage struct {
    nflog NotificationLog
    recv  *nflogpb.Receiver
    conf  notifierConfig

    now  func() time.Time
    hash func(*types.Alert) uint64
}

The core processing logic is as follows:

func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
    ...
    entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))

    if err != nil && err != nflog.ErrNotFound {
        return ctx, nil, err
    }
    var entry *nflogpb.Entry
    switch len(entries) {
    case 0:
    case 1:
        entry = entries[0]
    case 2:
        return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
    }
    if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
        return ctx, alerts, nil
    }
    return ctx, nil, nil
}

nflog.Query looks entries up by receiver and group key. If a matching entry shows the notification has already gone out and nothing needs updating, the alerts are not passed on. nflog also runs a GC that deletes expired entries, so stale records do not sit in the log forever and block alerts from being sent again.
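A simplified model of the needsUpdate decision referenced above (the real DedupStage hashes alerts and also compares resolved alerts; this sketch uses plain strings):

package main

import (
    "fmt"
    "time"
)

// logEntry stands in for the nflog record of the last notification sent for a
// given group and receiver.
type logEntry struct {
    firing    map[string]bool
    timestamp time.Time
}

// needsUpdate: resend when the set of firing alerts differs from what was last
// notified, or when the last notification is older than repeat_interval.
func needsUpdate(entry *logEntry, firing map[string]bool, repeatInterval time.Duration, now time.Time) bool {
    if entry == nil {
        return true // never notified for this group/receiver before
    }
    if len(entry.firing) != len(firing) {
        return true
    }
    for fp := range firing {
        if !entry.firing[fp] {
            return true // a new alert started firing since the last notification
        }
    }
    return now.Sub(entry.timestamp) >= repeatInterval
}

func main() {
    last := &logEntry{firing: map[string]bool{"alert-a": true}, timestamp: time.Now().Add(-10 * time.Minute)}
    // Nothing changed and repeat_interval has not elapsed: false.
    fmt.Println(needsUpdate(last, map[string]bool{"alert-a": true}, 3*time.Hour, time.Now()))
    // A new firing alert joined the group: true.
    fmt.Println(needsUpdate(last, map[string]bool{"alert-a": true, "alert-b": true}, 3*time.Hour, time.Now()))
}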

RetryStage

RetryStage uses a backoff strategy to manage re-sending: notifications that fail are retried until the timeout is reached. The numFailedNotifications metric counts failed attempts and numNotifications counts successful ones.
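A generic sketch of the backoff-and-retry idea (not the real RetryStage, which drives its attempts from a backoff ticker and records Prometheus metrics on every try): keep retrying until the send succeeds, the error is unrecoverable, or the pipeline context expires.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

// notifyWithRetry keeps calling send until it succeeds, returns a
// non-retriable error, or the context is cancelled.
func notifyWithRetry(ctx context.Context, send func() (retriable bool, err error)) error {
    delay := time.Second
    for attempt := 1; ; attempt++ {
        retriable, err := send()
        if err == nil {
            return nil
        }
        if !retriable {
            return fmt.Errorf("giving up after attempt %d: %w", attempt, err)
        }
        // Exponential backoff with a little jitter, capped at one minute.
        sleep := delay + time.Duration(rand.Int63n(int64(delay/2)))
        if delay *= 2; delay > time.Minute {
            delay = time.Minute
        }
        select {
        case <-time.After(sleep):
        case <-ctx.Done():
            return fmt.Errorf("context done after %d attempts: %w", attempt, err)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    calls := 0
    err := notifyWithRetry(ctx, func() (bool, error) {
        calls++
        if calls < 3 {
            return true, fmt.Errorf("temporary failure %d", calls)
        }
        return false, nil
    })
    fmt.Println(calls, err) // 3 <nil>
}

The actual retry loop from RetryStage.Exec is excerpted below: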

select {
case <-tick.C:
    now := time.Now()
    retry, err := r.integration.Notify(ctx, sent...)
    notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds())
    if err != nil {
        numFailedNotifications.WithLabelValues(r.integration.name).Inc()
        level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.name, "receiver", r.groupName, "err", err)
        if !retry {
            return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.name, err)
        }

        // Save this error to be able to return the last seen error by an
        // integration upon context timeout.
        iErr = err
    } else {
        numNotifications.WithLabelValues(r.integration.name).Inc()
        return ctx, alerts, nil
    }
case <-ctx.Done():
    if iErr != nil {
        return ctx, nil, iErr
    }

    return ctx, nil, ctx.Err()
}

SetNotifiesStage

SetNotifiesStage writes the notifications sent by this Alertmanager instance into the nflog; it records only the alerts this instance actually sent (the alerts handed over by the Retry stage, i.e. the ones the Dedup stage let through).

// Exec implements the Stage interface.
func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
    gkey, ok := GroupKey(ctx)
    if !ok {
        return ctx, nil, fmt.Errorf("group key missing")
    }
    firing, ok := FiringAlerts(ctx)
    if !ok {
        return ctx, nil, fmt.Errorf("firing alerts missing")
    }

    resolved, ok := ResolvedAlerts(ctx)
    if !ok {
        return ctx, nil, fmt.Errorf("resolved alerts missing")
    }

    return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
}

Inhibitor processing flow

// 运行抑制器,会开始订阅告警。然后开始循环,如果订阅的告警遍历器里面有告警,
// 则获得告警,并循环每个抑制规则,如果匹配到
func (ih *Inhibitor) run(ctx context.Context) {
// 开始订阅告警
it := ih.alerts.Subscribe()
defer it.Close()

for {
select {
case <-ctx.Done():
return
case a := <-it.Next():
// 得到告警,
if err := it.Err(); err != nil {
level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err)
continue
}

// Update the inhibition rules' cache.
// -----------------------------------------
// 循环每个抑制rule,假如当前告警匹配上某个source。
// 缓存这个告警到这个抑制rule的告警map里,map的key
// 为label的finger print,value为alert。
for _, r := range ih.rules {
if r.SourceMatchers.Match(a.Labels) {
if err := r.scache.Set(a); err != nil {
level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
}
}
}
}
}
}

Links:

Prometheus Alertmanager报警组件

https://just4fun.im/2018/05/25/study_alertmanager/

alertmanager-with-comment

Prometheus AlertManager代码阅读笔记 Notify组件

Prometheus源码分析(二)配置文件说明

Prometheus系列5 - Alertmanager源码阅读
