diff --git a/src/dataobj/event.go b/src/dataobj/event.go index 157b466bd59577859aab4b9a442e6ab8aef349be..e52b9559e428085e4d255fbecc52122debdb627c 100644 --- a/src/dataobj/event.go +++ b/src/dataobj/event.go @@ -41,3 +41,16 @@ func RRDData2HistoryData(datas []*RRDData) []*HistoryData { } return historyDatas } + +func HistoryData2RRDData(datas []*HistoryData) []*RRDData { + rrdDatas := make([]*RRDData, len(datas)) + + for i := range datas { + data := &RRDData{ + Timestamp: datas[i].Timestamp, + Value: datas[i].Value, + } + rrdDatas[i] = data + } + return rrdDatas +} diff --git a/src/dataobj/metric.go b/src/dataobj/metric.go index 61cbedd745990feefab5556d160c5663c8d82ea9..2966c65bbf92214f13a9253ad9796330e8959cc5 100644 --- a/src/dataobj/metric.go +++ b/src/dataobj/metric.go @@ -129,6 +129,8 @@ func (m *MetricValue) CheckValidity(now int64) (err error) { m.Timestamp = now } + m.Timestamp = alignTs(m.Timestamp, int64(m.Step)) + valid := true var vv float64 @@ -344,3 +346,7 @@ func (bm BuiltinMetricSlice) Swap(i, j int) { func (bm BuiltinMetricSlice) Less(i, j int) bool { return bm[i].String() < bm[j].String() } + +func alignTs(ts int64, period int64) int64 { + return ts - ts%period +} diff --git a/src/modules/judge/backend/query/query.go b/src/modules/judge/backend/query/query.go index 3fff76c950c8de45ee979ec186c2f1e3ac02d3f2..4ef54658a080127590878ba29f8643f20298493d 100644 --- a/src/modules/judge/backend/query/query.go +++ b/src/modules/judge/backend/query/query.go @@ -4,15 +4,17 @@ import ( "errors" "fmt" "math/rand" + "strings" "time" - "github.com/toolkits/pkg/logger" - "github.com/toolkits/pkg/net/httplib" - "github.com/didi/nightingale/src/dataobj" + "github.com/didi/nightingale/src/modules/judge/cache" "github.com/didi/nightingale/src/toolkits/address" "github.com/didi/nightingale/src/toolkits/stats" "github.com/didi/nightingale/src/toolkits/str" + + "github.com/toolkits/pkg/logger" + "github.com/toolkits/pkg/net/httplib" ) var ( @@ -35,29 +37,107 @@ type Counter struct { // 执行Query操作 // 默认不重试, 如果要做重试, 在这里完成 -func Query(reqs []*dataobj.QueryData) ([]*dataobj.TsdbQueryResponse, error) { - stats.Counter.Set("get.data", 1) - +func Query(reqs []*dataobj.QueryData, sid int64, expFunc string) []*dataobj.TsdbQueryResponse { + stats.Counter.Set("query.data", 1) var resp *dataobj.QueryDataResp + var respData []*dataobj.TsdbQueryResponse var err error - for i := 0; i < 3; i++ { - err = TransferConnPools.Call("", "Transfer.Query", reqs, &resp) - if err == nil { - break + + respData, reqs = QueryFromMem(reqs, sid) + if len(reqs) > 0 { + stats.Counter.Set("query.data.by.transfer", 1) + for i := 0; i < 3; i++ { + err = TransferConnPools.Call("", "Transfer.Query", reqs, &resp) + if err == nil { + break + } + time.Sleep(500 * time.Millisecond) + } + if err != nil { + stats.Counter.Set("query.data.transfer.err", 1) + logger.Warning("get data err:%v msg:%+v, query data from mem", err, resp) + } else { + respData = append(respData, resp.Data...) } - time.Sleep(500 * time.Millisecond) } - if err != nil { - return nil, err + + return respData +} + +type QueryData struct { + Start int64 `json:"start"` + End int64 `json:"end"` + ConsolFunc string `json:"consolFunc"` + Endpoints []string `json:"endpoints"` + Counters []string `json:"counters"` + Step int `json:"step"` + DsType string `json:"dstype"` +} + +func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryResponse, []*dataobj.QueryData) { + stats.Counter.Set("query.data.by.mem", 1) + + var resps []*dataobj.TsdbQueryResponse + var newReqs []*dataobj.QueryData + for _, req := range reqs { + newReq := &dataobj.QueryData{ + Start: req.Start, + End: req.End, + ConsolFunc: req.ConsolFunc, + Step: req.Step, + DsType: req.DsType, + } + + for _, endpoint := range req.Endpoints { + for _, counter := range req.Counters { + metric, tagsMap := Counter2Metric(counter) + resp := &dataobj.TsdbQueryResponse{ + Endpoint: endpoint, + Counter: counter, + Step: req.Step, + DsType: req.DsType, + } + + item := &dataobj.JudgeItem{ + Endpoint: endpoint, + Metric: metric, + TagsMap: tagsMap, + Sid: sid, + } + + pk := item.MD5() + linkedList, exists := cache.HistoryBigMap[pk[0:2]].Get(pk) + if exists { + historyData := linkedList.QueryDataByTS(req.Start, req.End) + resp.Values = dataobj.HistoryData2RRDData(historyData) + } + if len(resp.Values) > 0 { + resps = append(resps, resp) + } else { + newReq.Endpoints = append(newReq.Endpoints, endpoint) + newReq.Counters = append(newReq.Counters, counter) + } + } + } + if len(newReq.Counters) > 0 { + newReqs = append(newReqs, newReq) + } } - if resp.Msg != "" { - return nil, errors.New(resp.Msg) + + return resps, newReqs +} + +func Counter2Metric(counter string) (string, map[string]string) { + arr := strings.Split(counter, "/") + if len(arr) == 1 { + return arr[0], nil } - return resp.Data, nil + + return arr[0], str.DictedTagstring(arr[1]) } func NewQueryRequest(endpoint, metric string, tagsMap map[string]string, - start, end int64) (*dataobj.QueryData, error) { + step int, start, end int64) (*dataobj.QueryData, error) { if end <= start || start < 0 { return nil, ErrorQueryParamIllegal } @@ -71,6 +151,7 @@ func NewQueryRequest(endpoint, metric string, tagsMap map[string]string, return &dataobj.QueryData{ Start: start, End: end, + Step: step, ConsolFunc: "AVERAGE", // 硬编码 Endpoints: []string{endpoint}, Counters: []string{counter}, diff --git a/src/modules/judge/cache/index.go b/src/modules/judge/cache/index.go index 1f38ee804979c133bb46c07436632f03953a36a4..cf125908afcf1b96d394a13e356f4da902a9dee1 100644 --- a/src/modules/judge/cache/index.go +++ b/src/modules/judge/cache/index.go @@ -53,7 +53,7 @@ func (i *IndexMap) Get(id int64) []Series { } func (i *IndexMap) CleanLoop() { - t1 := time.NewTicker(time.Duration(60) * time.Second) + t1 := time.NewTicker(time.Duration(600) * time.Second) for { <-t1.C i.Clean() diff --git a/src/modules/judge/cache/linkedlist.go b/src/modules/judge/cache/linkedlist.go index 77773ff94bef8cfca193c6c3e776092cb1a35b21..8189fb826d2e6f852ef5a09da875042080d8768e 100644 --- a/src/modules/judge/cache/linkedlist.go +++ b/src/modules/judge/cache/linkedlist.go @@ -108,3 +108,59 @@ func (ll *SafeLinkedList) HistoryData(limit int) ([]*dataobj.HistoryData, bool) return vs, isEnough } + +func (ll *SafeLinkedList) QueryDataByTS(start, end int64) []*dataobj.HistoryData { + size := ll.Len() + if size == 0 { + return []*dataobj.HistoryData{} + } + + firstElement := ll.Front() + firstItem := firstElement.Value.(*dataobj.JudgeItem) + + var vs []*dataobj.HistoryData + judgeType := firstItem.DsType[0] + + if judgeType == 'G' || judgeType == 'g' { + if firstItem.Timestamp < start { + //最新的点也比起始时间旧,直接返回 + return vs + } + + v := &dataobj.HistoryData{ + Timestamp: firstItem.Timestamp, + Value: dataobj.JsonFloat(firstItem.Value), + Extra: firstItem.Extra, + } + + vs = append(vs, v) + currentElement := firstElement + + for { + nextElement := currentElement.Next() + if nextElement == nil { + return vs + } + + if nextElement.Value.(*dataobj.JudgeItem).Timestamp < start { + return vs + } + + if nextElement.Value.(*dataobj.JudgeItem).Timestamp > end { + currentElement = nextElement + continue + } + + v := &dataobj.HistoryData{ + Timestamp: nextElement.Value.(*dataobj.JudgeItem).Timestamp, + Value: dataobj.JsonFloat(nextElement.Value.(*dataobj.JudgeItem).Value), + Extra: nextElement.Value.(*dataobj.JudgeItem).Extra, + } + + vs = append(vs, v) + currentElement = nextElement + } + } + + return vs +} diff --git a/src/modules/judge/judge/judge.go b/src/modules/judge/judge/judge.go index 422bdd5a43b62a0bec68fd6c3746870bb0a615c6..f4e0d688c06798cc6372297508181a6048ac8867 100644 --- a/src/modules/judge/judge/judge.go +++ b/src/modules/judge/judge/judge.go @@ -30,8 +30,18 @@ var ( EVENT_RECOVER = "recovery" ) +func GetStra(sid int64) (*model.Stra, bool) { + if stra, exists := cache.Strategy.Get(sid); exists { + return stra, exists + } + if stra, exists := cache.NodataStra.Get(sid); exists { + return stra, exists + } + return nil, false +} + func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem, now int64) { - stra, exists := cache.Strategy.Get(val.Sid) + stra, exists := GetStra(val.Sid) if !exists { stats.Counter.Set("point.miss", 1) return @@ -237,43 +247,31 @@ func GetData(stra *model.Stra, exp model.Exp, firstItem *dataobj.JudgeItem, now var reqs []*dataobj.QueryData var respData []*dataobj.TsdbQueryResponse var err error - stats.Counter.Set("get.data", 1) - - if sameTag { + if sameTag { //与条件要求是相同tag的场景,不需要查询索引 if firstItem.Tags != "" && len(firstItem.TagsMap) == 0 { firstItem.TagsMap = str.DictedTagstring(firstItem.Tags) } //+1 防止由于查询不到最新点,导致点数不够 start := now - int64(stra.AlertDur) - int64(firstItem.Step) + 1 - queryParam, err := query.NewQueryRequest(firstItem.Endpoint, exp.Metric, firstItem.TagsMap, start, now) + queryParam, err := query.NewQueryRequest(firstItem.Endpoint, exp.Metric, firstItem.TagsMap, firstItem.Step, start, now) if err != nil { return respData, err } reqs = append(reqs, queryParam) - } else if firstItem != nil { - reqs, err = GetReqs(stra, exp.Metric, []string{firstItem.Endpoint}, now) - if err != nil { - stats.Counter.Set("get.index.err", 1) - - return respData, err - } - } else { - reqs, err = GetReqs(stra, exp.Metric, stra.Endpoints, now) - if err != nil { - stats.Counter.Set("get.index.err", 1) - - return respData, err - } + } else if firstItem != nil { //点驱动告警策略的场景 + reqs = GetReqs(stra, exp.Metric, []string{firstItem.Endpoint}, now) + } else { //nodata的场景 + reqs = GetReqs(stra, exp.Metric, stra.Endpoints, now) } - respData, err = query.Query(reqs) - if err != nil { - stats.Counter.Set("get.data.err", 1) + if len(reqs) == 0 { return respData, err } + respData = query.Query(reqs, stra.Id, exp.Func) + if len(respData) < 1 { stats.Counter.Set("get.data.null", 1) err = fmt.Errorf("get query data is null") @@ -281,8 +279,9 @@ func GetData(stra *model.Stra, exp model.Exp, firstItem *dataobj.JudgeItem, now return respData, err } -func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([]*dataobj.QueryData, error) { +func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) []*dataobj.QueryData { var reqs []*dataobj.QueryData + stats.Counter.Set("query.index", 1) req := &query.IndexReq{ Endpoints: endpoints, @@ -302,10 +301,10 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([] } } - stats.Counter.Set("get.index", 1) indexsData, err := query.Xclude(req) if err != nil { - logger.Warning("get index err:", err) + stats.Counter.Set("query.index.err", 1) + logger.Warning("query index err:", err) } lostSeries := []cache.Series{} @@ -353,7 +352,7 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([] seriess := cache.SeriesMap.Get(stra.Id) if len(seriess) == 0 && err != nil { - return reqs, err + return reqs } step := 0 @@ -361,7 +360,7 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([] step = seriess[0].Step } - //防止由于差不到最新点,导致点数不够 + //防止由于查询不到最新点,导致点数不够 start := now - int64(stra.AlertDur) - int64(step) + 1 for _, series := range seriess { counter := series.Metric @@ -397,7 +396,7 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([] reqs = append(reqs, queryParam) } - return reqs, nil + return reqs } func sendEventIfNeed(historyData []*dataobj.HistoryData, status []bool, event *dataobj.Event, stra *model.Stra) { diff --git a/src/modules/judge/judge/nodata.go b/src/modules/judge/judge/nodata.go index bdf0a0fd730fb56ddfd46d52d54283096c044ca6..af6c83004531147283a3988b3b8238c0158cf3cd 100644 --- a/src/modules/judge/judge/nodata.go +++ b/src/modules/judge/judge/nodata.go @@ -19,6 +19,12 @@ func NodataJudge(concurrency int) { concurrency = 1000 } nodataJob = semaphore.NewSemaphore(concurrency) + for { + if time.Now().Unix()%10 == 0 { + break + } + time.Sleep(1 * time.Second) + } t1 := time.NewTicker(time.Duration(10) * time.Second) nodataJudge() @@ -41,23 +47,6 @@ func nodataJudge() { respData, err := GetData(stra, stra.Exprs[0], nil, now, false) if err != nil { logger.Errorf("stra:%+v get query data err:%v", stra, err) - //获取数据报错,直接出发nodata - for _, endpoint := range stra.Endpoints { - if endpoint == "" { - continue - } - judgeItem := &dataobj.JudgeItem{ - Endpoint: endpoint, - Metric: stra.Exprs[0].Metric, - Tags: "", - TagsMap: map[string]string{}, - DsType: "GAUGE", - Step: 10, - } - - nodataJob.Acquire() - go AsyncJudge(nodataJob, stra, stra.Exprs, []*dataobj.HistoryData{}, judgeItem, now, []dataobj.History{}, "", "", "", []bool{}) - } continue } diff --git a/src/modules/judge/rpc/push.go b/src/modules/judge/rpc/push.go index d0595b5763b8c0af2c675489b397c02a64896a53..159623eb051e98808c6cf3a1444763dcb828aad8 100644 --- a/src/modules/judge/rpc/push.go +++ b/src/modules/judge/rpc/push.go @@ -1,8 +1,6 @@ package rpc import ( - "time" - "github.com/didi/nightingale/src/dataobj" "github.com/didi/nightingale/src/modules/judge/cache" "github.com/didi/nightingale/src/modules/judge/judge" @@ -19,10 +17,11 @@ func (j *Judge) Ping(req dataobj.NullRpcRequest, resp *dataobj.SimpleRpcResponse func (j *Judge) Send(items []*dataobj.JudgeItem, resp *dataobj.SimpleRpcResponse) error { // 把当前时间的计算放在最外层,是为了减少获取时间时的系统调用开销 - now := time.Now().Unix() + for _, item := range items { + now := item.Timestamp pk := item.MD5() - logger.Debug("recv-->", item) + logger.Debugf("recv-->%+v", item) stats.Counter.Set("push.in", 1) go judge.ToJudge(cache.HistoryBigMap[pk[0:2]], pk, item, now) diff --git a/src/modules/transfer/backend/sender.go b/src/modules/transfer/backend/sender.go index b5e9f85502793eb81851292356d66898f27b2c59..aa9b29f38ac693dc8986e557492c0aebea3fc8a7 100644 --- a/src/modules/transfer/backend/sender.go +++ b/src/modules/transfer/backend/sender.go @@ -250,8 +250,6 @@ func convert2TsdbItem(d *dataobj.MetricValue) *dataobj.TsdbItem { Max: "U", } - item.Timestamp = alignTs(item.Timestamp, int64(item.Step)) - return item } diff --git a/src/modules/transfer/cron/stra.go b/src/modules/transfer/cron/stra.go index 969161c5716a2e4563e758c63bd273088f84e94a..3281346a408e3ba4ba440aaca967e3eab090a69c 100644 --- a/src/modules/transfer/cron/stra.go +++ b/src/modules/transfer/cron/stra.go @@ -75,26 +75,24 @@ func getStrategy() { logger.Warningf("illegal stra:%v exprs", stra) continue } - // nodata 策略不使用 push 模式 - if stra.Exprs[0].Func == "nodata" { - continue - } - metric := stra.Exprs[0].Metric - for _, endpoint := range stra.Endpoints { - key := str.PK(metric, endpoint) //TODO get straMap key, 此处需要优化 - k1 := key[0:2] //为了加快查找,增加一层 map,key 为计算出来的 hash 的前 2 位 + for _, exp := range stra.Exprs { + metric := exp.Metric + for _, endpoint := range stra.Endpoints { + key := str.PK(metric, endpoint) //TODO get straMap key, 此处需要优化 + k1 := key[0:2] //为了加快查找,增加一层 map,key 为计算出来的 hash 的前 2 位 - if _, exists := straMap[k1]; !exists { - straMap[k1] = make(map[string][]*model.Stra) - } + if _, exists := straMap[k1]; !exists { + straMap[k1] = make(map[string][]*model.Stra) + } - if _, exists := straMap[k1][key]; !exists { - straMap[k1][key] = []*model.Stra{stra} - stats.Counter.Set("stra.key", 1) + if _, exists := straMap[k1][key]; !exists { + straMap[k1][key] = []*model.Stra{stra} + stats.Counter.Set("stra.key", 1) - } else { - straMap[k1][key] = append(straMap[k1][key], stra) + } else { + straMap[k1][key] = append(straMap[k1][key], stra) + } } } }