writer.go 6.1 KB
Newer Older
U
UlricQin 已提交
1 2 3 4 5
package writer

import (
	"bytes"
	"context"
U
Ulric Qin 已提交
6
	"fmt"
U
Ulric Qin 已提交
7
	"hash/crc32"
U
UlricQin 已提交
8 9
	"net"
	"net/http"
U
Ulric Qin 已提交
10
	"sync"
U
UlricQin 已提交
11 12
	"time"

U
Ulric Qin 已提交
13
	"github.com/didi/nightingale/v5/src/server/config"
U
UlricQin 已提交
14 15 16 17 18
	"github.com/golang/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/client_golang/api"
	"github.com/prometheus/prometheus/prompb"
	"github.com/toolkits/pkg/logger"
U
Ulric Qin 已提交
19 20

	promstat "github.com/didi/nightingale/v5/src/server/stat"
U
UlricQin 已提交
21 22 23
)

type WriterType struct {
U
Ulric Qin 已提交
24
	Opts   config.WriterOptions
U
UlricQin 已提交
25 26 27
	Client api.Client
}

U
Ulric Qin 已提交
28
func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]string) {
U
Ulric Qin 已提交
29 30 31 32
	if len(items) == 0 {
		return
	}

U
UlricQin 已提交
33 34 35 36 37 38 39 40 41 42
	req := &prompb.WriteRequest{
		Timeseries: items,
	}

	data, err := proto.Marshal(req)
	if err != nil {
		logger.Warningf("marshal prom data to proto got error: %v, data: %+v", err, items)
		return
	}

U
Ulric Qin 已提交
43
	if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
可爱也可不爱's avatar
可爱也可不爱 已提交
44 45 46 47 48
		logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
		logger.Warning("example timeseries:", items[0].String())
	}
}

U
Ulric Qin 已提交
49
func (w WriterType) Post(req []byte, headers ...map[string]string) error {
U
UlricQin 已提交
50 51 52 53 54 55 56 57 58 59 60
	httpReq, err := http.NewRequest("POST", w.Opts.Url, bytes.NewReader(req))
	if err != nil {
		logger.Warningf("create remote write request got error: %s", err.Error())
		return err
	}

	httpReq.Header.Add("Content-Encoding", "snappy")
	httpReq.Header.Set("Content-Type", "application/x-protobuf")
	httpReq.Header.Set("User-Agent", "n9e")
	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

U
Ulric Qin 已提交
61 62
	if len(headers) > 0 {
		for k, v := range headers[0] {
可爱也可不爱's avatar
可爱也可不爱 已提交
63 64 65 66
			httpReq.Header.Set(k, v)
		}
	}

U
UlricQin 已提交
67 68 69 70
	if w.Opts.BasicAuthUser != "" {
		httpReq.SetBasicAuth(w.Opts.BasicAuthUser, w.Opts.BasicAuthPass)
	}

71 72 73 74 75 76 77 78 79 80
	headerCount := len(w.Opts.Headers)
	if headerCount > 0 && headerCount%2 == 0 {
		for i := 0; i < len(w.Opts.Headers); i += 2 {
			httpReq.Header.Add(w.Opts.Headers[i], w.Opts.Headers[i+1])
			if w.Opts.Headers[i] == "Host" {
				httpReq.Host = w.Opts.Headers[i+1]
			}
		}
	}

U
UlricQin 已提交
81 82 83 84 85 86 87
	resp, body, err := w.Client.Do(context.Background(), httpReq)
	if err != nil {
		logger.Warningf("push data with remote write request got error: %v, response body: %s", err, string(body))
		return err
	}

	if resp.StatusCode >= 400 {
U
Ulric Qin 已提交
88
		err = fmt.Errorf("push data with remote write request got status code: %v, response body: %s", resp.StatusCode, string(body))
U
UlricQin 已提交
89 90 91 92 93 94 95
		return err
	}

	return nil
}

type WritersType struct {
U
Ulric Qin 已提交
96
	globalOpt config.WriterGlobalOpt
97
	backends  map[string]WriterType
U
Ulric Qin 已提交
98
	chans     map[int]chan *prompb.TimeSeries
U
UlricQin 已提交
99 100 101
}

func (ws *WritersType) Put(name string, writer WriterType) {
102
	ws.backends[name] = writer
U
UlricQin 已提交
103 104
}

105
// PushSample Push one sample to chan, hash by ident
可爱也可不爱's avatar
可爱也可不爱 已提交
106
// @Author: quzhihao
107
func (ws *WritersType) PushSample(ident string, v interface{}) {
U
Ulric Qin 已提交
108
	hashkey := crc32.ChecksumIEEE([]byte(ident)) % uint32(ws.globalOpt.QueueCount)
109

U
Ulric Qin 已提交
110
	c, ok := ws.chans[int(hashkey)]
可爱也可不爱's avatar
可爱也可不爱 已提交
111 112
	if ok {
		select {
U
Ulric Qin 已提交
113
		case c <- v.(*prompb.TimeSeries):
114
		default:
U
Ulric Qin 已提交
115
			logger.Warningf("Write channel(%s) full, current channel size: %d", ident, len(c))
可爱也可不爱's avatar
可爱也可不爱 已提交
116 117 118 119
		}
	}
}

