提交 d7e5e2b4 编写于 作者: 7 710leo

refactor judge

上级 29234321
......@@ -68,124 +68,103 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem,
if !isEnough {
return
}
history := []dataobj.History{}
Judge(stra, stra.Exprs, historyData, val, now, history, "", "", "", []bool{})
}
func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, extra string, status []bool) {
stats.Counter.Set("running", 1)
if len(exps) < 1 {
stats.Counter.Set("stra.illegal", 1)
logger.Warningf("stra:%+v exp is null", stra)
return
}
exp := exps[0]
var leftValue dataobj.JsonFloat
var isTriggered bool
historyArr := []dataobj.History{}
statusArr := []bool{}
eventInfo := ""
value := ""
if exp.Func == "nodata" {
info += fmt.Sprintf(" %s (%s,%ds)", exp.Metric, exp.Func, stra.AlertDur)
} else if exp.Func == "stddev" {
info += fmt.Sprintf(" %s (%s,%ds) %v", exp.Metric, exp.Func, stra.AlertDur, exp.Params)
} else {
info += fmt.Sprintf(" %s(%s,%ds) %s %v", exp.Metric, exp.Func, stra.AlertDur, exp.Eopt, exp.Threshold)
}
if len(stra.Exprs) == 1 {
for _, expr := range stra.Exprs {
history, info, lastValue, status := Judge(stra, expr, historyData, val, now)
statusArr = append(statusArr, status)
h := dataobj.History{
Metric: exp.Metric,
Tags: firstItem.TagsMap,
Granularity: int(firstItem.Step),
Points: historyData,
}
history = append(history, h)
if value == "" {
value = fmt.Sprintf("%s: %s", expr.Metric, lastValue)
} else {
value += fmt.Sprintf("; %s: %s", expr.Metric, lastValue)
}
defer func() {
if len(exps) == 1 {
bs, err := json.Marshal(history)
historyArr = append(historyArr, history)
eventInfo += info
}
} else { //与条件
for _, expr := range stra.Exprs {
respData, err := GetData(stra, expr, val, now, true)
if err != nil {
logger.Errorf("Marshal history:%+v err:%v", history, err)
logger.Errorf("stra:%+v get query data err:%v", stra, err)
return
}
event := &dataobj.Event{
ID: fmt.Sprintf("s_%d_%s", stra.Id, firstItem.PrimaryKey()),
Etime: now,
Endpoint: firstItem.Endpoint,
CurNid: firstItem.Nid,
Info: info,
Detail: string(bs),
Value: value,
Partition: redi.Config.Prefix + "/event/p" + strconv.Itoa(stra.Priority),
Sid: stra.Id,
Hashid: getHashId(stra.Id, firstItem),
if len(respData) != 1 {
logger.Errorf("stra:%+v get query data respData:%v err", stra, respData)
return
}
sendEventIfNeed(historyData, status, event, stra)
history, info, lastValue, status := Judge(stra, expr, dataobj.RRDData2HistoryData(respData[0].Values), val, now)
statusArr = append(statusArr, status)
if value == "" {
value = fmt.Sprintf("%s: %s", expr.Metric, lastValue)
} else {
value += fmt.Sprintf("; %s: %s", expr.Metric, lastValue)
}
historyArr = append(historyArr, history)
eventInfo += info
}
}()
leftValue, isTriggered = judgeItemWithStrategy(stra, historyData, exps[0], firstItem, now)
lastValue := "null"
if !math.IsNaN(float64(leftValue)) {
lastValue = strconv.FormatFloat(float64(leftValue), 'f', -1, 64)
}
if value == "" {
value = fmt.Sprintf("%s: %s", exp.Metric, lastValue)
bs, err := json.Marshal(historyArr)
if err != nil {
logger.Errorf("Marshal history:%+v err:%v", historyArr, err)
}
event := &dataobj.Event{
ID: fmt.Sprintf("s_%d_%s", stra.Id, val.PrimaryKey()),
Etime: now,
Endpoint: val.Endpoint,
CurNid: val.Nid,
Info: eventInfo,
Detail: string(bs),
Value: value,
Partition: redi.Config.Prefix + "/event/p" + strconv.Itoa(stra.Priority),
Sid: stra.Id,
Hashid: getHashId(stra.Id, val),
}
sendEventIfNeed(historyData, statusArr, event, stra)
}
func Judge(stra *model.Stra, exp model.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64) (history dataobj.History, info string, lastValue string, status bool) {
stats.Counter.Set("running", 1)
var leftValue dataobj.JsonFloat
if exp.Func == "nodata" {
info = fmt.Sprintf(" %s (%s,%ds)", exp.Metric, exp.Func, stra.AlertDur)
} else if exp.Func == "stddev" {
info = fmt.Sprintf(" %s (%s,%ds) %v", exp.Metric, exp.Func, stra.AlertDur, exp.Params)
} else if exp.Func == "happen" {
info = fmt.Sprintf(" %s (%s,%ds) %v %s %v", exp.Metric, exp.Func, stra.AlertDur, exp.Params, exp.Eopt, exp.Threshold)
} else {
value += fmt.Sprintf("; %s: %s", exp.Metric, lastValue)
info = fmt.Sprintf(" %s(%s,%ds) %s %v", exp.Metric, exp.Func, stra.AlertDur, exp.Eopt, exp.Threshold)
}
status = append(status, isTriggered)
//与条件情况下执行
if len(exps) > 1 {
if exps[1].Func == "nodata" { //nodata重新查询索引来进行告警判断
respData, err := GetData(stra, exps[1], firstItem, now, false)
if err != nil {
logger.Errorf("stra:%v get query data err:%v", stra, err)
judgeItem := &dataobj.JudgeItem{
Endpoint: firstItem.Endpoint,
Nid: firstItem.Nid,
Metric: stra.Exprs[0].Metric,
Tags: "",
DsType: "GAUGE",
}
Judge(stra, exps[1:], []*dataobj.HistoryData{}, judgeItem, now, history, info, value, extra, status)
return
}
leftValue, status = judgeItemWithStrategy(stra, historyData, exp, firstItem, now)
for i := range respData {
firstItem.Endpoint = respData[i].Endpoint
firstItem.Tags = getTags(respData[i].Counter)
firstItem.Step = respData[i].Step
Judge(stra, exps[1:], dataobj.RRDData2HistoryData(respData[i].Values), firstItem, now, history, info, value, extra, status)
}
lastValue = "null"
if !math.IsNaN(float64(leftValue)) {
lastValue = strconv.FormatFloat(float64(leftValue), 'f', -1, 64)
}
} else {
var respData []*dataobj.TsdbQueryResponse
var err error
if firstItem.Step != 0 { //上报点的逻辑会走到这里,使用第一个exp上报点的索引进行告警判断
respData, err = GetData(stra, exps[1], firstItem, now, true)
} else { //上一个规则是nodata没有获取到索引数据,重新获取索引做计算
respData, err = GetData(stra, exps[1], firstItem, now, false)
}
if err != nil {
logger.Errorf("stra:%+v get query data err:%v", stra, err)
return
}
for i := range respData {
if respData[i].Nid != "" {
firstItem.Nid = respData[i].Nid
} else if respData[i].Endpoint != "" {
firstItem.Endpoint = respData[i].Endpoint
}
firstItem.Tags = getTags(respData[i].Counter)
firstItem.Step = respData[i].Step
Judge(stra, exps[1:], dataobj.RRDData2HistoryData(respData[i].Values), firstItem, now, history, info, value, extra, status)
}
}
history = dataobj.History{
Metric: exp.Metric,
Tags: firstItem.TagsMap,
Granularity: int(firstItem.Step),
Points: historyData,
}
return
}
func judgeItemWithStrategy(stra *model.Stra, historyData []*dataobj.HistoryData, exp model.Exp, firstItem *dataobj.JudgeItem, now int64) (leftValue dataobj.JsonFloat, isTriggered bool) {
......
package judge
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/didi/nightingale/src/dataobj"
"github.com/didi/nightingale/src/model"
"github.com/didi/nightingale/src/modules/judge/backend/redi"
"github.com/didi/nightingale/src/modules/judge/cache"
"github.com/toolkits/pkg/concurrent/semaphore"
......@@ -38,8 +42,8 @@ func nodataJudge() {
stras := cache.NodataStra.GetAll()
for _, stra := range stras {
//nodata处理
if len(stra.Endpoints) == 0 && len(stra.Nids) == 0 {
logger.Debugf("stra:%+v endpoints or nids is null", stra)
if len(stra.Endpoints) == 0 {
logger.Warningf("stra:%+v endpoints is null", stra)
continue
}
......@@ -61,12 +65,10 @@ func nodataJudge() {
metric = data.Counter
}
if data.Endpoint == "" && data.Nid == "" {
if data.Endpoint == "" {
continue
}
judgeItem := &dataobj.JudgeItem{
Nid: data.Nid,
Endpoint: data.Endpoint,
Metric: metric,
Tags: tag,
......@@ -76,12 +78,61 @@ func nodataJudge() {
}
nodataJob.Acquire()
go AsyncJudge(nodataJob, stra, stra.Exprs, dataobj.RRDData2HistoryData(data.Values), judgeItem, now, []dataobj.History{}, "", "", "", []bool{})
go AsyncJudge(nodataJob, stra, stra.Exprs, dataobj.RRDData2HistoryData(data.Values), judgeItem, now)
}
}
}
func AsyncJudge(sema *semaphore.Semaphore, stra *model.Stra, exps []model.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, extra string, status []bool) {
// AsyncJudge evaluates every expression of a strategy for one judge item and
// sends a single combined event if needed. It is meant to be run in its own
// goroutine; the semaphore slot acquired by the caller is released when the
// function returns.
//
// For each expression in exps it queries data via GetData and requires exactly
// one series in the response; on a query error or any other series count it
// logs the problem and returns without sending an event. Each series is then
// evaluated with Judge, and the per-expression history, description, last
// value and trigger status are accumulated.
//
// Parameters:
//   - sema:        semaphore released on return.
//   - stra:        strategy being evaluated.
//   - exps:        expressions of the strategy, evaluated in order.
//   - historyData: points passed through to sendEventIfNeed (the per-expression
//     evaluation uses the freshly queried data instead).
//   - firstItem:   item identifying the endpoint/nid/tags being judged.
//   - now:         evaluation timestamp, used as the event time.
func AsyncJudge(sema *semaphore.Semaphore, stra *model.Stra, exps []model.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64) {
	defer sema.Release()

	// Kept as a non-nil slice so that an empty result marshals to "[]".
	historyArr := []dataobj.History{}
	statusArr := []bool{}
	eventInfo := ""
	value := ""

	for _, expr := range exps {
		respData, err := GetData(stra, expr, firstItem, now, true)
		if err != nil {
			logger.Errorf("stra:%+v get query data err:%v", stra, err)
			return
		}
		// Exactly one series is expected per expression; anything else
		// cannot be judged unambiguously.
		if len(respData) != 1 {
			logger.Errorf("stra:%+v get query data respData:%v err", stra, respData)
			return
		}

		history, info, lastValue, status := Judge(stra, expr, dataobj.RRDData2HistoryData(respData[0].Values), firstItem, now)
		statusArr = append(statusArr, status)

		// Values of all expressions are joined as "metric: value; metric: value".
		if value == "" {
			value = fmt.Sprintf("%s: %s", expr.Metric, lastValue)
		} else {
			value += fmt.Sprintf("; %s: %s", expr.Metric, lastValue)
		}

		historyArr = append(historyArr, history)
		eventInfo += info
	}

	// A marshal failure is logged but does not abort: the event is still
	// built, with an empty Detail.
	bs, err := json.Marshal(historyArr)
	if err != nil {
		logger.Errorf("Marshal history:%+v err:%v", historyArr, err)
	}

	event := &dataobj.Event{
		ID:        fmt.Sprintf("s_%d_%s", stra.Id, firstItem.PrimaryKey()),
		Etime:     now,
		Endpoint:  firstItem.Endpoint,
		CurNid:    firstItem.Nid,
		Info:      eventInfo,
		Detail:    string(bs),
		Value:     value,
		Partition: redi.Config.Prefix + "/event/p" + strconv.Itoa(stra.Priority),
		Sid:       stra.Id,
		Hashid:    getHashId(stra.Id, firstItem),
	}

	sendEventIfNeed(historyData, statusArr, event, stra)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册