error_log_input.go 9.0 KB
Newer Older
Z
zhj-luo 已提交
1 2 3 4 5 6 7 8 9 10 11 12
/*
 * Copyright (c) 2023 OceanBase
 * OCP Express is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

13 14 15 16 17 18 19 20 21 22 23 24 25
package log

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"regexp"
	"sync"
	"time"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
	"gopkg.in/yaml.v3"

	"github.com/oceanbase/obagent/monitor/message"
)

// sampleConfig is the example YAML configuration returned by SampleConfig.
// Its keys mirror the yaml tags of Config and LogCollectConfig: a global
// expireTime and collectDelay plus one logServiceConfig entry per service.
const sampleConfig = `
expireTime: 300s
collectDelay: 1s
logServiceConfig:
  rootservice:
    excludeRegexes:
      - hello
      - world
    logConfig:
      logDir: /home/admin/oceanbase/log
      logFileName: rootservice.log.wf
  election:
    excludeRegexes:
      - hello
      - world
    logConfig:
      logDir: /home/admin/oceanbase/log
      logFileName: election.log.wf
  observer:
    excludeRegexes:
      - hello
      - world
    logConfig:
      logDir: /home/admin/oceanbase/log
      logFileName: observer.log.wf
`

// description is the human-readable plugin summary returned by Description.
const description = `
collect ob error logs and filter by keywords
`

// ServiceType names a log-producing service. It is the key type of
// Config.LogServiceConfig and of the per-service process queues.
type ServiceType string

// Service names that appear as keys in the sample configuration.
const (
	RootService ServiceType = "rootservice"
	Observer    ServiceType = "observer"
	Election    ServiceType = "election"
)

// LogCollectConfig is the per-service section of the plugin configuration.
type LogCollectConfig struct {
	// LogConfig supplies the LogDir and LogFileName of the file to watch.
	LogConfig      *LogConfig `yaml:"logConfig"`
	// ExcludeRegexes lists patterns; a log line matching any of them is dropped
	// by isFiltered.
	ExcludeRegexes []string   `yaml:"excludeRegexes"`
}

// Config is the top-level configuration of the error log input plugin.
type Config struct {
	// LogServiceConfig maps each service to its log collection settings.
	LogServiceConfig map[ServiceType]*LogCollectConfig `yaml:"logServiceConfig"`
	// CollectDelay is the sleep between iterations of the collect and
	// watch-file loops.
	CollectDelay     time.Duration                     `yaml:"collectDelay"`
	// ExpireTime is the maximum age of a log line; processLogLine skips lines
	// whose timestamp plus ExpireTime is already in the past.
	ExpireTime       time.Duration                     `yaml:"expireTime"`
}

type ErrorLogInput struct {
C
chris-sun-star 已提交
82 83 84 85 86 87
	config                  *Config
	logAnalyzer             ILogAnalyzer
	logProcessQueue         map[ServiceType]*processQueue
	ctx                     context.Context
	cancel                  context.CancelFunc
	backgroundTaskWaitGroup sync.WaitGroup
O
ob-robot 已提交
88
	metricBufferChan        chan []*message.Message
89 90 91 92 93 94 95 96 97 98
}

// SampleConfig returns the example YAML configuration of this plugin.
func (e *ErrorLogInput) SampleConfig() string {
	return sampleConfig
}

// Description returns a short human-readable summary of this plugin.
func (e *ErrorLogInput) Description() string {
	return description
}

O
ob-robot 已提交
99
func (e *ErrorLogInput) Init(ctx context.Context, config map[string]interface{}) error {
100 101 102 103 104 105 106 107 108 109 110 111 112 113

	var pluginConfig Config
	configBytes, err := yaml.Marshal(config)
	if err != nil {
		return errors.Wrap(err, "error log input encode config")
	}
	err = yaml.Unmarshal(configBytes, &pluginConfig)
	if err != nil {
		return errors.Wrap(err, "error log input decode config")
	}
	e.config = &pluginConfig

	e.logAnalyzer = NewLogAnalyzer()
	e.logProcessQueue = make(map[ServiceType]*processQueue)
O
ob-robot 已提交
114
	e.metricBufferChan = make(chan []*message.Message, 1000)
115 116 117 118 119 120 121 122 123 124 125 126

	e.ctx, e.cancel = context.WithCancel(context.Background())

	for service := range e.config.LogServiceConfig {
		q := &processQueue{
			queue: make([]*logFileInfo, 0, 8),
			mutex: sync.Mutex{},
		}
		e.logProcessQueue[service] = q
	}

	for service := range e.config.LogServiceConfig {
C
chris-sun-star 已提交
127
		e.backgroundTaskWaitGroup.Add(1)
128 129 130 131
		go e.doCollect(service)
	}

	// start go routine to add log file to logProcessQueue
C
chris-sun-star 已提交
132
	e.backgroundTaskWaitGroup.Add(1)
133 134 135 136 137 138 139
	go e.watchFile()

	log.Info("error log input init with config", e.config)

	return nil
}

O
ob-robot 已提交
140 141 142 143 144 145 146 147 148
// Start only logs that the input has started and always returns nil. The out
// channel is not used here; the background goroutines are launched by Init.
func (e *ErrorLogInput) Start(out chan<- []*message.Message) error {
	log.WithContext(e.ctx).Info("errorLogInput started")

	return nil
}

// update is a no-op: its body is empty and out is not used.
func (e *ErrorLogInput) update(out chan<- []*message.Message) {
}

149
func (e *ErrorLogInput) doCollect(service ServiceType) {
C
chris-sun-star 已提交
150
	defer e.backgroundTaskWaitGroup.Done()
151 152 153 154 155 156 157 158 159 160
	for {
		select {
		case <-e.ctx.Done():
			log.Infof("received exit signal, stop collect routine of service %s", service)
			q, found := e.logProcessQueue[service]
			if found {
				for q.getQueueLen() > 0 {
					fd := q.getHead().fileDesc
					err := fd.Close()
					if err != nil {
O
ob-robot 已提交
161
						log.Warnf("close log file of service %s %s failed %v", service, fd.Name(), err)
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
					}
					q.popHead()
				}
			}
		default:
			e.collectErrorLogs(service)
		}
		time.Sleep(e.config.CollectDelay)
	}
}

func (e *ErrorLogInput) collectErrorLogs(service ServiceType) {
	q, found := e.logProcessQueue[service]
	if !found {
		log.Warnf("service %s has no process queue", service)
	} else {
		if q.getQueueLen() == 0 {
			log.Warnf("service %s has no process queue", service)
		} else {
			// read head of queue
			head := q.getHead()
			fdScanner := bufio.NewScanner(head.fileDesc)
O
ob-robot 已提交
184
			logMetrics := make([]*message.Message, 0, 8)
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
			for fdScanner.Scan() {
				line := fdScanner.Text()
				if line == "" || len(line) == 0 {
					continue
				} else {
					logMetric := e.processLogLine(service, line)
					if logMetric != nil {
						logMetrics = append(logMetrics, logMetric)
					}
				}
			}

			if len(logMetrics) > 0 {
				e.metricBufferChan <- logMetrics
			}

			if q.getHeadIsRenamed() {
				head.fileDesc.Close()
				q.popHead()
			}
		}
	}
}

O
ob-robot 已提交
209
func (e *ErrorLogInput) processLogLine(service ServiceType, line string) *message.Message {
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
	if !e.logAnalyzer.isErrLog(line) {
		return nil
	}
	logAt, err := e.logAnalyzer.getLogAt(line)
	if err != nil {
		log.Warnf("parse log time failed %s ", line)
		return nil
	}
	if logAt.Add(e.config.ExpireTime).Before(time.Now()) {
		log.Warnf("log expired, just skip, %s", line)
		return nil
	}
	errCode, _ := e.logAnalyzer.getErrCode(line)

	if e.isFiltered(service, line) {
		log.Debugf("log is filtered, %s", line)
		return nil
	}
O
ob-robot 已提交
228 229 230 231 232 233 234
	fields := make([]message.FieldEntry, 0)
	tags := make([]message.TagEntry, 0)
	fields = append(fields, message.FieldEntry{"log_content", line})
	tags = append(tags, message.TagEntry{"error_code", fmt.Sprintf("%d", errCode)})
	//fields["log_content"] = line
	//tags["error_code"] = fmt.Sprintf("%d", errCode)
	logMetric := message.NewMessageWithTagsFields("oceanbase_log", message.Untyped, logAt, tags, fields)
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
	return logMetric
}

// isFiltered reports whether line matches at least one of the exclude regexes
// configured for service. A service without configuration or without exclude
// regexes filters nothing. A pattern that fails to compile is treated as not
// matching.
func (e *ErrorLogInput) isFiltered(service ServiceType, line string) bool {
	serviceConfig, ok := e.config.LogServiceConfig[service]
	if !ok {
		return false
	}
	for _, pattern := range serviceConfig.ExcludeRegexes {
		if matched, _ := regexp.MatchString(pattern, line); matched {
			return true
		}
	}
	return false
}

func (e *ErrorLogInput) watchFile() {
C
chris-sun-star 已提交
255
	defer e.backgroundTaskWaitGroup.Done()
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
	for {
		select {
		case <-e.ctx.Done():
			log.Info("received exit signal, stop watch file routine")
			return
		default:
			// open file and set fd in file process queue
			e.watchFileChanges()
		}
		time.Sleep(e.config.CollectDelay)
	}
}

// checkAndOpenFile stats logFileRealPath and, if that succeeds, opens it
// read-only. It returns the stat error with a nil file when the stat fails,
// otherwise the result of os.OpenFile.
func (e *ErrorLogInput) checkAndOpenFile(logFileRealPath string) (*os.File, error) {
	if _, statErr := os.Stat(logFileRealPath); statErr != nil {
		return nil, statErr
	}
	return os.OpenFile(logFileRealPath, os.O_RDONLY, os.ModePerm)
}

func (e *ErrorLogInput) watchFileChanges() {
	for service, logCollectConfig := range e.config.LogServiceConfig {
		log.Infof("check log file of service: %s", service)
		queue, exists := e.logProcessQueue[service]
		logFileRealPath := fmt.Sprintf("%s/%s", logCollectConfig.LogConfig.LogDir, logCollectConfig.LogConfig.LogFileName)
		log.Debugf("log file of service %s: %s", service, logFileRealPath)
		newFileDesc, err := e.checkAndOpenFile(logFileRealPath)
		if err != nil {
O
ob-robot 已提交
286
			log.WithError(err).Warnf("open logfile of service %s %s failed, got error %v", service, logFileRealPath, err)
287 288 289 290
			continue
		}
		newFileInfo, err := FileInfo(newFileDesc)
		if err != nil {
O
ob-robot 已提交
291
			log.WithError(err).Warnf("check logfile of service %s %s info failed, got error %v", service, logFileRealPath, err)
292 293 294 295 296 297
			continue
		}

		if exists && queue.getQueueLen() > 0 {
			tail := queue.getTail()
			if tail == nil {
O
ob-robot 已提交
298
				log.Warnf("queue should not be empty")
299 300 301 302
				continue
			}
			tailFileInfo, err := FileInfo(tail.fileDesc)
			if err != nil {
O
ob-robot 已提交
303
				log.WithError(err).Warnf("failed to get file info of service %s head", service)
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
				continue
			}

			if newFileInfo.DevId() == tailFileInfo.DevId() && newFileInfo.FileId() == tailFileInfo.FileId() {
				log.Debugf("log file of service %s not change", service)
			} else {
				log.Infof("log file of service %s has changed", service)
				queue.pushBack(&logFileInfo{
					fileDesc:  newFileDesc,
					isRenamed: false,
				})
				queue.setRenameTrueExceptTail()
			}
		} else {
			log.Warnf("process queue not exists or empty")
			// first time, create queue, open last file
			// initialize process queue
			q := e.logProcessQueue[service]
			q.pushBack(&logFileInfo{
				fileDesc:  newFileDesc,
				isRenamed: false,
			})
		}
	}
}

func (e *ErrorLogInput) Close() error {
	e.cancel()
C
chris-sun-star 已提交
332
	e.backgroundTaskWaitGroup.Wait()
333 334 335
	return nil
}

O
ob-robot 已提交
336
func (e *ErrorLogInput) Collect(ctx context.Context) ([]*message.Message, error) {
337
	moreMetrics := true
O
ob-robot 已提交
338
	metrics := make([]*message.Message, 0, 1024)
339 340 341 342 343 344 345 346 347 348 349
	for moreMetrics {
		select {
		case metricsFromBuffer := <-e.metricBufferChan:
			metrics = append(metrics, metricsFromBuffer...)
		default:
			log.Infof("no more metric from buffer")
			moreMetrics = false
		}
	}
	return metrics, nil
}