ruler.go 22.4 KB
Newer Older
J
junotx 已提交
1 2 3 4 5
package rules

import (
	"context"
	"fmt"
J
junotx 已提交
6
	"net/http"
J
junotx 已提交
7
	"sort"
J
junotx 已提交
8 9
	"strconv"
	"strings"
J
junotx 已提交
10

J
junotx 已提交
11
	"github.com/docker/docker/pkg/locker"
J
junotx 已提交
12 13 14 15 16 17
	"github.com/ghodss/yaml"
	"github.com/pkg/errors"
	promresourcesv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
	prominformersv1 "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions/monitoring/v1"
	promresourcesclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
	corev1 "k8s.io/api/core/v1"
J
junotx 已提交
18
	apierrors "k8s.io/apimachinery/pkg/api/errors"
J
junotx 已提交
19 20
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
J
junotx 已提交
21
	"k8s.io/client-go/util/retry"
22
	"kubesphere.io/kubesphere/pkg/api/alerting/v2alpha1"
J
junotx 已提交
23 24 25 26
)

const (
	// customAlertingRuleResourcePrefix is the GenerateName prefix of the rule
	// resources created to hold custom alerting rules.
	customAlertingRuleResourcePrefix = "custom-alerting-rule-"

	// customRuleGroupDefaultPrefix is the name prefix of the automatically
	// generated rule groups; the suffix is a decimal sequence number.
	customRuleGroupDefaultPrefix = "alerting.custom.defaults."
	// customRuleGroupSize is the maximum number of rules placed into one
	// automatically generated rule group.
	customRuleGroupSize = 20
)

var (
	maxSecretSize        = corev1.MaxSecretSize
J
junotx 已提交
34
	maxConfigMapDataSize = int(float64(maxSecretSize) * 0.45)
J
junotx 已提交
35 36

	errOutOfConfigMapSize = errors.New("out of config map size")
J
junotx 已提交
37 38

	ruleResourceLocker locker.Locker
J
junotx 已提交
39 40 41 42 43 44 45 46 47 48
)

type Ruler interface {
	Namespace() string
	RuleResourceNamespaceSelector() (labels.Selector, error)
	RuleResourceSelector(extraRuleResourceSelector labels.Selector) (labels.Selector, error)
	ExternalLabels() func() map[string]string

	ListRuleResources(ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector) (
		[]*promresourcesv1.PrometheusRule, error)
J
junotx 已提交
49 50 51 52 53 54
	AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector,
		ruleResourceLabels map[string]string, rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error)
	UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector,
		ruleResourceLabels map[string]string, ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error)
	DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
		ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error)
J
junotx 已提交
55 56 57 58
}

// ruleResource is a PrometheusRule with helper methods to add, update and
// delete alerting rules and to commit the result through the API client.
type ruleResource promresourcesv1.PrometheusRule

J
junotx 已提交
59 60 61
// deleteAlertingRules deletes the rules.
// If there are rules to be deleted, return true to indicate the resource should be updated.
func (r *ruleResource) deleteAlertingRules(rules ...*RuleWithGroup) (bool, error) {
J
junotx 已提交
62
	var (
J
junotx 已提交
63 64 65
		gs     []promresourcesv1.RuleGroup
		dels   = make(map[string]struct{})
		commit bool
J
junotx 已提交
66 67
	)

J
junotx 已提交
68 69 70 71 72 73
	for _, rule := range rules {
		if rule != nil {
			dels[rule.Alert] = struct{}{}
		}
	}

J
junotx 已提交
74 75 76
	for _, g := range r.Spec.Groups {
		var rules []promresourcesv1.Rule
		for _, gr := range g.Rules {
J
junotx 已提交
77 78 79 80 81
			if gr.Alert != "" {
				if _, ok := dels[gr.Alert]; ok {
					commit = true
					continue
				}
J
junotx 已提交
82 83 84 85
			}
			rules = append(rules, gr)
		}
		if len(rules) > 0 {
J
junotx 已提交
86
			gs = append(gs, promresourcesv1.RuleGroup{
J
junotx 已提交
87 88 89 90 91 92 93 94
				Name:                    g.Name,
				Interval:                g.Interval,
				PartialResponseStrategy: g.PartialResponseStrategy,
				Rules:                   rules,
			})
		}
	}

J
junotx 已提交
95 96
	if commit {
		r.Spec.Groups = gs
J
junotx 已提交
97
	}
J
junotx 已提交
98
	return commit, nil
J
junotx 已提交
99 100
}