120
// StartConsumer every ident channel has a consumer, start it
可爱也可不爱's avatar
可爱也可不爱 已提交
121
// @Author: quzhihao
U
Ulric Qin 已提交
122
func (ws *WritersType) StartConsumer(index int, ch chan *prompb.TimeSeries) {
123 124 125
	var (
		batch        = ws.globalOpt.QueuePopSize
		series       = make([]*prompb.TimeSeries, 0, batch)
U
Ulric Qin 已提交
126
		batchCounter int
127 128
	)

可爱也可不爱's avatar
可爱也可不爱 已提交
129 130
	for {
		select {
131 132
		case item := <-ch:
			// has data, no need to close
可爱也可不爱's avatar
可爱也可不爱 已提交
133
			series = append(series, item)
134 135 136

			batchCounter++
			if batchCounter >= ws.globalOpt.QueuePopSize {
U
Ulric Qin 已提交
137
				ws.post(index, series)
138 139 140

				// reset
				batchCounter = 0
可爱也可不爱's avatar
可爱也可不爱 已提交
141 142
				series = make([]*prompb.TimeSeries, 0, batch)
			}
143
		case <-time.After(time.Second):
可爱也可不爱's avatar
可爱也可不爱 已提交
144
			if len(series) > 0 {
U
Ulric Qin 已提交
145
				ws.post(index, series)
146 147 148

				// reset
				batchCounter = 0
可爱也可不爱's avatar
可爱也可不爱 已提交
149 150 151 152 153 154
				series = make([]*prompb.TimeSeries, 0, batch)
			}
		}
	}
}

155
// post post series to TSDB
可爱也可不爱's avatar
可爱也可不爱 已提交
156
// @Author: quzhihao
U
Ulric Qin 已提交
157 158
func (ws *WritersType) post(index int, series []*prompb.TimeSeries) {
	header := map[string]string{"hash": fmt.Sprintf("%s-%d", config.C.Heartbeat.Endpoint, index)}
U
Ulric Qin 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
	if len(ws.backends) == 1 {
		for key := range ws.backends {
			ws.backends[key].Write(series, header)
		}
		return
	}

	if len(ws.backends) > 1 {
		wg := new(sync.WaitGroup)
		for key := range ws.backends {
			wg.Add(1)
			go func(wg *sync.WaitGroup, backend WriterType, items []*prompb.TimeSeries, header map[string]string) {
				defer wg.Done()
				backend.Write(series, header)
			}(wg, ws.backends[key], series, header)
		}
		wg.Wait()
可爱也可不爱's avatar
可爱也可不爱 已提交
176 177 178
	}
}

U
UlricQin 已提交
179 180
func NewWriters() WritersType {
	return WritersType{
181
		backends: make(map[string]WriterType),
U
UlricQin 已提交
182 183 184 185 186
	}
}

// Writers is the package-level writer set. Init fills in its options, queues
// and backends, and reportChanSize reads its queue lengths.
var Writers = NewWriters()

U
Ulric Qin 已提交
187
func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error {
U
UlricQin 已提交
188
	Writers.globalOpt = globalOpt
U
Ulric Qin 已提交
189 190 191 192 193
	Writers.chans = make(map[int]chan *prompb.TimeSeries)

	// init channels
	for i := 0; i < globalOpt.QueueCount; i++ {
		Writers.chans[i] = make(chan *prompb.TimeSeries, Writers.globalOpt.QueueMaxSize)
U
Ulric Qin 已提交
194
		go Writers.StartConsumer(i, Writers.chans[i])
U
Ulric Qin 已提交
195
	}
U
UlricQin 已提交
196

U
Ulric Qin 已提交
197 198
	go reportChanSize()

U
UlricQin 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
	for i := 0; i < len(opts); i++ {
		cli, err := api.NewClient(api.Config{
			Address: opts[i].Url,
			RoundTripper: &http.Transport{
				// TLSClientConfig: tlsConfig,
				Proxy: http.ProxyFromEnvironment,
				DialContext: (&net.Dialer{
					Timeout:   time.Duration(opts[i].DialTimeout) * time.Millisecond,
					KeepAlive: time.Duration(opts[i].KeepAlive) * time.Millisecond,
				}).DialContext,
				ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
				TLSHandshakeTimeout:   time.Duration(opts[i].TLSHandshakeTimeout) * time.Millisecond,
				ExpectContinueTimeout: time.Duration(opts[i].ExpectContinueTimeout) * time.Millisecond,
				MaxConnsPerHost:       opts[i].MaxConnsPerHost,
				MaxIdleConns:          opts[i].MaxIdleConns,
				MaxIdleConnsPerHost:   opts[i].MaxIdleConnsPerHost,
				IdleConnTimeout:       time.Duration(opts[i].IdleConnTimeout) * time.Millisecond,
			},
		})

		if err != nil {
			return err
		}

		writer := WriterType{
			Opts:   opts[i],
			Client: cli,
		}

U
Ulric Qin 已提交
228
		Writers.Put(opts[i].Url, writer)
U
UlricQin 已提交
229 230 231 232
	}

	return nil
}
U
Ulric Qin 已提交
233 234

func reportChanSize() {
U
Ulric Qin 已提交
235 236 237 238 239 240
	for {
		time.Sleep(time.Second * 3)
		for i, c := range Writers.chans {
			size := len(c)
			promstat.GaugeSampleQueueSize.WithLabelValues(config.C.ClusterName, fmt.Sprint(i)).Set(float64(size))
		}
U
Ulric Qin 已提交
241 242
	}
}