From 1ddedf2b30b1da9e525f37baea22a01a7b740580 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 4 Sep 2017 16:08:38 +0200 Subject: [PATCH] Change series ID from uint32 to uint64 --- block.go | 2 +- compact.go | 8 +-- head.go | 60 ++++++++++++-------- head_test.go | 8 +-- index.go | 17 +++--- index_test.go | 24 ++++---- postings.go | 46 +++++++-------- postings_test.go | 138 ++++++++++++++++++++++----------------------- querier_test.go | 18 +++--- tombstones.go | 26 ++++----- tombstones_test.go | 6 +- wal.go | 24 ++++---- wal_test.go | 2 +- 13 files changed, 198 insertions(+), 181 deletions(-) diff --git a/block.go b/block.go index 84f006037..67cd57491 100644 --- a/block.go +++ b/block.go @@ -225,7 +225,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := map[uint32]Intervals{} + stones := map[uint64]Intervals{} var lset labels.Labels var chks []ChunkMeta diff --git a/compact.go b/compact.go index dd7bc3832..ad9700fac 100644 --- a/compact.go +++ b/compact.go @@ -477,9 +477,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, // We fully rebuild the postings list index from merged series. var ( - postings = &memPostings{m: make(map[term][]uint32, 512)} + postings = &memPostings{m: make(map[term][]uint64, 512)} values = map[string]stringset{} - i = uint32(0) + i = uint64(0) ) if err := indexw.AddSymbols(allSymbols); err != nil { @@ -568,9 +568,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } // Write a postings list containing all series. - all := make([]uint32, i) + all := make([]uint64, i) for i := range all { - all[i] = uint32(i) + all[i] = uint64(i) } if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { return errors.Wrap(err, "write 'all' postings") diff --git a/head.go b/head.go index c1c25bbd4..1dbfc917d 100644 --- a/head.go +++ b/head.go @@ -55,11 +55,11 @@ type Head struct { appendPool sync.Pool minTime, maxTime int64 - lastSeriesID uint32 + lastSeriesID uint64 // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. - series map[uint32]*memSeries + series map[uint64]*memSeries // hashes contains a collision map of label set hashes of chunks // to their chunk descs. hashes map[uint64][]*memSeries @@ -178,11 +178,11 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( chunkRange: chunkRange, minTime: math.MaxInt64, maxTime: math.MinInt64, - series: map[uint32]*memSeries{}, + series: map[uint64]*memSeries{}, hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, symbols: map[string]struct{}{}, - postings: &memPostings{m: make(map[term][]uint32)}, + postings: &memPostings{m: make(map[term][]uint64)}, tombstones: newEmptyTombstoneReader(), } h.metrics = newHeadMetrics(h, r) @@ -201,7 +201,7 @@ func (h *Head) readWAL() error { } samplesFunc := func(samples []RefSample) error { for _, s := range samples { - ms, ok := h.series[uint32(s.Ref)] + ms, ok := h.series[s.Ref] if !ok { return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) } @@ -424,13 +424,13 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error { } var ( refn = binary.BigEndian.Uint64(yoloBytes(ref)) - id = uint32(refn) + id = (refn << 1) >> 1 inTx = refn&(1<<63) != 0 ) // Distinguish between existing series and series created in // this transaction. 
if inTx { - if id > uint32(len(a.newSeries)-1) { + if id > uint64(len(a.newSeries)-1) { return errors.Wrap(ErrNotFound, "transaction series ID too high") } // TODO(fabxc): we also have to validate here that the @@ -527,7 +527,7 @@ func (a *headAppender) Commit() error { total := uint64(len(a.samples)) for _, s := range a.samples { - series, ok := a.head.series[uint32(s.Ref)] + series, ok := a.head.series[s.Ref] if !ok { return errors.Errorf("series with ID %d not found", s.Ref) } @@ -614,7 +614,7 @@ func (h *Head) gc() { // Only data strictly lower than this timestamp must be deleted. mint := h.MinTime() - deletedHashes := map[uint64][]uint32{} + deletedHashes := map[uint64][]uint64{} h.mtx.RLock() @@ -630,7 +630,7 @@ func (h *Head) gc() { } } - deletedIDs := make(map[uint32]struct{}, len(deletedHashes)) + deletedIDs := make(map[uint64]struct{}, len(deletedHashes)) h.mtx.RUnlock() @@ -639,7 +639,7 @@ func (h *Head) gc() { for hash, ids := range deletedHashes { - inIDs := func(id uint32) bool { + inIDs := func(id uint64) bool { for _, o := range ids { if o == id { return true @@ -675,7 +675,7 @@ func (h *Head) gc() { } for t, p := range h.postings.m { - repl := make([]uint32, 0, len(p)) + repl := make([]uint64, 0, len(p)) for _, id := range p { if _, ok := deletedIDs[id]; !ok { @@ -761,16 +761,32 @@ func (h *headChunkReader) Close() error { return nil } +// packChunkID packs a seriesID and a chunkID within it into a global 8 byte ID. +// It panicks if the seriesID exceeds 5 bytes or the chunk ID 3 bytes. +func packChunkID(seriesID, chunkID uint64) uint64 { + if seriesID > (1<<40)-1 { + panic("series ID exceeds 5 bytes") + } + if chunkID > (1<<24)-1 { + panic("chunk ID exceeds 3 bytes") + } + return (seriesID << 24) | chunkID +} + +func unpackChunkID(id uint64) (seriesID, chunkID uint64) { + return id >> 24, (id << 40) >> 40 +} + // Chunk returns the chunk for the reference number. func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { h.head.mtx.RLock() defer h.head.mtx.RUnlock() - s := h.head.series[uint32(ref>>32)] + sid, cid := unpackChunkID(ref) + s := h.head.series[sid] s.mtx.RLock() - cid := int((ref << 32) >> 32) - c := s.chunk(cid) + c := s.chunk(int(cid)) s.mtx.RUnlock() // Do not expose chunks that are outside of the specified range. @@ -780,7 +796,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { return &safeChunk{ Chunk: c.chunk, s: s, - cid: cid, + cid: int(cid), }, nil } @@ -860,7 +876,7 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings { h.head.mtx.RLock() defer h.head.mtx.RUnlock() - ep := make([]uint32, 0, 1024) + ep := make([]uint64, 0, 1024) for p.Next() { ep = append(ep, p.At()) @@ -890,7 +906,7 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings { } // Series returns the series for the given reference. 
-func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { +func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { h.head.mtx.RLock() defer h.head.mtx.RUnlock() @@ -913,7 +929,7 @@ func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkM *chks = append(*chks, ChunkMeta{ MinTime: c.minTime, MaxTime: c.maxTime, - Ref: (uint64(ref) << 32) | uint64(s.chunkID(i)), + Ref: packChunkID(s.ref, uint64(s.chunkID(i))), }) } @@ -949,7 +965,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries { h.metrics.series.Inc() h.metrics.seriesCreated.Inc() - id := atomic.AddUint32(&h.lastSeriesID, 1) + id := atomic.AddUint64(&h.lastSeriesID, 1) s := newMemSeries(lset, id, h.chunkRange) h.series[id] = s @@ -983,7 +999,7 @@ type sample struct { type memSeries struct { mtx sync.RWMutex - ref uint32 + ref uint64 lset labels.Labels chunks []*memChunk chunkRange int64 @@ -1020,7 +1036,7 @@ func (s *memSeries) cut(mint int64) *memChunk { return c } -func newMemSeries(lset labels.Labels, id uint32, chunkRange int64) *memSeries { +func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { s := &memSeries{ lset: lset, ref: id, diff --git a/head_test.go b/head_test.go index 8752bcd08..50aa80e6e 100644 --- a/head_test.go +++ b/head_test.go @@ -134,10 +134,10 @@ func TestHead_Truncate(t *testing.T) { postingsC1, _ := expandPostings(h.postings.get(term{"c", "1"})) postingsAll, _ := expandPostings(h.postings.get(term{"", ""})) - require.Equal(t, []uint32{s1.ref}, postingsA1) - require.Equal(t, []uint32{s2.ref}, postingsA2) - require.Equal(t, []uint32{s1.ref, s2.ref}, postingsB1) - require.Equal(t, []uint32{s1.ref, s2.ref}, postingsAll) + require.Equal(t, []uint64{s1.ref}, postingsA1) + require.Equal(t, []uint64{s2.ref}, postingsA2) + require.Equal(t, []uint64{s1.ref, s2.ref}, postingsB1) + require.Equal(t, []uint64{s1.ref, s2.ref}, postingsAll) require.Nil(t, postingsB2) require.Nil(t, postingsC1) diff --git a/index.go b/index.go index e935366cc..ddc2c4f52 100644 --- a/index.go +++ b/index.go @@ -99,7 +99,7 @@ type IndexWriter interface { // their labels. // The reference numbers are used to resolve entries in postings lists that // are added later. - AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error + AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error // WriteLabelIndex serializes an index from label names to values. // The passed in values chained tuples of strings of the length of names. @@ -130,7 +130,7 @@ type indexWriter struct { uint32s []uint32 symbols map[string]uint32 // symbol offsets - seriesOffsets map[uint32]uint64 // offsets of series + seriesOffsets map[uint64]uint64 // offsets of series labelIndexes []hashEntry // label index offsets postings []hashEntry // postings lists offsets @@ -175,7 +175,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { // Caches. 
symbols: make(map[string]uint32, 1<<13), - seriesOffsets: make(map[uint32]uint64, 1<<16), + seriesOffsets: make(map[uint64]uint64, 1<<16), crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { @@ -260,7 +260,7 @@ func (w *indexWriter) writeMeta() error { return w.write(w.buf1.get()) } -func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error { +func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkMeta) error { if err := w.ensureStage(idxStageSeries); err != nil { return err } @@ -457,7 +457,10 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { if !ok { return errors.Errorf("%p series for reference %d not found", w, it.At()) } - refs = append(refs, uint32(offset)) // XXX(fabxc): get uint64 vs uint32 sorted out. + if offset > (1<<32)-1 { + return errors.Errorf("series offset %d exceeds 4 bytes", offset) + } + refs = append(refs, uint32(offset)) } if err := it.Err(); err != nil { return err @@ -524,7 +527,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. - Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error + Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error // LabelIndices returns the label pairs for which indices exist. LabelIndices() ([][]string, error) @@ -740,7 +743,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) { return res, nil } -func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { +func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { d1 := r.decbufAt(int(ref)) d2 := d1.decbuf(int(d1.uvarint())) diff --git a/index_test.go b/index_test.go index ab45ffccc..9f3e14faa 100644 --- a/index_test.go +++ b/index_test.go @@ -33,7 +33,7 @@ type series struct { } type mockIndex struct { - series map[uint32]series + series map[uint64]series labelIndex map[string][]string postings *memPostings symbols map[string]struct{} @@ -41,9 +41,9 @@ type mockIndex struct { func newMockIndex() mockIndex { return mockIndex{ - series: make(map[uint32]series), + series: make(map[uint64]series), labelIndex: make(map[string][]string), - postings: &memPostings{m: make(map[term][]uint32)}, + postings: &memPostings{m: make(map[term][]uint64)}, symbols: make(map[string]struct{}), } } @@ -52,7 +52,7 @@ func (m mockIndex) Symbols() (map[string]struct{}, error) { return m.symbols, nil } -func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error { +func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error { if _, ok := m.series[ref]; ok { return errors.Errorf("series with reference %d already added", ref) } @@ -125,7 +125,7 @@ func (m mockIndex) SortedPostings(p Postings) Postings { return newListPostings(ep) } -func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error { +func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error { s, ok := m.series[ref] if !ok { return ErrNotFound @@ -202,7 +202,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, iw.AddSeries(3, series[2])) require.NoError(t, iw.AddSeries(4, series[3])) - err = iw.WritePostings("a", "1", newListPostings([]uint32{1, 2, 3, 4})) + err = iw.WritePostings("a", "1", newListPostings([]uint64{1, 2, 3, 4})) require.NoError(t, err) require.NoError(t, iw.Close()) @@ -274,16 +274,16 @@ func TestPersistence_index_e2e(t *testing.T) { // Population procedure as done 
by compaction. var ( - postings = &memPostings{m: make(map[term][]uint32, 512)} + postings = &memPostings{m: make(map[term][]uint64, 512)} values = map[string]stringset{} ) mi := newMockIndex() for i, s := range input { - err = iw.AddSeries(uint32(i), s.labels, s.chunks...) + err = iw.AddSeries(uint64(i), s.labels, s.chunks...) require.NoError(t, err) - mi.AddSeries(uint32(i), s.labels, s.chunks...) + mi.AddSeries(uint64(i), s.labels, s.chunks...) for _, l := range s.labels { valset, ok := values[l.Name] @@ -293,7 +293,7 @@ func TestPersistence_index_e2e(t *testing.T) { } valset.set(l.Value) - postings.add(uint32(i), term{name: l.Name, value: l.Value}) + postings.add(uint64(i), term{name: l.Name, value: l.Value}) } i++ } @@ -305,9 +305,9 @@ func TestPersistence_index_e2e(t *testing.T) { require.NoError(t, mi.WriteLabelIndex([]string{k}, vals)) } - all := make([]uint32, len(lbls)) + all := make([]uint64, len(lbls)) for i := range all { - all[i] = uint32(i) + all[i] = uint64(i) } err = iw.WritePostings("", "", newListPostings(all)) require.NoError(t, err) diff --git a/postings.go b/postings.go index f2f1eb5b8..e3ca75de9 100644 --- a/postings.go +++ b/postings.go @@ -20,7 +20,7 @@ import ( ) type memPostings struct { - m map[term][]uint32 + m map[term][]uint64 } type term struct { @@ -38,7 +38,7 @@ func (p *memPostings) get(t term) Postings { // add adds a document to the index. The caller has to ensure that no // term argument appears twice. -func (p *memPostings) add(id uint32, terms ...term) { +func (p *memPostings) add(id uint64, terms ...term) { for _, t := range terms { p.m[t] = append(p.m[t], id) } @@ -51,10 +51,10 @@ type Postings interface { // Seek advances the iterator to value v or greater and returns // true if a value was found. - Seek(v uint32) bool + Seek(v uint64) bool // At returns the value at the current iterator position. - At() uint32 + At() uint64 // Err returns the last error of the iterator. 
Err() error @@ -66,8 +66,8 @@ type errPostings struct { } func (e errPostings) Next() bool { return false } -func (e errPostings) Seek(uint32) bool { return false } -func (e errPostings) At() uint32 { return 0 } +func (e errPostings) Seek(uint64) bool { return false } +func (e errPostings) At() uint64 { return 0 } func (e errPostings) Err() error { return e.err } var emptyPostings = errPostings{} @@ -88,18 +88,18 @@ func Intersect(its ...Postings) Postings { type intersectPostings struct { a, b Postings aok, bok bool - cur uint32 + cur uint64 } func newIntersectPostings(a, b Postings) *intersectPostings { return &intersectPostings{a: a, b: b} } -func (it *intersectPostings) At() uint32 { +func (it *intersectPostings) At() uint64 { return it.cur } -func (it *intersectPostings) doNext(id uint32) bool { +func (it *intersectPostings) doNext(id uint64) bool { for { if !it.b.Seek(id) { return false @@ -125,7 +125,7 @@ func (it *intersectPostings) Next() bool { return it.doNext(it.a.At()) } -func (it *intersectPostings) Seek(id uint32) bool { +func (it *intersectPostings) Seek(id uint64) bool { if !it.a.Seek(id) { return false } @@ -155,14 +155,14 @@ type mergedPostings struct { a, b Postings initialized bool aok, bok bool - cur uint32 + cur uint64 } func newMergedPostings(a, b Postings) *mergedPostings { return &mergedPostings{a: a, b: b} } -func (it *mergedPostings) At() uint32 { +func (it *mergedPostings) At() uint64 { return it.cur } @@ -204,7 +204,7 @@ func (it *mergedPostings) Next() bool { return true } -func (it *mergedPostings) Seek(id uint32) bool { +func (it *mergedPostings) Seek(id uint64) bool { if it.cur >= id { return true } @@ -225,15 +225,15 @@ func (it *mergedPostings) Err() error { // listPostings implements the Postings interface over a plain list. type listPostings struct { - list []uint32 - cur uint32 + list []uint64 + cur uint64 } -func newListPostings(list []uint32) *listPostings { +func newListPostings(list []uint64) *listPostings { return &listPostings{list: list} } -func (it *listPostings) At() uint32 { +func (it *listPostings) At() uint64 { return it.cur } @@ -247,7 +247,7 @@ func (it *listPostings) Next() bool { return false } -func (it *listPostings) Seek(x uint32) bool { +func (it *listPostings) Seek(x uint64) bool { // If the current value satisfies, then return. if it.cur >= x { return true @@ -281,8 +281,8 @@ func newBigEndianPostings(list []byte) *bigEndianPostings { return &bigEndianPostings{list: list} } -func (it *bigEndianPostings) At() uint32 { - return it.cur +func (it *bigEndianPostings) At() uint64 { + return uint64(it.cur) } func (it *bigEndianPostings) Next() bool { @@ -294,15 +294,15 @@ func (it *bigEndianPostings) Next() bool { return false } -func (it *bigEndianPostings) Seek(x uint32) bool { - if it.cur >= x { +func (it *bigEndianPostings) Seek(x uint64) bool { + if uint64(it.cur) >= x { return true } num := len(it.list) / 4 // Do binary search between current position and end. 
i := sort.Search(num, func(i int) bool { - return binary.BigEndian.Uint32(it.list[i*4:]) >= x + return binary.BigEndian.Uint32(it.list[i*4:]) >= uint32(x) }) if i < num { j := i * 4 diff --git a/postings_test.go b/postings_test.go index efffe3d98..5d726ca3a 100644 --- a/postings_test.go +++ b/postings_test.go @@ -23,17 +23,17 @@ import ( type mockPostings struct { next func() bool - seek func(uint32) bool - value func() uint32 + seek func(uint64) bool + value func() uint64 err func() error } func (m *mockPostings) Next() bool { return m.next() } -func (m *mockPostings) Seek(v uint32) bool { return m.seek(v) } -func (m *mockPostings) Value() uint32 { return m.value() } +func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) } +func (m *mockPostings) Value() uint64 { return m.value() } func (m *mockPostings) Err() error { return m.err() } -func expandPostings(p Postings) (res []uint32, err error) { +func expandPostings(p Postings) (res []uint64, err error) { for p.Next() { res = append(res, p.At()) } @@ -42,27 +42,27 @@ func expandPostings(p Postings) (res []uint32, err error) { func TestIntersect(t *testing.T) { var cases = []struct { - a, b []uint32 - res []uint32 + a, b []uint64 + res []uint64 }{ { - a: []uint32{1, 2, 3, 4, 5}, - b: []uint32{6, 7, 8, 9, 10}, + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{6, 7, 8, 9, 10}, res: nil, }, { - a: []uint32{1, 2, 3, 4, 5}, - b: []uint32{4, 5, 6, 7, 8}, - res: []uint32{4, 5}, + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{4, 5, 6, 7, 8}, + res: []uint64{4, 5}, }, { - a: []uint32{1, 2, 3, 4, 9, 10}, - b: []uint32{1, 4, 5, 6, 7, 8, 10, 11}, - res: []uint32{1, 4, 10}, + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 4, 5, 6, 7, 8, 10, 11}, + res: []uint64{1, 4, 10}, }, { - a: []uint32{1}, - b: []uint32{0, 1}, - res: []uint32{1}, + a: []uint64{1}, + b: []uint64{0, 1}, + res: []uint64{1}, }, } @@ -78,29 +78,29 @@ func TestIntersect(t *testing.T) { func TestMultiIntersect(t *testing.T) { var cases = []struct { - p [][]uint32 - res []uint32 + p [][]uint64 + res []uint64 }{ { - p: [][]uint32{ + p: [][]uint64{ {1, 2, 3, 4, 5, 6, 1000, 1001}, {2, 4, 5, 6, 7, 8, 999, 1001}, {1, 2, 5, 6, 7, 8, 1001, 1200}, }, - res: []uint32{2, 5, 6, 1001}, + res: []uint64{2, 5, 6, 1001}, }, // One of the reproduceable cases for: // https://github.com/prometheus/prometheus/issues/2616 // The initialisation of intersectPostings was moving the iterator forward // prematurely making us miss some postings. 
{ - p: [][]uint32{ + p: [][]uint64{ {1, 2}, {1, 2}, {1, 2}, {2}, }, - res: []uint32{2}, + res: []uint64{2}, }, } @@ -118,22 +118,22 @@ func TestMultiIntersect(t *testing.T) { } func BenchmarkIntersect(t *testing.B) { - var a, b, c, d []uint32 + var a, b, c, d []uint64 for i := 0; i < 10000000; i += 2 { - a = append(a, uint32(i)) + a = append(a, uint64(i)) } for i := 5000000; i < 5000100; i += 4 { - b = append(b, uint32(i)) + b = append(b, uint64(i)) } for i := 5090000; i < 5090600; i += 4 { - b = append(b, uint32(i)) + b = append(b, uint64(i)) } for i := 4990000; i < 5100000; i++ { - c = append(c, uint32(i)) + c = append(c, uint64(i)) } for i := 4000000; i < 6000000; i++ { - d = append(d, uint32(i)) + d = append(d, uint64(i)) } i1 := newListPostings(a) @@ -152,14 +152,14 @@ func BenchmarkIntersect(t *testing.B) { func TestMultiMerge(t *testing.T) { var cases = []struct { - a, b, c []uint32 - res []uint32 + a, b, c []uint64 + res []uint64 }{ { - a: []uint32{1, 2, 3, 4, 5, 6, 1000, 1001}, - b: []uint32{2, 4, 5, 6, 7, 8, 999, 1001}, - c: []uint32{1, 2, 5, 6, 7, 8, 1001, 1200}, - res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200}, + a: []uint64{1, 2, 3, 4, 5, 6, 1000, 1001}, + b: []uint64{2, 4, 5, 6, 7, 8, 999, 1001}, + c: []uint64{1, 2, 5, 6, 7, 8, 1001, 1200}, + res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200}, }, } @@ -176,23 +176,23 @@ func TestMultiMerge(t *testing.T) { func TestMergedPostings(t *testing.T) { var cases = []struct { - a, b []uint32 - res []uint32 + a, b []uint64 + res []uint64 }{ { - a: []uint32{1, 2, 3, 4, 5}, - b: []uint32{6, 7, 8, 9, 10}, - res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{6, 7, 8, 9, 10}, + res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, }, { - a: []uint32{1, 2, 3, 4, 5}, - b: []uint32{4, 5, 6, 7, 8}, - res: []uint32{1, 2, 3, 4, 5, 6, 7, 8}, + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{4, 5, 6, 7, 8}, + res: []uint64{1, 2, 3, 4, 5, 6, 7, 8}, }, { - a: []uint32{1, 2, 3, 4, 9, 10}, - b: []uint32{1, 4, 5, 6, 7, 8, 10, 11}, - res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 4, 5, 6, 7, 8, 10, 11}, + res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, }, } @@ -209,43 +209,43 @@ func TestMergedPostings(t *testing.T) { func TestMergedPostingsSeek(t *testing.T) { var cases = []struct { - a, b []uint32 + a, b []uint64 - seek uint32 + seek uint64 success bool - res []uint32 + res []uint64 }{ { - a: []uint32{2, 3, 4, 5}, - b: []uint32{6, 7, 8, 9, 10}, + a: []uint64{2, 3, 4, 5}, + b: []uint64{6, 7, 8, 9, 10}, seek: 1, success: true, - res: []uint32{2, 3, 4, 5, 6, 7, 8, 9, 10}, + res: []uint64{2, 3, 4, 5, 6, 7, 8, 9, 10}, }, { - a: []uint32{1, 2, 3, 4, 5}, - b: []uint32{6, 7, 8, 9, 10}, + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{6, 7, 8, 9, 10}, seek: 2, success: true, - res: []uint32{2, 3, 4, 5, 6, 7, 8, 9, 10}, + res: []uint64{2, 3, 4, 5, 6, 7, 8, 9, 10}, }, { - a: []uint32{1, 2, 3, 4, 5}, - b: []uint32{4, 5, 6, 7, 8}, + a: []uint64{1, 2, 3, 4, 5}, + b: []uint64{4, 5, 6, 7, 8}, seek: 9, success: false, res: nil, }, { - a: []uint32{1, 2, 3, 4, 9, 10}, - b: []uint32{1, 4, 5, 6, 7, 8, 10, 11}, + a: []uint64{1, 2, 3, 4, 9, 10}, + b: []uint64{1, 4, 5, 6, 7, 8, 10, 11}, seek: 10, success: true, - res: []uint32{10, 11}, + res: []uint64{10, 11}, }, } @@ -263,7 +263,7 @@ func TestMergedPostingsSeek(t *testing.T) { lst, err := expandPostings(p) require.NoError(t, err) - lst = append([]uint32{start}, lst...) + lst = append([]uint64{start}, lst...) 
require.Equal(t, c.res, lst) } } @@ -290,7 +290,7 @@ func TestBigEndian(t *testing.T) { bep := newBigEndianPostings(beLst) for i := 0; i < num; i++ { require.True(t, bep.Next()) - require.Equal(t, ls[i], bep.At()) + require.Equal(t, uint64(ls[i]), bep.At()) } require.False(t, bep.Next()) @@ -338,8 +338,8 @@ func TestBigEndian(t *testing.T) { bep := newBigEndianPostings(beLst) for _, v := range table { - require.Equal(t, v.found, bep.Seek(v.seek)) - require.Equal(t, v.val, bep.At()) + require.Equal(t, v.found, bep.Seek(uint64(v.seek))) + require.Equal(t, uint64(v.val), bep.At()) require.Nil(t, bep.Err()) } }) @@ -348,16 +348,16 @@ func TestBigEndian(t *testing.T) { func TestIntersectWithMerge(t *testing.T) { // One of the reproduceable cases for: // https://github.com/prometheus/prometheus/issues/2616 - a := newListPostings([]uint32{21, 22, 23, 24, 25, 30}) + a := newListPostings([]uint64{21, 22, 23, 24, 25, 30}) b := newMergedPostings( - newListPostings([]uint32{10, 20, 30}), - newListPostings([]uint32{15, 26, 30}), + newListPostings([]uint64{10, 20, 30}), + newListPostings([]uint64{15, 26, 30}), ) p := Intersect(a, b) res, err := expandPostings(p) require.NoError(t, err) - require.Equal(t, []uint32{30}, res) + require.Equal(t, []uint64{30}, res) } diff --git a/querier_test.go b/querier_test.go index 12ff22013..3f243ad69 100644 --- a/querier_test.go +++ b/querier_test.go @@ -228,7 +228,7 @@ func createIdxChkReaders(tc []struct { return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 }) - postings := &memPostings{m: make(map[term][]uint32, 512)} + postings := &memPostings{m: make(map[term][]uint64, 512)} chkReader := mockChunkReader(make(map[uint64]chunks.Chunk)) lblIdx := make(map[string]stringset) mi := newMockIndex() @@ -255,11 +255,11 @@ func createIdxChkReaders(tc []struct { } ls := labels.FromMap(s.lset) - mi.AddSeries(uint32(i), ls, metas...) + mi.AddSeries(uint64(i), ls, metas...) - postings.add(uint32(i), term{}) + postings.add(uint64(i), term{}) for _, l := range ls { - postings.add(uint32(i), term{l.Name, l.Value}) + postings.add(uint64(i), term{l.Name, l.Value}) vs, present := lblIdx[l.Name] if !present { @@ -555,7 +555,7 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, tombstones: newTombstoneReader( - map[uint32]Intervals{ + map[uint64]Intervals{ 1: Intervals{{1, 3}}, 2: Intervals{{1, 3}, {6, 10}}, 3: Intervals{{6, 10}}, @@ -663,13 +663,13 @@ func TestBaseChunkSeries(t *testing.T) { lset labels.Labels chunks []ChunkMeta - ref uint32 + ref uint64 } cases := []struct { series []refdSeries // Postings should be in the sorted order of the the series - postings []uint32 + postings []uint64 expIdxs []int }{ @@ -703,7 +703,7 @@ func TestBaseChunkSeries(t *testing.T) { ref: 108, }, }, - postings: []uint32{12, 10, 108}, + postings: []uint64{12, 10, 108}, expIdxs: []int{0, 1, 3}, }, @@ -722,7 +722,7 @@ func TestBaseChunkSeries(t *testing.T) { ref: 1, }, }, - postings: []uint32{}, + postings: []uint64{}, expIdxs: []int{}, }, diff --git a/tombstones.go b/tombstones.go index b79fab4f5..7b24407b5 100644 --- a/tombstones.go +++ b/tombstones.go @@ -33,6 +33,11 @@ const ( tombstoneFormatV1 = 1 ) +// TombstoneReader is the iterator over tombstones. 
+type TombstoneReader interface { + Get(ref uint64) Intervals +} + func writeTombstoneFile(dir string, tr tombstoneReader) error { path := filepath.Join(dir, tombstoneFilename) tmp := path + ".tmp" @@ -59,7 +64,7 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { for k, v := range tr { for _, itv := range v { buf.reset() - buf.putUvarint32(k) + buf.putUvarint64(k) buf.putVarint64(itv.Mint) buf.putVarint64(itv.Maxt) @@ -81,15 +86,10 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error { // Stone holds the information on the posting and time-range // that is deleted. type Stone struct { - ref uint32 + ref uint64 intervals Intervals } -// TombstoneReader is the iterator over tombstones. -type TombstoneReader interface { - Get(ref uint32) Intervals -} - func readTombstones(dir string) (tombstoneReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if err != nil { @@ -123,7 +123,7 @@ func readTombstones(dir string) (tombstoneReader, error) { stonesMap := newEmptyTombstoneReader() for d.len() > 0 { - k := d.uvarint32() + k := d.uvarint64() mint := d.varint64() maxt := d.varint64() if d.err() != nil { @@ -136,21 +136,21 @@ func readTombstones(dir string) (tombstoneReader, error) { return newTombstoneReader(stonesMap), nil } -type tombstoneReader map[uint32]Intervals +type tombstoneReader map[uint64]Intervals -func newTombstoneReader(ts map[uint32]Intervals) tombstoneReader { +func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader { return tombstoneReader(ts) } func newEmptyTombstoneReader() tombstoneReader { - return tombstoneReader(make(map[uint32]Intervals)) + return tombstoneReader(make(map[uint64]Intervals)) } -func (t tombstoneReader) Get(ref uint32) Intervals { +func (t tombstoneReader) Get(ref uint64) Intervals { return t[ref] } -func (t tombstoneReader) add(ref uint32, itv Interval) { +func (t tombstoneReader) add(ref uint64, itv Interval) { t[ref] = t[ref].add(itv) } diff --git a/tombstones_test.go b/tombstones_test.go index fa9c2bb0d..9265b76b3 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -27,12 +27,12 @@ func TestWriteAndReadbackTombStones(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - ref := uint32(0) + ref := uint64(0) - stones := make(map[uint32]Intervals) + stones := make(map[uint64]Intervals) // Generate the tombstones. for i := 0; i < 100; i++ { - ref += uint32(rand.Int31n(10)) + 1 + ref += uint64(rand.Int31n(10)) + 1 numRanges := rand.Intn(5) + 1 dranges := make(Intervals, 0, numRanges) mint := rand.Int63n(time.Now().UnixNano()) diff --git a/wal.go b/wal.go index 3714c8800..2af7d542a 100644 --- a/wal.go +++ b/wal.go @@ -238,11 +238,11 @@ WRLoop: activeSeries := make([]RefSeries, 0, len(series)) for _, s := range series { - if !p.Seek(uint32(s.Ref)) { + if !p.Seek(s.Ref) { break WRLoop } - if p.At() == uint32(s.Ref) { + if p.At() == s.Ref { activeSeries = append(activeSeries, s) } } @@ -596,11 +596,11 @@ func encodeSeries(buf []byte, series []RefSeries) []byte { buf = append(buf, b[:8]...) for _, s := range series { - n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) - buf = append(buf, b[:n]...) + binary.BigEndian.PutUint64(b, s.Ref) + buf = append(buf, b[:8]...) lset := s.Labels - n = binary.PutUvarint(b, uint64(len(lset))) + n := binary.PutUvarint(b, uint64(len(lset))) buf = append(buf, b[:n]...) 
for _, l := range lset { @@ -662,7 +662,7 @@ func (w *SegmentWAL) encodeDeletes(stones []Stone) error { for _, s := range stones { for _, itv := range s.intervals { eb.reset() - eb.putUvarint32(s.ref) + eb.putUvarint64(s.ref) eb.putVarint64(itv.Mint) eb.putVarint64(itv.Maxt) buf = append(buf, eb.get()...) @@ -913,18 +913,16 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { return nil, errors.Wrap(errInvalidSize, "header length") } - baseRef := binary.BigEndian.Uint64(b) b = b[8:] for len(b) > 0 { var ser RefSeries // TODO: Check again. - dref, n := binary.Varint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "series ref delta") + if len(b) < 8 { + return nil, errors.Wrap(errInvalidSize, "series ref") } - b = b[n:] - ser.Ref = uint64(int64(baseRef) + dref) + ser.Ref = binary.BigEndian.Uint64(b) + b = b[8:] l, n := binary.Uvarint(b) if n < 1 { @@ -1002,7 +1000,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { for db.len() > 0 { var s Stone - s.ref = db.uvarint32() + s.ref = db.uvarint64() s.intervals = Intervals{{db.varint64(), db.varint64()}} if db.err() != nil { return nil, db.err() diff --git a/wal_test.go b/wal_test.go index dd2f4cf55..15b64943e 100644 --- a/wal_test.go +++ b/wal_test.go @@ -219,7 +219,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { for j := 0; j < i*20; j++ { ts := rand.Int63() - stones = append(stones, Stone{rand.Uint32(), Intervals{{ts, ts + rand.Int63n(10000)}}}) + stones = append(stones, Stone{rand.Uint64(), Intervals{{ts, ts + rand.Int63n(10000)}}}) } lbls := series[i : i+stepSize] -- GitLab
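The patch widens the chunk reference returned by headIndexReader.Series from the old (uint32 series ID << 32 | chunk ID) scheme to the packChunkID/unpackChunkID helpers, which reserve 5 bytes for the series ID and 3 bytes for the per-series chunk index. A small standalone round trip of that layout; the two helpers mirror the ones added in head.go, the main function is illustrative only:

package main

import "fmt"

// packChunkID packs a series ID (upper 5 bytes) and a per-series chunk index
// (lower 3 bytes) into a single 8-byte chunk reference, as in head.go.
func packChunkID(seriesID, chunkID uint64) uint64 {
	if seriesID > (1<<40)-1 {
		panic("series ID exceeds 5 bytes")
	}
	if chunkID > (1<<24)-1 {
		panic("chunk ID exceeds 3 bytes")
	}
	return (seriesID << 24) | chunkID
}

// unpackChunkID reverses packChunkID.
func unpackChunkID(id uint64) (seriesID, chunkID uint64) {
	return id >> 24, (id << 40) >> 40
}

func main() {
	ref := packChunkID(123456789, 42)
	sid, cid := unpackChunkID(ref)
	fmt.Println(sid, cid) // 123456789 42
}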
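headAppender.AddFast now decodes the full 64-bit reference instead of truncating it to uint32: bit 63 marks a series created inside the current transaction (whose ID indexes a.newSeries), and (refn << 1) >> 1 strips only that flag while keeping the remaining 63 bits. A minimal sketch of that layout, not the tsdb code itself; encodeRef and decodeRef are illustrative names, and the encode side is inferred from the decode logic shown in the diff:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRef builds the 8-byte string reference: the top bit flags an
// in-transaction series, the low 63 bits carry the ID or newSeries index.
func encodeRef(id uint64, inTx bool) string {
	refn := id
	if inTx {
		refn |= 1 << 63
	}
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], refn)
	return string(b[:])
}

// decodeRef recovers the ID and the transaction flag; (refn<<1)>>1 clears
// only bit 63, unlike the old uint32(refn) truncation.
func decodeRef(ref string) (id uint64, inTx bool) {
	refn := binary.BigEndian.Uint64([]byte(ref))
	return (refn << 1) >> 1, refn&(1<<63) != 0
}

func main() {
	ref := encodeRef(5000000000, true) // an ID that no longer fits in uint32
	id, inTx := decodeRef(ref)
	fmt.Println(id, inTx) // 5000000000 true
}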
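On disk the postings section still uses 4-byte entries: WritePostings now returns an error when a series offset exceeds (1<<32)-1 rather than silently truncating, and bigEndianPostings keeps reading 4-byte big-endian values but widens them to uint64 to satisfy the updated Postings interface. A simplified sketch of that read/seek behaviour; seek4 is an illustrative stand-in, not the actual reader:

package main

import (
	"encoding/binary"
	"fmt"
	"sort"
)

// seek4 binary-searches a list of 4-byte big-endian postings entries for the
// first value >= x and returns it widened to uint64.
func seek4(list []byte, x uint64) (uint64, bool) {
	num := len(list) / 4
	i := sort.Search(num, func(i int) bool {
		return uint64(binary.BigEndian.Uint32(list[i*4:])) >= x
	})
	if i >= num {
		return 0, false
	}
	return uint64(binary.BigEndian.Uint32(list[i*4:])), true
}

func main() {
	var list []byte
	for _, v := range []uint32{4, 8, 15, 16, 23, 42} {
		var b [4]byte
		binary.BigEndian.PutUint32(b[:], v)
		list = append(list, b[:]...)
	}
	fmt.Println(seek4(list, 10)) // 15 true
	fmt.Println(seek4(list, 43)) // 0 false
}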
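In the WAL, series references are no longer written as signed varint deltas against a base reference; encodeSeries emits each Ref as a fixed 8-byte big-endian value and decodeSeries reads it back the same way. A simplified round trip of just the reference handling; label encoding and the record header are omitted, and the function names are illustrative:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRefs writes each series reference as a fixed 8-byte big-endian value.
func encodeRefs(refs []uint64) []byte {
	var buf []byte
	var b [8]byte
	for _, r := range refs {
		binary.BigEndian.PutUint64(b[:], r)
		buf = append(buf, b[:]...)
	}
	return buf
}

// decodeRefs is the inverse; a short trailing chunk is an encoding error.
func decodeRefs(buf []byte) ([]uint64, error) {
	var refs []uint64
	for len(buf) > 0 {
		if len(buf) < 8 {
			return nil, fmt.Errorf("invalid size: series ref")
		}
		refs = append(refs, binary.BigEndian.Uint64(buf))
		buf = buf[8:]
	}
	return refs, nil
}

func main() {
	in := []uint64{1, 1 << 40, 1<<63 - 1}
	out, err := decodeRefs(encodeRefs(in))
	fmt.Println(out, err)
}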
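Tombstone records switch from putUvarint32 to putUvarint64 for the series reference, followed as before by the deleted interval as two signed varints. A sketch of a single record using encoding/binary in place of the internal encbuf/decbuf helpers; checksums and the file header are left out:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeStone writes ref as an unsigned varint (now up to 64 bits) and the
// interval bounds as signed varints.
func encodeStone(ref uint64, mint, maxt int64) []byte {
	b := make([]byte, 3*binary.MaxVarintLen64)
	n := binary.PutUvarint(b, ref)
	n += binary.PutVarint(b[n:], mint)
	n += binary.PutVarint(b[n:], maxt)
	return b[:n]
}

// decodeStone reads one record back.
func decodeStone(b []byte) (ref uint64, mint, maxt int64) {
	ref, n := binary.Uvarint(b)
	b = b[n:]
	mint, n = binary.Varint(b)
	b = b[n:]
	maxt, _ = binary.Varint(b)
	return ref, mint, maxt
}

func main() {
	b := encodeStone(1<<40, -5, 1000)
	fmt.Println(decodeStone(b)) // 1099511627776 -5 1000
}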