writer.go 6.1 KB
Newer Older
U
UlricQin 已提交
1 2 3 4 5
package writer

import (
	"bytes"
	"context"
U
Ulric Qin 已提交
6
	"fmt"
U
UlricQin 已提交
7 8
	"net"
	"net/http"
可爱也可不爱's avatar
可爱也可不爱 已提交
9
	"sync"
U
UlricQin 已提交
10 11
	"time"

U
Ulric Qin 已提交
12 13
	cmap "github.com/orcaman/concurrent-map"

U
UlricQin 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
	"github.com/golang/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/client_golang/api"
	"github.com/prometheus/prometheus/prompb"
	"github.com/toolkits/pkg/logger"
)

type Options struct {
	Url           string
	BasicAuthUser string
	BasicAuthPass string

	Timeout               int64
	DialTimeout           int64
	TLSHandshakeTimeout   int64
	ExpectContinueTimeout int64
	IdleConnTimeout       int64
	KeepAlive             int64

	MaxConnsPerHost     int
	MaxIdleConns        int
	MaxIdleConnsPerHost int
}

type GlobalOpt struct {
	QueueMaxSize  int
	QueuePopSize  int
	SleepInterval int64
}

type WriterType struct {
	Opts   Options
	Client api.Client
}

U
Ulric Qin 已提交
49
func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]string) {
U
Ulric Qin 已提交
50 51 52 53
	if len(items) == 0 {
		return
	}

U
UlricQin 已提交
54 55 56 57 58 59 60 61 62 63
	req := &prompb.WriteRequest{
		Timeseries: items,
	}

	data, err := proto.Marshal(req)
	if err != nil {
		logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items)
		return
	}

U
Ulric Qin 已提交
64
	if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
可爱也可不爱's avatar
可爱也可不爱 已提交
65 66 67 68 69
		logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
		logger.Warning("example timeseries:", items[0].String())
	}
}

U
Ulric Qin 已提交
70
func (w WriterType) Post(req []byte, headers ...map[string]string) error {
U
UlricQin 已提交
71 72 73 74 75 76 77 78 79 80 81
	httpReq, err := http.NewRequest("POST", w.Opts.Url, bytes.NewReader(req))
	if err != nil {
		logger.Warningf("create remote write request got error: %s", err.Error())
		return err
	}

	httpReq.Header.Add("Content-Encoding", "snappy")
	httpReq.Header.Set("Content-Type", "application/x-protobuf")
	httpReq.Header.Set("User-Agent", "n9e")
	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

U
Ulric Qin 已提交
82 83
	if len(headers) > 0 {
		for k, v := range headers[0] {
可爱也可不爱's avatar
可爱也可不爱 已提交
84 85 86 87
			httpReq.Header.Set(k, v)
		}
	}

U
UlricQin 已提交
88 89 90 91 92 93 94 95 96 97 98
	if w.Opts.BasicAuthUser != "" {
		httpReq.SetBasicAuth(w.Opts.BasicAuthUser, w.Opts.BasicAuthPass)
	}

	resp, body, err := w.Client.Do(context.Background(), httpReq)
	if err != nil {
		logger.Warningf("push data with remote write request got error: %v, response body: %s", err, string(body))
		return err
	}

	if resp.StatusCode >= 400 {
U
Ulric Qin 已提交
99
		err = fmt.Errorf("push data with remote write request got status code: %v, response body: %s", resp.StatusCode, string(body))
U
UlricQin 已提交
100 101 102 103 104 105 106
		return err
	}

	return nil
}

type WritersType struct {
107
	globalOpt GlobalOpt
108
	backends  map[string]WriterType
109 110
	chans     cmap.ConcurrentMap
	sync.RWMutex
U
UlricQin 已提交
111 112 113
}

func (ws *WritersType) Put(name string, writer WriterType) {
114
	ws.backends[name] = writer
U
UlricQin 已提交
115 116
}

117
// PushSample Push one sample to chan, hash by ident
可爱也可不爱's avatar
可爱也可不爱 已提交
118
// @Author: quzhihao
119 120 121 122 123
func (ws *WritersType) PushSample(ident string, v interface{}) {
	if !ws.chans.Has(ident) {
		ws.Lock()
		// important: check twice
		if !ws.chans.Has(ident) {
可爱也可不爱's avatar
可爱也可不爱 已提交
124
			c := make(chan *prompb.TimeSeries, Writers.globalOpt.QueueMaxSize)
125 126
			ws.chans.Set(ident, c)
			go ws.StartConsumer(ident, c)
可爱也可不爱's avatar
可爱也可不爱 已提交
127
		}
128
		ws.Unlock()
可爱也可不爱's avatar
可爱也可不爱 已提交
129
	}
130 131

	c, ok := ws.chans.Get(ident)
可爱也可不爱's avatar
可爱也可不爱 已提交
132
	if ok {
133
		ch := c.(chan *prompb.TimeSeries)
可爱也可不爱's avatar
可爱也可不爱 已提交
134
		select {
135 136 137
		case ch <- v.(*prompb.TimeSeries):
		default:
			logger.Warningf("Write channel(%s) full, current channel size: %d", ident, len(ch))
可爱也可不爱's avatar
可爱也可不爱 已提交
138 139 140 141
		}
	}
}

