wal.go 8.4 KB
Newer Older
F
Fabian Reinartz 已提交
1 2 3 4 5 6 7 8 9
package tsdb

import (
	"encoding/binary"
	"hash/crc32"
	"io"
	"math"
	"os"
	"path/filepath"
10
	"sync"
11
	"time"
F
Fabian Reinartz 已提交
12 13

	"github.com/coreos/etcd/pkg/fileutil"
F
Fabian Reinartz 已提交
14
	"github.com/coreos/etcd/pkg/ioutil"
F
Fabian Reinartz 已提交
15
	"github.com/fabxc/tsdb/labels"
16
	"github.com/go-kit/kit/log"
17
	"github.com/pkg/errors"
F
Fabian Reinartz 已提交
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
)

// WALEntryType indicates what data a WAL entry contains.
// It is written as the first byte of every entry header.
type WALEntryType byte

// The valid WAL entry types.
// The constants are untyped and convert implicitly wherever a WALEntryType
// is expected.
const (
	// WALEntrySymbols is not written or read by any code visible in this
	// file; presumably reserved for symbol table entries — TODO confirm.
	WALEntrySymbols = 1
	// WALEntrySeries marks an entry whose payload is a batch of label sets.
	WALEntrySeries  = 2
	// WALEntrySamples marks an entry whose payload is a batch of samples.
	WALEntrySamples = 3
)

// WAL is a write ahead log for series data. It can only be written to.
// Use WALReader to read back from a write ahead log.
type WAL struct {
33 34
	mtx sync.Mutex

35 36 37 38 39 40 41
	f             *fileutil.LockedFile
	enc           *walEncoder
	logger        log.Logger
	flushInterval time.Duration

	stopc chan struct{}
	donec chan struct{}
F
Fabian Reinartz 已提交
42 43 44 45

	symbols map[string]uint32
}

46
// walFileName is the name of the single log file that OpenWAL opens or
// creates inside the WAL directory.
const walFileName = "wal-000"
47

48 49
// OpenWAL opens or creates a write ahead log in the given directory.
// The WAL must be read completely before new data is written.
50
func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) {
F
Fabian Reinartz 已提交
51 52 53 54
	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, err
	}

55
	p := filepath.Join(dir, walFileName)
F
Fabian Reinartz 已提交
56

F
Fabian Reinartz 已提交
57
	f, err := fileutil.TryLockFile(p, os.O_RDWR, 0666)
F
Fabian Reinartz 已提交
58
	if err != nil {
59 60 61 62
		if !os.IsNotExist(err) {
			return nil, err
		}

F
Fabian Reinartz 已提交
63
		f, err = fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666)
64 65 66 67 68 69
		if err != nil {
			return nil, err
		}
		if _, err = f.Seek(0, os.SEEK_END); err != nil {
			return nil, err
		}
F
Fabian Reinartz 已提交
70
	}
F
Fabian Reinartz 已提交
71 72 73 74
	enc, err := newWALEncoder(f.File)
	if err != nil {
		return nil, err
	}
F
Fabian Reinartz 已提交
75 76

	w := &WAL{
77 78 79 80 81 82 83
		f:             f,
		logger:        l,
		enc:           enc,
		flushInterval: flushInterval,
		symbols:       map[string]uint32{},
		donec:         make(chan struct{}),
		stopc:         make(chan struct{}),
F
Fabian Reinartz 已提交
84
	}
85 86
	go w.run(flushInterval)

F
Fabian Reinartz 已提交
87 88 89
	return w, nil
}

90 91 92 93 94
// walHandler bundles the callbacks the WAL decoder invokes while reading the
// log. The decoder calls both unconditionally, so neither may be nil when the
// corresponding entry type occurs.
type walHandler struct {
	// sample is called once for every sample decoded from a samples entry.
	sample func(hashedSample)
	// series is called once for every label set decoded from a series entry.
	series func(labels.Labels)
}

F
Fabian Reinartz 已提交
95
// ReadAll consumes all entries in the WAL and triggers the registered handlers.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
func (w *WAL) ReadAll(h *walHandler) error {
	dec := &walDecoder{
		r:       w.f,
		handler: h,
	}

	for {
		if err := dec.entry(); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
	}
}

