Unverified commit 0727318b, authored by Frederic Branczyk, committed by GitHub

Merge pull request #5592 from csmarchbanks/compress-wal

Compress WAL
......@@ -207,6 +207,9 @@ func main() {
a.Flag("storage.tsdb.allow-overlapping-blocks", "[EXPERIMENTAL] Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge.").
Default("false").BoolVar(&cfg.tsdb.AllowOverlappingBlocks)
a.Flag("storage.tsdb.wal-compression", "Compress the tsdb WAL.").
Default("false").BoolVar(&cfg.tsdb.WALCompression)
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)
......@@ -671,6 +674,7 @@ func main() {
"RetentionDuration", cfg.tsdb.RetentionDuration,
"WALSegmentSize", cfg.tsdb.WALSegmentSize,
"AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks,
"WALCompression", cfg.tsdb.WALCompression,
)
startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
......
......@@ -70,7 +70,7 @@ require (
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/prometheus/common v0.4.1
github.com/prometheus/tsdb v0.8.0
github.com/prometheus/tsdb v0.9.1
github.com/rlmcpherson/s3gof3r v0.5.0 // indirect
github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect
github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13
......
......@@ -333,7 +333,6 @@ github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f h1:BVwpUVJ
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
......@@ -345,8 +344,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/prometheus/tsdb v0.8.0 h1:w1tAGxsBMLkuGrFMhqgcCeBkM5d1YI24udArs+aASuQ=
github.com/prometheus/tsdb v0.8.0/go.mod h1:fSI0j+IUQrDd7+ZtR9WKIGtoYAYAJUKcKhYLG25tN4g=
github.com/prometheus/tsdb v0.9.1 h1:IWaAmWkYlgG7/S4iw4IpAQt5Y35QaZM6/GsZ7GsjAuk=
github.com/prometheus/tsdb v0.9.1/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSgwXEyGCt4=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k=
......
......@@ -78,6 +78,7 @@ var (
},
[]string{queue},
)
liveReaderMetrics = wal.NewLiveReaderMetrics(prometheus.DefaultRegisterer)
)
func init() {
......@@ -293,7 +294,7 @@ func (w *WALWatcher) watch(segmentNum int, tail bool) error {
}
defer segment.Close()
reader := wal.NewLiveReader(w.logger, segment)
reader := wal.NewLiveReader(w.logger, liveReaderMetrics, segment)
readTicker := time.NewTicker(readPeriod)
defer readTicker.Stop()
......@@ -509,7 +510,7 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
}
defer sr.Close()
r := wal.NewLiveReader(w.logger, sr)
r := wal.NewLiveReader(w.logger, liveReaderMetrics, sr)
if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
return errors.Wrap(err, "readSegment")
}
......
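The watcher now receives the live-reader metrics rather than relying on a package-global counter vector: the metrics are created once and threaded through every `LiveReader`. A minimal sketch of the injected-metrics pattern from a consumer's side, assuming a plain `io.Reader` standing in for an open WAL segment:

```go
package main

import (
	"bytes"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/tsdb/wal"
)

func main() {
	// Create the metrics once; every LiveReader shares the same counters.
	metrics := wal.NewLiveReaderMetrics(prometheus.NewRegistry())

	// A bytes.Reader stands in for an open WAL segment here (assumption).
	seg := bytes.NewReader(nil)
	r := wal.NewLiveReader(log.NewNopLogger(), metrics, seg)
	for r.Next() {
		_ = r.Record() // process one WAL record
	}
	_ = r.Err() // io.EOF is not terminal for a live reader
}
```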
This diff is collapsed.
......@@ -130,6 +130,9 @@ type Options struct {
// When true it disables the overlapping blocks check.
// This in-turn enables vertical compaction and vertical query merge.
AllowOverlappingBlocks bool
// When true records in the WAL will be compressed.
WALCompression bool
}
var (
......@@ -195,6 +198,7 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t
BlockRanges: rngs,
NoLockfile: opts.NoLockfile,
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
WALCompression: opts.WALCompression,
})
if err != nil {
return nil, err
......
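For embedders of the tsdb library, the new knob is just another `Options` field. A minimal sketch of opening a storage directory with WAL compression turned on (directory name and logger choice are illustrative):

```go
package main

import (
	stdlog "log"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
)

func main() {
	opts := *tsdb.DefaultOptions
	opts.WALCompression = true // Snappy-compress records written to the WAL.

	db, err := tsdb.Open("data", log.NewNopLogger(), nil, &opts)
	if err != nil {
		stdlog.Fatal(err)
	}
	defer db.Close()
}
```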
## master / unreleased
## Master / unreleased
## 0.9.1
- [CHANGE] LiveReader metrics are now injected rather than global.
## 0.9.0
- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
- [BUGFIX] Re-calculate block size when calling `block.Delete`.
- [BUGFIX] Re-encode all head chunks at compaction that are open (being appended to) or outside the Maxt block range. This avoids writing out corrupt data. It happens when snapshotting with the head included.
- [BUGFIX] Improved handling of multiple refs for the same series in WAL reading.
- [BUGFIX] `prometheus_tsdb_compactions_failed_total` is now incremented on any compaction failure.
- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before.
- [CHANGE] Create new clean segment when starting the WAL.
- [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`.
- [ENHANCEMENT] Improved atomicity of .tmp block replacement during compaction for the usual case.
- [ENHANCEMENT] Improved postings intersection matching.
- [ENHANCEMENT] Reduced disk usage for WAL for small setups.
- [ENHANCEMENT] Optimize queries using regexp for set lookups.
## 0.8.0
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.
- [BUGFIX] Don't panic and recover nicely when running out of disk space.
- [BUGFIX] Correctly handle empty labels.
......@@ -11,9 +30,11 @@
- [FEATURE] Added `currentSegment` metric that exposes the WAL segment currently being written to.
## 0.7.1
- [ENHANCEMENT] Reduce memory usage in mergedPostings.Seek
## 0.7.0
- [CHANGE] tsdb now requires Go 1.12 or higher.
- [REMOVED] `chunks.NewReader` is removed as it wasn't used anywhere.
- [REMOVED] `FromData` was removed as it was unused.
......@@ -29,12 +50,15 @@
- [ENHANCEMENT] `PostListings` and `NotMatcher` are now public.
## 0.6.1
- [BUGFIX] Update `last` after appending a non-overlapping chunk in `chunks.MergeOverlappingChunks`. [#539](https://github.com/prometheus/tsdb/pull/539)
## 0.6.0
- [CHANGE] `AllowOverlappingBlock` is now `AllowOverlappingBlocks`.
## 0.5.0
- [FEATURE] Time-overlapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370)
- Disabled by default and can be enabled via `AllowOverlappingBlock` option.
- Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks.
......@@ -50,6 +74,7 @@
- [BUGFIX] LiveReader can get into an infinite loop on corrupt WALs.
## 0.4.0
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files, for example with tmpfs on a Raspberry Pi to minimise SD card wear from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size-based retention through `Options.MaxBytes`. As part of this change:
......@@ -61,9 +86,11 @@
- [FEATURE] Add new `LiveReader` to the WAL package. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
## 0.3.1
- [BUGFIX] Fixed most Windows tests and some actual bugs around unclosed file readers.
## 0.3.0
- [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path.
- [CHANGE] `NewSegmentsRangeReader()` can now read over multiple WAL ranges by using the new `SegmentRange{}` struct.
- [CHANGE] `CorruptionErr{}` now also exposes the Segment `Dir` which is added when displaying any errors.
......
......@@ -69,7 +69,7 @@ else
GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH)
endif
PROMU_VERSION ?= 0.4.0
PROMU_VERSION ?= 0.5.0
PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz
GOLANGCI_LINT :=
......@@ -86,6 +86,8 @@ endif
PREFIX ?= $(shell pwd)
BIN_DIR ?= $(shell pwd)
DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))
DOCKERFILE_PATH ?= ./Dockerfile
DOCKERBUILD_CONTEXT ?= ./
DOCKER_REPO ?= prom
DOCKER_ARCHS ?= amd64
......@@ -210,9 +212,10 @@ common-tarball: promu
common-docker: $(BUILD_DOCKER_ARCHS)
$(BUILD_DOCKER_ARCHS): common-docker-%:
docker build -t "$(DOCKER_REPO)/$(DOCKER_IMAGE_NAME)-linux-$*:$(DOCKER_IMAGE_TAG)" \
-f $(DOCKERFILE_PATH) \
--build-arg ARCH="$*" \
--build-arg OS="linux" \
.
$(DOCKERBUILD_CONTEXT)
.PHONY: common-docker-publish $(PUBLISH_DOCKER_ARCHS)
common-docker-publish: $(PUBLISH_DOCKER_ARCHS)
......@@ -247,7 +250,9 @@ proto:
ifdef GOLANGCI_LINT
$(GOLANGCI_LINT):
mkdir -p $(FIRST_GOPATH)/bin
curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(FIRST_GOPATH)/bin $(GOLANGCI_LINT_VERSION)
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/$(GOLANGCI_LINT_VERSION)/install.sh \
| sed -e '/install -d/d' \
| sh -s -- -b $(FIRST_GOPATH)/bin $(GOLANGCI_LINT_VERSION)
endif
ifdef GOVENDOR
......
......@@ -151,12 +151,6 @@ type Appendable interface {
Appender() Appender
}
// SizeReader returns the size of the object in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}
// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
......@@ -183,7 +177,6 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
}
// BlockDesc describes a block by ULID and time range.
......@@ -214,24 +207,24 @@ const metaFilename = "meta.json"
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
func readMetaFile(dir string) (*BlockMeta, error) {
func readMetaFile(dir string) (*BlockMeta, int64, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil {
return nil, err
return nil, 0, err
}
var m BlockMeta
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
return nil, 0, err
}
if m.Version != 1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
return nil, 0, errors.Errorf("unexpected meta file version %d", m.Version)
}
return &m, nil
return &m, int64(len(b)), nil
}
func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) (int64, error) {
meta.Version = 1
// Make any changes to the file appear atomic.
......@@ -245,26 +238,32 @@ func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
f, err := os.Create(tmp)
if err != nil {
return err
return 0, err
}
enc := json.NewEncoder(f)
enc.SetIndent("", "\t")
jsonMeta, err := json.MarshalIndent(meta, "", "\t")
if err != nil {
return 0, err
}
var merr tsdb_errors.MultiError
if merr.Add(enc.Encode(meta)); merr.Err() != nil {
n, err := f.Write(jsonMeta)
if err != nil {
merr.Add(err)
merr.Add(f.Close())
return merr.Err()
return 0, merr.Err()
}
// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
if merr.Add(f.Sync()); merr.Err() != nil {
if err := f.Sync(); err != nil {
merr.Add(err)
merr.Add(f.Close())
return merr.Err()
return 0, merr.Err()
}
if err := f.Close(); err != nil {
return err
return 0, err
}
return fileutil.Replace(tmp, path)
return int64(n), fileutil.Replace(tmp, path)
}
// Block represents a directory of time series data covering a continuous time range.
......@@ -285,6 +284,11 @@ type Block struct {
tombstones TombstoneReader
logger log.Logger
numBytesChunks int64
numBytesIndex int64
numBytesTombstone int64
numBytesMeta int64
}
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
......@@ -302,7 +306,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
err = merr.Err()
}
}()
meta, err := readMetaFile(dir)
meta, sizeMeta, err := readMetaFile(dir)
if err != nil {
return nil, err
}
......@@ -319,43 +323,28 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
}
closers = append(closers, ir)
tr, tsr, err := readTombstones(dir)
tr, sizeTomb, err := readTombstones(dir)
if err != nil {
return nil, err
}
closers = append(closers, tr)
// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(logger, dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}
pb = &Block{
dir: dir,
meta: *meta,
chunkr: cr,
indexr: ir,
tombstones: tr,
symbolTableSize: ir.SymbolTableSize(),
logger: logger,
dir: dir,
meta: *meta,
chunkr: cr,
indexr: ir,
tombstones: tr,
symbolTableSize: ir.SymbolTableSize(),
logger: logger,
numBytesChunks: cr.Size(),
numBytesIndex: ir.Size(),
numBytesTombstone: sizeTomb,
numBytesMeta: sizeMeta,
}
return pb, nil
}
func blockSize(rr ...SizeReader) int64 {
var total int64
for _, r := range rr {
if r != nil {
total += r.Size()
}
}
return total
}
// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
......@@ -390,7 +379,9 @@ func (pb *Block) MinTime() int64 { return pb.meta.MinTime }
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }
// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
func (pb *Block) Size() int64 {
return pb.numBytesChunks + pb.numBytesIndex + pb.numBytesTombstone + pb.numBytesMeta
}
// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")
......@@ -437,7 +428,12 @@ func (pb *Block) GetSymbolTableSize() uint64 {
func (pb *Block) setCompactionFailed() error {
pb.meta.Compaction.Failed = true
return writeMetaFile(pb.logger, pb.dir, &pb.meta)
n, err := writeMetaFile(pb.logger, pb.dir, &pb.meta)
if err != nil {
return err
}
pb.numBytesMeta = n
return nil
}
type blockIndexReader struct {
......@@ -457,7 +453,10 @@ func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, erro
func (r blockIndexReader) Postings(name, value string) (index.Postings, error) {
p, err := r.ir.Postings(name, value)
return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
if err != nil {
return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return p, nil
}
func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
......@@ -465,11 +464,10 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
}
func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
return errors.Wrapf(
r.ir.Series(ref, lset, chks),
"block: %s",
r.b.Meta().ULID,
)
if err := r.ir.Series(ref, lset, chks); err != nil {
return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return nil
}
func (r blockIndexReader) LabelIndices() ([][]string, error) {
......@@ -561,10 +559,17 @@ Outer:
pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total()
if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones)
if err != nil {
return err
}
return writeMetaFile(pb.logger, pb.dir, &pb.meta)
pb.numBytesTombstone = n
n, err = writeMetaFile(pb.logger, pb.dir, &pb.meta)
if err != nil {
return err
}
pb.numBytesMeta = n
return nil
}
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
......
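The recurring pattern in this file is an atomic write that also reports how many bytes landed on disk, letting the block keep its size in memory instead of persisting `NumBytes` in `BlockStats`. A distilled, stand-alone sketch of that pattern; `writeJSONAtomic` is a hypothetical name, not the repo's:

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// writeJSONAtomic mirrors the writeMetaFile pattern: marshal up front,
// write to a tmp file, fsync, then rename into place, returning the
// on-disk size so callers can track it in memory.
func writeJSONAtomic(dir, name string, v interface{}) (int64, error) {
	path := filepath.Join(dir, name)
	tmp := path + ".tmp"

	b, err := json.MarshalIndent(v, "", "\t")
	if err != nil {
		return 0, err
	}
	f, err := os.Create(tmp)
	if err != nil {
		return 0, err
	}
	n, err := f.Write(b)
	if err == nil {
		// Force the kernel to persist the file to avoid losing it on a crash.
		err = f.Sync()
	}
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		return 0, err
	}
	// Rename is atomic for regular files on POSIX filesystems.
	return int64(n), os.Rename(tmp, path)
}

func main() {
	size, err := writeJSONAtomic(os.TempDir(), "meta.json", map[string]int{"version": 1})
	fmt.Println(size, err)
}
```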
......@@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
return nil, errors.Wrap(err, "create checkpoint dir")
}
cp, err := wal.New(nil, nil, cpdirtmp)
cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
if err != nil {
return nil, errors.Wrap(err, "open checkpoint")
}
......
......@@ -51,7 +51,9 @@ type Meta struct {
Ref uint64
Chunk chunkenc.Chunk
MinTime, MaxTime int64 // time range the data covers
// Time range the data covers.
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
MinTime, MaxTime int64
}
// writeHash writes the chunk encoding and raw data into the provided hash.
......@@ -218,7 +220,7 @@ func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
last += 1
last++
continue
}
nc := &newChks[last]
......
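The `math.MaxInt64` sentinel gives readers a cheap way to test whether a chunk is still open, which the compactor relies on below. A tiny illustration of the convention, with a local `Meta` mirroring the relevant fields:

```go
package main

import (
	"fmt"
	"math"
)

// Meta mirrors the relevant fields of chunks.Meta for illustration.
type Meta struct {
	MinTime, MaxTime int64
}

// isOpen reports whether the chunk is still being appended to,
// per the MaxTime == math.MaxInt64 convention introduced here.
func isOpen(m Meta) bool {
	return m.MaxTime == math.MaxInt64
}

func main() {
	fmt.Println(isOpen(Meta{MinTime: 0, MaxTime: math.MaxInt64})) // true
	fmt.Println(isOpen(Meta{MinTime: 0, MaxTime: 1000}))          // false
}
```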
......@@ -84,7 +84,6 @@ type LeveledCompactor struct {
type compactorMetrics struct {
ran prometheus.Counter
populatingBlocks prometheus.Gauge
failed prometheus.Counter
overlappingBlocks prometheus.Counter
duration prometheus.Histogram
chunkSize prometheus.Histogram
......@@ -103,10 +102,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Name: "prometheus_tsdb_compaction_populating_block",
Help: "Set to 1 when a block is currently being written to the disk.",
})
m.failed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
})
m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_vertical_compactions_total",
Help: "Total number of compactions done on overlapping blocks.",
......@@ -136,7 +131,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
r.MustRegister(
m.ran,
m.populatingBlocks,
m.failed,
m.overlappingBlocks,
m.duration,
m.chunkRange,
......@@ -184,7 +178,7 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
var dms []dirMeta
for _, dir := range dirs {
meta, err := readMetaFile(dir)
meta, _, err := readMetaFile(dir)
if err != nil {
return nil, err
}
......@@ -386,7 +380,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
start := time.Now()
for _, d := range dirs {
meta, err := readMetaFile(d)
meta, _, err := readMetaFile(d)
if err != nil {
return uid, err
}
......@@ -426,12 +420,14 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
if meta.Stats.NumSamples == 0 {
for _, b := range bs {
b.meta.Compaction.Deletable = true
if err = writeMetaFile(c.logger, b.dir, &b.meta); err != nil {
n, err := writeMetaFile(c.logger, b.dir, &b.meta)
if err != nil {
level.Error(c.logger).Log(
"msg", "Failed to write 'Deletable' to meta file after compaction",
"ulid", b.meta.ULID,
)
}
b.numBytesMeta = n
}
uid = ulid.ULID{}
level.Info(c.logger).Log(
......@@ -541,9 +537,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err := os.RemoveAll(tmp); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
}
if err != nil {
c.metrics.failed.Inc()
}
c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds())
}(time.Now())
......@@ -609,12 +602,12 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return nil
}
if err = writeMetaFile(c.logger, tmp, meta); err != nil {
if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// Create an empty tombstones file.
if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}
......@@ -764,6 +757,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
for i, chk := range chks {
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block.
//
// Block time range is half-open: [meta.MinTime, meta.MaxTime) and
// chunks are closed hence the chk.MaxTime >= meta.MaxTime check.
//
// TODO think how to avoid the typecasting to verify when it is head block.
if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})
} else
// Sanity check for disk blocks.
// chk.MaxTime == meta.MaxTime shouldn't happen either, but checking for it would break many users, so we don't.
if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
......@@ -781,12 +789,21 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
var (
t int64
v float64
)
for it.Next() {
ts, v := it.At()
app.Append(ts, v)
t, v = it.At()
app.Append(t, v)
}
if err := it.Err(); err != nil {
return errors.Wrap(err, "iterate chunk while re-encoding")
}
chks[i].Chunk = newChunk
chks[i].MaxTime = t
}
}
......
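The interval arithmetic here is subtle: block ranges are half-open `[MinTime, MaxTime)` while chunk ranges are closed, so an open or overrunning head chunk is trimmed by deleting `[meta.MaxTime, math.MaxInt64]` during re-encoding. A toy sketch of that trimming, with a plain slice of samples standing in for a chunk iterator:

```go
package main

import (
	"fmt"
	"math"
)

type sample struct {
	t int64
	v float64
}

// trim drops every sample inside the deleted range [mint, maxt],
// the same effect the deletedIterator has during re-encoding.
func trim(samples []sample, mint, maxt int64) []sample {
	var out []sample
	for _, s := range samples {
		if s.t >= mint && s.t <= maxt {
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	chunk := []sample{{10, 1}, {20, 2}, {30, 3}}
	blockMaxt := int64(25) // the block covers [0, 25)
	// Delete [blockMaxt, MaxInt64], as the compactor does for head chunks.
	fmt.Println(trim(chunk, blockMaxt, math.MaxInt64)) // [{10 1} {20 2}]
	// The chunk's new MaxTime then becomes the last appended timestamp: 20.
}
```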
......@@ -51,6 +51,7 @@ var DefaultOptions = &Options{
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
NoLockfile: false,
AllowOverlappingBlocks: false,
WALCompression: false,
}
// Options of the DB storage.
......@@ -80,6 +81,9 @@ type Options struct {
// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
// This in-turn enables vertical compaction and vertical query merge.
AllowOverlappingBlocks bool
// WALCompression will turn on Snappy compression for records on the WAL.
WALCompression bool
}
// Appender allows appending a batch of data. It must be completed with a
......@@ -147,6 +151,7 @@ type dbMetrics struct {
reloads prometheus.Counter
reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter
compactionsFailed prometheus.Counter
timeRetentionCount prometheus.Counter
compactionsSkipped prometheus.Counter
startTime prometheus.GaugeFunc
......@@ -191,6 +196,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
m.compactionsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
})
m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_time_retentions_total",
Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.",
......@@ -231,6 +240,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m.reloadsFailed,
m.timeRetentionCount,
m.compactionsTriggered,
m.compactionsFailed,
m.startTime,
m.tombCleanTimer,
m.blocksBytes,
......@@ -300,7 +310,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if opts.WALSegmentSize > 0 {
segmentSize = opts.WALSegmentSize
}
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
if err != nil {
return nil, err
}
......@@ -322,8 +332,12 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
minValidTime = blocks[len(blocks)-1].Meta().MaxTime
}
if err := db.head.Init(minValidTime); err != nil {
return nil, errors.Wrap(err, "read WAL")
if initErr := db.head.Init(minValidTime); initErr != nil {
db.head.metrics.walCorruptionsTotal.Inc()
level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)
if err := wlog.Repair(initErr); err != nil {
return nil, errors.Wrap(err, "repair corrupted WAL")
}
}
go db.run()
......@@ -411,6 +425,11 @@ func (a dbAppender) Commit() error {
func (db *DB) compact() (err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()
defer func() {
if err != nil {
db.metrics.compactionsFailed.Inc()
}
}()
// Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority.
for {
......@@ -610,7 +629,7 @@ func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err
corrupted = make(map[ulid.ULID]error)
for _, dir := range dirs {
meta, err := readMetaFile(dir)
meta, _, err := readMetaFile(dir)
if err != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
continue
......@@ -924,8 +943,20 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
if !withHead {
return nil
}
_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil)
return errors.Wrap(err, "snapshot head block")
mint := db.head.MinTime()
maxt := db.head.MaxTime()
head := &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
}
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this, a block's MaxTime is always one millisecond past the last sample it can include.
if _, err := db.compactor.Write(dir, head, mint, maxt+1, nil); err != nil {
return errors.Wrap(err, "snapshot head block")
}
return nil
}
// Querier returns a new querier over the data partition for the given time range.
......
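Because block intervals are half-open, a head containing samples up to and including `maxt` must be persisted as a block with `MaxTime = maxt+1`; the rewritten `Snapshot` handles that internally, so callers are unchanged. A minimal usage sketch (paths are illustrative):

```go
package main

import (
	stdlog "log"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/tsdb"
)

func main() {
	db, err := tsdb.Open("data", log.NewNopLogger(), nil, tsdb.DefaultOptions)
	if err != nil {
		stdlog.Fatal(err)
	}
	defer db.Close()

	// withHead=true persists the in-memory head as a block; the +1ms
	// adjustment of its MaxTime happens inside Snapshot.
	if err := db.Snapshot("backup", true); err != nil {
		stdlog.Fatal(err)
	}
}
```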
......@@ -128,9 +128,19 @@ func Rename(from, to string) error {
// Replace moves a file or directory to a new location and deletes any previous data.
// It is not atomic.
func Replace(from, to string) error {
if err := os.RemoveAll(to); err != nil {
return err
// Remove the destination only if it is a dir; otherwise leave it to os.Rename,
// which replaces the destination file and is atomic.
{
f, err := os.Stat(to)
if !os.IsNotExist(err) {
if err == nil && f.IsDir() {
if err := os.RemoveAll(to); err != nil {
return err
}
}
}
}
if err := os.Rename(from, to); err != nil {
return err
}
......
module github.com/prometheus/tsdb
require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/cespare/xxhash v1.1.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954
github.com/go-kit/kit v0.8.0
github.com/go-logfmt/logfmt v0.3.0 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gogo/protobuf v1.1.1 // indirect
github.com/golang/protobuf v1.2.0 // indirect
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/golang/snappy v0.0.1
github.com/oklog/ulid v1.3.1
github.com/pkg/errors v0.8.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v0.9.1
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce // indirect
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d // indirect
github.com/stretchr/testify v1.2.2 // indirect
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8
github.com/prometheus/client_golang v1.0.0
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)
......@@ -6,8 +6,11 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZq
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954 h1:RMLoZVzv4GliuWafOuPuQDKSm1SJph7uCRnnS61JAn4=
......@@ -22,10 +25,20 @@ github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
......@@ -34,19 +47,37 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1 h1:K47Rk0v/fkEfwfQet2KWhscE0cJzjgCCDBG2KHZoVno=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce h1:X0jFYGnHemYDIW6jlc+fSI8f9Cg+jqCnClYP2WgZT/A=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8 h1:YoY1wS6JYVRpIfFngRf2HHo9R9dAne3xbkGOQ5rJXjU=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
......@@ -14,6 +14,7 @@
package tsdb
import (
"fmt"
"math"
"runtime"
"sort"
......@@ -140,8 +141,9 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Help: "Total number of chunks removed in the head",
})
m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
Name: "prometheus_tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
Objectives: map[float64]float64{},
})
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_max_time",
......@@ -156,8 +158,9 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
return float64(h.MinTime())
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
Objectives: map[float64]float64{},
})
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_corruptions_total",
......@@ -312,7 +315,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
}
}
func (h *Head) loadWAL(r *wal.Reader) error {
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs uint64
......@@ -321,13 +324,26 @@ func (h *Head) loadWAL(r *wal.Reader) error {
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
wg sync.WaitGroup
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []RefSample, n)
outputs = make([]chan []RefSample, n)
wg sync.WaitGroup
multiRefLock sync.Mutex
n = runtime.GOMAXPROCS(0)
inputs = make([]chan []RefSample, n)
outputs = make([]chan []RefSample, n)
)
wg.Add(n)
defer func() {
// On a CorruptionErr, ensure all workers are terminated before exiting.
if _, ok := err.(*wal.CorruptionErr); ok {
for i := 0; i < n; i++ {
close(inputs[i])
for range outputs[i] {
}
}
wg.Wait()
}
}()
for i := 0; i < n; i++ {
outputs[i] = make(chan []RefSample, 300)
inputs[i] = make(chan []RefSample, 300)
......@@ -345,9 +361,12 @@ func (h *Head) loadWAL(r *wal.Reader) error {
samples []RefSample
tstones []Stone
allStones = newMemTombstones()
err error
)
defer allStones.Close()
defer func() {
if err := allStones.Close(); err != nil {
level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err)
}
}()
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
rec := r.Record()
......@@ -363,7 +382,14 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
}
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if !created {
// There's already a different ref for this series.
multiRefLock.Lock()
multiRef[s.Ref] = series.ref
multiRefLock.Unlock()
}
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
......@@ -398,6 +424,9 @@ func (h *Head) loadWAL(r *wal.Reader) error {
shards[i] = buf[:0]
}
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := sam.Ref % uint64(n)
shards[mod] = append(shards[mod], sam)
}
......@@ -436,9 +465,6 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
}
}
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
// Signal termination to each worker and wait for it to close its output channel.
for i := 0; i < n; i++ {
......@@ -448,6 +474,10 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
wg.Wait()
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
if err := allStones.Iter(func(ref uint64, dranges Intervals) error {
return h.chunkRewrite(ref, dranges)
}); err != nil {
......@@ -477,37 +507,49 @@ func (h *Head) Init(minValidTime int64) error {
if err != nil && err != ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
}
multiRef := map[uint64]uint64{}
if err == nil {
sr, err := wal.NewSegmentsReader(dir)
if err != nil {
return errors.Wrap(err, "open checkpoint")
}
defer sr.Close()
defer func() {
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
}()
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wal.NewReader(sr)); err != nil {
if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil {
return errors.Wrap(err, "backfill checkpoint")
}
startFrom++
}
// Backfill segments from the last checkpoint onwards
sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1})
// Find the last segment.
_, last, err := h.wal.Segments()
if err != nil {
return errors.Wrap(err, "open WAL segments")
return errors.Wrap(err, "finding WAL segments")
}
err = h.loadWAL(wal.NewReader(sr))
sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows.
if err == nil {
return nil
}
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
h.metrics.walCorruptionsTotal.Inc()
if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
// Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= last; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
if err != nil {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
}
sr := wal.NewSegmentBufReader(s)
err = h.loadWAL(wal.NewReader(sr), multiRef)
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
}
if err != nil {
return err
}
}
return nil
}
......@@ -553,6 +595,12 @@ func (h *Head) Truncate(mint int64) (err error) {
if err != nil {
return errors.Wrap(err, "get segment range")
}
// Start a new segment, so low-ingestion-volume TSDBs don't have more WAL than
// needed.
err = h.wal.NextSegment()
if err != nil {
return errors.Wrap(err, "next segment")
}
last-- // Never consider last segment for checkpoint.
if last < 0 {
return nil // no segments yet.
......@@ -1250,9 +1298,15 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
}
// Set the head chunks as open (being appended to).
maxTime := c.maxTime
if s.headChunk == c {
maxTime = math.MaxInt64
}
*chks = append(*chks, chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
MaxTime: maxTime,
Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
})
}
......
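The `multiRef` map covers WALs in which the same series was written under more than one ref: duplicate refs are redirected to the canonical one before samples are sharded to the workers. A toy illustration of the remapping, using a hypothetical `refSample` type:

```go
package main

import "fmt"

type refSample struct {
	Ref uint64
	T   int64
	V   float64
}

func main() {
	// Built while reading series records: duplicate ref -> canonical ref.
	multiRef := map[uint64]uint64{42: 7}

	samples := []refSample{{Ref: 7, T: 1, V: 1}, {Ref: 42, T: 2, V: 2}}
	for i, s := range samples {
		// The same remap the WAL loader applies before sharding samples.
		if r, ok := multiRef[s.Ref]; ok {
			samples[i].Ref = r
		}
	}
	fmt.Println(samples) // both samples now reference series 7
}
```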
......@@ -303,68 +303,68 @@ func Intersect(its ...Postings) Postings {
if len(its) == 1 {
return its[0]
}
l := len(its) / 2
a := Intersect(its[:l]...)
b := Intersect(its[l:]...)
if a == EmptyPostings() || b == EmptyPostings() {
return EmptyPostings()
for _, p := range its {
if p == EmptyPostings() {
return EmptyPostings()
}
}
return newIntersectPostings(a, b)
return newIntersectPostings(its...)
}
type intersectPostings struct {
a, b Postings
cur uint64
arr []Postings
cur uint64
}
func newIntersectPostings(a, b Postings) *intersectPostings {
return &intersectPostings{a: a, b: b}
func newIntersectPostings(its ...Postings) *intersectPostings {
return &intersectPostings{arr: its}
}
func (it *intersectPostings) At() uint64 {
return it.cur
}
func (it *intersectPostings) doNext(id uint64) bool {
func (it *intersectPostings) doNext() bool {
Loop:
for {
if !it.b.Seek(id) {
return false
}
if vb := it.b.At(); vb != id {
if !it.a.Seek(vb) {
for _, p := range it.arr {
if !p.Seek(it.cur) {
return false
}
id = it.a.At()
if vb != id {
continue
if p.At() > it.cur {
it.cur = p.At()
continue Loop
}
}
it.cur = id
return true
}
}
func (it *intersectPostings) Next() bool {
if !it.a.Next() {
return false
for _, p := range it.arr {
if !p.Next() {
return false
}
if p.At() > it.cur {
it.cur = p.At()
}
}
return it.doNext(it.a.At())
return it.doNext()
}
func (it *intersectPostings) Seek(id uint64) bool {
if !it.a.Seek(id) {
return false
}
return it.doNext(it.a.At())
it.cur = id
return it.doNext()
}
func (it *intersectPostings) Err() error {
if it.a.Err() != nil {
return it.a.Err()
for _, p := range it.arr {
if p.Err() != nil {
return p.Err()
}
}
return it.b.Err()
return nil
}
// Merge returns a new iterator over the union of the input iterators.
......
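The refactor replaces the pairwise, recursive intersection with a single n-ary iterator that advances all inputs toward a common value. Usage is unchanged; a small sketch, assuming `index.NewListPostings` as a list-backed `Postings` (any other `Postings` implementation would do just as well):

```go
package main

import (
	"fmt"

	"github.com/prometheus/tsdb/index"
)

func main() {
	a := index.NewListPostings([]uint64{1, 2, 3, 5, 8})
	b := index.NewListPostings([]uint64{2, 3, 5, 7})
	c := index.NewListPostings([]uint64{3, 5, 9})

	it := index.Intersect(a, b, c)
	for it.Next() {
		fmt.Println(it.At()) // 3, then 5
	}
	if err := it.Err(); err != nil {
		fmt.Println("err:", err)
	}
}
```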
......@@ -63,14 +63,15 @@ func NewEqualMatcher(name, value string) Matcher {
return &EqualMatcher{name: name, value: value}
}
type regexpMatcher struct {
type RegexpMatcher struct {
name string
re *regexp.Regexp
}
func (m regexpMatcher) Name() string { return m.name }
func (m regexpMatcher) Matches(v string) bool { return m.re.MatchString(v) }
func (m regexpMatcher) String() string { return fmt.Sprintf("%s=~%q", m.name, m.re.String()) }
func (m RegexpMatcher) Name() string { return m.name }
func (m RegexpMatcher) Matches(v string) bool { return m.re.MatchString(v) }
func (m RegexpMatcher) String() string { return fmt.Sprintf("%s=~%q", m.name, m.re.String()) }
func (m RegexpMatcher) Value() string { return m.re.String() }
// NewRegexpMatcher returns a new matcher verifying that a value matches
// the regular expression pattern.
......@@ -79,7 +80,7 @@ func NewRegexpMatcher(name, pattern string) (Matcher, error) {
if err != nil {
return nil, err
}
return &regexpMatcher{name: name, re: re}, nil
return &RegexpMatcher{name: name, re: re}, nil
}
// NewMustRegexpMatcher returns a new matcher verifying that a value matches
......@@ -90,7 +91,7 @@ func NewMustRegexpMatcher(name, pattern string) Matcher {
if err != nil {
panic(err)
}
return &regexpMatcher{name: name, re: re}
return &RegexpMatcher{name: name, re: re}
}
......
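Exporting `RegexpMatcher` and adding `Value()` is what lets the querier type-assert a matcher and inspect its pattern for the set-matching fast path further down. A brief sketch:

```go
package main

import (
	"fmt"

	"github.com/prometheus/tsdb/labels"
)

func main() {
	m, err := labels.NewRegexpMatcher("job", "^(?:node|blackbox)$")
	if err != nil {
		panic(err)
	}
	// Now possible from outside the package:
	if rm, ok := m.(*labels.RegexpMatcher); ok {
		fmt.Println(rm.Name(), rm.Value()) // job ^(?:node|blackbox)$
	}
}
```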
......@@ -17,6 +17,7 @@ import (
"fmt"
"sort"
"strings"
"unicode/utf8"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
......@@ -266,6 +267,62 @@ func (q *blockQuerier) Close() error {
return merr.Err()
}
// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped.
var regexMetaCharacterBytes [16]byte
// isRegexMetaCharacter reports whether byte b needs to be escaped.
func isRegexMetaCharacter(b byte) bool {
return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0
}
func init() {
for _, b := range []byte(`.+*?()|[]{}^$`) {
regexMetaCharacterBytes[b%16] |= 1 << (b / 16)
}
}
func findSetMatches(pattern string) []string {
// Return empty matches if the wrapper from Prometheus is missing.
if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" {
return nil
}
escaped := false
sets := []*strings.Builder{&strings.Builder{}}
for i := 4; i < len(pattern)-2; i++ {
if escaped {
switch {
case isRegexMetaCharacter(pattern[i]):
sets[len(sets)-1].WriteByte(pattern[i])
case pattern[i] == '\\':
sets[len(sets)-1].WriteByte('\\')
default:
return nil
}
escaped = false
} else {
switch {
case isRegexMetaCharacter(pattern[i]):
if pattern[i] == '|' {
sets = append(sets, &strings.Builder{})
} else {
return nil
}
case pattern[i] == '\\':
escaped = true
default:
sets[len(sets)-1].WriteByte(pattern[i])
}
}
}
matches := make([]string, 0, len(sets))
for _, s := range sets {
if s.Len() > 0 {
matches = append(matches, s.String())
}
}
return matches
}
// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers.
func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings, error) {
......@@ -346,6 +403,14 @@ func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error
return ix.Postings(em.Name(), em.Value())
}
// Fast-path for set matching.
if em, ok := m.(*labels.RegexpMatcher); ok {
setMatches := findSetMatches(em.Value())
if len(setMatches) > 0 {
return postingsForSetMatcher(ix, em.Name(), setMatches)
}
}
tpls, err := ix.LabelValues(m.Name())
if err != nil {
return nil, err
......@@ -411,6 +476,18 @@ func inversePostingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings
return index.Merge(rit...), nil
}
func postingsForSetMatcher(ix IndexReader, name string, matches []string) (index.Postings, error) {
var its []index.Postings
for _, match := range matches {
if it, err := ix.Postings(name, match); err == nil {
its = append(its, it)
} else {
return nil, err
}
}
return index.Merge(its...), nil
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
......
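Concretely, Prometheus wraps user regexes as `^(?:...)$`, so a matcher like `job=~"a|b|c"` arrives here as `^(?:a|b|c)$` and decomposes into exact values that can be looked up directly in the postings index. Since `findSetMatches` is unexported, here is a simplified, escape-free re-implementation to show the expected inputs and outputs:

```go
package main

import (
	"fmt"
	"strings"
)

// findSetMatches is a simplified take on the querier's helper: it only
// handles alternations of plain literals and ignores escape sequences.
func findSetMatches(pattern string) []string {
	if len(pattern) < 6 || !strings.HasPrefix(pattern, "^(?:") || !strings.HasSuffix(pattern, ")$") {
		return nil
	}
	inner := pattern[4 : len(pattern)-2]
	// Bail out on any regex metacharacter other than '|'.
	if strings.ContainsAny(inner, `.+*?()[]{}^$\`) {
		return nil
	}
	parts := strings.Split(inner, "|")
	out := parts[:0]
	for _, p := range parts {
		if p != "" {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	fmt.Println(findSetMatches("^(?:foo|bar|baz)$")) // [foo bar baz]
	fmt.Println(findSetMatches("^(?:foo.*)$"))       // [] -- not a pure set
}
```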
......@@ -109,7 +109,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
}
// Reset version of meta.json to 1.
meta.Version = 1
if err := writeMetaFile(logger, d, meta); err != nil {
if _, err := writeMetaFile(logger, d, meta); err != nil {
return wrapErr(err, d)
}
}
......
......@@ -54,14 +54,15 @@ type TombstoneReader interface {
Close() error
}
func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error {
func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) {
path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp"
hash := newCRC32()
var size int
f, err := os.Create(tmp)
if err != nil {
return err
return 0, err
}
defer func() {
if f != nil {
......@@ -79,10 +80,11 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error
// Write the meta.
buf.PutBE32(MagicTombstone)
buf.PutByte(tombstoneFormatV1)
_, err = f.Write(buf.Get())
n, err := f.Write(buf.Get())
if err != nil {
return err
return 0, err
}
size += n
mw := io.MultiWriter(f, hash)
......@@ -94,32 +96,34 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error
buf.PutVarint64(iv.Mint)
buf.PutVarint64(iv.Maxt)
_, err = mw.Write(buf.Get())
n, err = mw.Write(buf.Get())
if err != nil {
return err
}
size += n
}
return nil
}); err != nil {
return fmt.Errorf("error writing tombstones: %v", err)
return 0, fmt.Errorf("error writing tombstones: %v", err)
}
_, err = f.Write(hash.Sum(nil))
n, err = f.Write(hash.Sum(nil))
if err != nil {
return err
return 0, err
}
size += n
var merr tsdb_errors.MultiError
if merr.Add(f.Sync()); merr.Err() != nil {
merr.Add(f.Close())
return merr.Err()
return 0, merr.Err()
}
if err = f.Close(); err != nil {
return err
return 0, err
}
f = nil
return fileutil.Replace(tmp, path)
return int64(size), fileutil.Replace(tmp, path)
}
// Stone holds the information on the posting and time-range
......@@ -129,41 +133,37 @@ type Stone struct {
intervals Intervals
}
func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
func readTombstones(dir string) (TombstoneReader, int64, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if os.IsNotExist(err) {
return newMemTombstones(), nil, nil
return newMemTombstones(), 0, nil
} else if err != nil {
return nil, nil, err
}
sr := &TombstoneFile{
size: int64(len(b)),
return nil, 0, err
}
if len(b) < 5 {
return nil, sr, errors.Wrap(encoding.ErrInvalidSize, "tombstones header")
return nil, 0, errors.Wrap(encoding.ErrInvalidSize, "tombstones header")
}
d := &encoding.Decbuf{B: b[:len(b)-4]} // 4 for the checksum.
if mg := d.Be32(); mg != MagicTombstone {
return nil, sr, fmt.Errorf("invalid magic number %x", mg)
return nil, 0, fmt.Errorf("invalid magic number %x", mg)
}
if flag := d.Byte(); flag != tombstoneFormatV1 {
return nil, sr, fmt.Errorf("invalid tombstone format %x", flag)
return nil, 0, fmt.Errorf("invalid tombstone format %x", flag)
}
if d.Err() != nil {
return nil, sr, d.Err()
return nil, 0, d.Err()
}
// Verify checksum.
hash := newCRC32()
if _, err := hash.Write(d.Get()); err != nil {
return nil, sr, errors.Wrap(err, "write to hash")
return nil, 0, errors.Wrap(err, "write to hash")
}
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
return nil, sr, errors.New("checksum did not match")
return nil, 0, errors.New("checksum did not match")
}
stonesMap := newMemTombstones()
......@@ -173,13 +173,13 @@ func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
mint := d.Varint64()
maxt := d.Varint64()
if d.Err() != nil {
return nil, sr, d.Err()
return nil, 0, d.Err()
}
stonesMap.addInterval(k, Interval{mint, maxt})
}
return stonesMap, sr, nil
return stonesMap, int64(len(b)), nil
}
type memTombstones struct {
......@@ -230,16 +230,6 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
}
}
// TombstoneFile holds information about the tombstone file.
type TombstoneFile struct {
size int64
}
// Size returns the tombstone file size.
func (t *TombstoneFile) Size() int64 {
return t.size
}
func (*memTombstones) Close() error {
return nil
}
......
......@@ -65,8 +65,9 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
m := &walMetrics{}
m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_corruptions_total",
......@@ -1245,7 +1246,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
if err := os.RemoveAll(tmpdir); err != nil {
return errors.Wrap(err, "cleanup replacement dir")
}
repl, err := wal.New(logger, nil, tmpdir)
repl, err := wal.New(logger, nil, tmpdir, false)
if err != nil {
return errors.Wrap(err, "open new WAL")
}
......
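The `Objectives` additions in this file and in head.go above track the upgrade to client_golang v1.0.0, which dropped the implicit default quantile objectives: an empty map yields a summary exposing only `_sum` and `_count`. A minimal sketch of both variants used in the diff (metric names here are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Empty Objectives: no quantiles, just _sum and _count.
	gcDuration := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "example_gc_duration_seconds",
		Help:       "Runtime of an example operation.",
		Objectives: map[float64]float64{},
	})
	gcDuration.Observe(0.25)

	// Explicit quantiles with allowed error, as the fsync summary does.
	fsync := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "example_fsync_duration_seconds",
		Help:       "Duration of example fsyncs.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	})
	fsync.Observe(0.01)
	fmt.Println("summaries configured")
}
```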
......@@ -22,28 +22,46 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
readerCorruptionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_reader_corruption_errors",
Help: "Errors encountered when reading the WAL.",
}, []string{"error"})
)
// liveReaderMetrics holds all metrics exposed by the LiveReader.
type liveReaderMetrics struct {
readerCorruptionErrors *prometheus.CounterVec
}
// NewLiveReaderMetrics instantiates, registers, and returns metrics to be injected
// at LiveReader instantiation.
func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics {
m := &liveReaderMetrics{
readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_reader_corruption_errors_total",
Help: "Errors encountered when reading the WAL.",
}, []string{"error"}),
}
if reg != nil {
reg.Register(m.readerCorruptionErrors)
}
return m
}
// NewLiveReader returns a new live reader.
func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader {
return &LiveReader{
logger: logger,
rdr: r,
func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader {
lr := &LiveReader{
logger: logger,
rdr: r,
metrics: metrics,
// Until we understand how they come about, make readers permissive
// to records spanning pages.
permissive: true,
}
return lr
}
// LiveReader reads WAL records from an io.Reader. It allows reading of WALs
......@@ -54,6 +72,7 @@ type LiveReader struct {
rdr io.Reader
err error
rec []byte
snappyBuf []byte
hdr [recordHeaderSize]byte
buf [pageSize]byte
readIndex int // Index in buf to start at for next read.
......@@ -68,6 +87,8 @@ type LiveReader struct {
// does. Until we track down why, set permissive to true to tolerate it.
// NB the non-live Reader implementation allows for this.
permissive bool
metrics *liveReaderMetrics
}
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
......@@ -166,11 +187,18 @@ func (r *LiveReader) buildRecord() (bool, error) {
return false, nil
}
rt := recType(r.hdr[0])
rt := recTypeFromHeader(r.hdr[0])
if rt == recFirst || rt == recFull {
r.rec = r.rec[:0]
r.snappyBuf = r.snappyBuf[:0]
}
compressed := r.hdr[0]&snappyMask != 0
if compressed {
r.snappyBuf = append(r.snappyBuf, temp...)
} else {
r.rec = append(r.rec, temp...)
}
r.rec = append(r.rec, temp...)
if err := validateRecord(rt, r.index); err != nil {
r.index = 0
......@@ -178,6 +206,16 @@ func (r *LiveReader) buildRecord() (bool, error) {
}
if rt == recLast || rt == recFull {
r.index = 0
if compressed && len(r.snappyBuf) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible make the length
// equal to the capacity.
r.rec = r.rec[:cap(r.rec)]
r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
if err != nil {
return false, err
}
}
return true, nil
}
// Only increment i for non-zero records since we use it
......@@ -258,7 +296,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) {
if !r.permissive {
return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
}
readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
r.metrics.readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
}
if recordHeaderSize+length > pageSize {
......
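A note on the r.rec = r.rec[:cap(r.rec)] line above: snappy.Decode reuses its destination slice only when len(dst) is at least the decoded length, and allocates a fresh buffer otherwise. Stretching the record buffer to its full capacity before decoding therefore lets later records reuse earlier allocations. A standalone sketch of the trick (hypothetical helper, not part of the diff):

import "github.com/golang/snappy"

// decodeReusing decompresses src into dst's backing array when it fits.
// Growing len(dst) to cap(dst) first is what allows snappy to reuse it.
func decodeReusing(dst, src []byte) ([]byte, error) {
	dst = dst[:cap(dst)]
	return snappy.Decode(dst, src)
}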
......@@ -19,6 +19,7 @@ import (
"hash/crc32"
"io"
"github.com/golang/snappy"
"github.com/pkg/errors"
)
......@@ -27,6 +28,7 @@ type Reader struct {
rdr io.Reader
err error
rec []byte
snappyBuf []byte
buf [pageSize]byte
total int64 // Total bytes processed.
curRecTyp recType // Used for checking that the last record is not torn.
......@@ -45,7 +47,7 @@ func (r *Reader) Next() bool {
// The last WAL segment record shouldn't be torn (should be full or last).
// The last record would be torn after a crash just before
// the last record part could be persisted to disk.
if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
if r.curRecTyp == recFirst || r.curRecTyp == recMiddle {
r.err = errors.New("last record is torn")
}
return false
......@@ -61,6 +63,7 @@ func (r *Reader) next() (err error) {
buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0]
r.snappyBuf = r.snappyBuf[:0]
i := 0
for {
......@@ -68,7 +71,8 @@ func (r *Reader) next() (err error) {
return errors.Wrap(err, "read first header byte")
}
r.total++
r.curRecTyp = recType(hdr[0])
r.curRecTyp = recTypeFromHeader(hdr[0])
compressed := hdr[0]&snappyMask != 0
// Gobble up zero bytes.
if r.curRecTyp == recPageTerm {
......@@ -123,12 +127,25 @@ func (r *Reader) next() (err error) {
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
r.rec = append(r.rec, buf[:length]...)
if compressed {
r.snappyBuf = append(r.snappyBuf, buf[:length]...)
} else {
r.rec = append(r.rec, buf[:length]...)
}
if err := validateRecord(r.curRecTyp, i); err != nil {
return err
}
if r.curRecTyp == recLast || r.curRecTyp == recFull {
if compressed && len(r.snappyBuf) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible, make the length
// equal to the capacity.
r.rec = r.rec[:cap(r.rec)]
r.rec, err = snappy.Decode(r.rec, r.snappyBuf)
return err
}
return nil
}
......
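The non-live Reader follows the same buffer dance, so callers see fully decoded records regardless of compression. A hedged usage sketch for reading a closed segment (dumpSegment is a hypothetical helper; OpenReadSegment, NewReader, Next, Record and Err are the vendored package's API):

import (
	"fmt"

	"github.com/prometheus/tsdb/wal"
)

// dumpSegment prints the decoded size of every record in a segment.
func dumpSegment(path string) error {
	seg, err := wal.OpenReadSegment(path)
	if err != nil {
		return err
	}
	defer seg.Close()

	r := wal.NewReader(seg)
	for r.Next() {
		fmt.Printf("record: %d bytes\n", len(r.Record()))
	}
	// Err is nil after a clean end-of-segment.
	return r.Err()
}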
......@@ -29,6 +29,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/golang/snappy"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
......@@ -165,6 +166,8 @@ type WAL struct {
stopc chan chan struct{}
actorc chan func()
closed bool // To allow calling Close() more than once without blocking.
compress bool
snappyBuf []byte
fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter
......@@ -175,13 +178,13 @@ type WAL struct {
}
// New returns a new WAL over the given directory.
func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
return NewSize(logger, reg, dir, DefaultSegmentSize)
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) {
return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
}
// NewSize returns a new WAL over the given directory.
// New segments are created with the specified size.
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) {
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) {
if segmentSize%pageSize != 0 {
return nil, errors.New("invalid segment size")
}
......@@ -198,10 +201,12 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
page: &page{},
actorc: make(chan func(), 100),
stopc: make(chan chan struct{}),
compress: compress,
}
w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_page_flushes_total",
......@@ -228,34 +233,35 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
}
_, j, err := w.Segments()
// Index of the Segment we want to open and write to.
writeSegmentIndex := 0
if err != nil {
return nil, errors.Wrap(err, "get segment range")
}
// Fresh dir, no segments yet.
if j == -1 {
segment, err := CreateSegment(w.dir, 0)
if err != nil {
return nil, err
}
// If some segments already exist create one with a higher index than the last segment.
if j != -1 {
writeSegmentIndex = j + 1
}
if err := w.setSegment(segment); err != nil {
return nil, err
}
} else {
segment, err := OpenWriteSegment(logger, w.dir, j)
if err != nil {
return nil, err
}
segment, err := CreateSegment(w.dir, writeSegmentIndex)
if err != nil {
return nil, err
}
if err := w.setSegment(segment); err != nil {
return nil, err
}
if err := w.setSegment(segment); err != nil {
return nil, err
}
go w.run()
return w, nil
}
// CompressionEnabled returns if compression is enabled on this WAL.
func (w *WAL) CompressionEnabled() bool {
return w.compress
}
// Dir returns the directory of the WAL.
func (w *WAL) Dir() string {
return w.dir
......@@ -363,6 +369,9 @@ func (w *WAL) Repair(origErr error) error {
}
// We expect an error here from r.Err(), so nothing to handle.
// We need to pad to the end of the last page in the repaired segment
w.flushPage(true)
// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
......@@ -372,6 +381,20 @@ func (w *WAL) Repair(origErr error) error {
if err := os.Remove(tmpfn); err != nil {
return errors.Wrap(err, "delete corrupted segment")
}
// Explicitly close the segment we just repaired to avoid issues with Windows.
s.Close()
// We always want to start writing to a new Segment rather than an existing
// Segment, which is handled by NewSize, but earlier in Repair we're deleting
// all segments that come after the corrupted Segment. Create a new Segment here.
s, err = CreateSegment(w.dir, cerr.Segment+1)
if err != nil {
return err
}
if err := w.setSegment(s); err != nil {
return err
}
return nil
}
......@@ -380,6 +403,13 @@ func SegmentName(dir string, i int) string {
return filepath.Join(dir, fmt.Sprintf("%08d", i))
}
// NextSegment creates the next segment and closes the previous one.
func (w *WAL) NextSegment() error {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.nextSegment()
}
// nextSegment creates the next segment and closes the previous one.
func (w *WAL) nextSegment() error {
// Only flush the current page if it actually holds data.
......@@ -455,6 +485,14 @@ func (w *WAL) flushPage(clear bool) error {
return nil
}
// First byte of header format:
// [ 4 bits unallocated ] [ 1 bit snappy compression flag ] [ 3 bit record type ]
const (
snappyMask = 1 << 3
recTypeMask = snappyMask - 1
)
type recType uint8
const (
......@@ -465,6 +503,10 @@ const (
recLast recType = 4 // Final fragment of a record.
)
func recTypeFromHeader(header byte) recType {
return recType(header & recTypeMask)
}
func (t recType) String() string {
switch t {
case recPageTerm:
......@@ -525,6 +567,19 @@ func (w *WAL) log(rec []byte, final bool) error {
}
}
compressed := false
if w.compress && len(rec) > 0 {
// The snappy library uses `len` to calculate if we need a new buffer.
// In order to allocate as few buffers as possible, make the length
// equal to the capacity.
w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)]
w.snappyBuf = snappy.Encode(w.snappyBuf, rec)
if len(w.snappyBuf) < len(rec) {
rec = w.snappyBuf
compressed = true
}
}
// Populate as many pages as necessary to fit the record.
// Be careful to always do one pass to ensure we write zero-length records.
for i := 0; i == 0 || len(rec) > 0; i++ {
......@@ -548,6 +603,9 @@ func (w *WAL) log(rec []byte, final bool) error {
default:
typ = recMiddle
}
if compressed {
typ |= snappyMask
}
buf[0] = byte(typ)
crc := crc32.Checksum(part, castagnoliTable)
......@@ -710,7 +768,7 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) {
segs = append(segs, s)
}
}
return newSegmentBufReader(segs...), nil
return NewSegmentBufReader(segs...), nil
}
// segmentBufReader is a buffered reader that reads in multiples of pages.
......@@ -725,7 +783,7 @@ type segmentBufReader struct {
off int // Offset of read data into current segment.
}
func newSegmentBufReader(segs ...*Segment) *segmentBufReader {
func NewSegmentBufReader(segs ...*Segment) *segmentBufReader {
return &segmentBufReader{
buf: bufio.NewReaderSize(segs[0], 16*pageSize),
segs: segs,
......
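Pulling the write-side pieces together: the header byte packs the record type into its low three bits and the snappy flag into bit 3, and compression is kept only when it actually shrinks the record, so incompressible inputs never grow the WAL. A self-contained sketch of that packing and fallback (the constants mirror the diff; the helper names are hypothetical):

import "github.com/golang/snappy"

const (
	snappyMask  = 1 << 3
	recTypeMask = snappyMask - 1
)

type recType uint8

// packHeader folds the compression flag into the type byte, as
// (*WAL).log does above.
func packHeader(t recType, compressed bool) byte {
	b := byte(t)
	if compressed {
		b |= snappyMask
	}
	return b
}

// unpackHeader mirrors recTypeFromHeader plus the snappyMask check
// performed by both readers.
func unpackHeader(b byte) (recType, bool) {
	return recType(b & recTypeMask), b&snappyMask != 0
}

// maybeCompress returns the snappy-encoded record only when that is
// strictly smaller, matching the fallback in (*WAL).log.
func maybeCompress(buf, rec []byte) ([]byte, bool) {
	buf = buf[:cap(buf)]
	buf = snappy.Encode(buf, rec)
	if len(buf) < len(rec) {
		return buf, true
	}
	return rec, false
}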
......@@ -278,7 +278,7 @@ github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg
# github.com/prometheus/procfs v0.0.2
github.com/prometheus/procfs
github.com/prometheus/procfs/internal/fs
# github.com/prometheus/tsdb v0.8.0
# github.com/prometheus/tsdb v0.9.1
github.com/prometheus/tsdb
github.com/prometheus/tsdb/fileutil
github.com/prometheus/tsdb/labels
......