diff --git a/pkg/models/alerting/rules/ruler.go b/pkg/models/alerting/rules/ruler.go index 887a6e899faedb1956bce7bf0dad7886c6da31f3..13ff90a4bd9f8b6e66efb206ac3dd2a29f62eb78 100644 --- a/pkg/models/alerting/rules/ruler.go +++ b/pkg/models/alerting/rules/ruler.go @@ -3,16 +3,20 @@ package rules import ( "context" "fmt" + "net/http" "sort" + "github.com/docker/docker/pkg/locker" "github.com/ghodss/yaml" "github.com/pkg/errors" promresourcesv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" prominformersv1 "github.com/prometheus-operator/prometheus-operator/pkg/client/informers/externalversions/monitoring/v1" promresourcesclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/util/retry" "kubesphere.io/kubesphere/pkg/api/alerting/v2alpha1" ) @@ -268,6 +272,7 @@ type ThanosRuler struct { resource *promresourcesv1.ThanosRuler informer prominformersv1.PrometheusRuleInformer client promresourcesclient.Interface + locker locker.Locker } func NewThanosRuler(resource *promresourcesv1.ThanosRuler, informer prominformersv1.PrometheusRuleInformer, @@ -345,7 +350,7 @@ func (r *ThanosRuler) AddAlertingRule(ctx context.Context, ruleNamespace *corev1 } func (r *ThanosRuler) addAlertingRule(ctx context.Context, ruleNamespace *corev1.Namespace, - prometheusRules []*promresourcesv1.PrometheusRule, excludeRuleResources map[string]*ruleResource, + prometheusRules []*promresourcesv1.PrometheusRule, excludePrometheusRules map[string]*promresourcesv1.PrometheusRule, group string, rule *promresourcesv1.Rule, ruleResourceLabels map[string]string) error { sort.Slice(prometheusRules, func(i, j int) bool { @@ -353,23 +358,30 @@ func (r *ThanosRuler) addAlertingRule(ctx context.Context, ruleNamespace *corev1 }) for _, prometheusRule := range prometheusRules { - if len(excludeRuleResources) > 0 { - if _, ok := excludeRuleResources[prometheusRule.Name]; ok { + if len(excludePrometheusRules) > 0 { + if _, ok := excludePrometheusRules[prometheusRule.Name]; ok { continue } } - resource := ruleResource(*prometheusRule) - if ok, err := resource.addAlertingRule(group, rule); err != nil { + if err := r.doRuleResourceOperation(prometheusRule, func(newerPr *promresourcesv1.PrometheusRule) error { + resource := ruleResource(*newerPr) + if ok, err := resource.addAlertingRule(group, rule); err != nil { + return err + } else if ok { + if err = resource.commit(ctx, r.client); err != nil { + return err + } + } + return nil + }); err != nil { if err == errOutOfConfigMapSize { break + } else if resourceNotFound(err) { + continue } return err - } else if ok { - if err = resource.commit(ctx, r.client); err != nil { - return err - } - return nil } + return nil } // create a new rule resource and add rule into it when all existing rule resources are full. newPromRule := promresourcesv1.PrometheusRule{ @@ -403,38 +415,52 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor } var ( - found bool - success bool - resourcesToDelRule = make(map[string]*ruleResource) + found bool + success bool + prsToDelRule = make(map[string]*promresourcesv1.PrometheusRule) ) - for _, prometheusRule := range prometheusRules { - resource := ruleResource(*prometheusRule) + for i, prometheusRule := range prometheusRules { if success { // If the update has been successful, delete the possible same rule in other resources - if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil { + if err := r.doRuleResourceOperation(prometheusRule, func(newerPr *promresourcesv1.PrometheusRule) error { + resource := ruleResource(*newerPr) + if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil { + return err + } else if ok { + if err = resource.commit(ctx, r.client); err != nil { + return err + } + } + return nil + }); err != nil && !resourceNotFound(err) { + return err + } + continue + } + + if err := r.doRuleResourceOperation(prometheusRule, func(newerPr *promresourcesv1.PrometheusRule) error { + resource := ruleResource(*newerPr) + if ok, err := resource.updateAlertingRule(group, rule); err != nil { return err } else if ok { if err = resource.commit(ctx, r.client); err != nil { return err } } - continue - } - if ok, err := resource.updateAlertingRule(group, rule); err != nil { - if err == errOutOfConfigMapSize { - // updating the rule in the resource will oversize the size limit, + return nil + }); err != nil { + if resourceNotFound(err) { + continue + } else if err == errOutOfConfigMapSize { + // updating the rule in the resource may oversize the size limit, // so delete it and then add the new rule to a new resource. - resourcesToDelRule[resource.Name] = &resource + prsToDelRule[prometheusRule.Name] = prometheusRules[i] found = true - } else { - return err - } - } else if ok { - if err = resource.commit(ctx, r.client); err != nil { - return err + continue } - found = true - success = true + return err } + found = true + success = true } if !found { @@ -442,18 +468,24 @@ func (r *ThanosRuler) UpdateAlertingRule(ctx context.Context, ruleNamespace *cor } if !success { - err := r.addAlertingRule(ctx, ruleNamespace, prometheusRules, resourcesToDelRule, group, rule, ruleResourceLabels) + err := r.addAlertingRule(ctx, ruleNamespace, prometheusRules, prsToDelRule, group, rule, ruleResourceLabels) if err != nil { return err } } - for _, resource := range resourcesToDelRule { - if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil { - return err - } else if ok { - if err = resource.commit(ctx, r.client); err != nil { + for _, pr := range prsToDelRule { + if err := r.doRuleResourceOperation(pr, func(newerPr *promresourcesv1.PrometheusRule) error { + resource := ruleResource(*newerPr) + if ok, err := resource.deleteAlertingRule(rule.Alert); err != nil { return err + } else if ok { + if err = resource.commit(ctx, r.client); err != nil { + return err + } } + return nil + }); err != nil && !resourceNotFound(err) { + return err } } return nil @@ -467,15 +499,23 @@ func (r *ThanosRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *cor } var success bool for _, prometheusRule := range prometheusRules { - resource := ruleResource(*prometheusRule) - if ok, err := resource.deleteAlertingRule(name); err != nil { - return err - } else if ok { - if err = resource.commit(ctx, r.client); err != nil { + if err := r.doRuleResourceOperation(prometheusRule, func(newerPr *promresourcesv1.PrometheusRule) error { + resource := ruleResource(*newerPr) + if ok, err := resource.deleteAlertingRule(name); err != nil { return err + } else if ok { + if err = resource.commit(ctx, r.client); err != nil { + return err + } } - success = true + return nil + }); err != nil { + if resourceNotFound(err) { + continue + } + return err } + success = true } if !success { return v2alpha1.ErrAlertingRuleNotFound @@ -483,6 +523,20 @@ func (r *ThanosRuler) DeleteAlertingRule(ctx context.Context, ruleNamespace *cor return nil } +func (r *ThanosRuler) doRuleResourceOperation(pr *promresourcesv1.PrometheusRule, + operation func(newerPr *promresourcesv1.PrometheusRule) error) error { + key := pr.Namespace + "/" + pr.Name + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + r.locker.Lock(key) + defer r.locker.Unlock(key) + pr, err := r.informer.Lister().PrometheusRules(pr.Namespace).Get(pr.Name) + if err != nil { + return err + } + return operation(pr) + }) +} + func ruleNamespaceSelected(r Ruler, ruleNamespace *corev1.Namespace) (bool, error) { rnSelector, err := r.RuleResourceNamespaceSelector() if err != nil { @@ -499,3 +553,13 @@ func ruleNamespaceSelected(r Ruler, ruleNamespace *corev1.Namespace) (bool, erro } return true, nil } + +func resourceNotFound(err error) bool { + switch e := err.(type) { + case *apierrors.StatusError: + if e.Status().Code == http.StatusNotFound { + return true + } + } + return false +}