J
junotx 已提交
101
// updateAlertingRules updates the rules.
J
junotx 已提交
102 103
// If there are rules to be updated, return true to indicate the resource should be updated.
func (r *ruleResource) updateAlertingRules(rules ...*RuleWithGroup) (bool, error) {
J
junotx 已提交
104
	var (
J
junotx 已提交
105 106 107
		commit  bool
		spec    = r.Spec.DeepCopy()
		ruleMap = make(map[string]*RuleWithGroup)
J
junotx 已提交
108 109
	)

J
junotx 已提交
110 111
	if spec == nil {
		return false, nil
J
junotx 已提交
112 113
	}

J
junotx 已提交
114 115 116
	for i, rule := range rules {
		if rule != nil {
			ruleMap[rule.Alert] = rules[i]
J
junotx 已提交
117
		}
J
junotx 已提交
118
	}
J
junotx 已提交
119

J
junotx 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133
	for i, g := range spec.Groups {
		for j, r := range g.Rules {
			if r.Alert == "" {
				continue
			}
			if b, ok := ruleMap[r.Alert]; ok {
				if b == nil {
					spec.Groups[i].Rules = append(g.Rules[:j], g.Rules[j+1:]...)
				} else {
					spec.Groups[i].Rules[j] = b.Rule
					ruleMap[r.Alert] = nil // clear to mark it updated
				}
				commit = true
			}
J
junotx 已提交
134
		}
J
junotx 已提交
135
	}
J
junotx 已提交
136

J
junotx 已提交
137 138
	if commit {
		content, err := yaml.Marshal(spec)
J
junotx 已提交
139 140 141
		if err != nil {
			return false, errors.Wrap(err, "failed to unmarshal content")
		}
J
junotx 已提交
142 143
		if len(string(content)) > maxConfigMapDataSize { // check size limit
			return false, errOutOfConfigMapSize
J
junotx 已提交
144
		}
J
junotx 已提交
145
		r.Spec = *spec
J
junotx 已提交
146
	}
J
junotx 已提交
147
	return commit, nil
J
junotx 已提交
148 149
}

