未验证 提交 69ceeff9 编写于 作者: Y yubo 提交者: GitHub

M3db (#388)

* support openID2.0

* generate UUID if it's not set

* add m3db support

* add test shell

* update transfer.yml

* remove klog

* use remote m3 repo

* remove some file

* add description for tansfer.m3db config

* add query data for ui
上级 b2baef06
......@@ -4,10 +4,14 @@ backend:
enabled: false
name: "m3db"
namespace: "test"
writeConsistencyLevel: "majority"
readConsistencyLevel: "unstrict_majority"
# https://m3db.github.io/m3/m3db/architecture/consistencylevels/
writeConsistencyLevel: "majority" # one|majority|all
readConsistencyLevel: "unstrict_majority" # one|unstrict_majority|majority|all
config:
service:
# KV environment, zone, and service from which to write/read KV data (placement
# and configuration). Leave these as the default values unless you know what
# you're doing.
env: default_env
zone: embedded
service: m3db
......
package m3db
import (
"math"
"strings"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/modules/transfer/calc"
"github.com/didi/nightingale/src/toolkits/str"
"github.com/m3db/m3/src/query/storage/m3/consolidators"
"github.com/m3db/m3/src/x/ident"
......@@ -101,8 +105,129 @@ func xcludeResp(iter ident.TagIterator) (ret dataobj.XcludeResp) {
ret.Tags = append(ret.Tags, dataobj.SortedTags(tags))
if err := iter.Err(); err != nil {
logger.Errorf("FetchTaggedIDs iter:", err)
logger.Errorf("FetchTaggedIDs iter: %s", err)
}
return ret
}
func aggregateResp(data []*dataobj.TsdbQueryResponse, opts dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse {
if len(data) < 2 || opts.AggrFunc == "" {
return data
}
// Adjust the data
for _, v := range data {
v.Values = resample(v.Values, opts.Start, opts.End, int64(opts.Step))
}
// 没有聚合 tag, 或者曲线没有其他 tags, 直接所有曲线进行计算
if len(opts.GroupKey) == 0 || getTags(data[0].Counter) == "" {
return []*dataobj.TsdbQueryResponse{&dataobj.TsdbQueryResponse{
Counter: opts.AggrFunc,
Start: opts.Start,
End: opts.End,
Values: calc.Compute(opts.AggrFunc, data),
}}
}
aggrDatas := make([]*dataobj.TsdbQueryResponse, 0)
aggrCounter := make(map[string][]*dataobj.TsdbQueryResponse)
for _, v := range data {
counterMap := make(map[string]string)
tagsMap, err := dataobj.SplitTagsString(getTags(v.Counter))
if err != nil {
logger.Warningf("split tag string error: %+v", err)
continue
}
if v.Nid != "" {
tagsMap["node"] = v.Nid
} else {
tagsMap["endpoint"] = v.Endpoint
}
// 校验 GroupKey 是否在 tags 中
for _, key := range opts.GroupKey {
if value, exists := tagsMap[key]; exists {
counterMap[key] = value
}
}
counter := dataobj.SortedTags(counterMap)
if _, exists := aggrCounter[counter]; exists {
aggrCounter[counter] = append(aggrCounter[counter], v)
} else {
aggrCounter[counter] = []*dataobj.TsdbQueryResponse{v}
}
}
// 有需要聚合的 tag 需要将 counter 带上
for counter, datas := range aggrCounter {
if counter != "" {
counter = "/" + opts.AggrFunc + "," + counter
}
aggrData := &dataobj.TsdbQueryResponse{
Start: opts.Start,
End: opts.End,
Counter: counter,
Values: calc.Compute(opts.AggrFunc, datas),
}
aggrDatas = append(aggrDatas, aggrData)
}
return aggrDatas
}
var (
nanFloat = dataobj.JsonFloat(math.NaN())
)
func resample(data []*dataobj.RRDData, start, end, step int64) []*dataobj.RRDData {
l := int((end - start) / step)
if l <= 0 {
return []*dataobj.RRDData{}
}
ret := make([]*dataobj.RRDData, 0, l)
j := 0
ts := start
if t := data[0].Timestamp; t > start {
ts = t - t%step
}
for ; ts < end; ts += step {
get := func() *dataobj.RRDData {
if j == len(data) {
return nil
}
d := data[j]
for {
if j == len(data) {
return &dataobj.RRDData{Timestamp: ts, Value: nanFloat}
}
if d.Timestamp < ts {
j++
continue
}
if d.Timestamp >= ts+step {
return &dataobj.RRDData{Timestamp: ts, Value: nanFloat}
}
j++
return d
}
}
ret = append(ret, get())
}
return ret
}
func getTags(counter string) (tags string) {
idx := strings.IndexAny(counter, "/")
if idx == -1 {
return ""
}
return counter[idx+1:]
}
......@@ -136,8 +136,11 @@ func (p *Client) QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQue
return nil
}
// TODO: groupKey, aggrFunc, consolFunc, Comparisons
return ret
if input.Step == 0 {
input.Step = 60
}
return aggregateResp(ret, input)
}
// QueryMetrics: || (&& (endpoint)) (counter)...
......
......@@ -18,7 +18,7 @@
curl -X POST \
http://localhost:8008/api/transfer/push \
-d '[{
"metric": "test2",
"metric": "test",
"endpoint": "m3db-dev01-yubo.py",
"timestamp": '$(date "+%s")',
"step": 60,
......
......@@ -19,7 +19,6 @@ curl -X POST \
"end": '$(date "+%s")',
"consolFunc": "",
"endpoints": ["m3db-dev01-yubo.py"],
"nids": ["1"],
"counters": [],
"step": 60,
"dstype": "GAUGE"
......
......@@ -20,11 +20,11 @@
curl -X POST \
http://localhost:8008/api/transfer/data/ui \
-d '[{
"start": "1",
"end": '$(data "+%s")',
-d '{
"start": '$(date -d "1 hour ago" "+%s")',
"end": '$(date "+%s")',
"metric": "test",
"endpoints": [],
"endpoints": ["m3db-dev01-yubo.py"],
"nids": [],
"tags": [],
"step": 60,
......@@ -32,7 +32,7 @@ curl -X POST \
"groupKey": [],
"aggrFunc": "",
"consolFunc": "",
"Comparisons": []
}]'
"comparisons": []
}' | jq .
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册