wal.go 12.9 KB
Newer Older
F
Fabian Reinartz 已提交
1 2 3
package tsdb

import (
F
Fabian Reinartz 已提交
4
	"bufio"
F
Fabian Reinartz 已提交
5
	"encoding/binary"
6
	"hash"
F
Fabian Reinartz 已提交
7 8 9 10 11
	"hash/crc32"
	"io"
	"math"
	"os"
	"path/filepath"
12
	"sync"
13
	"time"
F
Fabian Reinartz 已提交
14 15 16

	"github.com/coreos/etcd/pkg/fileutil"
	"github.com/fabxc/tsdb/labels"
17
	"github.com/go-kit/kit/log"
18
	"github.com/pkg/errors"
F
Fabian Reinartz 已提交
19 20 21 22 23 24
)

// WALEntryType indicates what data a WAL entry contains.
// It is stored as the first byte of every entry header.
type WALEntryType byte

const (
	// WALMagic is a 4 byte number every WAL segment file starts with.
	WALMagic = uint32(0x43AF00EF)

	// WALFormatDefault is the version flag for the default outer segment file format.
	WALFormatDefault = byte(1)
)

// Entry types in a segment file.
//
// The zero value is deliberately not a valid type: the reader treats a zero
// type byte as the start of unused pre-allocated space.
const (
	// WALEntrySymbols is reserved; nothing in this file writes or decodes it.
	WALEntrySymbols WALEntryType = 1
	// WALEntrySeries marks an entry holding a batch of series label sets.
	WALEntrySeries WALEntryType = 2
	// WALEntrySamples marks an entry holding a batch of samples.
	WALEntrySamples WALEntryType = 3
)

// WAL is a write ahead log for series data. It can only be written to.
// Use WALReader to read back from a write ahead log.
type WAL struct {
42 43
	mtx sync.Mutex

44
	dirFile *os.File
45
	files   []*os.File
46

47 48
	logger        log.Logger
	flushInterval time.Duration
F
Fabian Reinartz 已提交
49
	segmentSize   int64
50

51 52 53
	crc32 hash.Hash32
	cur   *bufio.Writer
	curN  int64
54

55 56
	stopc chan struct{}
	donec chan struct{}
F
Fabian Reinartz 已提交
57 58
}

59 60
const (
	// walDirName is the name of the sub-directory that holds the segment files.
	walDirName = "wal"

	// walSegmentSizeBytes is the default size segment files are pre-allocated to.
	walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
)
63

64 65
// OpenWAL opens or creates a write ahead log in the given directory.
// The WAL must be read completely before new data is written.
66
func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error) {
67 68
	dir = filepath.Join(dir, walDirName)

F
Fabian Reinartz 已提交
69 70 71
	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, err
	}
72
	df, err := fileutil.OpenDir(dir)
F
Fabian Reinartz 已提交
73 74 75
	if err != nil {
		return nil, err
	}
F
Fabian Reinartz 已提交
76 77

	w := &WAL{
78
		dirFile:       df,
79 80 81 82
		logger:        l,
		flushInterval: flushInterval,
		donec:         make(chan struct{}),
		stopc:         make(chan struct{}),
F
Fabian Reinartz 已提交
83
		segmentSize:   walSegmentSizeBytes,
84
		crc32:         crc32.New(crc32.MakeTable(crc32.Castagnoli)),
F
Fabian Reinartz 已提交
85
	}
86 87 88 89
	if err := w.initSegments(); err != nil {
		return nil, err
	}

90 91
	go w.run(flushInterval)

F
Fabian Reinartz 已提交
92 93 94
	return w, nil
}

95 96 97 98 99 100
// Reader returns a new reader over the the write ahead log data.
// It must be completely consumed before writing to the WAL.
func (w *WAL) Reader() *WALReader {
	var rs []io.ReadCloser
	for _, f := range w.files {
		rs = append(rs, f)
101
	}
102
	return NewWALReader(rs...)
103 104
}

F
Fabian Reinartz 已提交
105
// Log writes a batch of new series labels and samples to the log.
106
func (w *WAL) Log(series []labels.Labels, samples []refdSample) error {
107
	if err := w.encodeSeries(series); err != nil {
F
Fabian Reinartz 已提交
108 109
		return err
	}
110
	if err := w.encodeSamples(samples); err != nil {
F
Fabian Reinartz 已提交
111 112
		return err
	}
113
	if w.flushInterval <= 0 {
114
		return w.Sync()
115
	}
F
Fabian Reinartz 已提交
116 117 118
	return nil
}

