// OpenTsdbItem is one data point destined for OpenTSDB: a metric name, its
// tags, the sampled value and the sample timestamp.
type OpenTsdbItem struct {
	Metric    string            `json:"metric"`
	Tags      map[string]string `json:"tags"`
	Value     float64           `json:"value"`
	Timestamp int64             `json:"timestamp"`
}

// String renders the item in a compact, human-readable form for log output.
func (t *OpenTsdbItem) String() string {
	// The format string must carry one verb per argument; an empty format
	// makes fmt emit only "%!(EXTRA ...)" noise.
	return fmt.Sprintf(
		"<Metric:%s, Tags:%v, Value:%v, TS:%d>",
		t.Metric,
		t.Tags,
		t.Value,
		t.Timestamp,
	)
}

// OpenTsdbString renders the item as a single "put" line:
//
//	put <metric> <timestamp> <value> <tagk>=<tagv> ...
//
// The value is printed with three decimals. Spaces inside tag keys and tag
// values are replaced by underscores (they would otherwise split the line
// into extra fields), and tag keys are lower-cased. Every tag, including the
// last one, is followed by a single space; no newline is appended. Tags are
// emitted in map iteration order, which is not stable between calls.
func (t *OpenTsdbItem) OpenTsdbString() (s string) {
	var b strings.Builder
	fmt.Fprintf(&b, "put %s %d %.3f ", t.Metric, t.Timestamp, t.Value)

	for k, v := range t.Tags {
		b.WriteString(strings.ToLower(strings.Replace(k, " ", "_", -1)))
		b.WriteByte('=')
		b.WriteString(strings.Replace(v, " ", "_", -1))
		b.WriteByte(' ')
	}

	return b.String()
}
b/src/modules/transfer/backend/init.go @@ -24,6 +24,18 @@ type InfluxdbSection struct { Precision string `yaml:"precision"` } +type OpenTsdbSection struct { + Enabled bool `yaml:"enabled"` + Batch int `yaml:"batch"` + ConnTimeout int `yaml:"connTimeout"` + CallTimeout int `yaml:"callTimeout"` + WorkerNum int `yaml:"workerNum"` + MaxConns int `yaml:"maxConns"` + MaxIdle int `yaml:"maxIdle"` + MaxRetry int `yaml:"maxRetry"` + Address string `yaml:"address"` +} + type BackendSection struct { Enabled bool `yaml:"enabled"` Batch int `yaml:"batch"` @@ -40,6 +52,7 @@ type BackendSection struct { Cluster map[string]string `yaml:"cluster"` ClusterList map[string]*ClusterNode `json:"clusterList"` Influxdb InfluxdbSection `yaml:"influxdb"` + OpenTsdb OpenTsdbSection `yaml:"opentsdb"` } const DefaultSendQueueMaxSize = 102400 //10.24w @@ -57,10 +70,12 @@ var ( TsdbQueues = make(map[string]*list.SafeListLimited) JudgeQueues = cache.SafeJudgeQueue{} InfluxdbQueue *list.SafeListLimited + OpenTsdbQueue *list.SafeListLimited // 连接池 node_address -> connection_pool - TsdbConnPools *pools.ConnPools - JudgeConnPools *pools.ConnPools + TsdbConnPools *pools.ConnPools + JudgeConnPools *pools.ConnPools + OpenTsdbConnPoolHelper *pools.OpenTsdbConnPoolHelper connTimeout int32 callTimeout int32 @@ -97,6 +112,9 @@ func initConnPools() { JudgeConnPools = pools.NewConnPools( Config.MaxConns, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, GetJudges(), ) + if Config.OpenTsdb.Enabled { + OpenTsdbConnPoolHelper = pools.NewOpenTsdbConnPoolHelper(Config.OpenTsdb.Address, Config.OpenTsdb.MaxConns, Config.OpenTsdb.MaxIdle, Config.OpenTsdb.ConnTimeout, Config.OpenTsdb.CallTimeout) + } } func initSendQueues() { @@ -115,6 +133,10 @@ func initSendQueues() { if Config.Influxdb.Enabled { InfluxdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize) } + + if Config.OpenTsdb.Enabled { + OpenTsdbQueue = list.NewSafeListLimited(DefaultSendQueueMaxSize) + } } func GetJudges() []string { diff --git 
a/src/modules/transfer/backend/sender.go b/src/modules/transfer/backend/sender.go index 772bcde2..b5e9f855 100644 --- a/src/modules/transfer/backend/sender.go +++ b/src/modules/transfer/backend/sender.go @@ -1,6 +1,7 @@ package backend import ( + "bytes" "time" "github.com/didi/nightingale/src/dataobj" @@ -42,6 +43,11 @@ func startSendTasks() { influxdbConcurrent = 1 } + openTsdbConcurrent := Config.OpenTsdb.WorkerNum + if openTsdbConcurrent < 1 { + openTsdbConcurrent = 1 + } + if Config.Enabled { for node, item := range Config.ClusterList { for _, addr := range item.Addrs { @@ -62,6 +68,12 @@ func startSendTasks() { go send2InfluxdbTask(influxdbConcurrent) } + + if Config.OpenTsdb.Enabled { + go send2OpenTsdbTask(openTsdbConcurrent) + + } + } func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) { @@ -407,3 +419,81 @@ func send2InfluxdbTask(concurrent int) { }(addr, influxdbItems, count) } } + +// 将原始数据入到tsdb发送缓存队列 +func Push2OpenTsdbSendQueue(items []*dataobj.MetricValue) { + errCnt := 0 + for _, item := range items { + tsdbItem := convert2OpenTsdbItem(item) + isSuccess := OpenTsdbQueue.PushFront(tsdbItem) + + if !isSuccess { + errCnt += 1 + } + } + stats.Counter.Set("opentsdb.queue.err", errCnt) +} + +func send2OpenTsdbTask(concurrent int) { + batch := Config.OpenTsdb.Batch // 一次发送,最多batch条数据 + retry := Config.OpenTsdb.MaxRetry + addr := Config.OpenTsdb.Address + sema := semaphore.NewSemaphore(concurrent) + + for { + items := OpenTsdbQueue.PopBackBy(batch) + count := len(items) + if count == 0 { + time.Sleep(DefaultSendTaskSleepInterval) + continue + } + var openTsdbBuffer bytes.Buffer + + for i := 0; i < count; i++ { + tsdbItem := items[i].(*dataobj.OpenTsdbItem) + openTsdbBuffer.WriteString(tsdbItem.OpenTsdbString()) + openTsdbBuffer.WriteString("\n") + stats.Counter.Set("points.out.opentsdb", 1) + logger.Debug("send to opentsdb: ", tsdbItem) + } + // 同步Call + 有限并发 进行发送 + sema.Acquire() + go func(addr string, openTsdbBuffer 
bytes.Buffer, count int) { + defer sema.Release() + + var err error + sendOk := false + for i := 0; i < retry; i++ { + err = OpenTsdbConnPoolHelper.Send(openTsdbBuffer.Bytes()) + if err == nil { + sendOk = true + break + } + logger.Warningf("send opentsdb %s fail: %v", addr, err) + time.Sleep(100 * time.Millisecond) + } + + if !sendOk { + stats.Counter.Set("points.out.opentsdb.err", count) + for _, item := range items { + logger.Errorf("send %v to opentsdb %s fail: %v", item, addr, err) + } + } else { + logger.Debugf("send to opentsdb %s ok", addr) + } + }(addr, openTsdbBuffer, count) + } +} + +func convert2OpenTsdbItem(d *dataobj.MetricValue) *dataobj.OpenTsdbItem { + t := dataobj.OpenTsdbItem{Tags: make(map[string]string)} + + for k, v := range d.TagsMap { + t.Tags[k] = v + } + t.Tags["endpoint"] = d.Endpoint + t.Metric = d.Metric + t.Timestamp = d.Timestamp + t.Value = d.Value + return &t +} diff --git a/src/modules/transfer/config/config.go b/src/modules/transfer/config/config.go index e1207655..8d8fbd6d 100644 --- a/src/modules/transfer/config/config.go +++ b/src/modules/transfer/config/config.go @@ -94,7 +94,7 @@ func Parse(conf string) error { }) viper.SetDefault("backend.influxdb", map[string]interface{}{ - "enabled": true, + "enabled": false, "batch": 200, //每次拉取文件的个数 "maxRetry": 3, //重试次数 "workerNum": 32, @@ -102,6 +102,17 @@ func Parse(conf string) error { "timeout": 3000, //访问超时时间,单位毫秒 }) + viper.SetDefault("backend.opentsdb", map[string]interface{}{ + "enabled": false, + "batch": 200, //每次拉取文件的个数 + "maxRetry": 3, //重试次数 + "workerNum": 32, + "maxConns": 2000, //查询和推送数据的并发个数 + "maxIdle": 32, //建立的连接池的最大空闲数 + "connTimeout": 1000, //链接超时时间,单位毫秒 + "callTimeout": 3000, //访问超时时间,单位毫秒 + }) + err = viper.Unmarshal(&Config) if err != nil { return fmt.Errorf("cannot read yml[%s]: %v", conf, err) diff --git a/src/modules/transfer/rpc/push.go b/src/modules/transfer/rpc/push.go index fc13ff9d..bdef7c73 100644 --- a/src/modules/transfer/rpc/push.go +++ 
b/src/modules/transfer/rpc/push.go @@ -48,6 +48,10 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp backend.Push2InfluxdbSendQueue(items) } + if backend.Config.OpenTsdb.Enabled { + backend.Push2OpenTsdbSendQueue(items) + } + if reply.Invalid == 0 { reply.Msg = "ok" } diff --git a/src/toolkits/pools/opentsdb.go b/src/toolkits/pools/opentsdb.go new file mode 100644 index 00000000..6fcdb59d --- /dev/null +++ b/src/toolkits/pools/opentsdb.go @@ -0,0 +1,117 @@ +// Copyright 2017 Xiaomi, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pools + +import ( + "fmt" + "net" + "time" + + connp "github.com/toolkits/pkg/pool" +) + +type OpenTsdbClient struct { + cli *struct{ net.Conn } + name string +} + +func (t OpenTsdbClient) Name() string { + return t.name +} + +func (t OpenTsdbClient) Closed() bool { + return t.cli.Conn == nil +} + +func (t OpenTsdbClient) Close() error { + if t.cli != nil { + err := t.cli.Close() + t.cli.Conn = nil + return err + } + return nil +} + +func newOpenTsdbConnPool(address string, maxConns int, maxIdle int, connTimeout int) *connp.ConnPool { + pool := connp.NewConnPool("opentsdb", address, maxConns, maxIdle) + + pool.New = func(name string) (connp.NConn, error) { + _, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return nil, err + } + + conn, err := net.DialTimeout("tcp", address, time.Duration(connTimeout)*time.Millisecond) + if err != nil { + return nil, err + } + + return OpenTsdbClient{ + cli: &struct{ net.Conn }{conn}, + name: name, + }, nil + } + + return pool +} + +type OpenTsdbConnPoolHelper struct { + p *connp.ConnPool + maxConns int + maxIdle int + connTimeout int + callTimeout int + address string +} + +func NewOpenTsdbConnPoolHelper(address string, maxConns, maxIdle, connTimeout, callTimeout int) *OpenTsdbConnPoolHelper { + return &OpenTsdbConnPoolHelper{ + p: newOpenTsdbConnPool(address, maxConns, maxIdle, connTimeout), + maxConns: maxConns, + maxIdle: maxIdle, + connTimeout: connTimeout, + callTimeout: callTimeout, + address: address, + } +} + +func (t *OpenTsdbConnPoolHelper) Send(data []byte) (err error) { + conn, err := t.p.Fetch() + if err != nil { + return fmt.Errorf("get connection fail: err %v. 
proc: %s", err, t.p.Proc()) + } + + cli := conn.(OpenTsdbClient).cli + + done := make(chan error, 1) + go func() { + _, err = cli.Write(data) + done <- err + }() + + select { + case <-time.After(time.Duration(t.callTimeout) * time.Millisecond): + t.p.ForceClose(conn) + return fmt.Errorf("%s, call timeout", t.address) + case err = <-done: + if err != nil { + t.p.ForceClose(conn) + err = fmt.Errorf("%s, call failed, err %v. proc: %s", t.address, err, t.p.Proc()) + } else { + t.p.Release(conn) + } + return err + } +} -- GitLab