diff --git a/promql/engine.go b/promql/engine.go index 4d52396362e8f5cf9f310b7684c84e607e7777f0..f22b585df0bca426ef3078d56237c15450e7ff94 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -85,7 +85,7 @@ type Query interface { // Statement returns the parsed statement of the query. Statement() Statement // Stats returns statistics about the lifetime of the query. - Stats() *stats.TimerGroup + Stats() *stats.QueryTimers // Cancel signals that a running query execution should be aborted. Cancel() } @@ -99,7 +99,7 @@ type query struct { // Statement of the parsed query. stmt Statement // Timer stats for the query execution. - stats *stats.TimerGroup + stats *stats.QueryTimers // Result matrix for reuse. matrix Matrix // Cancellation function for the query. @@ -115,7 +115,7 @@ func (q *query) Statement() Statement { } // Stats implements the Query interface. -func (q *query) Stats() *stats.TimerGroup { +func (q *query) Stats() *stats.QueryTimers { return q.stats } @@ -276,7 +276,7 @@ func (ng *Engine) newQuery(q storage.Queryable, expr Expr, start, end time.Time, qry := &query{ stmt: es, ng: ng, - stats: stats.NewTimerGroup(), + stats: stats.NewQueryTimers(), queryable: q, } return qry @@ -294,7 +294,7 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query { q: "test statement", stmt: testStmt(f), ng: ng, - stats: stats.NewTimerGroup(), + stats: stats.NewQueryTimers(), } return qry } @@ -310,25 +310,25 @@ func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { ctx, cancel := context.WithTimeout(ctx, ng.timeout) q.cancel = cancel - execTimer := q.stats.GetTimer(stats.ExecTotalTime).Start() - defer execTimer.Stop() - queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start() + execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime) + defer execSpanTimer.Finish() + + queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) if err := ng.gate.Start(ctx); err != nil { return nil, err } defer ng.gate.Done() - queueTimer.Stop() - ng.metrics.queryQueueTime.Observe(queueTimer.ElapsedTime().Seconds()) + queueSpanTimer.Finish() // Cancel when execution is done or an error was raised. defer q.cancel() const env = "query execution" - evalTimer := q.stats.GetTimer(stats.EvalTotalTime).Start() - defer evalTimer.Stop() + evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime) + defer evalSpanTimer.Finish() // The base context might already be canceled on the first iteration (e.g. during shutdown). if err := contextDone(ctx, env); err != nil { @@ -355,10 +355,9 @@ func durationMilliseconds(d time.Duration) int64 { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) { - prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() - querier, err := ng.populateSeries(ctx, query.queryable, s) - prepareTimer.Stop() - ng.metrics.queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds()) + prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime) + querier, err := ng.populateSeries(ctxPrepare, query.queryable, s) + prepareSpanTimer.Finish() // XXX(fabxc): the querier returned by populateSeries might be instantiated // we must not return without closing irrespective of the error. @@ -371,7 +370,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return nil, err } - evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() + evalSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval) // Instant evaluation. This is executed as a range evaluation with one step. if s.Start == s.End && s.Interval == 0 { start := timeMilliseconds(s.Start) @@ -387,8 +386,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return nil, err } - evalTimer.Stop() - ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) + evalSpanTimer.Finish() mat, ok := val.(Matrix) if !ok { @@ -427,8 +425,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( if err != nil { return nil, err } - evalTimer.Stop() - ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) + evalSpanTimer.Finish() mat, ok := val.(Matrix) if !ok { @@ -442,11 +439,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( // TODO(fabxc): order ensured by storage? // TODO(fabxc): where to ensure metric labels are a copy from the storage internals. - sortTimer := query.stats.GetTimer(stats.ResultSortTime).Start() + sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort) sort.Sort(mat) - sortTimer.Stop() + sortSpanTimer.Finish() - ng.metrics.queryResultSort.Observe(sortTimer.ElapsedTime().Seconds()) return mat, nil } diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go index 3fd593cb924d648bebe57407c55565342713c0d4..181bdde3b944a4dcf817f42d2776637b6aafa910 100644 --- a/util/stats/query_stats.go +++ b/util/stats/query_stats.go @@ -13,6 +13,13 @@ package stats +import ( + "context" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" +) + // QueryTiming identifies the code area or functionality in which time is spent // during a query. type QueryTiming int @@ -47,6 +54,26 @@ func (s QueryTiming) String() string { } } +// Return a string representation of a QueryTiming span operation. +func (s QueryTiming) SpanOperation() string { + switch s { + case EvalTotalTime: + return "promqlEval" + case ResultSortTime: + return "promqlSort" + case QueryPreparationTime: + return "promqlPrepare" + case InnerEvalTime: + return "promqlInnerEval" + case ExecQueueTime: + return "promqlExecQueue" + case ExecTotalTime: + return "promqlExec" + default: + return "Unknown query timing" + } +} + // queryTimings with all query timers mapped to durations. type queryTimings struct { EvalTotalTime float64 `json:"evalTotalTime"` @@ -64,10 +91,10 @@ type QueryStats struct { // NewQueryStats makes a QueryStats struct with all QueryTimings found in the // given TimerGroup. -func NewQueryStats(tg *TimerGroup) *QueryStats { +func NewQueryStats(tg *QueryTimers) *QueryStats { var qt queryTimings - for s, timer := range tg.timers { + for s, timer := range tg.TimerGroup.timers { switch s { case EvalTotalTime: qt.EvalTotalTime = timer.Duration() @@ -87,3 +114,44 @@ func NewQueryStats(tg *TimerGroup) *QueryStats { qs := QueryStats{Timings: qt} return &qs } + +// SpanTimer unifies tracing and timing, to reduce repetition. +type SpanTimer struct { + timer *Timer + observers []prometheus.Observer + + span opentracing.Span +} + +func NewSpanTimer(ctx context.Context, operation string, timer *Timer, observers ...prometheus.Observer) (*SpanTimer, context.Context) { + span, ctx := opentracing.StartSpanFromContext(ctx, operation) + timer.Start() + + return &SpanTimer{ + timer: timer, + observers: observers, + + span: span, + }, ctx +} + +func (s *SpanTimer) Finish() { + s.timer.Stop() + s.span.Finish() + + for _, obs := range s.observers { + obs.Observe(s.timer.ElapsedTime().Seconds()) + } +} + +type QueryTimers struct { + *TimerGroup +} + +func NewQueryTimers() *QueryTimers { + return &QueryTimers{NewTimerGroup()} +} + +func (qs *QueryTimers) GetSpanTimer(ctx context.Context, qt QueryTiming, observers ...prometheus.Observer) (*SpanTimer, context.Context) { + return NewSpanTimer(ctx, qt.SpanOperation(), qs.TimerGroup.GetTimer(qt), observers...) +} diff --git a/util/stats/stats_test.go b/util/stats/stats_test.go index f5175a0278c7018b63d9d3af0404203c4757a279..55f1e9428ad1725014754a1cc5dc3e7b756ddee0 100644 --- a/util/stats/stats_test.go +++ b/util/stats/stats_test.go @@ -39,13 +39,13 @@ func TestTimerGroupNewTimer(t *testing.T) { } func TestQueryStatsWithTimers(t *testing.T) { - tg := NewTimerGroup() - timer := tg.GetTimer(ExecTotalTime) + qt := NewQueryTimers() + timer := qt.GetTimer(ExecTotalTime) timer.Start() time.Sleep(2 * time.Millisecond) timer.Stop() - qs := NewQueryStats(tg) + qs := NewQueryStats(qt) actual, err := json.Marshal(qs) if err != nil { t.Fatalf("Unexpected error during serialization: %v", err)