未验证 提交 72ee35a7 编写于 作者: C congqixia 提交者: GitHub

Add loading list filter logic for load segments (#19347)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 42884017
......@@ -39,6 +39,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/concurrency"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo"
)
// ReplicaInterface specifies all the methods that the Collection object needs to implement in QueryNode.
......@@ -115,6 +117,11 @@ type ReplicaInterface interface {
// printReplica prints the collections, partitions and segments in the collectionReplica
printReplica()
// addSegmentsLoadingList add segment into black list, so get sealed segments will not return them.
addSegmentsLoadingList(segmentIDs []UniqueID)
// removeSegmentsLoadingList add segment into black list, so get sealed segments will not return them.
removeSegmentsLoadingList(segmentIDs []UniqueID)
getGrowingSegments() []*Segment
getSealedSegments() []*Segment
}
......@@ -130,6 +137,9 @@ type metaReplica struct {
excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentIDs
// segmentsBlackList stores segments which are still loading
segmentsBlackList typeutil.UniqueSet
cgoPool *concurrency.Pool
}
......@@ -779,6 +789,24 @@ func (replica *metaReplica) freeAll() {
replica.sealedSegments = make(map[UniqueID]*Segment)
}
func (replica *metaReplica) addSegmentsLoadingList(segmentIDs []UniqueID) {
replica.mu.Lock()
defer replica.mu.Unlock()
// add to black list only segment is not loaded before
replica.segmentsBlackList.Insert(lo.Filter(segmentIDs, func(id UniqueID, idx int) bool {
_, isSealed := replica.sealedSegments[id]
return !isSealed
})...)
}
func (replica *metaReplica) removeSegmentsLoadingList(segmentIDs []UniqueID) {
replica.mu.Lock()
defer replica.mu.Unlock()
replica.segmentsBlackList.Remove(segmentIDs...)
}
func (replica *metaReplica) getGrowingSegments() []*Segment {
replica.mu.RLock()
defer replica.mu.RUnlock()
......@@ -796,7 +824,9 @@ func (replica *metaReplica) getSealedSegments() []*Segment {
ret := make([]*Segment, 0, len(replica.sealedSegments))
for _, s := range replica.sealedSegments {
ret = append(ret, s)
if !replica.segmentsBlackList.Contain(s.segmentID) {
ret = append(ret, s)
}
}
return ret
}
......@@ -810,7 +840,10 @@ func newCollectionReplica(pool *concurrency.Pool) ReplicaInterface {
sealedSegments: make(map[UniqueID]*Segment),
excludedSegments: make(map[UniqueID][]*datapb.SegmentInfo),
cgoPool: pool,
segmentsBlackList: make(typeutil.UniqueSet),
cgoPool: pool,
}
return replica
......
......@@ -343,6 +343,71 @@ func TestMetaReplica_segment(t *testing.T) {
})
}
func TestMetaReplica_BlackList(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
schema := genTestCollectionSchema()
collection := replica.addCollection(defaultCollectionID, schema)
replica.addPartition(defaultCollectionID, defaultPartitionID)
replica.addPartition(defaultCollectionID, defaultPartitionID+1)
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
require.NoError(t, err)
segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed, defaultSegmentVersion, pool)
assert.NoError(t, err)
segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID, defaultCollectionID, "channel2", segmentTypeSealed, defaultSegmentVersion, pool)
assert.NoError(t, err)
segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID, defaultCollectionID, "channel2", segmentTypeGrowing, defaultSegmentVersion, pool)
assert.NoError(t, err)
replica.addSegmentsLoadingList([]UniqueID{1, 2, 3})
segments := replica.getSealedSegments()
assert.Equal(t, 0, len(segments))
segments = replica.getGrowingSegments()
assert.Equal(t, 0, len(segments))
// add segments
err = replica.setSegment(segment1)
assert.NoError(t, err)
err = replica.setSegment(segment2)
assert.NoError(t, err)
err = replica.setSegment(segment3)
assert.NoError(t, err)
// no segments since all in black list
segments = replica.getSealedSegments()
assert.Equal(t, 0, len(segments))
// affect sealed segment only
segments = replica.getGrowingSegments()
assert.Equal(t, 1, len(segments))
replica.removeSegmentsLoadingList([]UniqueID{1, 2, 3})
segments = replica.getSealedSegments()
assert.Equal(t, 2, len(segments))
segments = replica.getGrowingSegments()
assert.Equal(t, 1, len(segments))
// try add black list, shall fail since all loaded before
replica.addSegmentsLoadingList([]UniqueID{1, 2, 3})
segments = replica.getSealedSegments()
assert.Equal(t, 2, len(segments))
segments = replica.getGrowingSegments()
assert.Equal(t, 1, len(segments))
}
func TestMetaReplica_freeAll(t *testing.T) {
replica, err := genSimpleReplica()
assert.NoError(t, err)
......
......@@ -33,6 +33,7 @@ import (
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo"
)
type task interface {
......@@ -535,6 +536,10 @@ func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
log.Info("LoadSegmentTask Execute start", zap.Int64("msgID", l.req.Base.MsgID))
segmentIDs := lo.Map(l.req.Infos, func(info *queryPb.SegmentLoadInfo, idx int) UniqueID { return info.SegmentID })
l.node.metaReplica.addSegmentsLoadingList(segmentIDs)
defer l.node.metaReplica.removeSegmentsLoadingList(segmentIDs)
err := l.node.loader.LoadSegment(l.req, segmentTypeSealed)
if err != nil {
log.Warn("failed to load segment", zap.Int64("collectionID", l.req.CollectionID),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册