119 120 121 122 123 124 125 126 127 128 129
// initSegments finds all existing segment files and opens them in the
// appropriate file modes.
func (w *WAL) initSegments() error {
	fns, err := sequenceFiles(w.dirFile.Name(), "")
	if err != nil {
		return err
	}
	if len(fns) == 0 {
		return nil
	}
	if len(fns) > 1 {
F
Fabian Reinartz 已提交
130
		for _, fn := range fns[:len(fns)-1] {
131
			f, err := os.Open(fn)
132 133 134
			if err != nil {
				return err
			}
135
			w.files = append(w.files, f)
136 137 138
		}
	}
	// The most recent WAL file is the one we have to keep appending to.
139
	f, err := os.OpenFile(fns[len(fns)-1], os.O_RDWR, 0666)
140 141 142
	if err != nil {
		return err
	}
143
	w.files = append(w.files, f)
144

145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
	// Consume and validate meta headers.
	for _, f := range w.files {
		metab := make([]byte, 8)

		if n, err := f.Read(metab); err != nil {
			return errors.Wrapf(err, "validate meta %q", f.Name())
		} else if n != 8 {
			return errors.Errorf("invalid header size %d in %q", n, f.Name())
		}

		if m := binary.BigEndian.Uint32(metab[:4]); m != WALMagic {
			return errors.Errorf("invalid magic header %x in %q", m, f.Name())
		}
		if metab[4] != WALFormatDefault {
			return errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name())
		}
	}

163 164 165 166 167 168
	return nil
}

// cut finishes the currently active segments and open the next one.
// The encoder is reset to point to the new segment.
func (w *WAL) cut() error {
F
Fabian Reinartz 已提交
169
	// Sync current tail to disc and close.
170
	if tf := w.tail(); tf != nil {
F
Fabian Reinartz 已提交
171
		if err := w.sync(); err != nil {
172 173
			return err
		}
174 175 176 177 178 179 180
		off, err := tf.Seek(0, os.SEEK_CUR)
		if err != nil {
			return err
		}
		if err := tf.Truncate(off); err != nil {
			return err
		}
F
Fabian Reinartz 已提交
181
		if err := tf.Close(); err != nil {
182 183 184 185 186 187 188 189
			return err
		}
	}

	p, _, err := nextSequenceFile(w.dirFile.Name(), "")
	if err != nil {
		return err
	}
190
	f, err := os.Create(p)
191 192 193
	if err != nil {
		return err
	}
194
	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
195 196 197 198 199 200
		return err
	}
	if err = w.dirFile.Sync(); err != nil {
		return err
	}

201 202 203 204 205 206 207 208 209
	// Write header metadata for new file.
	metab := make([]byte, 8)
	binary.BigEndian.PutUint32(metab[:4], WALMagic)
	metab[4] = WALFormatDefault

	if _, err := f.Write(metab); err != nil {
		return err
	}

210 211
	w.files = append(w.files, f)
	w.cur = bufio.NewWriterSize(f, 4*1024*1024)
F
Fabian Reinartz 已提交
212
	w.curN = 8
213 214 215 216

	return nil
}

217
func (w *WAL) tail() *os.File {
218 219 220 221 222 223 224 225 226 227 228 229 230
	if len(w.files) == 0 {
		return nil
	}
	return w.files[len(w.files)-1]
}

// Sync flushes buffered entries to the current segment file and syncs it to
// disk. It holds the WAL mutex for the duration of the call and returns the
// error of the underlying sync.
func (w *WAL) Sync() error {
	w.mtx.Lock()
	defer w.mtx.Unlock()

	return w.sync()
}

F
Fabian Reinartz 已提交
231
func (w *WAL) sync() error {
232 233 234
	if w.cur == nil {
		return nil
	}
235
	if err := w.cur.Flush(); err != nil {
F
Fabian Reinartz 已提交
236 237
		return err
	}
238
	return fileutil.Fdatasync(w.tail())
F
Fabian Reinartz 已提交
239 240
}

