未验证 提交 22dc5c90 编写于 作者: Y yubo 提交者: GitHub

feature: add dryrun for collect_rule add/update (#599)

* feature: add dryrun for collect_rule add/update

* ignore sso when it is disable
上级 cd4336d4
......@@ -8,9 +8,7 @@ require (
github.com/codegangsta/negroni v1.0.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgryski/go-tsz v0.0.0-20180227144327-03b7d791f4fe
github.com/ericchiang/k8s v1.2.0
github.com/garyburd/redigo v1.6.2
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.6.3
github.com/go-ping/ping v0.0.0-20201115131931-3300c582a663
......@@ -24,12 +22,9 @@ require (
github.com/m3db/m3 v0.15.17
github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-sqlite3 v1.14.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mojocn/base64Captcha v1.3.1
github.com/open-falcon/rrdlite v0.0.0-20200214140804-bf5829f786ad
github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect
github.com/shirou/gopsutil v3.20.11+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0
......@@ -45,7 +40,6 @@ require (
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/ldap.v3 v3.1.0
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v2 v2.3.0
xorm.io/core v0.7.3
......
......@@ -525,32 +525,32 @@ func (a *ApiCollect) Update() error {
return err
}
func CreateCollect(collectType, creator string, collect interface{}) error {
func CreateCollect(collectType, creator string, collect interface{}, dryRun bool) (err error) {
session := DB["mon"].NewSession()
defer session.Close()
err := session.Begin()
if err != nil {
return err
}
if _, err := session.Insert(collect); err != nil {
session.Rollback()
if err = session.Begin(); err != nil {
session.Close()
return err
}
defer func() {
if err != nil || dryRun {
session.Rollback()
} else {
err = session.Commit()
}
session.Close()
}()
b, err := json.Marshal(collect)
if err != nil {
session.Rollback()
return err
if _, err = session.Insert(collect); err != nil {
return
}
if err := saveHistory(0, collectType, "create", creator, string(b), session); err != nil {
session.Rollback()
return err
var b []byte
if b, err = json.Marshal(collect); err != nil {
return
}
return session.Commit()
err = saveHistory(0, collectType, "create", creator, string(b), session)
return
}
func DeleteCollectById(collectType, creator string, cid int64) error {
......
......@@ -21,6 +21,7 @@ type CollectRule struct {
Name string `json:"name" describes:"customize name"`
Region string `json:"region"`
Comment string `json:"comment"`
DryRun bool `json:"dryrun" xorm:"-"`
Data json.RawMessage `json:"data"`
Tags string `json:"tags" description:"k1=v1,k2=v2,k3=v3,..."`
Creator string `json:"creator" description:"just for output"`
......@@ -100,36 +101,32 @@ func GetCollectRules(typ string, nid int64, limit, offset int) (total int64, lis
return
}
func (p *CollectRule) Update() error {
func (p *CollectRule) Update() (err error) {
session := DB["mon"].NewSession()
defer session.Close()
err := session.Begin()
if err != nil {
return err
if err = session.Begin(); err != nil {
session.Close()
return
}
defer func() {
if err != nil || p.DryRun {
session.Rollback()
} else {
err = session.Commit()
}
session.Close()
}()
if _, err = session.Id(p.Id).AllCols().Update(p); err != nil {
session.Rollback()
return err
}
b, err := json.Marshal(p)
if err != nil {
session.Rollback()
return err
}
if err := saveHistory(p.Id, p.CollectType, "update", p.Creator, string(b), session); err != nil {
session.Rollback()
return err
return
}
if err = session.Commit(); err != nil {
return err
var b []byte
if b, err = json.Marshal(p); err != nil {
return
}
return err
err = saveHistory(p.Id, p.CollectType, "update", p.Creator, string(b), session)
return
}
func DeleteCollectRule(sid int64) error {
......
......@@ -25,10 +25,17 @@ type Token struct {
}
func TokenAll() (int64, error) {
if _, ok := DB["sso"]; !ok {
return 0, nil
}
return DB["sso"].Count(new(Token))
}
func TokenGet(token string) (*Token, error) {
if _, ok := DB["sso"]; !ok {
return nil, nil
}
var obj Token
has, err := DB["sso"].Where("access_token=?", token).Get(&obj)
if err != nil {
......@@ -54,16 +61,25 @@ func (p *Token) Session() *Session {
}
func (p *Token) Update(cols ...string) error {
if _, ok := DB["sso"]; !ok {
return nil
}
_, err := DB["sso"].Where("access_token=?", p.AccessToken).Cols(cols...).Update(p)
return err
}
func TokenDelete(token string) error {
if _, ok := DB["sso"]; !ok {
return nil
}
_, err := DB["sso"].Where("access_token=?", token).Delete(new(Token))
return err
}
func TokenGets(where string, args ...interface{}) (tokens []Token, err error) {
if _, ok := DB["sso"]; !ok {
return
}
if where != "" {
err = DB["sso"].Where(where, args...).Find(&tokens)
} else {
......
package collector
import (
"bytes"
"encoding/json"
"fmt"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/models"
"github.com/didi/nightingale/src/modules/prober/manager/accumulator"
"github.com/influxdata/telegraf"
)
......@@ -113,7 +116,52 @@ func (p BaseCollector) Create(data []byte, username string) error {
if old != nil {
return fmt.Errorf("同节点下策略名称 %s 已存在", collect.Name)
}
return models.CreateCollect(p.name, username, collect)
if err := models.CreateCollect(p.name, username, collect, collect.DryRun); err != nil {
return err
}
if collect.DryRun {
return p.dryRun(rule)
}
return nil
}
func (p BaseCollector) dryRun(rule TelegrafPlugin) error {
input, err := rule.TelegrafInput()
if err != nil {
return err
}
metrics := []*dataobj.MetricValue{}
acc, err := accumulator.New(accumulator.Options{Name: "plugin-dryrun", Metrics: &metrics})
if err != nil {
return err
}
if err = input.Gather(acc); err != nil {
return err
}
buf := &bytes.Buffer{}
for k, v := range metrics {
fmt.Fprintf(buf, "%d %s %s %f\n", k, v.CounterType, v.PK(), v.Value)
}
return NewDryRunError(buf.String())
}
type DryRun struct {
msg string
}
func (p DryRun) Error() string {
return p.msg
}
func NewDryRunError(msg string) error {
return DryRun{msg}
}
func (p BaseCollector) Update(data []byte, username string) error {
......@@ -153,7 +201,15 @@ func (p BaseCollector) Update(data []byte, username string) error {
return fmt.Errorf("同节点下策略名称 %s 已存在", collect.Name)
}
return collect.Update()
if err := collect.Update(); err != nil {
return err
}
if collect.DryRun {
return p.dryRun(rule)
}
return nil
}
func (p BaseCollector) Delete(id int64, username string) error {
......
package http
import (
"bytes"
"encoding/json"
"fmt"
"regexp"
......@@ -24,17 +25,23 @@ func collectRulePost(c *gin.Context) {
var recv []CollectRecv
errors.Dangerous(c.ShouldBind(&recv))
buf := &bytes.Buffer{}
creator := loginUsername(c)
for _, obj := range recv {
cl, err := collector.GetCollector(obj.Type)
errors.Dangerous(err)
if err := cl.Create([]byte(obj.Data), creator); err != nil {
errors.Bomb("%s add rule err %s", obj.Type, err)
if _, ok := err.(collector.DryRun); ok {
fmt.Fprintf(buf, "%s\n", err)
} else {
errors.Bomb("%s add rule err %s", obj.Type, err)
}
}
}
renderData(c, "ok", nil)
buf.WriteString("ok")
renderData(c, buf.String(), nil)
}
func collectRulesGetByLocalEndpoint(c *gin.Context) {
......@@ -104,11 +111,17 @@ func collectRulePut(c *gin.Context) {
cl, err := collector.GetCollector(recv.Type)
errors.Dangerous(err)
buf := &bytes.Buffer{}
creator := loginUsername(c)
if err := cl.Update([]byte(recv.Data), creator); err != nil {
errors.Bomb("%s update rule err %s", recv.Type, err)
if _, ok := err.(collector.DryRun); ok {
fmt.Fprintf(buf, "%s\n", err)
} else {
errors.Bomb("%s update rule err %s", recv.Type, err)
}
}
renderData(c, "ok", nil)
buf.WriteString("ok")
renderData(c, buf.String(), nil)
}
type CollectsDelRev struct {
......
......@@ -12,6 +12,7 @@ import (
"github.com/didi/nightingale/src/models"
"github.com/didi/nightingale/src/modules/monapi/acache"
"github.com/didi/nightingale/src/modules/monapi/alarm"
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/monapi/config"
"github.com/didi/nightingale/src/modules/monapi/http"
"github.com/didi/nightingale/src/modules/monapi/redisc"
......@@ -96,6 +97,8 @@ func main() {
go alarm.CleanEventLoop()
}
pluginInfo()
http.Start()
ending()
}
......@@ -140,3 +143,10 @@ func pconf() {
os.Exit(1)
}
}
func pluginInfo() {
fmt.Println("remote collector")
for k, v := range collector.GetRemoteCollectors() {
fmt.Printf(" %d %s\n", k, v)
}
}
......@@ -82,7 +82,7 @@ func (p LogCollector) Create(data []byte, username string) error {
if old != nil {
return fmt.Errorf("同节点下策略名称 %s 已存在", name)
}
return models.CreateCollect(p.Name(), username, collector)
return models.CreateCollect(p.Name(), username, collector, false)
}
func (p LogCollector) Update(data []byte, username string) error {
......
......@@ -79,7 +79,7 @@ func (p PluginCollector) Create(data []byte, username string) error {
if old != nil {
return fmt.Errorf("同节点下策略名称 %s 已存在", name)
}
return models.CreateCollect(p.Name(), username, collect)
return models.CreateCollect(p.Name(), username, collect, false)
}
func (p PluginCollector) Update(data []byte, username string) error {
......
......@@ -79,7 +79,7 @@ func (p PortCollector) Create(data []byte, username string) error {
if old != nil {
return fmt.Errorf("同节点下策略名称 %s 已存在", name)
}
return models.CreateCollect(p.Name(), username, collect)
return models.CreateCollect(p.Name(), username, collect, false)
}
func (p PortCollector) Update(data []byte, username string) error {
......
......@@ -79,7 +79,7 @@ func (p ProcCollector) Create(data []byte, username string) error {
if old != nil {
return fmt.Errorf("同节点下策略名称 %s 已存在", name)
}
return models.CreateCollect(p.Name(), username, collect)
return models.CreateCollect(p.Name(), username, collect, false)
}
func (p ProcCollector) Update(data []byte, username string) error {
......
......@@ -6,7 +6,7 @@ import (
"testing"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/modules/prober/manager"
"github.com/didi/nightingale/src/modules/prober/manager/accumulator"
"github.com/influxdata/telegraf"
"github.com/toolkits/pkg/logger"
)
......@@ -83,7 +83,7 @@ func PluginTest(t *testing.T, plugin telegrafPlugin) telegraf.Input {
func PluginInputTest(t *testing.T, input telegraf.Input) {
metrics := []*dataobj.MetricValue{}
acc, err := manager.NewAccumulator(manager.AccumulatorOptions{Name: "plugin-test", Metrics: &metrics})
acc, err := accumulator.New(accumulator.Options{Name: "plugin-test", Metrics: &metrics})
if err != nil {
t.Error(err)
}
......
package manager
package accumulator
import (
"fmt"
......@@ -7,17 +7,18 @@ import (
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/modules/prober/manager/metric"
"github.com/influxdata/telegraf"
"github.com/toolkits/pkg/logger"
)
type AccumulatorOptions struct {
type Options struct {
Name string
Tags map[string]string
Metrics *[]*dataobj.MetricValue
}
func (p *AccumulatorOptions) Validate() error {
func (p *Options) Validate() error {
if p.Name == "" {
return fmt.Errorf("unable to get Name")
}
......@@ -28,8 +29,8 @@ func (p *AccumulatorOptions) Validate() error {
return nil
}
// NewAccumulator return telegraf.Accumulator
func NewAccumulator(opt AccumulatorOptions) (telegraf.Accumulator, error) {
// New return telegraf.Accumulator
func New(opt Options) (telegraf.Accumulator, error) {
if err := opt.Validate(); err != nil {
return nil, err
}
......@@ -126,7 +127,7 @@ func (p *accumulator) addFields(
tp telegraf.ValueType,
t ...time.Time,
) {
m, err := NewMetric(measurement, tags, fields, p.getTime(t), tp)
m, err := metric.NewMetric(measurement, tags, fields, p.getTime(t), tp)
if err != nil {
return
}
......
......@@ -9,6 +9,7 @@ import (
"github.com/didi/nightingale/src/models"
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/prober/config"
"github.com/didi/nightingale/src/modules/prober/manager/accumulator"
"github.com/influxdata/telegraf"
"github.com/toolkits/pkg/logger"
)
......@@ -43,7 +44,7 @@ func newCollectRule(rule *models.CollectRule) (*collectRule, error) {
metrics := []*dataobj.MetricValue{}
acc, err := NewAccumulator(AccumulatorOptions{
acc, err := accumulator.New(accumulator.Options{
Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id),
Tags: tags,
Metrics: &metrics})
......@@ -176,7 +177,7 @@ func (p *collectRule) update(rule *models.CollectRule) error {
return err
}
acc, err := NewAccumulator(AccumulatorOptions{
acc, err := accumulator.New(accumulator.Options{
Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id),
Tags: tags,
Metrics: p.metrics})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册