// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local
15 16 17 18 19 20 21 22

import (
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"sort"

23
	"github.com/prometheus/common/model"
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46

	"github.com/prometheus/prometheus/storage/metric"
)

// Layout of the 21-byte header that starts every delta-encoded chunk. Each
// offset follows from the size of the field before it:
//
//	offset  size  field
//	     0     1  number of bytes per time delta
//	     1     1  number of bytes per value delta
//	     2     1  integer flag (1 = values are stored as integer deltas)
//	     3     8  base time
//	    11     8  base value
//	    19     2  number of used buffer bytes
const (
	deltaHeaderTimeBytesOffset  = 0
	deltaHeaderValueBytesOffset = deltaHeaderTimeBytesOffset + 1
	deltaHeaderIsIntOffset      = deltaHeaderValueBytesOffset + 1
	deltaHeaderBaseTimeOffset   = deltaHeaderIsIntOffset + 1
	deltaHeaderBaseValueOffset  = deltaHeaderBaseTimeOffset + 8
	deltaHeaderBufLenOffset     = deltaHeaderBaseValueOffset + 8

	// deltaHeaderBytes is the total header size; samples start right after it.
	deltaHeaderBytes = deltaHeaderBufLenOffset + 2
)

// deltaEncodedChunk stores sample timestamps and values as deltas against a
// per-chunk base time and base value. The width of the deltas, and whether
// values are encoded as integers or floats, adapts to the data. Once a delta
// would need 8 bytes, the absolute number is stored instead (timestamps as
// int64, values as float64). deltaEncodedChunk implements the chunk interface.
type deltaEncodedChunk []byte

B
Bjoern Rabenstein 已提交
54
// newDeltaEncodedChunk returns a newly allocated deltaEncodedChunk.
B
beorn7 已提交
55 56 57 58 59 60 61 62 63 64 65
func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncodedChunk {
	if tb < 1 {
		panic("need at least 1 time delta byte")
	}
	if length < deltaHeaderBytes+16 {
		panic(fmt.Errorf(
			"chunk length %d bytes is insufficient, need at least %d",
			length, deltaHeaderBytes+16,
		))
	}
	c := make(deltaEncodedChunk, deltaHeaderIsIntOffset+1, length)
66

B
beorn7 已提交
67 68
	c[deltaHeaderTimeBytesOffset] = byte(tb)
	c[deltaHeaderValueBytesOffset] = byte(vb)
B
Bjoern Rabenstein 已提交
69
	if vb < d8 && isInt { // Only use int for fewer than 8 value delta bytes.
B
beorn7 已提交
70
		c[deltaHeaderIsIntOffset] = 1
71
	} else {
B
beorn7 已提交
72
		c[deltaHeaderIsIntOffset] = 0
73 74
	}

B
beorn7 已提交
75
	return &c
76 77
}

B
Bjoern Rabenstein 已提交
78
// add implements chunk.
79
func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
B
beorn7 已提交
80
	if c.len() == 0 {
B
beorn7 已提交
81 82 83
		c = c[:deltaHeaderBytes]
		binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
		binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value)))
84 85
	}

B
beorn7 已提交
86
	remainingBytes := cap(c) - len(c)
87 88 89
	sampleSize := c.sampleSize()

	// Do we generally have space for another sample in this chunk? If not,
B
Bjoern Rabenstein 已提交
90
	// overflow into a new one.
91
	if remainingBytes < sampleSize {
92
		overflowChunks := newChunk().add(s)
B
beorn7 已提交
93
		return []chunk{&c, overflowChunks[0]}
94 95
	}

B
beorn7 已提交
96
	baseValue := c.baseValue()
97
	dt := s.Timestamp - c.baseTime()
98 99 100 101
	if dt < 0 {
		panic("time delta is less than zero")
	}

B
beorn7 已提交
102
	dv := s.Value - baseValue
B
Bjoern Rabenstein 已提交
103 104
	tb := c.timeBytes()
	vb := c.valueBytes()
105
	isInt := c.isInt()
106 107 108

	// If the new sample is incompatible with the current encoding, reencode the
	// existing chunk data into new chunk(s).
B
beorn7 已提交
109