241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
func (w *WAL) run(interval time.Duration) {
	var tick <-chan time.Time

	if interval > 0 {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		tick = ticker.C
	}
	defer close(w.donec)

	for {
		select {
		case <-w.stopc:
			return
		case <-tick:
256
			if err := w.Sync(); err != nil {
257 258 259 260 261 262
				w.logger.Log("msg", "sync failed", "err", err)
			}
		}
	}
}

F
Fabian Reinartz 已提交
263 264
// Close sync all data and closes the underlying resources.
func (w *WAL) Close() error {
265 266 267
	close(w.stopc)
	<-w.donec

F
Fabian Reinartz 已提交
268 269
	// Lock mutex and leave it locked so we panic if there's a bug causing
	// the block to be used afterwards.
270 271
	w.mtx.Lock()

F
Fabian Reinartz 已提交
272 273 274
	if err := w.sync(); err != nil {
		return err
	}
F
Fabian Reinartz 已提交
275 276
	// On opening, a WAL must be fully consumed once. Afterwards
	// only the current segment will still be open.
277 278 279 280
	if tf := w.tail(); tf != nil {
		return tf.Close()
	}
	return nil
F
Fabian Reinartz 已提交
281 282
}

F
Fabian Reinartz 已提交
283 284 285 286 287 288
const (
	// minSectorSize is the smallest sector size, in bytes, assumed for the
	// backing storage.
	minSectorSize = 512

	// walPageBytes is the alignment for flushing records to the backing Writer.
	// It should be a multiple of the minimum sector size so that WAL can safely
	// distinguish between torn writes and ordinary data corruption.
	walPageBytes = 16 * minSectorSize
)
F
Fabian Reinartz 已提交
291

292 293 294
func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
	w.mtx.Lock()
	defer w.mtx.Unlock()
F
Fabian Reinartz 已提交
295

296 297 298 299 300 301 302
	// Cut to the next segment if exceeds the file size unless it would also
	// exceed the size of a new segment.
	var (
		sz    = int64(6 + 4 + len(buf))
		newsz = w.curN + sz
	)
	if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
303 304 305 306
		if err := w.cut(); err != nil {
			return err
		}
	}
F
Fabian Reinartz 已提交
307

308 309
	w.crc32.Reset()
	wr := io.MultiWriter(w.crc32, w.cur)
F
Fabian Reinartz 已提交
310 311 312 313 314

	b := make([]byte, 6)
	b[0] = byte(et)
	b[1] = flag

F
Fabian Reinartz 已提交
315
	binary.BigEndian.PutUint32(b[2:], uint32(len(buf)))
F
Fabian Reinartz 已提交
316

317
	if _, err := wr.Write(b); err != nil {
F
Fabian Reinartz 已提交
318 319
		return err
	}
320
	if _, err := wr.Write(buf); err != nil {
F
Fabian Reinartz 已提交
321 322
		return err
	}
323
	if _, err := w.cur.Write(w.crc32.Sum(nil)); err != nil {
F
Fabian Reinartz 已提交
324 325 326
		return err
	}

327 328
	w.curN += sz

F
Fabian Reinartz 已提交
329
	putWALBuffer(buf)
F
Fabian Reinartz 已提交
330 331 332 333 334 335 336 337
	return nil
}

// Flag values written to the second header byte of series and samples
// entries respectively. NOTE(review): presumably the flag identifies the
// payload encoding; the decoders in this file do not inspect it — confirm.
const (
	walSeriesSimple  = 1
	walSamplesSimple = 1
)

F
Fabian Reinartz 已提交
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
// walBuffers recycles the byte slices entries are encoded into.
var walBuffers = sync.Pool{}

// getWALBuffer returns an empty buffer, taken from the pool when one is
// available and freshly allocated with 64 KiB capacity otherwise.
func getWALBuffer() []byte {
	if pooled := walBuffers.Get(); pooled != nil {
		return pooled.([]byte)
	}
	return make([]byte, 0, 64*1024)
}

// putWALBuffer truncates b to zero length and returns it to the pool.
func putWALBuffer(b []byte) {
	walBuffers.Put(b[:0])
}