J
junotx 已提交
150 151
// addAlertingRules adds the rules.
// If there are rules to be added, return true to indicate the resource should be updated.
J
junotx 已提交
152
func (r *ruleResource) addAlertingRules(rules ...*RuleWithGroup) (bool, error) {
J
junotx 已提交
153
	var (
J
junotx 已提交
154 155 156 157
		commit   bool
		spec     = r.Spec.DeepCopy()
		groupMax = -1

J
junotx 已提交
158 159 160 161
		cursor int // indicates which rule to start adding for the rules with no groups

		rulesNoGroup   []promresourcesv1.Rule                    // rules that do not specify group names
		rulesWithGroup = make(map[string][]promresourcesv1.Rule) // rules that have specific group names
J
junotx 已提交
162 163
	)

J
junotx 已提交
164 165 166
	for i, rule := range rules {
		if len(strings.TrimSpace(rule.Group)) == 0 {
			rulesNoGroup = append(rulesNoGroup, rules[i].Rule)
J
junotx 已提交
167
		} else {
J
junotx 已提交
168
			rulesWithGroup[rule.Group] = append(rulesWithGroup[rule.Group], rules[i].Rule)
J
junotx 已提交
169
		}
J
junotx 已提交
170
	}
J
junotx 已提交
171

J
junotx 已提交
172 173
	if spec == nil {
		spec = new(promresourcesv1.PrometheusRuleSpec)
J
junotx 已提交
174 175
	}

J
junotx 已提交
176 177
	// For the rules that have specific group names, add them to the matched groups.
	// For the rules that do not specify group names, add them to the automatically generated groups until the limit is reached.
J
junotx 已提交
178 179 180
	for i, g := range spec.Groups {
		var (
			gName         = g.Name
J
junotx 已提交
181 182
			doneNoGroup   = cursor >= len(rulesNoGroup) // whether all rules without groups have been added
			doneWithGroup = len(rulesWithGroup) == 0    // whether all rules with groups have been added
J
junotx 已提交
183 184 185
		)

		if doneNoGroup && doneWithGroup {
J
junotx 已提交
186 187
			break
		}
J
junotx 已提交
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208

		if !doneWithGroup {
			if _, ok := rulesWithGroup[gName]; ok {
				spec.Groups[i].Rules = append(spec.Groups[i].Rules, rulesWithGroup[gName]...)
				delete(rulesWithGroup, gName)
				commit = true
			}
		}

		g = spec.Groups[i]
		if !doneNoGroup && strings.HasPrefix(gName, customRuleGroupDefaultPrefix) {
			suf, err := strconv.Atoi(strings.TrimPrefix(gName, customRuleGroupDefaultPrefix))
			if err != nil {
				continue
			}
			if suf > groupMax {
				groupMax = suf
			}

			if size := len(g.Rules); size < customRuleGroupSize {
				num := customRuleGroupSize - size
J
junotx 已提交
209 210 211
				var stop int
				if stop = cursor + num; stop > len(rulesNoGroup) {
					stop = len(rulesNoGroup)
J
junotx 已提交
212
				}
J
junotx 已提交
213 214
				spec.Groups[i].Rules = append(spec.Groups[i].Rules, rulesNoGroup[cursor:stop]...)
				cursor = stop
J
junotx 已提交
215 216 217
				commit = true
			}
		}
J
junotx 已提交
218 219
	}

J
junotx 已提交
220 221 222 223 224 225 226 227 228
	// If no groups are available, new groups will be created to place the remaining rules.
	for gName := range rulesWithGroup {
		rules := rulesWithGroup[gName]
		if len(rules) == 0 {
			continue
		}
		spec.Groups = append(spec.Groups, promresourcesv1.RuleGroup{Name: gName, Rules: rules})
		commit = true
	}
J
junotx 已提交
229 230
	for groupMax++; cursor < len(rules); groupMax++ {
		g := promresourcesv1.RuleGroup{Name: fmt.Sprintf("%s%d", customRuleGroupDefaultPrefix, groupMax)}
J
junotx 已提交
231 232 233
		var stop int
		if stop = cursor + customRuleGroupSize; stop > len(rulesNoGroup) {
			stop = len(rulesNoGroup)
J
junotx 已提交
234
		}
J
junotx 已提交
235
		g.Rules = append(g.Rules, rulesNoGroup[cursor:stop]...)
J
junotx 已提交
236
		spec.Groups = append(spec.Groups, g)
J
junotx 已提交
237
		cursor = stop
J
junotx 已提交
238
		commit = true
J
junotx 已提交
239 240
	}

J
junotx 已提交
241 242 243 244 245 246 247 248 249
	if commit {
		content, err := yaml.Marshal(spec)
		if err != nil {
			return false, errors.Wrap(err, "failed to unmarshal content")
		}
		if len(string(content)) > maxConfigMapDataSize { // check size limit
			return false, errOutOfConfigMapSize
		}
		r.Spec = *spec
J
junotx 已提交
250
	}
J
junotx 已提交
251
	return commit, nil
J
junotx 已提交
252 253 254 255 256 257 258
}

func (r *ruleResource) commit(ctx context.Context, prometheusResourceClient promresourcesclient.Interface) error {
	var pr = (promresourcesv1.PrometheusRule)(*r)
	if len(pr.Spec.Groups) == 0 {
		return prometheusResourceClient.MonitoringV1().PrometheusRules(r.Namespace).Delete(ctx, r.Name, metav1.DeleteOptions{})
	}
J
junotx 已提交
259
	npr, err := prometheusResourceClient.MonitoringV1().PrometheusRules(r.Namespace).Update(ctx, &pr, metav1.UpdateOptions{})
J
junotx 已提交
260 261 262
	if err != nil {
		return err
	}
J
junotx 已提交
263
	npr.DeepCopyInto(&pr)
J
junotx 已提交
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
	return nil
}

// PrometheusRuler is the Ruler implementation backed by a Prometheus resource.
type PrometheusRuler struct {
	// resource is the Prometheus resource whose namespace and selectors are used.
	resource *promresourcesv1.Prometheus
	// informer provides the lister used to list PrometheusRule resources.
	informer prominformersv1.PrometheusRuleInformer
	// client is the prometheus-operator API client.
	client   promresourcesclient.Interface
}

// NewPrometheusRuler returns a Ruler backed by the given Prometheus resource,
// PrometheusRule informer and prometheus-operator client.
func NewPrometheusRuler(resource *promresourcesv1.Prometheus, informer prominformersv1.PrometheusRuleInformer,
	client promresourcesclient.Interface) Ruler {
	ruler := new(PrometheusRuler)
	ruler.resource = resource
	ruler.informer = informer
	ruler.client = client
	return ruler
}

