Alertmanager 高可用总结-下

我们每个人都不能抱怨自己的出身,没有好的家世,那就去创造好的家世。要知道,那些在雨里奔跑的,从来都是没有伞的孩子。

回顾

gossip

先理解一下gossip协议:在一个有界网络中,每个节点都随机地与其他节点通信,经过一番杂乱无章的通信,最终所有节点的状态都会达成一致。每个节点可能知道所有其他节点,也可能仅知道几个邻居节点,只要这些节可以通过网络连通,最终他们的状态都是一致的,当然这也是疫情传播的特点。

简单的描述下这个协议,首先要传播谣言就要有种子节点。种子节点每秒都会随机向其他节点发送自己所拥有的节点列表,以及需要传播的消息。任何新加入的节点,就在这种传播方式下很快地被全网所知道。这个协议的神奇就在于它从设计开始就没想到信息一定要传递给所有的节点,但是随着时间的增长,在最终的某一时刻,全网会得到相同的信息。当然这个时刻可能仅仅存在于理论,永远不可达。

memberlist

回忆一下memberlist的总体流程:

  • 项目在memberlist.go 函数Create启动,调用sate.go中函数schedule

  • Schedule函数开启probe协程、pushpull协程、gossip协程

  • probe协程:进行节点状态维护

  • push/pull协程:进行节点状态、用户数据同步

  • gossip协程:进行udp广播发送消息。

memberlist利用点对点随机探测机制实现成员的故障检测,因此将节点的状态分为3种:

  • StateAlive:活动节点
  • StateSuspect:可疑节点
  • StateDead:死亡节点

probe协程通过点对点随机探测实现成员的故障检测,强化系统的高可用。整体流程如下:

  • 随机探测:节点启动后,每隔一定时间间隔,会选取一个节点对其发送PING消息。
  • 重试与间隔探测请求:PING消息失败后,会随机选取N(由config中IndirectChecks设置)个节点发起间接PING请求和再发起一个TCP PING消息。
  • 间隔探测:收到间接PING请求的节点会根据请求中的地址发起一个PING消息,将PING的结果返回给间接请求的源节点。
  • 探测超时标识可疑:如果探测超时之间内,本节点没有收到任何一个要探测节点的ACK消息,则标记要探测的节点状态为suspect。
  • 可疑节点广播:启动一个定时器用于发出一个suspect广播,此期间内如果收到其他节点发来的相同的suspect信息时,将本地suspect的 确认数+1,当定时器超时后,该节点信息仍然不是alive的,且确认数达到要求,会将该节点标记为dead。
  • 可疑消除:当本节点收到别的节点发来的suspect消息时,会发送alive广播,从而清除其他节点上的suspect标记。。
  • 死亡通知:当本节点离开集群时或者本地探测的其他节点超时被标记死亡,会向集群发送本节点dead广播
  • 死亡消除:如果从其他节点收到自身的dead广播消息时,说明本节点相对于其他节点网络分区,此时会发起一个alive广播以修正其他节点上存储的本节点数据。

Memberlist在整个生命周期内,总的有两种类型的消息:

  • udp**协议消息:**传输PING消息、间接PING消息、ACK消息、NACK消息、Suspect消息、 Alive消息、Dead消息、消息广播;
  • tcp协议消息:用户数据同步、节点状态同步、PUSH-PULL消息。

push/pull协程周期性的从已知的alive的集群节点中选1个节点进行push/pull交换信息。交换的信息包含2种:

  • 集群信息:节点数据
  • 用户自定义的信息:实现Delegate接口的struct。

push/pull协程可以加速集群内信息的收敛速度,整体流程为:

  • 建立TCP链接:每隔一个时间间隔,随机选取一个节点,跟它建立tcp连接,
  • 将本地的全部节点 状态、用户数据发送过去,
  • 对端将其掌握的全部节点状态、用户数据发送回来,然后完成2份数据的合并。

Gossip协程通过udp协议向K个节点发送消息,节点从广播队列里面获取消息,广播队列里的消息发送失败超过一定次数后,消息就会被丢弃。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package main

import (
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"strings"
"sync"

"github.com/hashicorp/memberlist"
"github.com/pborman/uuid"
)

