未验证 提交 f542e49c 编写于 作者: 陈键冬 提交者: GitHub

Merge branch 'master' into version-management

......@@ -2,6 +2,7 @@ logger:
dir: logs/collector
level: WARNING
keepHours: 2
identity:
specify: ""
shell: ifconfig `route|grep '^default'|awk '{print $NF}'`|grep inet|awk '{print $2}'|awk -F ':' '{print $NF}'|head -n 1
......@@ -18,12 +19,20 @@ sys:
timeout: 1000
interval: 20
plugin: ./plugin
# monitor nic which filtered by prefix
ifacePrefix:
- eth
- em
mountPoint: []
mountIgnorePrefix:
- /var/lib
- ens
# ignore disk mount point
mountIgnore:
prefix:
- /var/lib
- /run
# collect anyway
exclude: []
ignoreMetrics:
- cpu.core.idle
......
......@@ -49,8 +49,11 @@ func buildEndpointWhere(query, batch, field string) *xorm.Session {
session := DB["mon"].Table(new(Endpoint))
if batch == "" && query != "" {
q := "%" + query + "%"
session = session.Where("ident like ? or alias like ?", q, q)
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {
q := "%" + arr[i] + "%"
session = session.Where("ident like ? or alias like ?", q, q)
}
}
if batch != "" {
......@@ -141,8 +144,11 @@ func buildEndpointUnderNodeWhere(leafids []int64, query, batch, field string) *x
session := DB["mon"].Where("id in (select endpoint_id from node_endpoint where node_id in (" + str.IdsString(leafids) + "))")
if batch == "" && query != "" {
q := "%" + query + "%"
session = session.Where("ident like ? or alias like ?", q, q)
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {
q := "%" + arr[i] + "%"
session = session.Where("ident like ? or alias like ?", q, q)
}
}
if batch != "" {
......
......@@ -33,31 +33,29 @@ func pushData(c *gin.Context) {
return
}
recvMetricValues := []*dataobj.MetricValue{}
var recvMetricValues []*dataobj.MetricValue
errors.Dangerous(c.ShouldBindJSON(&recvMetricValues))
err := funcs.Push(recvMetricValues)
render.Message(c, err)
return
}
func getStrategy(c *gin.Context) {
var resp []interface{}
port := stra.GetPortCollects()
for _, stra := range port {
resp = append(resp, stra)
for _, s := range port {
resp = append(resp, s)
}
proc := stra.GetProcCollects()
for _, stra := range proc {
resp = append(resp, stra)
for _, s := range proc {
resp = append(resp, s)
}
logStras := strategy.GetListAll()
for _, stra := range logStras {
resp = append(resp, stra)
for _, s := range logStras {
resp = append(resp, s)
}
render.Data(c, resp, nil)
......
......@@ -57,7 +57,7 @@ func (r *Reader) StartRead() {
var readCnt, readSwp int64
var dropCnt, dropSwp int64
analysClose := make(chan int, 0)
analysClose := make(chan int)
go func() {
for {
// 十秒钟统计一次
......
......@@ -14,7 +14,7 @@ var (
)
func init() {
globalStrategy = make(map[int64]*stra.Strategy, 0)
globalStrategy = make(map[int64]*stra.Strategy)
}
func Update() error {
......@@ -22,7 +22,7 @@ func Update() error {
err := UpdateGlobalStrategy(strategies)
if err != nil {
logger.Errorf("Update Strategy cache error ! [msg:%v]", err)
logger.Errorf("Update Strategy cache error:%v\n", err)
return err
}
logger.Infof("Update Strategy end")
......@@ -30,7 +30,7 @@ func Update() error {
}
func UpdateGlobalStrategy(sts []*stra.Strategy) error {
tmpStrategyMap := make(map[int64]*stra.Strategy, 0)
tmpStrategyMap := make(map[int64]*stra.Strategy)
for _, st := range sts {
if st.Degree == 0 {
st.Degree = 6
......
......@@ -25,7 +25,7 @@ type pushPointsCache struct {
Counters map[string]*counterCache `json:"counters"`
}
var globalPushPoints = pushPointsCache{Counters: make(map[string]*counterCache, 0)}
var globalPushPoints = pushPointsCache{Counters: make(map[string]*counterCache)}
func init() {
go CleanLoop()
......
......@@ -253,7 +253,7 @@ func (sc *StrategyCounter) AddTms(tms int64) error {
_, ok := sc.TmsPoints[tms]
if !ok {
tmp := new(PointsCounter)
tmp.TagstringMap = make(map[string]*PointCounter, 0)
tmp.TagstringMap = make(map[string]*PointCounter)
sc.TmsPoints[tms] = tmp
}
sc.Unlock()
......@@ -300,7 +300,7 @@ func (gc *GlobalCounter) AddStrategyCount(st *stra.Strategy) {
if _, ok := gc.StrategyCounts[st.ID]; !ok {
tmp := new(StrategyCounter)
tmp.Strategy = st
tmp.TmsPoints = make(map[int64]*PointsCounter, 0)
tmp.TmsPoints = make(map[int64]*PointsCounter)
gc.StrategyCounts[st.ID] = tmp
}
gc.Unlock()
......@@ -348,8 +348,7 @@ func (gc *GlobalCounter) cleanStrategyData(id int64) {
if !ok || sCount == nil {
return
}
sCount.TmsPoints = make(map[int64]*PointsCounter, 0)
return
sCount.TmsPoints = make(map[int64]*PointsCounter)
}
// countEqual意味着不会对统计的结构产生影响
......
......@@ -165,7 +165,7 @@ func ToPushQueue(strategy *stra.Strategy, tms int64, pointMap map[string]*PointC
var tags map[string]string
if tagstring == "null" {
tags = make(map[string]string, 0)
tags = make(map[string]string)
} else {
tags = dataobj.DictedTagstring(tagstring)
}
......
......@@ -136,7 +136,7 @@ func (w *Worker) Work() {
logger.Infof("worker starting...[%s]", w.Mark)
var anaCnt, anaSwp int64
analysClose := make(chan int, 0)
analysClose := make(chan int)
go func() {
for {
......@@ -211,7 +211,7 @@ func (w *Worker) producer(line string, strategy *stra.Strategy) (*AnalysPoint, e
t := reg.FindString(line)
if len(t) <= 0 {
return nil, errors.New(fmt.Sprintf("cannot get timestamp:[sname:%s][sid:%d][timeFormat:%v]", strategy.Name, strategy.ID, timeFormat))
return nil, fmt.Errorf("cannot get timestamp:[sname:%s][sid:%d][timeFormat:%v]", strategy.Name, strategy.ID, timeFormat)
}
// 如果没有年,需添加当前年
......
......@@ -165,7 +165,7 @@ func parsePattern(strategies []*Strategy) {
func updateRegs(strategies []*Strategy) {
for _, st := range strategies {
st.TagRegs = make(map[string]*regexp.Regexp, 0)
st.TagRegs = make(map[string]*regexp.Regexp)
st.ParseSucc = false
//更新时间正则
......
package sys
type SysSection struct {
IfacePrefix []string `yaml:"ifacePrefix"`
MountPoint []string `yaml:"mountPoint"`
MountIgnorePrefix []string `yaml:"mountIgnorePrefix"`
IgnoreMetrics []string `yaml:"ignoreMetrics"`
IgnoreMetricsMap map[string]struct{} `yaml:"-"`
NtpServers []string `yaml:"ntpServers"`
Plugin string `yaml:"plugin"`
PluginRemote bool `yaml:"pluginRemote"`
Interval int `yaml:"interval"`
Timeout int `yaml:"timeout"`
IfacePrefix []string `yaml:"ifacePrefix"`
MountIgnore MountIgnoreSection `yaml:"mountIgnore"`
IgnoreMetrics []string `yaml:"ignoreMetrics"`
IgnoreMetricsMap map[string]struct{} `yaml:"-"`
NtpServers []string `yaml:"ntpServers"`
Plugin string `yaml:"plugin"`
PluginRemote bool `yaml:"pluginRemote"`
Interval int `yaml:"interval"`
Timeout int `yaml:"timeout"`
}
type MountIgnoreSection struct {
Prefix []string `yaml:"prefix"`
Exclude []string `yaml:"exclude"`
}
var Config SysSection
......
......@@ -9,36 +9,25 @@ import (
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/nux"
"github.com/toolkits/pkg/slice"
)
func DeviceMetrics() (L []*dataobj.MetricValue) {
func DeviceMetrics() []*dataobj.MetricValue {
ret := make([]*dataobj.MetricValue, 0)
mountPoints, err := nux.ListMountPoint()
fsFileFilter := make(map[string]struct{}) //过滤 /proc/mounts 出现重复的fsFile
if err != nil {
logger.Error("collect device metrics fail:", err)
return
}
var myMountPoints map[string]bool = make(map[string]bool)
if len(sys.Config.MountPoint) > 0 {
for _, mp := range sys.Config.MountPoint {
myMountPoints[mp] = true
}
return ret
}
ignoreMountPointsPrefix := sys.Config.MountIgnorePrefix
var diskTotal uint64 = 0
var diskUsed uint64 = 0
for idx := range mountPoints {
fsSpec, fsFile, fsVfstype := mountPoints[idx][0], mountPoints[idx][1], mountPoints[idx][2]
if len(myMountPoints) > 0 {
if _, ok := myMountPoints[fsFile]; !ok {
logger.Debug("mount point not matched with config", fsFile, "ignored.")
continue
}
}
if _, exists := fsFileFilter[fsFile]; exists {
logger.Debugf("mount point %s was collected", fsFile)
continue
......@@ -46,7 +35,9 @@ func DeviceMetrics() (L []*dataobj.MetricValue) {
fsFileFilter[fsFile] = struct{}{}
}
if hasIgnorePrefix(fsFile, ignoreMountPointsPrefix) {
// 注意: 虽然前缀被忽略了,但是被忽略的这部分分区里边有些仍然是需要采集的
if hasIgnorePrefix(fsFile, sys.Config.MountIgnore.Prefix) &&
!slice.ContainsString(sys.Config.MountIgnore.Exclude, fsFile) {
continue
}
......@@ -65,29 +56,29 @@ func DeviceMetrics() (L []*dataobj.MetricValue) {
diskUsed += du.BlocksUsed
tags := fmt.Sprintf("mount=%s", du.FsFile)
L = append(L, GaugeValue("disk.bytes.total", du.BlocksAll, tags))
L = append(L, GaugeValue("disk.bytes.free", du.BlocksFree, tags))
L = append(L, GaugeValue("disk.bytes.used", du.BlocksUsed, tags))
L = append(L, GaugeValue("disk.bytes.used.percent", du.BlocksUsedPercent, tags))
ret = append(ret, GaugeValue("disk.bytes.total", du.BlocksAll, tags))
ret = append(ret, GaugeValue("disk.bytes.free", du.BlocksFree, tags))
ret = append(ret, GaugeValue("disk.bytes.used", du.BlocksUsed, tags))
ret = append(ret, GaugeValue("disk.bytes.used.percent", du.BlocksUsedPercent, tags))
if du.InodesAll == 0 {
continue
}
L = append(L, GaugeValue("disk.inodes.total", du.InodesAll, tags))
L = append(L, GaugeValue("disk.inodes.free", du.InodesFree, tags))
L = append(L, GaugeValue("disk.inodes.used", du.InodesUsed, tags))
L = append(L, GaugeValue("disk.inodes.used.percent", du.InodesUsedPercent, tags))
ret = append(ret, GaugeValue("disk.inodes.total", du.InodesAll, tags))
ret = append(ret, GaugeValue("disk.inodes.free", du.InodesFree, tags))
ret = append(ret, GaugeValue("disk.inodes.used", du.InodesUsed, tags))
ret = append(ret, GaugeValue("disk.inodes.used.percent", du.InodesUsedPercent, tags))
}
if len(L) > 0 && diskTotal > 0 {
L = append(L, GaugeValue("disk.cap.bytes.total", float64(diskTotal)))
L = append(L, GaugeValue("disk.cap.bytes.used", float64(diskUsed)))
L = append(L, GaugeValue("disk.cap.bytes.free", float64(diskTotal-diskUsed)))
L = append(L, GaugeValue("disk.cap.bytes.used.percent", float64(diskUsed)*100.0/float64(diskTotal)))
if len(ret) > 0 && diskTotal > 0 {
ret = append(ret, GaugeValue("disk.cap.bytes.total", float64(diskTotal)))
ret = append(ret, GaugeValue("disk.cap.bytes.used", float64(diskUsed)))
ret = append(ret, GaugeValue("disk.cap.bytes.free", float64(diskTotal-diskUsed)))
ret = append(ret, GaugeValue("disk.cap.bytes.used.percent", float64(diskUsed)*100.0/float64(diskTotal)))
}
return
return ret
}
func hasIgnorePrefix(fsFile string, ignoreMountPointsPrefix []string) bool {
......
......@@ -97,7 +97,9 @@ func IODelta(device string, f func([2]*nux.DiskStats) uint64) uint64 {
return f(val)
}
func IOStatsMetrics() (L []*dataobj.MetricValue) {
func IOStatsMetrics() []*dataobj.MetricValue {
ret := make([]*dataobj.MetricValue, 0)
dsLock.RLock()
defer dsLock.RUnlock()
......@@ -109,38 +111,38 @@ func IOStatsMetrics() (L []*dataobj.MetricValue) {
tags := "device=" + device
rio := IODelta(device, IOReadRequests)
wio := IODelta(device, IOWriteRequests)
delta_rsec := IODelta(device, IOReadSectors)
delta_wsec := IODelta(device, IOWriteSectors)
deltaRsec := IODelta(device, IOReadSectors)
deltaWsec := IODelta(device, IOWriteSectors)
ruse := IODelta(device, IOMsecRead)
wuse := IODelta(device, IOMsecWrite)
use := IODelta(device, IOMsecTotal)
n_io := rio + wio
avgrq_sz := 0.0
nio := rio + wio
avgrqSz := 0.0
await := 0.0
svctm := 0.0
if n_io != 0 {
avgrq_sz = float64(delta_rsec+delta_wsec) / float64(n_io)
await = float64(ruse+wuse) / float64(n_io)
svctm = float64(use) / float64(n_io)
if nio != 0 {
avgrqSz = float64(deltaRsec+deltaWsec) / float64(nio)
await = float64(ruse+wuse) / float64(nio)
svctm = float64(use) / float64(nio)
}
duration := IODelta(device, TS)
L = append(L, GaugeValue("disk.io.read.request", float64(rio), tags))
L = append(L, GaugeValue("disk.io.write.request", float64(wio), tags))
L = append(L, GaugeValue("disk.io.read.bytes", float64(delta_rsec)*512.0, tags))
L = append(L, GaugeValue("disk.io.write.bytes", float64(delta_wsec)*512.0, tags))
L = append(L, GaugeValue("disk.io.avgrq_sz", avgrq_sz, tags))
L = append(L, GaugeValue("disk.io.avgqu_sz", float64(IODelta(device, IOMsecWeightedTotal))/1000.0, tags))
L = append(L, GaugeValue("disk.io.await", await, tags))
L = append(L, GaugeValue("disk.io.svctm", svctm, tags))
ret = append(ret, GaugeValue("disk.io.read.request", float64(rio), tags))
ret = append(ret, GaugeValue("disk.io.write.request", float64(wio), tags))
ret = append(ret, GaugeValue("disk.io.read.bytes", float64(deltaRsec)*512.0, tags))
ret = append(ret, GaugeValue("disk.io.write.bytes", float64(deltaWsec)*512.0, tags))
ret = append(ret, GaugeValue("disk.io.avgrq_sz", avgrqSz, tags))
ret = append(ret, GaugeValue("disk.io.avgqu_sz", float64(IODelta(device, IOMsecWeightedTotal))/1000.0, tags))
ret = append(ret, GaugeValue("disk.io.await", await, tags))
ret = append(ret, GaugeValue("disk.io.svctm", svctm, tags))
tmp := float64(use) * 100.0 / float64(duration)
if tmp > 100.0 {
tmp = 100.0
}
L = append(L, GaugeValue("disk.io.util", tmp, tags))
ret = append(ret, GaugeValue("disk.io.util", tmp, tags))
}
return
return ret
}
func ShouldHandleDevice(device string) bool {
......
......@@ -13,19 +13,20 @@ import (
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/nux"
"github.com/toolkits/pkg/slice"
)
func FsRWMetrics() (L []*dataobj.MetricValue) {
func FsRWMetrics() []*dataobj.MetricValue {
ret := make([]*dataobj.MetricValue, 0)
mountPoints, err := nux.ListMountPoint()
if err != nil {
logger.Error("failed to call ListMountPoint:", err)
return
logger.Errorf("failed to call ListMountPoint:%v\n", err)
return ret
}
fsFileFilter := make(map[string]struct{}) //过滤 /proc/mounts 出现重复的fsFile
ignoreMountPointsPrefix := sys.Config.MountIgnorePrefix
for idx := range mountPoints {
var du *nux.DeviceUsage
du, err = nux.BuildDeviceUsage(mountPoints[idx][0], mountPoints[idx][1], mountPoints[idx][2])
......@@ -34,7 +35,8 @@ func FsRWMetrics() (L []*dataobj.MetricValue) {
continue
}
if hasIgnorePrefix(mountPoints[idx][1], ignoreMountPointsPrefix) {
if hasIgnorePrefix(du.FsFile, sys.Config.MountIgnore.Prefix) &&
!slice.ContainsString(sys.Config.MountIgnore.Exclude, du.FsFile) {
continue
}
......@@ -50,14 +52,14 @@ func FsRWMetrics() (L []*dataobj.MetricValue) {
f, err := os.Open(du.FsFile)
if err != nil {
logger.Error("target mount point open failed:", err)
L = append(L, GaugeValue("disk.rw.error", 1, tags))
ret = append(ret, GaugeValue("disk.rw.error", 1, tags))
continue
}
fs, err := f.Stat()
if err != nil {
logger.Error("get target mount point status failed:", err)
L = append(L, GaugeValue("disk.rw.error", 2, tags))
ret = append(ret, GaugeValue("disk.rw.error", 2, tags))
continue
}
......@@ -70,13 +72,13 @@ func FsRWMetrics() (L []*dataobj.MetricValue) {
content := "FS-RW" + now
err = CheckFS(file, content)
if err != nil {
L = append(L, GaugeValue("disk.rw.error", 3, tags))
ret = append(ret, GaugeValue("disk.rw.error", 3, tags))
} else {
L = append(L, GaugeValue("disk.rw.error", 0, tags))
ret = append(ret, GaugeValue("disk.rw.error", 0, tags))
}
}
return
return ret
}
func CheckFS(file string, content string) error {
......@@ -101,7 +103,7 @@ func CheckFS(file string, content string) error {
}
if string(read) != content {
logger.Error("Read content failed: ", string(read))
return errors.New("Read content failed")
return errors.New("read content failed")
}
//clean the file
err = os.Remove(file)
......
......@@ -14,7 +14,7 @@ func NfMetrics() []*dataobj.MetricValue {
if !file.IsExist(connMaxFile) {
return []*dataobj.MetricValue{}
}
res := []*dataobj.MetricValue{}
var res []*dataobj.MetricValue
nfConntrackMax, err := file.ToInt64(connMaxFile)
if err != nil {
......
......@@ -12,10 +12,12 @@ import (
var ntpServer string
func NtpOffsetMetrics() (L []*dataobj.MetricValue) {
func NtpOffsetMetrics() []*dataobj.MetricValue {
ret := make([]*dataobj.MetricValue, 0)
ntpServers := sys.Config.NtpServers
if len(ntpServers) <= 0 {
return
if len(ntpServers) == 0 {
return ret
}
for idx, server := range ntpServers {
......@@ -42,15 +44,11 @@ func NtpOffsetMetrics() (L []*dataobj.MetricValue) {
logger.Debug("ntp: client receive time, ", dstTime)
delta := duration / 1e6 // 转换成 ms
L = append(L, GaugeValue("sys.ntp.offset.ms", delta))
//one ntp server's response is enough
ret = append(ret, GaugeValue("sys.ntp.offset.ms", delta))
return
//one ntp server's response is enough
break
}
//keep silence when no config ntp server
if len(ntpServers) > 0 {
logger.Error("sys.ntp.offset error. all ntp servers response failed.")
}
return
return ret
}
......@@ -33,7 +33,8 @@ func Push(metricItems []*dataobj.MetricValue) error {
if err != nil {
msg := fmt.Errorf("metric:%v err:%v", item, err)
logger.Warning(msg)
return msg
// 如果数据有问题,直接跳过吧,比如mymon采集的到的数据,其实只有一个有问题,剩下的都没问题
continue
}
if item.CounterType == dataobj.COUNTER {
if err := CounterToGauge(item); err != nil {
......@@ -68,10 +69,10 @@ func Push(metricItems []*dataobj.MetricValue) error {
retry += 1
if retry == 3 {
retry = 0
break
}
}
return err
}
......@@ -84,14 +85,14 @@ func rpcCall(addr string, items []*dataobj.MetricValue) (dataobj.TransferResp, e
client, err = rpcClient(addr)
if err != nil {
return reply, err
} else {
affected := rpcClients.Put(addr, client)
if !affected {
defer func() {
// 我尝试把自己这个client塞进map失败,说明已经有一个client塞进去了,那我自己用完了就关闭
client.Close()
}()
}
}
affected := rpcClients.Put(addr, client)
if !affected {
defer func() {
// 我尝试把自己这个client塞进map失败,说明已经有一个client塞进去了,那我自己用完了就关闭
client.Close()
}()
}
}
......@@ -105,7 +106,7 @@ func rpcCall(addr string, items []*dataobj.MetricValue) (dataobj.TransferResp, e
select {
case <-time.After(timeout):
logger.Warningf("rpc call timeout, transfer addr: %s", addr)
logger.Warningf("rpc call timeout, transfer addr: %s\n", addr)
rpcClients.Put(addr, nil)
client.Close()
return reply, fmt.Errorf("%s rpc call timeout", addr)
......
......@@ -7,16 +7,17 @@ import (
"github.com/didi/nightingale/src/dataobj"
)
func SocketStatSummaryMetrics() (L []*dataobj.MetricValue) {
func SocketStatSummaryMetrics() []*dataobj.MetricValue {
ret := make([]*dataobj.MetricValue, 0)
ssMap, err := nux.SocketStatSummary()
if err != nil {
logger.Error("failed to collect SocketStatSummaryMetrics:", err)
return
logger.Errorf("failed to collect SocketStatSummaryMetrics:%v\n", err)
return ret
}
for k, v := range ssMap {
L = append(L, GaugeValue("net."+k, v))
ret = append(ret, GaugeValue("net."+k, v))
}
return
return ret
}
......@@ -12,25 +12,26 @@ import (
"github.com/didi/nightingale/src/dataobj"
)
func FsKernelMetrics() (L []*dataobj.MetricValue) {
func FsKernelMetrics() []*dataobj.MetricValue {
maxFiles, err := nux.KernelMaxFiles()
if err != nil {
logger.Error("failed collect kernel metrics:", err)
return
logger.Errorf("failed to call collect KernelMaxFiles:%v\n", err)
return nil
}
allocateFiles, err := nux.KernelAllocateFiles()
if err != nil {
logger.Error("failed to call KernelAllocateFiles:", err)
return
logger.Errorf("failed to call KernelAllocateFiles:%v\n", err)
return nil
}
v := math.Ceil(float64(allocateFiles) * 100 / float64(maxFiles))
L = append(L, GaugeValue("sys.fs.files.max", maxFiles))
L = append(L, GaugeValue("sys.fs.files.free", maxFiles-allocateFiles))
L = append(L, GaugeValue("sys.fs.files.used", allocateFiles))
L = append(L, GaugeValue("sys.fs.files.used.percent", v))
return
return []*dataobj.MetricValue{
GaugeValue("sys.fs.files.max", maxFiles),
GaugeValue("sys.fs.files.free", maxFiles-allocateFiles),
GaugeValue("sys.fs.files.used", allocateFiles),
GaugeValue("sys.fs.files.used.percent", v),
}
}
func ProcsNumMetrics() []*dataobj.MetricValue {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册