未验证 提交 6a70bed3 编写于 作者: Q qinyening 提交者: GitHub

modify proc info collect (#528)

上级 91503cfd
......@@ -5,15 +5,12 @@ import (
"time"
"github.com/didi/nightingale/src/common/dataobj"
process "github.com/shirou/gopsutil/process"
)
var MetricHistory *History
var ProcsCache *ProcessCache
func Init() {
MetricHistory = NewHistory()
ProcsCache = NewProcsCache()
}
func NewHistory() *History {
......@@ -64,50 +61,3 @@ func (h *History) clean() {
}
}
}
type ProcessCache struct {
sync.RWMutex
Data map[int32]*process.Process
}
func NewProcsCache() *ProcessCache {
pc := ProcessCache{
Data: make(map[int32]*process.Process),
}
go pc.Clean()
return &pc
}
func (pc *ProcessCache) Set(pid int32, p *process.Process) {
pc.Lock()
defer pc.Unlock()
pc.Data[pid] = p
}
func (pc *ProcessCache) Get(pid int32) (*process.Process, bool) {
pc.RLock()
defer pc.RUnlock()
p, exists := pc.Data[pid]
return p, exists
}
func (pc *ProcessCache) Clean() {
ticker := time.NewTicker(10 * time.Minute)
for {
select {
case <-ticker.C:
pc.clean()
}
}
}
func (pc *ProcessCache) clean() {
pc.Lock()
defer pc.Unlock()
for pid, procs := range pc.Data {
running, _ := procs.IsRunning()
if !running {
delete(pc.Data, pid)
}
}
}
......@@ -4,14 +4,13 @@ import (
"strings"
"time"
process "github.com/shirou/gopsutil/process"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/models"
"github.com/didi/nightingale/src/modules/agent/cache"
"github.com/didi/nightingale/src/modules/agent/config"
"github.com/didi/nightingale/src/modules/agent/core"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/nux"
)
type ProcScheduler struct {
......@@ -45,52 +44,97 @@ func (p *ProcScheduler) Stop() {
close(p.Quit)
}
var (
rBytes map[int]uint64
wBytes map[int]uint64
procJiffy map[int]uint64
jiffy uint64
)
func ProcCollect(p *models.ProcCollect) {
ps, err := process.Processes()
ps, err := AllProcs()
if err != nil {
logger.Error(err)
return
}
var memUsedTotal uint64 = 0
var memUtilTotal = 0.0
var cpuUtilTotal = 0.0
newRBytes := make(map[int]uint64)
newWBytes := make(map[int]uint64)
newProcJiffy := make(map[int]uint64)
newJiffy := readJiffy()
for _, proc := range ps {
newRBytes[proc.Pid] = proc.RBytes
newWBytes[proc.Pid] = proc.WBytes
if pj, err := readProcJiffy(proc.Pid); err == nil {
newProcJiffy[proc.Pid] = pj
}
}
var items []*dataobj.MetricValue
cnt := 0
for _, procs := range ps {
if isProc(procs, p.CollectMethod, p.Target) {
var cnt int
var fdNum int
var memory uint64
var cpu float64
var ioWrite, ioRead uint64
var uptime uint64
for _, proc := range ps {
if isProc(proc, p.CollectMethod, p.Target) {
cnt++
procCache, exists := cache.ProcsCache.Get(procs.Pid)
if !exists {
cache.ProcsCache.Set(procs.Pid, procs)
procCache = procs
}
mem, err := procCache.MemoryInfo()
if err != nil {
logger.Error(err)
continue
memory += proc.Mem
fdNum += proc.FdCount
rOld := rBytes[proc.Pid]
if rOld != 0 && rOld <= proc.RBytes {
ioRead += proc.RBytes - rOld
}
memUsedTotal += mem.RSS
memUtil, err := procCache.MemoryPercent()
if err != nil {
logger.Error(err)
continue
wOld := wBytes[proc.Pid]
if wOld != 0 && wOld <= proc.WBytes {
ioWrite += proc.WBytes - wOld
}
memUtilTotal += float64(memUtil)
cpuUtil, err := procCache.Percent(0)
if err != nil {
logger.Error(err)
uptime = readUptime(proc.Pid)
// jiffy 为零,表示第一次采集信息,不做cpu计算
if jiffy == 0 {
continue
}
cpuUtilTotal += cpuUtil
cpu += float64(newProcJiffy[proc.Pid] - procJiffy[proc.Pid])
}
}
procNumItem := core.GaugeValue("proc.num", cnt, p.Tags)
memUsedItem := core.GaugeValue("proc.mem.used", memUsedTotal, p.Tags)
memUtilItem := core.GaugeValue("proc.mem.util", memUtilTotal, p.Tags)
cpuUtilItem := core.GaugeValue("proc.cpu.util", cpuUtilTotal, p.Tags)
items = []*dataobj.MetricValue{procNumItem, memUsedItem, memUtilItem, cpuUtilItem}
procFdItem := core.GaugeValue("proc.uptime", uptime, p.Tags)
procUptimeItem := core.GaugeValue("proc.fdnum", fdNum, p.Tags)
memUsedItem := core.GaugeValue("proc.mem.used", memory*1024, p.Tags)
ioReadItem := core.GaugeValue("proc.io.read.bytes", ioRead, p.Tags)
ioWriteItem := core.GaugeValue("proc.io.write.bytes", ioWrite, p.Tags)
items = []*dataobj.MetricValue{procNumItem, memUsedItem, procFdItem, procUptimeItem, ioReadItem, ioWriteItem}
if jiffy != 0 {
cpuUtil := cpu / float64(newJiffy-jiffy) * 100
if cpuUtil > 100 {
cpuUtil = 100
}
cpuUtilItem := core.GaugeValue("proc.cpu.util", cpuUtil, p.Tags)
items = append(items, cpuUtilItem)
}
sysMem, err := nux.MemInfo()
if err != nil {
logger.Error(err)
}
if sysMem != nil && sysMem.MemTotal != 0 {
memUsedUtil := float64(memory*1024) / float64(sysMem.MemTotal) * 100
memUtilItem := core.GaugeValue("proc.mem.util", memUsedUtil, p.Tags)
items = append(items, memUtilItem)
}
now := time.Now().Unix()
for _, item := range items {
item.Step = int64(p.Step)
......@@ -99,18 +143,16 @@ func ProcCollect(p *models.ProcCollect) {
}
core.Push(items)
rBytes = newRBytes
wBytes = newWBytes
procJiffy = newProcJiffy
jiffy = readJiffy()
}
func isProc(p *process.Process, method, target string) bool {
name, err := p.Name()
if err != nil {
return false
}
cmdlines, err := p.Cmdline()
if err != nil {
return false
}
if method == "name" && target == name {
func isProc(p *Proc, method, target string) bool {
cmdlines := p.Cmdline
if method == "name" && target == p.Name {
return true
} else if (method == "cmdline" || method == "cmd") && strings.Contains(cmdlines, target) {
return true
......
package procs
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"time"
"github.com/toolkits/pkg/file"
"github.com/toolkits/pkg/logger"
)
type Proc struct {
Pid int
Name string
Exe string
Cmdline string
Mem uint64
Cpu float64
jiffy uint64
RBytes uint64
WBytes uint64
Uptime uint64
FdCount int
}
func (this *Proc) String() string {
return fmt.Sprintf("<Pid:%d, Name:%s, Uptime:%s Exe:%s Mem:%d Cpu:%.3f>",
this.Pid, this.Name, this.Uptime, this.Exe, this.Mem, this.Cpu)
}
func AllProcs() (ps []*Proc, err error) {
var dirs []string
dirs, err = file.DirsUnder("/proc")
if err != nil {
return
}
size := len(dirs)
if size == 0 {
return
}
for i := 0; i < size; i++ {
pid, e := strconv.Atoi(dirs[i])
if e != nil {
continue
}
statusFile := fmt.Sprintf("/proc/%d/status", pid)
cmdlineFile := fmt.Sprintf("/proc/%d/cmdline", pid)
if !file.IsExist(statusFile) || !file.IsExist(cmdlineFile) {
continue
}
name, memory, e := ReadNameAndMem(statusFile)
if e != nil {
logger.Error("read pid status file err:", e)
continue
}
cmdlineBytes, e := file.ToBytes(cmdlineFile)
if e != nil {
continue
}
cmdlineBytesLen := len(cmdlineBytes)
if cmdlineBytesLen == 0 {
continue
}
noNut := make([]byte, 0, cmdlineBytesLen)
for j := 0; j < cmdlineBytesLen; j++ {
if cmdlineBytes[j] != 0 {
noNut = append(noNut, cmdlineBytes[j])
}
}
p := Proc{Pid: pid, Name: name, Cmdline: string(noNut), Mem: memory}
ps = append(ps, &p)
}
for _, p := range ps {
p.RBytes, p.WBytes = readIO(p.Pid)
p.FdCount = readProcFd(p.Pid)
p.Uptime = readUptime(p.Pid)
}
return
}
func ReadNameAndMem(path string) (name string, memory uint64, err error) {
var content []byte
content, err = ioutil.ReadFile(path)
if err != nil {
return
}
reader := bufio.NewReader(bytes.NewBuffer(content))
for {
var bs []byte
bs, err = file.ReadLine(reader)
if err == io.EOF {
return
}
line := string(bs)
colonIndex := strings.Index(line, ":")
if colonIndex == -1 {
logger.Warning("line is illegal", path)
continue
}
if strings.TrimSpace(line[0:colonIndex]) == "Name" {
name = strings.TrimSpace(line[colonIndex+1:])
} else if strings.TrimSpace(line[0:colonIndex]) == "VmRSS" {
kbIndex := strings.Index(line, "kB")
memory, _ = strconv.ParseUint(strings.TrimSpace(line[colonIndex+1:kbIndex]), 10, 64)
break
}
}
return
}
func readJiffy() uint64 {
f, err := os.Open("/proc/stat")
if err != nil {
return 0
}
defer f.Close()
scanner := bufio.NewScanner(f)
scanner.Scan()
s := scanner.Text()
if !strings.HasPrefix(s, "cpu ") {
return 0
}
ss := strings.Split(s, " ")
var ret uint64
for _, x := range ss {
if x == "" || x == "cpu" {
continue
}
if v, e := strconv.ParseUint(x, 10, 64); e == nil {
ret += v
}
}
return ret
}
func readProcFd(pid int) int {
var fds []string
fds, err := file.FilesUnder(fmt.Sprintf("/proc/%d/fd", pid))
if err != nil {
return 0
}
return len(fds)
}
func readProcJiffy(pid int) (uint64, error) {
f, err := os.Open(fmt.Sprintf("/proc/%d/stat", pid))
if err != nil {
return 0, err
}
defer f.Close()
scanner := bufio.NewScanner(f)
scanner.Scan()
s := scanner.Text()
ss := strings.Split(s, " ")
if len(ss) < 15 {
return 0, fmt.Errorf("/porc/%s/stat illegal:%v", pid, ss)
}
var ret uint64
for i := 13; i < 15; i++ {
v, e := strconv.ParseUint(ss[i], 10, 64)
if e != nil {
return 0, err
}
ret += v
}
return ret, nil
}
func readIO(pid int) (r uint64, w uint64) {
f, err := os.Open(fmt.Sprintf("/proc/%d/io", pid))
if err != nil {
return
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
s := scanner.Text()
if strings.HasPrefix(s, "read_bytes") || strings.HasPrefix(s, "write_bytes") {
v := strings.Split(s, " ")
if len(v) == 2 {
value, _ := strconv.ParseUint(v[1], 10, 64)
if s[0] == 'r' {
r = value
} else {
w = value
}
}
}
}
return
}
func readUptime(pid int) uint64 {
fileInfo, err := os.Stat(fmt.Sprintf("/proc/%d", pid))
if err != nil {
return 0
}
duration := time.Now().Sub(fileInfo.ModTime())
return uint64(duration.Seconds())
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册