var (
mtx sync.RWMutex
members = flag.String("members", "", "comma seperated list of members")
port = flag.Int("port", 4001, "http port")
items = map[string]string{}
broadcasts *memberlist.TransmitLimitedQueue
)

type broadcast struct {
msg []byte
notify chan<- struct{}
}

type delegate struct{}

type update struct {
Action string // add, del
Data map[string]string
}

func init() {
flag.Parse()
}

func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
}

func (b *broadcast) Message() []byte {
return b.msg
}

func (b *broadcast) Finished() {
if b.notify != nil {
close(b.notify)
}
}

func (d *delegate) NodeMeta(limit int) []byte {
return []byte{}
}

func (d *delegate) NotifyMsg(b []byte) {
if len(b) == 0 {
return
}

switch b[0] {
case 'd': // data
var updates []*update
if err := json.Unmarshal(b[1:], &updates); err != nil {
return
}
mtx.Lock()
for _, u := range updates {
for k, v := range u.Data {
switch u.Action {
case "add":
items[k] = v
case "del":
delete(items, k)
}
}
}
mtx.Unlock()
}
}

func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
return broadcasts.GetBroadcasts(overhead, limit)
}

func (d *delegate) LocalState(join bool) []byte {
mtx.RLock()
m := items
mtx.RUnlock()
b, _ := json.Marshal(m)
return b
}

func (d *delegate) MergeRemoteState(buf []byte, join bool) {
if len(buf) == 0 {
return
}
if !join {
return
}
var m map[string]string
if err := json.Unmarshal(buf, &m); err != nil {
return
}
mtx.Lock()
for k, v := range m {
items[k] = v
}
mtx.Unlock()
}

func addHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
key := r.Form.Get("key")
val := r.Form.Get("val")
mtx.Lock()
items[key] = val
mtx.Unlock()

b, err := json.Marshal([]*update{
&update{
Action: "add",
Data: map[string]string{
key: val,
},
},
})

if err != nil {
http.Error(w, err.Error(), 500)
return
}
//广播数据
broadcasts.QueueBroadcast(&broadcast{
msg: append([]byte("d"), b...),
notify: nil,
})
}

func delHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
key := r.Form.Get("key")
mtx.Lock()
delete(items, key)
mtx.Unlock()

b, err := json.Marshal([]*update{
&update{
Action: "del",
Data: map[string]string{
key: "",
},
},
})

if err != nil {
http.Error(w, err.Error(), 500)
return
}

broadcasts.QueueBroadcast(&broadcast{
msg: append([]byte("d"), b...),
notify: nil,
})
}

func getHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
key := r.Form.Get("key")
mtx.RLock()
val := items[key]
mtx.RUnlock()
w.Write([]byte(val))
}

func start() error {
hostname, _ := os.Hostname()
c := memberlist.DefaultLocalConfig()
c.Delegate = &delegate{}
c.BindPort = 0
c.Name = hostname + "-" + uuid.NewUUID().String()
//创建gossip网络
m, err := memberlist.Create(c)
if err != nil {
return err
}
//第一个节点没有member,但从第二个开始就有member了
if len(*members) > 0 {
parts := strings.Split(*members, ",")
_, err := m.Join(parts)
if err != nil {
return err
}
}
broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return m.NumMembers()
},
RetransmitMult: 3,
}
node := m.LocalNode()
fmt.Printf("Local member %s:%d\n", node.Addr, node.Port)
return nil
}

func main() {
if err := start(); err != nil {
fmt.Println(err)
}

http.HandleFunc("/add", addHandler)
http.HandleFunc("/del", delHandler)
http.HandleFunc("/get", getHandler)
fmt.Printf("Listening on :%d\n", *port)
if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
fmt.Println(err)
}
}

这里通过一个简单的http服务查询和插入数据,找两台机器,第一台执行

1
memberlist

会生成gossip监听的服务ip和端口
使用上面的ip和端口在第二台执行

1
memberlist --members=xxx.xxx.xxx.xxx:xxxx

那么一个gossip的网络就搭建完成了。