// Namespace returns the namespace of the underlying Prometheus resource.
func (r *PrometheusRuler) Namespace() string {
	return r.resource.Namespace
}

// RuleResourceNamespaceSelector converts Spec.RuleNamespaceSelector of the
// Prometheus resource into a labels.Selector. It returns a nil selector and a
// nil error when the field is not set.
func (r *PrometheusRuler) RuleResourceNamespaceSelector() (labels.Selector, error) {
	if nsSelector := r.resource.Spec.RuleNamespaceSelector; nsSelector != nil {
		return metav1.LabelSelectorAsSelector(nsSelector)
	}
	return nil, nil
}

// RuleResourceSelector converts Spec.RuleSelector of the Prometheus resource
// into a labels.Selector and, when extraRuleResourceSelector is not nil and
// reports its requirements as selectable, adds those requirements to it.
func (r *PrometheusRuler) RuleResourceSelector(extraRuleResourceSelector labels.Selector) (labels.Selector, error) {
	base, err := metav1.LabelSelectorAsSelector(r.resource.Spec.RuleSelector)
	if err != nil {
		return nil, err
	}
	if extraRuleResourceSelector == nil {
		return base, nil
	}
	requirements, selectable := extraRuleResourceSelector.Requirements()
	if !selectable {
		return base, nil
	}
	return base.Add(requirements...), nil
}

// ExternalLabels always returns nil for PrometheusRuler.
func (r *PrometheusRuler) ExternalLabels() func() map[string]string {
	// ignoring the external labels because rules gotten from prometheus endpoint do not include them
	return nil
}

// ListRuleResources lists, through the informer's lister, the PrometheusRule
// resources in ruleNamespace that match the ruler's rule selector combined with
// extraRuleResourceSelector. It returns nil when ruleNamespace is not selected
// by the ruler.
func (r *PrometheusRuler) ListRuleResources(ruleNamespace *corev1.Namespace, extraRuleResourceSelector labels.Selector) (
	[]*promresourcesv1.PrometheusRule, error) {
	selected, err := ruleNamespaceSelected(r, ruleNamespace)
	switch {
	case err != nil:
		return nil, err
	case !selected:
		return nil, nil
	}

	selector, err := r.RuleResourceSelector(extraRuleResourceSelector)
	if err != nil {
		return nil, err
	}
	lister := r.informer.Lister().PrometheusRules(ruleNamespace.Name)
	return lister.List(selector)
}

J
junotx 已提交
327 328 329
func (r *PrometheusRuler) AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
	extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
	rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) {
J
junotx 已提交
330
	return nil, errors.New("Adding Prometheus rules not supported")
J
junotx 已提交
331 332
}

J
junotx 已提交
333 334 335
func (r *PrometheusRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
	extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
	ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
J
junotx 已提交
336
	return nil, errors.New("Updating Prometheus rules not supported")
J
junotx 已提交
337 338
}

J
junotx 已提交
339 340
func (r *PrometheusRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
	ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
J
junotx 已提交
341
	return nil, errors.New("Deleting Prometheus rules not supported.")
J
junotx 已提交
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
}

// ThanosRuler is the Ruler implementation backed by a ThanosRuler resource.
type ThanosRuler struct {
	// resource is the ThanosRuler resource whose namespace, selectors and labels are used.
	resource *promresourcesv1.ThanosRuler
	// informer provides the lister used to list PrometheusRule resources.
	informer prominformersv1.PrometheusRuleInformer
	// client is the prometheus-operator API client used to read and write rule resources.
	client   promresourcesclient.Interface
}

// NewThanosRuler returns a Ruler backed by the given ThanosRuler resource,
// PrometheusRule informer and prometheus-operator client.
func NewThanosRuler(resource *promresourcesv1.ThanosRuler, informer prominformersv1.PrometheusRuleInformer,
	client promresourcesclient.Interface) Ruler {
	ruler := new(ThanosRuler)
	ruler.resource = resource
	ruler.informer = informer
	ruler.client = client
	return ruler
}

// Namespace returns the namespace of the underlying ThanosRuler resource.
func (r *ThanosRuler) Namespace() string {
	return r.resource.Namespace
}

