提交 5c374def 编写于 作者: C chris-sun-star

support ob log alarm collect and send to alertmanager

上级 e2e327f6
......@@ -68,7 +68,7 @@ configs:
- key: foo.bar.bar
value: 3306`
fooModuleYaml = `modules:
fooModuleYaml = `modules:
-
module: foo
moduleType: fooType
......@@ -128,8 +128,8 @@ func _init(t *testing.T) string {
moduleConfigDir := filepath.Join(tempDir, "module_config")
err := os.MkdirAll(moduleConfigDir, 0755)
assert.Nil(t, err)
err = ioutil.WriteFile(filepath.Join(moduleConfigDir, "foo.yaml"), []byte(fooModuleYaml), 0755)
assert.Nil(t, err)
err = ioutil.WriteFile(filepath.Join(moduleConfigDir, "foo.yaml"), []byte(fooModuleYaml), 0755)
assert.Nil(t, err)
configPropertiesDir := filepath.Join(tempDir, "config_properties")
err = os.MkdirAll(configPropertiesDir, 0755)
......
......@@ -13,6 +13,9 @@ configs:
- key: monagent.ob.rpc.port
value: {rpc_port}
valueType: int64
- key: monagent.ob.install.path
value: {ob_install_path}
valueType: string
- key: monagent.host.ip
value: {host_ip}
valueType: string
......@@ -28,6 +31,12 @@ configs:
- key: monagent.pipeline.ob.status
value: {ob_monitor_status}
valueType: string
- key: monagent.pipeline.ob.log.status
value: {ob_log_monitor_status}
valueType: string
- key: monagent.pipeline.node.status
value: {host_monitor_status}
valueType: string
- key: monagent.alertmanager.address
value: {alertmanager_address}
valueType: string
# Input plugin anchor: follows the OceanBase *.log.wf files below and emits
# error log lines as metrics. The pluginConfig keys mirror the yaml tags of the
# plugin's Config struct (expireTime, collectDelay, logServiceConfig).
errorLogInput: &errorLogInput
plugin: errorLogInput
config:
timeout: 10s
pluginConfig:
# error lines older than expireTime are skipped
expireTime: 300s
# pause between two polling rounds
collectDelay: 1s
# one entry per log source, keyed by service name
logServiceConfig:
rootservice:
logConfig:
logDir: ${monagent.ob.install.path}/log
logFileName: rootservice.log.wf
election:
logConfig:
logDir: ${monagent.ob.install.path}/log
logFileName: election.log.wf
observer:
logConfig:
logDir: ${monagent.ob.install.path}/log
logFileName: observer.log.wf
# Processor anchor: adds static OceanBase identity tags to each metric.
# The timeout key was misspelled as "timout"; it now matches the spelling used
# by the errorLogInput plugin in this file.
retagProcessor: &retagProcessor
  plugin: retagProcessor
  config:
    timeout: 10s
    pluginConfig:
      newTags:
        app: OB
        svr_ip: ${monagent.host.ip}
        obzone: ${monagent.ob.zone.name}
        ob_cluster_id: ${monagent.ob.cluster.id}
        ob_cluster_name: ${monagent.ob.cluster.name}
# Output anchor: pushes metrics as alerts to the configured Alertmanager.
# The pluginConfig keys mirror the yaml tags of AlertmanagerOutputConfig.
# The timeout key was misspelled as "timout"; it now matches the spelling used
# by the errorLogInput plugin in this file.
alertmanagerOutput: &alertmanagerOutput
  plugin: alertmanagerOutput
  config:
    timeout: 10s
    pluginConfig:
      address: ${monagent.alertmanager.address}
      batchCount: 100
      httpTimeout: 10s
      retryTimes: 3
# Module wiring: one pipeline that feeds errorLogInput through retagProcessor
# into alertmanagerOutput. It is switched by monagent.pipeline.ob.log.status.
modules:
- module: monitor.ob.log
moduleType: monagent.pipeline
process: monagent
config:
name: monitor.ob.log
status: ${monagent.pipeline.ob.log.status}
pipelines:
# NOTE(review): "node_info" looks copied from the host pipeline — confirm the intended name.
- name: node_info
config:
scheduleStrategy: periodic
period: 5s
structure:
inputs:
- <<: *errorLogInput
processors:
- <<: *retagProcessor
output:
<<: *alertmanagerOutput
......@@ -14,17 +14,18 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/gwatts/gin-adapter v0.0.0-20170508204228-c44433c485ad
github.com/huandu/go-clone v1.3.0
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-isatty v0.0.12
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/nicksnyder/go-i18n/v2 v2.1.2
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.30.0
github.com/prometheus/mysqld_exporter v0.13.0 // indirect
github.com/prometheus/mysqld_exporter v0.13.0
github.com/prometheus/node_exporter v1.2.2
github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/goconvey v1.6.4
......@@ -33,6 +34,8 @@ require (
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.2
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/text v0.3.6
......
......@@ -34,6 +34,7 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
......@@ -140,7 +141,6 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.10.0 h1:dXFJfIHVvUcpSgDOV+Ne6t7jXri8Tfv2uOLHUZ2XNuo=
github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
......@@ -287,7 +287,6 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 h1:uhL5Gw7BINiiPAo24A2sxkcDI0Jt/sqp1v5xQCniEFA=
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw=
github.com/jsimonetti/rtnetlink v0.0.0-20190830100107-3784a6c7c552/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw=
......@@ -379,8 +378,9 @@ github.com/mitchellh/hashstructure v1.0.0 h1:ZkRJX1CyOoTkar7p/mLS5TZU4nJ1Rn/F8u9
github.com/mitchellh/hashstructure v1.0.0/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
......@@ -388,7 +388,6 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
......@@ -413,8 +412,9 @@ github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWEr
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxSfWAKL3wpBW7V8scJMt8N8gnaMCS9E/cA=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
......@@ -465,7 +465,6 @@ github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+
github.com/prometheus/common v0.30.0 h1:JEkYlQnpzrzQFxi6gnukFPdQ+ac82oRhzMcIduJu/Ug=
github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/exporter-toolkit v0.5.1/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg=
github.com/prometheus/exporter-toolkit v0.6.0 h1:rGoS9gIqj3sXaw+frvo0ozCs1CxBRqpOCGsbixC52UI=
github.com/prometheus/exporter-toolkit v0.6.0/go.mod h1:ZUBIj498ePooX9t/2xtDjeQYwvRpiPP2lh5u4iblj2g=
github.com/prometheus/mysqld_exporter v0.13.0 h1:eU2cGRb0eAqAiOZaqWcAoIfIizVt6MZGIYUmKmQqQZ0=
github.com/prometheus/mysqld_exporter v0.13.0/go.mod h1:ZsOL7ddSifE8uyWVmJ1Yq7oX8uZFrCUASWHVD/ybY+A=
......@@ -565,8 +564,9 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
......@@ -661,14 +661,14 @@ golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 h1:4CSI6oo7cOjJKajidEljs9h+uP0rRZBPPPhcCbj5mw8=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c h1:pkQiBZBvdos9qq4wBAHqlzuZHEXo07pqV06ef90u1WI=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
......@@ -835,7 +835,6 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
......@@ -912,7 +911,6 @@ gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qS
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
......
Name: obagent
Summary: ob agent program
Group: alipay/oceanbase
Version: 1.0.0
Version: 1.1.0
Release: %(echo $RELEASE)%{?dist}
URL: http://rpm.corp.taobao.com/find.php?q=obagent
License: Commercial
......
......@@ -16,6 +16,7 @@ import (
"github.com/oceanbase/obagent/plugins"
"github.com/oceanbase/obagent/plugins/inputs/mysql"
"github.com/oceanbase/obagent/plugins/inputs/nodeexporter"
"github.com/oceanbase/obagent/plugins/inputs/oceanbase/log"
"github.com/oceanbase/obagent/plugins/inputs/prometheus"
)
......@@ -32,4 +33,7 @@ func init() {
plugins.GetInputManager().Register("prometheusInput", func() plugins.Input {
return &prometheus.Prometheus{}
})
plugins.GetInputManager().Register("errorLogInput", func() plugins.Input {
return &log.ErrorLogInput{}
})
}
package log
import (
"bufio"
"context"
"fmt"
"gopkg.in/yaml.v3"
"os"
"regexp"
"sync"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/oceanbase/obagent/metric"
)
// sampleConfig is the example YAML configuration returned by SampleConfig.
// Its keys correspond to the yaml tags of Config and LogCollectConfig.
const sampleConfig = `
expireTime: 300s
collectDelay: 1s
logServiceConfig:
rootservice:
excludeRegexes:
- hello
- world
logConfig:
logDir: /home/admin/oceanbase/log
logFileName: rootservice.log.wf
election:
excludeRegexes:
- hello
- world
logConfig:
logDir: /home/admin/oceanbase/log
logFileName: election.log.wf
observer:
excludeRegexes:
- hello
- world
logConfig:
logDir: /home/admin/oceanbase/log
logFileName: observer.log.wf
`
// description is the one-line plugin summary returned by Description.
const description = `
collect ob error logs and filter by keywords
`
// ServiceType names a log source; it is the key type of the per-service
// configuration and process-queue maps.
type ServiceType string
// Service names, matching the keys used in sampleConfig.
const (
RootService ServiceType = "rootservice"
Observer ServiceType = "observer"
Election ServiceType = "election"
)
// LogCollectConfig is the per-service collection configuration: where the log
// file lives and which regular expressions exclude a line from being reported.
type LogCollectConfig struct {
LogConfig *LogConfig `yaml:"logConfig"`
ExcludeRegexes []string `yaml:"excludeRegexes"`
}
// Config is the plugin configuration decoded from YAML in Init.
type Config struct {
// LogServiceConfig holds one entry per followed service.
LogServiceConfig map[ServiceType]*LogCollectConfig `yaml:"logServiceConfig"`
// CollectDelay is the sleep between two polling rounds of the background loops.
CollectDelay time.Duration `yaml:"collectDelay"`
// ExpireTime is the maximum age of a log line; older lines are skipped.
ExpireTime time.Duration `yaml:"expireTime"`
}
// ErrorLogInput is an input plugin that follows log files, turns error lines
// into metrics and hands them out through Collect.
type ErrorLogInput struct {
// config is the decoded plugin configuration, set in Init.
config *Config
// logAnalyzer classifies log lines and extracts time and error code.
logAnalyzer ILogAnalyzer
// logProcessQueue holds, per service, the opened log files still to be read.
logProcessQueue map[ServiceType]*processQueue
// ctx and cancel stop the background goroutines; cancel is called by Close.
ctx context.Context
cancel context.CancelFunc
// metricBufferChan carries metric batches from the collect goroutines to Collect.
metricBufferChan chan []metric.Metric
}
// SampleConfig returns the example YAML configuration of the plugin.
func (e *ErrorLogInput) SampleConfig() string {
return sampleConfig
}
// Description returns the short description of the plugin.
func (e *ErrorLogInput) Description() string {
return description
}
// Init decodes and validates the plugin configuration, creates one process
// queue per configured service and starts the background goroutines: one
// collector per service plus a single file watcher.
//
// It returns an error when the configuration cannot be decoded or when a
// service entry has no logConfig, because the watcher dereferences that entry
// on every round.
func (e *ErrorLogInput) Init(config map[string]interface{}) error {
	var pluginConfig Config
	configBytes, err := yaml.Marshal(config)
	if err != nil {
		return errors.Wrap(err, "error log input encode config")
	}
	if err = yaml.Unmarshal(configBytes, &pluginConfig); err != nil {
		return errors.Wrap(err, "error log input decode config")
	}
	for service, collectConfig := range pluginConfig.LogServiceConfig {
		if collectConfig == nil || collectConfig.LogConfig == nil {
			return errors.Errorf("error log input service %s has no logConfig", service)
		}
	}
	// The background loops sleep for CollectDelay between rounds; a missing or
	// non-positive value would turn them into busy loops. Fall back to the
	// value used by the sample configuration.
	if pluginConfig.CollectDelay <= 0 {
		pluginConfig.CollectDelay = time.Second
	}

	e.config = &pluginConfig
	e.logAnalyzer = NewLogAnalyzer()
	e.logProcessQueue = make(map[ServiceType]*processQueue, len(pluginConfig.LogServiceConfig))
	e.metricBufferChan = make(chan []metric.Metric, 1000)
	e.ctx, e.cancel = context.WithCancel(context.Background())

	// Fill the queue map completely before any goroutine starts: the
	// goroutines read the map without locking.
	for service := range e.config.LogServiceConfig {
		e.logProcessQueue[service] = &processQueue{
			queue: make([]*logFileInfo, 0, 8),
		}
	}
	for service := range e.config.LogServiceConfig {
		go e.doCollect(service)
	}
	// start go routine to add log file to logProcessQueue
	go e.watchFile()

	log.Infof("error log input init with config %+v", e.config)
	return nil
}
// doCollect is the collector loop of one service. Each round it reads the new
// content of the service's head log file, then sleeps for CollectDelay.
//
// When the plugin context is cancelled it closes every file still queued for
// the service and returns. Previously the loop kept running after
// cancellation, so the goroutine was never released.
func (e *ErrorLogInput) doCollect(service ServiceType) {
	for {
		select {
		case <-e.ctx.Done():
			log.Infof("received exit signal, stop collect routine of service %s", service)
			if q, found := e.logProcessQueue[service]; found && q != nil {
				// getHead returns nil once the queue is drained.
				for head := q.getHead(); head != nil; head = q.getHead() {
					fd := head.fileDesc
					if err := fd.Close(); err != nil {
						log.Errorf("close log file of service %s %s failed %v", service, fd.Name(), err)
					}
					q.popHead()
				}
			}
			return
		default:
			e.collectErrorLogs(service)
		}
		time.Sleep(e.config.CollectDelay)
	}
}
// collectErrorLogs reads everything currently available in the head file of
// the service's process queue, converts the error lines into metrics and
// pushes them to metricBufferChan as one batch. When the head file was already
// marked as renamed before reading, it is closed and removed from the queue
// afterwards so the next round continues with the following file.
func (e *ErrorLogInput) collectErrorLogs(service ServiceType) {
	q, found := e.logProcessQueue[service]
	if !found || q == nil {
		log.Warnf("service %s has no process queue", service)
		return
	}
	head := q.getHead()
	if head == nil {
		log.Warnf("process queue of service %s is empty", service)
		return
	}

	// Sample the rotation flag BEFORE reading. If the flag only flips while we
	// are reading, the file stays queued for one more round, so lines appended
	// between our EOF and the flag change are not lost.
	renamed := q.getHeadIsRenamed()

	// NOTE(review): at EOF the scanner also returns a trailing line that has
	// no newline yet, so a line still being written may be reported truncated.
	fdScanner := bufio.NewScanner(head.fileDesc)
	logMetrics := make([]metric.Metric, 0, 8)
	for fdScanner.Scan() {
		line := fdScanner.Text()
		if line == "" {
			continue
		}
		if logMetric := e.processLogLine(service, line); logMetric != nil {
			logMetrics = append(logMetrics, logMetric)
		}
	}
	if err := fdScanner.Err(); err != nil {
		// e.g. a line longer than the scanner buffer; reading resumes from the
		// current file offset in the next round.
		log.WithError(err).Warnf("scan log file of service %s %s failed", service, head.fileDesc.Name())
	}

	if len(logMetrics) > 0 {
		select {
		case e.metricBufferChan <- logMetrics:
		case <-e.ctx.Done():
			// Shutting down and nobody drains the buffer: do not block forever.
			return
		}
	}

	if renamed {
		if err := head.fileDesc.Close(); err != nil {
			log.WithError(err).Warnf("close log file of service %s %s failed", service, head.fileDesc.Name())
		}
		q.popHead()
	}
}
// processLogLine converts one log line into an "oceanbase_log" metric.
// It returns nil when the line is not an error log, when its timestamp cannot
// be parsed, when it is older than the configured expire time, or when it
// matches one of the service's exclude regexes.
func (e *ErrorLogInput) processLogLine(service ServiceType, line string) metric.Metric {
	if isErr := e.logAnalyzer.isErrLog(line); !isErr {
		return nil
	}

	timestamp, parseErr := e.logAnalyzer.getLogAt(line)
	if parseErr != nil {
		log.Warnf("parse log time failed %s ", line)
		return nil
	}

	if deadline := timestamp.Add(e.config.ExpireTime); deadline.Before(time.Now()) {
		log.Warnf("log expired, just skip, %s", line)
		return nil
	}

	// The error returned by getErrCode is deliberately dropped; only the code
	// value it returns is used.
	code, _ := e.logAnalyzer.getErrCode(line)

	if e.isFiltered(service, line) {
		log.Debugf("log is filtered, %s", line)
		return nil
	}

	return metric.NewMetric(
		"oceanbase_log",
		map[string]interface{}{"log_content": line},
		map[string]string{"error_code": fmt.Sprintf("%d", code)},
		timestamp,
		metric.Untyped,
	)
}
// excludeRegexCache memoizes compiled exclude patterns, keyed by the pattern
// string. A nil *regexp.Regexp value marks a pattern that failed to compile,
// so the failure is reported only once.
var excludeRegexCache sync.Map

// compileExcludeRegex returns the compiled form of pattern, compiling it on
// first use. It returns nil for a pattern that is not a valid regular
// expression.
func compileExcludeRegex(pattern string) *regexp.Regexp {
	if cached, ok := excludeRegexCache.Load(pattern); ok {
		return cached.(*regexp.Regexp)
	}
	compiled, err := regexp.Compile(pattern)
	if err != nil {
		log.WithError(err).Warnf("invalid exclude regex %q, ignore it", pattern)
		compiled = nil
	}
	excludeRegexCache.Store(pattern, compiled)
	return compiled
}

// isFiltered reports whether line matches any exclude regex configured for
// service. Services without configuration, and invalid patterns, never filter
// a line.
func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool {
	c, found := e.config.LogServiceConfig[service]
	if !found || c == nil {
		return false
	}
	for _, pattern := range c.ExcludeRegexes {
		// Patterns are compiled once instead of once per line.
		if re := compileExcludeRegex(pattern); re != nil && re.MatchString(line) {
			return true
		}
	}
	return false
}
// watchFile is the file watcher loop. Until the plugin context is cancelled it
// checks every CollectDelay whether the log files changed and queues newly
// opened files.
func (e *ErrorLogInput) watchFile() {
	for {
		// A non-nil Err means the context's Done channel is closed.
		if e.ctx.Err() != nil {
			log.Info("received exit signal, stop watch file routine")
			return
		}
		// open file and set fd in file process queue
		e.watchFileChanges()
		time.Sleep(e.config.CollectDelay)
	}
}
// checkAndOpenFile stats logFileRealPath and, when that succeeds, opens the
// file read-only. It returns the stat or open error otherwise.
func (e *ErrorLogInput) checkAndOpenFile(logFileRealPath string) (*os.File, error) {
	if _, statErr := os.Stat(logFileRealPath); statErr != nil {
		return nil, statErr
	}
	return os.OpenFile(logFileRealPath, os.O_RDONLY, os.ModePerm)
}
// watchFileChanges performs one watcher round. For every configured service
// it opens the current log file and compares its device and file id with the
// file at the tail of the service's process queue. A file that is new (empty
// queue, or different ids) is appended to the queue and all earlier entries
// are marked as renamed. In every other case the freshly opened descriptor is
// closed again; it used to be leaked on every round.
func (e *ErrorLogInput) watchFileChanges() {
	for service, logCollectConfig := range e.config.LogServiceConfig {
		log.Infof("check log file of service: %s", service)
		if logCollectConfig == nil || logCollectConfig.LogConfig == nil {
			log.Errorf("service %s has no log config", service)
			continue
		}
		queue, exists := e.logProcessQueue[service]
		if !exists || queue == nil {
			log.Errorf("service %s has no process queue", service)
			continue
		}

		logFileRealPath := fmt.Sprintf("%s/%s", logCollectConfig.LogConfig.LogDir, logCollectConfig.LogConfig.LogFileName)
		log.Debugf("log file of service %s: %s", service, logFileRealPath)
		newFileDesc, err := e.checkAndOpenFile(logFileRealPath)
		if err != nil {
			log.WithError(err).Errorf("open logfile of service %s %s failed, got error %v", service, logFileRealPath, err)
			continue
		}

		if e.isNewLogFile(service, logFileRealPath, queue, newFileDesc) {
			// The queue now owns the descriptor; the collector closes it.
			queue.pushBack(&logFileInfo{
				fileDesc:  newFileDesc,
				isRenamed: false,
			})
			// Everything in front of the new tail has been rotated away.
			queue.setRenameTrueExceptTail()
			continue
		}

		if err := newFileDesc.Close(); err != nil {
			log.WithError(err).Warnf("close logfile of service %s %s failed", service, logFileRealPath)
		}
	}
}

// isNewLogFile reports whether newFileDesc refers to a file that is not yet at
// the tail of queue, i.e. whether it has to be queued for collection.
func (e *ErrorLogInput) isNewLogFile(service ServiceType, logFileRealPath string, queue *processQueue, newFileDesc *os.File) bool {
	newFileInfo, err := FileInfo(newFileDesc)
	if err != nil {
		log.WithError(err).Errorf("check logfile of service %s %s info failed, got error %v", service, logFileRealPath, err)
		return false
	}

	tail := queue.getTail()
	if tail == nil {
		// first time, nothing is followed yet
		log.Infof("process queue of service %s is empty, start to follow %s", service, logFileRealPath)
		return true
	}

	tailFileInfo, err := FileInfo(tail.fileDesc)
	if err != nil {
		log.WithError(err).Errorf("failed to get file info of service %s tail", service)
		return false
	}

	if newFileInfo.DevId() == tailFileInfo.DevId() && newFileInfo.FileId() == tailFileInfo.FileId() {
		log.Debugf("log file of service %s not change", service)
		return false
	}
	log.Infof("log file of service %s has changed", service)
	return true
}
// Close cancels the plugin context, which signals the background goroutines
// started in Init to stop. It does not wait for them and always returns nil.
func (e *ErrorLogInput) Close() error {
e.cancel()
return nil
}
// Collect drains every metric batch currently buffered in metricBufferChan
// without blocking and returns them as one slice. The returned error is
// always nil.
func (e *ErrorLogInput) Collect() ([]metric.Metric, error) {
	collected := make([]metric.Metric, 0, 1024)
	for {
		select {
		case batch := <-e.metricBufferChan:
			collected = append(collected, batch...)
		default:
			// Nothing buffered right now: hand back what we have.
			log.Infof("no more metric from buffer")
			return collected, nil
		}
	}
}
package log
import (
"os"
"time"
)
// FileInfoEx extends os.FileInfo with the identity of the underlying file:
// its file id, device id and a timestamp filled in by the platform-specific
// conversion code.
type FileInfoEx struct {
	os.FileInfo

	fileId, devId uint64
	createTime    time.Time
}

// FileId returns the stored file id.
func (fi *FileInfoEx) FileId() uint64 { return fi.fileId }

// DevId returns the stored device id.
func (fi *FileInfoEx) DevId() uint64 { return fi.devId }

// CreateTime returns the stored timestamp.
func (fi *FileInfoEx) CreateTime() time.Time { return fi.createTime }
// FileInfo stats the open file f and returns the result converted by
// toFileInfoEx. It returns the Stat error unchanged on failure.
func FileInfo(f *os.File) (*FileInfoEx, error) {
	stat, statErr := f.Stat()
	if statErr == nil {
		return toFileInfoEx(stat), nil
	}
	return nil, statErr
}
//go:build darwin
// +build darwin
package log
import (
"os"
"syscall"
"time"
)
// toFileInfoEx builds a FileInfoEx from info using the darwin syscall.Stat_t
// behind info.Sys(): inode number, device number and the Ctimespec timestamp.
// The type assertion result is not checked, so info must come from a real
// file stat.
func toFileInfoEx(info os.FileInfo) *FileInfoEx {
	stat, _ := info.Sys().(*syscall.Stat_t)
	inode := stat.Ino
	device := uint64(stat.Dev)
	sec, nsec := stat.Ctimespec.Unix()
	return &FileInfoEx{
		FileInfo:   info,
		fileId:     inode,
		devId:      device,
		createTime: time.Unix(sec, nsec),
	}
}
//go:build linux
// +build linux
package log
import (
"os"
"syscall"
"time"
)
// toFileInfoEx builds a FileInfoEx from info using the linux syscall.Stat_t
// behind info.Sys(): inode number, device number and the Ctim timestamp.
//
// Ino and Dev are converted explicitly because the field widths of Stat_t
// differ between linux architectures; this also keeps the function in line
// with its darwin counterpart. The type assertion result is not checked, so
// info must come from a real file stat.
func toFileInfoEx(info os.FileInfo) *FileInfoEx {
	sysInfo, _ := info.Sys().(*syscall.Stat_t)
	return &FileInfoEx{
		FileInfo:   info,
		fileId:     uint64(sysInfo.Ino),
		devId:      uint64(sysInfo.Dev),
		createTime: time.Unix(sysInfo.Ctim.Unix()),
	}
}
package log
import (
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
// logAtLayout is the time layout of the timestamp at the start of a log line.
const logAtLayout = "2006-01-02 15:04:05.000000"
// logTimeInFileNameLayout is a compact time layout.
// NOTE(review): presumably the suffix of rotated log file names — it is not used in the code shown here.
const logTimeInFileNameLayout = "20060102150405"
// logFileInfo is one entry of a processQueue: an opened log file plus its
// rotation state.
type logFileInfo struct {
fileDesc *os.File
// isRenamed is true once the file has been filled up and renamed (rotated).
isRenamed bool
}
// LogConfig locates the log file of one service.
type LogConfig struct {
LogDir string `yaml:"logDir"`
LogFileName string `yaml:"logFileName"`
}
// reportedError pairs an error code with the time it was reported.
type reportedError struct {
ErrorCode int
ReportedAt time.Time
}
// ILogAnalyzer classifies a log line and extracts its error code and timestamp.
type ILogAnalyzer interface {
isErrLog(logLine string) bool
getErrCode(logLine string) (int, error)
getLogAt(logLine string) (time.Time, error)
}
// logAnalyzer is the regexp based ILogAnalyzer implementation.
type logAnalyzer struct {
	// logAtRegexp matches the bracketed timestamp at the start of a line.
	logAtRegexp *regexp.Regexp
	// errCodeRegexp matches "ret=-<digits>" error code fragments.
	errCodeRegexp *regexp.Regexp
}

// NewLogAnalyzer returns a logAnalyzer with its regular expressions compiled.
func NewLogAnalyzer() *logAnalyzer {
	return &logAnalyzer{
		logAtRegexp:   regexp.MustCompile(`^\[\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d\.\d\d\d\d\d\d\]`),
		errCodeRegexp: regexp.MustCompile(`ret=-\d+`),
	}
}

// isErrLog reports whether logLine is an ERROR level log line (other levels
// may be added later if needed).
//
// The level is read at a fixed offset: the "[timestamp]" prefix is 28 bytes
// long, followed by one space, so the level token occupies bytes 29..33.
func (l *logAnalyzer) isErrLog(logLine string) bool {
	// example: [2020-08-07 05:55:44.377075] ERROR [RS] ob_server_table_operator.cpp:376 [151575][4][Y0-0000000000000000] [lt = 5] [dc =0] svr_status(svr_status = "active", display_status =1)
	// A line needs 34 bytes to hold the whole token; the previous "> 34"
	// check rejected a line of exactly that length.
	if len(logLine) >= 34 {
		return logLine[29:34] == "ERROR"
	}
	return false
}

// getErrCode extracts the error code of a log line.
//
// When the line contains one or more "ret=-<digits>" fragments, the digits of
// the last one are returned. Otherwise two known messages map to fixed codes.
// It returns -1 when no code can be determined, together with an error when
// the digits cannot be converted.
func (l *logAnalyzer) getErrCode(logLine string) (int, error) {
	matchedErrCodes := l.errCodeRegexp.FindAllString(logLine, -1)
	if n := len(matchedErrCodes); n > 0 {
		// The match has the form ret=-\d+, so the digits follow the prefix.
		errCode, err := strconv.Atoi(strings.TrimPrefix(matchedErrCodes[n-1], "ret=-"))
		if err != nil {
			return -1, err
		}
		return errCode, nil
	}
	switch {
	case strings.Contains(logLine, "clog disk is almost full"):
		return 4264, nil
	case strings.Contains(logLine, "partition table update task cost too much time to execute"):
		return 4015, nil
	}
	return -1, nil
}

// getLogAt parses the bracketed timestamp at the start of logLine in the
// local time zone. It returns an error when the line does not start with a
// timestamp in the expected format.
func (l *logAnalyzer) getLogAt(logLine string) (time.Time, error) {
	timeStr := strings.TrimSuffix(strings.TrimPrefix(l.logAtRegexp.FindString(logLine), "["), "]")
	logAt, err := time.ParseInLocation(logAtLayout, timeStr, time.Local)
	if err != nil {
		return time.Time{}, err
	}
	return logAt, nil
}
// matchString reports whether content contains reg as a plain substring.
// Despite the parameter name no regular expression is evaluated, and the
// returned error is always nil.
func matchString(reg string, content string) (matched bool, err error) {
	return strings.Contains(content, reg), nil
}
// processQueue is a mutex protected FIFO of log files waiting to be read. The
// watcher goroutine appends at the tail while a collector goroutine reads and
// pops at the head.
type processQueue struct {
	queue []*logFileInfo
	mutex sync.Mutex
}

// getQueueLen returns the number of queued files.
//
// It takes the mutex like every other accessor; reading the slice header
// unlocked raced with pushBack and popHead running in other goroutines. The
// other methods therefore use len(p.queue) directly while holding the lock.
func (p *processQueue) getQueueLen() int {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	return len(p.queue)
}

// getHead returns the first queued file, or nil when the queue is empty.
func (p *processQueue) getHead() *logFileInfo {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if len(p.queue) == 0 {
		return nil
	}
	return p.queue[0]
}

// getTail returns the last queued file, or nil when the queue is empty.
func (p *processQueue) getTail() *logFileInfo {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if len(p.queue) == 0 {
		return nil
	}
	return p.queue[len(p.queue)-1]
}

// popHead removes the first queued file; it does nothing on an empty queue.
func (p *processQueue) popHead() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if len(p.queue) == 0 {
		return
	}
	// Clear the slot so the backing array does not keep the entry reachable.
	p.queue[0] = nil
	p.queue = p.queue[1:]
}

// pushBack appends info at the tail of the queue.
func (p *processQueue) pushBack(info *logFileInfo) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	p.queue = append(p.queue, info)
}

// setRenameTrueExceptTail marks every queued file except the last one as
// renamed.
func (p *processQueue) setRenameTrueExceptTail() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	for i := 0; i < len(p.queue)-1; i++ {
		p.queue[i].isRenamed = true
	}
}

// setHeadIsRenameTrue marks the first queued file as renamed; it does nothing
// on an empty queue.
func (p *processQueue) setHeadIsRenameTrue() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if len(p.queue) == 0 {
		return
	}
	p.queue[0].isRenamed = true
}

// getHeadIsRenamed reports whether the first queued file is marked as
// renamed. It returns false on an empty queue.
func (p *processQueue) getHeadIsRenamed() bool {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if len(p.queue) == 0 {
		return false
	}
	return p.queue[0].isRenamed
}
......@@ -12,5 +12,13 @@
package outputs
import (
"github.com/oceanbase/obagent/plugins"
"github.com/oceanbase/obagent/plugins/outputs/prometheus"
)
// init registers the alertmanager output plugin under the name
// "alertmanagerOutput" with the output manager.
func init() {
plugins.GetOutputManager().Register("alertmanagerOutput", func() plugins.Output {
return &prometheus.AlertmanagerOutput{}
})
}
package prometheus
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/pkg/errors"
// "github.com/avast/retry-go/v3"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
"github.com/oceanbase/obagent/metric"
)
// alertmanagerOutputSampleConfig is the example YAML configuration returned
// by SampleConfig; its keys correspond to the yaml tags of AlertmanagerOutputConfig.
const alertmanagerOutputSampleConfig = `
address: http://1.1.1.1:9093
httpTimeout: 10s
batchCount: 100
retryTimes: 3
`
// alertmanagerOutputDescription is the plugin summary returned by Description.
const alertmanagerOutputDescription = `
send metric data as alarm to alertmanager
`
// defaultTimeout is the HTTP client timeout used when httpTimeout is not configured.
var defaultTimeout = 10 * time.Second
// AlertmanagerOutputConfig is the plugin configuration decoded from YAML in Init.
type AlertmanagerOutputConfig struct {
// Address is the base URL of the alertmanager; "/api/v2/alerts" is appended to it.
Address string `yaml:"address"`
// BatchCount is the maximum number of metrics sent in one request.
BatchCount int `yaml:"batchCount"`
// HttpTimeout is the HTTP client timeout; zero selects defaultTimeout.
HttpTimeout time.Duration `yaml:"httpTimeout"`
// RetryTimes is decoded from the configuration but not read by the code shown here.
RetryTimes int `yaml:"retryTimes"`
}
// AlertmanagerOutput is an output plugin that posts metrics as alerts to an
// alertmanager from a background goroutine.
type AlertmanagerOutput struct {
config *AlertmanagerOutputConfig
httpClient *http.Client
// taskChan carries metric batches from Write to the schedule goroutine.
taskChan chan []metric.Metric
// ctx and cancelFunc signal the schedule goroutine; cancelFunc is called by Close.
ctx context.Context
cancelFunc context.CancelFunc
}
// Init decodes the plugin configuration, prepares the task channel, the
// cancellable context and the HTTP client, and starts the schedule goroutine
// that sends queued batches. It returns an error when the configuration
// cannot be encoded or decoded.
func (a *AlertmanagerOutput) Init(config map[string]interface{}) error {
	raw, marshalErr := yaml.Marshal(config)
	if marshalErr != nil {
		return errors.Wrap(marshalErr, "alertmanager output encode config")
	}

	a.config = &AlertmanagerOutputConfig{}
	if decodeErr := yaml.Unmarshal(raw, a.config); decodeErr != nil {
		return errors.Wrap(decodeErr, "alertmanager output decode config")
	}

	a.taskChan = make(chan []metric.Metric, 1024)
	a.ctx, a.cancelFunc = context.WithCancel(context.Background())

	// An unset httpTimeout falls back to the package default.
	timeout := a.config.HttpTimeout
	if timeout == 0 {
		timeout = defaultTimeout
	}
	a.httpClient = &http.Client{Timeout: timeout}

	go a.schedule()

	log.Infof("alertmanager output inited with config : %v", a.config)
	return nil
}
// Close cancels the plugin context and closes the task channel. It always
// returns nil.
// NOTE(review): Write sends on taskChan, so a Write call after (or racing
// with) Close would panic on the closed channel — confirm callers never do that.
func (a *AlertmanagerOutput) Close() error {
a.cancelFunc()
close(a.taskChan)
return nil
}
// SampleConfig returns the example YAML configuration of the plugin.
func (a *AlertmanagerOutput) SampleConfig() string {
return alertmanagerOutputSampleConfig
}
// Description returns the short description of the plugin.
func (a *AlertmanagerOutput) Description() string {
return alertmanagerOutputDescription
}
// Write splits metrics into batches of at most BatchCount entries and queues
// them on taskChan for the schedule goroutine. It blocks while the channel is
// full and always returns nil.
//
// A missing or non-positive BatchCount sends everything as one batch.
// Previously such a value made the loop queue empty batches forever, because
// the remaining slice never shrank.
func (a *AlertmanagerOutput) Write(metrics []metric.Metric) error {
	batchSize := a.config.BatchCount
	if batchSize <= 0 {
		batchSize = len(metrics)
	}
	for len(metrics) > 0 {
		count := batchSize
		if len(metrics) < count {
			count = len(metrics)
		}
		a.taskChan <- metrics[:count]
		metrics = metrics[count:]
	}
	return nil
}
// schedule is the sender loop started by Init. It sends every batch received
// from taskChan to the alertmanager and returns when the plugin context is
// cancelled or the channel is closed.
//
// Previously "break" only left the select statement, so the goroutine never
// terminated and kept spinning after Close; it also logged an error after
// every send, including successful ones.
func (a *AlertmanagerOutput) schedule() {
	for {
		select {
		case <-a.ctx.Done():
			log.Info("received exit signal, stop alertmanager output schedule routine")
			return
		case metrics, ok := <-a.taskChan:
			if !ok {
				// Channel closed by Close: nothing more will arrive.
				return
			}
			if err := a.sendAlarm(metrics); err != nil {
				log.WithError(err).Errorf("send alarm got error: %v", err)
			}
		}
	}
}
// sendAlarm converts metrics into alertmanager alerts and posts them as one
// JSON array to "<address>/api/v2/alerts".
//
// It returns an error when the payload cannot be encoded, the request cannot
// be built or executed, or the response status is not 200. The request is
// bound to the plugin context, so Close aborts an in-flight call.
func (a *AlertmanagerOutput) sendAlarm(metrics []metric.Metric) error {
	alarmList := make([]map[string]interface{}, 0, len(metrics))
	for _, metricEntry := range metrics {
		alarmList = append(alarmList, a.convertMetricToAlarm(metricEntry))
	}

	jsonData, err := json.Marshal(alarmList)
	if err != nil {
		return errors.Wrap(err, "encode alarms")
	}
	log.Debugf("send alarm metrics: %s", jsonData)

	pushAlertsAddress := fmt.Sprintf("%s/%s", a.config.Address, "api/v2/alerts")
	req, err := http.NewRequestWithContext(a.ctx, http.MethodPost, pushAlertsAddress, bytes.NewReader(jsonData))
	if err != nil {
		return errors.Wrap(err, "generate http request")
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := a.httpClient.Do(req)
	if err != nil {
		return errors.Wrap(err, "do query")
	}
	// The body must be closed, otherwise the underlying connection leaks.
	defer resp.Body.Close()
	log.Infof("send alarm got response status: %s", resp.Status)

	if resp.StatusCode != http.StatusOK {
		return errors.Errorf("send alarm got abnormal status code %d", resp.StatusCode)
	}
	return nil
}
// convertMetricToAlarm builds the JSON object of one alertmanager alert from
// a metric: the tags plus "alertname" (the metric name) become the labels,
// the fields become the annotations and the metric time becomes the start
// time.
//
// The start time key is "startsAt", the name used by the alertmanager v2 API;
// the previous "startAt" key is not part of that API. The labels are built in
// a fresh map so the map returned by Tags() is not modified.
// NOTE(review): assumes Tags() returns map[string]string, as the tags passed
// to metric.NewMetric by the error log input are — confirm.
func (a *AlertmanagerOutput) convertMetricToAlarm(metric metric.Metric) map[string]interface{} {
	tags := metric.Tags()
	labels := make(map[string]string, len(tags)+1)
	for name, value := range tags {
		labels[name] = value
	}
	labels["alertname"] = metric.GetName()

	alarmItem := make(map[string]interface{}, 4)
	alarmItem["labels"] = labels
	alarmItem["annotations"] = metric.Fields()
	alarmItem["startsAt"] = metric.GetTime()
	alarmItem["generatorURL"] = ""
	return alarmItem
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册