提交 5df96d41 作者: 710leo

refactor: add some metrics

上级 1e05be4e
...@@ -74,7 +74,7 @@ func getCollects() (CollectResp, error) { ...@@ -74,7 +74,7 @@ func getCollects() (CollectResp, error) {
url := fmt.Sprintf("http://%s%s%s", addr, StraConfig.Api, identity.Identity) url := fmt.Sprintf("http://%s%s%s", addr, StraConfig.Api, identity.Identity)
err = httplib.Get(url).SetTimeout(time.Duration(StraConfig.Timeout) * time.Millisecond).ToJSON(&res) err = httplib.Get(url).SetTimeout(time.Duration(StraConfig.Timeout) * time.Millisecond).ToJSON(&res)
if err != nil { if err != nil {
err = fmt.Errorf("get collects from remote failed, error:%v", err) err = fmt.Errorf("get collects from remote:%s failed, error:%v", url, err)
} }
return res, err return res, err
......
...@@ -48,7 +48,9 @@ func Push(items []*dataobj.MetricValue) { ...@@ -48,7 +48,9 @@ func Push(items []*dataobj.MetricValue) {
logger.Error(err) logger.Error(err)
continue continue
} else { } else {
logger.Info("push succ, reply: ", reply) if reply.Msg != "ok" {
logger.Error("some item push err", reply)
}
return return
} }
} }
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/didi/nightingale/src/toolkits/address" "github.com/didi/nightingale/src/toolkits/address"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/toolkits/pkg/concurrent/semaphore" "github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
...@@ -49,11 +50,12 @@ func reportEndpoint(endpoints []interface{}) { ...@@ -49,11 +50,12 @@ func reportEndpoint(endpoints []interface{}) {
err := httplib.Post(url).JSONBodyQuiet(m).SetTimeout(3*time.Second).Header("x-srv-token", "monapi-builtin-token").ToJSON(&body) err := httplib.Post(url).JSONBodyQuiet(m).SetTimeout(3*time.Second).Header("x-srv-token", "monapi-builtin-token").ToJSON(&body)
if err != nil { if err != nil {
logger.Warningf("curl %s fail: %v. retry", url, err) logger.Warningf("curl %s fail: %v. retry", url, err)
stats.Counter.Set("report.endpoint.err", 1)
continue continue
} }
if body.Err != "" { //数据库连接出错会出现此情况
if body.Err != "" {
logger.Warningf("curl %s fail: %s. retry", url, body.Err) logger.Warningf("curl %s fail: %s. retry", url, body.Err)
stats.Counter.Set("report.endpoint.err", 1)
continue continue
} }
......
...@@ -19,6 +19,7 @@ import ( ...@@ -19,6 +19,7 @@ import (
"github.com/didi/nightingale/src/toolkits/compress" "github.com/didi/nightingale/src/toolkits/compress"
"github.com/didi/nightingale/src/toolkits/identity" "github.com/didi/nightingale/src/toolkits/identity"
"github.com/didi/nightingale/src/toolkits/report" "github.com/didi/nightingale/src/toolkits/report"
"github.com/didi/nightingale/src/toolkits/stats"
) )
type CacheSection struct { type CacheSection struct {
...@@ -72,8 +73,8 @@ func StartPersist(interval int) { ...@@ -72,8 +73,8 @@ func StartPersist(interval int) {
err := Persist("normal") err := Persist("normal")
if err != nil { if err != nil {
logger.Error("Persist err:", err) logger.Error("Persist err:", err)
stats.Counter.Set("persist.err", 1)
} }
//logger.Infof("clean %+v, took %.2f ms\n", cleanRet, float64(time.Since(start).Nanoseconds())*1e-6)
} }
} }
......
...@@ -42,6 +42,5 @@ func Push(event *dataobj.Event) error { ...@@ -42,6 +42,5 @@ func Push(event *dataobj.Event) error {
return nil return nil
} }
stats.Counter.Set("redis.failed", 1)
return fmt.Errorf("redis publish failed finally:%v", err) return fmt.Errorf("redis publish failed finally:%v", err)
} }
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"log" "log"
"time" "time"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
) )
...@@ -44,6 +45,7 @@ func Init(cfg RedisSection) { ...@@ -44,6 +45,7 @@ func Init(cfg RedisSection) {
c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(connTimeout), redis.DialReadTimeout(readTimeout), redis.DialWriteTimeout(writeTimeout)) c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(connTimeout), redis.DialReadTimeout(readTimeout), redis.DialWriteTimeout(writeTimeout))
if err != nil { if err != nil {
logger.Errorf("conn redis err:%v", err) logger.Errorf("conn redis err:%v", err)
stats.Counter.Set("redis.conn.failed", 1)
return nil, err return nil, err
} }
...@@ -51,6 +53,8 @@ func Init(cfg RedisSection) { ...@@ -51,6 +53,8 @@ func Init(cfg RedisSection) {
if _, err := c.Do("AUTH", pass); err != nil { if _, err := c.Do("AUTH", pass); err != nil {
c.Close() c.Close()
logger.Errorf("ERR: redis auth fail:%v", err) logger.Errorf("ERR: redis auth fail:%v", err)
stats.Counter.Set("redis.conn.failed", 1)
return nil, err return nil, err
} }
} }
......
...@@ -65,7 +65,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f ...@@ -65,7 +65,7 @@ func Judge(stra *model.Stra, exps []model.Exp, historyData []*dataobj.RRDData, f
stats.Counter.Set("running", 1) stats.Counter.Set("running", 1)
if len(exps) < 1 { if len(exps) < 1 {
stats.Counter.Set("stra.err", 1) stats.Counter.Set("stra.illegal", 1)
logger.Warningf("stra:%v exp is null", stra) logger.Warningf("stra:%v exp is null", stra)
return return
} }
...@@ -421,6 +421,7 @@ func sendEvent(event *dataobj.Event) { ...@@ -421,6 +421,7 @@ func sendEvent(event *dataobj.Event) {
err := redi.Push(event) err := redi.Push(event)
if err != nil { if err != nil {
stats.Counter.Set("redis.push.failed", 1)
logger.Errorf("push event:%v err:%v", event, err) logger.Errorf("push event:%v err:%v", event, err)
} }
} }
......
...@@ -56,11 +56,13 @@ func getStrategy(opts StrategySection) { ...@@ -56,11 +56,13 @@ func getStrategy(opts StrategySection) {
if err != nil { if err != nil {
logger.Warningf("get strategy from remote failed, error:%v", err) logger.Warningf("get strategy from remote failed, error:%v", err)
stats.Counter.Set("stra.get.err", 1)
continue continue
} }
if resp.Err != "" { if resp.Err != "" {
logger.Warningf("get strategy from remote failed, error:%v", resp.Err) logger.Warningf("get strategy from remote failed, error:%v", resp.Err)
stats.Counter.Set("stra.get.err", 1)
continue continue
} }
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/didi/nightingale/src/model" "github.com/didi/nightingale/src/model"
"github.com/didi/nightingale/src/modules/monapi/config" "github.com/didi/nightingale/src/modules/monapi/config"
"github.com/didi/nightingale/src/modules/monapi/scache" "github.com/didi/nightingale/src/modules/monapi/scache"
"github.com/didi/nightingale/src/toolkits/stats"
) )
func CheckJudgeLoop() { func CheckJudgeLoop() {
...@@ -19,6 +20,7 @@ func CheckJudgeLoop() { ...@@ -19,6 +20,7 @@ func CheckJudgeLoop() {
time.Sleep(duration) time.Sleep(duration)
err := CheckJudge() err := CheckJudge()
if err != nil { if err != nil {
stats.Counter.Set("get.judge.err", 1)
logger.Error("check judge fail: ", err) logger.Error("check judge fail: ", err)
} }
} }
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/src/modules/monapi/config" "github.com/didi/nightingale/src/modules/monapi/config"
"github.com/didi/nightingale/src/toolkits/stats"
) )
var RedisConnPool *redis.Pool var RedisConnPool *redis.Pool
...@@ -29,6 +30,8 @@ func InitRedis() { ...@@ -29,6 +30,8 @@ func InitRedis() {
Dial: func() (redis.Conn, error) { Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(connTimeout), redis.DialReadTimeout(readTimeout), redis.DialWriteTimeout(writeTimeout)) c, err := redis.Dial("tcp", addr, redis.DialConnectTimeout(connTimeout), redis.DialReadTimeout(readTimeout), redis.DialWriteTimeout(writeTimeout))
if err != nil { if err != nil {
logger.Errorf("conn redis err:%v", err)
stats.Counter.Set("redis.conn.failed", 1)
return nil, err return nil, err
} }
...@@ -36,6 +39,7 @@ func InitRedis() { ...@@ -36,6 +39,7 @@ func InitRedis() {
if _, err := c.Do("AUTH", pass); err != nil { if _, err := c.Do("AUTH", pass); err != nil {
c.Close() c.Close()
logger.Error("redis auth fail, pass: ", pass) logger.Error("redis auth fail, pass: ", pass)
stats.Counter.Set("redis.conn.failed", 1)
return nil, err return nil, err
} }
} }
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/didi/nightingale/src/dataobj" "github.com/didi/nightingale/src/dataobj"
"github.com/didi/nightingale/src/modules/transfer/calc" "github.com/didi/nightingale/src/modules/transfer/calc"
"github.com/didi/nightingale/src/toolkits/address" "github.com/didi/nightingale/src/toolkits/address"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/toolkits/pkg/logger" "github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib" "github.com/toolkits/pkg/net/httplib"
...@@ -161,10 +162,12 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i ...@@ -161,10 +162,12 @@ func fetchDataSync(start, end int64, consolFun, endpoint, counter string, step i
defer func() { defer func() {
<-worker <-worker
}() }()
stats.Counter.Set("query.tsdb", 1)
data, err := fetchData(start, end, consolFun, endpoint, counter, step) data, err := fetchData(start, end, consolFun, endpoint, counter, step)
if err != nil { if err != nil {
logger.Warning(err) logger.Warning(err)
stats.Counter.Set("query.data.err", 1)
} }
dataChan <- data dataChan <- data
return return
......
...@@ -107,6 +107,7 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent ...@@ -107,6 +107,7 @@ func Send2TsdbTask(Q *list.SafeListLimited, node string, addr string, concurrent
// 将数据 打入 某个Tsdb的发送缓存队列, 具体是哪一个Tsdb 由一致性哈希 决定 // 将数据 打入 某个Tsdb的发送缓存队列, 具体是哪一个Tsdb 由一致性哈希 决定
func Push2TsdbSendQueue(items []*dataobj.MetricValue) { func Push2TsdbSendQueue(items []*dataobj.MetricValue) {
errCnt := 0
for _, item := range items { for _, item := range items {
tsdbItem := convert2TsdbItem(item) tsdbItem := convert2TsdbItem(item)
stats.Counter.Set("tsdb.queue.push", 1) stats.Counter.Set("tsdb.queue.push", 1)
...@@ -118,19 +119,18 @@ func Push2TsdbSendQueue(items []*dataobj.MetricValue) { ...@@ -118,19 +119,18 @@ func Push2TsdbSendQueue(items []*dataobj.MetricValue) {
} }
cnode := Config.ClusterList[node] cnode := Config.ClusterList[node]
errCnt := 0
for _, addr := range cnode.Addrs { for _, addr := range cnode.Addrs {
Q := TsdbQueues[node+addr] Q := TsdbQueues[node+addr]
if !Q.PushFront(tsdbItem) { if !Q.PushFront(tsdbItem) {
errCnt += 1 errCnt += 1
} }
} }
}
// statistics // statistics
if errCnt > 0 { if errCnt > 0 {
stats.Counter.Set("tsdb.queue.err", errCnt) stats.Counter.Set("tsdb.queue.err", errCnt)
logger.Error("Push2TsdbSendQueue err num: ", errCnt) logger.Error("Push2TsdbSendQueue err num: ", errCnt)
}
} }
} }
...@@ -172,7 +172,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) { ...@@ -172,7 +172,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
if !sendOk { if !sendOk {
stats.Counter.Set("points.out.judge.err", 1) stats.Counter.Set("points.out.judge.err", 1)
logger.Errorf("send judge %s fail: %v", addr, err) logger.Errorf("send %v to judge %s fail: %v", judgeItems, addr, err)
} }
}(addr, judgeItems, count) }(addr, judgeItems, count)
...@@ -180,6 +180,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) { ...@@ -180,6 +180,7 @@ func Send2JudgeTask(Q *list.SafeListLimited, addr string, concurrent int) {
} }
func Push2JudgeSendQueue(items []*dataobj.MetricValue) { func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
errCnt := 0
for _, item := range items { for _, item := range items {
key := str.PK(item.Metric, item.Endpoint) key := str.PK(item.Metric, item.Endpoint)
stras := cache.StraMap.GetByKey(key) stras := cache.StraMap.GetByKey(key)
...@@ -203,11 +204,13 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) { ...@@ -203,11 +204,13 @@ func Push2JudgeSendQueue(items []*dataobj.MetricValue) {
q, exists := JudgeQueues.Get(stra.JudgeInstance) q, exists := JudgeQueues.Get(stra.JudgeInstance)
if exists { if exists {
q.PushFront(judgeItem) if !q.PushFront(judgeItem) {
errCnt += 1
}
} }
} }
} }
stats.Counter.Set("judge.queue.err", errCnt)
} }
// 打到Tsdb的数据,要根据rrdtool的特性 来限制 step、counterType、timestamp // 打到Tsdb的数据,要根据rrdtool的特性 来限制 step、counterType、timestamp
......
...@@ -27,9 +27,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp ...@@ -27,9 +27,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
err := v.CheckValidity() err := v.CheckValidity()
if err != nil { if err != nil {
stats.Counter.Set("points.in.err", 1) stats.Counter.Set("points.in.err", 1)
logger.Warningf("item is illegal item:%s err:%v", v, err) msg := fmt.Sprintf("item is illegal item:%s err:%v", v, err)
logger.Warningf(msg)
reply.Invalid += 1 reply.Invalid += 1
reply.Msg += fmt.Sprintf("%v\n", err) reply.Msg += msg
continue continue
} }
......
...@@ -47,7 +47,7 @@ func GetIndexLoop() { ...@@ -47,7 +47,7 @@ func GetIndexLoop() {
func GetIndex() { func GetIndex() {
instances, err := report.GetAlive("index", Config.HbsMod) instances, err := report.GetAlive("index", Config.HbsMod)
if err != nil { if err != nil {
stats.Counter.Set("index.get.err", 1) stats.Counter.Set("get.index.err", 1)
logger.Warningf("get index list err:%v", err) logger.Warningf("get index list err:%v", err)
return return
} }
......
...@@ -58,7 +58,6 @@ func handleItems(items []*dataobj.TsdbItem) { ...@@ -58,7 +58,6 @@ func handleItems(items []*dataobj.TsdbItem) {
//todo hash冲突问题需要解决 //todo hash冲突问题需要解决
if err := cache.Caches.Push(item.Key, item.Timestamp, item.Value); err != nil { if err := cache.Caches.Push(item.Key, item.Timestamp, item.Value); err != nil {
stats.Counter.Set("points.in.err", 1) stats.Counter.Set("points.in.err", 1)
logger.Warningf("push obj error, obj: %v, error: %v\n", items[i], err) logger.Warningf("push obj error, obj: %v, error: %v\n", items[i], err)
fail++ fail++
} }
......
...@@ -231,6 +231,7 @@ func FlushRRD(flushChunks map[interface{}][]*cache.Chunk) { ...@@ -231,6 +231,7 @@ func FlushRRD(flushChunks map[interface{}][]*cache.Chunk) {
err := FlushFile(seriesID, items) err := FlushFile(seriesID, items)
if err != nil { if err != nil {
stats.Counter.Set("flush.rrd.err", 1)
logger.Errorf("flush %v data to rrd err:%v", seriesID, err) logger.Errorf("flush %v data to rrd err:%v", seriesID, err)
continue continue
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册