提交 f640b64a 编写于 作者: C chris-sun-star

close wait

上级 784775cf
...@@ -79,12 +79,13 @@ type Config struct { ...@@ -79,12 +79,13 @@ type Config struct {
} }
type ErrorLogInput struct { type ErrorLogInput struct {
config *Config config *Config
logAnalyzer ILogAnalyzer logAnalyzer ILogAnalyzer
logProcessQueue map[ServiceType]*processQueue logProcessQueue map[ServiceType]*processQueue
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
metricBufferChan chan []metric.Metric tbackgroundTaskWaitGroup sync.WaitGroup
metricBufferChan chan []metric.Metric
} }
func (e *ErrorLogInput) SampleConfig() string { func (e *ErrorLogInput) SampleConfig() string {
...@@ -123,10 +124,12 @@ func (e *ErrorLogInput) Init(config map[string]interface{}) error { ...@@ -123,10 +124,12 @@ func (e *ErrorLogInput) Init(config map[string]interface{}) error {
} }
for service := range e.config.LogServiceConfig { for service := range e.config.LogServiceConfig {
e.backgroundTaskWaitGroup.Add(1)
go e.doCollect(service) go e.doCollect(service)
} }
// start go routine to add log file to logProcessQueue // start go routine to add log file to logProcessQueue
e.backgroundTaskWaitGroup.Add(1)
go e.watchFile() go e.watchFile()
log.Info("error log input init with config", e.config) log.Info("error log input init with config", e.config)
...@@ -135,6 +138,7 @@ func (e *ErrorLogInput) Init(config map[string]interface{}) error { ...@@ -135,6 +138,7 @@ func (e *ErrorLogInput) Init(config map[string]interface{}) error {
} }
func (e *ErrorLogInput) doCollect(service ServiceType) { func (e *ErrorLogInput) doCollect(service ServiceType) {
defer e.backgroundTaskWaitGroup.Done()
for { for {
select { select {
case <-e.ctx.Done(): case <-e.ctx.Done():
...@@ -221,7 +225,6 @@ func (e *ErrorLogInput) processLogLine(service ServiceType, line string) metric. ...@@ -221,7 +225,6 @@ func (e *ErrorLogInput) processLogLine(service ServiceType, line string) metric.
} }
func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool { func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool {
// TODO: compile first
c, found := e.config.LogServiceConfig[service] c, found := e.config.LogServiceConfig[service]
if found { if found {
if c.ExcludeRegexes == nil { if c.ExcludeRegexes == nil {
...@@ -238,6 +241,7 @@ func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool { ...@@ -238,6 +241,7 @@ func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool {
} }
func (e *ErrorLogInput) watchFile() { func (e *ErrorLogInput) watchFile() {
defer e.backgroundTaskWaitGroup.Done()
for { for {
select { select {
case <-e.ctx.Done(): case <-e.ctx.Done():
...@@ -297,7 +301,6 @@ func (e *ErrorLogInput) watchFileChanges() { ...@@ -297,7 +301,6 @@ func (e *ErrorLogInput) watchFileChanges() {
fileDesc: newFileDesc, fileDesc: newFileDesc,
isRenamed: false, isRenamed: false,
}) })
// TODO: should set all node renamed except last one
queue.setRenameTrueExceptTail() queue.setRenameTrueExceptTail()
} }
} else { } else {
...@@ -315,6 +318,7 @@ func (e *ErrorLogInput) watchFileChanges() { ...@@ -315,6 +318,7 @@ func (e *ErrorLogInput) watchFileChanges() {
func (e *ErrorLogInput) Close() error { func (e *ErrorLogInput) Close() error {
e.cancel() e.cancel()
e.backgroundTaskWaitGroup.Wait()
return nil return nil
} }
......
...@@ -19,11 +19,11 @@ import ( ...@@ -19,11 +19,11 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
// "github.com/avast/retry-go/v3"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
...@@ -51,11 +51,12 @@ type AlertmanagerOutputConfig struct { ...@@ -51,11 +51,12 @@ type AlertmanagerOutputConfig struct {
} }
type AlertmanagerOutput struct { type AlertmanagerOutput struct {
config *AlertmanagerOutputConfig config *AlertmanagerOutputConfig
httpClient *http.Client httpClient *http.Client
taskChan chan []metric.Metric taskChan chan []metric.Metric
ctx context.Context ctx context.Context
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
backgroundTaskWaitGroup sync.WaitGroup
} }
func (a *AlertmanagerOutput) Init(config map[string]interface{}) error { func (a *AlertmanagerOutput) Init(config map[string]interface{}) error {
...@@ -79,6 +80,7 @@ func (a *AlertmanagerOutput) Init(config map[string]interface{}) error { ...@@ -79,6 +80,7 @@ func (a *AlertmanagerOutput) Init(config map[string]interface{}) error {
a.httpClient.Timeout = a.config.HttpTimeout a.httpClient.Timeout = a.config.HttpTimeout
} }
a.backgroundTaskWaitGroup.Add(1)
go a.schedule() go a.schedule()
log.Infof("alertmanager output inited with config : %v", a.config) log.Infof("alertmanager output inited with config : %v", a.config)
...@@ -88,6 +90,7 @@ func (a *AlertmanagerOutput) Init(config map[string]interface{}) error { ...@@ -88,6 +90,7 @@ func (a *AlertmanagerOutput) Init(config map[string]interface{}) error {
func (a *AlertmanagerOutput) Close() error { func (a *AlertmanagerOutput) Close() error {
a.cancelFunc() a.cancelFunc()
close(a.taskChan) close(a.taskChan)
a.backgroundTaskWaitGroup.Wait()
return nil return nil
} }
...@@ -112,6 +115,7 @@ func (a *AlertmanagerOutput) Write(metrics []metric.Metric) error { ...@@ -112,6 +115,7 @@ func (a *AlertmanagerOutput) Write(metrics []metric.Metric) error {
} }
func (a *AlertmanagerOutput) schedule() { func (a *AlertmanagerOutput) schedule() {
a.backgroundTaskWaitGroup.Done()
for { for {
select { select {
case <-a.ctx.Done(): case <-a.ctx.Done():
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册