353
func (w *WAL) encodeSeries(series []labels.Labels) error {
F
Fabian Reinartz 已提交
354 355 356
	if len(series) == 0 {
		return nil
	}
357 358

	b := make([]byte, binary.MaxVarintLen32)
F
Fabian Reinartz 已提交
359
	buf := getWALBuffer()
F
Fabian Reinartz 已提交
360 361 362

	for _, lset := range series {
		n := binary.PutUvarint(b, uint64(len(lset)))
F
Fabian Reinartz 已提交
363
		buf = append(buf, b[:n]...)
F
Fabian Reinartz 已提交
364 365 366

		for _, l := range lset {
			n = binary.PutUvarint(b, uint64(len(l.Name)))
F
Fabian Reinartz 已提交
367 368
			buf = append(buf, b[:n]...)
			buf = append(buf, l.Name...)
F
Fabian Reinartz 已提交
369 370

			n = binary.PutUvarint(b, uint64(len(l.Value)))
F
Fabian Reinartz 已提交
371 372
			buf = append(buf, b[:n]...)
			buf = append(buf, l.Value...)
F
Fabian Reinartz 已提交
373 374 375
		}
	}

376
	return w.entry(WALEntrySeries, walSeriesSimple, buf)
F
Fabian Reinartz 已提交
377 378
}

379
func (w *WAL) encodeSamples(samples []refdSample) error {
F
Fabian Reinartz 已提交
380 381 382
	if len(samples) == 0 {
		return nil
	}
383 384

	b := make([]byte, binary.MaxVarintLen64)
F
Fabian Reinartz 已提交
385
	buf := getWALBuffer()
F
Fabian Reinartz 已提交
386 387 388 389 390 391 392

	// Store base timestamp and base reference number of first sample.
	// All samples encode their timestamp and ref as delta to those.
	//
	// TODO(fabxc): optimize for all samples having the same timestamp.
	first := samples[0]

393
	binary.BigEndian.PutUint64(b, first.ref)
394
	buf = append(buf, b[:8]...)
F
Fabian Reinartz 已提交
395
	binary.BigEndian.PutUint64(b, uint64(first.t))
F
Fabian Reinartz 已提交
396
	buf = append(buf, b[:8]...)
F
Fabian Reinartz 已提交
397 398 399

	for _, s := range samples {
		n := binary.PutVarint(b, int64(s.ref)-int64(first.ref))
F
Fabian Reinartz 已提交
400
		buf = append(buf, b[:n]...)
F
Fabian Reinartz 已提交
401 402

		n = binary.PutVarint(b, s.t-first.t)
F
Fabian Reinartz 已提交
403
		buf = append(buf, b[:n]...)
F
Fabian Reinartz 已提交
404 405

		binary.BigEndian.PutUint64(b, math.Float64bits(s.v))
F
Fabian Reinartz 已提交
406
		buf = append(buf, b[:8]...)
F
Fabian Reinartz 已提交
407 408
	}

409
	return w.entry(WALEntrySamples, walSamplesSimple, buf)
F
Fabian Reinartz 已提交
410 411
}

412 413
// WALReader decodes and emits write ahead log entries.
type WALReader struct {
414 415 416 417
	rs    []io.ReadCloser
	cur   int
	buf   []byte
	crc32 hash.Hash32
418 419 420 421

	err     error
	labels  []labels.Labels
	samples []refdSample
422 423
}

424 425 426
// NewWALReader returns a new WALReader over the sequence of the given ReadClosers.
func NewWALReader(rs ...io.ReadCloser) *WALReader {
	return &WALReader{
427 428 429
		rs:    rs,
		buf:   make([]byte, 0, 128*4096),
		crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
430 431 432
	}
}

433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450
// At returns the labels and samples decoded by the most recent call to Next.
// Next truncates and reuses both slices, so the result is only valid until
// Next is called again.
func (r *WALReader) At() ([]labels.Labels, []refdSample) {
	return r.labels, r.samples
}

// Err returns the last error the reader encountered. Reaching the end of the
// log is not an error: Next does not record io.EOF.
func (r *WALReader) Err() error {
	return r.err
}

// nextEntry retrieves the next entry. It is also used as a testing hook.
func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) {
	if r.cur >= len(r.rs) {
		return 0, 0, nil, io.EOF
	}
	cr := r.rs[r.cur]

	et, flag, b, err := r.entry(cr)
F
Fabian Reinartz 已提交
451 452 453 454 455
	// If we reached the end of the reader, advance to the next one
	// and close.
	// Do not close on the last one as it will still be appended to.
	// XXX(fabxc): leaky abstraction.
	if err == io.EOF && r.cur < len(r.rs)-1 {
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
		// Current reader completed, close and move to the next one.
		if err := cr.Close(); err != nil {
			return 0, 0, nil, err
		}
		r.cur++
		return r.nextEntry()
	}
	return et, flag, b, err
}

