提交 967c3aa5 编写于 作者: U UlricQin

Merge branch 'master' of https://github.com/didi/nightingale

......@@ -3,37 +3,39 @@ module github.com/didi/nightingale/v4
go 1.12
require (
github.com/Shopify/sarama v1.27.2
github.com/Shopify/sarama v1.27.2 // indirect
github.com/alouca/gologger v0.0.0-20120904114645-7d4b7291de9c // indirect
github.com/blang/semver v3.5.1+incompatible
github.com/cespare/xxhash v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/freedomkk-qfeng/go-fastping v0.0.0-20160109021039-d7bb493dee3e // indirect
github.com/gaochao1/gosnmp v0.0.0-20150630013918-783a67a067fd // indirect
github.com/gaochao1/sw v4.0.0+incompatible
github.com/garyburd/redigo v1.6.2
github.com/gin-contrib/pprof v1.3.0
github.com/gin-contrib/static v0.0.1
github.com/gin-contrib/static v0.0.1 // indirect
github.com/gin-gonic/gin v1.6.3
github.com/go-ping/ping v0.0.0-20201115131931-3300c582a663
github.com/go-ping/ping v0.0.0-20201115131931-3300c582a663 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/google/uuid v1.1.2
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hpcloud/tail v1.0.0
github.com/influxdata/influxdb v1.8.0
github.com/influxdata/telegraf v1.17.2
github.com/m3db/m3 v0.15.17
github.com/jackc/pgx v3.6.0+incompatible
github.com/m3db/m3 v0.15.17 // indirect
github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-sqlite3 v1.14.0 // indirect
github.com/mojocn/base64Captcha v1.3.1
github.com/mojocn/base64Captcha v1.3.1 // indirect
github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect
github.com/shirou/gopsutil v3.20.11+incompatible // indirect
github.com/spaolacci/murmur3 v1.1.0
github.com/sparrc/go-ping v0.0.0-20190613174326-4e5b6552494c
github.com/spf13/viper v1.7.1
github.com/streadway/amqp v1.0.0
github.com/streadway/amqp v1.0.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/toolkits/file v0.0.0-20160325033739-a5b3c5147e07 // indirect
github.com/toolkits/file v0.0.0-20160325033739-a5b3c5147e07
github.com/toolkits/pkg v1.1.3
github.com/toolkits/sys v0.0.0-20170615103026-1f33b217ffaf // indirect
github.com/ugorji/go/codec v1.1.7
......
......@@ -151,6 +151,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/bmatcuk/doublestar v1.3.1/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9MEoZQC/PmE=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
......@@ -564,6 +565,7 @@ github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 h1:W2IgzRCb0L9VzMujq/QuTaZUKcH8096jWwP519mHN6Q=
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8/go.mod h1:/2NMgWB1DHM1ti/gqhOlg+LJeBVk6FqR5aVGYY0hlwI=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgx v3.6.0+incompatible h1:bJeo4JdVbDAW8KB2m8XkFeo8CPipREoG37BwEoKGz+Q=
github.com/jackc/pgx v3.6.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
......
......@@ -21,9 +21,9 @@ import (
"github.com/didi/nightingale/v4/src/modules/server/collector"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/all"
_ "github.com/go-sql-driver/mysql"
"github.com/gin-gonic/gin"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
"github.com/toolkits/pkg/file"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/runner"
......
......@@ -59,7 +59,7 @@ type userProfileForm struct {
Im string `json:"im"`
IsRoot int `json:"is_root"`
LeaderId int64 `json:"leader_id"`
Typ int `json:"typ"`
Type int `json:"type"`
Status int `json:"status"`
Organization string `json:"organization"`
}
......@@ -86,6 +86,7 @@ func userAddPost(c *gin.Context) {
Im: f.Im,
IsRoot: f.IsRoot,
LeaderId: f.LeaderId,
Type: f.Type,
Organization: f.Organization,
UpdatedAt: now,
UUID: models.GenUUIDForUser(f.Username),
......@@ -157,13 +158,13 @@ func userProfilePut(c *gin.Context) {
target.IsRoot = f.IsRoot
}
if f.Typ != target.Type {
arr = append(arr, fmt.Sprintf("typ: %d -> %d", target.Type, f.Typ))
target.Type = f.Typ
if f.Type != target.Type {
arr = append(arr, fmt.Sprintf("type: %d -> %d", target.Type, f.Type))
target.Type = f.Type
}
if f.Status != target.Status {
arr = append(arr, fmt.Sprintf("typ: %d -> %d", target.Status, f.Status))
arr = append(arr, fmt.Sprintf("status: %d -> %d", target.Status, f.Status))
target.Status = f.Status
if target.Status == models.USER_S_ACTIVE {
target.LoginErrNum = 0
......
......@@ -14,12 +14,12 @@ import (
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/net_response"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/nginx"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/ping"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/postgresql"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/prometheus"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/rabbitmq"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/redis"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/tengine"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/zookeeper"
// local
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/log"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/plugin"
......
......@@ -9,8 +9,8 @@ import (
"github.com/didi/nightingale/v4/src/common/i18n"
"github.com/didi/nightingale/v4/src/modules/server/collector"
"github.com/didi/nightingale/v4/src/modules/server/plugins"
"github.com/didi/nightingale/v4/src/modules/server/plugins/http_response/http_response"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/http_response"
)
func init() {
......@@ -33,22 +33,22 @@ func NewCollector() *Collector {
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"URLS": "地址",
"Method": "方法",
"ResponseTimeout": "响应超时",
"Headers": "Headers",
"Username": "用户名",
"Password": "密码",
"Body": "Body",
"ResponseBodyMaxSize": "ResponseBodyMaxSize",
"ResponseStringMatch": "ResponseStringMatch",
"ResponseStatusCode": "ResponseStatusCode",
"Interface": "Interface",
"HTTPProxy": "HTTPProxy",
"FollowRedirects": "FollowRedirects",
"List of urls to query": "要监测的URL地址",
"HTTP Request Method, default GET": "HTTP 的请求方法,默认是 GET",
"HTTP Request Headers": "HTTP 请求的的 Headers",
"URLS": "地址",
"Method": "方法",
"ResponseTimeout": "响应超时",
"Headers": "Headers",
"Username": "用户名",
"Password": "密码",
"Body": "Body",
"ResponseBodyMaxSize": "ResponseBodyMaxSize",
"ResponseStringMatch": "ResponseStringMatch",
"ResponseStatusCode": "ResponseStatusCode",
"Interface": "Interface",
"HTTPProxy": "HTTPProxy",
"FollowRedirects": "FollowRedirects",
"List of urls to query": "要监测的URL地址",
"HTTP Request Method, default GET": "HTTP 的请求方法,默认是 GET",
"HTTP Request Headers": "HTTP 请求的的 Headers",
"Optional HTTP Basic Auth Credentials, Username": "HTTP Basic 认证的用户名",
"Optional HTTP Basic Auth Credentials, Password": "HTTP Basic 认证的密码",
"Optional HTTP Request Body": "HTTP 请求的 Body",
......
package http_response
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"time"
"unicode/utf8"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
const (
// defaultResponseBodyMaxSize is the default maximum response body size, in bytes.
// if the response body is over this size, we will raise a body_read_error.
defaultResponseBodyMaxSize = 32 * 1024 * 1024
)
// HTTPResponse struct
type HTTPResponse struct {
Address string // deprecated in 1.12
URLs []string `toml:"urls"`
HTTPProxy string `toml:"http_proxy"`
Body string
Method string
ResponseTimeout time.Duration
HTTPHeaderTags map[string]string `toml:"http_header_tags"`
Headers map[string]string
FollowRedirects bool
// Absolute path to file with Bearer token
BearerToken string `toml:"bearer_token"`
ResponseBodyField string `toml:"response_body_field"`
ResponseBodyMaxSize int64 `toml:"response_body_max_size"`
ResponseStringMatch string
ResponseStatusCode int
Interface string
// HTTP Basic Auth Credentials
Username string `toml:"username"`
Password string `toml:"password"`
tls.ClientConfig
Log telegraf.Logger
compiledStringMatch *regexp.Regexp
client httpClient
}
type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}
// Description returns the plugin Description
func (h *HTTPResponse) Description() string {
return "HTTP/HTTPS request given an address a method and a timeout"
}
var sampleConfig = `
## Deprecated in 1.12, use 'urls'
## Server address (default http://localhost)
# address = "http://localhost"
## List of urls to query.
# urls = ["http://localhost"]
## Set http_proxy (telegraf uses the system wide proxy settings if it's is not set)
# http_proxy = "http://localhost:8888"
## Set response_timeout (default 5 seconds)
# response_timeout = "5s"
## HTTP Request Method
# method = "GET"
## Whether to follow redirects from the server (defaults to false)
# follow_redirects = false
## Optional file with Bearer token
## file content is added as an Authorization header
# bearer_token = "/path/to/file"
## Optional HTTP Basic Auth Credentials
# username = "username"
# password = "pa$$word"
## Optional HTTP Request Body
# body = '''
# {'fake':'data'}
# '''
## Optional name of the field that will contain the body of the response.
## By default it is set to an empty String indicating that the body's content won't be added
# response_body_field = ''
## Maximum allowed HTTP response body size in bytes.
## 0 means to use the default of 32MiB.
## If the response body size exceeds this limit a "body_read_error" will be raised
# response_body_max_size = "32MiB"
## Optional substring or regex match in body of the response (case sensitive)
# response_string_match = "\"service_status\": \"up\""
# response_string_match = "ok"
# response_string_match = "\".*_status\".?:.?\"up\""
## Expected response status code.
## The status code of the response is compared to this value. If they match, the field
## "response_status_code_match" will be 1, otherwise it will be 0. If the
## expected status code is 0, the check is disabled and the field won't be added.
# response_status_code = 0
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## HTTP Request Headers (all values must be strings)
# [inputs.http_response.headers]
# Host = "github.com"
## Optional setting to map response http headers into tags
## If the http header is not present on the request, no corresponding tag will be added
## If multiple instances of the http header are present, only the first value will be used
# http_header_tags = {"HTTP_HEADER" = "TAG_NAME"}
## Interface to use when dialing an address
# interface = "eth0"
`
// SampleConfig returns the plugin SampleConfig
func (h *HTTPResponse) SampleConfig() string {
return sampleConfig
}
// ErrRedirectAttempted indicates that a redirect occurred
var ErrRedirectAttempted = errors.New("redirect")
// Set the proxy. A configured proxy overwrites the system wide proxy.
func getProxyFunc(httpProxy string) func(*http.Request) (*url.URL, error) {
if httpProxy == "" {
return http.ProxyFromEnvironment
}
proxyURL, err := url.Parse(httpProxy)
if err != nil {
return func(_ *http.Request) (*url.URL, error) {
return nil, errors.New("bad proxy: " + err.Error())
}
}
return func(r *http.Request) (*url.URL, error) {
return proxyURL, nil
}
}
// createHTTPClient creates an http client which will timeout at the specified
// timeout period and can follow redirects if specified
func (h *HTTPResponse) createHTTPClient() (*http.Client, error) {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
dialer := &net.Dialer{}
if h.Interface != "" {
dialer.LocalAddr, err = localAddress(h.Interface)
if err != nil {
return nil, err
}
}
client := &http.Client{
Transport: &http.Transport{
Proxy: getProxyFunc(h.HTTPProxy),
DialContext: dialer.DialContext,
DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
Timeout: time.Duration(h.ResponseTimeout),
}
if !h.FollowRedirects {
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}
}
return client, nil
}
func localAddress(interfaceName string) (net.Addr, error) {
i, err := net.InterfaceByName(interfaceName)
if err != nil {
return nil, err
}
addrs, err := i.Addrs()
if err != nil {
return nil, err
}
for _, addr := range addrs {
if naddr, ok := addr.(*net.IPNet); ok {
// leaving port set to zero to let kernel pick
return &net.TCPAddr{IP: naddr.IP}, nil
}
}
return nil, fmt.Errorf("cannot create local address for interface %q", interfaceName)
}
func setResult(resultString string, fields map[string]interface{}, tags map[string]string) {
resultCodes := map[string]int{
"success": 0,
"response_string_mismatch": 1,
"body_read_error": 2,
"connection_failed": 3,
"timeout": 4,
"dns_error": 5,
"response_status_code_mismatch": 6,
}
tags["result"] = resultString
fields["result_type"] = resultString
fields["result_code"] = resultCodes[resultString]
}
func setError(err error, fields map[string]interface{}, tags map[string]string) error {
if timeoutError, ok := err.(net.Error); ok && timeoutError.Timeout() {
setResult("timeout", fields, tags)
return timeoutError
}
urlErr, isURLErr := err.(*url.Error)
if !isURLErr {
return nil
}
opErr, isNetErr := (urlErr.Err).(*net.OpError)
if isNetErr {
switch e := (opErr.Err).(type) {
case *net.DNSError:
setResult("dns_error", fields, tags)
return e
case *net.ParseError:
// Parse error has to do with parsing of IP addresses, so we
// group it with address errors
setResult("address_error", fields, tags)
return e
}
}
return nil
}
// HTTPGather gathers all fields and returns any errors it encounters
func (h *HTTPResponse) httpGather(u string) (map[string]interface{}, map[string]string, error) {
// Prepare fields and tags
fields := make(map[string]interface{})
tags := map[string]string{"server": u, "method": h.Method}
var body io.Reader
if h.Body != "" {
body = strings.NewReader(h.Body)
}
request, err := http.NewRequest(h.Method, u, body)
if err != nil {
return nil, nil, err
}
if h.BearerToken != "" {
token, err := ioutil.ReadFile(h.BearerToken)
if err != nil {
return nil, nil, err
}
bearer := "Bearer " + strings.Trim(string(token), "\n")
request.Header.Add("Authorization", bearer)
}
for key, val := range h.Headers {
request.Header.Add(key, val)
if key == "Host" {
request.Host = val
}
}
if h.Username != "" || h.Password != "" {
request.SetBasicAuth(h.Username, h.Password)
}
// Start Timer
start := time.Now()
resp, err := h.client.Do(request)
responseTime := time.Since(start).Seconds()
// If an error in returned, it means we are dealing with a network error, as
// HTTP error codes do not generate errors in the net/http library
if err != nil {
// Log error
h.Log.Debugf("Network error while polling %s: %s", u, err.Error())
// Get error details
if setError(err, fields, tags) == nil {
// Any error not recognized by `set_error` is considered a "connection_failed"
setResult("connection_failed", fields, tags)
}
return fields, tags, nil
}
if _, ok := fields["response_time"]; !ok {
fields["response_time"] = responseTime
}
// This function closes the response body, as
// required by the net/http library
defer resp.Body.Close()
// Add the response headers
for headerName, tag := range h.HTTPHeaderTags {
headerValues, foundHeader := resp.Header[headerName]
if foundHeader && len(headerValues) > 0 {
tags[tag] = headerValues[0]
}
}
// Set log the HTTP response code
//tags["status_code"] = strconv.Itoa(resp.StatusCode)
fields["http_response_code"] = resp.StatusCode
if h.ResponseBodyMaxSize == 0 {
h.ResponseBodyMaxSize = int64(defaultResponseBodyMaxSize)
}
bodyBytes, err := ioutil.ReadAll(io.LimitReader(resp.Body, int64(h.ResponseBodyMaxSize)+1))
// Check first if the response body size exceeds the limit.
if err == nil && int64(len(bodyBytes)) > int64(h.ResponseBodyMaxSize) {
h.setBodyReadError("The body of the HTTP Response is too large", bodyBytes, fields, tags)
return fields, tags, nil
} else if err != nil {
h.setBodyReadError(fmt.Sprintf("Failed to read body of HTTP Response : %s", err.Error()), bodyBytes, fields, tags)
return fields, tags, nil
}
// Add the body of the response if expected
if len(h.ResponseBodyField) > 0 {
// Check that the content of response contains only valid utf-8 characters.
if !utf8.Valid(bodyBytes) {
h.setBodyReadError("The body of the HTTP Response is not a valid utf-8 string", bodyBytes, fields, tags)
return fields, tags, nil
}
fields[h.ResponseBodyField] = string(bodyBytes)
}
fields["content_length"] = len(bodyBytes)
var success = true
// Check the response for a regex
if h.ResponseStringMatch != "" {
if h.compiledStringMatch.Match(bodyBytes) {
fields["response_string_match"] = 1
} else {
success = false
setResult("response_string_mismatch", fields, tags)
fields["response_string_match"] = 0
}
}
// Check the response status code
if h.ResponseStatusCode > 0 {
if resp.StatusCode == h.ResponseStatusCode {
fields["response_status_code_match"] = 1
} else {
success = false
setResult("response_status_code_mismatch", fields, tags)
fields["response_status_code_match"] = 0
}
}
if success {
setResult("success", fields, tags)
}
return fields, tags, nil
}
// Set result in case of a body read error
func (h *HTTPResponse) setBodyReadError(errorMsg string, bodyBytes []byte, fields map[string]interface{}, tags map[string]string) {
h.Log.Debugf(errorMsg)
setResult("body_read_error", fields, tags)
fields["content_length"] = len(bodyBytes)
if h.ResponseStringMatch != "" {
fields["response_string_match"] = 0
}
}
// Gather gets all metric fields and tags and returns any errors it encounters
func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
// Compile the body regex if it exist
if h.compiledStringMatch == nil {
var err error
h.compiledStringMatch, err = regexp.Compile(h.ResponseStringMatch)
if err != nil {
return fmt.Errorf("failed to compile regular expression %s : %s", h.ResponseStringMatch, err)
}
}
// Set default values
if h.ResponseTimeout < time.Duration(time.Second) {
h.ResponseTimeout = time.Duration(time.Second * 5)
}
// Check send and expected string
if h.Method == "" {
h.Method = "GET"
}
if len(h.URLs) == 0 {
if h.Address == "" {
h.URLs = []string{"http://localhost"}
} else {
h.Log.Warn("'address' deprecated in telegraf 1.12, please use 'urls'")
h.URLs = []string{h.Address}
}
}
if h.client == nil {
client, err := h.createHTTPClient()
if err != nil {
return err
}
h.client = client
}
for _, u := range h.URLs {
addr, err := url.Parse(u)
if err != nil {
acc.AddError(err)
continue
}
if addr.Scheme != "http" && addr.Scheme != "https" {
acc.AddError(errors.New("only http and https are supported"))
continue
}
// Prepare data
var fields map[string]interface{}
var tags map[string]string
// Gather data
fields, tags, err = h.httpGather(u)
if err != nil {
acc.AddError(err)
continue
}
// Add metrics
acc.AddFields("http_response", fields, tags)
}
return nil
}
func init() {
inputs.Add("http_response", func() telegraf.Input {
return &HTTPResponse{}
})
}
package postgresql
import (
"fmt"
"github.com/didi/nightingale/v4/src/common/i18n"
"github.com/didi/nightingale/v4/src/modules/server/collector"
"github.com/didi/nightingale/v4/src/modules/server/plugins"
"github.com/didi/nightingale/v4/src/modules/server/plugins/postgresql/postgresql"
"github.com/influxdata/telegraf"
"net"
"net/url"
"sort"
"strings"
)
func init() {
collector.CollectorRegister(NewPostgresqlCollector())
i18n.DictRegister(langDict)
}
type PostgresqlCollector struct {
*collector.BaseCollector
}
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"Dsn": "数据库地址",
"ExcludeDatabases": "不需要监控的数据库",
"if the list is empty, then metrics are gathered from all database": "如果列表为空,则收集所有数据库表",
"PgSetting": "数据库全局配置",
"gather pg setting":"是否采集 pg setting全局配置",
"StatArchiver": "采集pg_stat_archiver视图",
"gather pg_stat_archiver":"主要记录WAL归档信息",
"ReplicationSlot": "采集pg_replication_slot视图",
"gather pg_replication_slots":"用于确保WAL迁移是否正常",
"StatReplication":"采集pg_stat_replication视图",
"gather pg_stat_replication": "pg复制(异步同步)监控",
"StatDatabaseConfilicts":"采集pg_stat_database_confilicts视图",
"specify servers via a url matching<br />postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]<br />": "通过URL设置指定服务器<br />postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]<br />",
},
}
)
func NewPostgresqlCollector() *PostgresqlCollector {
return &PostgresqlCollector{BaseCollector: collector.NewBaseCollector(
"postgresql",
collector.RemoteCategory,
func() collector.TelegrafPlugin { return &PostgresqlRule{} },
)}
}
type PostgresqlRule struct {
ExcludeDatabases []string `label:"ExcludeDatabases" json:"exclude_databases" description:"if the list is empty, then metrics are gathered from all database"`
GatherPgStatReplication bool `label:"StatReplication" json:"pg_stat_replication" description:"gather pg_stat_replication" default:"false"`
GatherPgReplicationSlots bool `label:"ReplicationSlot" json:"pg_replication_slots" description:"gather pg_replication_slots" default:"false"`
GatherPgStatArchiver bool `label:"StatArchiver" json:"pg_stat_archiver" description:"gather pg_stat_archiver" default:"false"`
GatherPgSetting bool `label:"PgSetting" json:"pg_setting" description:"gather pg setting" default:"false"`
Dsn string `label:"Dsn" json:"dsn, required" description:"specify servers via a url matching<br />postgresql://[pqgotest[:password]]@host:port[/dbname]?sslmode=[disable|verify-ca|verify-full]]<br />" example:"postgresql://postgres:xxx@127.0.0.1:5432/postgres?sslmode=disable"`
plugins.ClientConfig
}
func parseURL(uri string) (string, error) {
u, err := url.Parse(uri)
if err != nil {
return "", err
}
if u.Scheme != "postgres" && u.Scheme != "postgresql" {
return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme)
}
var kvs []string
escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`)
accrue := func(k, v string) {
if v != "" {
kvs = append(kvs, k+"="+escaper.Replace(v))
}
}
if u.User != nil {
v := u.User.Username()
accrue("user", v)
v, _ = u.User.Password()
accrue("password", v)
}
if host, port, err := net.SplitHostPort(u.Host); err != nil {
accrue("host", u.Host)
} else {
accrue("host", host)
accrue("port", port)
}
if u.Path != "" {
accrue("dbname", u.Path[1:])
}
q := u.Query()
for k := range q {
accrue(k, q.Get(k))
}
sort.Strings(kvs) // Makes testing easier (not a performance concern)
return strings.Join(kvs, " "), nil
}
func (p *PostgresqlRule) Validate() error {
if p.Dsn == "" {
return fmt.Errorf("postgresql.rule.address must be set")
}
_, err := parseURL(p.Dsn)
if err != nil {
return fmt.Errorf("address parse failed, detail: %v", err)
}
return nil
}
func (p *PostgresqlRule) TelegrafInput() (telegraf.Input, error) {
if err := p.Validate(); err != nil {
return nil, err
}
return &postgresql.Postgresql{
Dsn: p.Dsn,
ExcludeDatabases: p.ExcludeDatabases,
GatherPgSetting: p.GatherPgSetting,
GatherPgStatReplication: p.GatherPgStatReplication,
GatherPgReplicationSlots: p.GatherPgReplicationSlots,
GatherPgStatArchiver: p.GatherPgStatArchiver,
}, nil
}
package postgresql
import (
"fmt"
"github.com/influxdata/telegraf"
"math"
"strconv"
"strings"
)
// Query the pg_settings view containing runtime variables
func querySettings(host string, server *Server, acc telegraf.Accumulator) error {
// pg_settings docs: https://www.postgresql.org/docs/current/static/view-pg-settings.html
//
// NOTE: If you add more vartypes here, you must update the supported
// types in normaliseUnit() below
query := "SELECT name, setting, COALESCE(unit, ''), short_desc, vartype FROM pg_settings WHERE vartype IN ('bool', 'integer', 'real');"
rows, err := server.db.Query(query)
if err != nil {
return fmt.Errorf("Error running query on database %q: %s %v", server, "pg", err)
}
defer rows.Close() // nolint: errcheck
var fields =make(map[string]interface{})
for rows.Next() {
s := &pgSetting{}
err = rows.Scan(&s.name, &s.setting, &s.unit, &s.shortDesc, &s.vartype)
if err != nil {
return fmt.Errorf("Error retrieving rows on %q: %s %v", server, "pg", err)
}
// 处理结果
k, v := s.metric()
fields[k] = v
}
acc.AddGauge("postgresql_pg_settings",fields, map[string]string{"server": host})
return nil
}
// pgSetting is represents a PostgreSQL runtime variable as returned by the
// pg_settings view.
type pgSetting struct {
name, setting, unit, shortDesc, vartype string
}
func (s *pgSetting) metric() (name string, val float64) {
var (
err error
unit = s.unit // nolint: ineffassign
)
name = strings.Replace(s.name, ".", "_", -1)
switch s.vartype {
case "bool":
if s.setting == "on" {
val = 1
}
case "integer", "real":
if val, unit, err = s.normaliseUnit(); err != nil {
// Panic, since we should recognise all units
// and don't want to silently exlude metrics
panic(err)
}
if len(unit) > 0 {
name = fmt.Sprintf("%s_%s", name, unit)
}
default:
// Panic because we got a type we didn't ask for
panic(fmt.Sprintf("Unsupported vartype %q", s.vartype))
}
return name, val
//acc.AddGauge("postgresql_pg_settings", map[string]interface{}{name: val}, map[string]string{"server": host})
}
// TODO: fix linter override
// nolint: nakedret
func (s *pgSetting) normaliseUnit() (val float64, unit string, err error) {
val, err = strconv.ParseFloat(s.setting, 64)
if err != nil {
return val, unit, fmt.Errorf("Error converting setting %q value %q to float: %s", s.name, s.setting, err)
}
// Units defined in: https://www.postgresql.org/docs/current/static/config-setting.html
switch s.unit {
case "":
return
case "ms", "s", "min", "h", "d":
unit = "seconds"
case "B", "kB", "MB", "GB", "TB", "8kB", "16kB", "32kB", "16MB", "32MB", "64MB":
unit = "bytes"
default:
err = fmt.Errorf("Unknown unit for runtime variable: %q", s.unit)
return
}
// -1 is special, don't modify the value
if val == -1 {
return
}
switch s.unit {
case "ms":
val /= 1000
case "min":
val *= 60
case "h":
val *= 60 * 60
case "d":
val *= 60 * 60 * 24
case "kB":
val *= math.Pow(2, 10)
case "MB":
val *= math.Pow(2, 20)
case "GB":
val *= math.Pow(2, 30)
case "TB":
val *= math.Pow(2, 40)
case "8kB":
val *= math.Pow(2, 13)
case "16kB":
val *= math.Pow(2, 14)
case "32kB":
val *= math.Pow(2, 15)
case "16MB":
val *= math.Pow(2, 24)
case "32MB":
val *= math.Pow(2, 25)
case "64MB":
val *= math.Pow(2, 26)
}
return
}
package postgresql
import (
"database/sql"
)
type Server struct {
db *sql.DB
}
// NewServer establishes a new connection using DSN.
func NewServer(dsn string) (*Server, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
s := &Server{
db: db,
}
return s, nil
}
// Close disconnects from Postgres.
func (s *Server) Close() error {
return s.db.Close()
}
// Ping checks connection availability and possibly invalidates the connection if it fails.
func (s *Server) Ping() error {
if err := s.db.Ping(); err != nil {
if cerr := s.Close(); cerr != nil {
return cerr
}
return err
}
return nil
}
package postgresql
import (
"errors"
"fmt"
"math"
"net/url"
"strconv"
"time"
)
func contains(a []string, x string) bool {
for _, n := range a {
if x == n {
return true
}
}
return false
}
func loggableDSN(dsn string) string {
pDSN, err := url.Parse(dsn)
if err != nil {
return "could not parse DATA_SOURCE_NAME"
}
// Blank user info if not nil
if pDSN.User != nil {
pDSN.User = url.UserPassword(pDSN.User.Username(), "PASSWORD_REMOVED")
}
return pDSN.String()
}
// 获取当前所有数据库
func queryDatabases(p *Server) ([]string, error) {
rows, err := p.db.Query("SELECT datname FROM pg_database WHERE datallowconn = true AND datistemplate = false AND datname != current_database()")
if err != nil {
return []string{},fmt.Errorf("Error retrieving databases: %v", err)
}
defer rows.Close() // nolint: errcheck
var databaseName string
result := make([]string, 0)
for rows.Next() {
err = rows.Scan(&databaseName)
if err != nil {
return []string{},errors.New(fmt.Sprintln("Error retrieving rows:", err))
}
result = append(result, databaseName)
}
return result, nil
}
// Convert database.sql types to float64s for Prometheus consumption. Null types are mapped to NaN. string and []byte
// types are mapped as NaN and !ok
func dbToFloat64(t interface{}) (float64, bool) {
switch v := t.(type) {
case int64:
return float64(v), true
case float64:
return v, true
case time.Time:
return float64(v.Unix()), true
case []byte:
// Try and convert to string and then parse to a float64
strV := string(v)
result, err := strconv.ParseFloat(strV, 64)
if err != nil {
return math.NaN(), false
}
return result, true
case string:
result, err := strconv.ParseFloat(v, 64)
if err != nil {
return math.NaN(), false
}
return result, true
case bool:
if v {
return 1, true
}
return 0, true
case nil:
return math.NaN(), true
default:
return math.NaN(), false
}
}
// Convert database.sql to string for Prometheus labels. Null types are mapped to empty strings.
func dbToString(t interface{}) (string, bool) {
switch v := t.(type) {
case int64:
return fmt.Sprintf("%v", v), true
case float64:
return fmt.Sprintf("%v", v), true
case time.Time:
return fmt.Sprintf("%v", v.Unix()), true
case nil:
return "", true
case []byte:
// Try and convert to string
return string(v), true
case string:
return v, true
case bool:
if v {
return "true", true
}
return "false", true
default:
return "", false
}
}
\ No newline at end of file
package postgresql
import (
"github.com/didi/nightingale/v4/src/modules/server/plugins"
_ "github.com/lib/pq"
"testing"
"time"
)
func TestCollect(t *testing.T) {
input := plugins.PluginTest(
t, &PostgresqlRule{
Dsn: "postgres://postgres:xxxx@127.0.0.1:5432/postgres?sslmode=disable",
ExcludeDatabases: []string{},
GatherPgReplicationSlots: true,
ClientConfig: plugins.ClientConfig{},
})
time.Sleep(2 * time.Second)
plugins.PluginInputTest(t, input)
}
......@@ -26,15 +26,15 @@ import (
"github.com/didi/nightingale/v4/src/modules/server/http/session"
"github.com/didi/nightingale/v4/src/modules/server/judge"
"github.com/didi/nightingale/v4/src/modules/server/judge/query"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/all"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/api"
"github.com/didi/nightingale/v4/src/modules/server/rabbitmq"
"github.com/didi/nightingale/v4/src/modules/server/redisc"
"github.com/didi/nightingale/v4/src/modules/server/rpc"
"github.com/didi/nightingale/v4/src/modules/server/ssoc"
"github.com/didi/nightingale/v4/src/modules/server/timer"
"github.com/didi/nightingale/v4/src/modules/server/wechat"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/all"
_ "github.com/didi/nightingale/v4/src/modules/server/plugins/api"
_ "github.com/lib/pq"
_ "github.com/go-sql-driver/mysql"
"github.com/toolkits/file"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册