prometheus.go 3.9 KB
Newer Older
O
ob-robot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
package prometheus

import (
	"context"
	"net/http"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/common/expfmt"
	log "github.com/sirupsen/logrus"
	"gopkg.in/yaml.v3"

	agentlog "github.com/oceanbase/obagent/log"
	"github.com/oceanbase/obagent/monitor/message"
)

// sampleConfig is the example YAML configuration returned by SampleConfig.
// The keys match the yaml tags declared on Config. YAML needs a space after
// the ':' of a mapping key, otherwise the line is read as one plain scalar.
const sampleConfig = `
addresses: ['http://127.0.0.1:9090/metrics/node', 'http://127.0.0.1:9091/metrics/node']
httpTimeout: 10s
collect_interval: 15s
`

// description is the short summary of this input returned by Description.
const description = `
collect from http server via prometheus protocol
`

// Fallback durations that Init applies when the matching configuration
// option is left at its zero value.
var (
	// defaultTimeout is used as the HTTP client timeout.
	defaultTimeout = 10 * time.Second
	// defaultCollectInterval is used as the period of the collection ticker.
	defaultCollectInterval = 15 * time.Second
)

// Config is the typed form of the prometheus input configuration, filled by
// Init from the generic configuration map.
//
// NOTE(review): the YAML keys mix camelCase (httpTimeout) and snake_case
// (collect_interval); they are part of the configuration format, so they are
// documented here rather than renamed.
type Config struct {
	// Addresses holds the URLs that are fetched on every collection round.
	Addresses       []string      `yaml:"addresses"`
	// HttpTimeout is installed as the HTTP client timeout; Init substitutes
	// defaultTimeout when it is zero.
	HttpTimeout     time.Duration `yaml:"httpTimeout"`
	// CollectInterval is the period of the collection ticker; Init
	// substitutes defaultCollectInterval when it is zero.
	CollectInterval time.Duration `yaml:"collect_interval"`
}

// Prometheus is an input that periodically fetches the configured HTTP
// addresses, parses the responses as Prometheus text format and forwards the
// resulting messages on the channel given to Start.
type Prometheus struct {
	// sourceConfig is the raw configuration map exactly as passed to Init.
	sourceConfig map[string]interface{}

	// config is the decoded form of sourceConfig; nil until Init has run.
	config     *Config
	// httpClient is shared by all fetches; created in Init.
	httpClient *http.Client

	// ctx is the context handed to Init; it is used for logging and passed
	// on to the collection loop.
	ctx  context.Context
	// done is closed by Stop to make the collection loop return.
	done chan struct{}
}

// Init decodes config into p.config and prepares the HTTP client and the stop
// channel. It has to run before Start or CollectMsgs.
//
// The map is round-tripped through YAML so that the yaml tags on Config decide
// which keys are read. It returns a wrapped error when encoding or decoding
// the configuration fails.
//
// Non-positive durations are treated as "not configured" and replaced by the
// package defaults. This matters for collect_interval in particular:
// time.NewTicker panics on a non-positive interval, and that panic would
// happen inside the goroutine started by Start.
func (p *Prometheus) Init(ctx context.Context, config map[string]interface{}) error {
	p.sourceConfig = config
	configData, err := yaml.Marshal(p.sourceConfig)
	if err != nil {
		return errors.Wrap(err, "prometheus input encode config")
	}
	p.config = &Config{}
	if err = yaml.Unmarshal(configData, p.config); err != nil {
		return errors.Wrap(err, "prometheus input decode config")
	}
	log.WithContext(ctx).Infof("prometheus input config : %v", p.config)

	timeout := p.config.HttpTimeout
	if timeout <= 0 {
		timeout = defaultTimeout
	}
	p.httpClient = &http.Client{Timeout: timeout}

	if p.config.CollectInterval <= 0 {
		p.config.CollectInterval = defaultCollectInterval
	}

	p.ctx = ctx
	p.done = make(chan struct{})

	return nil
}

// SampleConfig returns the package-level sampleConfig string, an example
// configuration for this input.
func (p *Prometheus) SampleConfig() string {
	return sampleConfig
}

// Description returns the package-level description string, a short summary
// of what this input does.
func (p *Prometheus) Description() string {
	return description
}

// Start launches the background collection loop in its own goroutine and
// returns immediately. The loop sends each collected batch to out until Stop
// is called.
//
// It returns an error when the input has no decoded configuration (Init was
// not run or failed). Without this check the loop would dereference the nil
// config inside the new goroutine, where the caller cannot recover from the
// panic.
func (p *Prometheus) Start(out chan<- []*message.Message) error {
	if p.config == nil {
		return errors.New("prometheus input is not initialized")
	}
	log.WithContext(p.ctx).Info("start prometheusInput")
	go p.update(p.ctx, out)
	return nil
}

