提交 a6de4d35 编写于 作者: 7 710leo

Support operating plugin collection by web

上级 9264aa62
......@@ -34,6 +34,10 @@ $ docker-compose up -d
![dashboard](https://user-images.githubusercontent.com/19553554/78956965-8b9c6180-7b16-11ea-9747-6ed5e62b068d.png)
## Upgrading
If upgrade `version<1.4.0` to `v1.4.0`, follow the operating instructions in [v1.4.0](https://github.com/didi/nightingale/releases/tag/v1.4.0) release
## Team
[ulricqin](https://github.com/ulricqin) [710leo](https://github.com/710leo) [jsers](https://github.com/jsers) [hujter](https://github.com/hujter) [n4mine](https://github.com/n4mine) [heli567](https://github.com/heli567)
......
......@@ -34,6 +34,9 @@ $ docker-compose up -d
![dashboard](https://user-images.githubusercontent.com/19553554/78956965-8b9c6180-7b16-11ea-9747-6ed5e62b068d.png)
## 版本升级
如果需要从 `v1.4.0` 之前的版本升级到 `v1.4.0` , 按照 [v1.4.0](https://github.com/didi/nightingale/releases/tag/v1.4.0) release 说明操作即可
## 团队
[ulricqin](https://github.com/ulricqin) [710leo](https://github.com/710leo) [jsers](https://github.com/jsers) [hujter](https://github.com/hujter) [n4mine](https://github.com/n4mine) [heli567](https://github.com/heli567)
......
#!/bin/bash
# release version
version=1.3.3
version=1.4.0
CWD=$(cd $(dirname $0)/; pwd)
cd $CWD
......
......@@ -255,6 +255,24 @@ CREATE TABLE `log_collect` (
KEY `idx_collect_type` (`collect_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT 'log collect';
CREATE TABLE `plugin_collect` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`nid` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'nid',
`name` varchar(255) NOT NULL DEFAULT '' COMMENT 'name',
`collect_type` varchar(64) NOT NULL DEFAULT 'PROC' COMMENT 'type',
`step` int(11) NOT NULL DEFAULT '0' COMMENT '采集周期',
`file_path` varchar(255) NOT NULL COMMENT 'file_path',
`params` varchar(255) NOT NULL COMMENT 'params',
`comment` varchar(512) NOT NULL DEFAULT '' COMMENT 'comment',
`creator` varchar(255) NOT NULL DEFAULT '' COMMENT 'creator',
`created` datetime NOT NULL COMMENT 'created',
`last_updator` varchar(128) NOT NULL DEFAULT '' COMMENT 'last_updator',
`last_updated` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `idx_nid` (`nid`),
KEY `idx_collect_type` (`collect_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT 'plugin collect';
CREATE TABLE `collect_hist` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
`cid` bigint(20) NOT NULL DEFAULT '0' COMMENT 'collect id',
......
......@@ -323,6 +323,44 @@ func (l *LogCollect) Update() error {
return err
}
func GetPluginCollects() ([]*PluginCollect, error) {
collects := []*PluginCollect{}
err := DB["mon"].Find(&collects)
return collects, err
}
func (p *PluginCollect) Update() error {
session := DB["mon"].NewSession()
defer session.Close()
err := session.Begin()
if err != nil {
return err
}
if _, err = session.Id(p.Id).AllCols().Update(p); err != nil {
session.Rollback()
return err
}
b, err := json.Marshal(p)
if err != nil {
session.Rollback()
return err
}
if err := saveHist(p.Id, "plugin", "update", p.Creator, string(b), session); err != nil {
session.Rollback()
return err
}
if err = session.Commit(); err != nil {
return err
}
return err
}
func CreateCollect(collectType, creator string, collect interface{}) error {
session := DB["mon"].NewSession()
defer session.Close()
......@@ -379,6 +417,14 @@ func GetCollectByNid(collectType string, nids []int64) ([]interface{}, error) {
}
return res, err
case "plugin":
collects := []PluginCollect{}
err := DB["mon"].In("nid", nids).Find(&collects)
for _, c := range collects {
res = append(res, c)
}
return res, err
default:
return nil, fmt.Errorf("illegal collectType")
}
......@@ -400,6 +446,10 @@ func GetCollectById(collectType string, cid int64) (interface{}, error) {
_, err := DB["mon"].Where("id = ?", cid).Get(collect)
collect.Decode()
return collect, err
case "plugin":
collect := new(PluginCollect)
_, err := DB["mon"].Where("id = ?", cid).Get(collect)
return collect, err
default:
return nil, fmt.Errorf("illegal collectType")
......@@ -448,16 +498,3 @@ func saveHist(id int64, tp string, action, username, body string, session *xorm.
return err
}
func GetCollectsModel(t string) (interface{}, error) {
collects := make([]*PortCollect, 0)
switch t {
case "port":
return collects, nil
case "proc":
return collects, nil
default:
return nil, fmt.Errorf("illegal collectType")
}
}
......@@ -69,9 +69,10 @@ func Parse(conf string) error {
})
viper.SetDefault("sys", map[string]interface{}{
"timeout": 1000, //请求超时时间
"interval": 10, //基础指标上报周期
"plugin": "./plugin",
"timeout": 1000, //请求超时时间
"interval": 10, //基础指标上报周期
"pluginRemote": true, //从monapi获取插件采集配置
"plugin": "./plugin",
})
err = viper.Unmarshal(&Config)
......
......@@ -2,8 +2,10 @@ package stra
import (
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/didi/nightingale/src/model"
"github.com/didi/nightingale/src/toolkits/str"
......@@ -12,12 +14,13 @@ import (
"github.com/toolkits/pkg/logger"
)
func NewPortCollect(port, step int, tags string) *model.PortCollect {
func NewPortCollect(port, step int, tags string, modTime time.Time) *model.PortCollect {
return &model.PortCollect{
CollectType: "port",
Port: port,
Step: step,
Tags: tags,
LastUpdated: modTime,
}
}
......@@ -54,8 +57,14 @@ func GetPortCollects() map[int]*model.PortCollect {
continue
}
info, err := os.Stat(f)
if err != nil {
logger.Warning(err)
continue
}
tags := fmt.Sprintf("port=%s,service=%s", strconv.Itoa(port), service)
p := NewPortCollect(port, step, tags)
p := NewPortCollect(port, step, tags, info.ModTime())
ports[p.Port] = p
}
......
......@@ -2,8 +2,10 @@ package stra
import (
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/didi/nightingale/src/model"
"github.com/didi/nightingale/src/toolkits/str"
......@@ -12,13 +14,14 @@ import (
"github.com/toolkits/pkg/logger"
)
func NewProcCollect(method, name, tags string, step int) *model.ProcCollect {
func NewProcCollect(method, name, tags string, step int, modTime time.Time) *model.ProcCollect {
return &model.ProcCollect{
CollectType: "proc",
CollectMethod: method,
Target: name,
Step: step,
Tags: tags,
LastUpdated: modTime,
}
}
......@@ -54,8 +57,14 @@ func GetProcCollects() map[string]*model.ProcCollect {
continue
}
info, err := os.Stat(f)
if err != nil {
logger.Warning(err)
continue
}
tags := fmt.Sprintf("target=%s,service=%s", name, service)
p := NewProcCollect(method, name, tags, step)
p := NewProcCollect(method, name, tags, step, info.ModTime())
procs[name] = p
}
......
......@@ -73,7 +73,7 @@ func ListPluginsFromLocal() map[string]*Plugin {
filename := f.Name()
arr := strings.Split(filename, "_")
if len(arr) < 2 {
logger.Warningf("plugin:%s name illegal, should be: $cycle_$xx", filename)
logger.Debugf("plugin:%s name illegal, should be: $cycle_$xx", filename)
continue
}
......@@ -81,13 +81,13 @@ func ListPluginsFromLocal() map[string]*Plugin {
var cycle int
cycle, err = strconv.Atoi(arr[0])
if err != nil {
logger.Warningf("plugin:%s name illegal, should be: $cycle_$xx %v", filename, err)
logger.Debugf("plugin:%s name illegal, should be: $cycle_$xx %v", filename, err)
continue
}
fpath, err := filepath.Abs(filepath.Join(dir, filename))
if err != nil {
logger.Warningf("plugin:%s absolute path get err:%v", filename, err)
logger.Debugf("plugin:%s absolute path get err:%v", filename, err)
continue
}
......
......@@ -39,12 +39,3 @@ func deletePort(key int) {
}
delete(Ports, key)
}
func NewPortCollect(port, step int, tags string) *model.PortCollect {
return &model.PortCollect{
CollectType: "port",
Port: port,
Step: step,
Tags: tags,
}
}
......@@ -104,6 +104,31 @@ func collectPost(c *gin.Context) {
}
errors.Dangerous(model.CreateCollect(obj.Type, creator, collect))
case "plugin":
collect := new(model.PluginCollect)
b, err := json.Marshal(obj.Data)
if err != nil {
errors.Bomb("marshal body %s err:%v", obj, err)
}
err = json.Unmarshal(b, collect)
if err != nil {
errors.Bomb("unmarshal body %s err:%v", string(b), err)
}
collect.Creator = creator
collect.LastUpdator = creator
nid := collect.Nid
name := collect.Name
old, _ := model.GetCollectByName(obj.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
errors.Dangerous(model.CreateCollect(obj.Type, creator, collect))
default:
errors.Bomb("采集类型不合法")
}
......@@ -131,7 +156,7 @@ func collectsGet(c *gin.Context) {
var resp []interface{}
nids := []int64{nid}
types := []string{"port", "proc", "log"}
types := []string{"port", "proc", "log", "plugin"}
for _, t := range types {
collects, err := model.GetCollectByNid(t, nids)
......@@ -268,6 +293,45 @@ func collectPut(c *gin.Context) {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
errors.Dangerous(collect.Update())
renderData(c, "ok", nil)
return
case "plugin":
collect := new(model.PluginCollect)
b, err := json.Marshal(recv.Data)
if err != nil {
errors.Bomb("marshal body %s err:%v", recv, err)
}
err = json.Unmarshal(b, collect)
if err != nil {
errors.Bomb("unmarshal body %s err:%v", string(b), err)
}
nid := collect.Nid
name := collect.Name
//校验采集是否存在
obj, err := model.GetCollectById(recv.Type, collect.Id) //id找不到的情况
if err != nil {
errors.Bomb("采集不存在 type:%s id:%d", recv.Type, collect.Id)
}
tmpId := obj.(*model.PluginCollect).Id
if tmpId == 0 {
errors.Bomb("采集不存在 type:%s id:%d", recv.Type, collect.Id)
}
collect.Creator = creator
collect.LastUpdator = creator
old, _ := model.GetCollectByName(recv.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid &&
tmpId != collect.Id {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
errors.Dangerous(collect.Update())
renderData(c, "ok", nil)
return
......
......@@ -177,6 +177,37 @@ func syncCollects() {
}
}
pluginConfigs, err := model.GetPluginCollects()
if err != nil {
logger.Warningf("get log collects err:%v", err)
}
for _, p := range pluginConfigs {
leafNids, err := GetLeafNids(p.Nid, []int64{})
if err != nil {
logger.Warningf("get LeafNids err:%v %v", err, p)
continue
}
Endpoints, err := model.EndpointUnderLeafs(leafNids)
if err != nil {
logger.Warningf("get endpoints err:%v %v", err, p)
continue
}
for _, endpoint := range Endpoints {
name := endpoint.Ident
c, exists := collectMap[name]
if !exists {
c = model.NewCollect()
}
key := fmt.Sprintf("%s-%d", p.Name, p.Nid)
c.Plugins[key] = p
collectMap[name] = c
}
}
CollectCache.SetAll(collectMap)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册