1
2
3
4
5
6
7
8
# add
curl "http://localhost:4001/add?key=foo&val=bar"

# get
curl "http://另一台机器:4001/get?key=foo"

# delete
curl "http://localhost:4001/del?key=foo"

alertmanager 高可用实现

上文我们说到,alertmanager在初始化时调用了memberlist的create方法,返回了Peer结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Peer is a single peer in a gossip cluster.
type Peer struct {
mlist *memberlist.Memberlist
delegate *delegate

resolvedPeers []string

mtx sync.RWMutex
states map[string]State
stopc chan struct{}
readyc chan struct{}

peerLock sync.RWMutex
peers map[string]peer
failedPeers []peer

knownPeers []string
advertiseAddr string

failedReconnectionsCounter prometheus.Counter
reconnectionsCounter prometheus.Counter
failedRefreshCounter prometheus.Counter
refreshCounter prometheus.Counter
peerLeaveCounter prometheus.Counter
peerUpdateCounter prometheus.Counter
peerJoinCounter prometheus.Counter

logger log.Logger
}

然后加入集群和初始化状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 集群peer的状态监听器已经进行注册成功,现在可以进行加入集群和初始化状态。
// Peer state listeners have been registered, now we can join and get the initial state.
if peer != nil {
err = peer.Join(
*reconnectInterval,
*peerReconnectTimeout,
)
if err != nil {
level.Warn(logger).Log("msg", "unable to join gossip mesh", "err", err)
}
ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout)
defer func() {
cancel()
if err := peer.Leave(10 * time.Second); err != nil {
level.Warn(logger).Log("msg", "unable to leave gossip mesh", "err", err)
}
}()
go peer.Settle(ctx, *gossipInterval*10)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Join is used to take an existing Memberlist and attempt to join a cluster
// by contacting all the given hosts and performing a state sync. Initially,
// the Memberlist only contains our own state, so doing this will cause
// remote nodes to become aware of the existence of this node, effectively
// joining the cluster.
//
// This returns the number of hosts successfully contacted and an error if
// none could be reached. If an error is returned, the node did not successfully
// join the cluster.
func (m *Memberlist) Join(existing []string) (int, error) {
numSuccess := 0
var errs error
for _, exist := range existing {
addrs, err := m.resolveAddr(exist)
if err != nil {
err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[WARN] memberlist: %v", err)
continue
}

for _, addr := range addrs {
hp := joinHostPort(addr.ip.String(), addr.port)
a := Address{Addr: hp, Name: addr.nodeName}
if err := m.pushPullNode(a, true); err != nil {
err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[DEBUG] memberlist: %v", err)
continue
}
numSuccess++
}

}
if numSuccess > 0 {
errs = nil
}
return numSuccess, errs
}

join的注释很详细了,最后起了一个协程 go peer.Settle(ctx, *gossipInterval*10),用于同步集群状态,如果同步完成就关闭 channel p.readyc,后面判断集群状态是否OK,都是根据该 channel 判断的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Settle waits until the mesh is ready (and sets the appropriate internal state when it is).
// The idea is that we don't want to start "working" before we get a chance to know most of the alerts and/or silences.
// Inspired from https://github.com/apache/cassandra/blob/7a40abb6a5108688fb1b10c375bb751cbb782ea4/src/java/org/apache/cassandra/gms/Gossiper.java
// This is clearly not perfect or strictly correct but should prevent the alertmanager to send notification before it is obviously not ready.
// This is especially important for those that do not have persistent storage.
func (p *Peer) Settle(ctx context.Context, interval time.Duration) {
const NumOkayRequired = 3
level.Info(p.logger).Log("msg", "Waiting for gossip to settle...", "interval", interval)
start := time.Now()
nPeers := 0
nOkay := 0
totalPolls := 0
for {
select {
case <-ctx.Done():
elapsed := time.Since(start)
level.Info(p.logger).Log("msg", "gossip not settled but continuing anyway", "polls", totalPolls, "elapsed", elapsed)
close(p.readyc)
return
case <-time.After(interval):
}
elapsed := time.Since(start)
n := len(p.Peers())
if nOkay >= NumOkayRequired {
level.Info(p.logger).Log("msg", "gossip settled; proceeding", "elapsed", elapsed)
break
}
if n == nPeers {
nOkay++
level.Debug(p.logger).Log("msg", "gossip looks settled", "elapsed", elapsed)
} else {
nOkay = 0
level.Info(p.logger).Log("msg", "gossip not settled", "polls", totalPolls, "before", nPeers, "now", n, "elapsed", elapsed)
}
nPeers = n
totalPolls++
}
close(p.readyc)
}

