log_tailer.go 1.9 KB
Newer Older
O
ob-robot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
package log_tailer

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"

	"github.com/oceanbase/obagent/config/monagent"
	"github.com/oceanbase/obagent/monitor/message"
	"github.com/oceanbase/obagent/stat"
)

// LogTailer tails log for metric platform or others.
//
// Fields:
//   - conf: the tailer configuration; Run reads its TailConfigs and
//     RecoveryConfig.
//   - executors: every LogTailerExecutor created by Run, one per entry in
//     conf.TailConfigs; isStopped consults them.
//   - toBeStopped: channel handed to each executor on creation and closed
//     by Stop.
type LogTailer struct {
	conf        monagent.LogTailerConfig
	executors   []*LogTailerExecutor
	toBeStopped chan bool
}

// NewLogTailer builds a LogTailer from conf. A ProcessQueueCapacity of zero
// is replaced by a default of 10 before the configuration is stored. The
// returned tailer has no executors yet and an open, unbuffered stop channel.
// The error result is always nil.
func NewLogTailer(conf monagent.LogTailerConfig) (*LogTailer, error) {
	const defaultProcessQueueCapacity = 10

	if conf.ProcessQueueCapacity == 0 {
		conf.ProcessQueueCapacity = defaultProcessQueueCapacity
	}

	tailer := new(LogTailer)
	tailer.conf = conf
	tailer.executors = []*LogTailerExecutor{}
	tailer.toBeStopped = make(chan bool)
	return tailer, nil
}

// isStopped reports whether every executor held by the tailer reports
// itself stopped. It returns true when there are no executors.
func (l *LogTailer) isStopped() bool {
	executors := l.executors
	for i := 0; i < len(executors); i++ {
		if executors[i].isStopped() {
			continue
		}
		// One executor still running is enough to answer "not stopped".
		return false
	}
	return true
}

// Start runs the log tailer with a background context, forwarding out to
// Run. It returns whatever error Run returns, or nil on success.
func (l *LogTailer) Start(out chan<- []*message.Message) (err error) {
	log.Info("start to run log tailer")
	if err = l.Run(context.Background(), out); err != nil {
		// Attach the cause so the failure can be diagnosed from the log
		// entry alone, matching how Run reports executor errors.
		log.WithError(err).Error("failed to run log tailer")
		return err
	}
	return nil
}

// Stop closes the shared toBeStopped channel that was handed to every
// executor, then polls for the executors to report stopped, logs the final
// state, and decrements the "all" LogTailerCount metric.
//
// Closing an already closed channel panics, so a repeated call to Stop
// returns early without closing again and without decrementing the metric a
// second time. This guard covers repeated sequential calls; it does not
// serialize concurrent callers.
func (l *LogTailer) Stop() {
	select {
	case <-l.toBeStopped:
		// A receive only succeeds here once the channel has been closed.
		// NOTE(review): assumes nothing ever sends on toBeStopped — confirm
		// against LogTailerExecutor.
		return
	default:
		close(l.toBeStopped)
	}

	// graceful stop: poll until all executors have stopped or the attempt
	// budget is exhausted.
	// NOTE(review): the nominal budget is 8000 x 1µs; confirm whether a
	// longer unit (e.g. time.Millisecond) was intended.
	for i := 0; !l.isStopped() && i < 8000; i++ {
		time.Sleep(time.Microsecond)
	}
	log.Infof("logTailer stopped, isStopped:%t", l.isStopped())
	stat.LogTailerCount.With(prometheus.Labels{stat.LogFileName: "all"}).Dec()
}

// Run creates one LogTailerExecutor for each entry in conf.TailConfigs,
// records it in l.executors, and calls its TailLog with ctx. A TailLog error
// is logged as a warning together with the offending tail config and does
// not stop the remaining entries from being processed. Run always returns
// nil.
func (l *LogTailer) Run(ctx context.Context, out chan<- []*message.Message) error {
	logger := log.WithContext(ctx)

	for _, cfg := range l.conf.TailConfigs {
		worker := NewLogTailerExecutor(cfg, l.conf.RecoveryConfig, l.toBeStopped, out)
		// Register before tailing so the executor is tracked even when
		// TailLog fails.
		l.executors = append(l.executors, worker)

		if tailErr := worker.TailLog(ctx); tailErr != nil {
			logger.WithField("tailConfig", cfg).WithError(tailErr).Warn("failed to process log")
		}
	}

	return nil
}