Thanos 源码分析

不要因为没有掌声,而放弃梦想,我们需要的是坚持,而不是观众。

thanos组件版本:v0.16.0

Thanos Query

Thanos Query的作用

Thanos Query组件是http服务器 + grpc服务器,它的数据源是位于下游的已发现的实现STORE API的组件(例如Thanos Sidecar组件、Thanos Store组件、Thanos Ruler组件),同时实现了Prometheus官方的HTTP API。Thanos Query组件从下游处获得数据后,能进行合并、去重等操作,最后将结果返回给外部的客户端。因此,Thanos Query就是数据库中间件的角色。

在这里插入图片描述

使用github.com/oklog/run包来启动一组协程,这些协程的逻辑主要是启动了http server、grpc server、动态发现位于下游的实现STORE API的组件等。

架构

在这里插入图片描述

源码分析

启动参数

Thanos的启动命令格式如下,格式都是thanos开头(因为是同一个可执行二进制文件)。启动哪个组件,在于第一个参数,在本例子中是query,因此这条命令是启动query组件的逻辑。

1
2
3
4
5
6
7
8
9
10
11
thanos query \
--log.level=debug \
--query.auto-downsampling \
--grpc-address=0.0.0.0:10901 \
--http-address=0.0.0.0:9090 \
--query.partial-response \
--query.replica-label=prometheus_replica \
--query.replica-label=rule_replica \
--store=dnssrv+_grpc._tcp.prometheus-headless.thanos.svc.cluster.local \
--store=dnssrv+_grpc._tcp.thanos-rule.thanos.svc.cluster.local \
--store=dnssrv+_grpc._tcp.thanos-store.thanos.svc.cluster.local

注意:
1、partial-response 一定要加上

这个flag设立的目的是遵照数据一致性原则
如果1、2、3中有某个返回查询数据为空或者超时(默认是2分钟)的情况下,这次整体的结果是否还要保留:
开启的情况下 , 有点儿部分人成虎的感觉
不开启,则必须要求每个上游都要有结果才行,真一致性

2、deduplicate

不去重的情况下,会看到相同数据了具体来自哪个数据源,尤其是prom和ruler源
去重开启,程序后台会对每次响应的数据源做个打分,选择优秀的源作为本次的gRPC对象

代码分析

main方法

来具体看看main方法。创建app对象,app对象包含了所有Thanos组件的启动函数,但真正启动时只从map中取出一个函数进行启动,取出哪个函数取决于启动命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func main() {
app := extkingpin.NewApp(kingpin.New(filepath.Base(os.Args[0]), "A block storage based long-term storage for Prometheus").Version(version.Print("thanos")))

// 把所有组件的启动逻辑都放进app对象中的setups列表中
registerSidecar(app)
registerStore(app)
registerQuery(app)
registerRule(app)
registerCompact(app)
registerTools(app)
registerReceive(app)
registerQueryFrontend(app)

// 根据命令行的信息,从app对象的setups列表中取出一个组件逻辑
cmd, setup := app.Parse()
logger := logging.NewLogger(*logLevel, *logFormat, *debugName)

var g run.Group
var tracer opentracing.Tracer

/*
tracing相关的代码
*/


reloadCh := make(chan struct{}, 1)

// 启动特定的一个组件(sidecar、query、store等组件中的一种),底层还是执行g.Add(...)
if err := setup(&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {
os.Exit(1)
}

// 监听来自系统的杀死信号.
{
cancel := make(chan struct{})
g.Add(func() error {
return interrupt(logger, cancel)
}, func(error) {
close(cancel)
})
}

// 监听来配置重载的信号
{
cancel := make(chan struct{})
g.Add(func() error {
return reload(logger, cancel, reloadCh)
}, func(error) {
close(cancel)
})
}

// 阻塞地等待所有协程中的退出
// 有一个协程返回,其他协程也会返回
if err := g.Run(); err != nil {
level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))
os.Exit(1)
}

// 到达此处,说明整个程序结束了。
level.Info(logger).Log("msg", "exiting")
}

registerQuery函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func registerQuery(app *extkingpin.App) {
cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")
/*
解析命令行参数
*/

//Setup()的参数是一个函数,会被放入app对象的setups列表中
//闭包的使用技巧以及相关堆栈的分析
//最核心的是runQuery()方法
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
...
...
...
return runQuery(
g,
logger,
reg,
tracer,
*requestLoggingDecision,
*grpcBindAddr,
time.Duration(*grpcGracePeriod),
*grpcCert,
*grpcKey,
*grpcClientCA,
/*
其他代码
*/
)
)

}

