Prometheus Source Code Analysis: Overview

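The analysis below, including the linked articles, is a short summary of what I have learned online and used in practice. It is based on Prometheus 2.27.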

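Architecture overview

[Figure: Prometheus architecture diagram]

The core modules of the Prometheus server are the HTTP server, the TSDB, service discovery, and metric scraping.

The overall workflow of Prometheus is roughly as follows:

  • Service discovery determines which targets to scrape.
  • The scraped samples are stored in the TSDB.
  • Clients query the results with PromQL through the HTTP server.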
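To make the three steps concrete, here is a toy sketch of the same flow. Everything in it (sample, memStore, discover, scrape) is a hypothetical stand-in that I made up for illustration; none of it is Prometheus code, and the "query" is a plain map lookup rather than PromQL.

```go
package main

import "fmt"

// sample is a hypothetical, simplified stand-in for one scraped data point.
type sample struct {
	metric string
	value  float64
}

// memStore is a toy in-memory replacement for the TSDB.
type memStore struct{ series map[string][]float64 }

func newMemStore() *memStore { return &memStore{series: map[string][]float64{}} }

func (s *memStore) append(sm sample) {
	s.series[sm.metric] = append(s.series[sm.metric], sm.value)
}

// query returns the latest value of a series, standing in for an instant query.
func (s *memStore) query(metric string) (float64, bool) {
	vs := s.series[metric]
	if len(vs) == 0 {
		return 0, false
	}
	return vs[len(vs)-1], true
}

// discover plays the role of service discovery: it publishes a target list.
func discover() <-chan []string {
	ch := make(chan []string, 1)
	ch <- []string{"node-a:9100", "node-b:9100"}
	close(ch)
	return ch
}

// scrape fakes one scrape; a real scraper would fetch the target's metrics over HTTP.
func scrape(target string) sample {
	return sample{metric: `up{instance="` + target + `"}`, value: 1}
}

// run wires the steps together: discover targets, scrape them, store the samples.
func run() *memStore {
	store := newMemStore()
	for targets := range discover() {
		for _, t := range targets {
			store.append(scrape(t))
		}
	}
	return store
}

func main() {
	store := run()
	v, ok := store.query(`up{instance="node-a:9100"}`)
	fmt.Println(v, ok) // prints "1 true"
}
```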

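Main workflow (main.go)

  1. Define the command-line flags with their default values and descriptions.
  2. Parse the command-line arguments into the cfg instance.
  3. Validate the configuration file (set with --config.file; the default is prometheus.yml).
  4. Log "Starting Prometheus" together with host system information.
  5. Initialize the component objects.
  6. Start the components concurrently.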

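Service startup sequence

  1. Termination handler: exit the program on a kill signal or a web shutdown request
  2. Start the scrape discovery manager
  3. Start the notify discovery manager
  4. Start the scrape manager
  5. Start the reload handler
  6. Perform the initial configuration load
  7. Start the rule manager
  8. Initialize the TSDB
  9. Start the web server
  10. Start the notifier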

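Pre-start analysis

  1. Storage initialization

    The Prometheus storage component is a time series database made up of two parts: localStorage and remoteStorage. In the current version localStorage is the TSDB, which stores metrics locally, while remoteStorage stores metrics remotely. fanoutStorage acts as a read/write proxy in front of both. The initialization looks like this: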

    prometheus/cmd/prometheus/main.go

    localStorage = &tsdb.ReadyStorage{} // local storage
    remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime, time.Duration(cfg.RemoteFlushDeadline)) // remote storage
    fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) // read/write proxy
  2. Notifier initialization

    The notifier component sends alerts to Alertmanager. It is initialized with notifier.NewManager:

    prometheus/cmd/prometheus/main.go

    notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))
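  3. discoveryManagerScrape initialization

    The discoveryManagerScrape component performs service discovery for scrape targets. The current version supports many service discovery systems, such as Kubernetes. It is initialized with discovery.NewManager: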

    prometheus/cmd/prometheus/main.go

    discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))
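  4. discoveryManagerNotify initialization

    The discoveryManagerNotify component performs service discovery for alert notification targets, such as Alertmanager instances. It is also initialized with discovery.NewManager. The difference is that discoveryManagerNotify serves the notifier, while discoveryManagerScrape serves scraping: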

    prometheus/cmd/prometheus/main.go

    discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
  5. scrapeManager initialization

    The scrapeManager component takes the targets found by discoveryManagerScrape, scrapes all metrics from them, and stores the scraped metrics in fanoutStorage. It is initialized with scrape.NewManager:

    prometheus/cmd/prometheus/main.go

    scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
  6. queryEngine initialization

    The queryEngine component executes the queries and computations behind rules. It is initialized with promql.NewEngine:

    prometheus/cmd/prometheus/main.go

    opts = promql.EngineOpts{
    	Logger: log.With(logger, "component", "query engine"),
    	Reg: prometheus.DefaultRegisterer,
    	MaxConcurrent: cfg.queryConcurrency, // maximum number of concurrent queries
    	MaxSamples: cfg.queryMaxSamples,
    	Timeout: time.Duration(cfg.queryTimeout), // query timeout
    }
    queryEngine = promql.NewEngine(opts)
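  7. ruleManager initialization

    The ruleManager component is initialized with rules.NewManager. Its options tie several components together: storage, queryEngine, and notifier. The whole flow covers both rule evaluation and alert delivery: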

    prometheus/cmd/prometheus/main.go

    ruleManager = rules.NewManager(&rules.ManagerOptions{
    	Appendable: fanoutStorage, // storage that rule results are appended to
    	TSDB: localStorage, // local time series database (TSDB)
    	QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage), // rule evaluation
    	NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()), // alert notification
    	Context: ctxRule, // controls the goroutines of the rule manager
    	ExternalURL: cfg.web.ExternalURL, // URL under which the web UI is externally reachable
    	Registerer: prometheus.DefaultRegisterer,
    	Logger: log.With(logger, "component", "rule manager"),
    	OutageTolerance: time.Duration(cfg.outageTolerance), // keeps alert state across Prometheus restarts (https://ganeshvernekar.com/gsoc-2018/persist-for-state/)
    	ForGracePeriod: time.Duration(cfg.forGracePeriod),
    	ResendDelay: time.Duration(cfg.resendDelay),
    })
  8. Web initialization

    The web component gives external HTTP access to the storage, queryEngine, scrapeManager, ruleManager, and notifier components. It is initialized as follows:

    prometheus/cmd/prometheus/main.go


    cfg.web.Context = ctxWeb
    cfg.web.TSDB = localStorage.Get
    cfg.web.Storage = fanoutStorage
    cfg.web.QueryEngine = queryEngine
    cfg.web.ScrapeManager = scrapeManager
    cfg.web.RuleManager = ruleManager
    cfg.web.Notifier = notifierManager

    cfg.web.Version = &web.PrometheusVersion{
    Version: version.Version,
    Revision: version.Revision,
    Branch: version.Branch,
    BuildUser: version.BuildUser,
    BuildDate: version.BuildDate,
    GoVersion: version.GoVersion,
    }

    cfg.web.Flags = map[string]string{}

    // Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager
    webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)

The figure below shows how these components are exposed on the web UI:

[Figure: components as exposed on the web UI]
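The read/write proxy role of fanoutStorage from step 1 can be illustrated with a small sketch of the write path. This is only my simplification under stated assumptions: the appender interface, memAppender, and fanout types below are hypothetical, and the real storage interfaces carry labels, timestamps, and commit/rollback semantics that are left out here.

```go
package main

import (
	"errors"
	"fmt"
)

// appender is a hypothetical, minimal write interface.
type appender interface {
	Append(metric string, value float64) error
}

// memAppender is a toy backend that keeps the last value per metric.
type memAppender struct {
	samples map[string]float64
	down    bool
}

func newMemAppender() *memAppender { return &memAppender{samples: map[string]float64{}} }

func (m *memAppender) Append(metric string, value float64) error {
	if m.down {
		return errors.New("storage unavailable")
	}
	m.samples[metric] = value
	return nil
}

// fanout forwards every write to the primary first and then to each secondary.
type fanout struct {
	primary     appender
	secondaries []appender
}

// Append stops at the first backend that reports an error.
func (f *fanout) Append(metric string, value float64) error {
	if err := f.primary.Append(metric, value); err != nil {
		return err
	}
	for _, s := range f.secondaries {
		if err := s.Append(metric, value); err != nil {
			return err
		}
	}
	return nil
}

// demo writes one sample through the proxy and reports what each backend holds.
func demo() (local, remote float64, err error) {
	l, r := newMemAppender(), newMemAppender()
	f := &fanout{primary: l, secondaries: []appender{r}}
	err = f.Append("up", 1)
	return l.samples["up"], r.samples["up"], err
}

func main() {
	local, remote, err := demo()
	fmt.Println(local, remote, err) // prints "1 1 <nil>"
}
```

The caller only sees one appender, which is why scrapeManager and ruleManager can be handed fanoutStorage without knowing how many backends sit behind it.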

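Configuration handling

As the code below shows, every component manages its configuration through its own ApplyConfig method (called directly or from inside an anonymous function), except ruleManager, which uses Update.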

prometheus/cmd/prometheus/main.go

reloaders := []func(cfg *config.Config) error{
	remoteStorage.ApplyConfig, // storage configuration
	webHandler.ApplyConfig, // web configuration
	notifierManager.ApplyConfig, // notifier configuration
	scrapeManager.ApplyConfig, // scrape manager configuration
	// Extract the scrape_configs section from the configuration file.
	func(cfg *config.Config) error {
		c := make(map[string]sd_config.ServiceDiscoveryConfig)
		for _, v := range cfg.ScrapeConfigs {
			c[v.JobName] = v.ServiceDiscoveryConfig
		}
		return discoveryManagerScrape.ApplyConfig(c)
	},
	// Extract the alerting section from the configuration file.
	func(cfg *config.Config) error {
		c := make(map[string]sd_config.ServiceDiscoveryConfig)
		for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
			// AlertmanagerConfigs doesn't hold an unique identifier so we use the config hash as the identifier.
			b, err := json.Marshal(v)
			if err != nil {
				return err
			}
			c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
		}
		return discoveryManagerNotify.ApplyConfig(c)
	},
	// Extract the rule_files section from the configuration file.
	func(cfg *config.Config) error {
		// Get all rule files matching the configuration paths.
		var files []string
		for _, pat := range cfg.RuleFiles {
			fs, err := filepath.Glob(pat)
			if err != nil {
				// The only error can be a bad pattern.
				return fmt.Errorf("error retrieving rule files for %s: %s", pat, err)
			}
			files = append(files, fs...)
		}
		return ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), files)
	},
}
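The reloaders slice above mixes two kinds of entries: method values whose signature already matches, and closures that adapt a narrower signature. A minimal sketch of that pattern follows; the config, wholeFileComponent, and sectionComponent types are hypothetical stand-ins of mine, not Prometheus types.

```go
package main

import "fmt"

// config is a hypothetical stand-in for *config.Config.
type config struct {
	scrapeJobs []string
}

// wholeFileComponent consumes the entire configuration.
type wholeFileComponent struct{ jobs int }

func (c *wholeFileComponent) ApplyConfig(cfg *config) error {
	c.jobs = len(cfg.scrapeJobs)
	return nil
}

// sectionComponent only understands one pre-extracted section.
type sectionComponent struct{ targets map[string]bool }

func (c *sectionComponent) ApplyConfig(section map[string]bool) error {
	c.targets = section
	return nil
}

// buildReloaders gives both components the same func(*config) error shape.
func buildReloaders(whole *wholeFileComponent, sec *sectionComponent) []func(*config) error {
	return []func(*config) error{
		// A method value already has the required signature.
		whole.ApplyConfig,
		// A closure adapts the narrower signature by extracting the section first.
		func(cfg *config) error {
			section := make(map[string]bool, len(cfg.scrapeJobs))
			for _, job := range cfg.scrapeJobs {
				section[job] = true
			}
			return sec.ApplyConfig(section)
		},
	}
}

func main() {
	whole, sec := &wholeFileComponent{}, &sectionComponent{}
	cfg := &config{scrapeJobs: []string{"node", "api"}}
	for _, reload := range buildReloaders(whole, sec) {
		if err := reload(cfg); err != nil {
			fmt.Println("reload failed:", err)
		}
	}
	fmt.Println(whole.jobs, len(sec.targets)) // prints "2 2"
}
```

Because every entry ends up with the same function type, the caller can loop over the slice without knowing which component sits behind each entry.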

For remoteStorage, webHandler, notifierManager, and scrapeManager, the cfg *config.Config argument of ApplyConfig carries the whole prometheus.yml file. See prometheus.yml for a complete example configuration file.

prometheus/scrape/manager.go

func (m *Manager) ApplyConfig(cfg *config.Config) error {
.......
}

For discoveryManagerScrape and discoveryManagerNotify, by contrast, the argument of ApplyConfig is only one section of the file:

prometheus/discovery/manager.go

func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
......
}

An anonymous function is therefore needed to extract the relevant section beforehand:

prometheus/cmd/prometheus/main.go

// Extract the scrape_configs section from the configuration file.
func(cfg *config.Config) error {
	c := make(map[string]sd_config.ServiceDiscoveryConfig)
	for _, v := range cfg.ScrapeConfigs {
		c[v.JobName] = v.ServiceDiscoveryConfig
	}
	return discoveryManagerScrape.ApplyConfig(c)
},
// Extract the alerting section from the configuration file.
func(cfg *config.Config) error {
	c := make(map[string]sd_config.ServiceDiscoveryConfig)
	for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
		// AlertmanagerConfigs doesn't hold an unique identifier so we use the config hash as the identifier.
		b, err := json.Marshal(v)
		if err != nil {
			return err
		}
		c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
	}
	return discoveryManagerNotify.ApplyConfig(c)
},
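The alerting closure keys each entry by a hash of its JSON encoding because the entries have no name of their own. Here is a self-contained sketch of that trick using only the standard library; amConfig is a hypothetical, trimmed-down struct that I invented for the example.

```go
package main

import (
	"crypto/md5"
	"encoding/json"
	"fmt"
)

// amConfig is a hypothetical, trimmed-down Alertmanager config entry.
type amConfig struct {
	Scheme  string   `json:"scheme"`
	Targets []string `json:"targets"`
}

// configKey derives an identifier from the content of the entry:
// the hex-encoded MD5 digest of its JSON encoding.
func configKey(c amConfig) (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", md5.Sum(b)), nil
}

func main() {
	a, _ := configKey(amConfig{Scheme: "http", Targets: []string{"am-1:9093"}})
	b, _ := configKey(amConfig{Scheme: "http", Targets: []string{"am-1:9093"}})
	c, _ := configKey(amConfig{Scheme: "https", Targets: []string{"am-1:9093"}})
	fmt.Println(len(a), a == b, a == c) // prints "32 true false"
}
```

One consequence of content-based keys is that two identical entries collapse into a single map key, while any change to an entry produces a new key.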

For ruleManager, the anonymous function extracts the rule_files section:

prometheus/cmd/prometheus/main.go

// Extract the rule_files section from the configuration file.
func(cfg *config.Config) error {
	// Get all rule files matching the configuration paths.
	var files []string
	for _, pat := range cfg.RuleFiles {
		fs, err := filepath.Glob(pat)
		if err != nil {
			// The only error can be a bad pattern.
			return fmt.Errorf("error retrieving rule files for %s: %s", pat, err)
		}
		files = append(files, fs...)
	}
	return ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), files)
},
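The glob expansion in this closure can be tried in isolation. The sketch below mirrors the loop with standard library calls only; expandRuleFiles and demo are hypothetical helper names of mine, and the file names are made up.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// expandRuleFiles resolves every glob pattern to the files that currently match it.
func expandRuleFiles(patterns []string) ([]string, error) {
	var files []string
	for _, pat := range patterns {
		matches, err := filepath.Glob(pat)
		if err != nil {
			// filepath.Glob only fails on a malformed pattern.
			return nil, fmt.Errorf("error retrieving rule files for %s: %w", pat, err)
		}
		files = append(files, matches...)
	}
	return files, nil
}

// demo creates a temporary directory with three files and counts the matches.
func demo() (int, error) {
	dir, err := os.MkdirTemp("", "rules")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	for _, name := range []string{"alerts.rules.yml", "records.rules.yml", "notes.txt"} {
		if err := os.WriteFile(filepath.Join(dir, name), []byte("groups: []\n"), 0o600); err != nil {
			return 0, err
		}
	}
	files, err := expandRuleFiles([]string{
		filepath.Join(dir, "*.rules.yml"),
		filepath.Join(dir, "missing-*.yml"), // matches nothing, which is not an error
	})
	return len(files), err
}

func main() {
	n, err := demo()
	fmt.Println(n, err) // prints "2 <nil>"
}
```

Note that a pattern matching no files is silently accepted, so a typo in a rule_files path shows up as missing rules rather than as a reload error.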

It then calls the component's built-in Update method to manage the configuration:

prometheus/rules/manager.go

func (m *Manager) Update(interval time.Duration, files []string) error {
.......
}

Finally, reloadConfig applies the configuration to every component:

prometheus/cmd/prometheus/main.go

func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)

defer func() {
if err == nil {
configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()
} else {
configSuccess.Set(0)
}
}()

conf, err := config.LoadFile(filename)
if err != nil {
return fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, err)
}

failed := false
// Loop over the reloaders and apply the configuration to each component.
for _, rl := range rls {
if err := rl(conf); err != nil {
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
failed = true
}
}
if failed {
return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
}
promql.SetDefaultEvaluationInterval(time.Duration(conf.GlobalConfig.EvaluationInterval))
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
return nil
}
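Two details of reloadConfig are easy to miss: the deferred function reads the named return value err to record success or failure, and the loop keeps going after a failure so that every component gets a chance to apply the new configuration. The sketch below isolates just that control flow; lastReloadOK is a hypothetical boolean standing in for the configSuccess gauge, and applyAll and demo are names I made up.

```go
package main

import (
	"errors"
	"fmt"
)

// lastReloadOK is a hypothetical stand-in for the configSuccess gauge.
var lastReloadOK bool

// applyAll runs every reloader even if an earlier one failed, and records the
// overall outcome in a deferred function that inspects the named return value.
func applyAll(reloaders ...func() error) (err error) {
	defer func() { lastReloadOK = err == nil }()

	failed := 0
	for _, rl := range reloaders {
		if rlErr := rl(); rlErr != nil {
			fmt.Println("failed to apply configuration:", rlErr)
			failed++
		}
	}
	if failed > 0 {
		return fmt.Errorf("%d reloader(s) failed", failed)
	}
	return nil
}

// demo runs three reloaders, the second of which fails.
func demo() (calls int, ok bool, err error) {
	count := func() error { calls++; return nil }
	broken := func() error { calls++; return errors.New("bad scrape config") }
	err = applyAll(count, broken, count)
	return calls, lastReloadOK, err
}

func main() {
	calls, ok, err := demo()
	fmt.Println(calls, ok, err)
}
```

In the demo all three reloaders run, the success flag ends up false, and the caller receives a single summary error.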

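Service startup analysis

This part uses the github.com/oklog/oklog/pkg/group package to instantiate an object g; see the package documentation for details. (The import list of main.go further down names github.com/oklog/run instead, the standalone successor of that package.)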

prometheus/cmd/prometheus/main.go

// "github.com/oklog/oklog/pkg/group"
var g group.Group
{
  ......
}

The object g holds the entry point of every component; each one is added to g with the Add method. Taking scrapeManager as an example:

prometheus/cmd/prometheus/main.go

{
// Scrape manager.
// Add the scrape manager to g with the Add method.
g.Add(
func() error {
// When the scrape manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager so
// we wait until the config is fully loaded.
<-reloadReady.C
// Entry point that runs the scrape manager.
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
}
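The scrape manager blocks on <-reloadReady.C until the configuration has been loaded. The definition of reloadReady is not shown in this excerpt, so the following is only a sketch of how such a gate can be built, assuming a channel that is closed exactly once; readyGate, Open, and runAfterReady are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// readyGate is a hypothetical helper: C is closed exactly once, when Open is called.
type readyGate struct {
	C    chan struct{}
	once sync.Once
}

func newReadyGate() *readyGate { return &readyGate{C: make(chan struct{})} }

// Open releases every goroutine waiting on C; repeated calls are no-ops.
func (g *readyGate) Open() { g.once.Do(func() { close(g.C) }) }

// runAfterReady starts n workers that wait on the gate, opens it, and
// returns how many workers ran once they were released.
func runAfterReady(n int) int {
	gate := newReadyGate()
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		started int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-gate.C // blocks until the configuration is "loaded"
			mu.Lock()
			started++
			mu.Unlock()
		}()
	}
	gate.Open()
	gate.Open() // safe: closing happens only once
	wg.Wait()
	return started
}

func main() {
	fmt.Println(runAfterReady(3)) // prints "3"
}
```

Closing a channel wakes all receivers at once, which is why a closed channel works well as a one-shot broadcast signal.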

Calling Run on g then starts all components:

prometheus/cmd/prometheus/main.go

if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
level.Info(logger).Log("msg", "See you next time!")
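To see why each Add call takes two functions, it helps to look at a stripped-down actor group. The sketch below is my own simplified model of the behaviour (start everything, wait for the first actor to return, interrupt the rest, wait for them to finish); the lowercase group and actor types are hypothetical and should not be read as the library's source.

```go
package main

import (
	"errors"
	"fmt"
)

// actor pairs a blocking execute function with a function that interrupts it.
type actor struct {
	execute   func() error
	interrupt func(error)
}

// group is a simplified, hypothetical model of an actor group.
type group struct{ actors []actor }

func (g *group) Add(execute func() error, interrupt func(error)) {
	g.actors = append(g.actors, actor{execute, interrupt})
}

// Run starts all actors concurrently, waits for the first one to return,
// interrupts every actor, waits for the rest, and returns the first result.
func (g *group) Run() error {
	if len(g.actors) == 0 {
		return nil
	}
	errs := make(chan error, len(g.actors))
	for _, a := range g.actors {
		go func(a actor) { errs <- a.execute() }(a)
	}
	err := <-errs
	for _, a := range g.actors {
		a.interrupt(err)
	}
	for i := 1; i < cap(errs); i++ {
		<-errs
	}
	return err
}

// demo runs one long-lived actor next to one that fails immediately.
func demo() error {
	var g group
	stop := make(chan struct{})
	// Long-running actor: returns only when interrupted.
	g.Add(func() error { <-stop; return nil }, func(error) { close(stop) })
	// Failing actor: its return triggers the shutdown of the whole group.
	g.Add(func() error { return errors.New("web server failed") }, func(error) {})
	return g.Run()
}

func main() {
	fmt.Println(demo()) // prints "web server failed"
}
```

This is also why the scrape manager's interrupt function calls scrapeManager.Stop(): when any component exits, every other component is asked to stop so the process can shut down cleanly.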

Startup is complete.

Annotated main function

prometheus/cmd/prometheus/main.go

// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// The main package for the Prometheus server executable.
package main

import (
"context"
"fmt"
"io"
"math"
"math/bits"
"net"
"net/http"
_ "net/http/pprof" // Comment this line to disable pprof endpoint.
"net/url"
"os"
"os/signal"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/alecthomas/units"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
conntrack "github.com/mwitkow/go-conntrack"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promlog"
promlogflag "github.com/prometheus/common/promlog/flag"
"github.com/prometheus/common/version"
toolkit_web "github.com/prometheus/exporter-toolkit/web"
toolkit_webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag"
jcfg "github.com/uber/jaeger-client-go/config"
jprom "github.com/uber/jaeger-lib/metrics/prometheus"
"go.uber.org/atomic"
kingpin "gopkg.in/alecthomas/kingpin.v2"
klog "k8s.io/klog"
klogv2 "k8s.io/klog/v2"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
_ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations.
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/logging"
"github.com/prometheus/prometheus/pkg/relabel"
prom_runtime "github.com/prometheus/prometheus/pkg/runtime"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/prometheus/web"
)

var (
// Two metrics that Prometheus exposes about its own configuration reloads.
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_config_last_reload_successful",
Help: "Whether the last configuration reload attempt was successful.",
})
configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})

// Default data retention time: 15 days.
defaultRetentionString = "15d"
defaultRetentionDuration model.Duration
)

func init() {
prometheus.MustRegister(version.NewCollector("prometheus"))

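// Convert the default retention time from a string to model.Duration,
// whose underlying type is time.Duration.
var err error
// model.ParseDuration matches defaultRetentionString against a regular
// expression, extracts the number for each unit, and computes the total duration.
// For example, "1y2w3d4h5m6s7ms" yields the submatches
// [1y2w3d4h5m6s7ms 1y 1 2w 2 3d 3 4h 4 5m 5 6s 6 7ms 7];
// each number is then multiplied by its unit and the products are added up.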
defaultRetentionDuration, err = model.ParseDuration(defaultRetentionString)
if err != nil {
panic(err)
}
}

type flagConfig struct {
configFile string

localStoragePath string
notifier notifier.Options
forGracePeriod model.Duration
outageTolerance model.Duration
resendDelay model.Duration
web web.Options
tsdb tsdbOptions
lookbackDelta model.Duration
webTimeout model.Duration
queryTimeout model.Duration
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline model.Duration

featureList []string
// These options are extracted from featureList
// for ease of use.
enablePromQLAtModifier bool
enablePromQLNegativeOffset bool
enableExpandExternalLabels bool

prometheusURL string
corsRegexString string

promlogConfig promlog.Config
}

// setFeatureListOptions sets the corresponding options from the featureList.
func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
maxExemplars := c.tsdb.MaxExemplars
// Disabled at first. Value from the flag is used if exemplar-storage is set.
c.tsdb.MaxExemplars = 0
for _, f := range c.featureList {
opts := strings.Split(f, ",")
for _, o := range opts {
switch o {
case "promql-at-modifier":
c.enablePromQLAtModifier = true
level.Info(logger).Log("msg", "Experimental promql-at-modifier enabled")
case "promql-negative-offset":
c.enablePromQLNegativeOffset = true
level.Info(logger).Log("msg", "Experimental promql-negative-offset enabled")
case "remote-write-receiver":
c.web.RemoteWriteReceiver = true
level.Info(logger).Log("msg", "Experimental remote-write-receiver enabled")
case "expand-external-labels":
c.enableExpandExternalLabels = true
level.Info(logger).Log("msg", "Experimental expand-external-labels enabled")
case "exemplar-storage":
c.tsdb.MaxExemplars = maxExemplars
level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled", "maxExemplars", maxExemplars)
case "":
continue
default:
level.Warn(logger).Log("msg", "Unknown option for --enable-feature", "option", o)
}
}
}
return nil
}

func main() {
if os.Getenv("DEBUG") != "" {
runtime.SetBlockProfileRate(20)
runtime.SetMutexProfileFraction(20)
}

var (
oldFlagRetentionDuration model.Duration
newFlagRetentionDuration model.Duration
)

// Startup configuration.
cfg := flagConfig{
notifier: notifier.Options{
// The default registerer already has the process and Go collectors registered.
Registerer: prometheus.DefaultRegisterer,
},
web: web.Options{
Registerer: prometheus.DefaultRegisterer,
Gatherer: prometheus.DefaultGatherer,
},
// Logging configuration, used later to build the go-kit logger.
promlogConfig: promlog.Config{},
}

// Initialize the kingpin.Application, named after the base name of os.Args[0] (that is, prometheus).
a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server").UsageWriter(os.Stdout)

// Define the flags and their default values.
a.Version(version.Print("prometheus"))

a.HelpFlag.Short('h')

a.Flag("config.file", "Prometheus configuration file path.").
Default("prometheus.yml").StringVar(&cfg.configFile)

a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
Default("0.0.0.0:9090").StringVar(&cfg.web.ListenAddress)

webConfig := toolkit_webflag.AddFlags(a)

// Read timeout of the web server.
a.Flag("web.read-timeout",
"Maximum duration before timing out read of the request, and closing idle connections.").
Default("5m").SetValue(&cfg.webTimeout)

// Maximum number of simultaneous web connections.
a.Flag("web.max-connections", "Maximum number of simultaneous connections.").
Default("512").IntVar(&cfg.web.MaxConnections)

a.Flag("web.external-url",
"The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically.").
PlaceHolder("<URL>").StringVar(&cfg.prometheusURL)

a.Flag("web.route-prefix",
"Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url.").
PlaceHolder("<path>").StringVar(&cfg.web.RoutePrefix)

a.Flag("web.user-assets", "Path to static asset directory, available at /user.").
PlaceHolder("<path>").StringVar(&cfg.web.UserAssetsPath)

// Allow reloading the configuration or shutting down the server via HTTP requests.
a.Flag("web.enable-lifecycle", "Enable shutdown and reload via HTTP request.").
Default("false").BoolVar(&cfg.web.EnableLifecycle)

a.Flag("web.enable-admin-api", "Enable API endpoints for admin control actions.").
Default("false").BoolVar(&cfg.web.EnableAdminAPI)

a.Flag("web.console.templates", "Path to the console template directory, available at /consoles.").
Default("consoles").StringVar(&cfg.web.ConsoleTemplatesPath)

a.Flag("web.console.libraries", "Path to the console library directory.").
Default("console_libraries").StringVar(&cfg.web.ConsoleLibrariesPath)

a.Flag("web.page-title", "Document title of Prometheus instance.").
Default("Prometheus Time Series Collection and Processing Server").StringVar(&cfg.web.PageTitle)

// Restrict the allowed CORS origins.
a.Flag("web.cors.origin", `Regex for CORS origin. It is fully anchored. Example: 'https?://(domain1|domain2)\.com'`).
Default(".*").StringVar(&cfg.corsRegexString)

a.Flag("storage.tsdb.path", "Base path for metrics storage.").
Default("data/").StringVar(&cfg.localStoragePath)

// Minimum time range a block covers before it is persisted to disk; defaults to 2h.
a.Flag("storage.tsdb.min-block-duration", "Minimum duration of a data block before being persisted. For use in testing.").
Hidden().Default("2h").SetValue(&cfg.tsdb.MinBlockDuration)

a.Flag("storage.tsdb.max-block-duration",
"Maximum duration compacted blocks may span. For use in testing. (Defaults to 10% of the retention period.)").
Hidden().PlaceHolder("<duration>").SetValue(&cfg.tsdb.MaxBlockDuration)

a.Flag("storage.tsdb.max-block-chunk-segment-size",
"The maximum size for a single chunk segment in a block. Example: 512MB").
Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.MaxBlockChunkSegmentSize)

a.Flag("storage.tsdb.wal-segment-size",
"Size at which to split the tsdb WAL segment files. Example: 100MB").
Hidden().PlaceHolder("<bytes>").BytesVar(&cfg.tsdb.WALSegmentSize)

a.Flag("storage.tsdb.retention", "[DEPRECATED] How long to retain samples in storage. This flag has been deprecated, use \"storage.tsdb.retention.time\" instead.").
SetValue(&oldFlagRetentionDuration)

// How long samples are retained; defaults to 15 days.
a.Flag("storage.tsdb.retention.time", "How long to retain samples in storage. When this flag is set it overrides \"storage.tsdb.retention\". If neither this flag nor \"storage.tsdb.retention\" nor \"storage.tsdb.retention.size\" is set, the retention time defaults to "+defaultRetentionString+". Units Supported: y, w, d, h, m, s, ms.").
SetValue(&newFlagRetentionDuration)

a.Flag("storage.tsdb.retention.size", "[EXPERIMENTAL] Maximum number of bytes that can be stored for blocks. A unit is required, supported units: B, KB, MB, GB, TB, PB, EB. Ex: \"512MB\". This flag is experimental and can be changed in future releases.").
BytesVar(&cfg.tsdb.MaxBytes)

// Do not create a lock file in the data directory.
a.Flag("storage.tsdb.no-lockfile", "Do not create lockfile in data directory.").
Default("false").BoolVar(&cfg.tsdb.NoLockfile)

a.Flag("storage.tsdb.allow-overlapping-blocks", "[EXPERIMENTAL] Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge.").
Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks)

// Compress the WAL; enabled by default.
a.Flag("storage.tsdb.wal-compression", "Compress the tsdb WAL.").
Default("true").BoolVar(&cfg.tsdb.WALCompression)

// How long to wait for pending samples to be flushed to remote storage on shutdown or config reload; defaults to 1 minute.
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)

a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit. This limit is ignored for streamed response types.").
Default("5e7").IntVar(&cfg.web.RemoteReadSampleLimit)

a.Flag("storage.remote.read-concurrent-limit", "Maximum number of concurrent remote read calls. 0 means no limit.").
Default("10").IntVar(&cfg.web.RemoteReadConcurrencyLimit)

a.Flag("storage.remote.read-max-bytes-in-frame", "Maximum number of bytes in a single frame for streaming remote read response types before marshalling. Note that client might have limit on frame size as well. 1MB as recommended by protobuf by default.").
Default("1048576").IntVar(&cfg.web.RemoteReadBytesInFrame)

a.Flag("storage.exemplars.exemplars-limit", "[EXPERIMENTAL] Maximum number of exemplars to store in in-memory exemplar storage total. 0 disables the exemplar storage. This flag is effective only with --enable-feature=exemplar-storage.").
Default("100000").IntVar(&cfg.tsdb.MaxExemplars)

// Maximum Prometheus outage for which the "for" state of alerts is still restored.
a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert.").
Default("1h").SetValue(&cfg.outageTolerance)

a.Flag("rules.alert.for-grace-period", "Minimum duration between alert and restored \"for\" state. This is maintained only for alerts with configured \"for\" time greater than grace period.").
Default("10m").SetValue(&cfg.forGracePeriod)

// Minimum time to wait before resending an alert to Alertmanager.
a.Flag("rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
Default("1m").SetValue(&cfg.resendDelay)

a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to 2ms to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").
Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps)

// Capacity of the queue of notifications waiting to be sent to Alertmanager.
a.Flag("alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
Default("10000").IntVar(&cfg.notifier.QueueCapacity)

// TODO: Remove in Prometheus 3.0.
// Formerly the timeout for sending alerts to Alertmanager; the flag is deprecated and has no effect.
alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String()

a.Flag("query.lookback-delta", "The maximum lookback duration for retrieving metrics during expression evaluations and federation.").
Default("5m").SetValue(&cfg.lookbackDelta)

// Query timeout; defaults to 2 minutes.
a.Flag("query.timeout", "Maximum time a query may take before being aborted.").
Default("2m").SetValue(&cfg.queryTimeout)

// Number of concurrent queries; it can probably be raised when CPU and memory are plentiful.
a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently.").
Default("20").IntVar(&cfg.queryConcurrency)

// Maximum number of samples a single query may load into memory, which also caps the number of samples it can return.
a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
Default("50000000").IntVar(&cfg.queryMaxSamples)

a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: promql-at-modifier, promql-negative-offset, remote-write-receiver, exemplar-storage, expand-external-labels. See https://prometheus.io/docs/prometheus/latest/disabled_features/ for more details.").
Default("").StringsVar(&cfg.featureList)

// Add the logging flags: log level ([debug, info, warn, error]) and log format ([logfmt, json]).
promlogflag.AddFlags(a, &cfg.promlogConfig)

// Parse and validate the arguments.
_, err := a.Parse(os.Args[1:])
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "Error parsing commandline arguments"))
a.Usage(os.Args[1:])
os.Exit(2)
}

// Initialize the logger from the logging flags.
logger := promlog.New(&cfg.promlogConfig)

if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "Error parsing feature list"))
os.Exit(1)
}

// Validate the configuration parameters.
cfg.web.ExternalURL, err = computeExternalURL(cfg.prometheusURL, cfg.web.ListenAddress)
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "parse external URL %q", cfg.prometheusURL))
os.Exit(2)
}

cfg.web.CORSOrigin, err = compileCORSRegexString(cfg.corsRegexString)
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "could not compile CORS regex string %q", cfg.corsRegexString))
os.Exit(2)
}

if *alertmanagerTimeout != "" {
level.Warn(logger).Log("msg", "The flag --alertmanager.timeout has no effect and will be removed in the future.")
}

// Throw error for invalid config before starting other components.
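// Validate the Prometheus config file given by --config.file; an empty file (yml) falls back to the default config.
// The parsed result is discarded: this call only checks validity and does not keep a config object.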
if _, err := config.LoadFile(cfg.configFile, false, log.NewNopLogger()); err != nil {
level.Error(logger).Log("msg", fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "err", err)
os.Exit(2)
}
// Now that the validity of the config is established, set the config
// success metrics accordingly, although the config isn't really loaded
// yet. This will happen later (including setting these metrics again),
// but if we don't do it now, the metrics will stay at zero until the
// startup procedure is complete, which might take long enough to
// trigger alerts about an invalid config.
// In short: with the config file known to be valid, prometheus_config_last_reload_successful is set to 1 even though
// the config is not loaded yet. It is set again after the real load, but without this step it would stay at 0 for the
// whole startup, which can last long enough to trigger an invalid-config alert.
configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()

cfg.web.ReadTimeout = time.Duration(cfg.webTimeout)
// Default -web.route-prefix to path of -web.external-url.
if cfg.web.RoutePrefix == "" {
cfg.web.RoutePrefix = cfg.web.ExternalURL.Path
}
// RoutePrefix must always be at least '/'.
cfg.web.RoutePrefix = "/" + strings.Trim(cfg.web.RoutePrefix, "/")

{ // Time retention settings.
if oldFlagRetentionDuration != 0 {
level.Warn(logger).Log("deprecation_notice", "'storage.tsdb.retention' flag is deprecated use 'storage.tsdb.retention.time' instead.")
cfg.tsdb.RetentionDuration = oldFlagRetentionDuration
}

// When the new flag is set it takes precedence.
if newFlagRetentionDuration != 0 {
cfg.tsdb.RetentionDuration = newFlagRetentionDuration
}

if cfg.tsdb.RetentionDuration == 0 && cfg.tsdb.MaxBytes == 0 {
cfg.tsdb.RetentionDuration = defaultRetentionDuration
level.Info(logger).Log("msg", "No time or size retention was set so using the default time retention", "duration", defaultRetentionDuration)
}

// Check for overflows. This limits our max retention to 100y.
// A retention so large that it overflowed (went negative) is clamped to 100 years; the largest representable value is about 292y.
if cfg.tsdb.RetentionDuration < 0 {
y, err := model.ParseDuration("100y")
if err != nil {
panic(err)
}
cfg.tsdb.RetentionDuration = y
level.Warn(logger).Log("msg", "Time retention value is too high. Limiting to: "+y.String())
}
}

{ // Max block size settings.
if cfg.tsdb.MaxBlockDuration == 0 {
maxBlockDuration, err := model.ParseDuration("31d")
if err != nil {
panic(err)
}
// When the time retention is set and not too big use to define the max block duration.
if cfg.tsdb.RetentionDuration != 0 && cfg.tsdb.RetentionDuration/10 < maxBlockDuration {
maxBlockDuration = cfg.tsdb.RetentionDuration / 10
}

cfg.tsdb.MaxBlockDuration = maxBlockDuration
}
}

// Interval used for subqueries that omit a step; seeded with the default global evaluation interval.
noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{}
noStepSubqueryInterval.Set(config.DefaultGlobalConfig.EvaluationInterval)

// Above level 6, the k8s client would log bearer tokens in clear-text.
klog.ClampLevel(6)
klog.SetLogger(log.With(logger, "component", "k8s_client_runtime"))
klogv2.ClampLevel(6)
klogv2.SetLogger(log.With(logger, "component", "k8s_client_runtime"))

// Log the startup banner.
level.Info(logger).Log("msg", "Starting Prometheus", "version", version.Info())
// Warn about the limitations of 32-bit builds.
if bits.UintSize < 64 {
level.Warn(logger).Log("msg", "This Prometheus binary has not been compiled for a 64-bit architecture. Due to virtual memory constraints of 32-bit systems, it is highly recommended to switch to a 64-bit binary of Prometheus.", "GOARCH", runtime.GOARCH)
}

// Log build and host system information.
level.Info(logger).Log("build_context", version.BuildContext())
level.Info(logger).Log("host_details", prom_runtime.Uname())
level.Info(logger).Log("fd_limits", prom_runtime.FdLimits())
level.Info(logger).Log("vm_limits", prom_runtime.VMLimits())

// Declare the storage and scraper related variables.
var (
localStorage = &readyStorage{}
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

var (
// Context used to cancel the web handler.
ctxWeb, cancelWeb = context.WithCancel(context.Background())
ctxRule = context.Background()

// The notifier sends alerts to Alertmanager.
notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))

// Context used to cancel scraping.
ctxScrape, cancelScrape = context.WithCancel(context.Background())
// Create the scrape discovery manager, bound to the scrape context.
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))

// There are two discovery.Manager instances: the scrape one above and the notify one below.
ctxNotify, cancelNotify = context.WithCancel(context.Background())
discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))

// Create the scrape manager; fanoutStorage is a proxy that reads from and writes to several underlying storages.
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)

// PromQL engine options.
opts = promql.EngineOpts{
Logger: log.With(logger, "component", "query engine"),
Reg: prometheus.DefaultRegisterer,
MaxSamples: cfg.queryMaxSamples,
Timeout: time.Duration(cfg.queryTimeout),
ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),
LookbackDelta: time.Duration(cfg.lookbackDelta),
NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
EnableAtModifier: cfg.enablePromQLAtModifier,
EnableNegativeOffset: cfg.enablePromQLNegativeOffset,
}

// Create the PromQL query engine.
queryEngine = promql.NewEngine(opts)

// Create the rule manager.
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
Queryable: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
})
)

// Point the scraper placeholder at the real scrape manager.
scraper.Set(scrapeManager)

// Hand the parsed settings and the components built above to the web handler options.
cfg.web.Context = ctxWeb
cfg.web.TSDBRetentionDuration = cfg.tsdb.RetentionDuration
cfg.web.TSDBMaxBytes = cfg.tsdb.MaxBytes
cfg.web.TSDBDir = cfg.localStoragePath
cfg.web.LocalStorage = localStorage
cfg.web.Storage = fanoutStorage
cfg.web.ExemplarStorage = localStorage
cfg.web.QueryEngine = queryEngine
cfg.web.ScrapeManager = scrapeManager
cfg.web.RuleManager = ruleManager
cfg.web.Notifier = notifierManager
cfg.web.LookbackDelta = time.Duration(cfg.lookbackDelta)

cfg.web.Version = &web.PrometheusVersion{
Version: version.Version,
Revision: version.Revision,
Branch: version.Branch,
BuildUser: version.BuildUser,
BuildDate: version.BuildDate,
GoVersion: version.GoVersion,
}

// Record every Prometheus startup flag and its value in this map.
cfg.web.Flags = map[string]string{}

// Exclude kingpin default flags to expose only Prometheus ones.
boilerplateFlags := kingpin.New("", "").Version("")
for _, f := range a.Model().Flags {
if boilerplateFlags.GetFlag(f.Name) != nil {
continue
}

cfg.web.Flags[f.Name] = f.Value.String()
}

// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager.
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)

// Monitor outgoing connections on default transport with conntrack.
http.DefaultTransport.(*http.Transport).DialContext = conntrack.NewDialContextFunc(
conntrack.DialWithTracing(),
)

// Declare the reloaders, one per component that reacts to a config change.
reloaders := []reloader{
{
name: "remote_storage",
reloader: remoteStorage.ApplyConfig,
}, {
name: "web_handler",
reloader: webHandler.ApplyConfig,
}, {
name: "query_engine",
reloader: func(cfg *config.Config) error {
if cfg.GlobalConfig.QueryLogFile == "" {
queryEngine.SetQueryLogger(nil)
return nil
}

l, err := logging.NewJSONFileLogger(cfg.GlobalConfig.QueryLogFile)
if err != nil {
return err
}
queryEngine.SetQueryLogger(l)
return nil
},
}, {
// The Scrape and notifier managers need to reload before the Discovery manager as
// they need to read the most updated config when receiving the new targets list.
// Reloaders are applied in slice order, so "scrape" and "notify" each run before their *_sd counterpart.
name: "scrape",
reloader: scrapeManager.ApplyConfig,
}, {
name: "scrape_sd",
reloader: func(cfg *config.Config) error {
c := make(map[string]discovery.Configs)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfigs
}
return discoveryManagerScrape.ApplyConfig(c)
},
}, {
name: "notify",
reloader: notifierManager.ApplyConfig,
}, {
name: "notify_sd",
reloader: func(cfg *config.Config) error {
c := make(map[string]discovery.Configs)
for k, v := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
c[k] = v.ServiceDiscoveryConfigs
}
return discoveryManagerNotify.ApplyConfig(c)
},
}, {
name: "rules",
reloader: func(cfg *config.Config) error {
// Get all rule files matching the configuration paths.
var files []string
for _, pat := range cfg.RuleFiles {
fs, err := filepath.Glob(pat)
if err != nil {
// The only error can be a bad pattern.
return errors.Wrapf(err, "error retrieving rule files for %s", pat)
}
files = append(files, fs...)
}
return ruleManager.Update(
time.Duration(cfg.GlobalConfig.EvaluationInterval),
files,
cfg.GlobalConfig.ExternalLabels,
)
},
},
}

// Register Prometheus's own config metrics.
prometheus.MustRegister(configSuccess)
prometheus.MustRegister(configSuccessTime)

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
// dbOpen is closed to signal that the TSDB has finished opening.
dbOpen := make(chan struct{})

// sync.Once is used to make sure we can close the channel at different execution stages (SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}

// Initialize Jaeger tracing.
closer, err := initTracing(logger)
if err != nil {
level.Error(logger).Log("msg", "Unable to init tracing", "err", err)
os.Exit(2)
}
defer closer.Close()

// Open the web listener now; the web server itself is started later, inside the run group.
listener, err := webHandler.Listener()
if err != nil {
level.Error(logger).Log("msg", "Unable to start web listener", "err", err)
os.Exit(1)
}

// Validate the extra web configuration file, for example its TLS settings.
err = toolkit_web.Validate(*webConfig)
if err != nil {
level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err)
os.Exit(1)
}

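// oklog's run.Group is similar in purpose to golang.org/x/sync/errgroup: it runs a set of concurrent actors
// and interrupts all of them as soon as one returns.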
var g run.Group
{
// Termination handler.
term := make(chan os.Signal, 1)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
cancel := make(chan struct{})
// Exit when a termination signal or a web quit request arrives.
g.Add(
func() error {
// Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
select {
case <-term:
level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
reloadReady.Close()
case <-webHandler.Quit():
level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
case <-cancel:
reloadReady.Close()
}
return nil
},
func(err error) {
close(cancel)
},
)
}
{
// Scrape discovery manager.
g.Add(
func() error {
err := discoveryManagerScrape.Run()
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
cancelScrape()
},
)
}
{
// Notify discovery manager.
g.Add(
func() error {
err := discoveryManagerNotify.Run()
level.Info(logger).Log("msg", "Notify discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping notify discovery manager...")
cancelNotify()
},
)
}
{
// Scrape manager.
g.Add(
func() error {
// When the scrape manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager so
// we wait until the config is fully loaded.
<-reloadReady.C

err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
}
{
// Reload handler.

// Make sure that sighup handler is registered with a redirect to the channel before the potentially
// long and synchronous tsdb init.
hup := make(chan os.Signal, 1)
signal.Notify(hup, syscall.SIGHUP)
cancel := make(chan struct{})
g.Add(
func() error {
<-reloadReady.C

for {
select {
case <-hup:
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
}
case rc := <-webHandler.Reload():
if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
rc <- err
} else {
rc <- nil
}
case <-cancel:
return nil
}
}

},
func(err error) {
// Wait for any in-progress reloads to complete to avoid
// reloading things after they have been shutdown.
cancel <- struct{}{}
},
)
}
{
// Initial configuration loading.
cancel := make(chan struct{})
g.Add(
func() error {
select {
case <-dbOpen:
// In case a shutdown is initiated before the dbOpen is released
case <-cancel:
reloadReady.Close()
return nil
}

if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, logger, noStepSubqueryInterval, reloaders...); err != nil {
return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
}

reloadReady.Close()

webHandler.Ready()
level.Info(logger).Log("msg", "Server is ready to receive web requests.")
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}
{
// Rule manager.
g.Add(
func() error {
<-reloadReady.C
ruleManager.Run()
return nil
},
func(err error) {
ruleManager.Stop()
},
)
}
{
// TSDB.
opts := cfg.tsdb.ToTSDBOptions()
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "Starting TSDB ...")
if cfg.tsdb.WALSegmentSize != 0 {
if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
}
}
if cfg.tsdb.MaxBlockChunkSegmentSize != 0 {
if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 {
return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB")
}
}
db, err := openDBWithMetrics(
cfg.localStoragePath,
logger,
prometheus.DefaultRegisterer,
&opts,
)
if err != nil {
return errors.Wrapf(err, "opening storage failed")
}

switch fsType := prom_runtime.Statfs(cfg.localStoragePath); fsType {
case "NFS_SUPER_MAGIC":
level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
default:
level.Info(logger).Log("fs_type", fsType)
}

level.Info(logger).Log("msg", "TSDB started")
level.Debug(logger).Log("msg", "TSDB options",
"MinBlockDuration", cfg.tsdb.MinBlockDuration,
"MaxBlockDuration", cfg.tsdb.MaxBlockDuration,
"MaxBytes", cfg.tsdb.MaxBytes,
"NoLockfile", cfg.tsdb.NoLockfile,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"WALSegmentSize", cfg.tsdb.WALSegmentSize,
"AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks,
"WALCompression", cfg.tsdb.WALCompression,
)

startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
localStorage.Set(db, startTimeMargin)
close(dbOpen)
<-cancel
return nil
},
func(err error) {
if err := fanoutStorage.Close(); err != nil {
level.Error(logger).Log("msg", "Error stopping storage", "err", err)
}
close(cancel)
},
)
}
{
// Web handler.
g.Add(
func() error {
if err := webHandler.Run(ctxWeb, listener, *webConfig); err != nil {
return errors.Wrapf(err, "error starting web server")
}
return nil
},
func(err error) {
cancelWeb()
},
)
}
{
// Notifier.

// Calling notifier.Stop() before ruleManager.Stop() will cause a panic if the ruleManager isn't running,
// so keep this interrupt after the ruleManager.Stop().
g.Add(
func() error {
// When the notifier manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager
// so we wait until the config is fully loaded.
<-reloadReady.C

notifierManager.Run(discoveryManagerNotify.SyncCh())
level.Info(logger).Log("msg", "Notifier manager stopped")
return nil
},
func(err error) {
notifierManager.Stop()
},
)
}
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
level.Info(logger).Log("msg", "See you next time!")
}

func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options) (*tsdb.DB, error) {
db, err := tsdb.Open(
dir,
log.With(logger, "component", "tsdb"),
reg,
opts,
)
if err != nil {
return nil, err
}

reg.MustRegister(
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_lowest_timestamp_seconds",
Help: "Lowest timestamp value stored in the database.",
}, func() float64 {
bb := db.Blocks()
if len(bb) == 0 {
return float64(db.Head().MinTime() / 1000)
}
return float64(db.Blocks()[0].Meta().MinTime / 1000)
}), prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_min_time_seconds",
Help: "Minimum time bound of the head block.",
}, func() float64 { return float64(db.Head().MinTime() / 1000) }),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_max_time_seconds",
Help: "Maximum timestamp of the head block.",
}, func() float64 { return float64(db.Head().MaxTime() / 1000) }),
)

return db, nil
}

type safePromQLNoStepSubqueryInterval struct {
value atomic.Int64
}

func durationToInt64Millis(d time.Duration) int64 {
return int64(d / time.Millisecond)
}
func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) {
i.value.Store(durationToInt64Millis(time.Duration(ev)))
}

func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 {
return i.value.Load()
}

type reloader struct {
name string
reloader func(*config.Config) error
}

func reloadConfig(filename string, expandExternalLabels bool, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...reloader) (err error) {
start := time.Now()
timings := []interface{}{}
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)

defer func() {
if err == nil {
configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()
} else {
configSuccess.Set(0)
}
}()

conf, err := config.LoadFile(filename, expandExternalLabels, logger)
if err != nil {
return errors.Wrapf(err, "couldn't load configuration (--config.file=%q)", filename)
}

failed := false
for _, rl := range rls {
rstart := time.Now()
if err := rl.reloader(conf); err != nil {
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
failed = true
}
timings = append(timings, rl.name, time.Since(rstart))
}
if failed {
return errors.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
}

noStepSuqueryInterval.Set(conf.GlobalConfig.EvaluationInterval)
l := []interface{}{"msg", "Completed loading of configuration file", "filename", filename, "totalDuration", time.Since(start)}
level.Info(logger).Log(append(l, timings...)...)
return nil
}

func startsOrEndsWithQuote(s string) bool {
return strings.HasPrefix(s, "\"") || strings.HasPrefix(s, "'") ||
strings.HasSuffix(s, "\"") || strings.HasSuffix(s, "'")
}

// compileCORSRegexString compiles given string and adds anchors
func compileCORSRegexString(s string) (*regexp.Regexp, error) {
r, err := relabel.NewRegexp(s)
if err != nil {
return nil, err
}
return r.Regexp, nil
}

// computeExternalURL computes a sanitized external URL from a raw input. It infers unset
// URL parts from the OS and the given listen address.
func computeExternalURL(u, listenAddr string) (*url.URL, error) {
if u == "" {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
_, port, err := net.SplitHostPort(listenAddr)
if err != nil {
return nil, err
}
u = fmt.Sprintf("http://%s:%s/", hostname, port)
}

if startsOrEndsWithQuote(u) {
return nil, errors.New("URL must not begin or end with quotes")
}

eu, err := url.Parse(u)
if err != nil {
return nil, err
}

ppref := strings.TrimRight(eu.Path, "/")
if ppref != "" && !strings.HasPrefix(ppref, "/") {
ppref = "/" + ppref
}
eu.Path = ppref

return eu, nil
}

type sender interface {
Send(alerts ...*notifier.Alert)
}

// sendAlerts implements the rules.NotifyFunc for a Notifier.
func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
var res []*notifier.Alert

for _, alert := range alerts {
a := &notifier.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
} else {
a.EndsAt = alert.ValidUntil
}
res = append(res, a)
}

if len(alerts) > 0 {
s.Send(res...)
}
}
}

// readyStorage implements the Storage interface while allowing to set the actual
// storage at a later point in time.
type readyStorage struct {
mtx sync.RWMutex
db *tsdb.DB
startTimeMargin int64
}

// Set the storage.
func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) {
s.mtx.Lock()
defer s.mtx.Unlock()

s.db = db
s.startTimeMargin = startTimeMargin
}

// get is internal, you should use readyStorage as the front implementation layer.
func (s *readyStorage) get() *tsdb.DB {
s.mtx.RLock()
x := s.db
s.mtx.RUnlock()
return x
}

// StartTime implements the Storage interface.
func (s *readyStorage) StartTime() (int64, error) {
if x := s.get(); x != nil {
var startTime int64

if len(x.Blocks()) > 0 {
startTime = x.Blocks()[0].Meta().MinTime
} else {
startTime = time.Now().Unix() * 1000
}
// Add a safety margin as it may take a few minutes for everything to spin up.
return startTime + s.startTimeMargin, nil
}

return math.MaxInt64, tsdb.ErrNotReady
}

// Querier implements the Storage interface.
func (s *readyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
if x := s.get(); x != nil {
return x.Querier(ctx, mint, maxt)
}
return nil, tsdb.ErrNotReady
}

// ChunkQuerier implements the Storage interface.
func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
if x := s.get(); x != nil {
return x.ChunkQuerier(ctx, mint, maxt)
}
return nil, tsdb.ErrNotReady
}

func (s *readyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
if x := s.get(); x != nil {
return x.ExemplarQuerier(ctx)
}
return nil, tsdb.ErrNotReady
}

// Appender implements the Storage interface.
func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
if x := s.get(); x != nil {
return x.Appender(ctx)
}
return notReadyAppender{}
}

type notReadyAppender struct{}

func (n notReadyAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) {
return 0, tsdb.ErrNotReady
}

func (n notReadyAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) {
return 0, tsdb.ErrNotReady
}

func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }

func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }

// Close implements the Storage interface.
func (s *readyStorage) Close() error {
if x := s.get(); x != nil {
return x.Close()
}
return nil
}

// CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) CleanTombstones() error {
if x := s.get(); x != nil {
return x.CleanTombstones()
}
return tsdb.ErrNotReady
}

// Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
if x := s.get(); x != nil {
return x.Delete(mint, maxt, ms...)
}
return tsdb.ErrNotReady
}

// Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Snapshot(dir string, withHead bool) error {
if x := s.get(); x != nil {
return x.Snapshot(dir, withHead)
}
return tsdb.ErrNotReady
}

// Stats implements the api_v1.TSDBAdminStats interface.
func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) {
if x := s.get(); x != nil {
return x.Head().Stats(statsByLabelName), nil
}
return nil, tsdb.ErrNotReady
}

// ErrNotReady is returned if the underlying scrape manager is not ready yet.
var ErrNotReady = errors.New("Scrape manager not ready")

// readyScrapeManager allows a scrape manager to be retrieved, even if it is set at a later point in time.
type readyScrapeManager struct {
mtx sync.RWMutex
m *scrape.Manager
}

// Set the scrape manager.
func (rm *readyScrapeManager) Set(m *scrape.Manager) {
rm.mtx.Lock()
defer rm.mtx.Unlock()

rm.m = m
}

// Get the scrape manager. If it is not ready, return an error.
func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
rm.mtx.RLock()
defer rm.mtx.RUnlock()

if rm.m != nil {
return rm.m, nil
}

return nil, ErrNotReady
}

// tsdbOptions is the tsdb.Options version with defined units.
// This is required as tsdb.Options fields are unit agnostic (time).
type tsdbOptions struct {
WALSegmentSize units.Base2Bytes
MaxBlockChunkSegmentSize units.Base2Bytes
RetentionDuration model.Duration
MaxBytes units.Base2Bytes
NoLockfile bool
AllowOverlappingBlocks bool
WALCompression bool
StripeSize int
MinBlockDuration model.Duration
MaxBlockDuration model.Duration
MaxExemplars int
}

func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
return tsdb.Options{
WALSegmentSize: int(opts.WALSegmentSize),
MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize),
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
MaxBytes: int64(opts.MaxBytes),
NoLockfile: opts.NoLockfile,
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
WALCompression: opts.WALCompression,
StripeSize: opts.StripeSize,
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
MaxExemplars: opts.MaxExemplars,
}
}

func initTracing(logger log.Logger) (io.Closer, error) {
// Set tracing configuration defaults.
cfg := &jcfg.Configuration{
ServiceName: "prometheus",
Disabled: true,
}

// Available options can be seen here:
// https://github.com/jaegertracing/jaeger-client-go#environment-variables
cfg, err := cfg.FromEnv()
if err != nil {
return nil, errors.Wrap(err, "unable to get tracing config from environment")
}

jLogger := jaegerLogger{logger: log.With(logger, "component", "tracing")}

tracer, closer, err := cfg.NewTracer(
jcfg.Logger(jLogger),
jcfg.Metrics(jprom.New()),
)
if err != nil {
return nil, errors.Wrap(err, "unable to init tracing")
}

opentracing.SetGlobalTracer(tracer)
return closer, nil
}

type jaegerLogger struct {
logger log.Logger
}

func (l jaegerLogger) Error(msg string) {
level.Error(l.logger).Log("msg", msg)
}

func (l jaegerLogger) Infof(msg string, args ...interface{}) {
keyvals := []interface{}{"msg", fmt.Sprintf(msg, args...)}
level.Info(l.logger).Log(keyvals...)
}
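
The startup gating in the listing above (the `dbOpen` channel plus the `reloadReady` close-once channel) is easier to follow in isolation. Below is a minimal sketch that uses only the standard library. The `newCloseOnce` constructor, the `startupOrder` function and the three stand-in goroutines are hypothetical names made up for illustration; they are not part of Prometheus, and the real code runs its actors through `run.Group` rather than a `sync.WaitGroup`.

```go
package main

import (
	"fmt"
	"sync"
)

// closeOnce mirrors the helper in the listing: a channel that several code
// paths may close without panicking on a double close.
type closeOnce struct {
	C    chan struct{}
	once sync.Once
}

// newCloseOnce is a hypothetical constructor used only in this sketch.
func newCloseOnce() *closeOnce { return &closeOnce{C: make(chan struct{})} }

// Close closes C exactly once, no matter how often it is called.
func (c *closeOnce) Close() { c.once.Do(func() { close(c.C) }) }

// startupOrder simulates the gating: storage opens first, then the initial
// config load runs, and only then do the actors waiting on reloadReady start.
func startupOrder() []string {
	var (
		mu     sync.Mutex
		events []string
		wg     sync.WaitGroup
	)
	record := func(s string) {
		mu.Lock()
		events = append(events, s)
		mu.Unlock()
	}

	dbOpen := make(chan struct{})
	reloadReady := newCloseOnce()

	wg.Add(3)
	go func() { // stands in for the scrape manager actor
		defer wg.Done()
		<-reloadReady.C
		record("scrape manager running")
	}()
	go func() { // stands in for the initial config loading actor
		defer wg.Done()
		<-dbOpen
		record("config loaded")
		reloadReady.Close()
		reloadReady.Close() // a second call is a no-op thanks to sync.Once
	}()
	go func() { // stands in for the TSDB actor
		defer wg.Done()
		record("tsdb opened")
		close(dbOpen)
	}()
	wg.Wait()
	return events
}

func main() {
	for i, e := range startupOrder() {
		fmt.Println(i+1, e)
	}
}
```

Whichever goroutine gets scheduled first, the channel closes force the same order: the waiting actors cannot proceed until the config is loaded, and the config cannot be loaded until the storage is open.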

References:

https://blog.csdn.net/dengxiafubi/article/details/102845639

https://so.csdn.net/so/search?q=Prometheus%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0&t=blog&u=qq_35753140

Author: Magiceses