提交 6d79ac11 编写于 作者: B Bartlomiej Plotka

Avoid un-symbolizing labels if not needed.

Similar optimization to what we did on Thanos: https://github.com/thanos-io/thanos/pull/3531Signed-off-by: NBartlomiej Plotka <bwplotka@gmail.com>
上级 8b64b70f
......@@ -104,6 +104,10 @@ type LabelQuerier interface {
Close() error
}
// DiscardSamplesFunc is a special token to be placed in hint's Func field.
// There is no series function, this token is used for lookups that don't need samples.
const DiscardSamplesFunc = "series"
// SelectHints specifies hints passed for data selections.
// This is used only as an option for implementation to use.
type SelectHints struct {
......@@ -111,7 +115,7 @@ type SelectHints struct {
End int64 // End time in milliseconds for this select.
Step int64 // Query step size in milliseconds.
Func string // String representation of surrounding function or aggregation.
Func string // String representation of surrounding function or aggregation or "series" if samples can be skipped.
Grouping []string // List of label names used in aggregation.
By bool // Indicate whether it is without or by.
......
......@@ -81,7 +81,11 @@ type IndexReader interface {
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
//Series(id uint64, s *index.SymbolizedLabels, chks *[]chunks.Meta, skipChunks bool, selectMint, selectMaxt int64) (ok bool, err error)
// Series
// TODO(bwplotka): Add commentary.
Series() index.SeriesSelector
// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames() ([]string, error)
......@@ -437,11 +441,8 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}
func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
if err := r.ir.Series(ref, lset, chks); err != nil {
return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return nil
func (r blockIndexReader) Series() index.SeriesSelector {
return r.ir.Series()
}
func (r blockIndexReader) LabelNames() ([]string, error) {
......@@ -487,17 +488,13 @@ func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
return errors.Wrap(err, "select series")
}
ir := pb.indexr
series := pb.indexr.Series()
// Choose only valid postings which have chunks in the time-range.
stones := tombstones.NewMemTombstones()
var lset labels.Labels
var chks []chunks.Meta
Outer:
for p.Next() {
err := ir.Series(p.At(), &lset, &chks)
_, chks, err := series.Select(p.At(), false)
if err != nil {
return err
}
......
......@@ -715,7 +715,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1))
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, true, meta.MinTime, meta.MaxTime-1))
syms := indexr.Symbols()
if i == 0 {
symbols = syms
......
......@@ -1672,6 +1672,12 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
return index.NewListPostings(ep)
}
// Series returns the series for the given reference.
func (h *headIndexReader) SeriesForTime(id uint64, s *index.SymbolizedLabels, chks *[]chunks.Meta, skipChunks bool, selectMint, selectMaxt int64) (ok bool, err error) {
// TODO(bwplotka): Do it.
return false, errors.New("not implemented")
}
// Series returns the series for the given reference.
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
s := h.head.series.getByID(ref)
......
......@@ -29,6 +29,7 @@ import (
"unsafe"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
......@@ -1505,19 +1506,71 @@ func (r *Reader) LabelValues(name string) ([]string, error) {
return values, nil
}
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
var ErrNoChunkMatched = errors.New("series Querier: No chunk matched mint and maxt")
// SeriesSelector is selector that allows selecting series entries from index matching time selection.
// Use this reader within single block querying to reuse chunk and label buffers.
type SeriesSelector interface {
// Select returns symbolized labels and chunks (if SeriesSelector was created with skipChunk set to false).
// Label set is turned in form of Labels interface that allows to ... ?
// It returns ErrNoChunkMatched if no chunks matched time selection.
// It's caller responsibility to copy labels and chunks if needed: they are valid only until next SeriesEntry Selection.
Select(id uint64, skipChunks bool, dranges ...tombstones.Interval) (labels.Labels, []chunks.Meta, error)
}
type seriesSelector struct {
r *Reader
bufChks []chunks.Meta
bufLabels labels.Labels
bufSLabels []symbolizedLabel
}
func (r *Reader) Series() SeriesSelector {
return &seriesSelector{
r: r,
}
}
// Postings returns a postings list for b and its number of elements.
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
l := d.Get()
return n, newBigEndianPostings(l), d.Err()
}
// TODO(bwplotka): Add commentary.
func (s *seriesSelector) Select(id uint64, skipChunks bool, dranges ...tombstones.Interval) (labels.Labels, []chunks.Meta, error) {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == FormatV2 {
if s.r.version == FormatV2 {
offset = id * 16
}
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
d := encoding.NewDecbufUvarintAt(s.r.b, int(offset), castagnoliTable)
if d.Err() != nil {
return d.Err()
return nil, nil, d.Err()
}
if err := DecodeSeriesForTime(d.Get(), &s.bufSLabels, &s.bufChks, skipChunks, dranges); err != nil {
return nil, nil, errors.Wrap(err, "read series")
}
s.bufLabels = s.bufLabels[:0]
for _, l := range s.bufSLabels {
// TODO(bwplotka): Cache it. It sometimes takes majority of query time.
ln, err := s.r.dec.LookupSymbol(l.Name)
if err != nil {
return nil, nil, errors.Wrap(err, "lookup label name")
}
lv, err := s.r.dec.LookupSymbol(l.Value)
if err != nil {
return nil, nil, errors.Wrap(err, "lookup label value")
}
s.bufLabels = append(s.bufLabels, labels.Label{Name: ln, Value: lv})
}
return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series")
return s.bufLabels, s.bufChks, nil
}
func (r *Reader) Postings(name string, values ...string) (Postings, error) {
......@@ -1675,12 +1728,78 @@ type Decoder struct {
LookupSymbol func(uint32) (string, error)
}
// Postings returns a postings list for b and its number of elements.
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
type symbolizedLabel struct {
Name, Value uint32
}
// DecodeSeriesForTime decodes a series entry from the given byte slice decoding only chunk metas that are within given min and max time.
// If skipChunks is specified DecodeSeriesForTime does not return any chunks, but only labels and only if at least single chunk is within time range.
// DecodeSeriesForTime returns false, when there are no series data for given time range.
func DecodeSeriesForTime(b []byte, lset *[]symbolizedLabel, chks *[]chunks.Meta, skipChunks bool, dranges tombstones.Intervals) error {
*lset = (*lset)[:0]
*chks = (*chks)[:0]
d := encoding.Decbuf{B: b}
n := d.Be32int()
l := d.Get()
return n, newBigEndianPostings(l), d.Err()
// Read labels without looking up symbols.
k := d.Uvarint()
for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())
*lset = append(*lset, symbolizedLabel{Name: lno, Value: lvo})
}
// Read the chunks meta data.
k = d.Uvarint()
if k == 0 {
// Series without chunks.
if err := d.Err(); err != nil {
return d.Err()
}
return ErrNoChunkMatched
}
// First t0 is absolute, rest is just diff so different type is used (Uvarint64).
mint := d.Varint64()
maxt := int64(d.Uvarint64()) + mint
// Similar for first ref.
ref := int64(d.Uvarint64())
mintScope := dranges[0].Maxt
maxtScope := dranges[len(dranges)-1].Mint
for i := 0; i < k; i++ {
if i > 0 {
mint += int64(d.Uvarint64())
maxt = int64(d.Uvarint64()) + mint
ref += d.Varint64()
}
if mint > maxtScope {
break
}
if maxt >= mintScope && !(tombstones.Interval{Mint: mint, Maxt: maxt}).IsSubrange(dranges) {
// Found a full or partial chunk.
if skipChunks {
// We are not interested in chunks and we know there is at least one, that's enough to return series.
return ErrNoChunkMatched
}
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref),
MinTime: mint,
MaxTime: maxt,
})
}
mint = maxt
}
if err := d.Err(); err != nil {
return d.Err()
}
if len(*chks) == 0 {
return ErrNoChunkMatched
}
return nil
}
// Series decodes a series entry from the given byte slice into lset and chks.
......
......@@ -121,8 +121,6 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
}
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
mint := q.mint
maxt := q.maxt
p, err := PostingsForMatchers(q.index, ms...)
if err != nil {
return storage.ErrSeriesSet(err)
......@@ -131,16 +129,15 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ..
p = q.index.SortedPostings(p)
}
mint := q.mint
maxt := q.maxt
discardSamples := false
if hints != nil {
mint = hints.Start
maxt = hints.End
if hints.Func == "series" {
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt)
}
discardSamples = hints.Func == storage.DiscardSamplesFunc
}
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, discardSamples, mint, maxt)
}
// blockChunkQuerier provides chunk querying access to a single block database.
......@@ -158,12 +155,6 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
}
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
mint := q.mint
maxt := q.maxt
if hints != nil {
mint = hints.Start
maxt = hints.End
}
p, err := PostingsForMatchers(q.index, ms...)
if err != nil {
return storage.ErrChunkSeriesSet(err)
......@@ -171,7 +162,16 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
if sortSeries {
p = q.index.SortedPostings(p)
}
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
mint := q.mint
maxt := q.maxt
discardSamples := false
if hints != nil {
mint = hints.Start
maxt = hints.End
discardSamples = hints.Func == storage.DiscardSamplesFunc
}
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, discardSamples, mint, maxt)
}
func findSetMatches(pattern string) []string {
......@@ -373,39 +373,62 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
type blockBaseSeriesSet struct {
p index.Postings
index IndexReader
p index.Postings
index IndexReader
series index.SeriesSelector
chunks ChunkReader
tombstones tombstones.Reader
mint, maxt int64
skipChunks bool
currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels
bufChks []chunks.Meta
bufLbls labels.Labels
err error
bufIntervals tombstones.Intervals
err error
}
func (b *blockBaseSeriesSet) Next() bool {
edgesIntervals := tombstones.Intervals{
tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1},
tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64},
}
for b.p.Next() {
if err := b.index.Series(b.p.At(), &b.bufLbls, &b.bufChks); err != nil {
intervals, err := b.tombstones.Get(b.p.At())
if err != nil {
b.err = errors.Wrap(err, "get tombstones")
return false
}
b.bufIntervals = b.bufIntervals[:0]
if len(intervals) > 0 {
b.bufIntervals = append(b.bufIntervals, intervals...)
}
b.bufIntervals = b.bufIntervals.Add(edgesIntervals[0]).Add(edgesIntervals[1])
lset, bufChks, err := b.series.Select(b.p.At(), b.skipChunks, b.bufIntervals...)
if err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == storage.ErrNotFound {
continue
}
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == index.ErrNoChunkMatched {
continue
}
b.err = errors.Wrapf(err, "get series %d", b.p.At())
return false
}
// Copy labels as they can be used across iterations.
b.currLabels = make(labels.Labels, len(lset))
copy(b.currLabels, lset)
if len(b.bufChks) == 0 {
continue
}
intervals, err := b.tombstones.Get(b.p.At())
if err != nil {
b.err = errors.Wrap(err, "get tombstones")
return false
if b.skipChunks {
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
panic("discard samples hint was passed, no sample iteration was expected")
}
return true
}
// NOTE:
......@@ -413,47 +436,9 @@ func (b *blockBaseSeriesSet) Next() bool {
// * chunks are both closed: [chk.MinTime, chk.MaxTime].
// * requested time ranges are closed: [req.Start, req.End].
var trimFront, trimBack bool
// Copy chunks as iteratables are reusable.
chks := make([]chunks.Meta, 0, len(b.bufChks))
// Prefilter chunks and pick those which are not entirely deleted or totally outside of the requested range.
for _, chk := range b.bufChks {
if chk.MaxTime < b.mint {
continue
}
if chk.MinTime > b.maxt {
continue
}
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(intervals)) {
chks = append(chks, chk)
}
// If still not entirely deleted, check if trim is needed based on requested time range.
if chk.MinTime < b.mint {
trimFront = true
}
if chk.MaxTime > b.maxt {
trimBack = true
}
}
if len(chks) == 0 {
continue
}
if trimFront {
intervals = intervals.Add(tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1})
}
if trimBack {
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
}
b.currLabels = make(labels.Labels, len(b.bufLbls))
copy(b.currLabels, b.bufLbls)
chks := make([]chunks.Meta, len(bufChks))
copy(chks, bufChks)
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals)
}
......@@ -666,7 +651,7 @@ type blockSeriesSet struct {
blockBaseSeriesSet
}
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet {
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, skipChunks bool, mint, maxt int64) storage.SeriesSet {
return &blockSeriesSet{
blockBaseSeriesSet{
index: i,
......@@ -675,7 +660,7 @@ func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p inde
p: p,
mint: mint,
maxt: maxt,
bufLbls: make(labels.Labels, 0, 10),
skipChunks: skipChunks,
},
}
}
......@@ -698,7 +683,7 @@ type blockChunkSeriesSet struct {
blockBaseSeriesSet
}
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet {
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, skipChunks bool, mint, maxt int64) storage.ChunkSeriesSet {
return &blockChunkSeriesSet{
blockBaseSeriesSet{
index: i,
......@@ -707,7 +692,7 @@ func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p
p: p,
mint: mint,
maxt: maxt,
bufLbls: make(labels.Labels, 0, 10),
skipChunks: skipChunks,
},
}
}
......@@ -830,17 +815,3 @@ Outer:
}
func (it *DeletedIterator) Err() error { return it.Iter.Err() }
type nopChunkReader struct {
emptyChunk chunkenc.Chunk
}
func newNopChunkReader() ChunkReader {
return nopChunkReader{
emptyChunk: chunkenc.NewXORChunk(),
}
}
func (cr nopChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { return cr.emptyChunk, nil }
func (cr nopChunkReader) Close() error { return nil }
......@@ -1182,15 +1182,8 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
return index.NewListPostings(ep)
}
func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
s, ok := m.series[ref]
if !ok {
return storage.ErrNotFound
}
*lset = append((*lset)[:0], s.l...)
*chks = append((*chks)[:0], s.chunks...)
return nil
func (m mockIndex) Series() index.SeriesSelector {
return nil // TODO
}
func (m mockIndex) LabelNames() ([]string, error) {
......@@ -1982,7 +1975,7 @@ func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings {
return index.EmptyPostings()
}
func (m mockMatcherIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
func (m mockMatcherIndex) Series() index.SeriesSelector {
return nil
}
......
......@@ -292,10 +292,12 @@ type Interval struct {
Mint, Maxt int64
}
// InBounds returns true if t is within interval.
func (tr Interval) InBounds(t int64) bool {
return t >= tr.Mint && t <= tr.Maxt
}
// IsSubrange returns true if at least one interval covers fully the tr interval.
func (tr Interval) IsSubrange(dranges Intervals) bool {
for _, r := range dranges {
if r.InBounds(tr.Mint) && r.InBounds(tr.Maxt) {
......
......@@ -606,7 +606,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
hints := &storage.SelectHints{
Start: timestamp.FromTime(start),
End: timestamp.FromTime(end),
Func: "series", // There is no series function, this token is used for lookups that don't need samples.
Func: storage.DiscardSamplesFunc,
}
var sets []storage.SeriesSet
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册