runQuery函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//使用run.Group对象来启动http server、grpc server、服务发现协程。
func runQuery(
g *run.Group, //其实来自main()方法
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
requestLoggingDecision string,
grpcBindAddr string,
grpcGracePeriod time.Duration,
grpcCert string,
grpcKey string,
grpcClientCA string,
/*
其他代码
*/
) error {

var (
// stores对象的类型StoreSet。它包含了一组store组件
//(位于下游的实现Store API的组件),这一组store组件是可以动态变化的
/*
type StoreSet struct {
//其他属性
stores map[string]*storeRef
}
*/
stores = query.NewStoreSet(...)

// proxy对象,即下游的Store API组件的代理
// 下游的Store API组件的列表,其实就是构造方法的入参stores.Get这个方法来获取
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)

/*
queryableCreator是一个方法,用于创建一个querier结构体对象;
querier结构体的属性proxy就是proxy对象,它包含了一组会动态变化的thanos store组件(动态变化是因为启动了一些额外的专门的协程来动态地修改这个切片);
*/
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
proxy,
maxConcurrentSelects,
queryTimeout,
)

/*
这一段代码都是启动一些协程,定时发现和动态发现Store API组件的变化,随即更新stores对象中的类型为map[string]*storeRef的属性
*/


// 创建http server,注册http handler,并启动server
{

router := route.New()
//新建QueryAPI结构体对象
api := v1.NewQueryAPI(
logger,
stores,
engine,
queryableCreator,
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
enableAutodownsampling,
enableQueryPartialResponse,
enableRulePartialResponse,
queryReplicaLabels,
flagsMap,
instantDefaultMaxSourceResolution,
defaultMetadataTimeRange,
gate.New(
extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
maxConcurrentQueries,
),
)

// 为router对象注册http方法
api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)

srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
// http服务器使用router对象
srv.Handle("/", router)

g.Add(func() error {
statusProber.Healthy()
// 启动http server
return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)
srv.Shutdown(err)
})
}

// 创建gprc server,注册grpc handler,并启动server
// 它本身也可以是一个store, 为上层的front做准备
{
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxy)), // 注册grpc handler
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)), // 注册grpc handler
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
)

g.Add(func() error {
statusProber.Ready()
// 启动grpc server
return s.ListenAndServe()
}, func(error) {
statusProber.NotReady(err)
s.Shutdown(err)
})
}

// 至此,http server和grpc server都启动了。
level.Info(logger).Log("msg", "starting query node")
return nil
)

}

QueryAPI结构体及其方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// QueryAPI is an API used by Thanos Query.
type QueryAPI struct {
baseAPI *api.BaseAPI
logger log.Logger
gate gate.Gate

// 构造方法,用于创建一个querier结构体对象
queryableCreate query.QueryableCreator

queryEngine *promql.Engine
ruleGroups rules.UnaryClient
...
...
...
replicaLabels []string
storeSet *query.StoreSet
}


func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) {
qapi.baseAPI.Register(r, tracer, logger, ins, logMiddleware)
instr := api.GetInstr(tracer, logger, ins, logMiddleware)
/*
其他代码
*/

// 把qapi.query、qapi.series、 qapi.stores注册到入参r,从而完成http handler的注册
// 不管是/query接口和/series接口,每次请求到达都会创建querier对象,而querier对象内含了一组的Store API组件
r.Get("/query", instr("query", qapi.query))
r.Get("/series", instr("series", qapi.series))
r.Get("/stores", instr("stores", qapi.stores))
}

//返回指标数据
func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiError) {
/*
其他代码
*/

// 创建一个querier对象
// querier对象的属性proxy则包含了一组thanos store组件
q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))

/*
其他代码
*/

var (
metrics = []labels.Labels{}
sets []storage.SeriesSet
)
for _, mset := range matcherSets {
// 调用querier对象的Select()方法获取指标
sets = append(sets, q.Select(false, nil, mset...))
}
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
for set.Next() {
metrics = append(metrics, set.At().Labels())
}
return metrics, set.Warnings(), nil
}

querier结构体及其方法

