Alertmanager High Availability Summary (Part 1)

Life is what it is; keep going and cherish the journey. Every failure foreshadows a success, every trial brings a harvest, every tear brings an awakening, and every hardship adds to life's riches.

Introduction to High Availability

Previously we focused mainly on the high availability of the Prometheus Server itself. Next, the focus moves to alert handling, that is, the Alertmanager, as shown below.

(Figure: Prometheus alerting architecture, with Prometheus Servers forwarding alerts to Alertmanager)

To improve the availability of the Prometheus service, users typically deploy two or more Prometheus Servers with exactly the same configuration, including the job configuration, alerting configuration and so on. If one Prometheus Server fails, Prometheus as a whole remains available.

At the same time, thanks to Alertmanager's alert grouping, even when different Prometheus Servers send the same alert to Alertmanager, Alertmanager automatically merges them into a single notification to the receiver.

(Figure: two identically configured Prometheus Servers sending the same alerts to one Alertmanager)

Unfortunately, while a single Alertmanager can handle identical alerts produced by multiple Prometheus Servers, this deployment still has an obvious single point of failure: once that one Alertmanager fails, everything downstream of alerting stops working.

As shown below, the most direct fix is to deploy several Alertmanagers. But because the Alertmanagers know nothing about each other, the same alert notification ends up being sent multiple times, once by each Alertmanager.

(Figure: independent Alertmanagers each sending a duplicate notification to the receiver)

To solve this, as shown below, Alertmanager introduces a Gossip mechanism. Gossip gives the Alertmanagers a way to pass information among themselves, ensuring that even when several Alertmanagers each receive the same alert, only one notification is sent to the receiver.

(Figure: an Alertmanager cluster coordinating via Gossip so that only one notification reaches the receiver)

The Gossip Protocol

The Gossip protocol is also known as the Epidemic Protocol, and it goes by many other names, such as the "rumor algorithm" or "epidemic spreading algorithm".

The protocol does exactly what its name suggests and is easy to understand; its mechanics show up everywhere in daily life, for example in how computer viruses spread, how a forest fire grows, or how cells diffuse.

Here is a brief outline of how a Gossip exchange runs:

A Gossip exchange is started by a seed node. When a seed node has state that needs to reach the other nodes in the network, it randomly picks a few neighboring nodes and spreads the message to them; every node that receives the message repeats the process, until eventually all nodes in the network have received it. This takes some time, and there is no guarantee that every node has the message at any particular moment, but in theory all nodes eventually receive it, so Gossip is an eventually consistent protocol.

Gossip walkthrough

Now let's go through a concrete example to see a complete Gossip propagation.

To keep the description clear, we first fix a few assumptions:

1. Gossip spreads messages periodically; we set the period to 1 second.

2. An infected node randomly picks k neighboring nodes (the fan-out) to spread the message to; here fan-out is set to 3, i.e. at most 3 nodes per round.

3. Each round, a node only spreads the message to nodes it has not sent to before.

4. A node that has received the message does not send it back to the sender: if A -> B, then when B spreads the message it does not send to A.

Note: the Gossip process is asynchronous. The sending node does not care whether the other side received the message and does not wait for a response; whether or not the message arrived, it keeps sending to neighboring nodes every second. Asynchrony is its strength, and message redundancy is its weakness.

There are 16 nodes in total, with node 1 as the initially infected node; through the Gossip process, every node eventually becomes infected.
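The propagation described above can be reproduced with a small toy simulation. The sketch below is illustrative only (it is not Alertmanager or memberlist code, and all of the bookkeeping is invented for this example): 16 nodes, one loop iteration per 1-second round, fan-out 3, and the rules listed above.

package main

import (
	"fmt"
	"math/rand"
)

const (
	numNodes = 16
	fanOut   = 3
)

var rng = rand.New(rand.NewSource(1)) // fixed seed so repeated runs look the same

func main() {
	infected := make([]bool, numNodes)
	// contacted[i] records which nodes i has already exchanged the message with,
	// so it never picks the same target twice (rule 3) and never gossips back
	// to the node that infected it (rule 4).
	contacted := make([]map[int]bool, numNodes)
	for i := range contacted {
		contacted[i] = map[int]bool{i: true}
	}
	infected[0] = true // node 1 (index 0) is the seed

	for round := 1; count(infected) < numNodes; round++ {
		// only nodes that were infected before this round get to speak
		var speakers []int
		for i, ok := range infected {
			if ok {
				speakers = append(speakers, i)
			}
		}
		for _, from := range speakers {
			for k := 0; k < fanOut; k++ {
				to := pickTarget(from, contacted)
				if to < 0 {
					break // this node has already contacted everyone
				}
				contacted[from][to] = true
				contacted[to][from] = true // the receiver will not gossip back
				infected[to] = true
			}
		}
		fmt.Printf("round %d: %d/%d nodes infected\n", round, count(infected), numNodes)
	}
}

// pickTarget returns a random node that `from` has not contacted yet, or -1.
func pickTarget(from int, contacted []map[int]bool) int {
	var candidates []int
	for i := 0; i < numNodes; i++ {
		if !contacted[from][i] {
			candidates = append(candidates, i)
		}
	}
	if len(candidates) == 0 {
		return -1
	}
	return candidates[rng.Intn(len(candidates))]
}

func count(infected []bool) int {
	n := 0
	for _, ok := range infected {
		if ok {
			n++
		}
	}
	return n
}

Running it prints how many nodes are infected after each round; with fan-out 3, the 16 nodes are typically all infected within a handful of rounds.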

Strengths of Gossip

1) Scalability

The network allows nodes to be added and removed at will, and the state of a newly added node eventually converges with that of the other nodes.

2) Fault tolerance

Any node in the network crashing or restarting does not stop Gossip messages from propagating; the protocol has natural fault tolerance for distributed systems.

3) Decentralization

Gossip requires no central node: all nodes are peers, and no node needs a view of the whole network. As long as the network is connected, any node can spread a message to the entire network.

4) Convergent consistency

Gossip messages spread through the network exponentially, like "one tells ten, ten tell a hundred", so inconsistencies in system state converge quickly; propagation completes in on the order of log N rounds.

5) Simplicity

The Gossip process is extremely simple, and there is hardly any complexity in implementing it.

Drawbacks of Gossip

In a distributed network there is no perfect solution, and Gossip, like any other protocol, has some unavoidable drawbacks. There are two main ones:

1) Message latency

Because each node only sends to a few random peers, a message reaches the whole network only after several rounds of spreading, so Gossip introduces unavoidable message latency. It is not well suited to scenarios with strict real-time requirements.

2) Message redundancy

Gossip has nodes periodically pick random neighbors to send to, and the receiving nodes repeat the same step, so the same message is inevitably delivered to the same node more than once. This redundancy increases the processing load on receivers, and because sending is periodic, nodes that already have the message keep receiving duplicates, which makes the redundancy worse.

Gossip types

There are two types of Gossip:

  • Anti-Entropy: spread all of the data, with a fixed probability
  • Rumor-Mongering: spread only newly arrived data

Anti-Entropy follows the SI model, where a node has only two states, Susceptible and Infective; this is called simple epidemics.

Rumor-Mongering follows the SIR model, where a node has three states, Susceptible, Infective and Removed; this is called complex epidemics.

"Anti-entropy" is a rather odd term. Jelasity explains the name like this: entropy measures disorder, and in this mode the disorder between the data held by different nodes is eliminated, so anti-entropy really means anti-disorder. In other words, it increases the similarity between the nodes in the system.

Under the SI model, a node shares all of its data with other nodes in order to eliminate any inconsistency between them, which guarantees complete, eventual consistency.

Because messages keep being exchanged back and forth under the SI model, the number of messages is huge and unbounded, which is a significant overhead for the system.

Under Rumor-Mongering (the SIR model), messages can be sent more frequently, because each message carries only the latest update and is much smaller. Moreover, a rumor is marked as removed after a certain point and is no longer propagated, so under the SIR model there is some probability that the system ends up inconsistent.

On the other hand, because messages stop propagating after that point, the number of messages under the SIR model is bounded and the system overhead is small.
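To make the contrast concrete, here is a small, self-contained sketch (illustrative only; the types and function names are invented for this example and are not taken from any real library): anti-entropy pushes the entire key/value map on every exchange, while rumor mongering forwards only fresh updates and marks a rumor Removed after it has been forwarded a few times.

package main

import "fmt"

// FullState is a node's complete data set (anti-entropy exchanges all of it).
type FullState map[string]string

// antiEntropy pushes the sender's entire state; the receiver fills in whatever
// it is missing, so any disorder between the two nodes is repaired.
func antiEntropy(local, remote FullState) {
	for k, v := range local {
		if _, ok := remote[k]; !ok {
			remote[k] = v
		}
	}
}

// Rumor is a single fresh update; it stops spreading (state Removed) after it
// has been forwarded maxForwards times.
type Rumor struct {
	Key, Value string
	Forwards   int
}

const maxForwards = 3

// rumorMonger delivers only the rumors to the remote node and returns the
// rumors that are still "infective" and should keep spreading.
func rumorMonger(rumors []Rumor, remote FullState) []Rumor {
	var alive []Rumor
	for _, r := range rumors {
		remote[r.Key] = r.Value
		r.Forwards++
		if r.Forwards < maxForwards { // still Infective
			alive = append(alive, r)
		} // otherwise: Removed, stop spreading it
	}
	return alive
}

func main() {
	a := FullState{"alert-1": "sent", "alert-2": "sent"}
	b := FullState{}
	antiEntropy(a, b)
	fmt.Println("after anti-entropy:", b)

	rumors := []Rumor{{Key: "alert-3", Value: "sent", Forwards: 0}}
	c := FullState{}
	rumors = rumorMonger(rumors, c)
	fmt.Println("after rumor mongering:", c, "- rumors still spreading:", len(rumors))
}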

Communication patterns in Gossip

Under the Gossip protocol, two nodes in the network can exchange data in three ways:

  • Push: node A pushes its data (key, value, version) to node B, and B updates the entries where A's copy is newer than its own
  • Pull: A sends only (key, version) to B; B replies with the entries (key, value, version) that are newer than A's, and A updates its local copy
  • Push/Pull: like Pull, with one extra step: A then pushes the entries that are newer than B's back to B, and B updates its local copy

If we call one data synchronization between two nodes a cycle, then within one cycle Push needs 1 message, Pull needs 2, and Push/Pull needs 3. Although the message count goes up, Push/Pull works best: in theory one cycle makes the two nodes fully consistent, and intuitively Push/Pull also converges fastest.
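The three modes can be sketched with a versioned key/value store. This is a toy illustration (the Store and Entry types and the function names are made up for the example, not part of any real library), but it shows why one push/pull exchange is enough to make the entries the two nodes track consistent.

package main

import "fmt"

// Entry is a versioned value; a higher Version wins.
type Entry struct {
	Value   string
	Version int
}

type Store map[string]Entry

// push: A sends (key, value, version); B keeps whatever is newer than its own copy.
func push(a, b Store) {
	for k, e := range a {
		if cur, ok := b[k]; !ok || e.Version > cur.Version {
			b[k] = e
		}
	}
}

// pull: A sends only (key, version); B answers with entries newer than A's; A applies them.
// (Keys that A has never heard of would need an extra digest exchange; omitted here.)
func pull(a, b Store) {
	for k, mine := range a {
		if theirs, ok := b[k]; ok && theirs.Version > mine.Version {
			a[k] = theirs
		}
	}
}

// pushPull: one exchange, A first learns what is newer at B, then pushes back what B is missing.
func pushPull(a, b Store) {
	pull(a, b)
	push(a, b)
}

func main() {
	a := Store{"silence-1": {"active", 2}, "nflog-1": {"sent", 1}}
	b := Store{"silence-1": {"expired", 3}}

	pushPull(a, b)
	fmt.Println("A:", a)
	fmt.Println("B:", b)
}

After the single pushPull call both stores hold silence-1 at version 3 and nflog-1 at version 1, which is the "one cycle, fully consistent" behavior described above.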

Complexity analysis

For a network of N nodes, assume that in every Gossip round each newly infected node manages to infect at least one new node; then Gossip degenerates into binary-tree-like growth, the whole network is infected after log N rounds, and the time cost is O(log N). Since every node sends at least one message in every round, and the gossiping keeps running, the message complexity is estimated as O(N^2) (on the order of N * N messages in total). Note that this is the theoretically optimal convergence speed for Gossip; in practice the optimum is hard to reach.

Let p_i be the probability that a given node is still un-infected after round i, and p_{i+1} the same probability after round i+1.

1) With Pull, an un-infected node stays un-infected only if the node it pulls from is also un-infected:

    p_{i+1} = p_i * p_i = p_i^2

2) With Push, a node stays un-infected only if none of the roughly n * (1 - p_i) infected nodes happens to pick it, each of which misses it with probability (1 - 1/n):

    p_{i+1} = p_i * (1 - 1/n)^(n * (1 - p_i)) ≈ p_i * e^(-(1 - p_i))

So in the late phase Pull converges faster than Push: once most nodes are infected, the un-infected probability shrinks quadratically (p, p^2, p^4, ...). This is why Gossip is said to converge "in the square of p", also called probabilistic convergence, which is quite distinctive among consistency algorithms.
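The two recurrences can be checked numerically. The loop below is a standalone sketch (not library code); it starts the comparison at the point where half of the nodes are already infected and iterates both formulas, using the large-n approximation for Push.

package main

import (
	"fmt"
	"math"
)

func main() {
	// probability that a given node is still un-infected; start the comparison
	// at the point where half of the nodes already have the message
	pPull, pPush := 0.5, 0.5

	for round := 1; round <= 8; round++ {
		// Pull: stays un-infected only if the node it pulls from is un-infected
		pPull = pPull * pPull

		// Push: large-n approximation of p * (1 - 1/n)^(n*(1-p))
		pPush = pPush * math.Exp(-(1 - pPush))

		fmt.Printf("round %d: un-infected fraction  pull=%.3e  push=%.3e\n",
			round, pPull, pPush)
	}
}

After a few rounds the Pull value collapses toward zero far faster than the Push value, matching the claim that Pull converges more quickly in the final phase.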

High availability design

We went through the Gossip protocol in some detail above. In short, Gossip comes in two implementation styles, push-based and pull-based. In the push-based style, when node A in the cluster finishes a piece of work, it randomly picks another node B and sends it the corresponding message; after receiving the message, B repeats the same work, and this continues until the message has propagated to every node in the cluster. In the pull-based style, node A randomly asks node B whether it has any new state to synchronize, and B returns it if so.

With this basic understanding of Gossip, let's look at how Alertmanager builds a highly available cluster on top of it. As shown below, when Alertmanager receives an alert from Prometheus, it processes it through the following pipeline:

(Figure: Alertmanager notification pipeline: Silence -> Wait -> Dedup -> Send -> Gossip)

  • In the first stage, Silence, Alertmanager checks whether the notification matches any silence rule; if not, it moves on to the next stage, otherwise the pipeline stops and no notification is sent;
  • In the second stage, Wait, the Alertmanager waits for index * 5s, where index is this Alertmanager's position in the cluster ordering (a sketch of this idea follows the list);
  • When the wait is over, the Dedup stage checks the Alertmanager's own database to see whether this notification has already been sent; if it has, the pipeline stops and no alert is sent, otherwise it moves on to the Send stage, which delivers the notification;
  • After the notification has been sent, the Alertmanager enters the final stage, Gossip, which tells the other Alertmanager instances that this alert has already been sent. On receiving the gossip message, the other instances record in their own databases that the notification has been delivered.
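A minimal sketch of the Wait idea (this is not the actual Alertmanager source; in the real code the wait is derived from the peer's position in the cluster multiplied by a configurable peer timeout, and the 5 seconds below simply follows the description above):

package main

import (
	"fmt"
	"time"
)

// clusterWait mirrors the idea described above: the peer that sorts first in the
// cluster waits 0s, the second waits one timeout, the third two timeouts, and so on.
// Whoever wakes up first sends the notification; the others then learn via gossip
// that it was already sent and drop it in their Dedup stage.
func clusterWait(position int, peerTimeout time.Duration) time.Duration {
	return time.Duration(position) * peerTimeout
}

func main() {
	peerTimeout := 5 * time.Second // the "5s per position" used in the description above
	for position := 0; position < 3; position++ {
		fmt.Printf("alertmanager #%d waits %s before sending\n",
			position, clusterWait(position, peerTimeout))
	}
}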

So, as shown below, the Gossip mechanism hinges on two things:

(Figure: silence state and notification log synchronized between Alertmanager peers via Gossip)

  • Silence synchronization: at startup, an Alertmanager pulls the silence state from the other cluster members (pull-based); when a new silence is created, it is propagated through the cluster as gossip (push-based);
  • Notification status synchronization: after an alert notification has been sent, its send status is synchronized push-based; the Wait stage gives the cluster state time to become consistent.

Although Alertmanager's Gossip-based cluster mechanism cannot guarantee that all instances hold identical data at every moment, it implements an AP system in terms of the CAP theorem, i.e. availability and partition tolerance. And for the Prometheus Servers the configuration stays simple: they need no state synchronization among themselves.

Source code walkthrough

Cluster startup

Below is the cluster-related flow of the alertmanager component, excerpted from main.go:

// from alertmanager/cmd/alertmanager/main.go
...

// initialize the cluster
var peer *cluster.Peer
if *clusterBindAddr != "" {
peer, err = cluster.Create(
log.With(logger, "component", "cluster"),
prometheus.DefaultRegisterer,
*clusterBindAddr,
*clusterAdvertiseAddr,
*peers,
true,
*pushPullInterval,
*gossipInterval,
*tcpTimeout,
*probeTimeout,
*probeInterval,
)
if err != nil {
level.Error(logger).Log("msg", "unable to initialize gossip mesh", "err", err)
return 1
}
// mark the Prometheus cluster metric as enabled
clusterEnabled.Set(1)
}

...

if peer != nil {
c := peer.AddState("nfl", notificationLog, prometheus.DefaultRegisterer)
notificationLog.SetBroadcast(c.Broadcast)
}

...

if peer != nil {
c := peer.AddState("sil", silences, prometheus.DefaultRegisterer)
silences.SetBroadcast(c.Broadcast)
}

...

// Peer state listeners have been registered, now we can join and get the initial state.
if peer != nil {
err = peer.Join(
*reconnectInterval,
*peerReconnectTimeout,
)
if err != nil {
level.Warn(logger).Log("msg", "unable to join gossip mesh", "err", err)
}
ctx, cancel := context.WithTimeout(context.Background(), *settleTimeout)
defer func() {
cancel()
if err := peer.Leave(10 * time.Second); err != nil {
level.Warn(logger).Log("msg", "unable to leave gossip mesh", "err", err)
}
}()
go peer.Settle(ctx, *gossipInterval*10)
}

...

waitFunc := func() time.Duration { return 0 }
if peer != nil {
waitFunc = clusterWait(peer, *peerTimeout)
}
timeoutFunc := func(d time.Duration) time.Duration {
if d < notify.MinTimeout {
d = notify.MinTimeout
}
return d + waitFunc()
}

...

pipeline := pipelineBuilder.New(
receivers,
waitFunc,
inhibitor,
silencer,
notificationLog,
peer,
)

...
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, logger, dispMetrics)

Most of cluster.Create is cluster initialization work. The key call inside it is ml, err := memberlist.Create(cfg): it uses memberlist, a package that relies on a Gossip protocol to spread messages and that manages node discovery, node failure detection and the member list of a distributed cluster. The package is worth analyzing; if you are not interested, feel free to skip this section:

memberlist

memberlist is a Go library from HashiCorp that manages cluster membership and detects member failures using a Gossip-based protocol; it is the focus of this part of the article. Strictly speaking, memberlist implements a variant of Gossip, guided by the 2002 paper "SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol" by Abhinandan Das, Indranil Gupta and Ashish Motivala of the Cornell University Computer Science department.

SWIM is a scalable, weakly consistent, infection-style process group membership protocol. It monitors membership through an efficient peer-to-peer random probing mechanism that handles failure detection and update propagation. memberlist builds on SWIM membership, with some additions and adjustments relative to the original gossip protocol. Below we go through the project layout, node states, message types and data communication.

The project layout is as follows:

total 536
-rw-r--r-- 1 stark staff 15977 6 2 15:34 LICENSE
-rw-r--r-- 1 stark staff 670 6 2 15:34 Makefile
-rw-r--r-- 1 stark staff 3195 6 2 15:34 README.md
-rw-r--r-- 1 stark staff 591 6 2 15:34 alive_delegate.go
-rw-r--r-- 1 stark staff 1745 6 2 15:34 awareness.go
-rw-r--r-- 1 stark staff 3311 6 2 15:34 broadcast.go # broadcast module
-rw-r--r-- 1 stark staff 15003 6 2 15:34 config.go
-rw-r--r-- 1 stark staff 376 6 2 15:34 conflict_delegate.go
-rw-r--r-- 1 stark staff 1851 6 2 15:34 delegate.go
-rw-r--r-- 1 stark staff 1870 6 2 15:34 event_delegate.go
-rw-r--r-- 1 stark staff 666 6 2 15:34 go.mod
-rw-r--r-- 1 stark staff 4720 6 2 15:34 go.sum
-rw-r--r-- 1 stark staff 4505 6 2 15:34 keyring.go
-rw-r--r-- 1 stark staff 454 6 2 15:34 logging.go
-rw-r--r-- 1 stark staff 22474 6 2 15:34 memberlist.go # main module
-rw-r--r-- 1 stark staff 570 6 2 15:34 merge_delegate.go
-rw-r--r-- 1 stark staff 4390 6 2 15:34 mock_transport.go
-rw-r--r-- 1 stark staff 35144 6 2 15:34 net.go # communication module
-rw-r--r-- 1 stark staff 10031 6 2 15:34 net_transport.go
-rw-r--r-- 1 stark staff 645 6 2 15:34 ping_delegate.go
-rw-r--r-- 1 stark staff 11384 6 2 15:34 queue.go
-rw-r--r-- 1 stark staff 5074 6 2 15:34 security.go
-rw-r--r-- 1 stark staff 37938 6 2 15:34 state.go # state module
-rw-r--r-- 1 stark staff 4298 6 2 15:34 suspicion.go
-rw-r--r-- 1 stark staff 399 6 2 15:34 tag.sh
-rw-r--r-- 1 stark staff 211 6 2 15:34 todo.md
-rw-r--r-- 1 stark staff 4579 6 2 15:34 transport.go
-rw-r--r-- 1 stark staff 8170 6 2 15:34 util.go

The basic flow of the project is as follows (a minimal usage sketch follows this list):

  • The project starts from Create in memberlist.go, which calls schedule in state.go

  • schedule starts the probe goroutine, the push/pull goroutine and the gossip goroutine

  • probe goroutine: maintains node state

  • push/pull goroutine: synchronizes node state and user data

  • gossip goroutine: broadcasts messages over UDP.
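For orientation, here is a minimal way to use the library on its own, based on its documented Create/Join/Members API (the node name and peer address are placeholders). Alertmanager's cluster.Create builds a memberlist.Config from its --cluster.* flags and then goes through essentially the same calls.

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/memberlist"
)

func main() {
	// DefaultLANConfig returns a config tuned for a local network; Alertmanager
	// derives a similar Config from its command-line flags before calling Create.
	conf := memberlist.DefaultLANConfig()
	conf.Name = "node-1"

	list, err := memberlist.Create(conf) // newMemberlist + setAlive + schedule, as described above
	if err != nil {
		log.Fatalf("create memberlist: %v", err)
	}

	// Join kicks off a push/pull state sync with the given seed peers.
	if _, err := list.Join([]string{"192.168.0.2:7946"}); err != nil {
		log.Printf("join failed (no peer reachable in this sketch): %v", err)
	}

	for _, m := range list.Members() {
		fmt.Printf("member %s at %s:%d\n", m.Name, m.Addr, m.Port)
	}
}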

The Memberlist struct

In the Memberlist struct, the fields are also grouped by purpose:

// from github.com/hashicorp/memberlist/memberlist.go
type Memberlist struct {
sequenceNum uint32 // Local sequence number
incarnation uint32 // Local incarnation number
numNodes uint32 // Number of known nodes (estimate)
pushPullReq uint32 // Number of push/pull requests

advertiseLock sync.RWMutex
advertiseAddr net.IP
advertisePort uint16

// configuration
config *Config
// flag marking that the local service is shutting down
shutdown int32 // Used as an atomic boolean value
shutdownCh chan struct{}
// flag marking that this node is leaving the cluster
leave int32 // Used as an atomic boolean value
leaveBroadcast chan struct{}

shutdownLock sync.Mutex // Serializes calls to Shutdown
leaveLock sync.Mutex // Serializes calls to Leave

transport NodeAwareTransport

handoffCh chan struct{}
highPriorityMsgQueue *list.List
lowPriorityMsgQueue *list.List
msgQueueLock sync.Mutex

nodeLock sync.RWMutex
nodes []*nodeState // Known nodes
nodeMap map[string]*nodeState // Maps Node.Name -> NodeState
nodeTimers map[string]*suspicion // Maps Node.Name -> suspicion timer
awareness *awareness

tickerLock sync.Mutex
tickers []*time.Ticker
stopTick chan struct{}
probeIndex int

ackLock sync.Mutex
ackHandlers map[uint32]*ackHandler

broadcasts *TransmitLimitedQueue

logger *log.Logger
}

The Config struct

type Config struct {
// The name of this node. This must be unique in the cluster.
Name string

// Transport is a hook for providing custom code to communicate with
// other nodes. If this is left nil, then memberlist will by default
// make a NetTransport using BindAddr and BindPort from this structure.
Transport Transport

// Configuration related to what address to bind to and ports to
// listen on. The port is used for both UDP and TCP gossip. It is
// assumed other nodes are running on this port, but they do not need
// to.
BindAddr string
BindPort int

// Configuration related to what address to advertise to other
// cluster members. Used for nat traversal.
AdvertiseAddr string
AdvertisePort int

// ProtocolVersion is the configured protocol version that we
// will _speak_. This must be between ProtocolVersionMin and
// ProtocolVersionMax.
ProtocolVersion uint8

// TCPTimeout is the timeout for establishing a stream connection with
// a remote node for a full state sync, and for stream read and write
// operations. This is a legacy name for backwards compatibility, but
// should really be called StreamTimeout now that we have generalized
// the transport.
TCPTimeout time.Duration

// IndirectChecks is the number of nodes that will be asked to perform
// an indirect probe of a node in the case a direct probe fails. Memberlist
// waits for an ack from any single indirect node, so increasing this
// number will increase the likelihood that an indirect probe will succeed
// at the expense of bandwidth.
IndirectChecks int

// RetransmitMult is the multiplier for the number of retransmissions
// that are attempted for messages broadcasted over gossip. The actual
// count of retransmissions is calculated using the formula:
//
// Retransmits = RetransmitMult * log(N+1)
//
// This allows the retransmits to scale properly with cluster size. The
// higher the multiplier, the more likely a failed broadcast is to converge
// at the expense of increased bandwidth.
RetransmitMult int

// SuspicionMult is the multiplier for determining the time an
// inaccessible node is considered suspect before declaring it dead.
// The actual timeout is calculated using the formula:
//
// SuspicionTimeout = SuspicionMult * log(N+1) * ProbeInterval
//
// This allows the timeout to scale properly with expected propagation
// delay with a larger cluster size. The higher the multiplier, the longer
// an inaccessible node is considered part of the cluster before declaring
// it dead, giving that suspect node more time to refute if it is indeed
// still alive.
SuspicionMult int

// SuspicionMaxTimeoutMult is the multiplier applied to the
// SuspicionTimeout used as an upper bound on detection time. This max
// timeout is calculated using the formula:
//
// SuspicionMaxTimeout = SuspicionMaxTimeoutMult * SuspicionTimeout
//
// If everything is working properly, confirmations from other nodes will
// accelerate suspicion timers in a manner which will cause the timeout
// to reach the base SuspicionTimeout before that elapses, so this value
// will typically only come into play if a node is experiencing issues
// communicating with other nodes. It should be set to a something fairly
// large so that a node having problems will have a lot of chances to
// recover before falsely declaring other nodes as failed, but short
// enough for a legitimately isolated node to still make progress marking
// nodes failed in a reasonable amount of time.
SuspicionMaxTimeoutMult int

// PushPullInterval is the interval between complete state syncs.
// Complete state syncs are done with a single node over TCP and are
// quite expensive relative to standard gossiped messages. Setting this
// to zero will disable state push/pull syncs completely.
//
// Setting this interval lower (more frequent) will increase convergence
// speeds across larger clusters at the expense of increased bandwidth
// usage.
PushPullInterval time.Duration

// ProbeInterval and ProbeTimeout are used to configure probing
// behavior for memberlist.
//
// ProbeInterval is the interval between random node probes. Setting
// this lower (more frequent) will cause the memberlist cluster to detect
// failed nodes more quickly at the expense of increased bandwidth usage.
//
// ProbeTimeout is the timeout to wait for an ack from a probed node
// before assuming it is unhealthy. This should be set to 99-percentile
// of RTT (round-trip time) on your network.
ProbeInterval time.Duration
ProbeTimeout time.Duration

// DisableTcpPings will turn off the fallback TCP pings that are attempted
// if the direct UDP ping fails. These get pipelined along with the
// indirect UDP pings.
DisableTcpPings bool

// DisableTcpPingsForNode is like DisableTcpPings, but lets you control
// whether to perform TCP pings on a node-by-node basis.
DisableTcpPingsForNode func(nodeName string) bool

// AwarenessMaxMultiplier will increase the probe interval if the node
// becomes aware that it might be degraded and not meeting the soft real
// time requirements to reliably probe other nodes.
AwarenessMaxMultiplier int

// GossipInterval and GossipNodes are used to configure the gossip
// behavior of memberlist.
//
// GossipInterval is the interval between sending messages that need
// to be gossiped that haven't been able to piggyback on probing messages.
// If this is set to zero, non-piggyback gossip is disabled. By lowering
// this value (more frequent) gossip messages are propagated across
// the cluster more quickly at the expense of increased bandwidth.
//
// GossipNodes is the number of random nodes to send gossip messages to
// per GossipInterval. Increasing this number causes the gossip messages
// to propagate across the cluster more quickly at the expense of
// increased bandwidth.
//
// GossipToTheDeadTime is the interval after which a node has died that
// we will still try to gossip to it. This gives it a chance to refute.
GossipInterval time.Duration
GossipNodes int
GossipToTheDeadTime time.Duration

// GossipVerifyIncoming controls whether to enforce encryption for incoming
// gossip. It is used for upshifting from unencrypted to encrypted gossip on
// a running cluster.
GossipVerifyIncoming bool

// GossipVerifyOutgoing controls whether to enforce encryption for outgoing
// gossip. It is used for upshifting from unencrypted to encrypted gossip on
// a running cluster.
GossipVerifyOutgoing bool

// EnableCompression is used to control message compression. This can
// be used to reduce bandwidth usage at the cost of slightly more CPU
// utilization. This is only available starting at protocol version 1.
EnableCompression bool

// SecretKey is used to initialize the primary encryption key in a keyring.
// The primary encryption key is the only key used to encrypt messages and
// the first key used while attempting to decrypt messages. Providing a
// value for this primary key will enable message-level encryption and
// verification, and automatically install the key onto the keyring.
// The value should be either 16, 24, or 32 bytes to select AES-128,
// AES-192, or AES-256.
SecretKey []byte

// The keyring holds all of the encryption keys used internally. It is
// automatically initialized using the SecretKey and SecretKeys values.
Keyring *Keyring

// Delegate and Events are delegates for receiving and providing
// data to memberlist via callback mechanisms. For Delegate, see
// the Delegate interface. For Events, see the EventDelegate interface.
//
// The DelegateProtocolMin/Max are used to guarantee protocol-compatibility
// for any custom messages that the delegate might do (broadcasts,
// local/remote state, etc.). If you don't set these, then the protocol
// versions will just be zero, and version compliance won't be done.
Delegate Delegate
DelegateProtocolVersion uint8
DelegateProtocolMin uint8
DelegateProtocolMax uint8
Events EventDelegate
Conflict ConflictDelegate
Merge MergeDelegate
Ping PingDelegate
Alive AliveDelegate

// DNSConfigPath points to the system's DNS config file, usually located
// at /etc/resolv.conf. It can be overridden via config for easier testing.
DNSConfigPath string

// LogOutput is the writer where logs should be sent. If this is not
// set, logging will go to stderr by default. You cannot specify both LogOutput
// and Logger at the same time.
LogOutput io.Writer

// Logger is a custom logger which you provide. If Logger is set, it will use
// this for the internal logger. If Logger is not set, it will fall back to the
// behavior for using LogOutput. You cannot specify both LogOutput and Logger
// at the same time.
Logger *log.Logger

// Size of Memberlist's internal channel which handles UDP messages. The
// size of this determines the size of the queue which Memberlist will keep
// while UDP messages are handled.
HandoffQueueDepth int

// Maximum number of bytes that memberlist will put in a packet (this
// will be for UDP packets by default with a NetTransport). A safe value
// for this is typically 1400 bytes (which is the default). However,
// depending on your network's MTU (Maximum Transmission Unit) you may
// be able to increase this to get more content into each gossip packet.
// This is a legacy name for backward compatibility but should really be
// called PacketBufferSize now that we have generalized the transport.
UDPBufferSize int

// DeadNodeReclaimTime controls the time before a dead node's name can be
// reclaimed by one with a different address or port. By default, this is 0,
// meaning nodes cannot be reclaimed this way.
DeadNodeReclaimTime time.Duration

// RequireNodeNames controls if the name of a node is required when sending
// a message to that node.
RequireNodeNames bool
// CIDRsAllowed If nil, allow any connection (default), otherwise specify all networks
// allowed to connect (you must specify IPv6/IPv4 separately)
// Using [] will block all connections.
CIDRsAllowed []net.IPNet
}

Starting from Create

// from github.com/hashicorp/memberlist/memberlist.go
func Create(conf *Config) (*Memberlist, error) {
m, err := newMemberlist(conf)
if err != nil {
return nil, err
}
if err := m.setAlive(); err != nil {
m.Shutdown()
return nil, err
}
m.schedule()
return m, nil
}

There are two important steps in here:

  • newMemberlist
  • m.schedule

newMemberlist

// from github.com/hashicorp/memberlist/memberlist.go
func newMemberlist(conf *Config) (*Memberlist, error) {
...
m := &Memberlist{
config: conf,
shutdownCh: make(chan struct{}),
leaveBroadcast: make(chan struct{}, 1),
transport: nodeAwareTransport,
handoffCh: make(chan struct{}, 1),
highPriorityMsgQueue: list.New(),
lowPriorityMsgQueue: list.New(),
nodeMap: make(map[string]*nodeState),
nodeTimers: make(map[string]*suspicion),
awareness: newAwareness(conf.AwarenessMaxMultiplier),
ackHandlers: make(map[uint32]*ackHandler),
broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
logger: logger,
}
...
go m.streamListen()
go m.packetListen()
go m.packetHandler()
return m, nil
}

The main thing newMemberlist does is start the TCP service (m.streamListen()) and the UDP service (m.packetListen()), so let's look at the network side (TCP and UDP):

TCP handling
// from github.com/hashicorp/memberlist/net.go
func (m *Memberlist) streamListen() {
for {
select {
case conn := <-m.transport.StreamCh(): // TCP accept
// each connection gets its own handler, handleConn
go m.handleConn(conn)

case <-m.shutdownCh:
return
}
}
}

Let's continue with the handler, m.handleConn(conn):

// from github.com/hashicorp/memberlist/net.go
func (m *Memberlist) handleConn(conn net.Conn) {
defer conn.Close()
m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))

metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)

conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
msgType, bufConn, dec, err := m.readStream(conn)
if err != nil {
if err != io.EOF {
m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))

resp := errResp{err.Error()}
out, err := encode(errMsg, &resp)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode error response: %s", err)
return
}

err = m.rawSendMsgStream(conn, out.Bytes())
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn))
return
}
}
return
}

switch msgType {
case userMsg:
if err := m.readUserMsg(bufConn, dec); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
}
case pushPullMsg:
// Increment counter of pending push/pulls
numConcurrent := atomic.AddUint32(&m.pushPullReq, 1)
defer atomic.AddUint32(&m.pushPullReq, ^uint32(0))

// Check if we have too many open push/pull requests
if numConcurrent >= maxPushPullRequests {
m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests")
return
}

join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
return
}

if err := m.sendLocalState(conn, join); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to push local state: %s %s", err, LogConn(conn))
return
}

if err := m.mergeRemoteState(join, remoteNodes, userState); err != nil {
m.logger.Printf("[ERR] memberlist: Failed push/pull merge: %s %s", err, LogConn(conn))
return
}
case pingMsg:
var p ping
if err := dec.Decode(&p); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn))
return
}

if p.Node != "" && p.Node != m.config.Name {
m.logger.Printf("[WARN] memberlist: Got ping for unexpected node %s %s", p.Node, LogConn(conn))
return
}

ack := ackResp{p.SeqNo, nil}
out, err := encode(ackRespMsg, &ack)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err)
return
}

err = m.rawSendMsgStream(conn, out.Bytes())
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn))
return
}
default:
m.logger.Printf("[ERR] memberlist: Received invalid msgType (%d) %s", msgType, LogConn(conn))
}
}

Overall, this method reads data from the TCP stream, determines the message type, and handles it accordingly. We'll skip the ping and user messages for now and look at how the pushPull type is handled, which happens in three steps:

// from github.com/hashicorp/memberlist/net.go
// read the remote state
join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
return
}

// send the local node state
if err := m.sendLocalState(conn, join); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to push local state: %s %s", err, LogConn(conn))
return
}

// merge the received remote state into the local state
if err := m.mergeRemoteState(join, remoteNodes, userState); err != nil {
m.logger.Printf("[ERR] memberlist: Failed push/pull merge: %s %s", err, LogConn(conn))
return
}

So the TCP service's job is to synchronize node state.

Let's look at each of these three steps in turn.

readRemoteState

Reads the remote node state information and returns it

// from github.com/hashicorp/memberlist/net.go
// readRemoteState is used to read the remote state from a connection
func (m *Memberlist) readRemoteState(bufConn io.Reader, dec *codec.Decoder) (bool, []pushNodeState, []byte, error) {
// Read the push/pull header
// read the incoming data
var header pushPullHeader
if err := dec.Decode(&header); err != nil {
return false, nil, nil, err
}

// Allocate space for the transfer
// decode all node state entries
remoteNodes := make([]pushNodeState, header.Nodes)
// Try to decode all the states
for i := 0; i < header.Nodes; i++ {
if err := dec.Decode(&remoteNodes[i]); err != nil {
return false, nil, nil, err
}
}

// Read the remote user state into a buffer
var userBuf []byte
if header.UserStateLen > 0 {
userBuf = make([]byte, header.UserStateLen)
bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserStateLen)
if err == nil && bytes != header.UserStateLen {
err = fmt.Errorf(
"Failed to read full user state (%d / %d)",
bytes, header.UserStateLen)
}
if err != nil {
return false, nil, nil, err
}
}

// For proto versions < 2, there is no port provided. Mask old
// behavior by using the configured port
for idx := range remoteNodes {
if m.ProtocolVersion() < 2 || remoteNodes[idx].Port == 0 {
remoteNodes[idx].Port = uint16(m.config.BindPort)
}
}

// return the node state information
return header.Join, remoteNodes, userBuf, nil
}
sendLocalState

Sends the locally stored node state information

// from github.com/hashicorp/memberlist/net.go
// sendLocalState is invoked to send our local state over a stream connection.
func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
// Setup a deadline
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))

// Prepare the local node state
// collect the locally stored node state information
m.nodeLock.RLock()
localNodes := make([]pushNodeState, len(m.nodes))
for idx, n := range m.nodes {
localNodes[idx].Name = n.Name
localNodes[idx].Addr = n.Addr
localNodes[idx].Port = n.Port
localNodes[idx].Incarnation = n.Incarnation
localNodes[idx].State = n.State
localNodes[idx].Meta = n.Meta
localNodes[idx].Vsn = []uint8{
n.PMin, n.PMax, n.PCur,
n.DMin, n.DMax, n.DCur,
}
}
m.nodeLock.RUnlock()

// Get the delegate state
var userData []byte
if m.config.Delegate != nil {
userData = m.config.Delegate.LocalState(join)
}

// Create a bytes buffer writer
bufConn := bytes.NewBuffer(nil)

// Send our node state
// add the header
header := pushPullHeader{Nodes: len(localNodes), UserStateLen: len(userData), Join: join}
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(bufConn, &hd)

// Begin state push
if _, err := bufConn.Write([]byte{byte(pushPullMsg)}); err != nil {
return err
}

// encode and send
if err := enc.Encode(&header); err != nil {
return err
}
for i := 0; i < header.Nodes; i++ {
if err := enc.Encode(&localNodes[i]); err != nil {
return err
}
}

// Write the user state as well
if userData != nil {
if _, err := bufConn.Write(userData); err != nil {
return err
}
}

// Get the send buffer
return m.rawSendMsgStream(conn, bufConn.Bytes())
}
mergeRemoteState

Merges and updates node state

// from github.com/hashicorp/memberlist/net.go
// mergeRemoteState is used to merge the remote state with our local state
func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, userBuf []byte) error {
if err := m.verifyProtocol(remoteNodes); err != nil {
return err
}

// Invoke the merge delegate if any
if join && m.config.Merge != nil {
nodes := make([]*Node, len(remoteNodes))
for idx, n := range remoteNodes {
nodes[idx] = &Node{
Name: n.Name,
Addr: n.Addr,
Port: n.Port,
Meta: n.Meta,
State: n.State,
PMin: n.Vsn[0],
PMax: n.Vsn[1],
PCur: n.Vsn[2],
DMin: n.Vsn[3],
DMax: n.Vsn[4],
DCur: n.Vsn[5],
}
}
if err := m.config.Merge.NotifyMerge(nodes); err != nil {
return err
}
}

// Merge the membership state
m.mergeState(remoteNodes)

// Invoke the delegate for user state
if userBuf != nil && m.config.Delegate != nil {
m.config.Delegate.MergeRemoteState(userBuf, join)
}
return nil
}
// from github.com/hashicorp/memberlist/state.go
// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
for _, r := range remote {
switch r.State {
case StateAlive:
a := alive{
Incarnation: r.Incarnation,
Node: r.Name,
Addr: r.Addr,
Port: r.Port,
Meta: r.Meta,
Vsn: r.Vsn,
}
m.aliveNode(&a, nil, false)

case StateLeft:
d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name}
m.deadNode(&d)
case StateDead:
// If the remote node believes a node is dead, we prefer to
// suspect that node instead of declaring it dead instantly
fallthrough
case StateSuspect:
s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
m.suspectNode(&s)
}
}
}

Four states are handled here:

  • StateAlive
  • StateLeft
  • StateDead/StateSuspect

How these states are handled is covered later.

To sum up so far: the TCP connection mainly handles synchronizing and updating node state information.

UDP handling
// from github.com/hashicorp/memberlist/net.go
// packetListen is a long running goroutine that pulls packets out of the
// transport and hands them off for processing.
func (m *Memberlist) packetListen() {
for {
select {
case packet := <-m.transport.PacketCh():
m.ingestPacket(packet.Buf, packet.From, packet.Timestamp)

case <-m.shutdownCh:
return
}
}
}

func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time) {
// Check if encryption is enabled
if m.config.EncryptionEnabled() {
// Decrypt the payload
// read the data
plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil)
if err != nil {
if !m.config.GossipVerifyIncoming {
// Treat the message as plaintext
plain = buf
} else {
m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
return
}
}

// Continue processing the plaintext buffer
buf = plain
}

// See if there's a checksum included to verify the contents of the message
if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg {
crc := crc32.ChecksumIEEE(buf[5:])
expected := binary.BigEndian.Uint32(buf[1:5])
if crc != expected {
m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected)
return
}
// dispatch to the handler
m.handleCommand(buf[5:], from, timestamp)
} else {
m.handleCommand(buf, from, timestamp)
}
}
handleCommand
func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
// Decode the message type
// decode the message type
msgType := messageType(buf[0])
buf = buf[1:]

// Switch on the msgType
// handle the message according to its type
switch msgType {
case compoundMsg:
m.handleCompound(buf, from, timestamp)
case compressMsg:
m.handleCompressed(buf, from, timestamp)

case pingMsg:
m.handlePing(buf, from)
case indirectPingMsg:
m.handleIndirectPing(buf, from)
case ackRespMsg:
m.handleAck(buf, from, timestamp)
case nackRespMsg:
m.handleNack(buf, from)

case suspectMsg:
fallthrough
case aliveMsg:
fallthrough
case deadMsg:
fallthrough
case userMsg:
// Determine the message queue, prioritize alive
queue := m.lowPriorityMsgQueue
if msgType == aliveMsg {
queue = m.highPriorityMsgQueue
}

// Check for overflow and append if not full
m.msgQueueLock.Lock()
if queue.Len() >= m.config.HandoffQueueDepth {
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
} else {
queue.PushBack(msgHandoff{msgType, buf, from})
}
m.msgQueueLock.Unlock()

// Notify of pending message
select {
case m.handoffCh <- struct{}{}:
default:
}

default:
m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))
}
}
  • compoundMsg: handled by handleCompound; several messages are bundled together, get split apart, and each part is fed back through handleCommand.

    func (m *Memberlist) handleCompound(buf []byte, from net.Addr, timestamp time.Time) {
    // Decode the parts
// split the compound message
    trunc, parts, err := decodeCompoundMessage(buf)
    if err != nil {
    m.logger.Printf("[ERR] memberlist: Failed to decode compound request: %s %s", err, LogAddress(from))
    return
    }

    // Log any truncation
    if trunc > 0 {
    m.logger.Printf("[WARN] memberlist: Compound request had %d truncated messages %s", trunc, LogAddress(from))
    }

    // Handle each message
    for _, part := range parts {
// each split-out part goes back through handleCommand
    m.handleCommand(part, from, timestamp)
    }
    }
  • pingMsg: handled by handlePing;

  • indirectPingMsg: handled by handleIndirectPing;

  • ackRespMsg: handled by handleAck;

  • suspectMsg/aliveMsg/deadMsg/userMsg: handled by:

    	// Determine the message queue, prioritize alive
    queue := m.lowPriorityMsgQueue
    if msgType == aliveMsg {
    queue = m.highPriorityMsgQueue
    }

    // Check for overflow and append if not full
    m.msgQueueLock.Lock()
    if queue.Len() >= m.config.HandoffQueueDepth {
    m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
    } else {
    queue.PushBack(msgHandoff{msgType, buf, from})
    }
    m.msgQueueLock.Unlock()

    // Notify of pending message
    select {
    case m.handoffCh <- struct{}{}:
    default:
    }

    default:
    m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))

    m.handoffCh <- struct{}{}: do you remember that when the TCP and UDP services were started above, there was one more goroutine running?

    go m.streamListen()
    go m.packetListen()
    go m.packetHandler()

    go m.packetHandler()

    // packetHandler is a long running goroutine that processes messages received
    // over the packet interface, but is decoupled from the listener to avoid
    // blocking the listener which may cause ping/ack messages to be delayed.
    func (m *Memberlist) packetHandler() {
    for {
    select {
    case <-m.handoffCh:
    for {
    msg, ok := m.getNextMessage()
    if !ok {
    break
    }
    msgType := msg.msgType
    buf := msg.buf
    from := msg.from

    switch msgType {
    case suspectMsg:
    m.handleSuspect(buf, from)
    case aliveMsg:
    m.handleAlive(buf, from)
    case deadMsg:
    m.handleDead(buf, from)
    case userMsg:
    m.handleUser(buf, from)
    default:
    m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
    }
    }

    case <-m.shutdownCh:
    return
    }
    }
    }

    This goroutine listens on m.handoffCh; when a message arrives over UDP, it handles the following message types (not expanded on here):

  • suspectMsg

  • aliveMsg

  • deadMsg

  • userMsg

So the UDP service provides the basic command operations.

schedule

  • schedule starts the probe goroutine, the push/pull goroutine and the gossip goroutine
  • probe goroutine: maintains node state
  • push/pull goroutine: synchronizes node state and user data
  • gossip goroutine: broadcasts messages over UDP.
// Schedule is used to ensure the Tick is performed periodically. This
// function is safe to call multiple times. If the memberlist is already
// scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()

// If we already have tickers, then don't do anything, since we're
// scheduled
if len(m.tickers) > 0 {
return
}

// Create the stop tick channel, a blocking channel. We close this
// when we should stop the tickers.
stopCh := make(chan struct{})

// Create a new probeTicker
// start the probe goroutine
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
m.tickers = append(m.tickers, t)
}

// Create a push pull ticker if needed
// start the push/pull goroutine
if m.config.PushPullInterval > 0 {
go m.pushPullTrigger(stopCh)
}

// Create a gossip ticker if needed
// start the gossip goroutine
if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval)
go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
m.tickers = append(m.tickers, t)
}

// If we made any tickers, then record the stopTick channel for
// later.
if len(m.tickers) > 0 {
m.stopTick = stopCh
}
}

So schedule starts three periodic tasks in total: probe, push/pull and gossip.
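triggerFunc itself is not quoted above. Roughly speaking, it sleeps for a random stagger (so that all members do not fire in lockstep) and then invokes its handler on every tick until the stop channel is closed. The sketch below is a simplified, standalone version of that pattern, not the library's exact code.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// triggerLoop sketches what the probe and gossip tickers drive: wait a random
// stagger, then run the handler on every tick until stop is closed.
func triggerLoop(interval time.Duration, ticks <-chan time.Time, stop <-chan struct{}, handler func()) {
	stagger := time.Duration(rand.Int63n(int64(interval)))
	select {
	case <-time.After(stagger):
	case <-stop:
		return
	}
	for {
		select {
		case <-ticks:
			handler()
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	t := time.NewTicker(200 * time.Millisecond)
	go triggerLoop(200*time.Millisecond, t.C, stop, func() { fmt.Println("probe round") })

	time.Sleep(time.Second)
	close(stop)
	t.Stop()
}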

probe

After a node starts, it periodically picks a node and sends it a PING message. If the PING fails, it randomly picks IndirectChecks nodes and asks them to PING the target indirectly, and additionally sends the target a direct TCP PING. A node that receives an indirect PING request sends a PING to the address in the request and relays the result back to the requesting node. If within the probe timeout this node receives no ACK at all for the probed node, it marks the probed node's state as suspect.

https://www.colabug.com/1010287.html
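Before reading the full implementation, the decision flow just described can be condensed into a small sketch (simplified and standalone; the real probe/probeNode below also handle awareness scaling, nacks and suspect or dead targets). The three channels stand in for the ack paths the real code wires up.

package main

import (
	"fmt"
	"time"
)

// probeOnce sketches one failure-detection round for a single target: an answer
// on any of the three ack paths means "alive"; silence until the deadline means
// "suspect".
func probeOnce(directAck, indirectAck, tcpAck <-chan struct{}, timeout time.Duration) string {
	select {
	case <-directAck:
		return "alive (direct UDP ping)"
	case <-indirectAck:
		return "alive (indirect ping via another peer)"
	case <-tcpAck:
		return "alive (TCP fallback ping; the UDP path may be broken)"
	case <-time.After(timeout):
		return "suspect (no ack; gossip a suspect message about it)"
	}
}

func main() {
	direct := make(chan struct{})
	indirect := make(chan struct{})
	tcp := make(chan struct{})

	// simulate a peer whose UDP path is down but that still answers over TCP
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(tcp)
	}()

	fmt.Println(probeOnce(direct, indirect, tcp, 500*time.Millisecond))
}

The actual probe and probeNode functions follow.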

// Tick is used to perform a single round of failure detection and gossip
func (m *Memberlist) probe() {
// Track the number of indexes we've considered probing
numCheck := 0
START:
m.nodeLock.RLock()

// Make sure we don't wrap around infinitely
if numCheck >= len(m.nodes) {
m.nodeLock.RUnlock()
return
}

// Handle the wrap around case
// probeIndex is the index into the node list; probing cycles through it
if m.probeIndex >= len(m.nodes) {
m.nodeLock.RUnlock()
m.resetNodes()
m.probeIndex = 0
numCheck++
goto START
}

// Determine if we should probe this node
skip := false
var node nodeState

node = *m.nodes[m.probeIndex]
if node.Name == m.config.Name {
skip = true // the node is ourselves (matches the local config name)
} else if node.DeadOrLeft() {
skip = true // the node is dead or has left
}

// Potentially skip
m.nodeLock.RUnlock()
m.probeIndex++
if skip { // skip the local node and dead/left nodes
numCheck++
goto START
}

// Probe the specific node
m.probeNode(&node)
}
// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())

// We use our health awareness to scale the overall probe interval, so we
// slow down if we detect problems. The ticker that calls us can handle
// us running over the base interval, and will skip missed ticks.
probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
if probeInterval > m.config.ProbeInterval {
metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
}

// Prepare a ping message and setup an ack handler.
selfAddr, selfPort := m.getAdvertise()
ping := ping{
SeqNo: m.nextSeqNo(),
Node: node.Name,
SourceAddr: selfAddr,
SourcePort: selfPort,
SourceNode: m.config.Name,
}
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
nackCh := make(chan struct{}, m.config.IndirectChecks+1)
m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)

// Mark the sent time here, which should be after any pre-processing but
// before system calls to do the actual send. This probably over-reports
// a bit, but it's the best we can do. We had originally put this right
// after the I/O, but that would sometimes give negative RTT measurements
// which was not desirable.
sent := time.Now()

// Send a ping to the node. If this node looks like it's suspect or dead,
// also tack on a suspect message so that it has a chance to refute as
// soon as possible.
deadline := sent.Add(probeInterval)
addr := node.Address()

// Arrange for our self-awareness to get updated.
var awarenessDelta int
defer func() {
m.awareness.ApplyDelta(awarenessDelta)
}()
if node.State == StateAlive {
// send a pingMsg
if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
if failedRemote(err) {
goto HANDLE_REMOTE_FAILURE
} else {
return
}
}
} else {
...
}

...

HANDLE_REMOTE_FAILURE:
// Get some random live nodes.
m.nodeLock.RLock()
// pick a few random nodes
kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.Name == node.Name ||
n.State != StateAlive
})
m.nodeLock.RUnlock()

// Attempt an indirect ping.
expectedNacks := 0
selfAddr, selfPort = m.getAdvertise()
ind := indirectPingReq{
SeqNo: ping.SeqNo,
Target: node.Addr,
Port: node.Port,
Node: node.Name,
SourceAddr: selfAddr,
SourcePort: selfPort,
SourceNode: m.config.Name,
}
for _, peer := range kNodes {
// We only expect nack to be sent from peers who understand
// version 4 of the protocol.
if ind.Nack = peer.PMax >= 4; ind.Nack {
expectedNacks++
}

// send an indirectPingMsg
if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
}
}

// Also make an attempt to contact the node directly over TCP. This
// helps prevent confused clients who get isolated from UDP traffic
// but can still speak TCP (which also means they can possibly report
// misinformation to other nodes via anti-entropy), avoiding flapping in
// the cluster.
//
// This is a little unusual because we will attempt a TCP ping to any
// member who understands version 3 of the protocol, regardless of
// which protocol version we are speaking. That's why we've included a
// config option to turn this off if desired.
fallbackCh := make(chan bool, 1)

disableTcpPings := m.config.DisableTcpPings ||
(m.config.DisableTcpPingsForNode != nil && m.config.DisableTcpPingsForNode(node.Name))
if (!disableTcpPings) && (node.PMax >= 3) {
go func() {
defer close(fallbackCh)
didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline)
if err != nil {
m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
} else {
fallbackCh <- didContact
}
}()
} else {
close(fallbackCh)
}

// Wait for the acks or timeout. Note that we don't check the fallback
// channel here because we want to issue a warning below if that's the
// *only* way we hear back from the peer, so we have to let this time
// out first to allow the normal UDP-based acks to come in.
select {
case v := <-ackCh:
if v.Complete == true {
return
}
}

// Finally, poll the fallback channel. The timeouts are set such that
// the channel will have something or be closed without having to wait
// any additional time here.
for didContact := range fallbackCh {
if didContact {
m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
return
}
}

// Update our self-awareness based on the results of this failed probe.
// If we don't have peers who will send nacks then we penalize for any
// failed probe as a simple health metric. If we do have peers to nack
// verify, then we can use that as a more sophisticated measure of self-
// health because we assume them to be working, and they can help us
// decide if the probed node was really dead or if it was something wrong
// with ourselves.
awarenessDelta = 0
if expectedNacks > 0 {
if nackCount := len(nackCh); nackCount < expectedNacks {
awarenessDelta += (expectedNacks - nackCount)
}
} else {
awarenessDelta += 1
}

// No acks received from target, suspect it as failed.
m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
// the probe failed, so mark the node as suspect
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
m.suspectNode(&s)
}
pushpull

At a fixed interval, the node randomly picks one peer, establishes a TCP connection to it, and sends over all of the node states and user data it holds; the peer then sends back all of the node states and user data it holds, and the two data sets are merged. This speeds up the convergence of information within the cluster.

https://www.jianshu.com/p/e2173b44db65

func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
interval := m.config.PushPullInterval

// Use a random stagger to avoid syncronizing
randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
select {
case <-time.After(randStagger):
case <-stop:
return
}

// Tick using a dynamic timer
for {
tickTime := pushPullScale(interval, m.estNumNodes())
select {
case <-time.After(tickTime):
m.pushPull()
case <-stop:
return
}
}
}

func (m *Memberlist) pushPull() {
// Get a random live node
m.nodeLock.RLock()
// randomly pick one node
nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
return n.Name == m.config.Name ||
n.State != StateAlive
})
m.nodeLock.RUnlock()

// If no nodes, bail
if len(nodes) == 0 {
return
}
node := nodes[0]

// Attempt a push pull
// call pushPullNode
if err := m.pushPullNode(node.FullAddress(), false); err != nil {
m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
}
}

Above, one node is picked at random; pushPullNode then does the complete state exchange with it:

// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(a Address, join bool) error {
defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())

// Attempt to send and receive with the node
// send our state and receive the remote state
remote, userState, err := m.sendAndReceiveState(a, join)
if err != nil {
return err
}

// merge the remote state and update node state
if err := m.mergeRemoteState(join, remote, userState); err != nil {
return err
}
return nil
}
// sendAndReceiveState is used to initiate a push/pull over a stream with a
// remote host.
func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, []byte, error) {
if a.Name == "" && m.config.RequireNodeNames {
return nil, nil, errNodeNamesAreRequired
}

// Attempt to connect
// open a TCP client connection
conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout)
if err != nil {
return nil, nil, err
}
defer conn.Close()
m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr())
metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1)

// Send our state
// send the local node state information
if err := m.sendLocalState(conn, join); err != nil {
return nil, nil, err
}

conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
msgType, bufConn, dec, err := m.readStream(conn)
if err != nil {
return nil, nil, err
}

if msgType == errMsg {
var resp errResp
if err := dec.Decode(&resp); err != nil {
return nil, nil, err
}
return nil, nil, fmt.Errorf("remote error: %v", resp.Error)
}

// Quit if not push/pull
if msgType != pushPullMsg {
err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn))
return nil, nil, err
}

// Read remote state
// read the remote node state information and return it
_, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
return remoteNodes, userState, err
}
gossip

The node sends messages over UDP to k nodes, taking them from its broadcast queue; a message is dropped from the queue once it has failed to be transmitted more than a certain number of times. For how that retransmission count is derived, see the RetransmitMult comment in Config.

https://www.jianshu.com/p/e2173b44db65
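The Config comment quoted earlier gives the shape of that limit as Retransmits = RetransmitMult * log(N+1). The snippet below simply evaluates that formula for a few cluster sizes; the base-10 logarithm and the ceiling match how memberlist scales its TransmitLimitedQueue as far as I can tell, but treat the exact constants as an assumption and the logarithmic shape as the point.

package main

import (
	"fmt"
	"math"
)

// retransmitLimit evaluates Retransmits = RetransmitMult * log(N+1),
// taking log base 10 and rounding up (assumed to match the library's scaling).
func retransmitLimit(retransmitMult, numNodes int) int {
	return retransmitMult * int(math.Ceil(math.Log10(float64(numNodes+1))))
}

func main() {
	for _, n := range []int{3, 10, 100, 1000} {
		// 4 is used here for RetransmitMult purely for illustration
		fmt.Printf("cluster of %4d nodes: each broadcast is retried up to %d times\n",
			n, retransmitLimit(4, n))
	}
}

The gossip() function below then drains those pending broadcasts and sends them to the randomly chosen nodes.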

// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

// Get some random live, suspect, or recently dead nodes
m.nodeLock.RLock()
// randomly pick GossipNodes (from the config) target nodes
kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
if n.Name == m.config.Name {
return true
}

switch n.State {
case StateAlive, StateSuspect:
return false

case StateDead:
return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

default:
return true
}
})
m.nodeLock.RUnlock()

// Compute the bytes available
bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead
if m.config.EncryptionEnabled() {
bytesAvail -= encryptOverhead(m.encryptionVersion())
}

for _, node := range kNodes {
// Get any pending broadcasts
// fetch the pending broadcasts that fit in the available space
msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
if len(msgs) == 0 {
return
}

addr := node.Address()
if len(msgs) == 1 {
// Send single message as is
if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, msgs[0]); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
} else {
// Otherwise create and send a compound message
// build a compound message
compound := makeCompoundMessage(msgs)
// send the message
if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
}
}
}
}

Node state is accordingly divided into three kinds:

  • alive: marks a live node

// aliveNode is invoked by the network layer when we get a message about a
// live node.
func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
    m.nodeLock.Lock()
    defer m.nodeLock.Unlock()
    state, ok := m.nodeMap[a.Node]

    // It is possible that during a Leave(), there is already an aliveMsg
    // in-queue to be processed but blocked by the locks above. If we let
    // that aliveMsg process, it'll cause us to re-join the cluster. This
    // ensures that we don't.
    if m.hasLeft() && a.Node == m.config.Name {
        return
    }

    if len(a.Vsn) >= 3 {
        pMin := a.Vsn[0]
        pMax := a.Vsn[1]
        pCur := a.Vsn[2]
        if pMin == 0 || pMax == 0 || pMin > pMax {
            m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax)
            return
        }
    }

    // Invoke the Alive delegate if any. This can be used to filter out
    // alive messages based on custom logic. For example, using a cluster name.
    // Using a merge delegate is not enough, as it is possible for passive
    // cluster merging to still occur.
    if m.config.Alive != nil {
        if len(a.Vsn) < 6 {
            m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present",
                a.Node, net.IP(a.Addr), a.Port)
            return
        }
        node := &Node{
            Name: a.Node,
            Addr: a.Addr,
            Port: a.Port,
            Meta: a.Meta,
            PMin: a.Vsn[0],
            PMax: a.Vsn[1],
            PCur: a.Vsn[2],
            DMin: a.Vsn[3],
            DMax: a.Vsn[4],
            DCur: a.Vsn[5],
        }
        if err := m.config.Alive.NotifyAlive(node); err != nil {
            m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s",
                a.Node, err)
            return
        }
    }

    // Check if we've never seen this node before, and if not, then
    // store this node in our node map.
    var updatesNode bool
    if !ok {
        errCon := m.config.IPAllowed(a.Addr)
        if errCon != nil {
            m.logger.Printf("[WARN] memberlist: Rejected node %s (%v): %s", a.Node, net.IP(a.Addr), errCon)
            return
        }
        state = &nodeState{
            Node: Node{
                Name: a.Node,
                Addr: a.Addr,
                Port: a.Port,
                Meta: a.Meta,
            },
            State: StateDead,
        }
        if len(a.Vsn) > 5 {
            state.PMin = a.Vsn[0]
            state.PMax = a.Vsn[1]
            state.PCur = a.Vsn[2]
            state.DMin = a.Vsn[3]
            state.DMax = a.Vsn[4]
            state.DCur = a.Vsn[5]
        }

        // Add to map
        m.nodeMap[a.Node] = state

        // Get a random offset. This is important to ensure
        // the failure detection bound is low on average. If all
        // nodes did an append, failure detection bound would be
        // very high.
        n := len(m.nodes)
        offset := randomOffset(n)

        // Add at the end and swap with the node at the offset
        m.nodes = append(m.nodes, state)
        m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]

        // Update numNodes after we've added a new node
        atomic.AddUint32(&m.numNodes, 1)
    } else {
        // Check if this address is different than the existing node unless the old node is dead.
        if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
            errCon := m.config.IPAllowed(a.Addr)
            if errCon != nil {
                m.logger.Printf("[WARN] memberlist: Rejected IP update from %v to %v for node %s: %s", a.Node, state.Addr, net.IP(a.Addr), errCon)
                return
            }
            // If DeadNodeReclaimTime is configured, check if enough time has elapsed since the node died.
            canReclaim := (m.config.DeadNodeReclaimTime > 0 &&
                time.Since(state.StateChange) > m.config.DeadNodeReclaimTime)

            // Allow the address to be updated if a dead node is being replaced.
            if state.State == StateLeft || (state.State == StateDead && canReclaim) {
                m.logger.Printf("[INFO] memberlist: Updating address for left or failed node %s from %v:%d to %v:%d",
                    state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
                updatesNode = true
            } else {
                m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d Old state: %v",
                    state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port, state.State)

                // Inform the conflict delegate if provided
                if m.config.Conflict != nil {
                    other := Node{
                        Name: a.Node,
                        Addr: a.Addr,
                        Port: a.Port,
                        Meta: a.Meta,
                    }
                    m.config.Conflict.NotifyConflict(&state.Node, &other)
                }
                return
            }
        }
    }

    // Bail if the incarnation number is older, and this is not about us
    isLocalNode := state.Name == m.config.Name
    if a.Incarnation <= state.Incarnation && !isLocalNode && !updatesNode {
        return
    }

    // Bail if strictly less and this is about us
    if a.Incarnation < state.Incarnation && isLocalNode {
        return
    }

    // Clear out any suspicion timer that may be in effect.
    delete(m.nodeTimers, a.Node)

    // Store the old state and meta data
    oldState := state.State
    oldMeta := state.Meta

    // If this is us we need to refute, otherwise re-broadcast
    if !bootstrap && isLocalNode {
        // Compute the version vector
        versions := []uint8{
            state.PMin, state.PMax, state.PCur,
            state.DMin, state.DMax, state.DCur,
        }

        // If the Incarnation is the same, we need special handling, since it
        // possible for the following situation to happen:
        // 1) Start with configuration C, join cluster
        // 2) Hard fail / Kill / Shutdown
        // 3) Restart with configuration C', join cluster
        //
        // In this case, other nodes and the local node see the same incarnation,
        // but the values may not be the same. For this reason, we always
        // need to do an equality check for this Incarnation. In most cases,
        // we just ignore, but we may need to refute.
        //
        if a.Incarnation == state.Incarnation &&
            bytes.Equal(a.Meta, state.Meta) &&
            bytes.Equal(a.Vsn, versions) {
            return
        }
        m.refute(state, a.Incarnation)
        m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions)
    } else {
        m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)

        // Update protocol versions if it arrived
        if len(a.Vsn) > 0 {
            state.PMin = a.Vsn[0]
            state.PMax = a.Vsn[1]
            state.PCur = a.Vsn[2]
            state.DMin = a.Vsn[3]
            state.DMax = a.Vsn[4]
            state.DCur = a.Vsn[5]
        }

        // Update the state and incarnation number
        state.Incarnation = a.Incarnation
        state.Meta = a.Meta
        state.Addr = a.Addr
        state.Port = a.Port
        if state.State != StateAlive {
            state.State = StateAlive
            state.StateChange = time.Now()
        }
    }

    // Update metrics
    metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)

    // Notify the delegate of any relevant updates
    if m.config.Events != nil {
        if oldState == StateDead || oldState == StateLeft {
            // if Dead/Left -> Alive, notify of join
            m.config.Events.NotifyJoin(&state.Node)

        } else if !bytes.Equal(oldMeta, state.Meta) {
            // if Meta changed, trigger an update notification
            m.config.Events.NotifyUpdate(&state.Node)
        }
    }
}
  • suspect: When probing a node fails, or when a suspect message about a node is received, the local record for that node is marked as suspect, a timer is started, and a suspect broadcast is sent out. If the same suspect message is received from other nodes during this period, the local confirmation count for that suspect is incremented. When the timer expires, if the node still is not alive and the confirmation count has reached the required threshold, the node is marked as dead. When this node receives a suspect message about itself from another node, it sends an alive broadcast to clear the suspect mark held by the other nodes.

// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
    m.nodeLock.Lock()
    defer m.nodeLock.Unlock()
    state, ok := m.nodeMap[s.Node]

    // If we've never heard about this node before, ignore it
    if !ok {
        return
    }

    // Ignore old incarnation numbers
    if s.Incarnation < state.Incarnation {
        return
    }

    // See if there's a suspicion timer we can confirm. If the info is new
    // to us we will go ahead and re-gossip it. This allows for multiple
    // independent confirmations to flow even when a node probes a node
    // that's already suspect.
    if timer, ok := m.nodeTimers[s.Node]; ok {
        if timer.Confirm(s.From) {
            m.encodeAndBroadcast(s.Node, suspectMsg, s)
        }
        return
    }

    // Ignore non-alive nodes
    if state.State != StateAlive {
        return
    }

    // If this is us we need to refute, otherwise re-broadcast
    if state.Name == m.config.Name {
        m.refute(state, s.Incarnation)
        m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
        return // Do not mark ourself suspect
    } else {
        m.encodeAndBroadcast(s.Node, suspectMsg, s)
    }

    // Update metrics
    metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)

    // Update the state
    state.Incarnation = s.Incarnation
    state.State = StateSuspect
    changeTime := time.Now()
    state.StateChange = changeTime

    // Setup a suspicion timer. Given that we don't have any known phase
    // relationship with our peers, we set up k such that we hit the nominal
    // timeout two probe intervals short of what we expect given the suspicion
    // multiplier.
    k := m.config.SuspicionMult - 2

    // If there aren't enough nodes to give the expected confirmations, just
    // set k to 0 to say that we don't expect any. Note we subtract 2 from n
    // here to take out ourselves and the node being probed.
    n := m.estNumNodes()
    if n-2 < k {
        k = 0
    }

    // Compute the timeouts based on the size of the cluster.
    min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
    max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
    fn := func(numConfirmations int) {
        m.nodeLock.Lock()
        state, ok := m.nodeMap[s.Node]
        timeout := ok && state.State == StateSuspect && state.StateChange == changeTime
        m.nodeLock.Unlock()

        if timeout {
            if k > 0 && numConfirmations < k {
                metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
            }

            m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
                state.Name, numConfirmations)
            d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
            m.deadNode(&d)
        }
    }
    m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
}
  • dead: When this node leaves the cluster, or when a locally probed node times out and is marked as dead, a dead broadcast for that node is sent to the cluster. A node that receives a dead broadcast compares it with its local record: if the local record is already dead, the message is ignored; otherwise the local record is updated (marking the node dead, or left if the node announced its own departure) and the dead message is broadcast again, so it keeps propagating. If this node receives a dead broadcast about itself from another node, it means this node has been partitioned from the rest of the cluster, and it sends an alive broadcast to correct the data the other nodes hold about it.

// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
    m.nodeLock.Lock()
    defer m.nodeLock.Unlock()
    state, ok := m.nodeMap[d.Node]

    // If we've never heard about this node before, ignore it
    if !ok {
        return
    }

    // Ignore old incarnation numbers
    if d.Incarnation < state.Incarnation {
        return
    }

    // Clear out any suspicion timer that may be in effect.
    delete(m.nodeTimers, d.Node)

    // Ignore if node is already dead
    if state.DeadOrLeft() {
        return
    }

    // Check if this is us
    if state.Name == m.config.Name {
        // If we are not leaving we need to refute
        if !m.hasLeft() {
            m.refute(state, d.Incarnation)
            m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
            return // Do not mark ourself dead
        }

        // If we are leaving, we broadcast and wait
        m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
    } else {
        m.encodeAndBroadcast(d.Node, deadMsg, d)
    }

    // Update metrics
    metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)

    // Update the state
    state.Incarnation = d.Incarnation

    // If the dead message was send by the node itself, mark it is left
    // instead of dead.
    if d.Node == d.From {
        state.State = StateLeft
    } else {
        state.State = StateDead
    }
    state.StateChange = time.Now()

    // Notify of death
    if m.config.Events != nil {
        m.config.Events.NotifyLeave(&state.Node)
    }
}

broadcast module

The broadcast module handles message broadcasting. It provides three functions, the most important of which is getBroadcasts, which returns a batch of broadcast messages up to a maximum byte size and is mainly used to fill UDP packets with piggybacked data. The code is quite simple:

// getBroadcasts is used to return a slice of broadcasts to send up to
// a maximum byte size, while imposing a per-broadcast overhead. This is used
// to fill a UDP packet with piggybacked data
func (m *Memberlist) getBroadcasts(overhead, limit int) [][]byte {
    // Get memberlist messages first
    toSend := m.broadcasts.GetBroadcasts(overhead, limit)

    // Check if the user has anything to broadcast
    d := m.config.Delegate
    if d != nil {
        // Determine the bytes used already
        bytesUsed := 0
        for _, msg := range toSend {
            bytesUsed += len(msg) + overhead
        }

        // Check space remaining for user messages
        avail := limit - bytesUsed
        if avail > overhead+userMsgOverhead {
            userMsgs := d.GetBroadcasts(overhead+userMsgOverhead, avail)

            // Frame each user message
            for _, msg := range userMsgs {
                buf := make([]byte, 1, len(msg)+1)
                buf[0] = byte(userMsg)
                buf = append(buf, msg...)
                toSend = append(toSend, buf)
            }
        }
    }
    return toSend
}

Summary

Recall the overall flow:

  • The library starts in memberlist.go via the Create function, which calls the schedule function in state.go.

  • The schedule function starts the probe goroutine, the push/pull goroutine and the gossip goroutine (see the sketch after this list).

  • probe goroutine: maintains node state (failure detection).

  • push/pull goroutine: synchronizes node state and user data.

  • gossip goroutine: sends messages to the cluster via UDP broadcast.
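
To make the division of work concrete, here is a minimal Go sketch of a scheduler that runs three periodic loops in the way this list describes. It is not the actual memberlist code: the sched type, the interval values and the stub probe/pushPull/gossip bodies are placeholders for the real configuration fields and internals.

package main

import (
    "fmt"
    "time"
)

// sched is a toy stand-in for memberlist's scheduler: it runs one
// goroutine per periodic task and stops them all via the stop channel.
type sched struct {
    stop chan struct{}
}

// schedule starts the three periodic loops, mirroring the probe,
// push/pull and gossip goroutines described above.
func (s *sched) schedule(probeInterval, pushPullInterval, gossipInterval time.Duration) {
    go s.loop(probeInterval, s.probe)       // failure detection
    go s.loop(pushPullInterval, s.pushPull) // full state sync over TCP
    go s.loop(gossipInterval, s.gossip)     // UDP broadcast of queued messages
}

// loop invokes fn every interval until the scheduler is stopped.
func (s *sched) loop(interval time.Duration, fn func()) {
    t := time.NewTicker(interval)
    defer t.Stop()
    for {
        select {
        case <-t.C:
            fn()
        case <-s.stop:
            return
        }
    }
}

func (s *sched) probe()    { fmt.Println("probe: ping a random node") }
func (s *sched) pushPull() { fmt.Println("push/pull: sync state with a random node") }
func (s *sched) gossip()   { fmt.Println("gossip: send queued broadcasts over UDP") }

func main() {
    s := &sched{stop: make(chan struct{})}
    s.schedule(time.Second, 30*time.Second, 200*time.Millisecond)
    time.Sleep(3 * time.Second)
    close(s.stop)
}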

memberlist implements member failure detection with a peer-to-peer random probing mechanism, and divides node state into three kinds:

  • StateAlive: a live node
  • StateSuspect: a suspect node
  • StateDead: a dead node

The probe goroutine performs failure detection through point-to-point random probing, strengthening the system's availability. The overall flow is as follows (a sketch follows the list):

  • Random probing: after a node starts, it periodically selects a node and sends it a PING message.
  • Retry and indirect probe requests: if the PING fails, the node randomly selects N peers (set by IndirectChecks in the config) to issue indirect PING requests, and also sends a TCP PING.
  • Indirect probing: a node that receives an indirect PING request sends a PING to the address in the request and returns the result to the node that originated the indirect request.
  • Probe timeout marks suspect: if no ACK from the probed node arrives within the probe timeout, the probed node is marked as suspect.
  • Suspect broadcast: a timer is started and a suspect broadcast is sent out; if the same suspect message is received from other nodes during this period, the local confirmation count is incremented. When the timer expires, if the node is still not alive and enough confirmations have been collected, it is marked as dead.
  • Clearing suspicion: when this node receives a suspect message about itself from another node, it sends an alive broadcast to clear the suspect mark held by the other nodes.
  • Death notification: when this node leaves the cluster, or when a locally probed node times out and is marked dead, a dead broadcast for the corresponding node is sent to the cluster.
  • Clearing death: if this node receives a dead broadcast about itself from another node, it means this node has been partitioned from the others; it then sends an alive broadcast to correct the data other nodes hold about it.
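
The probing sequence can be sketched roughly as follows. This is a simplified illustration rather than memberlist's implementation: the pinger interface, the fakePinger type, the indirectChecks argument and the returned state strings are assumptions standing in for the real UDP/TCP transport and suspicion machinery.

package main

import (
    "errors"
    "fmt"
    "math/rand"
    "time"
)

// pinger abstracts the transport used for probing; in memberlist this
// would be UDP pings plus indirect pings relayed through other members.
type pinger interface {
    // Ping returns nil if target acknowledged within timeout.
    Ping(target string, timeout time.Duration) error
    // IndirectPing asks relay to ping target on our behalf.
    IndirectPing(relay, target string, timeout time.Duration) error
}

// probeNode follows the flow described above: direct ping first, then
// indirect pings through a few random relays, then mark the node suspect.
func probeNode(p pinger, target string, peers []string, indirectChecks int, timeout time.Duration) string {
    if err := p.Ping(target, timeout); err == nil {
        return "alive"
    }

    // Pick up to indirectChecks random relays (excluding the target itself).
    relays := make([]string, 0, indirectChecks)
    for _, i := range rand.Perm(len(peers)) {
        if peers[i] == target {
            continue
        }
        relays = append(relays, peers[i])
        if len(relays) == indirectChecks {
            break
        }
    }

    // Any successful indirect ACK keeps the node alive.
    for _, relay := range relays {
        if err := p.IndirectPing(relay, target, timeout); err == nil {
            return "alive"
        }
    }

    // No ACK at all: the node becomes suspect; a suspicion timer and a
    // suspect broadcast would be started here.
    return "suspect"
}

// fakePinger simulates a network in which "n3" never answers.
type fakePinger struct{}

func (fakePinger) Ping(target string, _ time.Duration) error {
    if target == "n3" {
        return errors.New("timeout")
    }
    return nil
}

func (fakePinger) IndirectPing(_, target string, _ time.Duration) error {
    if target == "n3" {
        return errors.New("timeout")
    }
    return nil
}

func main() {
    peers := []string{"n1", "n2", "n3", "n4"}
    fmt.Println("n2:", probeNode(fakePinger{}, "n2", peers, 3, time.Second))
    fmt.Println("n3:", probeNode(fakePinger{}, "n3", peers, 3, time.Second))
}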

Throughout its lifecycle, memberlist uses two kinds of messages:

  • UDP messages: carry PING, indirect PING, ACK, NACK, Suspect, Alive and Dead messages, as well as message broadcasts;
  • TCP messages: carry user-data synchronization, node-state synchronization and PUSH-PULL messages.

The push/pull goroutine periodically selects one node from the known alive members of the cluster and exchanges information with it via push/pull. Two kinds of information are exchanged:

  • Cluster information: node state data
  • User-defined information: a struct implementing the Delegate interface.

The push/pull goroutine speeds up the convergence of information inside the cluster. The overall flow is (a sketch follows the list):

  • Establish a TCP connection: at each interval, pick a random node and open a TCP connection to it,
  • send it all of the local node states and user data,
  • the peer sends back all of the node states and user data it knows about, and the two data sets are then merged.
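
A rough sketch of the merge step of such an exchange, assuming a simplified nodeState type keyed by name and resolved by incarnation number; the real exchange also carries Delegate user data and runs over a TCP stream rather than in memory.

package main

import "fmt"

// nodeState is a simplified view of what push/pull exchanges: in
// memberlist each entry also carries address, meta and liveness details.
type nodeState struct {
    Name        string
    Incarnation uint32
    State       string
}

// mergeRemoteState applies the peer's states to the local map, keeping
// whichever record has the higher incarnation number. This mirrors the
// "merge the two data sets" step in the flow above.
func mergeRemoteState(local map[string]nodeState, remote []nodeState) {
    for _, rs := range remote {
        ls, ok := local[rs.Name]
        if !ok || rs.Incarnation > ls.Incarnation {
            local[rs.Name] = rs
        }
    }
}

// pushPull simulates one exchange: we "send" our states, the peer
// "sends" back theirs, and both sides merge.
func pushPull(local, peer map[string]nodeState) {
    localStates := make([]nodeState, 0, len(local))
    for _, s := range local {
        localStates = append(localStates, s)
    }
    peerStates := make([]nodeState, 0, len(peer))
    for _, s := range peer {
        peerStates = append(peerStates, s)
    }
    mergeRemoteState(peer, localStates) // push: peer learns our view
    mergeRemoteState(local, peerStates) // pull: we learn the peer's view
}

func main() {
    a := map[string]nodeState{
        "n1": {"n1", 3, "alive"},
        "n2": {"n2", 1, "alive"},
    }
    b := map[string]nodeState{
        "n2": {"n2", 2, "suspect"},
        "n3": {"n3", 1, "alive"},
    }
    pushPull(a, b)
    fmt.Println(a["n2"].State, len(a), len(b)) // suspect 3 3
}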

The gossip goroutine sends messages to K nodes over UDP. Messages are taken from the broadcast queue, and a message is dropped from the queue once it has been retransmitted more than a configured number of times.
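
The queue behaviour can be illustrated with a toy sketch like the one below. It assumes an in-memory gossipQueue with a fixed retransmit limit and prints instead of sending UDP packets; the real broadcast queue is also packed into size-limited UDP payloads, as the getBroadcasts code above shows.

package main

import (
    "fmt"
    "math/rand"
)

// broadcastItem is one queued message plus the number of times it has
// already been sent.
type broadcastItem struct {
    msg       []byte
    transmits int
}

// gossipQueue is a toy broadcast queue with a retransmit limit: once a
// message has been sent limit times it is dropped from the queue.
type gossipQueue struct {
    items []*broadcastItem
    limit int
}

func (q *gossipQueue) queue(msg []byte) {
    q.items = append(q.items, &broadcastItem{msg: msg})
}

// next returns the messages to piggyback on the next UDP packet and
// prunes anything that has reached the retransmit limit.
func (q *gossipQueue) next() [][]byte {
    var out [][]byte
    kept := q.items[:0]
    for _, it := range q.items {
        out = append(out, it.msg)
        it.transmits++
        if it.transmits < q.limit {
            kept = append(kept, it)
        }
    }
    q.items = kept
    return out
}

// gossipOnce picks up to fanout random peers and "sends" the queued
// messages to each of them (printing instead of writing a UDP packet).
func gossipOnce(q *gossipQueue, peers []string, fanout int) {
    msgs := q.next()
    if len(msgs) == 0 {
        return
    }
    n := fanout
    if n > len(peers) {
        n = len(peers)
    }
    for _, i := range rand.Perm(len(peers))[:n] {
        fmt.Printf("send %d message(s) to %s\n", len(msgs), peers[i])
    }
}

func main() {
    q := &gossipQueue{limit: 3}
    q.queue([]byte("alive n1"))
    peers := []string{"n2", "n3", "n4", "n5"}
    for round := 0; round < 4; round++ {
        gossipOnce(q, peers, 2) // after 3 rounds the message is dropped
    }
}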

References:

Alertmanager高可用

一致性算法-Gossip协议实践(Memberlist)