110 111 112 113 114
	ntb, nvb, nInt := tb, vb, isInt
	if isInt && !isInt64(dv) {
		// int->float.
		nvb = d4
		nInt = false
115
	} else if !isInt && vb == d4 && baseValue+model.SampleValue(float32(dv)) != s.Value {
116 117 118 119 120 121 122 123 124 125 126
		// float32->float64.
		nvb = d8
	} else {
		if tb < d8 {
			// Maybe more bytes for timestamp.
			ntb = max(tb, bytesNeededForUnsignedTimestampDelta(dt))
		}
		if c.isInt() && vb < d8 {
			// Maybe more bytes for sample value.
			nvb = max(vb, bytesNeededForIntegerSampleValueDelta(dv))
		}
B
beorn7 已提交
127
	}
128 129 130 131 132 133 134
	if tb != ntb || vb != nvb || isInt != nInt {
		if len(c)*2 < cap(c) {
			return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
		}
		// Chunk is already half full. Better create a new one and save the transcoding efforts.
		overflowChunks := newChunk().add(s)
		return []chunk{&c, overflowChunks[0]}
135
	}
136

B
beorn7 已提交
137 138
	offset := len(c)
	c = c[:offset+sampleSize]
139

B
Bjoern Rabenstein 已提交
140 141
	switch tb {
	case d1:
B
beorn7 已提交
142
		c[offset] = byte(dt)
B
Bjoern Rabenstein 已提交
143
	case d2:
B
beorn7 已提交
144
		binary.LittleEndian.PutUint16(c[offset:], uint16(dt))
B
Bjoern Rabenstein 已提交
145
	case d4:
B
beorn7 已提交
146
		binary.LittleEndian.PutUint32(c[offset:], uint32(dt))
B
Bjoern Rabenstein 已提交
147 148
	case d8:
		// Store the absolute value (no delta) in case of d8.
B
beorn7 已提交
149
		binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
B
Bjoern Rabenstein 已提交
150 151
	default:
		panic("invalid number of bytes for time delta")
152 153
	}

B
Bjoern Rabenstein 已提交
154
	offset += int(tb)
155 156

	if c.isInt() {
B
Bjoern Rabenstein 已提交
157 158
		switch vb {
		case d0:
159
			// No-op. Constant value is stored as base value.
B
Bjoern Rabenstein 已提交
160
		case d1:
161
			c[offset] = byte(int8(dv))
B
Bjoern Rabenstein 已提交
162
		case d2:
163
			binary.LittleEndian.PutUint16(c[offset:], uint16(int16(dv)))
B
Bjoern Rabenstein 已提交
164
		case d4:
165
			binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv)))
B
Bjoern Rabenstein 已提交
166
		// d8 must not happen. Those samples are encoded as float64.
167
		default:
B
Bjoern Rabenstein 已提交
168
			panic("invalid number of bytes for integer delta")
169 170
		}
	} else {
B
Bjoern Rabenstein 已提交
171 172
		switch vb {
		case d4:
B
beorn7 已提交
173
			binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(dv)))
B
Bjoern Rabenstein 已提交
174 175
		case d8:
			// Store the absolute value (no delta) in case of d8.
B
beorn7 已提交
176
			binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
177
		default:
B
Bjoern Rabenstein 已提交
178
			panic("invalid number of bytes for floating point delta")
179 180
		}
	}
B
beorn7 已提交
181
	return []chunk{&c}
182 183
}

184 185 186 187 188
// clone implements chunk.
func (c deltaEncodedChunk) clone() chunk {
	clone := make(deltaEncodedChunk, len(c), cap(c))
	copy(clone, c)
	return &clone
189 190
}

191
// firstTime implements chunk.
192
func (c deltaEncodedChunk) firstTime() model.Time {
193
	return c.baseTime()
194 195 196 197 198
}

// newIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator {
	return &deltaEncodedChunkIterator{
199 200 201 202 203 204 205
		c:      *c,
		len:    c.len(),
		baseT:  c.baseTime(),
		baseV:  c.baseValue(),
		tBytes: c.timeBytes(),
		vBytes: c.valueBytes(),
		isInt:  c.isInt(),
206
	}
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
}

