diff --git a/chunks.go b/chunks.go index aa44b259e687af7a64a4bf88fb7409e33bce0acb..5063a20e7a2a858c88b8d5db9a7b78edd8b16e39 100644 --- a/chunks.go +++ b/chunks.go @@ -17,8 +17,6 @@ import ( "bufio" "encoding/binary" "fmt" - "hash" - "hash/crc32" "io" "os" @@ -63,7 +61,6 @@ type chunkWriter struct { files []*os.File wbuf *bufio.Writer n int64 - crc32 hash.Hash segmentSize int64 } @@ -85,7 +82,6 @@ func newChunkWriter(dir string) (*chunkWriter, error) { cw := &chunkWriter{ dirFile: dirFile, n: 0, - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), segmentSize: defaultChunkSegmentSize, } return cw, nil @@ -165,8 +161,8 @@ func (w *chunkWriter) cut() error { return nil } -func (w *chunkWriter) write(wr io.Writer, b []byte) error { - n, err := wr.Write(b) +func (w *chunkWriter) write(b []byte) error { + n, err := w.wbuf.Write(b) w.n += int64(n) return err } @@ -187,14 +183,10 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { } } - // Write chunks sequentially and set the reference field in the ChunkMeta. - w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.wbuf) - b := make([]byte, binary.MaxVarintLen32) n := binary.PutUvarint(b, uint64(len(chks))) - if err := w.write(wr, b[:n]); err != nil { + if err := w.write(b[:n]); err != nil { return err } seq := uint64(w.seq()) << 32 @@ -204,21 +196,17 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) - if err := w.write(wr, b[:n]); err != nil { + if err := w.write(b[:n]); err != nil { return err } - if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { + if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil { return err } - if err := w.write(wr, chk.Chunk.Bytes()); err != nil { + if err := w.write(chk.Chunk.Bytes()); err != nil { return err } - chk.Chunk = nil } - if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil { - return err - } return nil } diff --git a/index.go b/index.go index 73b65da306b8cd068758a6a7bae74e22394613f9..9f3da5019ed4d7c760f443f440c6a8372393bc23 100644 --- a/index.go +++ b/index.go @@ -228,7 +228,8 @@ func (w *indexWriter) writeSeries() error { for _, s := range series { // Write label set symbol references. - s.offset = base + uint32(len(w.b)) + start := len(w.b) + s.offset = base + uint32(start) n := binary.PutUvarint(buf, uint64(len(s.labels))) w.b = append(w.b, buf[:n]...) @@ -253,6 +254,22 @@ func (w *indexWriter) writeSeries() error { n = binary.PutUvarint(buf, uint64(c.Ref)) w.b = append(w.b, buf[:n]...) } + + // Write checksum over series index entry and all its chunk data. + w.crc32.Reset() + w.crc32.Write(w.b[start:]) + + for _, c := range s.chunks { + fmt.Println(c) + if _, err := w.crc32.Write([]byte{byte(c.Chunk.Encoding())}); err != nil { + return err + } + if _, err := w.crc32.Write(c.Chunk.Bytes()); err != nil { + return err + } + } + + w.b = append(w.b, w.crc32.Sum(nil)...) } return w.section(len(w.b), flagStd, func(wr io.Writer) error { diff --git a/index_test.go b/index_test.go index 3522ca68ed7ef742c44493e62f33b3878e8d85f6..b38350dcf79fcf9c6ea29a4f8fb1e448e8810183 100644 --- a/index_test.go +++ b/index_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" "github.com/stretchr/testify/require" ) @@ -50,10 +51,14 @@ func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) return errors.Errorf("series with reference %d already added", ref) } - m.series[ref] = series{ - l: l, - chunks: chunks, + s := series{l: l} + // Actual chunk data is not stored in the index. + for _, c := range chunks { + cc := *c + cc.Chunk = nil + s.chunks = append(s.chunks, &cc) } + m.series[ref] = s return nil } @@ -241,6 +246,7 @@ func TestPersistence_index_e2e(t *testing.T) { MinTime: int64(j * 10000), MaxTime: int64((j + 1) * 10000), Ref: rand.Uint64(), + Chunk: chunks.NewXORChunk(), }) } input = append(input, &indexWriterSeries{