recording_rule_cache.go 2.9 KB
Newer Older
T
Tripitakav 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
package memsto

import (
	"fmt"
	"sync"
	"time"

	"github.com/didi/nightingale/v5/src/models"
	"github.com/didi/nightingale/v5/src/server/config"
	promstat "github.com/didi/nightingale/v5/src/server/stat"
	"github.com/pkg/errors"
	"github.com/toolkits/pkg/logger"
)

// RecordingRuleCacheType is an in-memory cache of recording rules indexed by
// rule id. Alongside the rules it remembers the total / last-updated
// statistics handed to the most recent Set call, which StatChanged compares
// against to decide whether the cache is stale.
type RecordingRuleCacheType struct {
	// The embedded RWMutex guards rules. Get and GetRuleIds take the read
	// lock; Set takes the write lock while swapping the map.
	sync.RWMutex
	rules map[int64]*models.RecordingRule // rule id -> rule

	// Statistics recorded by the last Set call. They are read and written
	// without holding the mutex.
	statTotal, statLastUpdated int64
}

// RecordingRuleCache is the package-level recording rule cache. It starts
// with an empty rule map and both statistics set to -1, so StatChanged
// reports a change for any statistics other than (-1, -1).
var RecordingRuleCache = RecordingRuleCacheType{
	rules:           map[int64]*models.RecordingRule{},
	statLastUpdated: -1,
	statTotal:       -1,
}

// StatChanged reports whether the given statistics differ from the ones
// stored by the last Set call. It returns false only when both total and
// lastUpdated match the stored values.
func (rrc *RecordingRuleCacheType) StatChanged(total, lastUpdated int64) bool {
	return rrc.statTotal != total || rrc.statLastUpdated != lastUpdated
}

// Set replaces the cached rule map with m and records total and lastUpdated
// as the statistics the cache now corresponds to.
//
// Only the map swap happens under the write lock. The statistics are
// assigned afterwards without locking.
func (rrc *RecordingRuleCacheType) Set(m map[int64]*models.RecordingRule, total, lastUpdated int64) {
	func() {
		rrc.Lock()
		defer rrc.Unlock()
		rrc.rules = m
	}()

	// only one goroutine used, so no need lock
	rrc.statTotal, rrc.statLastUpdated = total, lastUpdated
}

// Get looks up the cached recording rule with the given id under the read
// lock. It returns nil when the id is not present in the cache.
func (rrc *RecordingRuleCacheType) Get(ruleId int64) *models.RecordingRule {
	rrc.RLock()
	rule := rrc.rules[ruleId]
	rrc.RUnlock()

	return rule
}

// GetRuleIds returns the ids of all cached recording rules as a newly
// allocated slice, collected under the read lock. The order follows map
// iteration and is therefore unspecified.
func (rrc *RecordingRuleCacheType) GetRuleIds() []int64 {
	rrc.RLock()
	defer rrc.RUnlock()

	ids := make([]int64, len(rrc.rules))
	next := 0
	for id := range rrc.rules {
		ids[next] = id
		next++
	}

	return ids
}

// SyncRecordingRules performs one synchronous sync of the recording rule
// cache and then starts the background loop that keeps syncing it.
//
// If the first sync fails, the error is printed to stdout and exit(1) is
// called. exit is defined elsewhere in this package and presumably
// terminates the process (verify there).
func SyncRecordingRules() {
	if err := syncRecordingRules(); err != nil {
		fmt.Println("failed to sync recording rules:", err)
		exit(1)
	}

	go loopSyncRecordingRules()
}

// loopSyncRecordingRules re-syncs the recording rule cache forever, sleeping
// 9 seconds before every attempt. A failed attempt is logged as a warning and
// the loop carries on. The function never returns.
func loopSyncRecordingRules() {
	const interval = 9000 * time.Millisecond

	for {
		time.Sleep(interval)

		err := syncRecordingRules()
		if err == nil {
			continue
		}
		logger.Warning("failed to sync recording rules:", err)
	}
}

func syncRecordingRules() error {
	start := time.Now()

	stat, err := models.RecordingRuleStatistics(config.C.ClusterName)
	if err != nil {
		return errors.WithMessage(err, "failed to exec RecordingRuleStatistics")
	}

	if !RecordingRuleCache.StatChanged(stat.Total, stat.LastUpdated) {
		promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
		promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
		logger.Debug("recoding rules not changed")
		return nil
	}

	lst, err := models.RecordingRuleGetsByCluster(config.C.ClusterName)
	if err != nil {
		return errors.WithMessage(err, "failed to exec RecordingRuleGetsByCluster")
	}

	m := make(map[int64]*models.RecordingRule)
	for i := 0; i < len(lst); i++ {
		m[lst[i].Id] = lst[i]
	}

	RecordingRuleCache.Set(m, stat.Total, stat.LastUpdated)

	ms := time.Since(start).Milliseconds()
	promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(ms))
	promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(len(m)))
	logger.Infof("timer: sync recording rules done, cost: %dms, number: %d", ms, len(m))

	return nil
}