实现了 Querier接口(github.com/prometheus/prometheus/storage/interface.go),此接口的核心方法是Select(…),这个方法在/query和/series等接口中都会被使用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type querier struct {
ctx context.Context
logger log.Logger
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
storeDebugMatchers [][]*labels.Matcher

// proxy包含了一组动态的thanos store组件
proxy storepb.StoreServer

deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
selectGate gate.Gate
selectTimeout time.Duration
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
/*
其他代码
*/
promise := make(chan storage.SeriesSet, 1)
go func() {
defer close(promise)
var err error
/*
其他代码
*/
//获取到指标数据
set, err := q.selectFn(ctx, hints, ms...)
if err != nil {
// 把错误送至管道,并退出本协程
promise <- storage.ErrSeriesSet(err)
return
}
//将指标数据送至管道
promise <- set
}()

// 返回指标的封装
return &lazySeriesSet{
create: func() (storage.SeriesSet, bool) {
/*
其他代码
*/
// 从管道中读取指标
set, ok := <-promise
return set, set.Next()
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 获取指标,调用的是属性proxy的Series(...)方法
func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {

/*
其他代码
*/

// seriesServer结构体重写了Send()方法,在Sender()方法中将gprc返回的数据数据存储到它的seriesSet属性
resp := &seriesServer{ctx: ctx}

// q.proxy的实现是ProxyStore结构体
// q.proxy.Series()是grpc方法(流式)
// q.proxy.Series()调用完毕后,resp的seriesSet属性的值会被填充
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Matchers: sms,
/*
其他代码
*/
}, resp); err != nil {
return nil, errors.Wrap(err, "proxy Series()")
}

/*
其他代码
*/


set := &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet), // 把resp的seriesSet属性抽出来
aggrs: aggrs,
warns: warns,
}
// set就是指标
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil
}

ProxyStore对象

1
2
3
4
5
6
7
8
9
10
11
12
13
// ProxyStore implements the store API that proxies request to all given underlying stores.
type ProxyStore struct {
logger log.Logger

// 返回位于下游的实现Store API接口的组件,查询指标时会用到此属性
stores func() []Client

component component.StoreAPI
selectorLabels labels.Labels

responseTimeout time.Duration
metrics *proxyStoreMetrics
}

查询指标时,会从下游的所有的Store API的组件中查询指标以及进行合并、去重(如果设置了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
根据客户端的请求,从下游的所有的Store API的组件中查询指标以及进行合并、去重,最后将指标传输给入参srv.
这是一个gprc流式接口。
*/
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {

/*
其他代码
*/
g, gctx := errgroup.WithContext(srv.Context())
respSender, respCh := newCancelableRespChannel(gctx, 10)


// 生产者协程
g.Go(func() error {
/*
本协程会从后端的thanos store组件中获取指标,并进行指标合并操作。
本协程的关闭,消费者协程也会关闭。
*/

var (
seriesSet []storepb.SeriesSet
storeDebugMsgs []string
wg = &sync.WaitGroup{}
)

defer func() {
wg.Wait()
//close()方法会引发消费者协程退出
close(respCh)
}()

// 遍历后端的Store API组件
for _, st := range s.stores() {

/*
其他代码
*/

sc, err := st.Series(seriesCtx, r)
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses))

/*
其他代码
*/

// 获得合并后的指标,再发送给respCh管道
mergedSet := storepb.MergeSeriesSets(seriesSet...)
for mergedSet.Next() {
lset, chk := mergedSet.At()
// respSender.send(...)其实是将指标发送给respCh管道
respSender.send(storepb.NewSeriesResponse(&storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(lset), Chunks: chk}))
}
return mergedSet.Err()
})

// 消费者协程
g.Go(func() error {
// 响应(已被merged)被本协程获取,并将响应输送给方法入参srv.
for resp := range respCh {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
}
return nil
})

// 等待生产者协程和消费者协程结束
if err := g.Wait(); err != nil {
return err
}
return nil
}

小结

本文分析了代码的轮廓,还有许多细节没有被提及,但Thanos Query组件的代码结构清晰易懂,使用了github.com/oklog/run包来启动一组协程,编写http server和grpc server的思路、动态发现下游Store API组件的套路都值得模仿。

Thanos Sidecar

Thanos Sidecar的作用

Thanos Query组件和prometheus实例绑定在一起,三大作用:

  • 作为访问代理,对客户端暴露grpc接口,业务逻辑是访问其绑定的prometheus实例的http接口,从而获取metrics和rule数据,最终返回给客户端;
  • 如果开启对象存储功能,会将promethues tsdb目录下的所有block目录上传至指定的对象存储系统中;
  • 监听promethues配置文件的变化,发现文件变化后也是访问prometheus实例的http接口让prometheus重载配置。