可以看到上面这里是个死循环,实现了一个类似心跳机制,定时检测集群是否已经同步完成,接着往下看

1
2
3
4
5
6
7
8
9
10
11
12
	waitFunc := func() time.Duration { return 0 }
if peer != nil {
waitFunc = clusterWait(peer, *peerTimeout)
}

// clusterWait returns a function that inspects the current peer state and returns
// a duration of one base timeout for each peer with a higher ID than ourselves.
func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration {
return func() time.Duration {
return time.Duration(p.Position()) * timeout
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
		pipeline := pipelineBuilder.New(
receivers,
waitFunc,
inhibitor,
silencer,
notificationLog,
peer,
)

// New returns a map of receivers to Stages.
// --------------------------------------------------------
// New 返回一个接收人map运行的Stages,每个接收人,都会经历固定的Gossip,
// 抑制和静默阶段。然后根据receiver的不同,创建各自的分组阶段。
func (pb *PipelineBuilder) New(
receivers map[string][]Integration,
wait func() time.Duration,
inhibitor *inhibit.Inhibitor,
silencer *silence.Silencer,
notificationLog NotificationLog,
peer *cluster.Peer,
) RoutingStage {
rs := make(RoutingStage, len(receivers))

// 创建gossip协议检查就绪阶段,和抑制和静默的静音阶段。
ms := NewGossipSettleStage(peer)
is := NewMuteStage(inhibitor)
ss := NewMuteStage(silencer)

// 根据接收人创建,分组等待,去重,重试,通知阶段。
// 这里每个接收人都有一个独立的接收人阶段,并且
// 它们公用gossip就绪和静音阶段。
for name := range receivers {
// 为每一个接收方式的所有接收人创建扇出的阶段
st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
rs[name] = MultiStage{ms, is, ss, st}
}
return rs
}

这里很重要,在pipeline中创建gossip协议检查就绪阶段,ms := NewGossipSettleStage(peer),和其他一些需要处理message的阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// NewGossipSettleStage returns a new GossipSettleStage.
// ------------------------------------------------------------------------------
// NewGossipSettleStage 返回一个gossip检查就绪阶段,正如我们所知集群内的节点,
// 通过gossip协议进行通讯。
func NewGossipSettleStage(p *cluster.Peer) *GossipSettleStage {
return &GossipSettleStage{peer: p}
}

// GossipSettleStage waits until the Gossip has settled to forward alerts.
// ------------------------------------------------------------------------------
// GossipSettleStage 此阶段,一直等待gossip协议准备就绪状态,等待其他的节点就绪。
type GossipSettleStage struct {
peer *cluster.Peer
}

那么这个就绪阶段是干嘛的呢?往下看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go disp.Run()

// Run starts dispatching alerts incoming via the updates channel.
// ----------------------------------------------------------------
// 运行调度器,初始化分组列表map和普罗米修斯分组计数指标。
func (d *Dispatcher) Run() {
// 初始化结束通道
d.done = make(chan struct{})

d.mtx.Lock()
d.aggrGroups = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.metrics.aggrGroups.Set(0)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()

// 运行调度器子运行函数
d.run(d.alerts.Subscribe())
close(d.done)
}

看下这个方法 d.run(d.alerts.Subscribe()),其中 d.alerts.Subscribe() 返回一个告警遍历器接口。遍历器会返回还没有解决和还没有被成功通知出来的告警。遍历器所返回的告警,并不能保证是按照时间顺序来进行排序的。

看下 run 中的 d.processAlert(alert, r)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// 根据分组路由的信息,获得此分组下的匹配中的labels。
// 并根据所得labels得到唯一id(指纹 finger print)。
groupLabels := getGroupLabels(alert, route)
fp := groupLabels.Fingerprint()

// 加锁进行hashmap操作
d.mtx.Lock()
defer d.mtx.Unlock()

// 通过分组路由获得分组map,如果分组列表hashmap不存在这个分组,
// 则进行创建。分组map里面key为分组finger print,value为具
// 体唯一标识的分组。
group, ok := d.aggrGroups[route]
if !ok {
group = map[model.Fingerprint]*aggrGroup{}
d.aggrGroups[route] = group
}

// If the group does not exist, create it.
// ----------------------------------------------------
// 假如当前告警的group labels的指纹在这个告警分组map里找不到,
// 则进行分组的创建。
ag, ok := group[fp]
if !ok {
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
group[fp] = ag
// 普罗米修斯的分组数量指标进行加一
d.metrics.aggrGroups.Inc()

// 开启新的协成,运行此告警指纹的分组
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
// 根据当前context的状态,来进行告警的处理。
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
lvl := level.Error(d.logger)
if ctx.Err() == context.Canceled {
// It is expected for the context to be canceled on
// configuration reload or shutdown. In this case, the
// message should only be logged at the debug level.
// ---------------------------------------------------
// 假如错误是因为reload或者关闭而导致的,那样日志等级为debug
lvl = level.Debug(d.logger)
}
lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
}
return err == nil
})
}