142
// StartConsumer every ident channel has a consumer, start it
可爱也可不爱's avatar
可爱也可不爱 已提交
143
// @Author: quzhihao
144 145 146 147 148 149 150 151 152 153 154
func (ws *WritersType) StartConsumer(ident string, ch chan *prompb.TimeSeries) {
	var (
		batch        = ws.globalOpt.QueuePopSize
		max          = ws.globalOpt.QueueMaxSize
		batchCounter int
		closeCounter int
		series       = make([]*prompb.TimeSeries, 0, batch)
	)

	logger.Infof("Starting channel(%s) consumer, max size:%d, batch:%d", ident, max, batch)

可爱也可不爱's avatar
可爱也可不爱 已提交
155 156
	for {
		select {
157 158 159
		case item := <-ch:
			// has data, no need to close
			closeCounter = 0
可爱也可不爱's avatar
可爱也可不爱 已提交
160
			series = append(series, item)
161 162 163 164 165 166 167

			batchCounter++
			if batchCounter >= ws.globalOpt.QueuePopSize {
				ws.post(ident, series)

				// reset
				batchCounter = 0
可爱也可不爱's avatar
可爱也可不爱 已提交
168 169
				series = make([]*prompb.TimeSeries, 0, batch)
			}
170
		case <-time.After(time.Second):
可爱也可不爱's avatar
可爱也可不爱 已提交
171
			if len(series) > 0 {
172 173 174 175 176 177 178
				// has data, no need to close
				closeCounter = 0

				ws.post(ident, series)

				// reset
				batchCounter = 0
可爱也可不爱's avatar
可爱也可不爱 已提交
179 180
				series = make([]*prompb.TimeSeries, 0, batch)
			} else {
181
				closeCounter++
可爱也可不爱's avatar
可爱也可不爱 已提交
182
			}
183 184 185 186 187 188 189 190 191 192 193

			if closeCounter > 3600 {
				logger.Infof("Closing channel(%s) reason: no data for an hour", ident)

				ws.Lock()
				close(ch)
				ws.chans.Remove(ident)
				ws.Unlock()

				logger.Infof("Closed channel(%s) reason: no data for an hour", ident)

可爱也可不爱's avatar
可爱也可不爱 已提交
194 195 196 197 198 199
				return
			}
		}
	}
}

200
// post post series to TSDB
可爱也可不爱's avatar
可爱也可不爱 已提交
201
// @Author: quzhihao
202
func (ws *WritersType) post(ident string, series []*prompb.TimeSeries) {
可爱也可不爱's avatar
可爱也可不爱 已提交
203
	wg := sync.WaitGroup{}
204
	wg.Add(len(ws.backends))
U
Ulric Qin 已提交
205

206
	// maybe as backend hashstring
U
Ulric Qin 已提交
207
	headers := map[string]string{"ident": ident}
208
	for key := range ws.backends {
可爱也可不爱's avatar
可爱也可不爱 已提交
209 210
		go func(key string) {
			defer wg.Done()
211
			ws.backends[key].Write(series, headers)
可爱也可不爱's avatar
可爱也可不爱 已提交
212 213
		}(key)
	}
U
Ulric Qin 已提交
214

可爱也可不爱's avatar
可爱也可不爱 已提交
215 216 217
	wg.Wait()
}

U
UlricQin 已提交
218 219
func NewWriters() WritersType {
	return WritersType{
220
		backends: make(map[string]WriterType),
U
UlricQin 已提交
221 222 223 224 225 226 227
	}
}

var Writers = NewWriters()

func Init(opts []Options, globalOpt GlobalOpt) error {
	Writers.globalOpt = globalOpt
228
	Writers.chans = cmap.New()
U
UlricQin 已提交
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258

	for i := 0; i < len(opts); i++ {
		cli, err := api.NewClient(api.Config{
			Address: opts[i].Url,
			RoundTripper: &http.Transport{
				// TLSClientConfig: tlsConfig,
				Proxy: http.ProxyFromEnvironment,
				DialContext: (&net.Dialer{
					Timeout:   time.Duration(opts[i].DialTimeout) * time.Millisecond,
					KeepAlive: time.Duration(opts[i].KeepAlive) * time.Millisecond,
				}).DialContext,
				ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
				TLSHandshakeTimeout:   time.Duration(opts[i].TLSHandshakeTimeout) * time.Millisecond,
				ExpectContinueTimeout: time.Duration(opts[i].ExpectContinueTimeout) * time.Millisecond,
				MaxConnsPerHost:       opts[i].MaxConnsPerHost,
				MaxIdleConns:          opts[i].MaxIdleConns,
				MaxIdleConnsPerHost:   opts[i].MaxIdleConnsPerHost,
				IdleConnTimeout:       time.Duration(opts[i].IdleConnTimeout) * time.Millisecond,
			},
		})

		if err != nil {
			return err
		}

		writer := WriterType{
			Opts:   opts[i],
			Client: cli,
		}

U
Ulric Qin 已提交
259
		Writers.Put(opts[i].Url, writer)
U
UlricQin 已提交
260 261 262 263
	}

	return nil
}