diff --git a/plugins/inputs/oceanbase/log/error_log_input.go b/plugins/inputs/oceanbase/log/error_log_input.go index d28867ec993cb2e45cb8826feffcde6267fd1855..fef92c5229f5ff914538618d57c196889bd956ad 100644 --- a/plugins/inputs/oceanbase/log/error_log_input.go +++ b/plugins/inputs/oceanbase/log/error_log_input.go @@ -79,12 +79,13 @@ type Config struct { } type ErrorLogInput struct { - config *Config - logAnalyzer ILogAnalyzer - logProcessQueue map[ServiceType]*processQueue - ctx context.Context - cancel context.CancelFunc - metricBufferChan chan []metric.Metric + config *Config + logAnalyzer ILogAnalyzer + logProcessQueue map[ServiceType]*processQueue + ctx context.Context + cancel context.CancelFunc + tbackgroundTaskWaitGroup sync.WaitGroup + metricBufferChan chan []metric.Metric } func (e *ErrorLogInput) SampleConfig() string { @@ -123,10 +124,12 @@ func (e *ErrorLogInput) Init(config map[string]interface{}) error { } for service := range e.config.LogServiceConfig { + e.backgroundTaskWaitGroup.Add(1) go e.doCollect(service) } // start go routine to add log file to logProcessQueue + e.backgroundTaskWaitGroup.Add(1) go e.watchFile() log.Info("error log input init with config", e.config) @@ -135,6 +138,7 @@ func (e *ErrorLogInput) Init(config map[string]interface{}) error { } func (e *ErrorLogInput) doCollect(service ServiceType) { + defer e.backgroundTaskWaitGroup.Done() for { select { case <-e.ctx.Done(): @@ -221,7 +225,6 @@ func (e *ErrorLogInput) processLogLine(service ServiceType, line string) metric. } func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool { - // TODO: compile first c, found := e.config.LogServiceConfig[service] if found { if c.ExcludeRegexes == nil { @@ -238,6 +241,7 @@ func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool { } func (e *ErrorLogInput) watchFile() { + defer e.backgroundTaskWaitGroup.Done() for { select { case <-e.ctx.Done(): @@ -297,7 +301,6 @@ func (e *ErrorLogInput) watchFileChanges() { fileDesc: newFileDesc, isRenamed: false, }) - // TODO: should set all node renamed except last one queue.setRenameTrueExceptTail() } } else { @@ -315,6 +318,7 @@ func (e *ErrorLogInput) watchFileChanges() { func (e *ErrorLogInput) Close() error { e.cancel() + e.backgroundTaskWaitGroup.Wait() return nil } diff --git a/plugins/outputs/prometheus/alertmanager.go b/plugins/outputs/prometheus/alertmanager.go index 2dd74244cd60dbb92fc52bd0e3a7b71b5127229b..62520e14ced30e57135dfc3dc387e28ca81c0937 100644 --- a/plugins/outputs/prometheus/alertmanager.go +++ b/plugins/outputs/prometheus/alertmanager.go @@ -19,11 +19,11 @@ import ( "fmt" "math/rand" "net/http" + "sync" "time" "github.com/pkg/errors" - // "github.com/avast/retry-go/v3" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" @@ -51,11 +51,12 @@ type AlertmanagerOutputConfig struct { } type AlertmanagerOutput struct { - config *AlertmanagerOutputConfig - httpClient *http.Client - taskChan chan []metric.Metric - ctx context.Context - cancelFunc context.CancelFunc + config *AlertmanagerOutputConfig + httpClient *http.Client + taskChan chan []metric.Metric + ctx context.Context + cancelFunc context.CancelFunc + backgroundTaskWaitGroup sync.WaitGroup } func (a *AlertmanagerOutput) Init(config map[string]interface{}) error { @@ -79,6 +80,7 @@ func (a *AlertmanagerOutput) Init(config map[string]interface{}) error { a.httpClient.Timeout = a.config.HttpTimeout } + a.backgroundTaskWaitGroup.Add(1) go a.schedule() log.Infof("alertmanager output inited with config : %v", a.config) @@ -88,6 +90,7 @@ func (a *AlertmanagerOutput) Init(config map[string]interface{}) error { func (a *AlertmanagerOutput) Close() error { a.cancelFunc() close(a.taskChan) + a.backgroundTaskWaitGroup.Wait() return nil } @@ -112,6 +115,7 @@ func (a *AlertmanagerOutput) Write(metrics []metric.Metric) error { } func (a *AlertmanagerOutput) schedule() { + a.backgroundTaskWaitGroup.Done() for { select { case <-a.ctx.Done():