未验证 提交 fe0a1bc2 编写于 作者: Y yihao.dai 提交者: GitHub

Fix panic caused by wrong logic of getting unindexed segments (#24044)

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 30415e1b
......@@ -145,25 +145,52 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
unIndexedIDs.Insert(s.GetID())
}
}
hasUnIndexed := true
for hasUnIndexed {
hasUnIndexed = false
// ================================================
// Segments blood relationship:
// a b
// \ /
// c d
// \ /
// e
//
// GC: a, b
// Indexed: c, d, e
// ||
// || (Index dropped and creating new index and not finished)
// \/
// UnIndexed: c, d, e
//
// Retrieve unIndexed expected result:
// unIndexed: c, d
// ================================================
isValid := func(ids ...UniqueID) bool {
for _, id := range ids {
if seg, ok := segmentInfos[id]; !ok || seg == nil {
return false
}
}
return true
}
retrieveUnIndexed := func() bool {
continueRetrieve := false
for id := range unIndexedIDs {
// Indexed segments are compacted to a raw segment,
// replace it with the indexed ones
if len(segmentInfos[id].GetCompactionFrom()) > 0 {
unIndexedIDs.Remove(id)
for _, segID := range segmentInfos[id].GetCompactionFrom() {
if indexed.Contain(segID) {
indexedIDs.Insert(segID)
compactionFrom := segmentInfos[id].GetCompactionFrom()
if len(compactionFrom) > 0 && isValid(compactionFrom...) {
for _, fromID := range compactionFrom {
if indexed.Contain(fromID) {
indexedIDs.Insert(fromID)
} else {
unIndexedIDs.Insert(segID)
hasUnIndexed = true
unIndexedIDs.Insert(fromID)
continueRetrieve = true
}
}
droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
unIndexedIDs.Remove(id)
droppedIDs.Remove(compactionFrom...)
}
}
return continueRetrieve
}
for retrieveUnIndexed() {
}
return &datapb.VchannelInfo{
......
......@@ -2371,6 +2371,253 @@ func TestGetQueryVChanPositions(t *testing.T) {
})
}
func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
t.Run("ab GC-ed, cde unIndexed", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&collectionInfo{
ID: 0,
Schema: schema,
})
err := svr.meta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
assert.Nil(t, err)
c := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed
}
err = svr.meta.AddSegment(NewSegmentInfo(c))
assert.Nil(t, err)
d := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
err = svr.meta.AddSegment(NewSegmentInfo(d))
assert.Nil(t, err)
e := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
CompactionFrom: []int64{1, 2}, // c, d
}
err = svr.meta.AddSegment(NewSegmentInfo(e))
assert.Nil(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.UnflushedSegmentIds) // expected c, d
})
t.Run("a GC-ed, bcde unIndexed", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&collectionInfo{
ID: 0,
Schema: schema,
})
err := svr.meta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
assert.Nil(t, err)
a := &datapb.SegmentInfo{
ID: 99,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
err = svr.meta.AddSegment(NewSegmentInfo(a))
assert.Nil(t, err)
c := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed
}
err = svr.meta.AddSegment(NewSegmentInfo(c))
assert.Nil(t, err)
d := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
err = svr.meta.AddSegment(NewSegmentInfo(d))
assert.Nil(t, err)
e := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 1,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
CompactionFrom: []int64{1, 2}, // c, d
}
err = svr.meta.AddSegment(NewSegmentInfo(e))
assert.Nil(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 0, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 2, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{c.GetID(), d.GetID()}, vchan.UnflushedSegmentIds) // expected c, d
})
t.Run("ab GC-ed, c unIndexed, de indexed", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&collectionInfo{
ID: 0,
Schema: schema,
})
err := svr.meta.CreateIndex(&model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
assert.Nil(t, err)
c := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
CompactionFrom: []int64{99, 100}, // a, b which have been GC-ed
}
err = svr.meta.AddSegment(NewSegmentInfo(c))
assert.Nil(t, err)
d := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
err = svr.meta.AddSegment(NewSegmentInfo(d))
assert.Nil(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: 2,
BuildID: 1,
IndexID: 1,
})
assert.Nil(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: 1,
State: commonpb.IndexState_Finished,
})
assert.Nil(t, err)
e := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
CompactionFrom: []int64{1, 2}, // c, d
}
err = svr.meta.AddSegment(NewSegmentInfo(e))
assert.Nil(t, err)
err = svr.meta.AddSegmentIndex(&model.SegmentIndex{
SegmentID: 3,
BuildID: 2,
IndexID: 1,
})
assert.Nil(t, err)
err = svr.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: 2,
State: commonpb.IndexState_Finished,
})
assert.Nil(t, err)
vchan := svr.handler.GetQueryVChanPositions(&channel{Name: "ch1", CollectionID: 0}, allPartitionID)
assert.EqualValues(t, 1, len(vchan.FlushedSegmentIds))
assert.EqualValues(t, 0, len(vchan.UnflushedSegmentIds))
assert.ElementsMatch(t, []int64{e.GetID()}, vchan.FlushedSegmentIds) // expected e
})
}
func TestShouldDropChannel(t *testing.T) {
type myRootCoord struct {
mocks.RootCoord
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册