// 插入alert到这个唯一标识的分组里。
ag.insert(alert)
}

主要看下这个方法 _, _, err := d.stage.Exec(ctx, d.logger, alerts...)

Exec 循环执行 MultiStage 里面的每一个阶段。MultiStage 主要使用两个场景。
场景一: RoutingStage 的map[receiver name] MultiStage。
里面有集群Gossip阶段,静默Mute阶段,抑制Mute阶段,Receiver阶段。
场景二: FanoutStage 的切片,里面每个元素是一个 MultiStage。
里面有分组等待阶段,去重阶段,重试阶段,设置通知阶段。

也就是说每次有新告警过来的时候都会经历同步集群状态的阶段,保证当前集群状态是OK的。

同时当有多个prometheus往alertmanager发送消息的时候,可能发生告警重复的情况,在alertmanager中有个去重阶段(DedupStage)是处理这样的情况:

DedupStage用于管理告警的去重,传递的参数中包含了一个NotificationLog,用来保存告警的发送记录。当有多个机器组成集群的时候,NotificationLog会通过协议去进行通信,传递彼此的记录信息,加入集群中的A如果发送了告警,该记录会传递给B机器,并进行merge操作,这样B机器在发送告警的时候如果查询已经发送,则不再进行告警发送。关于NotificationLog的实现nflog可以查看nflog/nflog.go文件。

1
2
3
4
5
6
7
8
9
10
// DedupStage filters alerts.
// Filtering happens based on a notification log.
type DedupStage struct {
nflog NotificationLog
recv *nflogpb.Receiver
conf notifierConfig

now func() time.Time
hash func(*types.Alert) uint64
}

具体的处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
...
entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))

if err != nil && err != nflog.ErrNotFound {
return ctx, nil, err
}
var entry *nflogpb.Entry
switch len(entries) {
case 0:
case 1:
entry = entries[0]
case 2:
return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
}
if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
return ctx, alerts, nil
}
return ctx, nil, nil
}

其中的nflog.Query将根据接收和group key进行查询,一旦查找到,则不再返回对应的alerts. nflog设置了GC用来删除过期的日志记录。防止一直存在log中导致告警无法继续发送.

这里有个疑问?上面我们看到判断集群状态是否OK的是用的 p.readyc 这个channel,这个channel在 go peer.Settle(ctx, *gossipInterval*10) 同步完成之后就关闭了,也就是说后面读取这个channel的话,都是同步成功的状态,如果中途有个alertmanager实例挂掉了,这个还会是集群组建成功的状态吗?

参考:

Alertmanager高可用

一致性算法-Gossip协议实践(Memberlist)

gossip协议的原理和实战应用

-------------本文结束 感谢您的阅读-------------
作者Magiceses
有问题请 留言 或者私信我的 微博
满分是10分的话,这篇文章你给几分