diff --git a/judge/handler.go b/judge/handler.go index 0a68b315b0f4c1729bac695dbad81e001b1afbe3..c812875e62508610402a764f586bd1965acec145 100644 --- a/judge/handler.go +++ b/judge/handler.go @@ -49,6 +49,7 @@ func Send(points []*vos.MetricPoint) { // 这个监控数据没有关联任何告警策略,省事了不用处理 continue } + logger.Debugf("[point_match_alertRules][point:%+v][alertRuleNum:%+v]", points[i], rulesCount) // 不同的告警规则,alert_duration字段大小不同,找到最大的,按照最大的值来缓存历史数据 var maxAliveDuration = 0 @@ -57,10 +58,6 @@ func Send(points []*vos.MetricPoint) { maxAliveDuration = alertRules[j].AlertDuration } } - if len(points[i].PK) < 2 { - logger.Debugf("[point:%+v] len(pk)<2", points[i]) - continue - } ll := PointCaches[points[i].PK[0:2]].PutPoint(points[i], int64(maxAliveDuration)) diff --git a/judge/history.go b/judge/history.go index 0dcf3db60248569bbd2c0f01567e459c3f24bcf0..be6db525098f01d3ef374eb894a1804985dcc9a1 100644 --- a/judge/history.go +++ b/judge/history.go @@ -17,6 +17,7 @@ package judge import ( "container/list" "sync" + "time" "github.com/didi/nightingale/v5/vos" ) @@ -50,7 +51,7 @@ func (pc *PointCache) Len() int { } func (pc *PointCache) CleanStale(before int64) { - keys := []string{} + var keys []string pc.RLock() for key, L := range pc.M { @@ -97,12 +98,25 @@ func (pc *PointCache) PutPoint(p *vos.MetricPoint, maxAliveDuration int64) *Safe // 这是个线程不安全的大Map,需要提前初始化好 var PointCaches = make(map[string]*PointCache) +var pointChars = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} +var pointHeadKeys = make([]string, 0, 256) func initPointCaches() { - arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} for i := 0; i < 16; i++ { for j := 0; j < 16; j++ { - PointCaches[arr[i]+arr[j]] = NewPointCache() + pointHeadKeys = append(pointHeadKeys, pointChars[i]+pointChars[j]) } } + + for i := 0; i < 256; i++ { + PointCaches[pointHeadKeys[i]] = NewPointCache() + } +} + +func CleanStalePoints() { + // 监控数据2天都没关联到任何告警策略,说明对应的告警策略已经删除了 + before := time.Now().Unix() - 3600*24*2 + for i := 0; i < 256; i++ { + PointCaches[pointHeadKeys[i]].CleanStale(before) + } } diff --git a/judge/judge.go b/judge/judge.go index 49750f5e8ce19fe1bcca62110455e4587064858e..9024f2911b205235eb7c3fc49491ec7c1cb6b001 100644 --- a/judge/judge.go +++ b/judge/judge.go @@ -38,10 +38,14 @@ func Start(ctx context.Context) { os.Exit(1) } + // 启动心跳goroutinue,如果挂了,trans可以及时感知 go loopHeartbeat() // PULL型的策略不着急,等一段时间(等哈希环是稳态的)再开始周期性干活 go syncPullRules(ctx) + + // 告警策略删除之后,针对这些告警策略缓存的监控数据要被清理 + go loopCleanStalePoints() } func syncPullRules(ctx context.Context) { @@ -102,3 +106,10 @@ func heartbeat(endpoint string) error { } return nil } + +func loopCleanStalePoints() { + for { + time.Sleep(time.Hour) + CleanStalePoints() + } +}