// RuleResourceNamespaceSelector converts Spec.RuleNamespaceSelector of the
// ThanosRuler resource into a labels.Selector. It returns a nil selector and a
// nil error when the field is not set.
func (r *ThanosRuler) RuleResourceNamespaceSelector() (labels.Selector, error) {
	if nsSelector := r.resource.Spec.RuleNamespaceSelector; nsSelector != nil {
		return metav1.LabelSelectorAsSelector(nsSelector)
	}
	return nil, nil
}

// RuleResourceSelector converts Spec.RuleSelector of the ThanosRuler resource
// into a labels.Selector and, when extraRuleSelector is not nil and reports its
// requirements as selectable, adds those requirements to it.
func (r *ThanosRuler) RuleResourceSelector(extraRuleSelector labels.Selector) (labels.Selector, error) {
	base, err := metav1.LabelSelectorAsSelector(r.resource.Spec.RuleSelector)
	if err != nil {
		return nil, err
	}
	if extraRuleSelector == nil {
		return base, nil
	}
	requirements, selectable := extraRuleSelector.Requirements()
	if !selectable {
		return base, nil
	}
	return base.Add(requirements...), nil
}

// ExternalLabels returns a function yielding a copy, taken when ExternalLabels
// is called, of Spec.Labels of the ThanosRuler resource. Every call of the
// returned function yields the same map.
func (r *ThanosRuler) ExternalLabels() func() map[string]string {
	// rules gotten from thanos ruler endpoint include the labels
	snapshot := make(map[string]string)
	for name, value := range r.resource.Spec.Labels { // ranging over a nil map is a no-op
		snapshot[name] = value
	}
	return func() map[string]string { return snapshot }
}

// ListRuleResources lists, through the informer's lister, the PrometheusRule
// resources in ruleNamespace that match the ruler's rule selector combined with
// extraRuleSelector. It returns nil when ruleNamespace is not selected by the
// ruler.
func (r *ThanosRuler) ListRuleResources(ruleNamespace *corev1.Namespace, extraRuleSelector labels.Selector) (
	[]*promresourcesv1.PrometheusRule, error) {
	selected, err := ruleNamespaceSelected(r, ruleNamespace)
	switch {
	case err != nil:
		return nil, err
	case !selected:
		return nil, nil
	}

	selector, err := r.RuleResourceSelector(extraRuleSelector)
	if err != nil {
		return nil, err
	}
	lister := r.informer.Lister().PrometheusRules(ruleNamespace.Name)
	return lister.List(selector)
}

J
junotx 已提交
412 413 414
func (r *ThanosRuler) AddAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
	extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
	rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) {
J
junotx 已提交
415

J
junotx 已提交
416
	return r.addAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector, nil, ruleResourceLabels, rules...)
J
junotx 已提交
417 418
}