F
Fabian Reinartz 已提交
112 113 114 115 116 117 118 119
// Log writes a batch of new series labels and samples to the log.
func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error {
	if err := w.enc.encodeSeries(series); err != nil {
		return err
	}
	if err := w.enc.encodeSamples(samples); err != nil {
		return err
	}
120 121 122
	if w.flushInterval <= 0 {
		return w.sync()
	}
F
Fabian Reinartz 已提交
123 124 125 126
	return nil
}

func (w *WAL) sync() error {
F
Fabian Reinartz 已提交
127 128 129
	if err := w.enc.flush(); err != nil {
		return err
	}
F
Fabian Reinartz 已提交
130 131 132
	return fileutil.Fdatasync(w.f.File)
}

133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
// run is the background loop started by OpenWAL. With a positive interval it
// syncs the log on every tick; otherwise the tick channel stays nil and the
// loop only waits for the stop signal. donec is closed when the loop exits.
func (w *WAL) run(interval time.Duration) {
	var tick <-chan time.Time

	if interval > 0 {
		t := time.NewTicker(interval)
		defer t.Stop()
		tick = t.C
	}
	defer close(w.donec)

	for {
		select {
		case <-tick:
			err := w.sync()
			if err == nil {
				continue
			}
			// A failed sync is only logged; the next tick retries.
			w.logger.Log("msg", "sync failed", "err", err)
		case <-w.stopc:
			return
		}
	}
}

F
Fabian Reinartz 已提交
155 156
// Close sync all data and closes the underlying resources.
func (w *WAL) Close() error {
157 158 159
	close(w.stopc)
	<-w.donec

F
Fabian Reinartz 已提交
160 161 162 163 164 165 166
	if err := w.sync(); err != nil {
		return err
	}
	return w.f.Close()
}

type walEncoder struct {
F
Fabian Reinartz 已提交
167 168
	mtx sync.Mutex
	w   *ioutil.PageWriter
F
Fabian Reinartz 已提交
169 170
}

F
Fabian Reinartz 已提交
171 172 173 174 175 176
const (
	// minSectorSize is the smallest disk sector size, in bytes, assumed when
	// choosing the page size.
	minSectorSize = 512

	// walPageBytes is the alignment for flushing records to the backing Writer.
	// It should be a multiple of the minimum sector size so that WAL can safely
	// distinguish between torn writes and ordinary data corruption.
	walPageBytes = 16 * minSectorSize
)
F
Fabian Reinartz 已提交
179 180 181 182 183 184

func newWALEncoder(f *os.File) (*walEncoder, error) {
	offset, err := f.Seek(0, os.SEEK_CUR)
	if err != nil {
		return nil, err
	}
185
	enc := &walEncoder{
F
Fabian Reinartz 已提交
186
		w: ioutil.NewPageWriter(f, walPageBytes, int(offset)),
187 188
	}
	return enc, nil
F
Fabian Reinartz 已提交
189 190 191
}

func (e *walEncoder) flush() error {
F
Fabian Reinartz 已提交
192 193 194
	e.mtx.Lock()
	defer e.mtx.Unlock()

F
Fabian Reinartz 已提交
195
	return e.w.Flush()
F
Fabian Reinartz 已提交
196 197
}

F
Fabian Reinartz 已提交
198 199 200 201
func (e *walEncoder) entry(et WALEntryType, flag byte, buf []byte) error {
	e.mtx.Lock()
	defer e.mtx.Unlock()

F
Fabian Reinartz 已提交
202 203 204 205 206 207 208
	h := crc32.NewIEEE()
	w := io.MultiWriter(h, e.w)

	b := make([]byte, 6)
	b[0] = byte(et)
	b[1] = flag

F
Fabian Reinartz 已提交
209
	binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))
F
Fabian Reinartz 已提交
210 211 212 213

	if _, err := w.Write(b); err != nil {
		return err
	}
F
Fabian Reinartz 已提交
214
	if _, err := w.Write(buf); err != nil {
F
Fabian Reinartz 已提交
215 216 217 218 219 220
		return err
	}
	if _, err := e.w.Write(h.Sum(nil)); err != nil {
		return err
	}

F
Fabian Reinartz 已提交
221
	putWALBuffer(buf)
