[Thanos source code analysis series] Source code analysis of the Thanos Query component
2022-06-28 07:20:00 【nangonghen】
1 Overview
1.1 Source environment
The version information is as follows:
a. Thanos component version: v0.16.0
1.2 The role of Thanos Query
The Thanos Query component is both an HTTP server and a gRPC server. Its data sources are the downstream components, found through service discovery, that implement the Store API (for example the Thanos Sidecar, Thanos Store, and Thanos Ruler components). It also implements the official Prometheus HTTP API. After the Thanos Query component fetches data from downstream, it can merge it, deduplicate it, and so on, and finally returns the result to the external client. Thanos Query therefore plays the role of database middleware.
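To make the deduplication step more concrete, here is a minimal sketch of the idea behind replica labels. It is not the Thanos implementation: the Series type and the key and dedupe helpers are hypothetical, and only label sets are compared, whereas the real component also has to merge the samples of the replicas.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Series is a hypothetical, simplified series: only its labels, no samples.
type Series map[string]string

// key builds a stable identity from every label except the replica labels.
func key(s Series, replicaLabels map[string]struct{}) string {
	names := make([]string, 0, len(s))
	for name := range s {
		if _, isReplica := replicaLabels[name]; !isReplica {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	var b strings.Builder
	for _, name := range names {
		b.WriteString(name + "=" + s[name] + ";")
	}
	return b.String()
}

// dedupe keeps one entry per identity, in first-seen order.
func dedupe(in []Series, replicaLabels ...string) []string {
	replicas := make(map[string]struct{}, len(replicaLabels))
	for _, l := range replicaLabels {
		replicas[l] = struct{}{}
	}
	seen := make(map[string]struct{})
	var out []string
	for _, s := range in {
		k := key(s, replicas)
		if _, dup := seen[k]; dup {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, k)
	}
	return out
}

func main() {
	series := []Series{
		{"__name__": "up", "job": "node", "prometheus_replica": "a"},
		{"__name__": "up", "job": "node", "prometheus_replica": "b"},
		{"__name__": "up", "job": "api", "prometheus_replica": "a"},
	}
	// The two "node" series differ only in the replica label, so one remains.
	for _, k := range dedupe(series, "prometheus_replica") {
		fmt.Println(k)
	}
}
```

The two series that differ only in prometheus_replica collapse into one, which is the effect a client sees when deduplication is enabled.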
2 Source code analysis
Thanos uses the github.com/oklog/run package to start a set of goroutines. These goroutines mainly start the HTTP server, the gRPC server, and the dynamic discovery of downstream components that implement the Store API.
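The pattern can be sketched with the standard library alone. The Group type below is a simplified stand-in written for this article, not the oklog/run package itself; it only mirrors the Add(execute, interrupt) and Run() shape that the Thanos code relies on.

```go
package main

import (
	"errors"
	"fmt"
)

// actor pairs a blocking function with a function that makes it return.
type actor struct {
	execute   func() error
	interrupt func(error)
}

// Group is a simplified stand-in for run.Group (an assumption, not the real type).
type Group struct{ actors []actor }

// Add only registers the actor; nothing runs until Run is called.
func (g *Group) Add(execute func() error, interrupt func(error)) {
	g.actors = append(g.actors, actor{execute, interrupt})
}

// Run starts every actor, waits for the first one to return, interrupts
// all of them, waits for the rest, and returns the first error.
func (g *Group) Run() error {
	if len(g.actors) == 0 {
		return nil
	}
	errs := make(chan error, len(g.actors))
	for _, a := range g.actors {
		go func(a actor) { errs <- a.execute() }(a)
	}
	first := <-errs
	for _, a := range g.actors {
		a.interrupt(first)
	}
	for i := 1; i < len(g.actors); i++ {
		<-errs
	}
	return first
}

// runDemo wires a long-running actor and a failing actor together.
func runDemo() error {
	var g Group
	stop := make(chan struct{})
	g.Add(func() error {
		<-stop // blocks like a server until interrupted
		return nil
	}, func(error) { close(stop) })
	g.Add(func() error { return errors.New("server failed") }, func(error) {})
	return g.Run()
}

func main() {
	fmt.Println(runDemo()) // prints "server failed"
}
```

The point of the design is that one actor returning brings the whole group down in an orderly way, so a failed listener never leaves the process half alive.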
2.1 The main method
The Thanos startup command is shown below. Every component is started with a command that begins with thanos, because all components share the same executable binary. Which component is started depends on the first argument. In this case it is query, so this command starts the logic of the query component.
thanos query \
--log.level=debug \
--query.auto-downsampling \
--grpc-address=0.0.0.0:10901 \
--http-address=0.0.0.0:9090 \
--query.partial-response \
--query.replica-label=prometheus_replica \
--query.replica-label=rule_replica \
--store=dnssrv+_grpc._tcp.prometheus-headless.thanos.svc.cluster.local \
--store=dnssrv+_grpc._tcp.thanos-rule.thanos.svc.cluster.local \
--store=dnssrv+_grpc._tcp.thanos-store.thanos.svc.cluster.local
Let's look at the main method in detail. It creates the app object, which holds the startup functions of all Thanos components. At startup, however, only one function is taken out of the map, and which one is taken depends on the startup command.
func main() {
	/*
		Other code
	*/
	app := extkingpin.NewApp(kingpin.New(filepath.Base(os.Args[0]), "A block storage based long-term storage for Prometheus").Version(version.Print("thanos")))
	/*
		Other code
	*/
	// Put the startup logic of every component into the setups list of the app object.
	registerSidecar(app)
	registerStore(app)
	registerQuery(app)
	registerRule(app)
	registerCompact(app)
	registerTools(app)
	registerReceive(app)
	registerQueryFrontend(app)
	// Based on the command line, pick one component's startup logic from the setups list of the app object.
	cmd, setup := app.Parse()
	logger := logging.NewLogger(*logLevel, *logFormat, *debugName)
	/*
		Other code
	*/
	var g run.Group
	var tracer opentracing.Tracer
	/*
		Tracing-related code
	*/
	reloadCh := make(chan struct{}, 1)
	// Start the selected component (sidecar, query, store, and so on). Underneath, this still calls g.Add(...).
	if err := setup(&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {
		os.Exit(1)
	}
	// Listen for termination signals from the system.
	{
		cancel := make(chan struct{})
		g.Add(func() error {
			return interrupt(logger, cancel)
		}, func(error) {
			close(cancel)
		})
	}
	// Listen for configuration reload signals.
	{
		cancel := make(chan struct{})
		g.Add(func() error {
			return reload(logger, cancel, reloadCh)
		}, func(error) {
			close(cancel)
		})
	}
	// Block until all goroutines in the group have exited.
	// When one goroutine returns, the others are made to return as well.
	if err := g.Run(); err != nil {
		level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))
		os.Exit(1)
	}
	// Reaching this point means the whole process is finishing.
	level.Info(logger).Log("msg", "exiting")
}
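The register-then-pick flow in main can be sketched as follows. This is a minimal sketch under assumptions: App, setupFunc, Register, and Parse are hypothetical stand-ins for the extkingpin types, and the setup functions only return a string instead of wiring a run group.

```go
package main

import "fmt"

// setupFunc is a hypothetical, simplified startup function for one component.
type setupFunc func() string

// App is a hypothetical stand-in for the app object: a name-to-setup map.
type App struct{ setups map[string]setupFunc }

func NewApp() *App { return &App{setups: map[string]setupFunc{}} }

// Register stores the startup logic of one component under its command name.
func (a *App) Register(name string, s setupFunc) { a.setups[name] = s }

// Parse looks at the first argument and returns the matching setup function.
func (a *App) Parse(args []string) (string, setupFunc, error) {
	if len(args) == 0 {
		return "", nil, fmt.Errorf("no component given")
	}
	s, ok := a.setups[args[0]]
	if !ok {
		return "", nil, fmt.Errorf("unknown component %q", args[0])
	}
	return args[0], s, nil
}

func main() {
	app := NewApp()
	// Every component registers itself, but only one will be started.
	for _, name := range []string{"sidecar", "store", "query"} {
		name := name
		app.Register(name, func() string { return name + " started" })
	}
	cmd, setup, err := app.Parse([]string{"query", "--log.level=debug"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(cmd, "->", setup())
}
```

Registering everything up front keeps main identical for all components; the first command-line argument is the only thing that decides which setup function runs.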
2.2 The registerQuery method
func registerQuery(app *extkingpin.App) {
	cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")
	/*
		Parse the command-line flags, for example:
		secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
		and so on
	*/
	// The function passed to Setup() is put into the setups list of the app object.
	// Underneath, it calls runQuery().
	cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
		return runQuery(
			g,
			logger,
			reg,
			tracer,
			*requestLoggingDecision,
			*grpcBindAddr,
			time.Duration(*grpcGracePeriod),
			*grpcCert,
			*grpcKey,
			*grpcClientCA,
			/*
				Other code
			*/
		)
	})
}
2.3 The runQuery method
A run.Group object is used to start the HTTP server, the gRPC server, and the service discovery goroutines.
func runQuery(
	g *run.Group, // this is the group created in main()
	logger log.Logger,
	reg *prometheus.Registry,
	tracer opentracing.Tracer,
	requestLoggingDecision string,
	grpcBindAddr string,
	grpcGracePeriod time.Duration,
	grpcCert string,
	grpcKey string,
	grpcClientCA string,
	/*
		Other code
	*/
) error {
	var (
		// stores is of type StoreSet. It holds a set of store components (downstream components
		// that implement the Store API), and this set can change dynamically.
		/*
			type StoreSet struct {
				// Other fields
				stores map[string]*storeRef
			}
		*/
		stores = query.NewStoreSet(...)
		// proxy is the proxy for the downstream Store API components.
		// The list of downstream Store API components is obtained through stores.Get, which is passed to the constructor.
		proxy      = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
		rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
		/*
			queryableCreator is a function used to create querier struct objects.
			The proxy field of the querier struct is this proxy object, which covers a dynamically
			changing set of Thanos store components (it changes because dedicated goroutines are
			started to modify the set).
		*/
		queryableCreator = query.NewQueryableCreator(
			logger,
			extprom.WrapRegistererWithPrefix("thanos_query_", reg),
			proxy,
			maxConcurrentSelects,
			queryTimeout,
		)
	)
	/*
		The code here starts some goroutines that discover changes in the Store API components,
		both periodically and dynamically, and promptly update the map[string]*storeRef field of the stores object.
	*/
	// Create the HTTP server, register the HTTP handlers, and add the server to the run group.
	{
		router := route.New()
		// Create the QueryAPI struct object.
		api := v1.NewQueryAPI(
			logger,
			stores,
			engine,
			queryableCreator,
			rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
			enableAutodownsampling,
			enableQueryPartialResponse,
			enableRulePartialResponse,
			queryReplicaLabels,
			flagsMap,
			instantDefaultMaxSourceResolution,
			defaultMetadataTimeRange,
			gate.New(
				extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
				maxConcurrentQueries,
			),
		)
		// Register the HTTP handlers on the router object.
		api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
		srv := httpserver.New(logger, reg, comp, httpProbe,
			httpserver.WithListen(httpBindAddr),
			httpserver.WithGracePeriod(httpGracePeriod),
		)
		// The HTTP server uses the router object.
		srv.Handle("/", router)
		g.Add(func() error {
			statusProber.Healthy()
			// Start the HTTP server.
			return srv.ListenAndServe()
		}, func(err error) {
			statusProber.NotReady(err)
			defer statusProber.NotHealthy(err)
			srv.Shutdown(err)
		})
	}
	// Create the gRPC server, register the gRPC handlers, and add the server to the run group.
	{
		tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
		if err != nil {
			return errors.Wrap(err, "setup gRPC server")
		}
		s := grpcserver.New(logger, reg, tracer, comp, grpcProbe,
			grpcserver.WithServer(store.RegisterStoreServer(proxy)),      // register the gRPC handler
			grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)), // register the gRPC handler
			grpcserver.WithListen(grpcBindAddr),
			grpcserver.WithGracePeriod(grpcGracePeriod),
			grpcserver.WithTLSConfig(tlsCfg),
		)
		g.Add(func() error {
			statusProber.Ready()
			// Start the gRPC server.
			return s.ListenAndServe()
		}, func(error) {
			statusProber.NotReady(err)
			s.Shutdown(err)
		})
	}
	// At this point, both the HTTP server and the gRPC server have been set up (they start when g.Run() is called in main).
	level.Info(logger).Log("msg", "starting query node")
	return nil
}
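Passing stores.Get to the proxy, instead of a fixed slice, is what lets the proxy see stores that are discovered later. The following is a minimal sketch of that idea; StoreSet, Update, Proxy, and Targets here are hypothetical simplifications that track plain addresses rather than gRPC clients.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// StoreSet is a hypothetical, simplified store set keyed by address.
type StoreSet struct {
	mtx    sync.RWMutex
	stores map[string]struct{}
}

func NewStoreSet() *StoreSet { return &StoreSet{stores: map[string]struct{}{}} }

// Update replaces the set with the addresses found by the latest discovery round.
func (s *StoreSet) Update(addrs ...string) {
	next := make(map[string]struct{}, len(addrs))
	for _, a := range addrs {
		next[a] = struct{}{}
	}
	s.mtx.Lock()
	s.stores = next
	s.mtx.Unlock()
}

// Get returns a sorted snapshot of the current addresses.
func (s *StoreSet) Get() []string {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	out := make([]string, 0, len(s.stores))
	for a := range s.stores {
		out = append(out, a)
	}
	sort.Strings(out)
	return out
}

// Proxy keeps a function, not a slice, so each call sees the current set.
type Proxy struct{ stores func() []string }

func (p *Proxy) Targets() []string { return p.stores() }

func main() {
	set := NewStoreSet()
	proxy := &Proxy{stores: set.Get} // method value, evaluated on every call

	set.Update("sidecar:10901")
	fmt.Println(proxy.Targets())

	set.Update("sidecar:10901", "store:10901") // a new store is discovered
	fmt.Println(proxy.Targets())
}
```

The proxy never needs to be rebuilt when discovery changes the set; it simply asks for a fresh snapshot on each query.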
2.4 The QueryAPI struct and its methods
// QueryAPI is an API used by Thanos Query.
type QueryAPI struct {
	baseAPI *api.BaseAPI
	logger  log.Logger
	gate    gate.Gate
	// Constructor function used to create querier struct objects.
	queryableCreate query.QueryableCreator
	queryEngine     *promql.Engine
	ruleGroups      rules.UnaryClient
	/*
		Other code
	*/
	replicaLabels []string
	storeSet      *query.StoreSet
}
func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) {
	qapi.baseAPI.Register(r, tracer, logger, ins, logMiddleware)
	instr := api.GetInstr(tracer, logger, ins, logMiddleware)
	/*
		Other code
	*/
	// Register qapi.query, qapi.series, and qapi.stores on the router r to complete the HTTP handler registration.
	// For both the /query and the /series endpoint, every incoming request creates a querier object,
	// and the querier object covers a set of Store API components.
	r.Get("/query", instr("query", qapi.query))
	r.Get("/series", instr("series", qapi.series))
	r.Get("/stores", instr("stores", qapi.stores))
}
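The instr(name, handler) calls wrap each handler before it is registered. A minimal sketch of this wrapping with net/http is shown below. The counters type and its per-endpoint hit counter are assumptions for illustration; the real instr helper adds tracing, logging, and metrics middleware instead.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// counters is a hypothetical stand-in for instrumentation state.
type counters struct {
	mtx  sync.Mutex
	hits map[string]int
}

// instr wraps a handler so that every call is counted under the given name.
func (c *counters) instr(name string, h http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		c.mtx.Lock()
		c.hits[name]++
		c.mtx.Unlock()
		h(w, r)
	}
}

// newRouter registers one wrapped handler under the /api/v1 prefix.
func newRouter(c *counters) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/api/v1/stores", c.instr("stores", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, `{"status":"success"}`)
	}))
	return mux
}

// callStores sends one in-memory request through the router.
func callStores() (int, string, int) {
	c := &counters{hits: map[string]int{}}
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/api/v1/stores", nil)
	newRouter(c).ServeHTTP(rec, req)
	return rec.Code, rec.Body.String(), c.hits["stores"]
}

func main() {
	code, body, hits := callStores()
	fmt.Println(code, body, hits)
}
```

Wrapping at registration time keeps the handlers themselves free of cross-cutting concerns.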
Now look at qapi.series.
// Returns the series (metric) data.
func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiError) {
	/*
		Other code
	*/
	// Create a querier object.
	// The proxy field of the querier object covers a set of Thanos store components.
	q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true).
		Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
	/*
		Other code
	*/
	var (
		metrics = []labels.Labels{}
		sets    []storage.SeriesSet
	)
	for _, mset := range matcherSets {
		// Call the Select() method of the querier object to fetch the series.
		sets = append(sets, q.Select(false, nil, mset...))
	}
	set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
	for set.Next() {
		metrics = append(metrics, set.At().Labels())
	}
	return metrics, set.Warnings(), nil
}
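The loop above collects one series set per matcher set and then merges them. The sketch below illustrates the merging idea on sorted string slices; mergeTwo and mergeAll are hypothetical helpers, and plain strings stand in for label sets. It is not how storage.NewMergeSeriesSet is implemented.

```go
package main

import "fmt"

// mergeTwo merges two sorted slices and emits each distinct value once.
func mergeTwo(a, b []string) []string {
	out := make([]string, 0, len(a)+len(b))
	push := func(v string) {
		if len(out) == 0 || out[len(out)-1] != v {
			out = append(out, v)
		}
	}
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		if a[i] <= b[j] {
			push(a[i])
			i++
		} else {
			push(b[j])
			j++
		}
	}
	for ; i < len(a); i++ {
		push(a[i])
	}
	for ; j < len(b); j++ {
		push(b[j])
	}
	return out
}

// mergeAll folds any number of sorted sets into one sorted, duplicate-free set.
func mergeAll(sets ...[]string) []string {
	var out []string
	for _, s := range sets {
		out = mergeTwo(out, s)
	}
	return out
}

func main() {
	merged := mergeAll(
		[]string{`up{job="api"}`, `up{job="node"}`},
		[]string{`up{job="node"}`},
	)
	fmt.Println(merged)
}
```

A series matched by two matcher sets shows up only once in the result, which is why the /series response has no repeated label sets.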
2.5 The querier struct and its methods
The querier struct implements the Querier interface (github.com/prometheus/prometheus/storage/interface.go). The core method of this interface is Select(...), which is used by the /query, /series, and other endpoints.
type querier struct {
	ctx                context.Context
	logger             log.Logger
	cancel             func()
	mint, maxt         int64
	replicaLabels      map[string]struct{}
	storeDebugMatchers [][]*labels.Matcher
	// proxy covers a dynamic set of Thanos store components.
	proxy               storepb.StoreServer
	deduplicate         bool
	maxResolutionMillis int64
	partialResponse     bool
	skipChunks          bool
	selectGate          gate.Gate
	selectTimeout       time.Duration
}
func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
	/*
		Other code
	*/
	promise := make(chan storage.SeriesSet, 1)
	go func() {
		defer close(promise)
		var err error
		/*
			Other code
		*/
		// Fetch the series data.
		set, err := q.selectFn(ctx, hints, ms...)
		if err != nil {
			// Send the error to the channel and exit this goroutine.
			promise <- storage.ErrSeriesSet(err)
			return
		}
		// Send the series data to the channel.
		promise <- set
	}()
	// Return a lazy wrapper around the series set.
	return &lazySeriesSet{
		create: func() (storage.SeriesSet, bool) {
			/*
				Other code
			*/
			// Read the series set from the channel.
			set, ok := <-promise
			if !ok {
				// The goroutine exited without sending a value.
				return storage.ErrSeriesSet(errors.New("channel closed before a value received")), false
			}
			return set, set.Next()
		},
	}
}
// Fetch the series. This calls the Series(...) method of the proxy field.
func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
	/*
		Other code
	*/
	// The seriesServer struct overrides the Send() method. In Send(), the data returned over gRPC is stored in its seriesSet field.
	resp := &seriesServer{ctx: ctx}
	// The implementation behind q.proxy is the ProxyStore struct.
	// q.proxy.Series() is a gRPC method (streaming).
	// After q.proxy.Series() returns, the seriesSet field of resp has been filled.
	if err := q.proxy.Series(&storepb.SeriesRequest{
		MinTime:  hints.Start,
		MaxTime:  hints.End,
		Matchers: sms,
		/*
			Other code
		*/
	}, resp); err != nil {
		return nil, errors.Wrap(err, "proxy Series()")
	}
	/*
		Other code
	*/
	set := &promSeriesSet{
		mint:  q.mint,
		maxt:  q.maxt,
		set:   newStoreSeriesSet(resp.seriesSet), // take the seriesSet field out of resp
		aggrs: aggrs,
		warns: warns,
	}
	// set holds the series.
	return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil
}
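The trick in selectFn is to call a streaming method in process and hand it a server object whose Send() simply collects the responses. A minimal sketch, assuming a hypothetical seriesStream interface with string responses in place of the generated gRPC types:

```go
package main

import "fmt"

// seriesStream is a hypothetical, simplified streaming server interface.
type seriesStream interface {
	Send(resp string) error
}

// collectingServer overrides Send to keep every response in memory.
type collectingServer struct {
	seriesSet []string
}

func (s *collectingServer) Send(resp string) error {
	s.seriesSet = append(s.seriesSet, resp)
	return nil
}

// streamSeries plays the role of a streaming method: it pushes each
// response into whatever stream it is given.
func streamSeries(names []string, srv seriesStream) error {
	for _, n := range names {
		if err := srv.Send(n); err != nil {
			return err
		}
	}
	return nil
}

// collect calls the streaming method in process and returns what was sent.
func collect(names []string) ([]string, error) {
	resp := &collectingServer{}
	if err := streamSeries(names, resp); err != nil {
		return nil, err
	}
	return resp.seriesSet, nil
}

func main() {
	got, err := collect([]string{`up{job="api"}`, `up{job="node"}`})
	fmt.Println(got, err)
}
```

The same streaming method can therefore serve remote gRPC clients and local callers without a network round trip for the latter.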
2.6 The ProxyStore object
// ProxyStore implements the store API that proxies request to all given underlying stores.
type ProxyStore struct {
	logger log.Logger
	// Returns the downstream components that implement the Store API. This field is used when querying series.
	stores          func() []Client
	component       component.StoreAPI
	selectorLabels  labels.Labels
	responseTimeout time.Duration
	metrics         *proxyStoreMetrics
}
When series are queried, ProxyStore queries all downstream Store API components, merges the results, and deduplicates them (if configured).
/*
	For a client request, query the series from all downstream Store API components, merge and deduplicate them,
	and finally pass the series to the srv argument.
	This is a gRPC streaming interface.
*/
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
	/*
		Other code
	*/
	g, gctx := errgroup.WithContext(srv.Context())
	respSender, respCh := newCancelableRespChannel(gctx, 10)
	// Producer goroutine
	g.Go(func() error {
		/*
			This goroutine fetches series from the backend Thanos store components and merges them.
			When this goroutine exits, the consumer goroutine exits as well.
		*/
		var (
			seriesSet      []storepb.SeriesSet
			storeDebugMsgs []string
			wg             = &sync.WaitGroup{}
		)
		defer func() {
			wg.Wait()
			// close() makes the consumer goroutine exit.
			close(respCh)
		}()
		// Iterate over the backend Store API components.
		for _, st := range s.stores() {
			/*
				Other code
			*/
			sc, err := st.Series(seriesCtx, r)
			seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
				wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses))
		}
		/*
			Other code
		*/
		// Take the merged series and send them to the respCh channel.
		mergedSet := storepb.MergeSeriesSets(seriesSet...)
		for mergedSet.Next() {
			lset, chk := mergedSet.At()
			// respSender.send(...) sends the series to the respCh channel.
			respSender.send(storepb.NewSeriesResponse(&storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(lset), Chunks: chk}))
		}
		return mergedSet.Err()
	})
	// Consumer goroutine
	g.Go(func() error {
		// This goroutine receives the (already merged) responses and sends them to the srv argument.
		for resp := range respCh {
			if err := srv.Send(resp); err != nil {
				return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
			}
		}
		return nil
	})
	// Wait for the producer goroutine and the consumer goroutine to finish.
	if err := g.Wait(); err != nil {
		return err
	}
	return nil
}
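The producer and consumer structure of Series() can be sketched with the standard library alone. This is a simplified illustration, not the Thanos code: proxySeries is a hypothetical function, sync.WaitGroup replaces errgroup, the stores are plain slices, and no merging or cancellation is done.

```go
package main

import (
	"fmt"
	"sync"
)

// proxySeries forwards every series from the stores to send, using one
// producer goroutine, one consumer goroutine, and a bounded channel.
func proxySeries(stores [][]string, send func(string) error) error {
	respCh := make(chan string, 10)
	var (
		wg      sync.WaitGroup
		sendErr error
	)
	wg.Add(2)

	// Producer: reads from every store and pushes responses to respCh.
	go func() {
		defer wg.Done()
		defer close(respCh) // closing the channel lets the consumer exit
		for _, series := range stores {
			for _, s := range series {
				respCh <- s
			}
		}
	}()

	// Consumer: forwards each response to the caller's send function.
	go func() {
		defer wg.Done()
		for resp := range respCh {
			if sendErr != nil {
				continue // keep draining so the producer never blocks
			}
			sendErr = send(resp)
		}
	}()

	// Wait for both goroutines, like g.Wait() in the original.
	wg.Wait()
	return sendErr
}

func main() {
	stores := [][]string{
		{`up{job="api"}`},
		{`up{job="node"}`, `up{job="store"}`},
	}
	err := proxySeries(stores, func(s string) error {
		fmt.Println("send:", s)
		return nil
	})
	fmt.Println("err:", err)
}
```

The bounded channel decouples reading from the stores and writing to the client, so a slow client applies back pressure instead of letting responses pile up in memory.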
3 Summary
This article only walks through the outline of the code, and many details are left out. Still, the code structure of the Thanos Query component is clear and easy to follow. Using the github.com/oklog/run package to start a set of goroutines, the way the HTTP server and the gRPC server are written, and the routine for dynamically discovering downstream Store API components are all worth imitating.