diff --git a/head.go b/head.go index 07724b965515c3a54779fa79f2abcfd010bbafdc..1b8af29f0cb4b6a5446a3ecede46cc0ae86f6da5 100644 --- a/head.go +++ b/head.go @@ -409,6 +409,7 @@ func (h *HeadBlock) updateMapping() { h.mtx.RLock() if h.mapper.sortable != nil && h.mapper.Len() == len(h.descs) { + h.mtx.RUnlock() return } diff --git a/wal.go b/wal.go index fc08d7ce9372e13764bc764841da35b349366319..b22906e93ebf3e31520bf24412a847762e911901 100644 --- a/wal.go +++ b/wal.go @@ -174,7 +174,7 @@ const ( // walPageBytes is the alignment for flushing records to the backing Writer. // It should be a multiple of the minimum sector size so that WAL can safely // distinguish between torn writes and ordinary data corruption. - walPageBytes = 32 * minSectorSize + walPageBytes = 16 * minSectorSize ) func newWALEncoder(f *os.File) (*walEncoder, error) { diff --git a/writer.go b/writer.go index f91ca84b35be2ea0105963742d6d2ee86854e11b..c246bad0d70448bc2bc18770c3b61aeade0e0ac8 100644 --- a/writer.go +++ b/writer.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/bradfitz/slice" + "github.com/coreos/etcd/pkg/ioutil" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" "github.com/pkg/errors" @@ -22,6 +23,8 @@ const ( MagicIndex = 0xBAAAD700 ) +const compactionPageBytes = minSectorSize * 64 + // SeriesWriter serializes a time block of chunked series data. type SeriesWriter interface { // WriteSeries writes the time series data chunks for a single series. @@ -40,16 +43,18 @@ type SeriesWriter interface { // seriesWriter implements the SeriesWriter interface for the standard // serialization format. type seriesWriter struct { - w io.Writer - n int64 - c int + ow io.Writer + w *ioutil.PageWriter + n int64 + c int index IndexWriter } func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter { return &seriesWriter{ - w: w, + ow: w, + w: ioutil.NewPageWriter(w, compactionPageBytes, 0), n: 0, index: index, } @@ -128,7 +133,7 @@ func (w *seriesWriter) Size() int64 { } func (w *seriesWriter) Close() error { - return nil + return w.w.Flush() } // ChunkMeta holds information about a chunk of data. @@ -178,8 +183,9 @@ type indexWriterSeries struct { // indexWriter implements the IndexWriter interface for the standard // serialization format. type indexWriter struct { - w io.Writer - n int64 + ow io.Writer + w *ioutil.PageWriter + n int64 series map[uint32]*indexWriterSeries @@ -190,7 +196,8 @@ type indexWriter struct { func newIndexWriter(w io.Writer) *indexWriter { return &indexWriter{ - w: w, + w: ioutil.NewPageWriter(w, compactionPageBytes, 0), + ow: w, n: 0, symbols: make(map[string]uint32, 4096), series: make(map[uint32]*indexWriterSeries, 4096), @@ -489,5 +496,8 @@ func (w *indexWriter) finalize() error { } func (w *indexWriter) Close() error { - return w.finalize() + if err := w.finalize(); err != nil { + return err + } + return w.w.Flush() }