J
junotx 已提交
419 420 421
func (r *ThanosRuler) addAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
	extraRuleResourceSelector labels.Selector, excludePrometheusRules map[string]struct{},
	ruleResourceLabels map[string]string, rules ...*RuleWithGroup) ([]*v2alpha1.BulkItemResponse, error) {
J
junotx 已提交
422

J
junotx 已提交
423 424 425 426 427
	prometheusRules, err := r.ListRuleResources(ruleNamespace, extraRuleResourceSelector)
	if err != nil {
		return nil, err
	}
	// sort by the left space to speed up the hit rate
J
junotx 已提交
428 429 430 431
	sort.Slice(prometheusRules, func(i, j int) bool {
		return len(fmt.Sprint(prometheusRules[i])) <= len(fmt.Sprint(prometheusRules[j]))
	})

J
junotx 已提交
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
	var (
		respItems = make([]*v2alpha1.BulkItemResponse, 0, len(rules))
		cursor    int
	)

	resp := func(rule *RuleWithGroup, err error) *v2alpha1.BulkItemResponse {
		if err != nil {
			return v2alpha1.NewBulkItemErrorServerResponse(rule.Alert, err)
		}
		return v2alpha1.NewBulkItemSuccessResponse(rule.Alert, v2alpha1.ResultCreated)
	}

	for _, pr := range prometheusRules {
		if cursor >= len(rules) {
			break
		}
J
junotx 已提交
448
		if len(excludePrometheusRules) > 0 {
J
junotx 已提交
449
			if _, ok := excludePrometheusRules[pr.Name]; ok {
J
junotx 已提交
450 451 452
				continue
			}
		}
J
junotx 已提交
453 454

		var (
J
junotx 已提交
455 456 457 458
			err  error
			num  = len(rules) - cursor
			stop = len(rules)
			rs   []*RuleWithGroup
J
junotx 已提交
459 460
		)

J
junotx 已提交
461 462
		// First add all the rules to this resource,
		// and if the limit is exceeded, add half
J
junotx 已提交
463
		for i := 1; i <= 2; i++ {
J
junotx 已提交
464 465
			stop = cursor + num/i
			rs = rules[cursor:stop]
J
junotx 已提交
466 467 468 469

			err = r.doRuleResourceOperation(ctx, pr.Namespace, pr.Name, func(pr *promresourcesv1.PrometheusRule) error {
				resource := ruleResource(*pr)
				if ok, err := resource.addAlertingRules(rs...); err != nil {
J
junotx 已提交
470
					return err
J
junotx 已提交
471 472 473 474
				} else if ok {
					if err = resource.commit(ctx, r.client); err != nil {
						return err
					}
J
junotx 已提交
475
				}
J
junotx 已提交
476 477 478 479
				return nil
			})
			if err == errOutOfConfigMapSize && num > 1 {
				continue
J
junotx 已提交
480
			}
J
junotx 已提交
481 482 483 484 485 486 487 488 489 490 491 492
			break
		}

		switch {
		case err == errOutOfConfigMapSize:
			break
		case resourceNotFound(err):
			continue
		default:
			for _, rule := range rs {
				respItems = append(respItems, resp(rule, err))
			}
J
junotx 已提交
493
			cursor = stop
J
junotx 已提交
494 495 496
		}
	}

J
junotx 已提交
497 498
	// create new rule resources and add rest rules into them
	// when all existing rule resources are full.
J
junotx 已提交
499 500
	for cursor < len(rules) {
		var (
J
junotx 已提交
501 502 503 504
			err  error
			num  = len(rules) - cursor
			stop = len(rules)
			rs   []*RuleWithGroup
J
junotx 已提交
505
		)
J
junotx 已提交
506 507
		// If adding the rules to the new resource exceeds the limit,
		// reduce the amount to 1/2, 1/3... of rest rules until it can accommodate.
J
junotx 已提交
508
		for i := 1; ; i++ {
J
junotx 已提交
509 510
			stop = cursor + num/i
			rs = rules[cursor:stop]
J
junotx 已提交
511 512 513 514 515 516 517 518 519 520 521

			pr := &promresourcesv1.PrometheusRule{
				ObjectMeta: metav1.ObjectMeta{
					Namespace:    ruleNamespace.Name,
					GenerateName: customAlertingRuleResourcePrefix,
					Labels:       ruleResourceLabels,
				},
			}
			resource := ruleResource(*pr)
			var ok bool
			ok, err = resource.addAlertingRules(rs...)
J
junotx 已提交
522
			if err == errOutOfConfigMapSize {
J
junotx 已提交
523
				continue
J
junotx 已提交
524
			}
J
junotx 已提交
525 526 527 528 529
			if ok {
				pr.Spec = resource.Spec
				_, err = r.client.MonitoringV1().PrometheusRules(ruleNamespace.Name).Create(ctx, pr, metav1.CreateOptions{})
			}
			break
J
junotx 已提交
530
		}
J
junotx 已提交
531 532 533 534

		for _, rule := range rs {
			respItems = append(respItems, resp(rule, err))
		}
J
junotx 已提交
535
		cursor = stop
J
junotx 已提交
536
	}
J
junotx 已提交
537 538

	return respItems, nil
J
junotx 已提交
539 540
}

