未验证 提交 973f94f5 编写于 作者: Z zengwh 提交者: GitHub

support send to influxdb (#146)

* support send to influxdb

* fix influxdb config var

* fix name error

* fix remove unused variable
上级 fc4fa979
......@@ -4,6 +4,13 @@ backend:
# callTimeout: 3000
cluster:
tsdb01: 127.0.0.1:5821
influxdb:
enabled: false
username: "influx"
password: "admin123"
precision: "s"
database: "n9e"
address: "http://127.0.0.1:8086"
logger:
dir: logs/transfer
......
package dataobj
type InfluxdbItem struct {
Measurement string `json:"metric"`
Tags map[string]string `json:"tags"`
Fields map[string]interface{} `json:"fields"`
Timestamp int64 `json:"timestamp"`
}
......@@ -11,6 +11,19 @@ import (
"github.com/didi/nightingale/src/toolkits/stats"
)
type InfluxdbSection struct {
Enabled bool `yaml:"enabled"`
Batch int `yaml:"batch"`
MaxRetry int `yaml:"maxRetry"`
WorkerNum int `yaml:"workerNum"`
Timeout int `yaml:"timeout"`
Address string `yaml:"address"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Precision string `yaml:"precision"`
}
type BackendSection struct {
Enabled bool `yaml:"enabled"`
Batch int `yaml:"batch"`
......@@ -26,6 +39,7 @@ type BackendSection struct {
Replicas int `yaml:"replicas"`
Cluster map[string]string `yaml:"cluster"`
ClusterList map[string]*ClusterNode `json:"clusterList"`
Influxdb InfluxdbSection `yaml:"influxdb"`
}
const DefaultSendQueueMaxSize = 102400 //10.24w
......@@ -40,8 +54,9 @@ var (
TsdbNodeRing *ConsistentHashRing
// 发送缓存队列 node -> queue_of_data
TsdbQueues = make(map[string]*list.SafeListLimited)
JudgeQueues = cache.SafeJudgeQueue{}
TsdbQueues = make(map[string]*list.SafeListLimited)
JudgeQueues = cache.SafeJudgeQueue{}
InfluxdbQueue *list.SafeListLimited
// 连接池 node_address -> connection_pool
TsdbConnPools *pools.ConnPools
......@@ -96,6 +111,10 @@ func initSendQueues() {
for _, judge := range judges {
JudgeQueues.Set(judge, list.NewSafeListLimited(DefaultSendQueueMaxSize))
}
if Config.Influxdb.Enabled {
InfluxdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize)
}
}
func GetJudges() []string {
......
......@@ -8,6 +8,7 @@ import (
"github.com/didi/nightingale/src/modules/transfer/cache"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/didi/nightingale/src/toolkits/str"
"github.com/influxdata/influxdb/client/v2"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/container/list"
......@@ -36,6 +37,11 @@ func startSendTasks() {
judgeConcurrent = 1
}
influxdbConcurrent := Config.Influxdb.WorkerNum
if influxdbConcurrent < 1 {
influxdbConcurrent = 1
}
if Config.Enabled {
for node, item := range Config.ClusterList {
for _, addr := range item.Addrs {
......@@ -51,6 +57,11 @@ func startSendTasks() {
go Send2JudgeTask(queue, instance, judgeConcurrent)
}
}
if Config.Influxdb.Enabled {
go send2InfluxdbTask(influxdbConcurrent)
}
}
func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) {
......@@ -265,3 +276,134 @@ func TagMatch(straTags []model.Tag, tag map[string]string) bool {
}
return true
}
type InfluxClient struct {
Client client.Client
Database string
Precision string
}
func NewInfluxdbClient() (*InfluxClient, error) {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: Config.Influxdb.Address,
Username: Config.Influxdb.Username,
Password: Config.Influxdb.Password,
Timeout: time.Millisecond * time.Duration(Config.Influxdb.Timeout),
})
if err != nil {
return nil, err
}
return &InfluxClient{
Client: c,
Database: Config.Influxdb.Database,
Precision: Config.Influxdb.Precision,
}, nil
}
func (c *InfluxClient) Send(items []*dataobj.InfluxdbItem) error {
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: c.Database,
Precision: c.Precision,
})
if err != nil {
logger.Errorf("create batch points error: ", err)
return err
}
for _, item := range items {
pt, err := client.NewPoint(item.Measurement, item.Tags, item.Fields, time.Unix(item.Timestamp, 0))
if err != nil {
logger.Errorf("create new points error: ", err)
continue
}
bp.AddPoint(pt)
}
return c.Client.Write(bp)
}
// 将原始数据插入到influxdb缓存队列
func Push2InfluxdbSendQueue(items []*dataobj.MetricValue) {
errCnt := 0
for _, item := range items {
influxdbItem := convert2InfluxdbItem(item)
isSuccess := InfluxdbQueue.PushFront(influxdbItem)
if !isSuccess {
errCnt += 1
}
}
stats.Counter.Set("influxdb.queue.err", errCnt)
}
func convert2InfluxdbItem(d *dataobj.MetricValue) *dataobj.InfluxdbItem {
t := dataobj.InfluxdbItem{Tags: make(map[string]string), Fields: make(map[string]interface{})}
for k, v := range d.TagsMap {
t.Tags[k] = v
}
t.Tags["endpoint"] = d.Endpoint
t.Measurement = d.Metric
t.Fields["value"] = d.Value
t.Timestamp = d.Timestamp
return &t
}
func send2InfluxdbTask(concurrent int) {
batch := Config.Influxdb.Batch // 一次发送,最多batch条数据
retry := Config.Influxdb.MaxRetry
addr := Config.Influxdb.Address
sema := semaphore.NewSemaphore(concurrent)
var err error
c, err := NewInfluxdbClient()
defer c.Client.Close()
if err != nil {
logger.Errorf("init influxdb client fail: %v", err)
return
}
for {
items := InfluxdbQueue.PopBackBy(batch)
count := len(items)
if count == 0 {
time.Sleep(DefaultSendTaskSleepInterval)
continue
}
influxdbItems := make([]*dataobj.InfluxdbItem, count)
for i := 0; i < count; i++ {
influxdbItems[i] = items[i].(*dataobj.InfluxdbItem)
stats.Counter.Set("points.out.influxdb", 1)
logger.Debug("send to influxdb: ", influxdbItems[i])
}
// 同步Call + 有限并发 进行发送
sema.Acquire()
go func(addr string, influxdbItems []*dataobj.InfluxdbItem, count int) {
defer sema.Release()
sendOk := false
for i := 0; i < retry; i++ {
err = c.Send(influxdbItems)
if err == nil {
sendOk = true
break
}
logger.Warningf("send influxdb fail: %v", err)
time.Sleep(time.Millisecond * 10)
}
if !sendOk {
stats.Counter.Set("points.out.influxdb.err", count)
logger.Errorf("send %v to influxdb %s fail: %v", influxdbItems, addr, err)
} else {
logger.Debugf("send to influxdb %s ok", addr)
}
}(addr, influxdbItems, count)
}
}
......@@ -93,6 +93,15 @@ func Parse(conf string) error {
"hbsMod": "monapi",
})
viper.SetDefault("backend.influxdb", map[string]interface{}{
"enabled": true,
"batch": 200, //每次拉取文件的个数
"maxRetry": 3, //重试次数
"workerNum": 32,
"maxConns": 2000, //查询和推送数据的并发个数
"timeout": 3000, //访问超时时间,单位毫秒
})
err = viper.Unmarshal(&Config)
if err != nil {
return fmt.Errorf("cannot read yml[%s]: %v", conf, err)
......
......@@ -44,6 +44,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
backend.Push2JudgeSendQueue(items)
}
if backend.Config.Influxdb.Enabled {
backend.Push2InfluxdbSendQueue(items)
}
if reply.Invalid == 0 {
reply.Msg = "ok"
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册