在这里插入图片描述

使用github.com/oklog/run包来启动一组协程,这些协程的逻辑主要是启动了http server、grpc server、动态发现位于下游的实现STORE API的组件等。

源码分析

启动参数

Thanos的启动命令格式如下,格式都是thanos开头(因为是同一个可执行二进制文件)。启动哪个组件,在于第一个参数,在本例子中是sidecar,因此这条命令是启动sidecar组件的逻辑。

1
2
3
4
5
thanos sidecar \
--prometheus.url=http://localhost:9090/ \
--tsdb.path=/prometheus \
--grpc-address=[$(POD_IP)]:10901 \
--http-address=[$(POD_IP)]:10902 \

代码分析

main方法

来具体看看main方法。创建app对象,app对象包含了所有Thanos组件的启动函数,但真正启动时只从map中取出一个函数进行启动,取出哪个函数取决于启动命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
func main() {


/*
其他代码
*/

app := extkingpin.NewApp(kingpin.New(filepath.Base(os.Args[0]), "A block storage based long-term storage for Prometheus").Version(version.Print("thanos")))
/*
其他代码
*/


// 把所有组件的启动逻辑都放进app对象中的setups列表中
registerSidecar(app)
registerStore(app)
registerQuery(app)
registerRule(app)
registerCompact(app)
registerTools(app)
registerReceive(app)
registerQueryFrontend(app)

// 根据命令行的信息,从app对象的setups列表中取出一个组件逻辑
cmd, setup := app.Parse()
logger := logging.NewLogger(*logLevel, *logFormat, *debugName)

/*
其他代码
*/

var g run.Group
var tracer opentracing.Tracer

/*
tracing相关的代码
*/


reloadCh := make(chan struct{}, 1)

// 启动特定的一个组件(sidecar、query、store等组件中的一种),底层还是执行g.Add(...)
if err := setup(&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {
os.Exit(1)
}

// 监听来自系统的杀死信号.
{
cancel := make(chan struct{})
g.Add(func() error {
return interrupt(logger, cancel)
}, func(error) {
close(cancel)
})
}

// 监听来配置重载的信号
{
cancel := make(chan struct{})
g.Add(func() error {
return reload(logger, cancel, reloadCh)
}, func(error) {
close(cancel)
})
}

// 阻塞地等待所有协程中的退出
// 有一个协程返回,其他协程也会返回
if err := g.Run(); err != nil {
level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))
os.Exit(1)
}

// 到达此处,说明整个程序结束了。
level.Info(logger).Log("msg", "exiting")
}

registerSidecar方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

func registerSidecar(app *extkingpin.App) {
cmd := app.Command(component.Sidecar.String(), "Sidecar for Prometheus server")
conf := &sidecarConfig{}
// 解析命令行参数
conf.registerFlag(cmd)

// Setup()的入参方法,会被放入app对象的setups列表中
// 最核心的是runSidecar()方法
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
rl := reloader.New(log.With(logger, "component", "reloader"),
extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg),
&reloader.Options{
ReloadURL: reloader.ReloadURLFromBase(conf.prometheus.url),
CfgFile: conf.reloader.confFile,
CfgOutputFile: conf.reloader.envVarConfFile,
WatchedDirs: conf.reloader.ruleDirectories,
WatchInterval: conf.reloader.watchInterval,
RetryInterval: conf.reloader.retryInterval,
})

return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf)
})
}

runSidecar方法

使用run.Group对象来启动http server、grpc server、传输block目录至对象存储的协程、监听prometheus配置文件的协程、定期检测prometheus实例存活的协程。

细节说明:

  • 查看prometheus实例的心跳机制是通过/api/v1/status/config接口;
  • 监听prometheus配置文件变化的工具包是github.com/fsnotify/fsnotify;
  • 开启上传block功能,则每30s遍历prometheus tsdb目录下的所有的block目录(已上传的block或空block会被忽略,默认情况下被压缩过的block也会被忽略),并上传相应的文件至对象存储;
  • 获取不到prometheus实例的external label或者prometheus没有配置external label,会导致sidecar启动失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