// marshal implements chunk. It records the number of used buffer bytes in
// the header and then writes the whole buffer to w, including unused
// capacity. On success exactly cap(c) bytes are written. It panics if the
// used length does not fit into the 16-bit length field.
func (c deltaEncodedChunk) marshal(w io.Writer) error {
	if len(c) > math.MaxUint16 {
		panic("chunk buffer length would overflow a 16 bit uint.")
	}
	// Go through the full-capacity view: a chunk that has not received its
	// first sample yet is shorter than the header, and slicing c itself at
	// the length-field offset would then be out of range.
	full := c[:cap(c)]
	binary.LittleEndian.PutUint16(full[deltaHeaderBufLenOffset:], uint16(len(c)))

	n, err := w.Write(full)
	if err != nil {
		return err
	}
	if n != len(full) {
		// Report the number of bytes actually requested (the capacity), not
		// the used length.
		return fmt.Errorf("wanted to write %d bytes, wrote %d", len(full), n)
	}
	return nil
}

// unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
	*c = (*c)[:cap(*c)]
B
beorn7 已提交
229 230
	if _, err := io.ReadFull(r, *c); err != nil {
		return err
231 232 233
	}
	*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
	return nil
234 235
}

B
beorn7 已提交
236 237 238 239 240 241 242
// unmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) {
	*c = (*c)[:cap(*c)]
	copy(*c, buf)
	*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
}

243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
// encoding implements chunk.
func (c deltaEncodedChunk) encoding() chunkEncoding { return delta }

// timeBytes returns the number of bytes per time delta recorded in the header.
func (c deltaEncodedChunk) timeBytes() deltaBytes {
	width := c[deltaHeaderTimeBytesOffset]
	return deltaBytes(width)
}

// valueBytes returns the number of bytes per value delta recorded in the header.
func (c deltaEncodedChunk) valueBytes() deltaBytes {
	width := c[deltaHeaderValueBytesOffset]
	return deltaBytes(width)
}

// isInt reports whether sample values are stored as integer deltas. Only a
// header flag of exactly 1 counts as true.
func (c deltaEncodedChunk) isInt() bool {
	flag := c[deltaHeaderIsIntOffset]
	return flag == 1
}

258 259
func (c deltaEncodedChunk) baseTime() model.Time {
	return model.Time(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:]))
260 261
}

262 263
func (c deltaEncodedChunk) baseValue() model.SampleValue {
	return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:])))
264 265 266 267 268 269 270 271 272 273 274 275 276
}

// sampleSize returns the number of bytes one sample occupies: one time delta
// plus one value delta, at the widths recorded in the header.
func (c deltaEncodedChunk) sampleSize() int {
	width := c.timeBytes() + c.valueBytes()
	return int(width)
}

// len returns the number of samples in the chunk. A chunk shorter than the
// full header (such as one fresh from newDeltaEncodedChunk, before its first
// add) holds none.
func (c deltaEncodedChunk) len() int {
	payload := len(c) - deltaHeaderBytes
	if payload < 0 {
		return 0
	}
	return payload / c.sampleSize()
}

B
Bjoern Rabenstein 已提交
277
// deltaEncodedChunkIterator implements chunkIterator.
278
type deltaEncodedChunkIterator struct {
279 280
	c              deltaEncodedChunk
	len            int
281 282
	baseT          model.Time
	baseV          model.SampleValue
283 284
	tBytes, vBytes deltaBytes
	isInt          bool
285 286
}

287 288 289
// length implements chunkIterator.
func (it *deltaEncodedChunkIterator) length() int { return it.len }

B
beorn7 已提交
290
// valueAtTime implements chunkIterator.
291
func (it *deltaEncodedChunkIterator) valueAtTime(t model.Time) []model.SamplePair {
292
	i := sort.Search(it.len, func(i int) bool {
B
beorn7 已提交
293
		return !it.timestampAtIndex(i).Before(t)
294 295 296 297
	})

	switch i {
	case 0:
298
		return []model.SamplePair{{
B
beorn7 已提交
299 300
			Timestamp: it.timestampAtIndex(0),
			Value:     it.sampleValueAtIndex(0),
301 302
		}}
	case it.len:
303
		return []model.SamplePair{{
B
beorn7 已提交
304 305
			Timestamp: it.timestampAtIndex(it.len - 1),
			Value:     it.sampleValueAtIndex(it.len - 1),
306
		}}
307
	default:
B
beorn7 已提交
308
		ts := it.timestampAtIndex(i)
309
		if ts.Equal(t) {
310
			return []model.SamplePair{{
311
				Timestamp: ts,
B
beorn7 已提交
312
				Value:     it.sampleValueAtIndex(i),
313 314
			}}
		}
315 316
		return []model.SamplePair{
			{
B
beorn7 已提交
317 318
				Timestamp: it.timestampAtIndex(i - 1),
				Value:     it.sampleValueAtIndex(i - 1),
319
			},
320
			{
321
				Timestamp: ts,
B
beorn7 已提交
322
				Value:     it.sampleValueAtIndex(i),
323
			},
324 325 326 327
		}
	}
}

