未验证 提交 c0049326 编写于 作者: Y yubo 提交者: GitHub

Add mongdb as a plugin (#489)

* bugfix: whiteList list return empty

* support multi-dict for i18n && add mongodb for monapi as a plugin

* use 10day as max lifetime for extra mode auth

* bugfix: ignore i18n with default value

* Spelling mistakes
上级 543d345a
......@@ -59,36 +59,6 @@
"cannot delete root user": "root用户不能删除",
"user not found": "用户未找到",
"Databases": "数据库",
"if the list is empty, then metrics are gathered from all database tables": "如果列表为空,则收集所有数据库表",
"Process List": "进程列表",
"gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST": "从 INFORMATION_SCHEMA.PROCESSLIST 收集线程状态信息",
"User Statistics": "User Statistics",
"gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS": "从 INFORMATION_SCHEMA.USER_STATISTICS 收集用户状态信息",
"Auto Increment": "Auto Increment",
"gather auto_increment columns and max values from information schema": "采集 auto_increment 和 max values 信息",
"Innodb Metrics": "Innodb Metrics",
"gather metrics from INFORMATION_SCHEMA.INNODB_METRICS": "采集 INFORMATION_SCHEMA.INNODB_METRICS 信息",
"Slave Status": "Slave Status",
"gather metrics from SHOW SLAVE STATUS command output": "采集 metrics from SHOW SLAVE STATUS command output",
"Binary Logs": "Binary Logs",
"gather metrics from SHOW BINARY LOGS command output": "采集 metrics from SHOW BINARY LOGS command output",
"Table IO Waits": "Table IO Waits",
"gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMARY_BY_TABLE": "采集 from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMARY_BY_TABLE",
"Table Lock Waits": "Table Lock Waits",
"gather metrics from PERFORMANCE_SCHEMA.TABLE_LOCK_WAITS": "采集 from PERFORMANCE_SCHEMA.TABLE_LOCK_WAITS",
"Index IO Waits": "Index IO Waits",
"gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMARY_BY_INDEX_USAGE": "采集 from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMARY_BY_INDEX_USAGE",
"Event Waits": "Event Waits",
"gather metrics from PERFORMANCE_SCHEMA.EVENT_WAITS": "采集 from PERFORMANCE_SCHEMA.EVENT_WAITS",
"Tables": "Tables",
"gather metrics from INFORMATION_SCHEMA.TABLES for databases provided above list": "采集 from INFORMATION_SCHEMA.TABLES for databases provided above list",
"File Events Stats": "File Events Stats",
"gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME": "采集 from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME",
"Perf Events Statements": "Perf Events Statements",
"gather metrics from PERFORMANCE_SCHEMA.EVENTS_STATEMENTS_SUMMARY_BY_DIGEST": "采集 from PERFORMANCE_SCHEMA.EVENTS_STATEMENTS_SUMMARY_BY_DIGEST",
"Interval Slow": "Interval Slow",
"Repositories": "Repositories",
"List of repositories to monitor": "List of repositories to monitor",
"Access token": "Access token",
......
......@@ -41,6 +41,7 @@ require (
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/ldap.v3 v3.1.0
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v2 v2.3.0
xorm.io/core v0.7.3
......
......@@ -94,7 +94,7 @@ func ConfigsGets(ckeys []string) (map[string]string, error) {
type AuthConfig struct {
MaxNumErr int `json:"maxNumErr"`
MaxSessionNumber int64 `json:"maxSessionNumber"`
MaxConnIdelTime int64 `json:"maxConnIdelTime" description:"minute"`
MaxConnIdleTime int64 `json:"maxConnIdleTime" description:"minute"`
LockTime int64 `json:"lockTime" description:"minute"`
PwdHistorySize int `json:"pwdHistorySize"`
PwdMinLenght int `json:"pwdMinLenght"`
......@@ -163,7 +163,7 @@ func (p *AuthConfig) Validate() error {
}
var DefaultAuthConfig = AuthConfig{
MaxConnIdelTime: 30,
MaxConnIdleTime: 30,
PwdMustInclude: []string{},
}
......
......@@ -92,7 +92,7 @@ func (p *WhiteList) Validate() error {
func WhiteListTotal(query string) (int64, error) {
if query != "" {
q := "%" + query + "%"
return DB["rdb"].Where("start_ip like ? or end_ip like ?", q, q).Count(new(NodeTrash))
return DB["rdb"].Where("start_ip like ? or end_ip like ?", q, q).Count(new(WhiteList))
}
return DB["rdb"].Count(new(WhiteList))
......
......@@ -17,6 +17,7 @@ type Field struct {
Name string `json:"name,omitempty"`
Label string `json:"label,omitempty"`
Default string `json:"default,omitempty"`
Example string `json:"example,omitempty"`
Description string `json:"description,omitempty"`
Required bool `json:"required,omitempty"`
......@@ -136,6 +137,7 @@ func getTagOpt(sf reflect.StructField) (opt Field) {
opt.Name = name
opt.Label = _s(sf.Tag.Get("label"))
opt.Default = sf.Tag.Get("default")
opt.Example = sf.Tag.Get("example")
opt.Description = _s(sf.Tag.Get("description"))
......
......@@ -3,10 +3,11 @@ package all
import (
// remote
_ "github.com/didi/nightingale/src/modules/monapi/plugins/api"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/github"
// telegraf style
_ "github.com/didi/nightingale/src/modules/monapi/plugins/mongodb"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/mysql"
// _ "github.com/didi/nightingale/src/modules/monapi/plugins/prometheus"
_ "github.com/didi/nightingale/src/modules/monapi/plugins/redis"
// _ "github.com/didi/nightingale/src/modules/monapi/plugins/github"
// local
_ "github.com/didi/nightingale/src/modules/monapi/plugins/log"
......
package redis
package mongodb
import (
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/modules/monapi/plugins/mongodb/mongodb"
"github.com/didi/nightingale/src/toolkits/i18n"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/redis"
)
func init() {
collector.CollectorRegister(NewRedisCollector()) // for monapi
collector.CollectorRegister(NewMongodbCollector()) // for monapi
i18n.DictRegister(langDict)
}
type RedisCollector struct {
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"Servers": "服务",
"An array of URLs of the form": "服务地址",
"Cluster status": "采集集群",
"When true, collect cluster status.": "采集集群统计信息",
"Per DB stats": "采集单个数据库(db)统计信息",
"When true, collect per database stats": "采集一个数据库的统计信息",
"Col stats": "采集集合(Collection)统计信息",
"When true, collect per collection stats": "采集一个集合的统计信息",
"Col stats dbs": "采集集合的列表",
"List of db where collections stats are collected, If empty, all db are concerned": "如果设置为空,则采集数据库里所有集合的统计信息",
},
}
)
type MongodbCollector struct {
*collector.BaseCollector
}
func NewRedisCollector() *RedisCollector {
return &RedisCollector{BaseCollector: collector.NewBaseCollector(
"redis",
func NewMongodbCollector() *MongodbCollector {
return &MongodbCollector{BaseCollector: collector.NewBaseCollector(
"mongodb",
collector.RemoteCategory,
func() interface{} { return &RedisRule{} },
func() interface{} { return &MongodbRule{} },
)}
}
type RedisRule struct {
type MongodbRule struct {
Servers []string `label:"Servers" json:"servers,required" description:"An array of URLs of the form" example:"mongodb://user:auth_key@10.10.3.30:27017"`
GatherClusterStatus bool `label:"Cluster status" json:"gather_cluster_status" description:"When true, collect cluster status." default:"true"`
GatherPerdbStats bool `label:"Per DB stats" json:"gather_perdb_stats" description:"When true, collect per database stats" default:"false"`
GatherColStats bool `label:"Col stats" json:"gather_col_stats" description:"When true, collect per collection stats" default:"false"`
ColStatsDbs []string `label:"Col stats dbs" json:"col_stats_dbs" description:"List of db where collections stats are collected, If empty, all db are concerned" example:"local" default:"[\"local\"]"`
// tlsint.ClientConfig
// Ssl Ssl
}
func (p *RedisRule) Validate() error {
func (p *MongodbRule) Validate() error {
return nil
}
func (p *RedisRule) TelegrafInput() (telegraf.Input, error) {
func (p *MongodbRule) TelegrafInput() (telegraf.Input, error) {
if err := p.Validate(); err != nil {
return nil, err
}
return &redis.Redis{}, nil
return &mongodb.MongoDB{
Servers: p.Servers,
Mongos: make(map[string]*mongodb.Server),
GatherClusterStatus: p.GatherClusterStatus,
GatherPerdbStats: p.GatherPerdbStats,
GatherColStats: p.GatherColStats,
ColStatsDbs: p.ColStatsDbs,
}, nil
}
# MongoDB Input Plugin
### Configuration:
```toml
[[inputs.mongodb]]
## An array of URLs of the form:
## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832,
servers = ["mongodb://127.0.0.1:27017"]
## When true, collect cluster status.
## Note that the query that counts jumbo chunks triggers a COLLSCAN, which
## may have an impact on performance.
# gather_cluster_status = true
## When true, collect per database stats
# gather_perdb_stats = false
## When true, collect per collection stats
# gather_col_stats = false
## List of db where collections stats are collected
## If empty, all db are concerned
# col_stats_dbs = ["local"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
#### Permissions:
If your MongoDB instance has access control enabled you will need to connect
as a user with sufficient rights.
With MongoDB 3.4 and higher, the `clusterMonitor` role can be used. In
version 3.2 you may also need these additional permissions:
```
> db.grantRolesToUser("user", [{role: "read", actions: "find", db: "local"}])
```
If the user is missing required privileges you may see an error in the
Telegraf logs similar to:
```
Error in input [mongodb]: not authorized on admin to execute command { serverStatus: 1, recordStats: 0 }
```
Some permission related errors are logged at debug level, you can check these
messages by setting `debug = true` in the agent section of the configuration or
by running Telegraf with the `--debug` argument.
### Metrics:
- mongodb
- tags:
- hostname
- node_type
- rs_name
- fields:
- active_reads (integer)
- active_writes (integer)
- aggregate_command_failed (integer)
- aggregate_command_total (integer)
- assert_msg (integer)
- assert_regular (integer)
- assert_rollovers (integer)
- assert_user (integer)
- assert_warning (integer)
- available_reads (integer)
- available_writes (integer)
- commands (integer)
- connections_available (integer)
- connections_current (integer)
- connections_total_created (integer)
- count_command_failed (integer)
- count_command_total (integer)
- cursor_no_timeout_count (integer)
- cursor_pinned_count (integer)
- cursor_timed_out_count (integer)
- cursor_total_count (integer)
- delete_command_failed (integer)
- delete_command_total (integer)
- deletes (integer)
- distinct_command_failed (integer)
- distinct_command_total (integer)
- document_deleted (integer)
- document_inserted (integer)
- document_returned (integer)
- document_updated (integer)
- find_and_modify_command_failed (integer)
- find_and_modify_command_total (integer)
- find_command_failed (integer)
- find_command_total (integer)
- flushes (integer)
- flushes_total_time_ns (integer)
- get_more_command_failed (integer)
- get_more_command_total (integer)
- getmores (integer)
- insert_command_failed (integer)
- insert_command_total (integer)
- inserts (integer)
- jumbo_chunks (integer)
- latency_commands_count (integer)
- latency_commands (integer)
- latency_reads_count (integer)
- latency_reads (integer)
- latency_writes_count (integer)
- latency_writes (integer)
- member_status (string)
- net_in_bytes_count (integer)
- net_out_bytes_count (integer)
- open_connections (integer)
- operation_scan_and_order (integer)
- operation_write_conflicts (integer)
- page_faults (integer)
- percent_cache_dirty (float)
- percent_cache_used (float)
- queries (integer)
- queued_reads (integer)
- queued_writes (integer)
- repl_apply_batches_num (integer)
- repl_apply_batches_total_millis (integer)
- repl_apply_ops (integer)
- repl_buffer_count (integer)
- repl_buffer_size_bytes (integer)
- repl_commands (integer)
- repl_deletes (integer)
- repl_executor_pool_in_progress_count (integer)
- repl_executor_queues_network_in_progress (integer)
- repl_executor_queues_sleepers (integer)
- repl_executor_unsignaled_events (integer)
- repl_getmores (integer)
- repl_inserts (integer)
- repl_lag (integer)
- repl_network_bytes (integer)
- repl_network_getmores_num (integer)
- repl_network_getmores_total_millis (integer)
- repl_network_ops (integer)
- repl_queries (integer)
- repl_updates (integer)
- repl_oplog_window_sec (integer)
- repl_state (integer)
- resident_megabytes (integer)
- state (string)
- storage_freelist_search_bucket_exhausted (integer)
- storage_freelist_search_requests (integer)
- storage_freelist_search_scanned (integer)
- tcmalloc_central_cache_free_bytes (integer)
- tcmalloc_current_allocated_bytes (integer)
- tcmalloc_current_total_thread_cache_bytes (integer)
- tcmalloc_heap_size (integer)
- tcmalloc_max_total_thread_cache_bytes (integer)
- tcmalloc_pageheap_commit_count (integer)
- tcmalloc_pageheap_committed_bytes (integer)
- tcmalloc_pageheap_decommit_count (integer)
- tcmalloc_pageheap_free_bytes (integer)
- tcmalloc_pageheap_reserve_count (integer)
- tcmalloc_pageheap_scavenge_count (integer)
- tcmalloc_pageheap_total_commit_bytes (integer)
- tcmalloc_pageheap_total_decommit_bytes (integer)
- tcmalloc_pageheap_total_reserve_bytes (integer)
- tcmalloc_pageheap_unmapped_bytes (integer)
- tcmalloc_spinlock_total_delay_ns (integer)
- tcmalloc_thread_cache_free_bytes (integer)
- tcmalloc_total_free_bytes (integer)
- tcmalloc_transfer_cache_free_bytes (integer)
- total_available (integer)
- total_created (integer)
- total_docs_scanned (integer)
- total_in_use (integer)
- total_keys_scanned (integer)
- total_refreshing (integer)
- total_tickets_reads (integer)
- total_tickets_writes (integer)
- ttl_deletes (integer)
- ttl_passes (integer)
- update_command_failed (integer)
- update_command_total (integer)
- updates (integer)
- uptime_ns (integer)
- version (string)
- vsize_megabytes (integer)
- wtcache_app_threads_page_read_count (integer)
- wtcache_app_threads_page_read_time (integer)
- wtcache_app_threads_page_write_count (integer)
- wtcache_bytes_read_into (integer)
- wtcache_bytes_written_from (integer)
- wtcache_pages_read_into (integer)
- wtcache_pages_requested_from (integer)
- wtcache_current_bytes (integer)
- wtcache_max_bytes_configured (integer)
- wtcache_internal_pages_evicted (integer)
- wtcache_modified_pages_evicted (integer)
- wtcache_unmodified_pages_evicted (integer)
- wtcache_pages_evicted_by_app_thread (integer)
- wtcache_pages_queued_for_eviction (integer)
- wtcache_server_evicting_pages (integer)
- wtcache_tracked_dirty_bytes (integer)
- wtcache_worker_thread_evictingpages (integer)
- commands_per_sec (integer, deprecated in 1.10; use `commands`))
- cursor_no_timeout (integer, opened/sec, deprecated in 1.10; use `cursor_no_timeout_count`))
- cursor_pinned (integer, opened/sec, deprecated in 1.10; use `cursor_pinned_count`))
- cursor_timed_out (integer, opened/sec, deprecated in 1.10; use `cursor_timed_out_count`))
- cursor_total (integer, opened/sec, deprecated in 1.10; use `cursor_total_count`))
- deletes_per_sec (integer, deprecated in 1.10; use `deletes`))
- flushes_per_sec (integer, deprecated in 1.10; use `flushes`))
- getmores_per_sec (integer, deprecated in 1.10; use `getmores`))
- inserts_per_sec (integer, deprecated in 1.10; use `inserts`))
- net_in_bytes (integer, bytes/sec, deprecated in 1.10; use `net_out_bytes_count`))
- net_out_bytes (integer, bytes/sec, deprecated in 1.10; use `net_out_bytes_count`))
- queries_per_sec (integer, deprecated in 1.10; use `queries`))
- repl_commands_per_sec (integer, deprecated in 1.10; use `repl_commands`))
- repl_deletes_per_sec (integer, deprecated in 1.10; use `repl_deletes`)
- repl_getmores_per_sec (integer, deprecated in 1.10; use `repl_getmores`)
- repl_inserts_per_sec (integer, deprecated in 1.10; use `repl_inserts`))
- repl_queries_per_sec (integer, deprecated in 1.10; use `repl_queries`))
- repl_updates_per_sec (integer, deprecated in 1.10; use `repl_updates`))
- ttl_deletes_per_sec (integer, deprecated in 1.10; use `ttl_deletes`))
- ttl_passes_per_sec (integer, deprecated in 1.10; use `ttl_passes`))
- updates_per_sec (integer, deprecated in 1.10; use `updates`))
+ mongodb_db_stats
- tags:
- db_name
- hostname
- fields:
- avg_obj_size (float)
- collections (integer)
- data_size (integer)
- index_size (integer)
- indexes (integer)
- num_extents (integer)
- objects (integer)
- ok (integer)
- storage_size (integer)
- type (string)
- mongodb_col_stats
- tags:
- hostname
- collection
- db_name
- fields:
- size (integer)
- avg_obj_size (integer)
- storage_size (integer)
- total_index_size (integer)
- ok (integer)
- count (integer)
- type (string)
- mongodb_shard_stats
- tags:
- hostname
- fields:
- in_use (integer)
- available (integer)
- created (integer)
- refreshing (integer)
### Example Output:
```
mongodb,hostname=127.0.0.1:27017 active_reads=3i,active_writes=0i,aggregate_command_failed=0i,aggregate_command_total=87210i,assert_msg=0i,assert_regular=0i,assert_rollovers=0i,assert_user=0i,assert_warning=0i,available_reads=125i,available_writes=128i,commands=218126i,commands_per_sec=1876i,connections_available=838853i,connections_current=7i,connections_total_created=8i,count_command_failed=0i,count_command_total=7i,cursor_no_timeout=0i,cursor_no_timeout_count=0i,cursor_pinned=0i,cursor_pinned_count=0i,cursor_timed_out=0i,cursor_timed_out_count=0i,cursor_total=0i,cursor_total_count=0i,delete_command_failed=0i,delete_command_total=0i,deletes=0i,deletes_per_sec=0i,distinct_command_failed=0i,distinct_command_total=87190i,document_deleted=0i,document_inserted=0i,document_returned=7i,document_updated=43595i,find_and_modify_command_failed=0i,find_and_modify_command_total=43595i,find_command_failed=0i,find_command_total=348819i,flushes=1i,flushes_per_sec=0i,flushes_total_time_ns=5000000i,get_more_command_failed=0i,get_more_command_total=0i,getmores=7i,getmores_per_sec=1i,insert_command_failed=0i,insert_command_total=0i,inserts=0i,inserts_per_sec=0i,jumbo_chunks=0i,latency_commands=44179i,latency_commands_count=122i,latency_reads=36662189i,latency_reads_count=523229i,latency_writes=6768713i,latency_writes_count=87190i,net_in_bytes=837378i,net_in_bytes_count=97692502i,net_out_bytes=690836i,net_out_bytes_count=75377383i,open_connections=7i,operation_scan_and_order=87193i,operation_write_conflicts=7i,page_faults=0i,percent_cache_dirty=0.9,percent_cache_used=1,queries=348816i,queries_per_sec=2988i,queued_reads=0i,queued_writes=0i,resident_megabytes=77i,storage_freelist_search_bucket_exhausted=0i,storage_freelist_search_requests=0i,storage_freelist_search_scanned=0i,tcmalloc_central_cache_free_bytes=280136i,tcmalloc_current_allocated_bytes=77677288i,tcmalloc_current_total_thread_cache_bytes=1222608i,tcmalloc_heap_size=142659584i,tcmalloc_max_total_thread_cache_bytes=260046848i,tcmalloc_pageheap_commit_count=1898i,tcmalloc_pageheap_committed_bytes=130084864i,tcmalloc_pageheap_decommit_count=889i,tcmalloc_pageheap_free_bytes=50610176i,tcmalloc_pageheap_reserve_count=50i,tcmalloc_pageheap_scavenge_count=884i,tcmalloc_pageheap_total_commit_bytes=13021937664i,tcmalloc_pageheap_total_decommit_bytes=12891852800i,tcmalloc_pageheap_total_reserve_bytes=142659584i,tcmalloc_pageheap_unmapped_bytes=12574720i,tcmalloc_spinlock_total_delay_ns=9767500i,tcmalloc_thread_cache_free_bytes=1222608i,tcmalloc_total_free_bytes=1797400i,tcmalloc_transfer_cache_free_bytes=294656i,total_available=0i,total_created=0i,total_docs_scanned=43595i,total_in_use=0i,total_keys_scanned=130805i,total_refreshing=0i,total_tickets_reads=128i,total_tickets_writes=128i,ttl_deletes=0i,ttl_deletes_per_sec=0i,ttl_passes=0i,ttl_passes_per_sec=0i,update_command_failed=0i,update_command_total=43595i,updates=43595i,updates_per_sec=372i,uptime_ns=60023000000i,version="3.6.17",vsize_megabytes=1048i,wtcache_app_threads_page_read_count=108i,wtcache_app_threads_page_read_time=25995i,wtcache_app_threads_page_write_count=0i,wtcache_bytes_read_into=2487250i,wtcache_bytes_written_from=74i,wtcache_current_bytes=5014530i,wtcache_internal_pages_evicted=0i,wtcache_max_bytes_configured=505413632i,wtcache_modified_pages_evicted=0i,wtcache_pages_evicted_by_app_thread=0i,wtcache_pages_queued_for_eviction=0i,wtcache_pages_read_into=139i,wtcache_pages_requested_from=699135i,wtcache_server_evicting_pages=0i,wtcache_tracked_dirty_bytes=4797426i,wtcache_unmodified_pages_evicted=0i,wtcache_worker_thread_evictingpages=0i 1586379818000000000
mongodb,hostname=127.0.0.1:27017,node_type=SEC,rs_name=rs0 active_reads=1i,active_writes=0i,aggregate_command_failed=0i,aggregate_command_total=1i,assert_msg=0i,assert_regular=0i,assert_rollovers=0i,assert_user=79i,assert_warning=0i,available_reads=127i,available_writes=128i,commands=1121855i,commands_per_sec=10i,connections_available=51183i,connections_current=17i,connections_total_created=557i,count_command_failed=0i,count_command_total=46307i,cursor_no_timeout=0i,cursor_no_timeout_count=0i,cursor_pinned=0i,cursor_pinned_count=0i,cursor_timed_out=0i,cursor_timed_out_count=28i,cursor_total=0i,cursor_total_count=0i,delete_command_failed=0i,delete_command_total=0i,deletes=0i,deletes_per_sec=0i,distinct_command_failed=0i,distinct_command_total=0i,document_deleted=0i,document_inserted=0i,document_returned=2248129i,document_updated=0i,find_and_modify_command_failed=0i,find_and_modify_command_total=0i,find_command_failed=2i,find_command_total=8764i,flushes=7850i,flushes_per_sec=0i,flushes_total_time_ns=4535446000000i,get_more_command_failed=0i,get_more_command_total=1993i,getmores=2018i,getmores_per_sec=0i,insert_command_failed=0i,insert_command_total=0i,inserts=0i,inserts_per_sec=0i,jumbo_chunks=0i,latency_commands=112011949i,latency_commands_count=1072472i,latency_reads=1877142443i,latency_reads_count=57086i,latency_writes=0i,latency_writes_count=0i,member_status="SEC",net_in_bytes=1212i,net_in_bytes_count=263928689i,net_out_bytes=41051i,net_out_bytes_count=2475389483i,open_connections=17i,operation_scan_and_order=34i,operation_write_conflicts=0i,page_faults=317i,percent_cache_dirty=1.6,percent_cache_used=73,queries=8764i,queries_per_sec=0i,queued_reads=0i,queued_writes=0i,repl_apply_batches_num=17839419i,repl_apply_batches_total_millis=399929i,repl_apply_ops=23355263i,repl_buffer_count=0i,repl_buffer_size_bytes=0i,repl_commands=11i,repl_commands_per_sec=0i,repl_deletes=440608i,repl_deletes_per_sec=0i,repl_executor_pool_in_progress_count=0i,repl_executor_queues_network_in_progress=0i,repl_executor_queues_sleepers=4i,repl_executor_unsignaled_events=0i,repl_getmores=0i,repl_getmores_per_sec=0i,repl_inserts=1875729i,repl_inserts_per_sec=0i,repl_lag=0i,repl_network_bytes=39122199371i,repl_network_getmores_num=34908797i,repl_network_getmores_total_millis=434805356i,repl_network_ops=23199086i,repl_oplog_window_sec=619292i,repl_queries=0i,repl_queries_per_sec=0i,repl_updates=21034729i,repl_updates_per_sec=38i,repl_state=2,resident_megabytes=6721i,state="SECONDARY",storage_freelist_search_bucket_exhausted=0i,storage_freelist_search_requests=0i,storage_freelist_search_scanned=0i,tcmalloc_central_cache_free_bytes=358512400i,tcmalloc_current_allocated_bytes=5427379424i,tcmalloc_current_total_thread_cache_bytes=70349552i,tcmalloc_heap_size=10199310336i,tcmalloc_max_total_thread_cache_bytes=1073741824i,tcmalloc_pageheap_commit_count=790819i,tcmalloc_pageheap_committed_bytes=7064821760i,tcmalloc_pageheap_decommit_count=533347i,tcmalloc_pageheap_free_bytes=1207816192i,tcmalloc_pageheap_reserve_count=7706i,tcmalloc_pageheap_scavenge_count=426235i,tcmalloc_pageheap_total_commit_bytes=116127649792i,tcmalloc_pageheap_total_decommit_bytes=109062828032i,tcmalloc_pageheap_total_reserve_bytes=10199310336i,tcmalloc_pageheap_unmapped_bytes=3134488576i,tcmalloc_spinlock_total_delay_ns=2518474348i,tcmalloc_thread_cache_free_bytes=70349552i,tcmalloc_total_free_bytes=429626144i,tcmalloc_transfer_cache_free_bytes=764192i,total_available=0i,total_created=0i,total_docs_scanned=735004782i,total_in_use=0i,total_keys_scanned=6188216i,total_refreshing=0i,total_tickets_reads=128i,total_tickets_writes=128i,ttl_deletes=0i,ttl_deletes_per_sec=0i,ttl_passes=7892i,ttl_passes_per_sec=0i,update_command_failed=0i,update_command_total=0i,updates=0i,updates_per_sec=0i,uptime_ns=473590288000000i,version="3.6.17",vsize_megabytes=11136i,wtcache_app_threads_page_read_count=11467625i,wtcache_app_threads_page_read_time=1700336840i,wtcache_app_threads_page_write_count=13268184i,wtcache_bytes_read_into=348022587843i,wtcache_bytes_written_from=322571702254i,wtcache_current_bytes=5509459274i,wtcache_internal_pages_evicted=109108i,wtcache_max_bytes_configured=7547650048i,wtcache_modified_pages_evicted=911196i,wtcache_pages_evicted_by_app_thread=17366i,wtcache_pages_queued_for_eviction=16572754i,wtcache_pages_read_into=11689764i,wtcache_pages_requested_from=499825861i,wtcache_server_evicting_pages=0i,wtcache_tracked_dirty_bytes=117487510i,wtcache_unmodified_pages_evicted=11058458i,wtcache_worker_thread_evictingpages=11907226i 1586379707000000000
mongodb_db_stats,db_name=admin,hostname=127.0.0.1:27017 avg_obj_size=241,collections=2i,data_size=723i,index_size=49152i,indexes=3i,num_extents=0i,objects=3i,ok=1i,storage_size=53248i,type="db_stat" 1547159491000000000
mongodb_db_stats,db_name=local,hostname=127.0.0.1:27017 avg_obj_size=813.9705882352941,collections=6i,data_size=55350i,index_size=102400i,indexes=5i,num_extents=0i,objects=68i,ok=1i,storage_size=204800i,type="db_stat" 1547159491000000000
mongodb_col_stats,collection=foo,db_name=local,hostname=127.0.0.1:27017 size=375005928i,avg_obj_size=5494,type="col_stat",storage_size=249307136i,total_index_size=2138112i,ok=1i,count=68251i 1547159491000000000
mongodb_shard_stats,hostname=127.0.0.1:27017,in_use=3i,available=3i,created=4i,refreshing=0i 1522799074000000000
```
version: '3'
services:
mongodb:
image: mongo
telegraf:
image: glinton/scratch
volumes:
- ./telegraf.conf:/telegraf.conf
- ../../../../telegraf:/telegraf
depends_on:
- mongodb
entrypoint:
- /telegraf
- --config
- /telegraf.conf
[agent]
interval="1s"
flush_interval="3s"
[[inputs.mongodb]]
servers = ["mongodb://mongodb:27017"]
[[outputs.file]]
files = ["stdout"]
package mongodb
import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"gopkg.in/mgo.v2"
)
type MongoDB struct {
Servers []string
Ssl Ssl
Mongos map[string]*Server
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
ColStatsDbs []string
tlsint.ClientConfig
Log telegraf.Logger
}
type Ssl struct {
Enabled bool
CaCerts []string `toml:"cacerts"`
}
var sampleConfig = `
## An array of URLs of the form:
## "mongodb://" [user ":" pass "@"] host [ ":" port]
## For example:
## mongodb://user:auth_key@10.10.3.30:27017,
## mongodb://10.10.3.33:18832,
servers = ["mongodb://127.0.0.1:27017"]
## When true, collect cluster status
## Note that the query that counts jumbo chunks triggers a COLLSCAN, which
## may have an impact on performance.
# gather_cluster_status = true
## When true, collect per database stats
# gather_perdb_stats = false
## When true, collect per collection stats
# gather_col_stats = false
## List of db where collections stats are collected
## If empty, all db are concerned
# col_stats_dbs = ["local"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (m *MongoDB) SampleConfig() string {
return sampleConfig
}
func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}
var localhost = &url.URL{Host: "mongodb://127.0.0.1:27017"}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
}
var wg sync.WaitGroup
for i, serv := range m.Servers {
if !strings.HasPrefix(serv, "mongodb://") {
// Preserve backwards compatibility for hostnames without a
// scheme, broken in go 1.8. Remove in Telegraf 2.0
serv = "mongodb://" + serv
m.Log.Warnf("Using %q as connection URL; please update your configuration to use an URL", serv)
m.Servers[i] = serv
}
u, err := url.Parse(serv)
if err != nil {
m.Log.Errorf("Unable to parse address %q: %s", serv, err.Error())
continue
}
if u.Host == "" {
m.Log.Errorf("Unable to parse address %q", serv)
continue
}
wg.Add(1)
go func(srv *Server) {
defer wg.Done()
err := m.gatherServer(srv, acc)
if err != nil {
m.Log.Errorf("Error in plugin: %v", err)
}
}(m.getMongoServer(u))
}
wg.Wait()
return nil
}
func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.Mongos[url.Host]; !ok {
m.Mongos[url.Host] = &Server{
Log: m.Log,
Url: url,
}
}
return m.Mongos[url.Host]
}
func (m *MongoDB) gatherServer(server *Server, acc telegraf.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
return fmt.Errorf("unable to parse URL %q: %s", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = 5 * time.Second
var tlsConfig *tls.Config
if m.Ssl.Enabled {
// Deprecated TLS config
tlsConfig = &tls.Config{}
if len(m.Ssl.CaCerts) > 0 {
roots := x509.NewCertPool()
for _, caCert := range m.Ssl.CaCerts {
ok := roots.AppendCertsFromPEM([]byte(caCert))
if !ok {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
} else {
tlsConfig.InsecureSkipVerify = true
}
} else {
tlsConfig, err = m.ClientConfig.TLSConfig()
if err != nil {
return err
}
}
// If configured to use TLS, add a dial function
if tlsConfig != nil {
dialInfo.DialServer = func(addr *mgo.ServerAddr) (net.Conn, error) {
conn, err := tls.Dial("tcp", addr.String(), tlsConfig)
if err != nil {
fmt.Printf("error in Dial, %s\n", err.Error())
}
return conn, err
}
}
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %s", err.Error())
}
server.Session = sess
}
return server.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.ColStatsDbs)
}
func init() {
inputs.Add("mongodb", func() telegraf.Input {
return &MongoDB{
Mongos: make(map[string]*Server),
GatherClusterStatus: true,
GatherPerdbStats: false,
GatherColStats: false,
ColStatsDbs: []string{"local"},
}
})
}
package mongodb
import (
"fmt"
"reflect"
"strconv"
"github.com/influxdata/telegraf"
)
type MongodbData struct {
StatLine *StatLine
Fields map[string]interface{}
Tags map[string]string
DbData []DbData
ColData []ColData
ShardHostData []DbData
}
type DbData struct {
Name string
Fields map[string]interface{}
}
type ColData struct {
Name string
DbName string
Fields map[string]interface{}
}
func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
return &MongodbData{
StatLine: statLine,
Tags: tags,
Fields: make(map[string]interface{}),
DbData: []DbData{},
}
}
var DefaultStats = map[string]string{
"uptime_ns": "UptimeNanos",
"inserts": "InsertCnt",
"inserts_per_sec": "Insert",
"queries": "QueryCnt",
"queries_per_sec": "Query",
"updates": "UpdateCnt",
"updates_per_sec": "Update",
"deletes": "DeleteCnt",
"deletes_per_sec": "Delete",
"getmores": "GetMoreCnt",
"getmores_per_sec": "GetMore",
"commands": "CommandCnt",
"commands_per_sec": "Command",
"flushes": "FlushesCnt",
"flushes_per_sec": "Flushes",
"flushes_total_time_ns": "FlushesTotalTime",
"vsize_megabytes": "Virtual",
"resident_megabytes": "Resident",
"queued_reads": "QueuedReaders",
"queued_writes": "QueuedWriters",
"active_reads": "ActiveReaders",
"active_writes": "ActiveWriters",
"available_reads": "AvailableReaders",
"available_writes": "AvailableWriters",
"total_tickets_reads": "TotalTicketsReaders",
"total_tickets_writes": "TotalTicketsWriters",
"net_in_bytes_count": "NetInCnt",
"net_in_bytes": "NetIn",
"net_out_bytes_count": "NetOutCnt",
"net_out_bytes": "NetOut",
"open_connections": "NumConnections",
"ttl_deletes": "DeletedDocumentsCnt",
"ttl_deletes_per_sec": "DeletedDocuments",
"ttl_passes": "PassesCnt",
"ttl_passes_per_sec": "Passes",
"cursor_timed_out": "TimedOutC",
"cursor_timed_out_count": "TimedOutCCnt",
"cursor_no_timeout": "NoTimeoutC",
"cursor_no_timeout_count": "NoTimeoutCCnt",
"cursor_pinned": "PinnedC",
"cursor_pinned_count": "PinnedCCnt",
"cursor_total": "TotalC",
"cursor_total_count": "TotalCCnt",
"document_deleted": "DeletedD",
"document_inserted": "InsertedD",
"document_returned": "ReturnedD",
"document_updated": "UpdatedD",
"connections_current": "CurrentC",
"connections_available": "AvailableC",
"connections_total_created": "TotalCreatedC",
"operation_scan_and_order": "ScanAndOrderOp",
"operation_write_conflicts": "WriteConflictsOp",
"total_keys_scanned": "TotalKeysScanned",
"total_docs_scanned": "TotalObjectsScanned",
}
var DefaultAssertsStats = map[string]string{
"assert_regular": "Regular",
"assert_warning": "Warning",
"assert_msg": "Msg",
"assert_user": "User",
"assert_rollovers": "Rollovers",
}
var DefaultCommandsStats = map[string]string{
"aggregate_command_total": "AggregateCommandTotal",
"aggregate_command_failed": "AggregateCommandFailed",
"count_command_total": "CountCommandTotal",
"count_command_failed": "CountCommandFailed",
"delete_command_total": "DeleteCommandTotal",
"delete_command_failed": "DeleteCommandFailed",
"distinct_command_total": "DistinctCommandTotal",
"distinct_command_failed": "DistinctCommandFailed",
"find_command_total": "FindCommandTotal",
"find_command_failed": "FindCommandFailed",
"find_and_modify_command_total": "FindAndModifyCommandTotal",
"find_and_modify_command_failed": "FindAndModifyCommandFailed",
"get_more_command_total": "GetMoreCommandTotal",
"get_more_command_failed": "GetMoreCommandFailed",
"insert_command_total": "InsertCommandTotal",
"insert_command_failed": "InsertCommandFailed",
"update_command_total": "UpdateCommandTotal",
"update_command_failed": "UpdateCommandFailed",
}
var DefaultLatencyStats = map[string]string{
"latency_writes_count": "WriteOpsCnt",
"latency_writes": "WriteLatency",
"latency_reads_count": "ReadOpsCnt",
"latency_reads": "ReadLatency",
"latency_commands_count": "CommandOpsCnt",
"latency_commands": "CommandLatency",
}
var DefaultReplStats = map[string]string{
"repl_inserts": "InsertRCnt",
"repl_inserts_per_sec": "InsertR",
"repl_queries": "QueryRCnt",
"repl_queries_per_sec": "QueryR",
"repl_updates": "UpdateRCnt",
"repl_updates_per_sec": "UpdateR",
"repl_deletes": "DeleteRCnt",
"repl_deletes_per_sec": "DeleteR",
"repl_getmores": "GetMoreRCnt",
"repl_getmores_per_sec": "GetMoreR",
"repl_commands": "CommandRCnt",
"repl_commands_per_sec": "CommandR",
"member_status": "NodeType",
"state": "NodeState",
"repl_state": "NodeStateInt",
"repl_lag": "ReplLag",
"repl_network_bytes": "ReplNetworkBytes",
"repl_network_getmores_num": "ReplNetworkGetmoresNum",
"repl_network_getmores_total_millis": "ReplNetworkGetmoresTotalMillis",
"repl_network_ops": "ReplNetworkOps",
"repl_buffer_count": "ReplBufferCount",
"repl_buffer_size_bytes": "ReplBufferSizeBytes",
"repl_apply_batches_num": "ReplApplyBatchesNum",
"repl_apply_batches_total_millis": "ReplApplyBatchesTotalMillis",
"repl_apply_ops": "ReplApplyOps",
"repl_executor_pool_in_progress_count": "ReplExecutorPoolInProgressCount",
"repl_executor_queues_network_in_progress": "ReplExecutorQueuesNetworkInProgress",
"repl_executor_queues_sleepers": "ReplExecutorQueuesSleepers",
"repl_executor_unsignaled_events": "ReplExecutorUnsignaledEvents",
}
var DefaultClusterStats = map[string]string{
"jumbo_chunks": "JumboChunksCount",
}
var DefaultShardStats = map[string]string{
"total_in_use": "TotalInUse",
"total_available": "TotalAvailable",
"total_created": "TotalCreated",
"total_refreshing": "TotalRefreshing",
}
var ShardHostStats = map[string]string{
"in_use": "InUse",
"available": "Available",
"created": "Created",
"refreshing": "Refreshing",
}
var MmapStats = map[string]string{
"mapped_megabytes": "Mapped",
"non-mapped_megabytes": "NonMapped",
"page_faults": "FaultsCnt",
"page_faults_per_sec": "Faults",
}
var WiredTigerStats = map[string]string{
"percent_cache_dirty": "CacheDirtyPercent",
"percent_cache_used": "CacheUsedPercent",
}
var WiredTigerExtStats = map[string]string{
"wtcache_tracked_dirty_bytes": "TrackedDirtyBytes",
"wtcache_current_bytes": "CurrentCachedBytes",
"wtcache_max_bytes_configured": "MaxBytesConfigured",
"wtcache_app_threads_page_read_count": "AppThreadsPageReadCount",
"wtcache_app_threads_page_read_time": "AppThreadsPageReadTime",
"wtcache_app_threads_page_write_count": "AppThreadsPageWriteCount",
"wtcache_bytes_written_from": "BytesWrittenFrom",
"wtcache_bytes_read_into": "BytesReadInto",
"wtcache_pages_evicted_by_app_thread": "PagesEvictedByAppThread",
"wtcache_pages_queued_for_eviction": "PagesQueuedForEviction",
"wtcache_pages_read_into": "PagesReadIntoCache",
"wtcache_pages_written_from": "PagesWrittenFromCache",
"wtcache_pages_requested_from": "PagesRequestedFromCache",
"wtcache_server_evicting_pages": "ServerEvictingPages",
"wtcache_worker_thread_evictingpages": "WorkerThreadEvictingPages",
"wtcache_internal_pages_evicted": "InternalPagesEvicted",
"wtcache_modified_pages_evicted": "ModifiedPagesEvicted",
"wtcache_unmodified_pages_evicted": "UnmodifiedPagesEvicted",
}
var DefaultTCMallocStats = map[string]string{
"tcmalloc_current_allocated_bytes": "TCMallocCurrentAllocatedBytes",
"tcmalloc_heap_size": "TCMallocHeapSize",
"tcmalloc_central_cache_free_bytes": "TCMallocCentralCacheFreeBytes",
"tcmalloc_current_total_thread_cache_bytes": "TCMallocCurrentTotalThreadCacheBytes",
"tcmalloc_max_total_thread_cache_bytes": "TCMallocMaxTotalThreadCacheBytes",
"tcmalloc_total_free_bytes": "TCMallocTotalFreeBytes",
"tcmalloc_transfer_cache_free_bytes": "TCMallocTransferCacheFreeBytes",
"tcmalloc_thread_cache_free_bytes": "TCMallocThreadCacheFreeBytes",
"tcmalloc_spinlock_total_delay_ns": "TCMallocSpinLockTotalDelayNanos",
"tcmalloc_pageheap_free_bytes": "TCMallocPageheapFreeBytes",
"tcmalloc_pageheap_unmapped_bytes": "TCMallocPageheapUnmappedBytes",
"tcmalloc_pageheap_committed_bytes": "TCMallocPageheapComittedBytes",
"tcmalloc_pageheap_scavenge_count": "TCMallocPageheapScavengeCount",
"tcmalloc_pageheap_commit_count": "TCMallocPageheapCommitCount",
"tcmalloc_pageheap_total_commit_bytes": "TCMallocPageheapTotalCommitBytes",
"tcmalloc_pageheap_decommit_count": "TCMallocPageheapDecommitCount",
"tcmalloc_pageheap_total_decommit_bytes": "TCMallocPageheapTotalDecommitBytes",
"tcmalloc_pageheap_reserve_count": "TCMallocPageheapReserveCount",
"tcmalloc_pageheap_total_reserve_bytes": "TCMallocPageheapTotalReserveBytes",
}
var DefaultStorageStats = map[string]string{
"storage_freelist_search_bucket_exhausted": "StorageFreelistSearchBucketExhausted",
"storage_freelist_search_requests": "StorageFreelistSearchRequests",
"storage_freelist_search_scanned": "StorageFreelistSearchScanned",
}
var DbDataStats = map[string]string{
"collections": "Collections",
"objects": "Objects",
"avg_obj_size": "AvgObjSize",
"data_size": "DataSize",
"storage_size": "StorageSize",
"num_extents": "NumExtents",
"indexes": "Indexes",
"index_size": "IndexSize",
"ok": "Ok",
}
var ColDataStats = map[string]string{
"count": "Count",
"size": "Size",
"avg_obj_size": "AvgObjSize",
"storage_size": "StorageSize",
"total_index_size": "TotalIndexSize",
"ok": "Ok",
}
func (d *MongodbData) AddDbStats() {
for _, dbstat := range d.StatLine.DbStatsLines {
dbStatLine := reflect.ValueOf(&dbstat).Elem()
newDbData := &DbData{
Name: dbstat.Name,
Fields: make(map[string]interface{}),
}
newDbData.Fields["type"] = "db_stat"
for key, value := range DbDataStats {
val := dbStatLine.FieldByName(value).Interface()
newDbData.Fields[key] = val
}
d.DbData = append(d.DbData, *newDbData)
}
}
func (d *MongodbData) AddColStats() {
for _, colstat := range d.StatLine.ColStatsLines {
colStatLine := reflect.ValueOf(&colstat).Elem()
newColData := &ColData{
Name: colstat.Name,
DbName: colstat.DbName,
Fields: make(map[string]interface{}),
}
newColData.Fields["type"] = "col_stat"
for key, value := range ColDataStats {
val := colStatLine.FieldByName(value).Interface()
newColData.Fields[key] = val
}
d.ColData = append(d.ColData, *newColData)
}
}
func (d *MongodbData) AddShardHostStats() {
for host, hostStat := range d.StatLine.ShardHostStatsLines {
hostStatLine := reflect.ValueOf(&hostStat).Elem()
newDbData := &DbData{
Name: host,
Fields: make(map[string]interface{}),
}
newDbData.Fields["type"] = "shard_host_stat"
for k, v := range ShardHostStats {
val := hostStatLine.FieldByName(v).Interface()
newDbData.Fields[k] = val
}
d.ShardHostData = append(d.ShardHostData, *newDbData)
}
}
func (d *MongodbData) AddDefaultStats() {
statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(statLine, DefaultStats)
if d.StatLine.NodeType != "" {
d.addStat(statLine, DefaultReplStats)
d.Tags["node_type"] = d.StatLine.NodeType
}
if d.StatLine.ReadLatency > 0 {
d.addStat(statLine, DefaultLatencyStats)
}
if d.StatLine.ReplSetName != "" {
d.Tags["rs_name"] = d.StatLine.ReplSetName
}
if d.StatLine.OplogStats != nil {
d.add("repl_oplog_window_sec", d.StatLine.OplogStats.TimeDiff)
}
if d.StatLine.Version != "" {
d.add("version", d.StatLine.Version)
}
d.addStat(statLine, DefaultAssertsStats)
d.addStat(statLine, DefaultClusterStats)
d.addStat(statLine, DefaultCommandsStats)
d.addStat(statLine, DefaultShardStats)
d.addStat(statLine, DefaultStorageStats)
d.addStat(statLine, DefaultTCMallocStats)
if d.StatLine.StorageEngine == "mmapv1" || d.StatLine.StorageEngine == "rocksdb" {
d.addStat(statLine, MmapStats)
} else if d.StatLine.StorageEngine == "wiredTiger" {
for key, value := range WiredTigerStats {
val := statLine.FieldByName(value).Interface()
percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
floatVal, _ := strconv.ParseFloat(percentVal, 64)
d.add(key, floatVal)
}
d.addStat(statLine, WiredTigerExtStats)
d.add("page_faults", d.StatLine.FaultsCnt)
}
}
func (d *MongodbData) addStat(statLine reflect.Value, stats map[string]string) {
for key, value := range stats {
val := statLine.FieldByName(value).Interface()
d.add(key, val)
}
}
func (d *MongodbData) add(key string, val interface{}) {
d.Fields[key] = val
}
func (d *MongodbData) flush(acc telegraf.Accumulator) {
acc.AddFields(
"mongodb",
d.Fields,
d.Tags,
d.StatLine.Time,
)
d.Fields = make(map[string]interface{})
for _, db := range d.DbData {
d.Tags["db_name"] = db.Name
acc.AddFields(
"mongodb_db_stats",
db.Fields,
d.Tags,
d.StatLine.Time,
)
db.Fields = make(map[string]interface{})
}
for _, col := range d.ColData {
d.Tags["collection"] = col.Name
d.Tags["db_name"] = col.DbName
acc.AddFields(
"mongodb_col_stats",
col.Fields,
d.Tags,
d.StatLine.Time,
)
col.Fields = make(map[string]interface{})
}
for _, host := range d.ShardHostData {
d.Tags["hostname"] = host.Name
acc.AddFields(
"mongodb_shard_stats",
host.Fields,
d.Tags,
d.StatLine.Time,
)
host.Fields = make(map[string]interface{})
}
}
package mongodb
import (
"sort"
"testing"
"time"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
var tags = make(map[string]string)
func TestAddNonReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
UptimeNanos: 0,
Insert: 0,
Query: 0,
Update: 0,
UpdateCnt: 0,
Delete: 0,
GetMore: 0,
Command: 0,
Flushes: 0,
FlushesCnt: 0,
Virtual: 0,
Resident: 0,
QueuedReaders: 0,
QueuedWriters: 0,
ActiveReaders: 0,
ActiveWriters: 0,
AvailableReaders: 0,
AvailableWriters: 0,
TotalTicketsReaders: 0,
TotalTicketsWriters: 0,
NetIn: 0,
NetOut: 0,
NumConnections: 0,
Passes: 0,
DeletedDocuments: 0,
TimedOutC: 0,
NoTimeoutC: 0,
PinnedC: 0,
TotalC: 0,
DeletedD: 0,
InsertedD: 0,
ReturnedD: 0,
UpdatedD: 0,
CurrentC: 0,
AvailableC: 0,
TotalCreatedC: 0,
ScanAndOrderOp: 0,
WriteConflictsOp: 0,
TotalKeysScanned: 0,
TotalObjectsScanned: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultStats {
assert.True(t, acc.HasFloatField("mongodb", key) || acc.HasInt64Field("mongodb", key), key)
}
}
func TestAddReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "mmapv1",
Mapped: 0,
NonMapped: 0,
Faults: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range MmapStats {
assert.True(t, acc.HasInt64Field("mongodb", key), key)
}
}
func TestAddWiredTigerStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "wiredTiger",
CacheDirtyPercent: 0,
CacheUsedPercent: 0,
TrackedDirtyBytes: 0,
CurrentCachedBytes: 0,
MaxBytesConfigured: 0,
AppThreadsPageReadCount: 0,
AppThreadsPageReadTime: 0,
AppThreadsPageWriteCount: 0,
BytesWrittenFrom: 0,
BytesReadInto: 0,
PagesEvictedByAppThread: 0,
PagesQueuedForEviction: 0,
PagesWrittenFromCache: 1247,
ServerEvictingPages: 0,
WorkerThreadEvictingPages: 0,
FaultsCnt: 204,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range WiredTigerStats {
assert.True(t, acc.HasFloatField("mongodb", key), key)
}
for key := range WiredTigerExtStats {
assert.True(t, acc.HasFloatField("mongodb", key) || acc.HasInt64Field("mongodb", key), key)
}
assert.True(t, acc.HasInt64Field("mongodb", "page_faults"))
}
func TestAddShardStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
TotalInUse: 0,
TotalAvailable: 0,
TotalCreated: 0,
TotalRefreshing: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultShardStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddLatencyStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
CommandOpsCnt: 73,
CommandLatency: 364,
ReadOpsCnt: 113,
ReadLatency: 201,
WriteOpsCnt: 7,
WriteLatency: 55,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultLatencyStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddAssertsStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
Regular: 3,
Warning: 9,
Msg: 2,
User: 34,
Rollovers: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultAssertsStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddCommandsStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
AggregateCommandTotal: 12,
AggregateCommandFailed: 2,
CountCommandTotal: 18,
CountCommandFailed: 5,
DeleteCommandTotal: 73,
DeleteCommandFailed: 364,
DistinctCommandTotal: 87,
DistinctCommandFailed: 19,
FindCommandTotal: 113,
FindCommandFailed: 201,
FindAndModifyCommandTotal: 7,
FindAndModifyCommandFailed: 55,
GetMoreCommandTotal: 4,
GetMoreCommandFailed: 55,
InsertCommandTotal: 34,
InsertCommandFailed: 65,
UpdateCommandTotal: 23,
UpdateCommandFailed: 6,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultCommandsStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddTCMallocStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
TCMallocCurrentAllocatedBytes: 5877253096,
TCMallocHeapSize: 8067108864,
TCMallocPageheapFreeBytes: 1054994432,
TCMallocPageheapUnmappedBytes: 677859328,
TCMallocMaxTotalThreadCacheBytes: 1073741824,
TCMallocCurrentTotalThreadCacheBytes: 80405312,
TCMallocTotalFreeBytes: 457002008,
TCMallocCentralCacheFreeBytes: 375131800,
TCMallocTransferCacheFreeBytes: 1464896,
TCMallocThreadCacheFreeBytes: 80405312,
TCMallocPageheapComittedBytes: 7389249536,
TCMallocPageheapScavengeCount: 396394,
TCMallocPageheapCommitCount: 641765,
TCMallocPageheapTotalCommitBytes: 102248751104,
TCMallocPageheapDecommitCount: 396394,
TCMallocPageheapTotalDecommitBytes: 94859501568,
TCMallocPageheapReserveCount: 6179,
TCMallocPageheapTotalReserveBytes: 8067108864,
TCMallocSpinLockTotalDelayNanos: 2344453860,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultTCMallocStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddStorageStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageFreelistSearchBucketExhausted: 0,
StorageFreelistSearchRequests: 0,
StorageFreelistSearchScanned: 0,
},
tags,
)
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
for key := range DefaultStorageStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
func TestAddShardHostStats(t *testing.T) {
expectedHosts := []string{"hostA", "hostB"}
hostStatLines := map[string]ShardHostStatLine{}
for _, host := range expectedHosts {
hostStatLines[host] = ShardHostStatLine{
InUse: 0,
Available: 0,
Created: 0,
Refreshing: 0,
}
}
d := NewMongodbData(
&StatLine{
ShardHostStatsLines: hostStatLines,
},
map[string]string{}, // Use empty tags, so we don't break existing tests
)
var acc testutil.Accumulator
d.AddShardHostStats()
d.flush(&acc)
var hostsFound []string
for host := range hostStatLines {
for key := range ShardHostStats {
assert.True(t, acc.HasInt64Field("mongodb_shard_stats", key))
}
assert.True(t, acc.HasTag("mongodb_shard_stats", "hostname"))
hostsFound = append(hostsFound, host)
}
sort.Strings(hostsFound)
sort.Strings(expectedHosts)
assert.Equal(t, hostsFound, expectedHosts)
}
func TestStateTag(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
Insert: 0,
Query: 0,
NodeType: "PRI",
NodeState: "PRIMARY",
ReplSetName: "rs1",
Version: "3.6.17",
},
tags,
)
stateTags := make(map[string]string)
stateTags["node_type"] = "PRI"
stateTags["rs_name"] = "rs1"
var acc testutil.Accumulator
d.AddDefaultStats()
d.flush(&acc)
fields := map[string]interface{}{
"active_reads": int64(0),
"active_writes": int64(0),
"aggregate_command_failed": int64(0),
"aggregate_command_total": int64(0),
"assert_msg": int64(0),
"assert_regular": int64(0),
"assert_rollovers": int64(0),
"assert_user": int64(0),
"assert_warning": int64(0),
"available_reads": int64(0),
"available_writes": int64(0),
"commands": int64(0),
"commands_per_sec": int64(0),
"connections_available": int64(0),
"connections_current": int64(0),
"connections_total_created": int64(0),
"count_command_failed": int64(0),
"count_command_total": int64(0),
"cursor_no_timeout": int64(0),
"cursor_no_timeout_count": int64(0),
"cursor_pinned": int64(0),
"cursor_pinned_count": int64(0),
"cursor_timed_out": int64(0),
"cursor_timed_out_count": int64(0),
"cursor_total": int64(0),
"cursor_total_count": int64(0),
"delete_command_failed": int64(0),
"delete_command_total": int64(0),
"deletes": int64(0),
"deletes_per_sec": int64(0),
"distinct_command_failed": int64(0),
"distinct_command_total": int64(0),
"document_deleted": int64(0),
"document_inserted": int64(0),
"document_returned": int64(0),
"document_updated": int64(0),
"find_and_modify_command_failed": int64(0),
"find_and_modify_command_total": int64(0),
"find_command_failed": int64(0),
"find_command_total": int64(0),
"flushes": int64(0),
"flushes_per_sec": int64(0),
"flushes_total_time_ns": int64(0),
"get_more_command_failed": int64(0),
"get_more_command_total": int64(0),
"getmores": int64(0),
"getmores_per_sec": int64(0),
"insert_command_failed": int64(0),
"insert_command_total": int64(0),
"inserts": int64(0),
"inserts_per_sec": int64(0),
"jumbo_chunks": int64(0),
"member_status": "PRI",
"net_in_bytes": int64(0),
"net_in_bytes_count": int64(0),
"net_out_bytes": int64(0),
"net_out_bytes_count": int64(0),
"open_connections": int64(0),
"operation_scan_and_order": int64(0),
"operation_write_conflicts": int64(0),
"queries": int64(0),
"queries_per_sec": int64(0),
"queued_reads": int64(0),
"queued_writes": int64(0),
"repl_apply_batches_num": int64(0),
"repl_apply_batches_total_millis": int64(0),
"repl_apply_ops": int64(0),
"repl_buffer_count": int64(0),
"repl_buffer_size_bytes": int64(0),
"repl_commands": int64(0),
"repl_commands_per_sec": int64(0),
"repl_deletes": int64(0),
"repl_deletes_per_sec": int64(0),
"repl_executor_pool_in_progress_count": int64(0),
"repl_executor_queues_network_in_progress": int64(0),
"repl_executor_queues_sleepers": int64(0),
"repl_executor_unsignaled_events": int64(0),
"repl_getmores": int64(0),
"repl_getmores_per_sec": int64(0),
"repl_inserts": int64(0),
"repl_inserts_per_sec": int64(0),
"repl_lag": int64(0),
"repl_network_bytes": int64(0),
"repl_network_getmores_num": int64(0),
"repl_network_getmores_total_millis": int64(0),
"repl_network_ops": int64(0),
"repl_queries": int64(0),
"repl_queries_per_sec": int64(0),
"repl_updates": int64(0),
"repl_updates_per_sec": int64(0),
"repl_state": int64(0),
"resident_megabytes": int64(0),
"state": "PRIMARY",
"storage_freelist_search_bucket_exhausted": int64(0),
"storage_freelist_search_requests": int64(0),
"storage_freelist_search_scanned": int64(0),
"tcmalloc_central_cache_free_bytes": int64(0),
"tcmalloc_current_allocated_bytes": int64(0),
"tcmalloc_current_total_thread_cache_bytes": int64(0),
"tcmalloc_heap_size": int64(0),
"tcmalloc_max_total_thread_cache_bytes": int64(0),
"tcmalloc_pageheap_commit_count": int64(0),
"tcmalloc_pageheap_committed_bytes": int64(0),
"tcmalloc_pageheap_decommit_count": int64(0),
"tcmalloc_pageheap_free_bytes": int64(0),
"tcmalloc_pageheap_reserve_count": int64(0),
"tcmalloc_pageheap_scavenge_count": int64(0),
"tcmalloc_pageheap_total_commit_bytes": int64(0),
"tcmalloc_pageheap_total_decommit_bytes": int64(0),
"tcmalloc_pageheap_total_reserve_bytes": int64(0),
"tcmalloc_pageheap_unmapped_bytes": int64(0),
"tcmalloc_spinlock_total_delay_ns": int64(0),
"tcmalloc_thread_cache_free_bytes": int64(0),
"tcmalloc_total_free_bytes": int64(0),
"tcmalloc_transfer_cache_free_bytes": int64(0),
"total_available": int64(0),
"total_created": int64(0),
"total_docs_scanned": int64(0),
"total_in_use": int64(0),
"total_keys_scanned": int64(0),
"total_refreshing": int64(0),
"total_tickets_reads": int64(0),
"total_tickets_writes": int64(0),
"ttl_deletes": int64(0),
"ttl_deletes_per_sec": int64(0),
"ttl_passes": int64(0),
"ttl_passes_per_sec": int64(0),
"update_command_failed": int64(0),
"update_command_total": int64(0),
"updates": int64(0),
"updates_per_sec": int64(0),
"uptime_ns": int64(0),
"version": "3.6.17",
"vsize_megabytes": int64(0),
}
acc.AssertContainsTaggedFields(t, "mongodb", fields, stateTags)
}
package mongodb
import (
"fmt"
"net/url"
"strings"
"time"
"github.com/influxdata/telegraf"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
type Server struct {
Url *url.URL
Session *mgo.Session
lastResult *MongoStatus
Log telegraf.Logger
}
func (s *Server) getDefaultTags() map[string]string {
tags := make(map[string]string)
tags["hostname"] = s.Url.Host
return tags
}
type oplogEntry struct {
Timestamp bson.MongoTimestamp `bson:"ts"`
}
func IsAuthorization(err error) bool {
return strings.Contains(err.Error(), "not authorized")
}
func (s *Server) authLog(err error) {
if IsAuthorization(err) {
s.Log.Debug(err.Error())
} else {
s.Log.Error(err.Error())
}
}
func (s *Server) gatherServerStatus() (*ServerStatus, error) {
serverStatus := &ServerStatus{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "serverStatus",
Value: 1,
},
{
Name: "recordStats",
Value: 0,
},
}, serverStatus)
if err != nil {
return nil, err
}
return serverStatus, nil
}
func (s *Server) gatherReplSetStatus() (*ReplSetStatus, error) {
replSetStatus := &ReplSetStatus{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "replSetGetStatus",
Value: 1,
},
}, replSetStatus)
if err != nil {
return nil, err
}
return replSetStatus, nil
}
func (s *Server) gatherClusterStatus() (*ClusterStatus, error) {
chunkCount, err := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count()
if err != nil {
return nil, err
}
return &ClusterStatus{
JumboChunksCount: int64(chunkCount),
}, nil
}
func (s *Server) gatherShardConnPoolStats() (*ShardStats, error) {
shardStats := &ShardStats{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "shardConnPoolStats",
Value: 1,
},
}, &shardStats)
if err != nil {
return nil, err
}
return shardStats, nil
}
func (s *Server) gatherDBStats(name string) (*Db, error) {
stats := &DbStatsData{}
err := s.Session.DB(name).Run(bson.D{
{
Name: "dbStats",
Value: 1,
},
}, stats)
if err != nil {
return nil, err
}
return &Db{
Name: name,
DbStatsData: stats,
}, nil
}
func (s *Server) getOplogReplLag(collection string) (*OplogStats, error) {
query := bson.M{"ts": bson.M{"$exists": true}}
var first oplogEntry
err := s.Session.DB("local").C(collection).Find(query).Sort("$natural").Limit(1).One(&first)
if err != nil {
return nil, err
}
var last oplogEntry
err = s.Session.DB("local").C(collection).Find(query).Sort("-$natural").Limit(1).One(&last)
if err != nil {
return nil, err
}
firstTime := time.Unix(int64(first.Timestamp>>32), 0)
lastTime := time.Unix(int64(last.Timestamp>>32), 0)
stats := &OplogStats{
TimeDiff: int64(lastTime.Sub(firstTime).Seconds()),
}
return stats, nil
}
// The "oplog.rs" collection is stored on all replica set members.
//
// The "oplog.$main" collection is created on the master node of a
// master-slave replicated deployment. As of MongoDB 3.2, master-slave
// replication has been deprecated.
func (s *Server) gatherOplogStats() (*OplogStats, error) {
stats, err := s.getOplogReplLag("oplog.rs")
if err == nil {
return stats, nil
}
return s.getOplogReplLag("oplog.$main")
}
func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error) {
names, err := s.Session.DatabaseNames()
if err != nil {
return nil, err
}
results := &ColStats{}
for _, dbName := range names {
if stringInSlice(dbName, colStatsDbs) || len(colStatsDbs) == 0 {
var colls []string
colls, err = s.Session.DB(dbName).CollectionNames()
if err != nil {
s.Log.Errorf("Error getting collection names: %s", err.Error())
continue
}
for _, colName := range colls {
colStatLine := &ColStatsData{}
err = s.Session.DB(dbName).Run(bson.D{
{
Name: "collStats",
Value: colName,
},
}, colStatLine)
if err != nil {
s.authLog(fmt.Errorf("error getting col stats from %q: %v", colName, err))
continue
}
collection := &Collection{
Name: colName,
DbName: dbName,
ColStatsData: colStatLine,
}
results.Collections = append(results.Collections, *collection)
}
}
}
return results, nil
}
func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus bool, gatherDbStats bool, gatherColStats bool, colStatsDbs []string) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
serverStatus, err := s.gatherServerStatus()
if err != nil {
return err
}
// Get replica set status, an error indicates that the server is not a
// member of a replica set.
replSetStatus, err := s.gatherReplSetStatus()
if err != nil {
s.Log.Debugf("Unable to gather replica set status: %s", err.Error())
}
// Gather the oplog if we are a member of a replica set. Non-replica set
// members do not have the oplog collections.
var oplogStats *OplogStats
if replSetStatus != nil {
oplogStats, err = s.gatherOplogStats()
if err != nil {
s.authLog(fmt.Errorf("Unable to get oplog stats: %v", err))
}
}
var clusterStatus *ClusterStatus
if gatherClusterStatus {
status, err := s.gatherClusterStatus()
if err != nil {
s.Log.Debugf("Unable to gather cluster status: %s", err.Error())
}
clusterStatus = status
}
shardStats, err := s.gatherShardConnPoolStats()
if err != nil {
s.authLog(fmt.Errorf("unable to gather shard connection pool stats: %s", err.Error()))
}
var collectionStats *ColStats
if gatherColStats {
stats, err := s.gatherCollectionStats(colStatsDbs)
if err != nil {
return err
}
collectionStats = stats
}
dbStats := &DbStats{}
if gatherDbStats {
names, err := s.Session.DatabaseNames()
if err != nil {
return err
}
for _, name := range names {
db, err := s.gatherDBStats(name)
if err != nil {
s.Log.Debugf("Error getting db stats from %q: %s", name, err.Error())
}
dbStats.Dbs = append(dbStats.Dbs, *db)
}
}
result := &MongoStatus{
ServerStatus: serverStatus,
ReplSetStatus: replSetStatus,
ClusterStatus: clusterStatus,
DbStats: dbStats,
ColStats: collectionStats,
ShardStats: shardStats,
OplogStats: oplogStats,
}
result.SampleTime = time.Now()
if s.lastResult != nil && result != nil {
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
durationInSeconds := int64(duration.Seconds())
if durationInSeconds == 0 {
durationInSeconds = 1
}
data := NewMongodbData(
NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
s.getDefaultTags(),
)
data.AddDefaultStats()
data.AddDbStats()
data.AddColStats()
data.AddShardHostStats()
data.flush(acc)
}
s.lastResult = result
return nil
}
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
// +build integration
package mongodb
import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetDefaultTags(t *testing.T) {
var tagTests = []struct {
in string
out string
}{
{"hostname", server.Url.Host},
}
defaultTags := server.getDefaultTags()
for _, tt := range tagTests {
if defaultTags[tt.in] != tt.out {
t.Errorf("expected %q, got %q", tt.out, defaultTags[tt.in])
}
}
}
func TestAddDefaultStats(t *testing.T) {
var acc testutil.Accumulator
err := server.gatherData(&acc, false)
require.NoError(t, err)
// need to call this twice so it can perform the diff
err = server.gatherData(&acc, false)
require.NoError(t, err)
for key := range DefaultStats {
assert.True(t, acc.HasInt64Field("mongodb", key))
}
}
// +build integration
package mongodb
import (
"log"
"math/rand"
"net/url"
"os"
"testing"
"time"
"gopkg.in/mgo.v2"
)
var connect_url string
var server *Server
func init() {
connect_url = os.Getenv("MONGODB_URL")
if connect_url == "" {
connect_url = "127.0.0.1:27017"
server = &Server{Url: &url.URL{Host: connect_url}}
} else {
full_url, err := url.Parse(connect_url)
if err != nil {
log.Fatalf("Unable to parse URL (%s), %s\n", full_url, err.Error())
}
server = &Server{Url: full_url}
}
}
func testSetup(m *testing.M) {
var err error
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
log.Fatalf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = 5 * time.Second
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
log.Fatalf("Unable to connect to MongoDB, %s\n", err.Error())
}
server.Session = sess
server.Session, _ = mgo.Dial(server.Url.Host)
if err != nil {
log.Fatalln(err.Error())
}
}
func testTeardown(m *testing.M) {
server.Session.Close()
}
func TestMain(m *testing.M) {
// seed randomness for use with tests
rand.Seed(time.Now().UTC().UnixNano())
testSetup(m)
res := m.Run()
testTeardown(m)
os.Exit(res)
}
此差异已折叠。
package mongodb
import (
"testing"
//"time"
//"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
func TestLatencyStats(t *testing.T) {
sl := NewStatLine(
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
},
},
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 0,
Latency: 0,
},
Writes: &LatencyStats{
Ops: 0,
Latency: 0,
},
Commands: &LatencyStats{
Ops: 0,
Latency: 0,
},
},
},
},
"foo",
true,
60,
)
assert.Equal(t, sl.CommandLatency, int64(0))
assert.Equal(t, sl.ReadLatency, int64(0))
assert.Equal(t, sl.WriteLatency, int64(0))
assert.Equal(t, sl.CommandOpsCnt, int64(0))
assert.Equal(t, sl.ReadOpsCnt, int64(0))
assert.Equal(t, sl.WriteOpsCnt, int64(0))
}
func TestLatencyStatsDiffZero(t *testing.T) {
sl := NewStatLine(
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 0,
Latency: 0,
},
Writes: &LatencyStats{
Ops: 0,
Latency: 0,
},
Commands: &LatencyStats{
Ops: 0,
Latency: 0,
},
},
},
},
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 0,
Latency: 0,
},
Writes: &LatencyStats{
Ops: 0,
Latency: 0,
},
Commands: &LatencyStats{
Ops: 0,
Latency: 0,
},
},
},
},
"foo",
true,
60,
)
assert.Equal(t, sl.CommandLatency, int64(0))
assert.Equal(t, sl.ReadLatency, int64(0))
assert.Equal(t, sl.WriteLatency, int64(0))
assert.Equal(t, sl.CommandOpsCnt, int64(0))
assert.Equal(t, sl.ReadOpsCnt, int64(0))
assert.Equal(t, sl.WriteOpsCnt, int64(0))
}
func TestLatencyStatsDiff(t *testing.T) {
sl := NewStatLine(
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 4189041956,
Latency: 2255922322753,
},
Writes: &LatencyStats{
Ops: 1691019457,
Latency: 494478256915,
},
Commands: &LatencyStats{
Ops: 1019150402,
Latency: 59177710371,
},
},
},
},
MongoStatus{
ServerStatus: &ServerStatus{
Connections: &ConnectionStats{},
Mem: &MemStats{
Bits: 0,
Resident: 0,
Virtual: 0,
Supported: false,
Mapped: 0,
MappedWithJournal: 0,
},
OpLatencies: &OpLatenciesStats{
Reads: &LatencyStats{
Ops: 4189049884,
Latency: 2255946760057,
},
Writes: &LatencyStats{
Ops: 1691021287,
Latency: 494479456987,
},
Commands: &LatencyStats{
Ops: 1019152861,
Latency: 59177981552,
},
},
},
},
"foo",
true,
60,
)
assert.Equal(t, sl.CommandLatency, int64(59177981552))
assert.Equal(t, sl.ReadLatency, int64(2255946760057))
assert.Equal(t, sl.WriteLatency, int64(494479456987))
assert.Equal(t, sl.CommandOpsCnt, int64(1019152861))
assert.Equal(t, sl.ReadOpsCnt, int64(4189049884))
assert.Equal(t, sl.WriteOpsCnt, int64(1691021287))
}
......@@ -4,12 +4,14 @@ import (
"fmt"
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/toolkits/i18n"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/mysql"
)
func init() {
collector.CollectorRegister(NewMysqlCollector()) // for monapi
i18n.DictRegister(langDict)
}
type MysqlCollector struct {
......@@ -24,6 +26,43 @@ func NewMysqlCollector() *MysqlCollector {
)}
}
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"Servers": "服务",
"Databases": "数据库",
"if the list is empty, then metrics are gathered from all database tables": "如果列表为空,则收集所有数据库表",
"Process List": "进程列表",
"gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST": "从 INFORMATION_SCHEMA.PROCESSLIST 收集线程状态信息",
"User Statistics": "User Statistics",
"gather user statistics from INFORMATION_SCHEMA.USER_STATISTICS": "从 INFORMATION_SCHEMA.USER_STATISTICS 收集用户状态信息",
"Auto Increment": "Auto Increment",
"gather auto_increment columns and max values from information schema": "采集 auto_increment 和 max values 信息",
"Innodb Metrics": "Innodb Metrics",
"gather metrics from INFORMATION_SCHEMA.INNODB_METRICS": "采集 INFORMATION_SCHEMA.INNODB_METRICS 信息",
"Slave Status": "Slave Status",
"gather metrics from SHOW SLAVE STATUS command output": "采集 SHOW SLAVE STATUS command output",
"Binary Logs": "Binary Logs",
"gather metrics from SHOW BINARY LOGS command output": "采集 SHOW BINARY LOGS command output",
"Table IO Waits": "Table IO Waits",
"gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMARY_BY_TABLE": "采集 PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMARY_BY_TABLE",
"Table Lock Waits": "Table Lock Waits",
"gather metrics from PERFORMANCE_SCHEMA.TABLE_LOCK_WAITS": "采集 PERFORMANCE_SCHEMA.TABLE_LOCK_WAITS",
"Index IO Waits": "Index IO Waits",
"gather metrics from PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMARY_BY_INDEX_USAGE": "采集 PERFORMANCE_SCHEMA.TABLE_IO_WAITS_SUMMARY_BY_INDEX_USAGE",
"Event Waits": "Event Waits",
"gather metrics from PERFORMANCE_SCHEMA.EVENT_WAITS": "采集 PERFORMANCE_SCHEMA.EVENT_WAITS",
"Tables": "Tables",
"gather metrics from INFORMATION_SCHEMA.TABLES for databases provided above list": "采集 INFORMATION_SCHEMA.TABLES for databases provided above list",
"File Events Stats": "File Events Stats",
"gather metrics from PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME": "采集 PERFORMANCE_SCHEMA.FILE_SUMMARY_BY_EVENT_NAME",
"Perf Events Statements": "Perf Events Statements",
"gather metrics from PERFORMANCE_SCHEMA.EVENTS_STATEMENTS_SUMMARY_BY_DIGEST": "采集 PERFORMANCE_SCHEMA.EVENTS_STATEMENTS_SUMMARY_BY_DIGEST",
"Interval Slow": "Interval Slow",
},
}
)
type MysqlRule struct {
Servers []string `label:"Servers" json:"servers,required" description:"specify servers via a url matching\n[username[:password]@][protocol[(address)]]/[?tls=[true|false|skip-verify|custom]]\nsee https://github.com/go-sql-driver/mysql#dsn-data-source-name" example:"servers = ['user:passwd@tcp(127.0.0.1:3306)/?tls=false']\nservers = ["user@tcp(127.0.0.1:3306)/?tls=false"]"`
PerfEventsStatementsDigestTextLimit int64 `label:"-" json:"-"`
......
......@@ -2,12 +2,14 @@ package redis
import (
"github.com/didi/nightingale/src/modules/monapi/collector"
"github.com/didi/nightingale/src/toolkits/i18n"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/redis"
)
func init() {
collector.CollectorRegister(NewRedisCollector()) // for monapi
i18n.DictRegister(langDict)
}
type RedisCollector struct {
......@@ -22,6 +24,20 @@ func NewRedisCollector() *RedisCollector {
)}
}
var (
langDict = map[string]map[string]string{
"zh": map[string]string{
"Field": "名称",
"Type": "类型",
"Servers": "服务",
"specify servers": "指定服务器地址",
"Optional. Specify redis commands to retrieve values": "设置服务器命令,采集数据名称",
"Password": "密码",
"specify server password": "服务密码",
},
}
)
type RedisCommand struct {
Command []string `label:"Command" json:"command,required" description:"" `
Field string `label:"Field" json:"field,required" description:"metric name"`
......@@ -29,7 +45,7 @@ type RedisCommand struct {
}
type RedisRule struct {
Servers []string `label:"Servers" json:"servers,required" description:"If no servers are specified, then localhost is used as the host." example:"tcp://localhost:6379"`
Servers []string `label:"Servers" json:"servers,required" description:"specify servers" example:"tcp://localhost:6379"`
Commands []*RedisCommand `label:"Commands" json:"commands" description:"Optional. Specify redis commands to retrieve values"`
Password string `label:"Password" json:"password" description:"specify server password"`
}
......
......@@ -301,11 +301,11 @@ func (p *Authenticator) cleanupSession() {
cf := cache.AuthConfig()
// idle session cleanup
if cf.MaxConnIdelTime > 0 {
expiresAt := now - cf.MaxConnIdelTime*60
if cf.MaxConnIdleTime > 0 {
expiresAt := now - cf.MaxConnIdleTime*60
sessions := []models.Session{}
if err := models.DB["rdb"].SQL("select * from session where updated_at < ? and username <> '' ", expiresAt).Find(&sessions); err != nil {
logger.Errorf("token idel time cleanup err %s", err)
logger.Errorf("token idle time cleanup err %s", err)
}
logger.Debugf("find %d idle sessions that should be clean up", len(sessions))
......@@ -327,7 +327,7 @@ func (p *Authenticator) cleanupSession() {
cnt := 0
if err := models.DB["sso"].SQL("select * from token order by user_name, id desc").Find(&tokens); err != nil {
logger.Errorf("token idel time cleanup err %s", err)
logger.Errorf("token idle time cleanup err %s", err)
}
for _, token := range tokens {
......
......@@ -11,6 +11,16 @@ import (
func newDbStorage(cf *config.SessionSection, opts *options) (storage, error) {
st := &dbStorage{config: cf}
lifeTime := config.Config.HTTP.Session.CookieLifetime
if lifeTime == 0 {
if config.Config.Auth.ExtraMode.Enable {
// cleanup by idle time worker
lifeTime = 86400 * 10
} else {
lifeTime = 86400
}
}
go func() {
t := time.NewTicker(time.Second * time.Duration(cf.GcInterval))
defer t.Stop()
......@@ -19,11 +29,7 @@ func newDbStorage(cf *config.SessionSection, opts *options) (storage, error) {
case <-opts.ctx.Done():
return
case <-t.C:
ct := config.Config.HTTP.Session.CookieLifetime
if ct == 0 {
ct = 86400
}
err := models.SessionCleanupByCreatedAt(time.Now().Unix() - ct)
err := models.SessionCleanupByCreatedAt(time.Now().Unix() - lifeTime)
if err != nil {
logger.Errorf("session gc err %s", err)
}
......
......@@ -85,7 +85,12 @@ func (p *mStorage) gc() {
p.Lock()
defer p.Unlock()
expiresAt := time.Now().Unix() - cache.AuthConfig().MaxConnIdelTime*60
idleTime := cache.AuthConfig().MaxConnIdleTime * 60
if idleTime == 0 {
return
}
expiresAt := time.Now().Unix() - idleTime
keys := []string{}
for k, v := range p.data {
if v.UpdatedAt < expiresAt {
......
......@@ -4,6 +4,7 @@ import (
"encoding/json"
"io"
"log"
"strings"
"golang.org/x/text/language"
"golang.org/x/text/message"
......@@ -11,6 +12,8 @@ import (
"github.com/toolkits/pkg/file"
)
var p *message.Printer
type I18nSection struct {
DictPath string `yaml:"dictPath"`
Lang string `yaml:"lang"`
......@@ -26,99 +29,49 @@ func Init(config ...I18nSection) {
fpath = config[0].DictPath
}
lang := language.Chinese
switch l {
case "en":
lang = language.English
case "zh":
lang = language.Chinese
}
tag, _, _ := supported.Match(lang)
switch tag {
case language.AmericanEnglish, language.English:
initEnUS(lang)
case language.SimplifiedChinese, language.Chinese:
initZhCN(lang, fpath)
default:
initZhCN(lang, fpath)
}
p = message.NewPrinter(lang)
DictFileRegister(fpath)
p = message.NewPrinter(langTag(l))
}
func initEnUS(tag language.Tag) {
}
func initZhCN(tag language.Tag, fpath string) {
content, err := file.ToTrimString(fpath)
if err != nil {
log.Printf("read configuration file %s fail %s", fpath, err.Error())
return
}
m := make(map[string]map[string]string)
err = json.Unmarshal([]byte(content), &m)
if err != nil {
log.Println("parse config file:", fpath, "fail:", err)
return
}
if dict, exists := m["zh"]; exists {
for k, v := range dict {
_ = message.SetString(tag, k, v)
func DictFileRegister(files ...string) {
for _, filePath := range files {
content, err := file.ToTrimString(filePath)
if err != nil {
log.Printf("read configuration file %s fail %s", filePath, err)
continue
}
}
}
var p *message.Printer
func newMatcher(t []language.Tag) *matcher {
tags := &matcher{make(map[language.Tag]int)}
for i, tag := range t {
ct, err := language.All.Canonicalize(tag)
m := make(map[string]map[string]string)
err = json.Unmarshal([]byte(content), &m)
if err != nil {
ct = tag
log.Println("parse config file:", filePath, "fail:", err)
continue
}
tags.index[ct] = i
}
return tags
}
type matcher struct {
index map[language.Tag]int
DictRegister(m)
}
}
func (m matcher) Match(want ...language.Tag) (language.Tag, int, language.Confidence) {
for _, t := range want {
ct, err := language.All.Canonicalize(t)
if err != nil {
ct = t
func DictRegister(m map[string]map[string]string) {
for lang, dict := range m {
tag := langTag(lang)
if tag == language.English {
continue
}
conf := language.Exact
for {
if index, ok := m.index[ct]; ok {
return ct, index, conf
}
if ct == language.Und {
break
}
ct = ct.Parent()
conf = language.High
for k, v := range dict {
message.SetString(tag, k, v)
}
}
return language.Und, 0, language.No
}
var supported = newMatcher([]language.Tag{
language.AmericanEnglish,
language.English,
language.SimplifiedChinese,
language.Chinese,
})
func langTag(l string) language.Tag {
switch strings.ToLower(l) {
case "zh", "cn":
return language.Chinese
default:
return language.English
}
}
// Fprintf is like fmt.Fprintf, but using language-specific formatting.
func Fprintf(w io.Writer, key message.Reference, a ...interface{}) (n int, err error) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册