提交 3158b03e 编写于 作者: G Goutham Veeramachaneni 提交者: Goutham Veeramachaneni
上级 8b7b1971
......@@ -289,7 +289,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
return ErrClosing
}
p, absent, err := PostingsForMatchers(pb.indexr, ms...)
p, err := PostingsForMatchers(pb.indexr, ms...)
if err != nil {
return errors.Wrap(err, "select series")
}
......@@ -309,12 +309,6 @@ Outer:
return err
}
for _, abs := range absent {
if lset.Get(abs) != "" {
continue Outer
}
}
for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
// Delete only until the current vlaues and not beyond.
......
......@@ -807,3 +807,89 @@ func TestDB_Retention(t *testing.T) {
testutil.Equals(t, 1, len(db.blocks))
testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
}
func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)
testutil.Ok(t, err)
defer db.Close()
labelpairs := []labels.Labels{
labels.FromStrings("a", "abcd", "b", "abcde"),
labels.FromStrings("labelname", "labelvalue"),
}
app := db.Appender()
for _, lbls := range labelpairs {
_, err = app.Add(lbls, 0, 1)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
cases := []struct {
selector labels.Selector
series []labels.Labels
}{{
selector: labels.Selector{
labels.Not(labels.NewEqualMatcher("lname", "lvalue")),
},
series: labelpairs,
}, {
selector: labels.Selector{
labels.NewEqualMatcher("a", "abcd"),
labels.Not(labels.NewEqualMatcher("b", "abcde")),
},
series: []labels.Labels{},
}, {
selector: labels.Selector{
labels.NewEqualMatcher("a", "abcd"),
labels.Not(labels.NewEqualMatcher("b", "abc")),
},
series: []labels.Labels{labelpairs[0]},
}, {
selector: labels.Selector{
labels.Not(labels.NewMustRegexpMatcher("a", "abd.*")),
},
series: labelpairs,
}, {
selector: labels.Selector{
labels.Not(labels.NewMustRegexpMatcher("a", "abc.*")),
},
series: labelpairs[1:],
}, {
selector: labels.Selector{
labels.Not(labels.NewMustRegexpMatcher("c", "abd.*")),
},
series: labelpairs,
}, {
selector: labels.Selector{
labels.Not(labels.NewMustRegexpMatcher("labelname", "labelvalue")),
},
series: labelpairs[:1],
}}
q, err := db.Querier(0, 10)
testutil.Ok(t, err)
defer q.Close()
for _, c := range cases {
ss, err := q.Select(c.selector...)
testutil.Ok(t, err)
lres, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, c.series, lres)
}
}
func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) {
result := []labels.Labels{}
for ss.Next() {
result = append(result, ss.At().Labels())
}
return result, ss.Err()
}
......@@ -574,23 +574,16 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := h.indexRange(mint, maxt)
p, absent, err := PostingsForMatchers(ir, ms...)
p, err := PostingsForMatchers(ir, ms...)
if err != nil {
return errors.Wrap(err, "select series")
}
var stones []Stone
Outer:
for p.Next() {
series := h.series.getByID(p.At())
for _, abs := range absent {
if series.lset.Get(abs) != "" {
continue Outer
}
}
// Delete only until the current values and not beyond.
t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime())
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
......
......@@ -76,6 +76,18 @@ func NewRegexpMatcher(name, pattern string) (Matcher, error) {
return &regexpMatcher{name: name, re: re}, nil
}
// NewRegexpMatcher returns a new matcher verifying that a value matches
// the regular expression pattern. Will panic if the pattern is not a valid
// regular expression.
func NewMustRegexpMatcher(name, pattern string) Matcher {
re, err := regexp.Compile(pattern)
if err != nil {
panic(err)
}
return &regexpMatcher{name: name, re: re}
}
// notMatcher inverts the matching result for a matcher.
type notMatcher struct {
Matcher
......
......@@ -259,7 +259,7 @@ func (it *intersectPostings) Err() error {
// Merge returns a new iterator over the union of the input iterators.
func Merge(its ...Postings) Postings {
if len(its) == 0 {
return nil
return EmptyPostings()
}
if len(its) == 1 {
return its[0]
......@@ -340,6 +340,80 @@ func (it *mergedPostings) Err() error {
return it.b.Err()
}
type removedPostings struct {
full, remove Postings
cur uint64
initialized bool
fok, rok bool
}
func newRemovedPostings(full, remove Postings) *removedPostings {
return &removedPostings{
full: full,
remove: remove,
}
}
func (rp *removedPostings) At() uint64 {
return rp.cur
}
func (rp *removedPostings) Next() bool {
if !rp.initialized {
rp.fok = rp.full.Next()
rp.rok = rp.remove.Next()
rp.initialized = true
}
if !rp.fok {
return false
}
if !rp.rok {
rp.cur = rp.full.At()
rp.fok = rp.full.Next()
return true
}
fcur, rcur := rp.full.At(), rp.remove.At()
if fcur < rcur {
rp.cur = fcur
rp.fok = rp.full.Next()
return true
} else if rcur < fcur {
// Forward the remove postings to the right position.
rp.rok = rp.remove.Seek(fcur)
} else {
// Skip the current posting.
rp.fok = rp.full.Next()
}
return rp.Next()
}
func (rp *removedPostings) Seek(id uint64) bool {
if rp.cur >= id {
return true
}
rp.fok = rp.full.Seek(id)
rp.rok = rp.remove.Seek(id)
rp.initialized = true
return rp.Next()
}
func (rp *removedPostings) Err() error {
if rp.full.Err() != nil {
return rp.full.Err()
}
return rp.remove.Err()
}
// listPostings implements the Postings interface over a plain list.
type listPostings struct {
list []uint64
......
......@@ -301,6 +301,147 @@ func TestMergedPostingsSeek(t *testing.T) {
return
}
func TestRemovedPostings(t *testing.T) {
var cases = []struct {
a, b []uint64
res []uint64
}{
{
a: nil,
b: nil,
res: []uint64(nil),
},
{
a: []uint64{1, 2, 3, 4},
b: nil,
res: []uint64{1, 2, 3, 4},
},
{
a: nil,
b: []uint64{1, 2, 3, 4},
res: []uint64(nil),
},
{
a: []uint64{1, 2, 3, 4, 5},
b: []uint64{6, 7, 8, 9, 10},
res: []uint64{1, 2, 3, 4, 5},
},
{
a: []uint64{1, 2, 3, 4, 5},
b: []uint64{4, 5, 6, 7, 8},
res: []uint64{1, 2, 3},
},
{
a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
res: []uint64{2, 3, 9},
},
{
a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
res: []uint64(nil),
},
}
for _, c := range cases {
a := newListPostings(c.a)
b := newListPostings(c.b)
res, err := expandPostings(newRemovedPostings(a, b))
testutil.Ok(t, err)
testutil.Equals(t, c.res, res)
}
}
func TestRemovedPostingsSeek(t *testing.T) {
var cases = []struct {
a, b []uint64
seek uint64
success bool
res []uint64
}{
{
a: []uint64{2, 3, 4, 5},
b: []uint64{6, 7, 8, 9, 10},
seek: 1,
success: true,
res: []uint64{2, 3, 4, 5},
},
{
a: []uint64{1, 2, 3, 4, 5},
b: []uint64{6, 7, 8, 9, 10},
seek: 2,
success: true,
res: []uint64{2, 3, 4, 5},
},
{
a: []uint64{1, 2, 3, 4, 5},
b: []uint64{4, 5, 6, 7, 8},
seek: 9,
success: false,
res: nil,
},
{
a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
seek: 10,
success: false,
res: nil,
},
{
a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint64{1, 4, 5, 6, 7, 8, 11},
seek: 4,
success: true,
res: []uint64{9, 10},
},
{
a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint64{1, 4, 5, 6, 7, 8, 11},
seek: 5,
success: true,
res: []uint64{9, 10},
},
{
a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint64{1, 4, 5, 6, 7, 8, 11},
seek: 10,
success: true,
res: []uint64{10},
},
}
for _, c := range cases {
a := newListPostings(c.a)
b := newListPostings(c.b)
p := newRemovedPostings(a, b)
testutil.Equals(t, c.success, p.Seek(c.seek))
// After Seek(), At() should be called.
if c.success {
start := p.At()
lst, err := expandPostings(p)
testutil.Ok(t, err)
lst = append([]uint64{start}, lst...)
testutil.Equals(t, c.res, lst)
}
}
return
}
func TestBigEndian(t *testing.T) {
num := 1000
// mock a list as postings
......
......@@ -202,25 +202,18 @@ func (q *blockQuerier) Close() error {
// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. It returns a list of label names that must be manually
// checked to not exist in series the postings list points to.
func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, error) {
var (
its []Postings
absent []string
its []Postings
)
for _, m := range ms {
// If the matcher checks absence of a label, don't select them
// but propagate the check into the series set.
if _, ok := m.(*labels.EqualMatcher); ok && m.Matches("") {
absent = append(absent, m.Name())
continue
}
it, err := postingsForMatcher(index, m)
if err != nil {
return nil, nil, err
return nil, err
}
its = append(its, it)
}
return index.SortedPostings(Intersect(its...)), absent, nil
return index.SortedPostings(Intersect(its...)), nil
}
// tuplesByPrefix uses binary search to find prefix matches within ts.
......@@ -255,6 +248,13 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
}
func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
// If the matcher selects an empty value, it selects all the series which dont
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
if m.Matches("") {
return postingsForUnsetLabelMatcher(index, m)
}
// Fast-path for equal matching.
if em, ok := m.(*labels.EqualMatcher); ok {
it, err := index.Postings(em.Name(), em.Value())
......@@ -305,6 +305,43 @@ func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
return Merge(rit...), nil
}
func postingsForUnsetLabelMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
tpls, err := index.LabelValues(m.Name())
if err != nil {
return nil, err
}
var res []string
for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if err != nil {
return nil, err
}
if !m.Matches(vals[0]) {
res = append(res, vals[0])
}
}
var rit []Postings
for _, v := range res {
it, err := index.Postings(m.Name(), v)
if err != nil {
return nil, err
}
rit = append(rit, it)
}
mrit := Merge(rit...)
allPostings, err := index.Postings(allPostingsKey.Name, allPostingsKey.Value)
if err != nil {
return nil, err
}
return newRemovedPostings(allPostings, mrit), nil
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
......@@ -417,6 +454,8 @@ func (s *mergedSeriesSet) Next() bool {
return true
}
// ChunkSeriesSet exposes the chunks and intervals of a series instead of the
// actual series itself.
type ChunkSeriesSet interface {
Next() bool
At() (labels.Labels, []ChunkMeta, Intervals)
......@@ -429,7 +468,6 @@ type baseChunkSeries struct {
p Postings
index IndexReader
tombstones TombstoneReader
absent []string // labels that must be unset in results.
lset labels.Labels
chks []ChunkMeta
......@@ -443,7 +481,7 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher)
if tr == nil {
tr = EmptyTombstoneReader()
}
p, absent, err := PostingsForMatchers(ir, ms...)
p, err := PostingsForMatchers(ir, ms...)
if err != nil {
return nil, err
}
......@@ -451,7 +489,6 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher)
p: p,
index: ir,
tombstones: tr,
absent: absent,
}, nil
}
......@@ -467,7 +504,7 @@ func (s *baseChunkSeries) Next() bool {
chunks []ChunkMeta
err error
)
Outer:
for s.p.Next() {
ref := s.p.At()
if err := s.index.Series(ref, &lset, &chunks); err != nil {
......@@ -479,13 +516,6 @@ Outer:
return false
}
// If a series contains a label that must be absent, it is skipped as well.
for _, abs := range s.absent {
if lset.Get(abs) != "" {
continue Outer
}
}
s.lset = lset
s.chks = chunks
s.intervals, err = s.tombstones.Get(s.p.At())
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册