未验证 提交 a659820b 编写于 作者: Q qinyening 提交者: GitHub

refactor judge (#405)

* refactor judge
* rdb user add organization typ status
上级 d32f9ef7
......@@ -6,20 +6,21 @@ use n9e_rdb;
CREATE TABLE `user`
(
`id` int unsigned not null AUTO_INCREMENT,
`uuid` varchar(128) not null comment 'use in cookie',
`username` varchar(64) not null comment 'login name, cannot rename',
`password` varchar(128) not null default '',
`dispname` varchar(32) not null default '' comment 'display name, chinese name',
`phone` varchar(16) not null default '',
`email` varchar(64) not null default '',
`im` varchar(64) not null default '',
`portrait` varchar(2048) not null default '',
`intro` varchar(2048) not null default '',
`is_root` tinyint(1) not null,
`leader_id` int unsigned not null default 0,
`leader_name` varchar(32) not null default '',
`create_at` timestamp not null default CURRENT_TIMESTAMP,
`id` int unsigned not null AUTO_INCREMENT,
`uuid` varchar(128) not null comment 'use in cookie',
`username` varchar(64) not null comment 'login name, cannot rename',
`password` varchar(128) not null default '',
`dispname` varchar(32) not null default '' comment 'display name, chinese name',
`phone` varchar(16) not null default '',
`email` varchar(64) not null default '',
`im` varchar(64) not null default '',
`portrait` varchar(2048) not null default '',
`intro` varchar(2048) not null default '',
`organization` varchar(255) not null default '',
`is_root` tinyint(1) not null,
`leader_id` int unsigned not null default 0,
`leader_name` varchar(32) not null default '',
`create_at` timestamp not null default CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY (`username`),
UNIQUE KEY (`uuid`)
......
......@@ -31,3 +31,7 @@ CREATE TABLE `captcha` (
alter table user add column create_at timestamp not null default CURRENT_TIMESTAMP;
update user set create_at = '2020-11-14 17:00:08';
alter table user add `organization` varchar(255) not null default '' after intro;
alter table user add `typ` int(1) not null default 1 comment '0: temporary account; 1: long-term account' after intro;
alter table user add `status` int(1) not null default 1 comment '0: disabled; 1: active 2: inactive' after intro;
......@@ -31,27 +31,29 @@ type HistoryData struct {
}
func RRDData2HistoryData(datas []*RRDData) []*HistoryData {
historyDatas := make([]*HistoryData, len(datas))
for i := range datas {
count := len(datas)
historyDatas := make([]*HistoryData, 0, count)
for i := count - 1; i >= 0; i-- {
historyData := &HistoryData{
Timestamp: datas[i].Timestamp,
Value: datas[i].Value,
}
historyDatas[i] = historyData
historyDatas = append(historyDatas, historyData)
}
return historyDatas
}
func HistoryData2RRDData(datas []*HistoryData) []*RRDData {
rrdDatas := make([]*RRDData, len(datas))
count := len(datas)
rrdDatas := make([]*RRDData, 0, count)
for i := range datas {
for i := count - 1; i >= 0; i-- {
data := &RRDData{
Timestamp: datas[i].Timestamp,
Value: datas[i].Value,
}
rrdDatas[i] = data
rrdDatas = append(rrdDatas, data)
}
return rrdDatas
}
......@@ -4,6 +4,44 @@ import (
"github.com/toolkits/pkg/slice"
)
func TreeUntilProjectsGetByNid(nid int64) ([]Node, error) {
nodes, err := NodeByIds([]int64{nid})
if err != nil {
return []Node{}, err
}
ret, err := PermNodes(nodes)
if err != nil {
return ret, err
}
cnt := len(ret)
all := make(map[string]Node, cnt)
for i := 0; i < cnt; i++ {
all[ret[i].Path] = ret[i]
}
// 只取project(含)以上的部分
var oks []Node
set := make(map[string]struct{})
for i := 0; i < cnt; i++ {
if ret[i].Cate == "project" {
paths := Paths(ret[i].Path)
for _, path := range paths {
if _, has := set[path]; has {
continue
}
set[path] = struct{}{}
oks = append(oks, all[path])
}
}
}
return oks, err
}
func TreeUntilProjectsGetByUser(user *User) ([]Node, error) {
ret, err := UserPermNodes(user)
if err != nil {
......
......@@ -28,20 +28,23 @@ const (
)
type User struct {
Id int64 `json:"id"`
UUID string `json:"uuid" xorm:"'uuid'"`
Username string `json:"username"`
Password string `json:"-"`
Dispname string `json:"dispname"`
Phone string `json:"phone"`
Email string `json:"email"`
Im string `json:"im"`
Portrait string `json:"portrait"`
Intro string `json:"intro"`
IsRoot int `json:"is_root"`
LeaderId int64 `json:"leader_id"`
LeaderName string `json:"leader_name"`
CreateAt time.Time `json:"create_at" xorm:"<-"`
Id int64 `json:"id"`
UUID string `json:"uuid" xorm:"'uuid'"`
Username string `json:"username"`
Password string `json:"-"`
Dispname string `json:"dispname"`
Phone string `json:"phone"`
Email string `json:"email"`
Im string `json:"im"`
Portrait string `json:"portrait"`
Intro string `json:"intro"`
Organization string `json:"organization"`
Typ int `json:"typ"`
Status int `json:"status"`
IsRoot int `json:"is_root"`
LeaderId int64 `json:"leader_id"`
LeaderName string `json:"leader_name"`
CreateAt time.Time `json:"create_at" xorm:"<-"`
}
func (u *User) CopyLdapAttr(sr *ldap.SearchResult) {
......
......@@ -25,14 +25,17 @@ func (ll *SafeLinkedList) Len() int {
}
// @return needJudge 如果是false不需要做judge,因为新上来的数据不合法
func (ll *SafeLinkedList) PushFrontAndMaintain(v *dataobj.JudgeItem, maxCount int) bool {
func (ll *SafeLinkedList) PushFrontAndMaintain(v *dataobj.JudgeItem, alertDur int) bool {
ll.Lock()
defer ll.Unlock()
sz := ll.L.Len()
lastPointTs := ll.L.Front().Value.(*dataobj.JudgeItem).Timestamp
earliestTs := v.Timestamp - int64(alertDur)
if sz > 0 {
// 新push上来的数据有可能重复了,或者timestamp不对,这种数据要丢掉
if v.Timestamp <= ll.L.Front().Value.(*dataobj.JudgeItem).Timestamp || v.Timestamp <= 0 {
if v.Timestamp <= lastPointTs {
return false
}
}
......@@ -40,12 +43,12 @@ func (ll *SafeLinkedList) PushFrontAndMaintain(v *dataobj.JudgeItem, maxCount in
ll.L.PushFront(v)
sz++
if sz <= maxCount {
return true
}
del := sz - maxCount
for i := 0; i < del; i++ {
for i := 0; i < sz; i++ {
if ll.L.Back().Value.(*dataobj.JudgeItem).Timestamp >= earliestTs {
break
}
//最前面的点已经不在告警策略时间周期内,丢弃掉
ll.L.Remove(ll.L.Back())
}
......@@ -53,60 +56,47 @@ func (ll *SafeLinkedList) PushFrontAndMaintain(v *dataobj.JudgeItem, maxCount in
}
// @param limit 至多返回这些,如果不够,有多少返回多少
// @return bool isEnough
func (ll *SafeLinkedList) HistoryData(limit int) ([]*dataobj.HistoryData, bool) {
if limit < 1 {
// 其实limit不合法,此处也返回false吧,上层代码要注意
// 因为false通常使上层代码进入异常分支,这样就统一了
return []*dataobj.HistoryData{}, false
}
func (ll *SafeLinkedList) HistoryData() []*dataobj.HistoryData {
size := ll.Len()
if size == 0 {
return []*dataobj.HistoryData{}, false
return []*dataobj.HistoryData{}
}
firstElement := ll.Front()
firstItem := firstElement.Value.(*dataobj.JudgeItem)
var vs []*dataobj.HistoryData
isEnough := true
judgeType := firstItem.DsType[0]
if judgeType == 'G' || judgeType == 'g' {
if size < limit {
// 有多少获取多少
limit = size
isEnough = false
}
vs = make([]*dataobj.HistoryData, limit)
vs[0] = &dataobj.HistoryData{
vs = make([]*dataobj.HistoryData, 0)
v := &dataobj.HistoryData{
Timestamp: firstItem.Timestamp,
Value: dataobj.JsonFloat(firstItem.Value),
Extra: firstItem.Extra,
}
vs = append(vs, v)
i := 1
currentElement := firstElement
for i < limit {
for i := 1; i < size; i++ {
nextElement := currentElement.Next()
if nextElement == nil {
isEnough = false
return vs, isEnough
return vs
}
vs[i] = &dataobj.HistoryData{
v := &dataobj.HistoryData{
Timestamp: nextElement.Value.(*dataobj.JudgeItem).Timestamp,
Value: dataobj.JsonFloat(nextElement.Value.(*dataobj.JudgeItem).Value),
Extra: nextElement.Value.(*dataobj.JudgeItem).Extra,
}
i++
vs = append(vs, v)
currentElement = nextElement
}
}
return vs, isEnough
return vs
}
func (ll *SafeLinkedList) QueryDataByTS(start, end int64) []*dataobj.HistoryData {
......
......@@ -42,6 +42,6 @@ func getData(c *gin.Context) {
errors.Dangerous(c.ShouldBind(&input))
pk := input.MD5()
linkedList, _ := cache.HistoryBigMap[pk[0:2]].Get(pk)
data, _ := linkedList.HistoryData(10)
data := linkedList.HistoryData()
render.Data(c, data, nil)
}
......@@ -19,12 +19,18 @@ type MaxFunction struct {
}
func (f MaxFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
count := len(vs)
if count < 1 {
return
}
duration := int(vs[0].Timestamp - vs[count-1].Timestamp)
if duration < f.Limit {
return
}
max := vs[0].Value
for i := 1; i < f.Limit; i++ {
for i := 1; i < len(vs); i++ {
if max < vs[i].Value {
max = vs[i].Value
}
......@@ -43,12 +49,19 @@ type MinFunction struct {
}
func (f MinFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
count := len(vs)
if count < 1 {
return
}
duration := int(vs[0].Timestamp - vs[count-1].Timestamp)
if duration < f.Limit {
return
}
min := vs[0].Value
for i := 1; i < f.Limit; i++ {
for i := 1; i < len(vs); i++ {
if min > vs[i].Value {
min = vs[i].Value
}
......@@ -67,12 +80,17 @@ type AllFunction struct {
}
func (f AllFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
count := len(vs)
if count < 1 {
return
}
isTriggered = true
for i := 0; i < f.Limit; i++ {
duration := int(vs[0].Timestamp - vs[count-1].Timestamp)
if duration < f.Limit {
return
}
for i := 0; i < len(vs); i++ {
isTriggered = checkIsTriggered(vs[i].Value, f.Operator, f.RightValue)
if !isTriggered {
break
......@@ -91,12 +109,18 @@ type SumFunction struct {
}
func (f SumFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
count := len(vs)
if count < 1 {
return
}
duration := int(vs[0].Timestamp - vs[count-1].Timestamp)
if duration < f.Limit {
return
}
sum := dataobj.JsonFloat(0.0)
for i := 0; i < f.Limit; i++ {
for i := 0; i < count; i++ {
sum += vs[i].Value
}
......@@ -113,16 +137,23 @@ type AvgFunction struct {
}
func (f AvgFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
vsLen := len(vs)
if vsLen < 1 {
return
}
duration := int(vs[0].Timestamp - vs[vsLen-1].Timestamp)
if duration < f.Limit {
return
}
sum := dataobj.JsonFloat(0.0)
for i := 0; i < f.Limit; i++ {
for i := 0; i < vsLen; i++ {
sum += vs[i].Value
}
leftValue = sum / dataobj.JsonFloat(f.Limit)
leftValue = sum / dataobj.JsonFloat(vsLen)
isTriggered = checkIsTriggered(leftValue, f.Operator, f.RightValue)
return
}
......@@ -134,22 +165,29 @@ type StddevFunction struct {
}
func (f StddevFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
var sum float64
vsLen := len(vs)
if vsLen < 1 {
return
}
var sum float64
for i := 0; i < f.Limit; i++ {
duration := int(vs[0].Timestamp - vs[vsLen-1].Timestamp)
if duration < f.Limit {
return
}
for i := 0; i < vsLen; i++ {
sum += float64(vs[i].Value)
}
mean := sum / float64(f.Limit)
mean := sum / float64(vsLen)
var num float64
for i := 0; i < f.Limit; i++ {
for i := 0; i < vsLen; i++ {
num += math.Pow(float64(vs[i].Value)-mean, 2)
}
std := math.Sqrt(num / float64(f.Limit))
std := math.Sqrt(num / float64(vsLen))
upperBound := mean + std*float64(f.Num)
lowerBound := mean - std*float64(f.Num)
......@@ -167,14 +205,20 @@ type DiffFunction struct {
// 只要有一个点的diff触发阈值,就报警
func (f DiffFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
vsLen := len(vs)
if vsLen < 1 {
return
}
duration := int(vs[0].Timestamp - vs[vsLen-1].Timestamp)
if duration < f.Limit {
return
}
first := vs[0].Value
isTriggered = false
for i := 1; i < f.Limit; i++ {
for i := 1; i < vsLen; i++ {
// diff是当前值减去历史值
leftValue = first - vs[i].Value
isTriggered = checkIsTriggered(leftValue, f.Operator, f.RightValue)
......@@ -195,13 +239,19 @@ type PDiffFunction struct {
}
func (f PDiffFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
vsLen := len(vs)
if vsLen < 1 {
return
}
duration := int(vs[0].Timestamp - vs[vsLen-1].Timestamp)
if duration < f.Limit {
return
}
first := vs[0].Value
isTriggered = false
for i := 1; i < f.Limit; i++ {
for i := 1; i < len(vs); i++ {
if vs[i].Value == 0 {
continue
}
......@@ -260,16 +310,23 @@ type CAvgAbsFunction struct {
}
func (f CAvgAbsFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
vsLen := len(vs)
if vsLen < 1 {
return
}
duration := int(vs[0].Timestamp - vs[vsLen-1].Timestamp)
if duration < f.Limit {
return
}
sum := dataobj.JsonFloat(0.0)
for i := 0; i < f.Limit; i++ {
for i := 0; i < vsLen; i++ {
sum += vs[i].Value
}
value := sum / dataobj.JsonFloat(f.Limit)
value := sum / dataobj.JsonFloat(vsLen)
leftValue = dataobj.JsonFloat(math.Abs(float64(value) - float64(f.CompareValue)))
isTriggered = checkIsTriggered(leftValue, f.Operator, f.RightValue)
......@@ -285,16 +342,22 @@ type CAvgFunction struct {
}
func (f CAvgFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
vsLen := len(vs)
if vsLen < 1 {
return
}
duration := int(vs[0].Timestamp - vs[vsLen-1].Timestamp)
if duration < f.Limit {
return
}
sum := dataobj.JsonFloat(0.0)
for i := 0; i < f.Limit; i++ {
for i := 0; i < vsLen; i++ {
sum += vs[i].Value
}
leftValue = sum/dataobj.JsonFloat(f.Limit) - dataobj.JsonFloat(f.CompareValue)
leftValue = sum/dataobj.JsonFloat(vsLen) - dataobj.JsonFloat(f.CompareValue)
isTriggered = checkIsTriggered(leftValue, f.Operator, f.RightValue)
return
......@@ -309,16 +372,22 @@ type CAvgRateAbsFunction struct {
}
func (f CAvgRateAbsFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
vsLen := len(vs)
if vsLen < 1 {
return
}
duration := int(vs[0].Timestamp - vs[vsLen-1].Timestamp)
if duration < f.Limit {
return
}
sum := dataobj.JsonFloat(0.0)
for i := 0; i < f.Limit; i++ {
for i := 0; i < vsLen; i++ {
sum += vs[i].Value
}
value := sum / dataobj.JsonFloat(f.Limit)
value := sum / dataobj.JsonFloat(vsLen)
leftValue = dataobj.JsonFloat(math.Abs(float64(value) - float64(f.CompareValue)))
isTriggered = checkIsTriggered(leftValue, f.Operator, f.RightValue)
......@@ -334,16 +403,22 @@ type CAvgRateFunction struct {
}
func (f CAvgRateFunction) Compute(vs []*dataobj.HistoryData) (leftValue dataobj.JsonFloat, isTriggered bool) {
if len(vs) < f.Limit {
vsLen := len(vs)
if vsLen < 1 {
return
}
duration := int(vs[0].Timestamp - vs[vsLen-1].Timestamp)
if duration < f.Limit {
return
}
sum := dataobj.JsonFloat(0.0)
for i := 0; i < f.Limit; i++ {
for i := 0; i < vsLen; i++ {
sum += vs[i].Value
}
value := sum / dataobj.JsonFloat(f.Limit)
value := sum / dataobj.JsonFloat(vsLen)
leftValue = (value - dataobj.JsonFloat(f.CompareValue)) / dataobj.JsonFloat(math.Abs(f.CompareValue))
isTriggered = checkIsTriggered(leftValue, f.Operator, f.RightValue)
......
......@@ -47,13 +47,9 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem,
return
}
needCount := stra.AlertDur / int(val.Step)
if needCount < 1 {
needCount = 1
}
linkedList, exists := historyMap.Get(key)
if exists {
needJudge := linkedList.PushFrontAndMaintain(val, needCount)
needJudge := linkedList.PushFrontAndMaintain(val, stra.AlertDur)
if !needJudge {
return
}
......@@ -64,145 +60,115 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem,
historyMap.Set(key, linkedList)
}
historyData, isEnough := linkedList.HistoryData(needCount)
if !isEnough {
historyData := linkedList.HistoryData()
if len(historyData) == 0 {
return
}
history := []dataobj.History{}
Judge(stra, stra.Exprs, historyData, val, now, history, "", "", "", []bool{})
}
historyArr := []dataobj.History{}
statusArr := []bool{}
eventInfo := ""
value := ""
func Judge(stra *models.Stra, exps []models.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, extra string, status []bool) {
stats.Counter.Set("running", 1)
if len(stra.Exprs) == 1 {
for _, expr := range stra.Exprs {
history, info, lastValue, status := Judge(stra, expr, historyData, val, now)
statusArr = append(statusArr, status)
if len(exps) < 1 {
stats.Counter.Set("stra.illegal", 1)
logger.Warningf("stra:%+v exp is null", stra)
return
}
exp := exps[0]
var leftValue dataobj.JsonFloat
var isTriggered bool
if exp.Func == "nodata" {
info += fmt.Sprintf(" %s (%s,%ds)", exp.Metric, exp.Func, stra.AlertDur)
} else if exp.Func == "stddev" {
info += fmt.Sprintf(" %s (%s,%ds) %v", exp.Metric, exp.Func, stra.AlertDur, exp.Params)
} else {
info += fmt.Sprintf(" %s(%s,%ds) %s %v", exp.Metric, exp.Func, stra.AlertDur, exp.Eopt, exp.Threshold)
}
h := dataobj.History{
Metric: exp.Metric,
Tags: firstItem.TagsMap,
Granularity: int(firstItem.Step),
Points: historyData,
}
history = append(history, h)
if value == "" {
value = fmt.Sprintf("%s: %s", expr.Metric, lastValue)
} else {
value += fmt.Sprintf("; %s: %s", expr.Metric, lastValue)
}
defer func() {
if len(exps) == 1 {
bs, err := json.Marshal(history)
historyArr = append(historyArr, history)
eventInfo += info
}
} else { //与条件
for _, expr := range stra.Exprs {
respData, err := GetData(stra, expr, val, now, true)
if err != nil {
logger.Errorf("Marshal history:%+v err:%v", history, err)
logger.Errorf("stra:%+v get query data err:%v", stra, err)
return
}
event := &dataobj.Event{
ID: fmt.Sprintf("s_%d_%s", stra.Id, firstItem.PrimaryKey()),
Etime: now,
Endpoint: firstItem.Endpoint,
CurNid: firstItem.Nid,
Info: info,
Detail: string(bs),
Value: value,
Partition: redi.Config.Prefix + "/event/p" + strconv.Itoa(stra.Priority),
Sid: stra.Id,
Hashid: getHashId(stra.Id, firstItem),
if len(respData) != 1 {
logger.Errorf("stra:%+v get query data respData:%v err", stra, respData)
return
}
sendEventIfNeed(historyData, status, event, stra)
history, info, lastValue, status := Judge(stra, expr, dataobj.RRDData2HistoryData(respData[0].Values), val, now)
statusArr = append(statusArr, status)
if value == "" {
value = fmt.Sprintf("%s: %s", expr.Metric, lastValue)
} else {
value += fmt.Sprintf("; %s: %s", expr.Metric, lastValue)
}
historyArr = append(historyArr, history)
eventInfo += info
}
}()
leftValue, isTriggered = judgeItemWithStrategy(stra, historyData, exps[0], firstItem, now)
lastValue := "null"
if !math.IsNaN(float64(leftValue)) {
lastValue = strconv.FormatFloat(float64(leftValue), 'f', -1, 64)
}
if value == "" {
value = fmt.Sprintf("%s: %s", exp.Metric, lastValue)
bs, err := json.Marshal(historyArr)
if err != nil {
logger.Errorf("Marshal history:%+v err:%v", historyArr, err)
}
event := &dataobj.Event{
ID: fmt.Sprintf("s_%d_%s", stra.Id, val.PrimaryKey()),
Etime: now,
Endpoint: val.Endpoint,
CurNid: val.Nid,
Info: eventInfo,
Detail: string(bs),
Value: value,
Partition: redi.Config.Prefix + "/event/p" + strconv.Itoa(stra.Priority),
Sid: stra.Id,
Hashid: getHashId(stra.Id, val),
}
sendEventIfNeed(historyData, statusArr, event, stra)
}
func Judge(stra *models.Stra, exp models.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64) (history dataobj.History, info string, lastValue string, status bool) {
stats.Counter.Set("running", 1)
var leftValue dataobj.JsonFloat
if exp.Func == "nodata" {
info = fmt.Sprintf(" %s (%s,%ds)", exp.Metric, exp.Func, stra.AlertDur)
} else if exp.Func == "stddev" {
info = fmt.Sprintf(" %s (%s,%ds) %v", exp.Metric, exp.Func, stra.AlertDur, exp.Params)
} else if exp.Func == "happen" {
info = fmt.Sprintf(" %s (%s,%ds) %v %s %v", exp.Metric, exp.Func, stra.AlertDur, exp.Params, exp.Eopt, exp.Threshold)
} else {
value += fmt.Sprintf("; %s: %s", exp.Metric, lastValue)
info = fmt.Sprintf(" %s(%s,%ds) %s %v", exp.Metric, exp.Func, stra.AlertDur, exp.Eopt, exp.Threshold)
}
status = append(status, isTriggered)
//与条件情况下执行
if len(exps) > 1 {
if exps[1].Func == "nodata" { //nodata重新查询索引来进行告警判断
respData, err := GetData(stra, exps[1], firstItem, now, false)
if err != nil {
logger.Errorf("stra:%v get query data err:%v", stra, err)
judgeItem := &dataobj.JudgeItem{
Endpoint: firstItem.Endpoint,
Nid: firstItem.Nid,
Metric: stra.Exprs[0].Metric,
Tags: "",
DsType: "GAUGE",
}
Judge(stra, exps[1:], []*dataobj.HistoryData{}, judgeItem, now, history, info, value, extra, status)
return
}
leftValue, status = judgeItemWithStrategy(stra, historyData, exp, firstItem, now)
for i := range respData {
firstItem.Endpoint = respData[i].Endpoint
firstItem.Tags = getTags(respData[i].Counter)
firstItem.Step = respData[i].Step
Judge(stra, exps[1:], dataobj.RRDData2HistoryData(respData[i].Values), firstItem, now, history, info, value, extra, status)
}
lastValue = "null"
if !math.IsNaN(float64(leftValue)) {
lastValue = strconv.FormatFloat(float64(leftValue), 'f', -1, 64)
}
} else {
var respData []*dataobj.TsdbQueryResponse
var err error
if firstItem.Step != 0 { //上报点的逻辑会走到这里,使用第一个exp上报点的索引进行告警判断
respData, err = GetData(stra, exps[1], firstItem, now, true)
} else { //上一个规则是nodata没有获取到索引数据,重新获取索引做计算
respData, err = GetData(stra, exps[1], firstItem, now, false)
}
if err != nil {
logger.Errorf("stra:%+v get query data err:%v", stra, err)
return
}
for i := range respData {
if respData[i].Nid != "" {
firstItem.Nid = respData[i].Nid
} else if respData[i].Endpoint != "" {
firstItem.Endpoint = respData[i].Endpoint
}
firstItem.Tags = getTags(respData[i].Counter)
firstItem.Step = respData[i].Step
Judge(stra, exps[1:], dataobj.RRDData2HistoryData(respData[i].Values), firstItem, now, history, info, value, extra, status)
}
}
history = dataobj.History{
Metric: exp.Metric,
Tags: firstItem.TagsMap,
Granularity: int(firstItem.Step),
Points: historyData,
}
return
}
func judgeItemWithStrategy(stra *models.Stra, historyData []*dataobj.HistoryData, exp models.Exp, firstItem *dataobj.JudgeItem, now int64) (leftValue dataobj.JsonFloat, isTriggered bool) {
straFunc := exp.Func
var straParam []interface{}
if firstItem.Step == 0 {
logger.Errorf("wrong step:%+v", firstItem)
return
}
limit := stra.AlertDur / firstItem.Step
if limit <= 0 {
limit = 1
}
straParam = append(straParam, limit)
straParam = append(straParam, stra.AlertDur)
switch straFunc {
case "happen", "stddev":
......@@ -269,7 +235,7 @@ func GetData(stra *models.Stra, exp models.Exp, firstItem *dataobj.JudgeItem, no
}
reqs = append(reqs, queryParam)
} else if firstItem != nil { //点驱动告警策略的场景
} /*else if firstItem != nil { //点驱动告警策略的场景
var nids, endpoints []string
if firstItem.Nid != "" {
nids = []string{firstItem.Nid}
......@@ -277,9 +243,9 @@ func GetData(stra *models.Stra, exp models.Exp, firstItem *dataobj.JudgeItem, no
endpoints = []string{firstItem.Endpoint}
}
reqs = GetReqs(stra, exp.Metric, nids, endpoints, now)
} else { //nodata的场景
reqs = GetReqs(stra, exp.Metric, stra.Nids, stra.Endpoints, now)
}
//} else { //nodata的场景
// reqs = GetReqs(stra, exp.Metric, stra.Nids, stra.Endpoints, now)
}*/
if len(reqs) == 0 {
return respData, err
......
package judge
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/models"
"github.com/didi/nightingale/src/modules/judge/backend/redi"
"github.com/didi/nightingale/src/modules/judge/cache"
"github.com/toolkits/pkg/concurrent/semaphore"
......@@ -76,12 +80,61 @@ func nodataJudge() {
}
nodataJob.Acquire()
go AsyncJudge(nodataJob, stra, stra.Exprs, dataobj.RRDData2HistoryData(data.Values), judgeItem, now, []dataobj.History{}, "", "", "", []bool{})
go AsyncJudge(nodataJob, stra, stra.Exprs, dataobj.RRDData2HistoryData(data.Values), judgeItem, now)
}
}
}
func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, extra string, status []bool) {
func AsyncJudge(sema *semaphore.Semaphore, stra *models.Stra, exps []models.Exp, historyData []*dataobj.HistoryData, firstItem *dataobj.JudgeItem, now int64) {
defer sema.Release()
Judge(stra, exps, historyData, firstItem, now, history, info, value, extra, status)
historyArr := []dataobj.History{}
statusArr := []bool{}
eventInfo := ""
value := ""
for _, expr := range exps {
respData, err := GetData(stra, expr, firstItem, now, true)
if err != nil {
logger.Errorf("stra:%+v get query data err:%v", stra, err)
return
}
if len(respData) != 1 {
logger.Errorf("stra:%+v get query data respData:%v err", stra, respData)
return
}
history, info, lastValue, status := Judge(stra, expr, dataobj.RRDData2HistoryData(respData[0].Values), firstItem, now)
statusArr = append(statusArr, status)
if value == "" {
value = fmt.Sprintf("%s: %s", expr.Metric, lastValue)
} else {
value += fmt.Sprintf("; %s: %s", expr.Metric, lastValue)
}
historyArr = append(historyArr, history)
eventInfo += info
}
bs, err := json.Marshal(historyArr)
if err != nil {
logger.Errorf("Marshal history:%+v err:%v", historyArr, err)
}
event := &dataobj.Event{
ID: fmt.Sprintf("s_%d_%s", stra.Id, firstItem.PrimaryKey()),
Etime: now,
Endpoint: firstItem.Endpoint,
CurNid: firstItem.Nid,
Info: eventInfo,
Detail: string(bs),
Value: value,
Partition: redi.Config.Prefix + "/event/p" + strconv.Itoa(stra.Priority),
Sid: stra.Id,
Hashid: getHashId(stra.Id, firstItem),
}
sendEventIfNeed(historyData, statusArr, event, stra)
}
......@@ -150,6 +150,7 @@ func Config(r *gin.Engine) {
v1.GET("/nodes", nodeGets)
v1.GET("/node/:id", nodeGet)
v1.GET("/node/:id/projs", v1treeUntilProjectGetsByNid)
v1.GET("/tree/projs", v1TreeUntilProjectGets)
// 外部系统推送一些操作日志过来,RDB统一存储,实际用MQ会更好一些
......
......@@ -116,6 +116,13 @@ func treeUntilLeafGets(c *gin.Context) {
renderData(c, oks, nil)
}
func v1treeUntilProjectGetsByNid(c *gin.Context) {
nid := urlParamInt64(c, "id")
oks, err := models.TreeUntilProjectsGetByNid(nid)
renderData(c, oks, err)
}
// 这个方法,展示的树只到project,节点搜索功能放到前台去
func treeUntilProjectGets(c *gin.Context) {
me := loginUser(c)
......
......@@ -42,6 +42,8 @@ type userProfileForm struct {
Im string `json:"im"`
IsRoot int `json:"is_root"`
LeaderId int64 `json:"leader_id"`
Typ int `json:"typ"`
Status int `json:"status"`
}
func userAddPost(c *gin.Context) {
......@@ -129,7 +131,17 @@ func userProfilePut(c *gin.Context) {
target.IsRoot = f.IsRoot
}
err := target.Update("dispname", "phone", "email", "im", "is_root", "leader_id", "leader_name")
if f.Typ != target.Typ {
arr = append(arr, fmt.Sprintf("typ: %s -> %s", target.Typ, f.Typ))
target.Typ = f.Typ
}
if f.Status != target.Status {
arr = append(arr, fmt.Sprintf("typ: %s -> %s", target.Status, f.Status))
target.Status = f.Status
}
err := target.Update("dispname", "phone", "email", "im", "is_root", "leader_id", "leader_name", "typ", "status")
if err == nil && len(arr) > 0 {
content := strings.Join(arr, ",")
go models.OperationLogNew(root.Username, "user", target.Id, fmt.Sprintf("UserModify %s %s", target.Username, content))
......
......@@ -9,6 +9,7 @@ import (
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/modules/transfer/aggr"
"github.com/didi/nightingale/src/modules/transfer/cache"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/Shopify/sarama"
"github.com/toolkits/pkg/logger"
......@@ -72,6 +73,8 @@ func consumer() {
item := aggrOut2MetricValue(out)
var args []*dataobj.MetricValue
args = append(args, item)
stats.Counter.Set("aggr.points.in", len(args))
PushData(args)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册