B
beorn7 已提交
328
// rangeValues implements chunkIterator.
329
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair {
330
	oldest := sort.Search(it.len, func(i int) bool {
B
beorn7 已提交
331
		return !it.timestampAtIndex(i).Before(in.OldestInclusive)
332 333
	})

334
	newest := sort.Search(it.len, func(i int) bool {
B
beorn7 已提交
335
		return it.timestampAtIndex(i).After(in.NewestInclusive)
336 337
	})

338
	if oldest == it.len {
339 340 341
		return nil
	}

342
	result := make([]model.SamplePair, 0, newest-oldest)
343
	for i := oldest; i < newest; i++ {
344
		result = append(result, model.SamplePair{
B
beorn7 已提交
345 346
			Timestamp: it.timestampAtIndex(i),
			Value:     it.sampleValueAtIndex(i),
347
		})
348 349 350 351
	}
	return result
}

B
Bjoern Rabenstein 已提交
352
// contains implements chunkIterator.
353
func (it *deltaEncodedChunkIterator) contains(t model.Time) bool {
B
beorn7 已提交
354
	return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1))
355 356 357
}

// values implements chunkIterator.
358 359
func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair {
	valuesChan := make(chan *model.SamplePair)
360 361
	go func() {
		for i := 0; i < it.len; i++ {
362
			valuesChan <- &model.SamplePair{
B
beorn7 已提交
363 364
				Timestamp: it.timestampAtIndex(i),
				Value:     it.sampleValueAtIndex(i),
365 366 367 368 369 370 371
			}
		}
		close(valuesChan)
	}()
	return valuesChan
}

B
beorn7 已提交
372
// timestampAtIndex implements chunkIterator.
373
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time {
374 375 376 377
	offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)

	switch it.tBytes {
	case d1:
378
		return it.baseT + model.Time(uint8(it.c[offset]))
379
	case d2:
380
		return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:]))
381
	case d4:
382
		return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:]))
383 384
	case d8:
		// Take absolute value for d8.
385
		return model.Time(binary.LittleEndian.Uint64(it.c[offset:]))
386
	default:
387
		panic("invalid number of bytes for time delta")
388 389 390
	}
}

B
beorn7 已提交
391
// lastTimestamp implements chunkIterator.
392
func (it *deltaEncodedChunkIterator) lastTimestamp() model.Time {
B
beorn7 已提交
393
	return it.timestampAtIndex(it.len - 1)
394 395
}

B
beorn7 已提交
396
// sampleValueAtIndex implements chunkIterator.
397
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue {
398 399 400 401 402 403 404
	offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)

	if it.isInt {
		switch it.vBytes {
		case d0:
			return it.baseV
		case d1:
405
			return it.baseV + model.SampleValue(int8(it.c[offset]))
406
		case d2:
407
			return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
408
		case d4:
409
			return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
410 411
		// No d8 for ints.
		default:
412
			panic("invalid number of bytes for integer delta")
413 414 415 416
		}
	} else {
		switch it.vBytes {
		case d4:
417
			return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
418 419
		case d8:
			// Take absolute value for d8.
420
			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
421
		default:
422
			panic("invalid number of bytes for floating point delta")
423 424 425 426
		}
	}
}

B
beorn7 已提交
427
// lastSampleValue implements chunkIterator.
428
func (it *deltaEncodedChunkIterator) lastSampleValue() model.SampleValue {
B
beorn7 已提交
429
	return it.sampleValueAtIndex(it.len - 1)
430
}