未验证 提交 135cc300 编写于 作者: J Julien Pivotto 提交者: GitHub

rules: Make deleted rule series as stale after a reload (#6745)

* rules: Make deleted rule series as stale after a reload
Signed-off-by: NJulien Pivotto <roidelapluie@inuits.eu>
上级 c35438f2
groups:
- name: test_2 copy
rules:
- record: test_2
expr: vector(2)
......@@ -231,37 +231,48 @@ type Group struct {
shouldRestore bool
done chan struct{}
terminated chan struct{}
done chan bool
terminated chan struct{}
managerDone chan struct{}
logger log.Logger
metrics *Metrics
}
type GroupOptions struct {
Name, File string
Interval time.Duration
Rules []Rule
ShouldRestore bool
Opts *ManagerOptions
done chan struct{}
}
// NewGroup makes a new Group with the given name, options, and rules.
func NewGroup(name, file string, interval time.Duration, rules []Rule, shouldRestore bool, opts *ManagerOptions) *Group {
metrics := opts.Metrics
func NewGroup(o GroupOptions) *Group {
metrics := o.Opts.Metrics
if metrics == nil {
metrics = NewGroupMetrics(opts.Registerer)
metrics = NewGroupMetrics(o.Opts.Registerer)
}
metrics.groupLastEvalTime.WithLabelValues(groupKey(file, name))
metrics.groupLastDuration.WithLabelValues(groupKey(file, name))
metrics.groupRules.WithLabelValues(groupKey(file, name)).Set(float64(len(rules)))
metrics.groupInterval.WithLabelValues(groupKey(file, name)).Set(interval.Seconds())
metrics.groupLastEvalTime.WithLabelValues(groupKey(o.File, o.Name))
metrics.groupLastDuration.WithLabelValues(groupKey(o.File, o.Name))
metrics.groupRules.WithLabelValues(groupKey(o.File, o.Name)).Set(float64(len(o.Rules)))
metrics.groupInterval.WithLabelValues(groupKey(o.File, o.Name)).Set(o.Interval.Seconds())
return &Group{
name: name,
file: file,
interval: interval,
rules: rules,
shouldRestore: shouldRestore,
opts: opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)),
done: make(chan struct{}),
name: o.Name,
file: o.File,
interval: o.Interval,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: o.Opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan bool),
managerDone: o.done,
terminated: make(chan struct{}),
logger: log.With(opts.Logger, "group", name),
logger: log.With(o.Opts.Logger, "group", o.Name),
metrics: metrics,
}
}
......@@ -314,6 +325,29 @@ func (g *Group) run(ctx context.Context) {
tick := time.NewTicker(g.interval)
defer tick.Stop()
makeStale := func(s bool) {
if !s {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
// Wait for 2 intervals to give the opportunity to renamed rules
// to insert new series in the tsdb. At this point if there is a
// renamed rule, it should already be started.
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(now)
}
}(time.Now())
}
iter()
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
......@@ -321,7 +355,8 @@ func (g *Group) run(ctx context.Context) {
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
case stale := <-g.done:
makeStale(stale)
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
......@@ -339,11 +374,13 @@ func (g *Group) run(ctx context.Context) {
for {
select {
case <-g.done:
case stale := <-g.done:
makeStale(stale)
return
default:
select {
case <-g.done:
case stale := <-g.done:
makeStale(stale)
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
......@@ -358,6 +395,11 @@ func (g *Group) run(ctx context.Context) {
}
}
func (g *Group) stopAndMakeStale() {
g.done <- true
<-g.terminated
}
func (g *Group) stop() {
close(g.done)
<-g.terminated
......@@ -596,31 +638,35 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}(i, rule)
}
g.cleanupStaleSeries(ts)
}
if len(g.staleSeries) != 0 {
app, err := g.opts.Appendable.Appender()
if err != nil {
level.Warn(g.logger).Log("msg", "creating appender failed", "err", err)
return
}
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.
_, err = app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "adding stale sample for previous configuration failed", "sample", s, "err", err)
}
}
if err := app.Commit(); err != nil {
level.Warn(g.logger).Log("msg", "stale sample appending for previous configuration failed", "err", err)
} else {
g.staleSeries = nil
func (g *Group) cleanupStaleSeries(ts time.Time) {
if len(g.staleSeries) == 0 {
return
}
app, err := g.opts.Appendable.Appender()
if err != nil {
level.Warn(g.logger).Log("msg", "creating appender failed", "err", err)
return
}
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.
_, err = app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch err {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "adding stale sample for previous configuration failed", "sample", s, "err", err)
}
}
if err := app.Commit(); err != nil {
level.Warn(g.logger).Log("msg", "stale sample appending for previous configuration failed", "err", err)
} else {
g.staleSeries = nil
}
}
// RestoreForState restores the 'for' state of the alerts
......@@ -784,6 +830,7 @@ type Manager struct {
groups map[string]*Group
mtx sync.RWMutex
block chan struct{}
done chan struct{}
restored bool
logger log.Logger
......@@ -825,6 +872,7 @@ func NewManager(o *ManagerOptions) *Manager {
groups: map[string]*Group{},
opts: o,
block: make(chan struct{}),
done: make(chan struct{}),
logger: o.Logger,
}
......@@ -848,6 +896,10 @@ func (m *Manager) Stop() {
eg.stop()
}
// Shut down the groups waiting multiple evaluation intervals to write
// staleness markers.
close(m.done)
level.Info(m.logger).Log("msg", "Rule manager stopped")
}
......@@ -899,14 +951,18 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
}
// Stop remaining old groups.
wg.Add(len(m.groups))
for n, oldg := range m.groups {
oldg.stop()
if m := oldg.metrics; m != nil {
m.groupInterval.DeleteLabelValues(n)
m.groupLastEvalTime.DeleteLabelValues(n)
m.groupLastDuration.DeleteLabelValues(n)
m.groupRules.DeleteLabelValues(n)
}
go func(n string, g *Group) {
g.stopAndMakeStale()
if m := g.metrics; m != nil {
m.groupInterval.DeleteLabelValues(n)
m.groupLastEvalTime.DeleteLabelValues(n)
m.groupLastDuration.DeleteLabelValues(n)
m.groupRules.DeleteLabelValues(n)
}
wg.Done()
}(n, oldg)
}
wg.Wait()
......@@ -962,7 +1018,15 @@ func (m *Manager) LoadGroups(
))
}
groups[groupKey(fn, rg.Name)] = NewGroup(rg.Name, fn, itv, rules, shouldRestore, m.opts)
groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
File: fn,
Interval: itv,
Rules: rules,
ShouldRestore: shouldRestore,
Opts: m.opts,
done: m.done,
})
}
}
......
......@@ -376,7 +376,13 @@ func TestForStateRestore(t *testing.T) {
nil, nil, true, nil,
)
group := NewGroup("default", "", time.Second, []Rule{rule}, true, opts)
group := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{rule},
ShouldRestore: true,
Opts: opts,
})
groups := make(map[string]*Group)
groups["default;"] = group
......@@ -435,7 +441,13 @@ func TestForStateRestore(t *testing.T) {
labels.FromStrings("severity", "critical"),
nil, nil, false, nil,
)
newGroup := NewGroup("default", "", time.Second, []Rule{newRule}, true, opts)
newGroup := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{newRule},
ShouldRestore: true,
Opts: opts,
})
newGroups := make(map[string]*Group)
newGroups["default;"] = newGroup
......@@ -519,7 +531,13 @@ func TestStaleness(t *testing.T) {
expr, err := promql.ParseExpr("a + 1")
testutil.Ok(t, err)
rule := NewRecordingRule("a_plus_one", expr, labels.Labels{})
group := NewGroup("default", "", time.Second, []Rule{rule}, true, opts)
group := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{rule},
ShouldRestore: true,
Opts: opts,
})
// A time series that has two samples and then goes stale.
app, _ := storage.Appender()
......@@ -842,7 +860,13 @@ func TestNotify(t *testing.T) {
expr, err := promql.ParseExpr("a > 1")
testutil.Ok(t, err)
rule := NewAlertingRule("aTooHigh", expr, 0, labels.Labels{}, labels.Labels{}, nil, true, log.NewNopLogger())
group := NewGroup("alert", "", time.Second, []Rule{rule}, true, opts)
group := NewGroup(GroupOptions{
Name: "alert",
Interval: time.Second,
Rules: []Rule{rule},
ShouldRestore: true,
Opts: opts,
})
app, _ := storage.Appender()
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
......@@ -948,3 +972,152 @@ func TestMetricsUpdate(t *testing.T) {
testutil.Equals(t, c.metrics, countMetrics(), "test %d: invalid count of metrics", i)
}
}
func TestGroupStalenessOnRemoval(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
files := []string{"fixtures/rules2.yaml"}
sameFiles := []string{"fixtures/rules2_copy.yaml"}
storage := teststorage.New(t)
defer storage.Close()
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
TSDB: storage,
QueryFunc: EngineQueryFunc(engine, storage),
Context: context.Background(),
Logger: log.NewNopLogger(),
})
var stopped bool
ruleManager.Run()
defer func() {
if !stopped {
ruleManager.Stop()
}
}()
cases := []struct {
files []string
staleNaN int
}{
{
files: files,
staleNaN: 0,
},
{
// When we remove the files, it should produce a staleness marker.
files: files[:0],
staleNaN: 1,
},
{
// Rules that produce the same metrics but in a different file
// should not produce staleness marker.
files: sameFiles,
staleNaN: 0,
},
{
// Staleness marker should be present as we don't have any rules
// loaded anymore.
files: files[:0],
staleNaN: 1,
},
{
// Add rules back so we have rules loaded when we stop the manager
// and check for the absence of staleness markers.
files: sameFiles,
staleNaN: 0,
},
}
var totalStaleNaN int
for i, c := range cases {
err := ruleManager.Update(time.Second, c.files, nil)
testutil.Ok(t, err)
time.Sleep(3 * time.Second)
totalStaleNaN += c.staleNaN
testutil.Equals(t, totalStaleNaN, countStaleNaN(t, storage), "test %d/%q: invalid count of staleness markers", i, c.files)
}
ruleManager.Stop()
stopped = true
testutil.Equals(t, totalStaleNaN, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine")
}
func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
files := []string{"fixtures/rules2.yaml"}
storage := teststorage.New(t)
defer storage.Close()
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
TSDB: storage,
QueryFunc: EngineQueryFunc(engine, storage),
Context: context.Background(),
Logger: log.NewNopLogger(),
})
var stopped bool
ruleManager.Run()
defer func() {
if !stopped {
ruleManager.Stop()
}
}()
err := ruleManager.Update(2*time.Second, files, nil)
time.Sleep(4 * time.Second)
testutil.Ok(t, err)
start := time.Now()
err = ruleManager.Update(3*time.Second, files[:0], nil)
testutil.Ok(t, err)
ruleManager.Stop()
stopped = true
testutil.Assert(t, time.Since(start) < 1*time.Second, "rule manager does not stop early")
time.Sleep(5 * time.Second)
testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine")
}
func countStaleNaN(t *testing.T, storage storage.Storage) int {
var c int
querier, err := storage.Querier(context.Background(), 0, time.Now().Unix()*1000)
testutil.Ok(t, err)
defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2")
testutil.Ok(t, err)
set, _, err := querier.Select(nil, matcher)
testutil.Ok(t, err)
samples, err := readSeriesSet(set)
testutil.Ok(t, err)
metric := labels.FromStrings(model.MetricNameLabel, "test_2").String()
metricSample, ok := samples[metric]
testutil.Assert(t, ok, "Series %s not returned.", metric)
for _, s := range metricSample {
if value.IsStaleNaN(s.V) {
c++
}
}
return c
}
......@@ -258,7 +258,14 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group {
recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{})
r = append(r, recordingRule)
group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts)
group := rules.NewGroup(rules.GroupOptions{
Name: "grp",
File: "/path/to/file",
Interval: time.Second,
Rules: r,
ShouldRestore: false,
Opts: opts,
})
return []*rules.Group{group}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册