提交 ea25842f 编写于 作者: 7 710leo

Optimize alert function

上级 4b218742
......@@ -41,3 +41,16 @@ func RRDData2HistoryData(datas []*RRDData) []*HistoryData {
}
return historyDatas
}
func HistoryData2RRDData(datas []*HistoryData) []*RRDData {
rrdDatas := make([]*RRDData, len(datas))
for i := range datas {
data := &RRDData{
Timestamp: datas[i].Timestamp,
Value: datas[i].Value,
}
rrdDatas[i] = data
}
return rrdDatas
}
......@@ -129,6 +129,8 @@ func (m *MetricValue) CheckValidity(now int64) (err error) {
m.Timestamp = now
}
m.Timestamp = alignTs(m.Timestamp, int64(m.Step))
valid := true
var vv float64
......@@ -344,3 +346,7 @@ func (bm BuiltinMetricSlice) Swap(i, j int) {
func (bm BuiltinMetricSlice) Less(i, j int) bool {
return bm[i].String() < bm[j].String()
}
func alignTs(ts int64, period int64) int64 {
return ts - ts%period
}
......@@ -4,15 +4,17 @@ import (
"errors"
"fmt"
"math/rand"
"strings"
"time"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
"github.com/didi/nightingale/src/dataobj"
"github.com/didi/nightingale/src/modules/judge/cache"
"github.com/didi/nightingale/src/toolkits/address"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/didi/nightingale/src/toolkits/str"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
)
var (
......@@ -35,29 +37,107 @@ type Counter struct {
// 执行Query操作
// 默认不重试, 如果要做重试, 在这里完成
func Query(reqs []*dataobj.QueryData) ([]*dataobj.TsdbQueryResponse, error) {
stats.Counter.Set("get.data", 1)
func Query(reqs []*dataobj.QueryData, sid int64, expFunc string) []*dataobj.TsdbQueryResponse {
stats.Counter.Set("query.data", 1)
var resp *dataobj.QueryDataResp
var respData []*dataobj.TsdbQueryResponse
var err error
for i := 0; i < 3; i++ {
err = TransferConnPools.Call("", "Transfer.Query", reqs, &resp)
if err == nil {
break
respData, reqs = QueryFromMem(reqs, sid)
if len(reqs) > 0 {
stats.Counter.Set("query.data.by.transfer", 1)
for i := 0; i < 3; i++ {
err = TransferConnPools.Call("", "Transfer.Query", reqs, &resp)
if err == nil {
break
}
time.Sleep(500 * time.Millisecond)
}
if err != nil {
stats.Counter.Set("query.data.transfer.err", 1)
logger.Warning("get data err:%v msg:%+v, query data from mem", err, resp)
} else {
respData = append(respData, resp.Data...)
}
time.Sleep(500 * time.Millisecond)
}
if err != nil {
return nil, err
return respData
}
type QueryData struct {
Start int64 `json:"start"`
End int64 `json:"end"`
ConsolFunc string `json:"consolFunc"`
Endpoints []string `json:"endpoints"`
Counters []string `json:"counters"`
Step int `json:"step"`
DsType string `json:"dstype"`
}
func QueryFromMem(reqs []*dataobj.QueryData, sid int64) ([]*dataobj.TsdbQueryResponse, []*dataobj.QueryData) {
stats.Counter.Set("query.data.by.mem", 1)
var resps []*dataobj.TsdbQueryResponse
var newReqs []*dataobj.QueryData
for _, req := range reqs {
newReq := &dataobj.QueryData{
Start: req.Start,
End: req.End,
ConsolFunc: req.ConsolFunc,
Step: req.Step,
DsType: req.DsType,
}
for _, endpoint := range req.Endpoints {
for _, counter := range req.Counters {
metric, tagsMap := Counter2Metric(counter)
resp := &dataobj.TsdbQueryResponse{
Endpoint: endpoint,
Counter: counter,
Step: req.Step,
DsType: req.DsType,
}
item := &dataobj.JudgeItem{
Endpoint: endpoint,
Metric: metric,
TagsMap: tagsMap,
Sid: sid,
}
pk := item.MD5()
linkedList, exists := cache.HistoryBigMap[pk[0:2]].Get(pk)
if exists {
historyData := linkedList.QueryDataByTS(req.Start, req.End)
resp.Values = dataobj.HistoryData2RRDData(historyData)
}
if len(resp.Values) > 0 {
resps = append(resps, resp)
} else {
newReq.Endpoints = append(newReq.Endpoints, endpoint)
newReq.Counters = append(newReq.Counters, counter)
}
}
}
if len(newReq.Counters) > 0 {
newReqs = append(newReqs, newReq)
}
}
if resp.Msg != "" {
return nil, errors.New(resp.Msg)
return resps, newReqs
}
func Counter2Metric(counter string) (string, map[string]string) {
arr := strings.Split(counter, "/")
if len(arr) == 1 {
return arr[0], nil
}
return resp.Data, nil
return arr[0], str.DictedTagstring(arr[1])
}
func NewQueryRequest(endpoint, metric string, tagsMap map[string]string,
start, end int64) (*dataobj.QueryData, error) {
step int, start, end int64) (*dataobj.QueryData, error) {
if end <= start || start < 0 {
return nil, ErrorQueryParamIllegal
}
......@@ -71,6 +151,7 @@ func NewQueryRequest(endpoint, metric string, tagsMap map[string]string,
return &dataobj.QueryData{
Start: start,
End: end,
Step: step,
ConsolFunc: "AVERAGE", // 硬编码
Endpoints: []string{endpoint},
Counters: []string{counter},
......
......@@ -53,7 +53,7 @@ func (i *IndexMap) Get(id int64) []Series {
}
func (i *IndexMap) CleanLoop() {
t1 := time.NewTicker(time.Duration(60) * time.Second)
t1 := time.NewTicker(time.Duration(600) * time.Second)
for {
<-t1.C
i.Clean()
......
......@@ -108,3 +108,59 @@ func (ll *SafeLinkedList) HistoryData(limit int) ([]*dataobj.HistoryData, bool)
return vs, isEnough
}
func (ll *SafeLinkedList) QueryDataByTS(start, end int64) []*dataobj.HistoryData {
size := ll.Len()
if size == 0 {
return []*dataobj.HistoryData{}
}
firstElement := ll.Front()
firstItem := firstElement.Value.(*dataobj.JudgeItem)
var vs []*dataobj.HistoryData
judgeType := firstItem.DsType[0]
if judgeType == 'G' || judgeType == 'g' {
if firstItem.Timestamp < start {
//最新的点也比起始时间旧,直接返回
return vs
}
v := &dataobj.HistoryData{
Timestamp: firstItem.Timestamp,
Value: dataobj.JsonFloat(firstItem.Value),
Extra: firstItem.Extra,
}
vs = append(vs, v)
currentElement := firstElement
for {
nextElement := currentElement.Next()
if nextElement == nil {
return vs
}
if nextElement.Value.(*dataobj.JudgeItem).Timestamp < start {
return vs
}
if nextElement.Value.(*dataobj.JudgeItem).Timestamp > end {
currentElement = nextElement
continue
}
v := &dataobj.HistoryData{
Timestamp: nextElement.Value.(*dataobj.JudgeItem).Timestamp,
Value: dataobj.JsonFloat(nextElement.Value.(*dataobj.JudgeItem).Value),
Extra: nextElement.Value.(*dataobj.JudgeItem).Extra,
}
vs = append(vs, v)
currentElement = nextElement
}
}
return vs
}
......@@ -30,8 +30,18 @@ var (
EVENT_RECOVER = "recovery"
)
func GetStra(sid int64) (*model.Stra, bool) {
if stra, exists := cache.Strategy.Get(sid); exists {
return stra, exists
}
if stra, exists := cache.NodataStra.Get(sid); exists {
return stra, exists
}
return nil, false
}
func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem, now int64) {
stra, exists := cache.Strategy.Get(val.Sid)
stra, exists := GetStra(val.Sid)
if !exists {
stats.Counter.Set("point.miss", 1)
return
......@@ -237,43 +247,31 @@ func GetData(stra *model.Stra, exp model.Exp, firstItem *dataobj.JudgeItem, now
var reqs []*dataobj.QueryData
var respData []*dataobj.TsdbQueryResponse
var err error
stats.Counter.Set("get.data", 1)
if sameTag {
if sameTag { //与条件要求是相同tag的场景,不需要查询索引
if firstItem.Tags != "" && len(firstItem.TagsMap) == 0 {
firstItem.TagsMap = str.DictedTagstring(firstItem.Tags)
}
//+1 防止由于查询不到最新点,导致点数不够
start := now - int64(stra.AlertDur) - int64(firstItem.Step) + 1
queryParam, err := query.NewQueryRequest(firstItem.Endpoint, exp.Metric, firstItem.TagsMap, start, now)
queryParam, err := query.NewQueryRequest(firstItem.Endpoint, exp.Metric, firstItem.TagsMap, firstItem.Step, start, now)
if err != nil {
return respData, err
}
reqs = append(reqs, queryParam)
} else if firstItem != nil {
reqs, err = GetReqs(stra, exp.Metric, []string{firstItem.Endpoint}, now)
if err != nil {
stats.Counter.Set("get.index.err", 1)
return respData, err
}
} else {
reqs, err = GetReqs(stra, exp.Metric, stra.Endpoints, now)
if err != nil {
stats.Counter.Set("get.index.err", 1)
return respData, err
}
} else if firstItem != nil { //点驱动告警策略的场景
reqs = GetReqs(stra, exp.Metric, []string{firstItem.Endpoint}, now)
} else { //nodata的场景
reqs = GetReqs(stra, exp.Metric, stra.Endpoints, now)
}
respData, err = query.Query(reqs)
if err != nil {
stats.Counter.Set("get.data.err", 1)
if len(reqs) == 0 {
return respData, err
}
respData = query.Query(reqs, stra.Id, exp.Func)
if len(respData) < 1 {
stats.Counter.Set("get.data.null", 1)
err = fmt.Errorf("get query data is null")
......@@ -281,8 +279,9 @@ func GetData(stra *model.Stra, exp model.Exp, firstItem *dataobj.JudgeItem, now
return respData, err
}
func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([]*dataobj.QueryData, error) {
func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) []*dataobj.QueryData {
var reqs []*dataobj.QueryData
stats.Counter.Set("query.index", 1)
req := &query.IndexReq{
Endpoints: endpoints,
......@@ -302,10 +301,10 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([]
}
}
stats.Counter.Set("get.index", 1)
indexsData, err := query.Xclude(req)
if err != nil {
logger.Warning("get index err:", err)
stats.Counter.Set("query.index.err", 1)
logger.Warning("query index err:", err)
}
lostSeries := []cache.Series{}
......@@ -353,7 +352,7 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([]
seriess := cache.SeriesMap.Get(stra.Id)
if len(seriess) == 0 && err != nil {
return reqs, err
return reqs
}
step := 0
......@@ -361,7 +360,7 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([]
step = seriess[0].Step
}
//防止由于不到最新点,导致点数不够
//防止由于查询不到最新点,导致点数不够
start := now - int64(stra.AlertDur) - int64(step) + 1
for _, series := range seriess {
counter := series.Metric
......@@ -397,7 +396,7 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([]
reqs = append(reqs, queryParam)
}
return reqs, nil
return reqs
}
func sendEventIfNeed(historyData []*dataobj.HistoryData, status []bool, event *dataobj.Event, stra *model.Stra) {
......
......@@ -19,6 +19,12 @@ func NodataJudge(concurrency int) {
concurrency = 1000
}
nodataJob = semaphore.NewSemaphore(concurrency)
for {
if time.Now().Unix()%10 == 0 {
break
}
time.Sleep(1 * time.Second)
}
t1 := time.NewTicker(time.Duration(10) * time.Second)
nodataJudge()
......@@ -41,23 +47,6 @@ func nodataJudge() {
respData, err := GetData(stra, stra.Exprs[0], nil, now, false)
if err != nil {
logger.Errorf("stra:%+v get query data err:%v", stra, err)
//获取数据报错,直接出发nodata
for _, endpoint := range stra.Endpoints {
if endpoint == "" {
continue
}
judgeItem := &dataobj.JudgeItem{
Endpoint: endpoint,
Metric: stra.Exprs[0].Metric,
Tags: "",
TagsMap: map[string]string{},
DsType: "GAUGE",
Step: 10,
}
nodataJob.Acquire()
go AsyncJudge(nodataJob, stra, stra.Exprs, []*dataobj.HistoryData{}, judgeItem, now, []dataobj.History{}, "", "", "", []bool{})
}
continue
}
......
package rpc
import (
"time"
"github.com/didi/nightingale/src/dataobj"
"github.com/didi/nightingale/src/modules/judge/cache"
"github.com/didi/nightingale/src/modules/judge/judge"
......@@ -19,10 +17,11 @@ func (j *Judge) Ping(req dataobj.NullRpcRequest, resp *dataobj.SimpleRpcResponse
func (j *Judge) Send(items []*dataobj.JudgeItem, resp *dataobj.SimpleRpcResponse) error {
// 把当前时间的计算放在最外层,是为了减少获取时间时的系统调用开销
now := time.Now().Unix()
for _, item := range items {
now := item.Timestamp
pk := item.MD5()
logger.Debug("recv-->", item)
logger.Debugf("recv-->%+v", item)
stats.Counter.Set("push.in", 1)
go judge.ToJudge(cache.HistoryBigMap[pk[0:2]], pk, item, now)
......
......@@ -250,8 +250,6 @@ func convert2TsdbItem(d *dataobj.MetricValue) *dataobj.TsdbItem {
Max: "U",
}
item.Timestamp = alignTs(item.Timestamp, int64(item.Step))
return item
}
......
......@@ -75,26 +75,24 @@ func getStrategy() {
logger.Warningf("illegal stra:%v exprs", stra)
continue
}
// nodata 策略不使用 push 模式
if stra.Exprs[0].Func == "nodata" {
continue
}
metric := stra.Exprs[0].Metric
for _, endpoint := range stra.Endpoints {
key := str.PK(metric, endpoint) //TODO get straMap key, 此处需要优化
k1 := key[0:2] //为了加快查找,增加一层 map,key 为计算出来的 hash 的前 2 位
for _, exp := range stra.Exprs {
metric := exp.Metric
for _, endpoint := range stra.Endpoints {
key := str.PK(metric, endpoint) //TODO get straMap key, 此处需要优化
k1 := key[0:2] //为了加快查找,增加一层 map,key 为计算出来的 hash 的前 2 位
if _, exists := straMap[k1]; !exists {
straMap[k1] = make(map[string][]*model.Stra)
}
if _, exists := straMap[k1]; !exists {
straMap[k1] = make(map[string][]*model.Stra)
}
if _, exists := straMap[k1][key]; !exists {
straMap[k1][key] = []*model.Stra{stra}
stats.Counter.Set("stra.key", 1)
if _, exists := straMap[k1][key]; !exists {
straMap[k1][key] = []*model.Stra{stra}
stats.Counter.Set("stra.key", 1)
} else {
straMap[k1][key] = append(straMap[k1][key], stra)
} else {
straMap[k1][key] = append(straMap[k1][key], stra)
}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册