F
Fabian Reinartz 已提交
222 223 224 225 226 227 228 229
	return nil
}

// Flag values written into the second header byte of an entry. The decoders
// visible in this file receive the flag but do not inspect it; presumably it
// identifies the payload encoding variant — TODO confirm.
const (
	// walSeriesSimple is the flag written with series entries.
	walSeriesSimple  = 1
	// walSamplesSimple is the flag written with samples entries.
	walSamplesSimple = 1
)

F
Fabian Reinartz 已提交
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
// walBuffers recycles the scratch buffers used to assemble entry payloads.
// A fresh buffer is empty and has 64KiB of capacity.
var walBuffers = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, 64*1024)
	},
}

// getWALBuffer returns an empty buffer, recycled from the pool if one is
// available.
func getWALBuffer() []byte {
	return walBuffers.Get().([]byte)
}

// putWALBuffer hands b back to the pool, truncated to zero length with its
// capacity retained.
func putWALBuffer(b []byte) {
	walBuffers.Put(b[:0])
}

F
Fabian Reinartz 已提交
245 246 247 248
func (e *walEncoder) encodeSeries(series []labels.Labels) error {
	if len(series) == 0 {
		return nil
	}
249 250

	b := make([]byte, binary.MaxVarintLen32)
F
Fabian Reinartz 已提交
251
	buf := getWALBuffer()
F
Fabian Reinartz 已提交
252 253 254

	for _, lset := range series {
		n := binary.PutUvarint(b, uint64(len(lset)))
F
Fabian Reinartz 已提交
255
		buf = append(buf, b[:n]...)
F
Fabian Reinartz 已提交
256 257 258

		for _, l := range lset {
			n = binary.PutUvarint(b, uint64(len(l.Name)))
F
Fabian Reinartz 已提交
259 260
			buf = append(buf, b[:n]...)
			buf = append(buf, l.Name...)
F
Fabian Reinartz 已提交
261 262

			n = binary.PutUvarint(b, uint64(len(l.Value)))
F
Fabian Reinartz 已提交
263 264
			buf = append(buf, b[:n]...)
			buf = append(buf, l.Value...)
F
Fabian Reinartz 已提交
265 266 267
		}
	}

F
Fabian Reinartz 已提交
268
	return e.entry(WALEntrySeries, walSeriesSimple, buf)
F
Fabian Reinartz 已提交
269 270 271 272 273 274
}

func (e *walEncoder) encodeSamples(samples []hashedSample) error {
	if len(samples) == 0 {
		return nil
	}
275 276

	b := make([]byte, binary.MaxVarintLen64)
F
Fabian Reinartz 已提交
277
	buf := getWALBuffer()
F
Fabian Reinartz 已提交
278 279 280 281 282 283 284 285

	// Store base timestamp and base reference number of first sample.
	// All samples encode their timestamp and ref as delta to those.
	//
	// TODO(fabxc): optimize for all samples having the same timestamp.
	first := samples[0]

	binary.BigEndian.PutUint32(b, first.ref)
F
Fabian Reinartz 已提交
286
	buf = append(buf, b[:4]...)
F
Fabian Reinartz 已提交
287
	binary.BigEndian.PutUint64(b, uint64(first.t))
F
Fabian Reinartz 已提交
288
	buf = append(buf, b[:8]...)
F
Fabian Reinartz 已提交
289 290 291

	for _, s := range samples {
		n := binary.PutVarint(b, int64(s.ref)-int64(first.ref))
F
Fabian Reinartz 已提交
292
		buf = append(buf, b[:n]...)
F
Fabian Reinartz 已提交
293 294

		n = binary.PutVarint(b, s.t-first.t)
F
Fabian Reinartz 已提交
295
		buf = append(buf, b[:n]...)
F
Fabian Reinartz 已提交
296 297

		binary.BigEndian.PutUint64(b, math.Float64bits(s.v))
F
Fabian Reinartz 已提交
298
		buf = append(buf, b[:8]...)
F
Fabian Reinartz 已提交
299 300
	}

F
Fabian Reinartz 已提交
301
	return e.entry(WALEntrySamples, walSamplesSimple, buf)
F
Fabian Reinartz 已提交
302 303 304
}

type walDecoder struct {
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418
	r       io.Reader
	handler *walHandler

	buf []byte
}