// update is the collection loop run by Start. On every tick of
// p.config.CollectInterval it calls CollectMsgs and forwards the batch to out.
// It returns once p.done is closed (see Stop).
//
// ctx is used for logging and handed to CollectMsgs.
func (p *Prometheus) update(ctx context.Context, out chan<- []*message.Message) {
	ticker := time.NewTicker(p.config.CollectInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			msgs, err := p.CollectMsgs(ctx)
			if err != nil {
				log.WithContext(ctx).WithError(err).Warn("prometheusInput collect messages failed")
				continue
			}
			// The send must also watch done: if the consumer has stopped
			// reading, a bare send would block forever and Stop could never
			// end this goroutine.
			select {
			case out <- msgs:
			case <-p.done:
				log.WithContext(ctx).Info("prometheusInput exited")
				return
			}
		case <-p.done:
			log.WithContext(ctx).Info("prometheusInput exited")
			return
		}
	}
}

// Stop signals the collection loop to exit by closing p.done. It is a no-op
// when the channel was never created or has already been closed, so calling
// Stop repeatedly does not panic with "close of closed channel".
//
// The closed-check and the close are not atomic; Stop is safe to call
// repeatedly from one goroutine, not concurrently from several.
func (p *Prometheus) Stop() {
	if p.done == nil {
		return
	}
	select {
	case <-p.done:
		// Already closed by an earlier Stop.
	default:
		close(p.done)
	}
}

// CollectMsgs fetches every address in p.config.Addresses concurrently, one
// goroutine per address, and returns all messages gathered from them in a
// single slice. It waits for every fetch to finish before returning.
//
// The only error returned is for a missing HTTP client. Failures of individual
// addresses are handled inside collect and do not surface here; such an
// address simply contributes no messages. The order of the messages across
// addresses depends on which fetch finishes first.
func (p *Prometheus) CollectMsgs(ctx context.Context) ([]*message.Message, error) {
	if p.httpClient == nil {
		return nil, errors.New("prometheus http client is nil")
	}

	var (
		collected []*message.Message
		lock      sync.Mutex
		pending   sync.WaitGroup
	)

	targets := p.config.Addresses
	// Register all fetches up front; each collect call marks itself done.
	pending.Add(len(targets))
	for i := range targets {
		go collect(ctx, p.httpClient, targets[i], &collected, &pending, &lock)
	}
	pending.Wait()

	return collected, nil
}

// collect fetches url with client, parses the body as Prometheus text format
// and appends the resulting messages to *metricsTotal.
//
// It is meant to run as one of several goroutines started by CollectMsgs:
// waitGroup.Done is called on every return path, and mutex guards the append
// to the shared slice. Any failure (request construction, transport error,
// non-200 status, parse error) is logged as a warning together with the URL,
// and the function returns without adding messages.
//
// The request is bound to ctx, so cancelling ctx or hitting its deadline
// aborts an in-flight fetch in addition to the client's own timeout.
func collect(ctx context.Context, client *http.Client, url string, metricsTotal *[]*message.Message, waitGroup *sync.WaitGroup, mutex *sync.Mutex) {
	defer waitGroup.Done()

	// Logger for failures; carries the URL so the failing endpoint can be
	// identified when several addresses are fetched at once.
	failLog := log.WithContext(ctx).WithField("prometheus url", url)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		failLog.WithError(err).Warn("http client collect failed")
		return
	}

	// The start time is stored in the logging context before the request is
	// sent. NOTE(review): presumably a log hook uses it to report the elapsed
	// time; confirm against the agentlog package.
	entry := log.WithContext(context.WithValue(ctx, agentlog.StartTimeKey, time.Now())).WithField("prometheus url", url)
	resp, err := client.Do(req)
	entry.Debug("get message end")
	if err != nil {
		failLog.WithError(err).Warn("http client collect failed")
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		failLog.Warnf("http get resp failed status code is %d", resp.StatusCode)
		return
	}

	var parser expfmt.TextParser
	metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
	if err != nil {
		failLog.WithError(err).Warn("read text format failed")
		return
	}

	// Build the batch locally so the lock is held only for the final append.
	var metrics []*message.Message
	for _, metricFamily := range metricFamilies {
		metrics = append(metrics, message.ParseFromMetricFamily(metricFamily)...)
	}

	mutex.Lock()
	*metricsTotal = append(*metricsTotal, metrics...)
	mutex.Unlock()
}