From 5c374def560e258f9a0284f765d242219cf2f924 Mon Sep 17 00:00:00 2001 From: chris-sun-star Date: Wed, 15 Dec 2021 21:26:34 +0800 Subject: [PATCH] support ob log alarm collect and send to alertmanager --- config/config_module_test.go | 6 +- etc/config_properties/monagent_pipeline.yaml | 9 + etc/module_config/monitor_ob_log.yaml | 63 ++++ go.mod | 7 +- go.sum | 20 +- obagent.spec | 2 +- plugins/inputs/all.go | 4 + .../inputs/oceanbase/log/error_log_input.go | 322 ++++++++++++++++++ plugins/inputs/oceanbase/log/file_info.go | 33 ++ .../inputs/oceanbase/log/file_info_darwin.go | 20 ++ .../inputs/oceanbase/log/file_info_linux.go | 20 ++ plugins/inputs/oceanbase/log/log_utils.go | 179 ++++++++++ plugins/outputs/all.go | 8 + plugins/outputs/prometheus/alertmanager.go | 163 +++++++++ 14 files changed, 839 insertions(+), 17 deletions(-) create mode 100644 etc/module_config/monitor_ob_log.yaml create mode 100644 plugins/inputs/oceanbase/log/error_log_input.go create mode 100644 plugins/inputs/oceanbase/log/file_info.go create mode 100644 plugins/inputs/oceanbase/log/file_info_darwin.go create mode 100644 plugins/inputs/oceanbase/log/file_info_linux.go create mode 100644 plugins/inputs/oceanbase/log/log_utils.go create mode 100644 plugins/outputs/prometheus/alertmanager.go diff --git a/config/config_module_test.go b/config/config_module_test.go index ee981a5..fbe8961 100644 --- a/config/config_module_test.go +++ b/config/config_module_test.go @@ -68,7 +68,7 @@ configs: - key: foo.bar.bar value: 3306` -fooModuleYaml = `modules: + fooModuleYaml = `modules: - module: foo moduleType: fooType @@ -128,8 +128,8 @@ func _init(t *testing.T) string { moduleConfigDir := filepath.Join(tempDir, "module_config") err := os.MkdirAll(moduleConfigDir, 0755) assert.Nil(t, err) - err = ioutil.WriteFile(filepath.Join(moduleConfigDir, "foo.yaml"), []byte(fooModuleYaml), 0755) - assert.Nil(t, err) + err = ioutil.WriteFile(filepath.Join(moduleConfigDir, "foo.yaml"), []byte(fooModuleYaml), 0755) + assert.Nil(t, err) configPropertiesDir := filepath.Join(tempDir, "config_properties") err = os.MkdirAll(configPropertiesDir, 0755) diff --git a/etc/config_properties/monagent_pipeline.yaml b/etc/config_properties/monagent_pipeline.yaml index a0409ca..c323410 100644 --- a/etc/config_properties/monagent_pipeline.yaml +++ b/etc/config_properties/monagent_pipeline.yaml @@ -13,6 +13,9 @@ configs: - key: monagent.ob.rpc.port value: {rpc_port} valueType: int64 + - key: monagent.ob.install.path + value: {ob_install_path} + valueType: string - key: monagent.host.ip value: {host_ip} valueType: string @@ -28,6 +31,12 @@ configs: - key: monagent.pipeline.ob.status value: {ob_monitor_status} valueType: string + - key: monagent.pipeline.ob.log.status + value: {ob_log_monitor_status} + valueType: string - key: monagent.pipeline.node.status value: {host_monitor_status} valueType: string + - key: monagent.alertmanager.address + value: {alertmanager_address} + valueType: string diff --git a/etc/module_config/monitor_ob_log.yaml b/etc/module_config/monitor_ob_log.yaml new file mode 100644 index 0000000..06aeafc --- /dev/null +++ b/etc/module_config/monitor_ob_log.yaml @@ -0,0 +1,63 @@ +errorLogInput: &errorLogInput + plugin: errorLogInput + config: + timeout: 10s + pluginConfig: + expireTime: 300s + collectDelay: 1s + logServiceConfig: + rootservice: + logConfig: + logDir: ${monagent.ob.install.path}/log + logFileName: rootservice.log.wf + election: + logConfig: + logDir: ${monagent.ob.install.path}/log + logFileName: election.log.wf + observer: + logConfig: + logDir: ${monagent.ob.install.path}/log + logFileName: observer.log.wf + + +retagProcessor: &retagProcessor + plugin: retagProcessor + config: + timout: 10s + pluginConfig: + newTags: + app: OB + svr_ip: ${monagent.host.ip} + obzone: ${monagent.ob.zone.name} + ob_cluster_id: ${monagent.ob.cluster.id} + ob_cluster_name: ${monagent.ob.cluster.name} + +alertmanagerOutput: &alertmanagerOutput + plugin: alertmanagerOutput + config: + timout: 10s + pluginConfig: + address: ${monagent.alertmanager.address} + batchCount: 100 + httpTimeout: 10s + retryTimes: 3 + +modules: + - module: monitor.ob.log + moduleType: monagent.pipeline + process: monagent + config: + name: monitor.ob.log + status: ${monagent.pipeline.ob.log.status} + pipelines: + - name: node_info + config: + scheduleStrategy: periodic + period: 5s + structure: + inputs: + - <<: *errorLogInput + processors: + - <<: *retagProcessor + output: + <<: *alertmanagerOutput diff --git a/go.mod b/go.mod index c0af508..0600242 100644 --- a/go.mod +++ b/go.mod @@ -14,17 +14,18 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/gwatts/gin-adapter v0.0.0-20170508204228-c44433c485ad github.com/huandu/go-clone v1.3.0 - github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-isatty v0.0.12 + github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/nicksnyder/go-i18n/v2 v2.1.2 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml v1.9.3 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.30.0 - github.com/prometheus/mysqld_exporter v0.13.0 // indirect + github.com/prometheus/mysqld_exporter v0.13.0 github.com/prometheus/node_exporter v1.2.2 github.com/sirupsen/logrus v1.8.1 github.com/smartystreets/goconvey v1.6.4 @@ -33,6 +34,8 @@ require ( github.com/spf13/cobra v0.0.3 github.com/spf13/viper v1.3.2 github.com/stretchr/testify v1.7.0 + go.uber.org/atomic v1.9.0 // indirect + golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c golang.org/x/text v0.3.6 diff --git a/go.sum b/go.sum index 41ab22a..1f58cba 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,7 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= @@ -140,7 +141,6 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= -github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.10.0 h1:dXFJfIHVvUcpSgDOV+Ne6t7jXri8Tfv2uOLHUZ2XNuo= github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o= @@ -287,7 +287,6 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 h1:uhL5Gw7BINiiPAo24A2sxkcDI0Jt/sqp1v5xQCniEFA= github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= -github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= github.com/jsimonetti/rtnetlink v0.0.0-20190830100107-3784a6c7c552/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= @@ -379,8 +378,9 @@ github.com/mitchellh/hashstructure v1.0.0 h1:ZkRJX1CyOoTkar7p/mLS5TZU4nJ1Rn/F8u9 github.com/mitchellh/hashstructure v1.0.0/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= -github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= +github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -388,7 +388,6 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= @@ -413,8 +412,9 @@ github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWEr github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxSfWAKL3wpBW7V8scJMt8N8gnaMCS9E/cA= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= @@ -465,7 +465,6 @@ github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+ github.com/prometheus/common v0.30.0 h1:JEkYlQnpzrzQFxi6gnukFPdQ+ac82oRhzMcIduJu/Ug= github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/exporter-toolkit v0.5.1/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg= -github.com/prometheus/exporter-toolkit v0.6.0 h1:rGoS9gIqj3sXaw+frvo0ozCs1CxBRqpOCGsbixC52UI= github.com/prometheus/exporter-toolkit v0.6.0/go.mod h1:ZUBIj498ePooX9t/2xtDjeQYwvRpiPP2lh5u4iblj2g= github.com/prometheus/mysqld_exporter v0.13.0 h1:eU2cGRb0eAqAiOZaqWcAoIfIizVt6MZGIYUmKmQqQZ0= github.com/prometheus/mysqld_exporter v0.13.0/go.mod h1:ZsOL7ddSifE8uyWVmJ1Yq7oX8uZFrCUASWHVD/ybY+A= @@ -565,8 +564,9 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= @@ -661,14 +661,14 @@ golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 h1:4CSI6oo7cOjJKajidEljs9h+uP0rRZBPPPhcCbj5mw8= +golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c h1:pkQiBZBvdos9qq4wBAHqlzuZHEXo07pqV06ef90u1WI= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -835,7 +835,6 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -912,7 +911,6 @@ gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qS gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= -gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= diff --git a/obagent.spec b/obagent.spec index 186f40d..a185c57 100644 --- a/obagent.spec +++ b/obagent.spec @@ -1,7 +1,7 @@ Name: obagent Summary: ob agent program Group: alipay/oceanbase -Version: 1.0.0 +Version: 1.1.0 Release: %(echo $RELEASE)%{?dist} URL: http://rpm.corp.taobao.com/find.php?q=obagent License: Commercial diff --git a/plugins/inputs/all.go b/plugins/inputs/all.go index b3f092f..92d909f 100644 --- a/plugins/inputs/all.go +++ b/plugins/inputs/all.go @@ -16,6 +16,7 @@ import ( "github.com/oceanbase/obagent/plugins" "github.com/oceanbase/obagent/plugins/inputs/mysql" "github.com/oceanbase/obagent/plugins/inputs/nodeexporter" + "github.com/oceanbase/obagent/plugins/inputs/oceanbase/log" "github.com/oceanbase/obagent/plugins/inputs/prometheus" ) @@ -32,4 +33,7 @@ func init() { plugins.GetInputManager().Register("prometheusInput", func() plugins.Input { return &prometheus.Prometheus{} }) + plugins.GetInputManager().Register("errorLogInput", func() plugins.Input { + return &log.ErrorLogInput{} + }) } diff --git a/plugins/inputs/oceanbase/log/error_log_input.go b/plugins/inputs/oceanbase/log/error_log_input.go new file mode 100644 index 0000000..70003bd --- /dev/null +++ b/plugins/inputs/oceanbase/log/error_log_input.go @@ -0,0 +1,322 @@ +package log + +import ( + "bufio" + "context" + "fmt" + "gopkg.in/yaml.v3" + "os" + "regexp" + "sync" + "time" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/oceanbase/obagent/metric" +) + +const sampleConfig = ` +expireTime: 300s +collectDelay: 1s +logServiceConfig: + rootservice: + excludeRegexes: + - hello + - world + logConfig: + logDir: /home/admin/oceanbase/log + logFileName: rootservice.log.wf + election: + excludeRegexes: + - hello + - world + logConfig: + logDir: /home/admin/oceanbase/log + logFileName: election.log.wf + observer: + excludeRegexes: + - hello + - world + logConfig: + logDir: /home/admin/oceanbase/log + logFileName: observer.log.wf +` + +const description = ` +collect ob error logs and filter by keywords +` + +type ServiceType string + +const ( + RootService ServiceType = "rootservice" + Observer ServiceType = "observer" + Election ServiceType = "election" +) + +type LogCollectConfig struct { + LogConfig *LogConfig `yaml:"logConfig"` + ExcludeRegexes []string `yaml:"excludeRegexes"` +} + +type Config struct { + LogServiceConfig map[ServiceType]*LogCollectConfig `yaml:"logServiceConfig"` + CollectDelay time.Duration `yaml:"collectDelay"` + ExpireTime time.Duration `yaml:"expireTime"` +} + +type ErrorLogInput struct { + config *Config + logAnalyzer ILogAnalyzer + logProcessQueue map[ServiceType]*processQueue + ctx context.Context + cancel context.CancelFunc + metricBufferChan chan []metric.Metric +} + +func (e *ErrorLogInput) SampleConfig() string { + return sampleConfig +} + +func (e *ErrorLogInput) Description() string { + return description +} + +func (e *ErrorLogInput) Init(config map[string]interface{}) error { + + var pluginConfig Config + configBytes, err := yaml.Marshal(config) + if err != nil { + return errors.Wrap(err, "error log input encode config") + } + err = yaml.Unmarshal(configBytes, &pluginConfig) + if err != nil { + return errors.Wrap(err, "error log input decode config") + } + e.config = &pluginConfig + + e.logAnalyzer = NewLogAnalyzer() + e.logProcessQueue = make(map[ServiceType]*processQueue) + e.metricBufferChan = make(chan []metric.Metric, 1000) + + e.ctx, e.cancel = context.WithCancel(context.Background()) + + for service := range e.config.LogServiceConfig { + q := &processQueue{ + queue: make([]*logFileInfo, 0, 8), + mutex: sync.Mutex{}, + } + e.logProcessQueue[service] = q + } + + for service := range e.config.LogServiceConfig { + go e.doCollect(service) + } + + // start go routine to add log file to logProcessQueue + go e.watchFile() + + log.Info("error log input init with config", e.config) + + return nil +} + +func (e *ErrorLogInput) doCollect(service ServiceType) { + for { + select { + case <-e.ctx.Done(): + log.Infof("received exit signal, stop collect routine of service %s", service) + q, found := e.logProcessQueue[service] + if found { + for q.getQueueLen() > 0 { + fd := q.getHead().fileDesc + err := fd.Close() + if err != nil { + log.Errorf("close log file of service %s %s failed %v", service, fd.Name(), err) + } + q.popHead() + } + } + default: + e.collectErrorLogs(service) + } + time.Sleep(e.config.CollectDelay) + } +} + +func (e *ErrorLogInput) collectErrorLogs(service ServiceType) { + q, found := e.logProcessQueue[service] + if !found { + log.Warnf("service %s has no process queue", service) + } else { + if q.getQueueLen() == 0 { + log.Warnf("service %s has no process queue", service) + } else { + // read head of queue + head := q.getHead() + fdScanner := bufio.NewScanner(head.fileDesc) + logMetrics := make([]metric.Metric, 0, 8) + for fdScanner.Scan() { + line := fdScanner.Text() + if line == "" || len(line) == 0 { + continue + } else { + logMetric := e.processLogLine(service, line) + if logMetric != nil { + logMetrics = append(logMetrics, logMetric) + } + } + } + + if len(logMetrics) > 0 { + e.metricBufferChan <- logMetrics + } + + if q.getHeadIsRenamed() { + head.fileDesc.Close() + q.popHead() + } + } + } +} + +func (e *ErrorLogInput) processLogLine(service ServiceType, line string) metric.Metric { + if !e.logAnalyzer.isErrLog(line) { + return nil + } + logAt, err := e.logAnalyzer.getLogAt(line) + if err != nil { + log.Warnf("parse log time failed %s ", line) + return nil + } + if logAt.Add(e.config.ExpireTime).Before(time.Now()) { + log.Warnf("log expired, just skip, %s", line) + return nil + } + errCode, _ := e.logAnalyzer.getErrCode(line) + + if e.isFiltered(service, line) { + log.Debugf("log is filtered, %s", line) + return nil + } + fields := make(map[string]interface{}) + tags := make(map[string]string) + fields["log_content"] = line + tags["error_code"] = fmt.Sprintf("%d", errCode) + logMetric := metric.NewMetric("oceanbase_log", fields, tags, logAt, metric.Untyped) + return logMetric +} + +func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool { + // TODO: compile first + c, found := e.config.LogServiceConfig[service] + if found { + if c.ExcludeRegexes == nil { + return false + } + for _, regex := range c.ExcludeRegexes { + match, _ := regexp.MatchString(regex, line) + if match { + return true + } + } + } + return false +} + +func (e *ErrorLogInput) watchFile() { + for { + select { + case <-e.ctx.Done(): + log.Info("received exit signal, stop watch file routine") + return + default: + // open file and set fd in file process queue + e.watchFileChanges() + } + time.Sleep(e.config.CollectDelay) + } +} + +func (e *ErrorLogInput) checkAndOpenFile(logFileRealPath string) (*os.File, error) { + var fileDesc *os.File + _, err := os.Stat(logFileRealPath) + if err == nil { + fileDesc, err = os.OpenFile(logFileRealPath, os.O_RDONLY, os.ModePerm) + } + return fileDesc, err +} + +func (e *ErrorLogInput) watchFileChanges() { + for service, logCollectConfig := range e.config.LogServiceConfig { + log.Infof("check log file of service: %s", service) + queue, exists := e.logProcessQueue[service] + logFileRealPath := fmt.Sprintf("%s/%s", logCollectConfig.LogConfig.LogDir, logCollectConfig.LogConfig.LogFileName) + log.Debugf("log file of service %s: %s", service, logFileRealPath) + newFileDesc, err := e.checkAndOpenFile(logFileRealPath) + if err != nil { + log.WithError(err).Errorf("open logfile of service %s %s failed, got error %v", service, logFileRealPath, err) + continue + } + newFileInfo, err := FileInfo(newFileDesc) + if err != nil { + log.WithError(err).Errorf("check logfile of service %s %s info failed, got error %v", service, logFileRealPath, err) + continue + } + + if exists && queue.getQueueLen() > 0 { + tail := queue.getTail() + if tail == nil { + log.Errorf("queue should not be empty") + continue + } + tailFileInfo, err := FileInfo(tail.fileDesc) + if err != nil { + log.WithError(err).Errorf("failed to get file info of service %s head", service) + continue + } + + if newFileInfo.DevId() == tailFileInfo.DevId() && newFileInfo.FileId() == tailFileInfo.FileId() { + log.Debugf("log file of service %s not change", service) + } else { + log.Infof("log file of service %s has changed", service) + queue.pushBack(&logFileInfo{ + fileDesc: newFileDesc, + isRenamed: false, + }) + // TODO: should set all node renamed except last one + queue.setRenameTrueExceptTail() + } + } else { + log.Warnf("process queue not exists or empty") + // first time, create queue, open last file + // initialize process queue + q := e.logProcessQueue[service] + q.pushBack(&logFileInfo{ + fileDesc: newFileDesc, + isRenamed: false, + }) + } + } +} + +func (e *ErrorLogInput) Close() error { + e.cancel() + return nil +} + +func (e *ErrorLogInput) Collect() ([]metric.Metric, error) { + moreMetrics := true + metrics := make([]metric.Metric, 0, 1024) + for moreMetrics { + select { + case metricsFromBuffer := <-e.metricBufferChan: + metrics = append(metrics, metricsFromBuffer...) + default: + log.Infof("no more metric from buffer") + moreMetrics = false + } + } + return metrics, nil +} diff --git a/plugins/inputs/oceanbase/log/file_info.go b/plugins/inputs/oceanbase/log/file_info.go new file mode 100644 index 0000000..913c438 --- /dev/null +++ b/plugins/inputs/oceanbase/log/file_info.go @@ -0,0 +1,33 @@ +package log + +import ( + "os" + "time" +) + +type FileInfoEx struct { + os.FileInfo + fileId uint64 + devId uint64 + createTime time.Time +} + +func (f *FileInfoEx) FileId() uint64 { + return f.fileId +} + +func (f *FileInfoEx) DevId() uint64 { + return f.devId +} + +func (f *FileInfoEx) CreateTime() time.Time { + return f.createTime +} + +func FileInfo(f *os.File) (*FileInfoEx, error) { + info, err := f.Stat() + if err != nil { + return nil, err + } + return toFileInfoEx(info), nil +} diff --git a/plugins/inputs/oceanbase/log/file_info_darwin.go b/plugins/inputs/oceanbase/log/file_info_darwin.go new file mode 100644 index 0000000..340dca4 --- /dev/null +++ b/plugins/inputs/oceanbase/log/file_info_darwin.go @@ -0,0 +1,20 @@ +//go:build darwin +// +build darwin + +package log + +import ( + "os" + "syscall" + "time" +) + +func toFileInfoEx(info os.FileInfo) *FileInfoEx { + sysInfo, _ := info.Sys().(*syscall.Stat_t) + return &FileInfoEx{ + FileInfo: info, + fileId: sysInfo.Ino, + devId: uint64(sysInfo.Dev), + createTime: time.Unix(sysInfo.Ctimespec.Unix()), + } +} diff --git a/plugins/inputs/oceanbase/log/file_info_linux.go b/plugins/inputs/oceanbase/log/file_info_linux.go new file mode 100644 index 0000000..bee62cb --- /dev/null +++ b/plugins/inputs/oceanbase/log/file_info_linux.go @@ -0,0 +1,20 @@ +//go:build linux +// +build linux + +package log + +import ( + "os" + "syscall" + "time" +) + +func toFileInfoEx(info os.FileInfo) *FileInfoEx { + sysInfo, _ := info.Sys().(*syscall.Stat_t) + return &FileInfoEx{ + FileInfo: info, + fileId: sysInfo.Ino, + devId: sysInfo.Dev, + createTime: time.Unix(sysInfo.Ctim.Unix()), + } +} diff --git a/plugins/inputs/oceanbase/log/log_utils.go b/plugins/inputs/oceanbase/log/log_utils.go new file mode 100644 index 0000000..1804a11 --- /dev/null +++ b/plugins/inputs/oceanbase/log/log_utils.go @@ -0,0 +1,179 @@ +package log + +import ( + "os" + "regexp" + "strconv" + "strings" + "sync" + "time" +) + +const logAtLayout = "2006-01-02 15:04:05.000000" +const logTimeInFileNameLayout = "20060102150405" + +type logFileInfo struct { + fileDesc *os.File + // 该文件已经写满了,并被重命名过 + isRenamed bool +} + +type LogConfig struct { + LogDir string `yaml:"logDir"` + LogFileName string `yaml:"logFileName"` +} + +type reportedError struct { + ErrorCode int + ReportedAt time.Time +} + +type ILogAnalyzer interface { + isErrLog(logLine string) bool + getErrCode(logLine string) (int, error) + getLogAt(logLine string) (time.Time, error) +} + +type logAnalyzer struct { + logAtRegexp *regexp.Regexp + errCodeRegexp *regexp.Regexp +} + +func NewLogAnalyzer() *logAnalyzer { + return &logAnalyzer{ + logAtRegexp: regexp.MustCompile(`^\[\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d\]`), + errCodeRegexp: regexp.MustCompile(`ret=-\d+`), + } +} + +// isErrLog 检查是否 error 类型的 log(后面看需求是否需要扩展为其他类型日志) +func (l *logAnalyzer) isErrLog(logLine string) bool { + // example: [2020-08-07 05:55:44.377075] ERROR [RS] ob_server_table_operator.cpp:376 [151575][4][Y0-0000000000000000] [lt = 5] [dc =0] svr_status(svr_status = "active", display_status =1) + if len(logLine) > 34 { + return "ERROR" == logLine[29:34] + } + + return false +} + +// getErrCode 获取日志中的错误码 +func (l *logAnalyzer) getErrCode(logLine string) (int, error) { + matchedErrCodes := l.errCodeRegexp.FindAllString(logLine, -1) + matchedErrCodesLen := len(matchedErrCodes) + if matchedErrCodesLen > 0 { + lastErrCodeStr := matchedErrCodes[matchedErrCodesLen-1] + // 匹配的格式为 ret=-\d+,数字从下标位置 5 开始 + if len(lastErrCodeStr) >= 5 { + errCode, err := strconv.Atoi(lastErrCodeStr[5:]) + if err != nil { + return -1, err + } + return errCode, nil + } + } else if strings.Contains(logLine, "clog disk is almost full") { + return 4264, nil + + } else if strings.Contains(logLine, "partition table update task cost too much time to execute") { + return 4015, nil + } + + return -1, nil +} + +func (l *logAnalyzer) getLogAt(logLine string) (time.Time, error) { + timeStr := strings.TrimRight(strings.TrimLeft(l.logAtRegexp.FindString(logLine), "["), "]") + logAt, err := time.ParseInLocation(logAtLayout, timeStr, time.Local) + if err != nil { + return time.Time{}, err + } + return logAt, nil +} + +func matchString(reg string, content string) (matched bool, err error) { + matched = strings.Contains(content, reg) + return +} + +type processQueue struct { + queue []*logFileInfo + mutex sync.Mutex +} + +func (p *processQueue) getQueueLen() int { + return len(p.queue) +} + +func (p *processQueue) getHead() *logFileInfo { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.getQueueLen() == 0 { + return nil + } + + return p.queue[0] +} + +func (p *processQueue) getTail() *logFileInfo { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.getQueueLen() == 0 { + return nil + } + + return p.queue[p.getQueueLen()-1] +} + +func (p *processQueue) popHead() { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.getQueueLen() == 0 { + return + } + + p.queue = p.queue[1:] +} + +func (p *processQueue) pushBack(info *logFileInfo) { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.queue = append(p.queue, info) +} + +func (p *processQueue) setRenameTrueExceptTail() { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.getQueueLen() == 0 { + return + } + + for i := 0; i < p.getQueueLen()-1; i++ { + p.queue[i].isRenamed = true + } +} + +func (p *processQueue) setHeadIsRenameTrue() { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.getQueueLen() == 0 { + return + } + + p.queue[0].isRenamed = true +} + +func (p *processQueue) getHeadIsRenamed() bool { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.getQueueLen() == 0 { + return false + } + + return p.queue[0].isRenamed +} diff --git a/plugins/outputs/all.go b/plugins/outputs/all.go index efe0236..f151b20 100644 --- a/plugins/outputs/all.go +++ b/plugins/outputs/all.go @@ -12,5 +12,13 @@ package outputs +import ( + "github.com/oceanbase/obagent/plugins" + "github.com/oceanbase/obagent/plugins/outputs/prometheus" +) + func init() { + plugins.GetOutputManager().Register("alertmanagerOutput", func() plugins.Output { + return &prometheus.AlertmanagerOutput{} + }) } diff --git a/plugins/outputs/prometheus/alertmanager.go b/plugins/outputs/prometheus/alertmanager.go new file mode 100644 index 0000000..7e66bb0 --- /dev/null +++ b/plugins/outputs/prometheus/alertmanager.go @@ -0,0 +1,163 @@ +package prometheus + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/pkg/errors" + + // "github.com/avast/retry-go/v3" + log "github.com/sirupsen/logrus" + "gopkg.in/yaml.v3" + + "github.com/oceanbase/obagent/metric" +) + +const alertmanagerOutputSampleConfig = ` +address: http://1.1.1.1:9093 +httpTimeout: 10s +batchCount: 100 +retryTimes: 3 +` + +const alertmanagerOutputDescription = ` +send metric data as alarm to alertmanager +` + +var defaultTimeout = 10 * time.Second + +type AlertmanagerOutputConfig struct { + Address string `yaml:"address"` + BatchCount int `yaml:"batchCount"` + HttpTimeout time.Duration `yaml:"httpTimeout"` + RetryTimes int `yaml:"retryTimes"` +} + +type AlertmanagerOutput struct { + config *AlertmanagerOutputConfig + httpClient *http.Client + taskChan chan []metric.Metric + ctx context.Context + cancelFunc context.CancelFunc +} + +func (a *AlertmanagerOutput) Init(config map[string]interface{}) error { + configData, err := yaml.Marshal(config) + if err != nil { + return errors.Wrap(err, "alertmanager output encode config") + } + a.config = &AlertmanagerOutputConfig{} + err = yaml.Unmarshal(configData, a.config) + if err != nil { + return errors.Wrap(err, "alertmanager output decode config") + } + + a.taskChan = make(chan []metric.Metric, 1024) + a.ctx, a.cancelFunc = context.WithCancel(context.Background()) + a.httpClient = &http.Client{} + if a.config.HttpTimeout == 0 { + a.httpClient.Timeout = defaultTimeout + } else { + a.httpClient.Timeout = a.config.HttpTimeout + } + + go a.schedule() + + log.Infof("alertmanager output inited with config : %v", a.config) + return nil +} + +func (a *AlertmanagerOutput) Close() error { + a.cancelFunc() + close(a.taskChan) + return nil +} + +func (a *AlertmanagerOutput) SampleConfig() string { + return alertmanagerOutputSampleConfig +} + +func (a *AlertmanagerOutput) Description() string { + return alertmanagerOutputDescription +} + +func (a *AlertmanagerOutput) Write(metrics []metric.Metric) error { + for len(metrics) > 0 { + count := a.config.BatchCount + if len(metrics) < count { + count = len(metrics) + } + a.taskChan <- metrics[0:count] + metrics = metrics[count:] + } + return nil +} + +func (a *AlertmanagerOutput) schedule() { + for { + select { + case <-a.ctx.Done(): + break + + case metrics := <-a.taskChan: + err := a.sendAlarm(metrics) + log.WithError(err).Errorf("send alarm got error: %v", err) + } + } +} + +func (a *AlertmanagerOutput) sendAlarm(metrics []metric.Metric) error { + alarmList := make([]map[string]interface{}, 0, a.config.BatchCount) + for _, metricEntry := range metrics { + alarmList = append(alarmList, a.convertMetricToAlarm(metricEntry)) + } + + jsonData, err := json.Marshal(alarmList) + + log.Debugf("send alarm metrics: %s", jsonData) + + body := bytes.NewBuffer(jsonData) + pushAlertsAddress := fmt.Sprintf("%s/%s", a.config.Address, "api/v2/alerts") + req, err := http.NewRequest(http.MethodPost, pushAlertsAddress, body) + + if err != nil { + return errors.Wrap(err, "generate http request") + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := a.httpClient.Do(req) + log.Infof("send alarm got response: %v", resp) + if err != nil { + return errors.Wrap(err, "do query") + } + + if err != nil { + return errors.Wrap(err, "read response") + } + + if resp.StatusCode != 200 { + return errors.New("send alarm got abnormal status code") + } + + return nil +} + +func (a *AlertmanagerOutput) convertMetricToAlarm(metric metric.Metric) map[string]interface{} { + alarmItem := make(map[string]interface{}) + + labels := metric.Tags() + labels["alertname"] = metric.GetName() + annotations := metric.Fields() + + alarmItem["labels"] = labels + alarmItem["annotations"] = annotations + alarmItem["startAt"] = metric.GetTime() + alarmItem["generatorURL"] = "" + + return alarmItem +} -- GitLab