// newWALDecoer returns a decoder that reads entries from r and reports the
// decoded data to h. Its payload buffer starts empty with 1MiB of capacity.
func newWALDecoer(r io.Reader, h *walHandler) *walDecoder {
	d := new(walDecoder)
	d.r = r
	d.handler = h
	d.buf = make([]byte, 0, 1024*1024)
	return d
}

// decodeSeries decodes the payload of a series entry and calls the series
// handler once per label set.
//
// The payload is a sequence of label sets, each encoded as a uvarint label
// count followed, per label, by the uvarint-length-prefixed name and value.
// flag is accepted but not inspected. Malformed input is reported as a
// wrapped errInvalidSize; label sets decoded before the malformed one have
// already been handed to the handler at that point.
//
// All sizes are validated as unsigned values against the remaining input
// before use, so corrupt data can neither overflow int and cause a slice
// panic nor trigger an allocation larger than the payload warrants.
func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
	for len(b) > 0 {
		l, n := binary.Uvarint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "number of labels")
		}
		b = b[n:]

		// Every label takes at least two bytes (one per length prefix), so a
		// larger count cannot be backed by the remaining input.
		if l > uint64(len(b))/2 {
			return errors.Wrap(errInvalidSize, "number of labels")
		}
		lset := make(labels.Labels, l)

		for i := range lset {
			nl, n := binary.Uvarint(b)
			if n < 1 || nl > uint64(len(b)-n) {
				return errors.Wrap(errInvalidSize, "label name")
			}
			end := n + int(nl)
			lset[i].Name = string(b[n:end])
			b = b[end:]

			vl, n := binary.Uvarint(b)
			if n < 1 || vl > uint64(len(b)-n) {
				return errors.Wrap(errInvalidSize, "label value")
			}
			end = n + int(vl)
			lset[i].Value = string(b[n:end])
			b = b[end:]
		}

		d.handler.series(lset)
	}
	return nil
}

// decodeSamples decodes the payload of a samples entry and calls the sample
// handler once per sample.
//
// The payload starts with a 12-byte header holding the base reference
// (4 bytes) and base timestamp (8 bytes), both big endian. Each sample
// follows as a varint reference delta, a varint timestamp delta and 8 bytes
// of big-endian float bits. flag is accepted but not inspected. Malformed
// input is reported as a wrapped errInvalidSize.
func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
	const headerLen = 4 + 8

	if len(b) < headerLen {
		return errors.Wrap(errInvalidSize, "header length")
	}
	refBase := int64(binary.BigEndian.Uint32(b[:4]))
	timeBase := int64(binary.BigEndian.Uint64(b[4:headerLen]))

	b = b[headerLen:]
	for len(b) > 0 {
		refDelta, n := binary.Varint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "sample ref delta")
		}
		b = b[n:]

		timeDelta, n := binary.Varint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "sample timestamp delta")
		}
		b = b[n:]

		if len(b) < 8 {
			return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
		}
		bits := binary.BigEndian.Uint64(b)
		b = b[8:]

		var s hashedSample
		s.ref = uint32(refBase + refDelta)
		s.t = timeBase + timeDelta
		s.v = math.Float64frombits(bits)

		d.handler.sample(s)
	}
	return nil
}

func (d *walDecoder) entry() error {
	b := make([]byte, 6)
	if _, err := d.r.Read(b); err != nil {
		return err
	}

	var (
		etype  = WALEntryType(b[0])
		flag   = b[1]
		length = int(binary.BigEndian.Uint32(b[2:]))
	)

	if length > len(d.buf) {
		d.buf = make([]byte, length)
	}
	buf := d.buf[:length]

	if _, err := d.r.Read(buf); err != nil {
		return err
	}
	// Read away checksum.
	// TODO(fabxc): verify it
	if _, err := d.r.Read(b[:4]); err != nil {
		return err
	}

	switch etype {
	case WALEntrySeries:
		return d.decodeSeries(flag, buf)
	case WALEntrySamples:
		return d.decodeSamples(flag, buf)
	}
F
Fabian Reinartz 已提交
419

420
	return errors.Errorf("unknown WAL entry type %q", etype)
F
Fabian Reinartz 已提交
421
}