提交 df96d97d 编写于 作者: F Fabian Reinartz

Move chunk checksum

上级 db4272b8
...@@ -17,8 +17,6 @@ import ( ...@@ -17,8 +17,6 @@ import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"hash"
"hash/crc32"
"io" "io"
"os" "os"
...@@ -63,7 +61,6 @@ type chunkWriter struct { ...@@ -63,7 +61,6 @@ type chunkWriter struct {
files []*os.File files []*os.File
wbuf *bufio.Writer wbuf *bufio.Writer
n int64 n int64
crc32 hash.Hash
segmentSize int64 segmentSize int64
} }
...@@ -85,7 +82,6 @@ func newChunkWriter(dir string) (*chunkWriter, error) { ...@@ -85,7 +82,6 @@ func newChunkWriter(dir string) (*chunkWriter, error) {
cw := &chunkWriter{ cw := &chunkWriter{
dirFile: dirFile, dirFile: dirFile,
n: 0, n: 0,
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
segmentSize: defaultChunkSegmentSize, segmentSize: defaultChunkSegmentSize,
} }
return cw, nil return cw, nil
...@@ -165,8 +161,8 @@ func (w *chunkWriter) cut() error { ...@@ -165,8 +161,8 @@ func (w *chunkWriter) cut() error {
return nil return nil
} }
func (w *chunkWriter) write(wr io.Writer, b []byte) error { func (w *chunkWriter) write(b []byte) error {
n, err := wr.Write(b) n, err := w.wbuf.Write(b)
w.n += int64(n) w.n += int64(n)
return err return err
} }
...@@ -187,14 +183,10 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { ...@@ -187,14 +183,10 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
} }
} }
// Write chunks sequentially and set the reference field in the ChunkMeta.
w.crc32.Reset()
wr := io.MultiWriter(w.crc32, w.wbuf)
b := make([]byte, binary.MaxVarintLen32) b := make([]byte, binary.MaxVarintLen32)
n := binary.PutUvarint(b, uint64(len(chks))) n := binary.PutUvarint(b, uint64(len(chks)))
if err := w.write(wr, b[:n]); err != nil { if err := w.write(b[:n]); err != nil {
return err return err
} }
seq := uint64(w.seq()) << 32 seq := uint64(w.seq()) << 32
...@@ -204,21 +196,17 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { ...@@ -204,21 +196,17 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
if err := w.write(wr, b[:n]); err != nil { if err := w.write(b[:n]); err != nil {
return err return err
} }
if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
return err return err
} }
if err := w.write(wr, chk.Chunk.Bytes()); err != nil { if err := w.write(chk.Chunk.Bytes()); err != nil {
return err return err
} }
chk.Chunk = nil
} }
if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
return err
}
return nil return nil
} }
......
...@@ -228,7 +228,8 @@ func (w *indexWriter) writeSeries() error { ...@@ -228,7 +228,8 @@ func (w *indexWriter) writeSeries() error {
for _, s := range series { for _, s := range series {
// Write label set symbol references. // Write label set symbol references.
s.offset = base + uint32(len(w.b)) start := len(w.b)
s.offset = base + uint32(start)
n := binary.PutUvarint(buf, uint64(len(s.labels))) n := binary.PutUvarint(buf, uint64(len(s.labels)))
w.b = append(w.b, buf[:n]...) w.b = append(w.b, buf[:n]...)
...@@ -253,6 +254,22 @@ func (w *indexWriter) writeSeries() error { ...@@ -253,6 +254,22 @@ func (w *indexWriter) writeSeries() error {
n = binary.PutUvarint(buf, uint64(c.Ref)) n = binary.PutUvarint(buf, uint64(c.Ref))
w.b = append(w.b, buf[:n]...) w.b = append(w.b, buf[:n]...)
} }
// Write checksum over series index entry and all its chunk data.
w.crc32.Reset()
w.crc32.Write(w.b[start:])
for _, c := range s.chunks {
fmt.Println(c)
if _, err := w.crc32.Write([]byte{byte(c.Chunk.Encoding())}); err != nil {
return err
}
if _, err := w.crc32.Write(c.Chunk.Bytes()); err != nil {
return err
}
}
w.b = append(w.b, w.crc32.Sum(nil)...)
} }
return w.section(len(w.b), flagStd, func(wr io.Writer) error { return w.section(len(w.b), flagStd, func(wr io.Writer) error {
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"testing" "testing"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -50,10 +51,14 @@ func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) ...@@ -50,10 +51,14 @@ func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta)
return errors.Errorf("series with reference %d already added", ref) return errors.Errorf("series with reference %d already added", ref)
} }
m.series[ref] = series{ s := series{l: l}
l: l, // Actual chunk data is not stored in the index.
chunks: chunks, for _, c := range chunks {
cc := *c
cc.Chunk = nil
s.chunks = append(s.chunks, &cc)
} }
m.series[ref] = s
return nil return nil
} }
...@@ -241,6 +246,7 @@ func TestPersistence_index_e2e(t *testing.T) { ...@@ -241,6 +246,7 @@ func TestPersistence_index_e2e(t *testing.T) {
MinTime: int64(j * 10000), MinTime: int64(j * 10000),
MaxTime: int64((j + 1) * 10000), MaxTime: int64((j + 1) * 10000),
Ref: rand.Uint64(), Ref: rand.Uint64(),
Chunk: chunks.NewXORChunk(),
}) })
} }
input = append(input, &indexWriterSeries{ input = append(input, &indexWriterSeries{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册