// Next returns decodes the next entry pair and returns true
// if it was succesful.
func (r *WALReader) Next() bool {
	r.labels = r.labels[:0]
	r.samples = r.samples[:0]

	et, flag, b, err := r.nextEntry()
F
Fabian Reinartz 已提交
473
	if err != nil {
474 475 476 477
		if err != io.EOF {
			r.err = err
		}
		return false
F
Fabian Reinartz 已提交
478
	}
479 480

	switch et {
F
Fabian Reinartz 已提交
481
	case WALEntrySamples:
482 483 484
		if err := r.decodeSamples(flag, b); err != nil {
			r.err = err
		}
F
Fabian Reinartz 已提交
485
	case WALEntrySeries:
486 487 488 489 490 491 492 493 494 495
		if err := r.decodeSeries(flag, b); err != nil {
			r.err = err
		}
	default:
		r.err = errors.Errorf("unknown WAL entry type %d", et)
	}
	return r.err == nil
}

func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
496 497
	r.crc32.Reset()
	tr := io.TeeReader(cr, r.crc32)
498 499 500 501 502 503 504 505 506 507 508 509 510 511

	b := make([]byte, 6)
	if _, err := tr.Read(b); err != nil {
		return 0, 0, nil, err
	}

	var (
		etype  = WALEntryType(b[0])
		flag   = b[1]
		length = int(binary.BigEndian.Uint32(b[2:]))
	)
	// Exit if we reached pre-allocated space.
	if etype == 0 {
		return 0, 0, nil, io.EOF
F
Fabian Reinartz 已提交
512
	}
513 514 515 516 517 518 519 520 521 522 523 524 525

	if length > len(r.buf) {
		r.buf = make([]byte, length)
	}
	buf := r.buf[:length]

	if _, err := tr.Read(buf); err != nil {
		return 0, 0, nil, err
	}
	_, err := cr.Read(b[:4])
	if err != nil {
		return 0, 0, nil, err
	}
526
	if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp {
527 528 529 530
		return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp)
	}

	return etype, flag, buf, nil
F
Fabian Reinartz 已提交
531 532
}

533
func (r *WALReader) decodeSeries(flag byte, b []byte) error {
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557
	for len(b) > 0 {
		l, n := binary.Uvarint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "number of labels")
		}
		b = b[n:]
		lset := make(labels.Labels, l)

		for i := 0; i < int(l); i++ {
			nl, n := binary.Uvarint(b)
			if n < 1 || len(b) < n+int(nl) {
				return errors.Wrap(errInvalidSize, "label name")
			}
			lset[i].Name = string(b[n : n+int(nl)])
			b = b[n+int(nl):]

			vl, n := binary.Uvarint(b)
			if n < 1 || len(b) < n+int(vl) {
				return errors.Wrap(errInvalidSize, "label value")
			}
			lset[i].Value = string(b[n : n+int(vl)])
			b = b[n+int(vl):]
		}

558
		r.labels = append(r.labels, lset)
559 560 561 562
	}
	return nil
}

563
func (r *WALReader) decodeSamples(flag byte, b []byte) error {
564
	if len(b) < 16 {
565 566 567
		return errors.Wrap(errInvalidSize, "header length")
	}
	var (
568
		baseRef  = binary.BigEndian.Uint64(b)
569
		baseTime = int64(binary.BigEndian.Uint64(b[8:]))
570
	)
571
	b = b[16:]
572 573

	for len(b) > 0 {
574
		var smpl refdSample
575 576 577 578 579 580

		dref, n := binary.Varint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "sample ref delta")
		}
		b = b[n:]
581 582

		smpl.ref = uint64(int64(baseRef) + dref)
583 584 585 586 587 588 589 590 591 592 593 594 595 596

		dtime, n := binary.Varint(b)
		if n < 1 {
			return errors.Wrap(errInvalidSize, "sample timestamp delta")
		}
		b = b[n:]
		smpl.t = baseTime + dtime

		if len(b) < 8 {
			return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
		}
		smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
		b = b[8:]

597
		r.samples = append(r.samples, smpl)
598 599 600
	}
	return nil
}