func runSidecar(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
reloader *reloader.Reloader,
comp component.Component,
conf sidecarConfig,
) error {

// 用一个结构体来保存prometheus实例的url、prometheus实例的external label、prometheus client等信息。
var m = &promMetadata{
promURL: conf.prometheus.url,

mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

limitMinTime: conf.limitMinTime,
client: promclient.NewWithTracingClient(logger, "thanos-sidecar"),
}

// 获取对象存储的配置信息,如果有,说明是开启上传block至对象存储的功能。
confContentYaml, err := conf.objStore.Content()
if err != nil {
return errors.Wrap(err, "getting object store config")
}
var uploads = true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
uploads = false
}


grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

// 创建http server,并启动server(只有/metrics、/-/healthy、/-/ready等接口)
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(conf.http.bindAddress),
httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
)
g.Add(func() error {
statusProber.Healthy()
return srv.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
defer statusProber.NotHealthy(err)
srv.Shutdown(err)
})


// 获取promehtues实例的external label,并做心跳
{
// promUp记录promehtues是否正常,0表示不正常,1表示正常
promUp := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_sidecar_prometheus_up",
Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.",
})
// lastHeartbeat记录最后一次心跳时间
lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_sidecar_last_heartbeat_success_time_seconds",
Help: "Timestamp of the last successful heartbeat in seconds.",
})

ctx, cancel := context.WithCancel(context.Background())
// 获取prometheus实例的external label(/api/v1/status/config接口),并通过定期(30s)做这件事情来做心跳
g.Add(func() error {
/*
检查性代码
*/

// 获取prometheus实例的external label
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
// m.UpdateLabels(ctx)去访问prometheus实例的/api/v1/status/config接口,并将返回的数据设置到自己的属性labels
if err := m.UpdateLabels(ctx); err != nil {
promUp.Set(0)
statusProber.NotReady(err)
return err
}
promUp.Set(1)
statusProber.Ready()
// 记录心跳时间
lastHeartbeat.SetToCurrentTime()
return nil
})

// 拿不到prometheus实例的external label或者prometheus没有配置external label则退出
if err != nil {
return errors.Wrap(err, "initial external labels query")
}
if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

// 每个30s从prometheus实例获取exterlan label,通过此方式来记录心跳时间
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
/*
其他代码
*/

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
promUp.Set(0)
} else {
promUp.Set(1)
// 记录心跳时间
lastHeartbeat.SetToCurrentTime()
}
return nil
})
}, func(error) {
cancel()
})
}

// 使用github.com/fsnotify/fsnotify包监听prometheus实例的配置文件的变化
// 如果文件发生变化则发送一个POST请求给prometheus实例,让它重新加载配置文件
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return reloader.Watch(ctx)
}, func(error) {
cancel()
})
}

{
t := exthttp.NewTransport()
t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHost
t.MaxIdleConns = conf.connection.maxIdleConns
c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"),
conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)
if err != nil {
return errors.Wrap(err, "setup gRPC server")
}

// 创建并grpc server
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
// 注册grpc handler(通过http client从prometheus实例中获取指标数据)
grpcserver.WithServer(store.RegisterStoreServer(promStore)),
// 注册grpc handler(通过http client从prometheus实例中获取rule数据)
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithListen(conf.grpc.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
)
g.Add(func() error {
statusProber.Ready()
return s.ListenAndServe()
}, func(err error) {
statusProber.NotReady(err)
s.Shutdown(err)
})
}

// 若开启了上传block功能,则定期遍历prometehus tsdb目录下的所有block目录并上传文件至对象存储。
if uploads {

// 获取一个对象存储bucket
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())
if err != nil {
return err
}

/*
其他代码
*/

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
/*
其他代码
*/

/*
拿不到prometheus实例的external label或者prometheus没有配置external label则退出
*/

s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)

// 每30执行一次s.Sync(ctx)
// s.Sync(ctx)会遍历prometheus tsdb目录下的所有block目录(已上传的block或空block会被忽略,默认情况下被压缩过的block也会被忽略),并上传相应的文件
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
// 至少有一个block上传失败,则打印日志
}
/*
其他代码
*/
return nil
})
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting sidecar")
return nil
}

小结

Thanos Sidecar组件的代码逻辑简单、易懂,通过http协议访问与其绑定的prometheus实例,从prometheus实例中获取到的数据则通过grpc接口对外进行暴露,遍历所有block目录进行文件上传,还有监听promethues配置文件变化的小功能。

参考:

thanos sidecar组件源码简析

thanos query组件源码简析

-------------本文结束 感谢您的阅读-------------
作者Magiceses
有问题请 留言 或者私信我的 微博
满分是10分的话,这篇文章你给几分