J
junotx 已提交
541 542 543
func (r *ThanosRuler) UpdateAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
	extraRuleResourceSelector labels.Selector, ruleResourceLabels map[string]string,
	ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {
J
junotx 已提交
544 545

	var (
J
junotx 已提交
546 547
		itemsMap  = make(map[string][]*ResourceRuleItem)
		respItems = make([]*v2alpha1.BulkItemResponse, 0, len(ruleItems))
J
junotx 已提交
548
		// rules updated successfully. The key is the rule name.
J
junotx 已提交
549
		successes = make(map[string]struct{})
J
junotx 已提交
550 551 552 553
		// rules to be moved to other resources. The key is the resource name in which the rules were.
		moveMap = make(map[string][]*ResourceRuleItem)
		// duplicate rules to be deleted
		delMap = make(map[string][]*ResourceRuleItem)
J
junotx 已提交
554
	)
J
junotx 已提交
555 556 557 558 559

	for i, item := range ruleItems {
		itemsMap[item.ResourceName] = append(itemsMap[item.ResourceName], ruleItems[i])
	}

J
junotx 已提交
560 561 562 563
	// Update the rules in the resources where the rules reside.
	// If duplicate rules are found, the first will be updated and the others will be deleted.
	// if updating the rules in the original resources causes exceeding size limit,
	// they will be moved to other resources and then be updated.
J
junotx 已提交
564 565 566 567 568 569 570 571 572 573 574
	for name, items := range itemsMap {
		var (
			nrules []*RuleWithGroup
			nitems []*ResourceRuleItem
		)

		for i := range items {
			item := items[i]
			if _, ok := successes[item.Alert]; ok {
				delMap[name] = append(delMap[name], item)
				continue
J
junotx 已提交
575
			}
J
junotx 已提交
576 577 578 579
			nrules = append(nrules, &item.RuleWithGroup)
			nitems = append(nitems, item)
		}
		if len(nrules) == 0 {
J
junotx 已提交
580 581 582
			continue
		}

J
junotx 已提交
583
		err := r.doRuleResourceOperation(ctx, ruleNamespace.Name, name, func(pr *promresourcesv1.PrometheusRule) error {
584
			resource := ruleResource(*pr)
J
junotx 已提交
585
			if ok, err := resource.updateAlertingRules(nrules...); err != nil {
J
junotx 已提交
586 587 588 589 590 591
				return err
			} else if ok {
				if err = resource.commit(ctx, r.client); err != nil {
					return err
				}
			}
J
junotx 已提交
592
			return nil
J
junotx 已提交
593 594 595 596 597 598 599 600
		})

		switch {
		case err == nil:
			for _, item := range items {
				successes[item.Alert] = struct{}{}
				respItems = append(respItems, v2alpha1.NewBulkItemSuccessResponse(item.Alert, v2alpha1.ResultUpdated))
			}
J
junotx 已提交
601
		case err == errOutOfConfigMapSize: // Cannot update the rules in the original resource
J
junotx 已提交
602 603 604 605 606 607 608 609 610 611 612 613
			moveMap[name] = append(moveMap[name], nitems...)
		case resourceNotFound(err):
			for _, item := range items {
				respItems = append(respItems, &v2alpha1.BulkItemResponse{
					RuleName:  item.Alert,
					Status:    v2alpha1.StatusError,
					ErrorType: v2alpha1.ErrNotFound,
				})
			}
		default:
			for _, item := range items {
				respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err))
J
junotx 已提交
614 615 616 617
			}
		}
	}

J
junotx 已提交
618 619 620
	// The move here is not really move, because the move also requires an update.
	// So the actual operations are firstly add the new rules in other resources
	// and then delete the old rules in old resources.
J
junotx 已提交
621 622 623 624 625 626 627 628 629 630 631 632 633
	for name, items := range moveMap {
		var (
			nrules = make([]*RuleWithGroup, 0, len(items))
			nitems = make(map[string]*ResourceRuleItem, len(items))
		)
		for i := range items {
			item := items[i]
			nrules = append(nrules, &item.RuleWithGroup)
			nitems[item.Alert] = item
		}
		if len(nrules) == 0 {
			continue
		}
J
junotx 已提交
634

J
junotx 已提交
635 636
		aRespItems, err := r.addAlertingRules(ctx, ruleNamespace, extraRuleResourceSelector,
			map[string]struct{}{name: {}}, ruleResourceLabels, nrules...)
J
junotx 已提交
637
		if err != nil {
J
junotx 已提交
638 639 640 641
			for _, item := range items {
				respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err))
			}
			continue
J
junotx 已提交
642
		}
J
junotx 已提交
643 644 645 646 647 648 649

		for i := range aRespItems {
			resp := aRespItems[i]
			switch resp.Status {
			case v2alpha1.StatusSuccess:
				if item, ok := nitems[resp.RuleName]; ok {
					delMap[name] = append(delMap[name], item)
J
junotx 已提交
650
				}
J
junotx 已提交
651 652
			default:
				respItems = append(respItems, resp)
J
junotx 已提交
653 654 655
			}
		}
	}
