提交 bddef26a 编写于 作者: U UlricQin

Merge branch 'master' of github.com:didi/nightingale

*.css linguist-language=go
*.less linguist-language=go
*.js linguist-language=go
*.tsx linguist-language=go
*.html linguist-language=go
......@@ -53,4 +53,3 @@ type AlertUpgrade struct {
Duration int `json:"duration"`
Level int `json:"level"`
}
......@@ -14,7 +14,7 @@ type CounterTsMap struct {
}
func NewCounterTsMap() *CounterTsMap {
return &CounterTsMap{M: make(map[string]int64, 0)}
return &CounterTsMap{M: make(map[string]int64)}
}
func (c *CounterTsMap) Set(counter string, ts int64) {
......
......@@ -37,8 +37,6 @@ func (e *EndpointIndexMap) Push(item dataobj.IndexModel, now int64) {
return
}
metricIndex.Set(item, counter, now)
return
}
func (e *EndpointIndexMap) Clean(timeDuration int64) {
......@@ -123,7 +121,7 @@ func (e *EndpointIndexMap) GetEndpoints() []string {
length := len(e.M)
ret := make([]string, length)
i := 0
for endpoint, _ := range e.M {
for endpoint := range e.M {
ret[i] = endpoint
i++
}
......
......@@ -69,6 +69,4 @@ func reportEndpoint(endpoints []interface{}) {
}
time.Sleep(100 * time.Millisecond)
}
return
}
......@@ -41,7 +41,7 @@ var semaPermanence = semaphore.NewSemaphore(1)
func InitDB(cfg CacheSection) {
Config = cfg
IndexDB = &EndpointIndexMap{M: make(map[string]*MetricIndexMap, 0)}
IndexDB = &EndpointIndexMap{M: make(map[string]*MetricIndexMap)}
NewEndpoints = list.NewSafeListLimited(100000)
Rebuild(Config.PersistDir, Config.RebuildWorker)
......
......@@ -78,7 +78,6 @@ func (m *MetricIndexMap) DelMetric(metric string) {
m.Lock()
defer m.Unlock()
delete(m.Data, metric)
return
}
func (m *MetricIndexMap) Len() int {
......@@ -106,7 +105,7 @@ func (m *MetricIndexMap) GetMetrics() []string {
m.RLock()
defer m.RUnlock()
var metrics []string
for k, _ := range m.Data {
for k := range m.Data {
metrics = append(metrics, k)
}
return metrics
......
......@@ -16,8 +16,9 @@ func (t TagPairs) Len() int {
}
func (t TagPairs) Less(i, j int) bool {
return t[i].Key > t[i].Key
return t[i].Key > t[j].Key
}
func (t TagPairs) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}
......@@ -135,7 +136,7 @@ func TagPairToMap(tagPairs []*TagPair) map[string][]string {
func GetSortTags(tagMap map[string][]string) []*TagPair {
var keys []string
for key, _ := range tagMap {
for key := range tagMap {
keys = append(keys, key)
}
sort.Strings(keys)
......
......@@ -33,7 +33,7 @@ func (t *TagkvIndex) GetTagkv() []*TagPair {
for k, vm := range t.Tagkv {
var vs []string
for v, _ := range vm {
for v := range vm {
vs = append(vs, v)
}
tagkv := TagPair{
......@@ -53,7 +53,7 @@ func (t *TagkvIndex) GetTagkvMap() map[string][]string {
for k, vm := range t.Tagkv {
var vs []string
for v, _ := range vm {
for v := range vm {
vs = append(vs, v)
}
......
......@@ -125,7 +125,7 @@ func GetTagPairs(c *gin.Context) {
for tagk, tagvFilter := range tagkvFilter {
tagvs := []string{}
for v, _ := range tagvFilter {
for v := range tagvFilter {
tagvs = append(tagvs, v)
}
tagkv := &cache.TagPair{
......@@ -305,7 +305,7 @@ func GetIndexByClude(c *gin.Context) {
var err error
var tags []string
if len(includeList) == 0 && len(excludeList) == 0 {
for counter, _ := range counterMap {
for counter := range counterMap {
tagList = append(tagList, counter)
}
resp = append(resp, XcludeResp{
......
......@@ -42,5 +42,4 @@ func push(args []*dataobj.IndexModel, reply *dataobj.IndexResp) {
reply.Total = len(args)
reply.Latency = (time.Now().UnixNano() - start.UnixNano()) / 1000000
return
}
......@@ -30,7 +30,7 @@ type IndexRequest struct {
type Counter struct {
Counter string `json:"counter"`
Step int `json:"step"`
Dstype string `json:"dstype`
Dstype string `json:"dstype"`
}
// 执行Query操作
......
......@@ -12,32 +12,32 @@ type SafeLinkedList struct {
L *list.List
}
func (this *SafeLinkedList) Front() *list.Element {
this.RLock()
defer this.RUnlock()
return this.L.Front()
func (ll *SafeLinkedList) Front() *list.Element {
ll.RLock()
defer ll.RUnlock()
return ll.L.Front()
}
func (this *SafeLinkedList) Len() int {
this.RLock()
defer this.RUnlock()
return this.L.Len()
func (ll *SafeLinkedList) Len() int {
ll.RLock()
defer ll.RUnlock()
return ll.L.Len()
}
// @return needJudge 如果是false不需要做judge,因为新上来的数据不合法
func (this *SafeLinkedList) PushFrontAndMaintain(v *dataobj.JudgeItem, maxCount int) bool {
this.Lock()
defer this.Unlock()
func (ll *SafeLinkedList) PushFrontAndMaintain(v *dataobj.JudgeItem, maxCount int) bool {
ll.Lock()
defer ll.Unlock()
sz := this.L.Len()
sz := ll.L.Len()
if sz > 0 {
// 新push上来的数据有可能重复了,或者timestamp不对,这种数据要丢掉
if v.Timestamp <= this.L.Front().Value.(*dataobj.JudgeItem).Timestamp || v.Timestamp <= 0 {
if v.Timestamp <= ll.L.Front().Value.(*dataobj.JudgeItem).Timestamp || v.Timestamp <= 0 {
return false
}
}
this.L.PushFront(v)
ll.L.PushFront(v)
sz++
if sz <= maxCount {
......@@ -46,7 +46,7 @@ func (this *SafeLinkedList) PushFrontAndMaintain(v *dataobj.JudgeItem, maxCount
del := sz - maxCount
for i := 0; i < del; i++ {
this.L.Remove(this.L.Back())
ll.L.Remove(ll.L.Back())
}
return true
......@@ -54,19 +54,19 @@ func (this *SafeLinkedList) PushFrontAndMaintain(v *dataobj.JudgeItem, maxCount
// @param limit 至多返回这些,如果不够,有多少返回多少
// @return bool isEnough
func (this *SafeLinkedList) HistoryData(limit int) ([]*dataobj.RRDData, bool) {
func (ll *SafeLinkedList) HistoryData(limit int) ([]*dataobj.RRDData, bool) {
if limit < 1 {
// 其实limit不合法,此处也返回false吧,上层代码要注意
// 因为false通常使上层代码进入异常分支,这样就统一了
return []*dataobj.RRDData{}, false
}
size := this.Len()
size := ll.Len()
if size == 0 {
return []*dataobj.RRDData{}, false
}
firstElement := this.Front()
firstElement := ll.Front()
firstItem := firstElement.Value.(*dataobj.JudgeItem)
var vs []*dataobj.RRDData
......
......@@ -58,10 +58,10 @@ func ToJudge(historyMap *cache.JudgeItemMap, key string, val *dataobj.JudgeItem,
}
history := []dataobj.History{}
Judge(stra, stra.Exprs, historyData, val, now, history, "", "")
Judge(stra, stra.Exprs, historyData, val, now, history, "", "", []bool{})
}
func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string) {
func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) {
stats.Counter.Set("running", 1)
if len(exps) < 1 {
......@@ -89,7 +89,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
defer func() {
if len(exps) == 1 {
bytes, err := json.Marshal(history)
bs, err := json.Marshal(history)
if err != nil {
logger.Error("Marshal history:%v err:%v", history, err)
}
......@@ -98,14 +98,14 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
Etime: now,
Endpoint: firstItem.Endpoint,
Info: info,
Detail: string(bytes),
Detail: string(bs),
Value: value,
Partition: redi.Config.Prefix + "/event/p" + strconv.Itoa(stra.Priority),
Sid: stra.Id,
Hashid: getHashId(stra.Id, firstItem),
}
sendEventIfNeed(historyData, isTriggered, event)
sendEventIfNeed(historyData, status, event)
}
}()
......@@ -115,10 +115,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
} else {
value += fmt.Sprintf("; %s: %v", exp.Metric, leftValue)
}
if !isTriggered {
return
}
status = append(status, isTriggered)
//与条件情况下执行
if len(exps) > 1 {
......@@ -133,15 +130,15 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
Tags: "",
DsType: "GAUGE",
}
Judge(stra, exps[1:], []*dataobj.RRDData{}, judgeItem, now, history, info, value)
Judge(stra, exps[1:], []*dataobj.RRDData{}, judgeItem, now, history, info, value, status)
return
}
for i, _ := range respData {
for i := range respData {
firstItem.Endpoint = respData[i].Endpoint
firstItem.Tags = getTags(respData[i].Counter)
firstItem.Step = respData[i].Step
Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value)
Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, status)
}
} else {
......@@ -156,11 +153,11 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
logger.Errorf("stra:%v get query data err:%v", stra, err)
return
}
for i, _ := range respData {
for i := range respData {
firstItem.Endpoint = respData[i].Endpoint
firstItem.Tags = getTags(respData[i].Counter)
firstItem.Step = respData[i].Step
Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value)
Judge(stra, exps[1:], respData[i].Values, firstItem, now, history, info, value, status)
}
}
}
......@@ -207,7 +204,7 @@ func judgeItemWithStrategy(stra *model.Stra, historyData []*dataobj.RRDData, exp
var sum float64
data := respItems[0]
for i, _ := range data.Values {
for i := range data.Values {
sum += float64(data.Values[i].Value)
}
......@@ -387,7 +384,12 @@ func GetReqs(stra *model.Stra, metric string, endpoints []string, now int64) ([]
return reqs, nil
}
func sendEventIfNeed(historyData []*dataobj.RRDData, isTriggered bool, event *dataobj.Event) {
func sendEventIfNeed(historyData []*dataobj.RRDData, status []bool, event *dataobj.Event) {
isTriggered := true
for _, s := range status {
isTriggered = isTriggered && s
}
lastEvent, exists := cache.LastEvents.Get(event.ID)
if isTriggered {
event.EventType = EVENT_ALERT
......
......@@ -18,7 +18,7 @@ func NodataJudge(concurrency int) {
if concurrency < 1 {
concurrency = 1000
}
nodataJob = semaphore.NewSemaphore(1000)
nodataJob = semaphore.NewSemaphore(concurrency)
t1 := time.NewTicker(time.Duration(9000) * time.Millisecond)
nodataJudge()
......@@ -49,10 +49,10 @@ func nodataJudge() {
}
nodataJob.Acquire()
go func(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string) {
go func(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) {
defer nodataJob.Release()
Judge(stra, exps, historyData, firstItem, now, history, info, value)
}(stra, stra.Exprs, []*dataobj.RRDData{}, judgeItem, now, []dataobj.History{}, "", "")
Judge(stra, exps, historyData, firstItem, now, history, info, value, status)
}(stra, stra.Exprs, []*dataobj.RRDData{}, judgeItem, now, []dataobj.History{}, "", "", []bool{})
}
return
}
......@@ -79,10 +79,10 @@ func nodataJudge() {
}
nodataJob.Acquire()
go func(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string) {
go func(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, firstItem *dataobj.JudgeItem, now int64, history []dataobj.History, info string, value string, status []bool) {
defer nodataJob.Release()
Judge(stra, exps, historyData, firstItem, now, history, info, value)
}(stra, stra.Exprs, data.Values, judgeItem, now, []dataobj.History{}, "", "")
Judge(stra, exps, historyData, firstItem, now, history, info, value, status)
}(stra, stra.Exprs, data.Values, judgeItem, now, []dataobj.History{}, "", "", []bool{})
}
}
}
......@@ -83,7 +83,7 @@ func (cp *ConnPools) Update(cluster []string) {
cp.M[address] = createOnePool(address, address, ct, maxConns, maxIdle)
}
for address, _ := range cp.M {
for address := range cp.M {
if _, exists := newCluster[address]; !exists {
delete(cp.M, address)
}
......@@ -91,8 +91,8 @@ func (cp *ConnPools) Update(cluster []string) {
}
// 同步发送, 完成发送或超时后 才能返回
func (this *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error {
connPool, exists := this.Get(addr)
func (cp *ConnPools) Call(addr, method string, args interface{}, resp interface{}) error {
connPool, exists := cp.Get(addr)
if !exists {
return fmt.Errorf("%s has no connection pool", addr)
}
......@@ -103,7 +103,7 @@ func (this *ConnPools) Call(addr, method string, args interface{}, resp interfac
}
rpcClient := conn.(RpcClient)
callTimeout := time.Duration(this.CallTimeout) * time.Millisecond
callTimeout := time.Duration(cp.CallTimeout) * time.Millisecond
done := make(chan error, 1)
go func() {
......@@ -125,10 +125,10 @@ func (this *ConnPools) Call(addr, method string, args interface{}, resp interfac
}
}
func (this *ConnPools) Get(address string) (*pool.ConnPool, bool) {
this.RLock()
defer this.RUnlock()
p, exists := this.M[address]
func (cp *ConnPools) Get(address string) (*pool.ConnPool, bool) {
cp.RLock()
defer cp.RUnlock()
p, exists := cp.M[address]
return p, exists
}
......@@ -138,23 +138,23 @@ type RpcClient struct {
name string
}
func (this RpcClient) Name() string {
return this.name
func (rc RpcClient) Name() string {
return rc.name
}
func (this RpcClient) Closed() bool {
return this.cli == nil
func (rc RpcClient) Closed() bool {
return rc.cli == nil
}
func (this RpcClient) Close() error {
if this.cli != nil {
err := this.cli.Close()
this.cli = nil
func (rc RpcClient) Close() error {
if rc.cli != nil {
err := rc.cli.Close()
rc.cli = nil
return err
}
return nil
}
func (this RpcClient) Call(method string, args interface{}, reply interface{}) error {
return this.cli.Call(method, args, reply)
func (rc RpcClient) Call(method string, args interface{}, reply interface{}) error {
return rc.cli.Call(method, args, reply)
}
......@@ -20,11 +20,19 @@ import (
)
func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
resp := []*dataobj.TsdbQueryResponse{}
workerNum := 100
worker := make(chan struct{}, workerNum) //控制goroutine并发数
worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
done := make(chan struct{}, 1)
resp := make([]*dataobj.TsdbQueryResponse, 0)
go func() {
defer func() { done <- struct{}{} }()
for d := range dataChan {
resp = append(resp, d)
}
}()
for _, input := range inputs {
for _, endpoint := range input.Endpoints {
for _, counter := range input.Counters {
......@@ -34,29 +42,32 @@ func FetchData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse {
}
}
//等待所有goroutine执行完成
// 等待所有 goroutine 执行完成
for i := 0; i < workerNum; i++ {
worker <- struct{}{}
}
close(dataChan)
for {
d, ok := <-dataChan
if !ok {
break
}
resp = append(resp, d)
}
// 等待所有 dataChan 被消费完
<-done
return resp
}
func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
resp := []*dataobj.TsdbQueryResponse{}
workerNum := 100
worker := make(chan struct{}, workerNum) //控制goroutine并发数
worker := make(chan struct{}, workerNum) // 控制 goroutine 并发数
dataChan := make(chan *dataobj.TsdbQueryResponse, 20000)
done := make(chan struct{}, 1)
resp := make([]*dataobj.TsdbQueryResponse, 0)
go func() {
defer func() { done <- struct{}{} }()
for d := range dataChan {
resp = append(resp, d)
}
}()
for _, endpoint := range input.Endpoints {
if len(input.Tags) == 0 {
counter, err := GetCounter(input.Metric, "", nil)
......@@ -85,16 +96,10 @@ func FetchDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
}
close(dataChan)
for {
d, ok := <-dataChan
if !ok {
break
}
resp = append(resp, d)
}
<-done
//进行数据计算
aggrDatas := []*dataobj.TsdbQueryResponse{}
aggrDatas := make([]*dataobj.TsdbQueryResponse, 0)
if input.AggrFunc != "" && len(resp) > 1 {
aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse)
......@@ -173,7 +178,6 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i
stats.Counter.Set("query.data.err", 1)
}
dataChan <- data
return
}
func fetchData(start, end int64, consolFun, endpoint, counter string, step int) (*dataobj.TsdbQueryResponse, error) {
......
......@@ -22,7 +22,6 @@ func (this *ConsistentHashRing) Set(r *consistent.Consistent) {
this.Lock()
defer this.Unlock()
this.ring = r
return
}
func (this *ConsistentHashRing) GetRing() *consistent.Consistent {
......
......@@ -46,7 +46,7 @@ var (
)
func NewClusterNode(addrs []string) *backend.ClusterNode {
return &backend.ClusterNode{addrs}
return &backend.ClusterNode{Addrs: addrs}
}
// map["node"]="host1,host2" --> map["node"]=["host1", "host2"]
......
......@@ -73,7 +73,7 @@ func getStrategy() {
//var metric string
if len(stra.Exprs) < 1 {
logger.Warning("stra:%v exprs illegal", stra)
logger.Warningf("stra:%v exprs illegal", stra)
continue
}
if stra.Exprs[0].Func == "nodata" {
......
......@@ -52,5 +52,4 @@ func PushData(c *gin.Context) {
}
render.Data(c, "ok", nil)
return
}
......@@ -76,5 +76,4 @@ func push(items []*dataobj.MetricValue) {
}
defer resp.Body.Close()
return
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册