J
junotx 已提交
656 657 658 659 660 661 662 663 664 665 666 667

	for _, items := range delMap {
		dRespItems, err := r.DeleteAlertingRules(ctx, ruleNamespace, items...)
		if err != nil {
			for _, item := range items {
				respItems = append(respItems, v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err))
			}
			continue
		}
		for i := range dRespItems {
			resp := dRespItems[i]
			if resp.Status == v2alpha1.StatusSuccess {
J
junotx 已提交
668
				// The delete operation here is for updating, so update the result to v2alpha1.ResultUpdated
J
junotx 已提交
669 670 671 672 673 674 675
				resp.Result = v2alpha1.ResultUpdated
			}
			respItems = append(respItems, resp)
		}
	}

	return respItems, nil
J
junotx 已提交
676 677
}

J
junotx 已提交
678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694
func (r *ThanosRuler) DeleteAlertingRules(ctx context.Context, ruleNamespace *corev1.Namespace,
	ruleItems ...*ResourceRuleItem) ([]*v2alpha1.BulkItemResponse, error) {

	var (
		itemsMap  = make(map[string][]*ResourceRuleItem)
		respItems = make([]*v2alpha1.BulkItemResponse, 0, len(ruleItems))
	)

	for i, ruleItem := range ruleItems {
		itemsMap[ruleItem.ResourceName] = append(itemsMap[ruleItem.ResourceName], ruleItems[i])
	}

	resp := func(item *ResourceRuleItem, err error) *v2alpha1.BulkItemResponse {
		if err != nil {
			return v2alpha1.NewBulkItemErrorServerResponse(item.Alert, err)
		}
		return v2alpha1.NewBulkItemSuccessResponse(item.Alert, v2alpha1.ResultDeleted)
J
junotx 已提交
695
	}
J
junotx 已提交
696 697 698 699 700 701 702 703

	for name, items := range itemsMap {
		var rules []*RuleWithGroup
		for i := range items {
			rules = append(rules, &items[i].RuleWithGroup)
		}

		err := r.doRuleResourceOperation(ctx, ruleNamespace.Name, name, func(pr *promresourcesv1.PrometheusRule) error {
704
			resource := ruleResource(*pr)
J
junotx 已提交
705
			if ok, err := resource.deleteAlertingRules(rules...); err != nil {
J
junotx 已提交
706
				return err
J
junotx 已提交
707 708 709 710
			} else if ok {
				if err = resource.commit(ctx, r.client); err != nil {
					return err
				}
J
junotx 已提交
711
			}
J
junotx 已提交
712
			return nil
J
junotx 已提交
713 714 715
		})
		for _, item := range items {
			respItems = append(respItems, resp(item, err))
J
junotx 已提交
716 717
		}
	}
J
junotx 已提交
718 719

	return respItems, nil
J
junotx 已提交
720 721
}

J
junotx 已提交
722
func (r *ThanosRuler) doRuleResourceOperation(ctx context.Context, namespace, name string,
723
	operation func(pr *promresourcesv1.PrometheusRule) error) error {
J
junotx 已提交
724
	// Lock here is used to lock specific resource in order to prevent frequent conflicts
J
junotx 已提交
725
	key := namespace + "/" + name
J
junotx 已提交
726
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
J
junotx 已提交
727 728 729
		ruleResourceLocker.Lock(key)
		defer ruleResourceLocker.Unlock(key)
		pr, err := r.client.MonitoringV1().PrometheusRules(namespace).Get(ctx, name, metav1.GetOptions{})
J
junotx 已提交
730 731 732 733 734 735 736
		if err != nil {
			return err
		}
		return operation(pr)
	})
}

J
junotx 已提交
737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752
// ruleNamespaceSelected reports whether rule resources in ruleNamespace are
// picked up by the ruler r. Without a namespace selector only the ruler's own
// namespace is selected; otherwise the labels of ruleNamespace must match the
// selector.
func ruleNamespaceSelected(r Ruler, ruleNamespace *corev1.Namespace) (bool, error) {
	selector, err := r.RuleResourceNamespaceSelector()
	switch {
	case err != nil:
		return false, err
	case selector == nil:
		// No namespace selector: only the ruler's own namespace qualifies.
		return r.Namespace() == ruleNamespace.Name, nil
	default:
		return selector.Matches(labels.Set(ruleNamespace.Labels)), nil
	}
}
J
junotx 已提交
753 754 755 756 757 758 759 760 761 762

// resourceNotFound reports whether err is an *apierrors.StatusError carrying
// the HTTP 404 status code. Errors of any other dynamic type, including nil,
// yield false.
func resourceNotFound(err error) bool {
	statusErr, ok := err.(*apierrors.StatusError)
	return ok && statusErr.Status().Code == http.StatusNotFound
}