From 5f84923bfa0e4120829c80bf5a519b6aff86a8c9 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 26 May 2023 15:01:26 +0800 Subject: [PATCH] Fix search/query unloaded data (#24368) Signed-off-by: bigsheeper --- internal/querynodev2/delegator/delegator.go | 8 ++++++++ internal/querynodev2/delegator/delegator_data.go | 5 ----- internal/querynodev2/delegator/delegator_test.go | 6 +++++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index e900c45c7..c0e6f0cde 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -223,6 +223,10 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest sealed, growing, version := sd.distribution.GetCurrent(req.GetReq().GetPartitionIDs()...) defer sd.distribution.FinishUsage(version) + existPartitions := sd.collection.GetPartitions() + growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool { + return funcutil.SliceContain(existPartitions, segment.PartitionID) + }) if req.Req.IgnoreGrowing { growing = []SegmentEntry{} @@ -274,6 +278,10 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) sealed, growing, version := sd.distribution.GetCurrent(req.GetReq().GetPartitionIDs()...) defer sd.distribution.FinishUsage(version) + existPartitions := sd.collection.GetPartitions() + growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool { + return funcutil.SliceContain(existPartitions, segment.PartitionID) + }) if req.Req.IgnoreGrowing { growing = []SegmentEntry{} } diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 811348f9e..6623c822c 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -76,11 +76,6 @@ func (d *DeleteData) Append(ad DeleteData) { func (sd *shardDelegator) newGrowing(segmentID int64, insertData *InsertData) segments.Segment { log := sd.getLogger(context.Background()).With(zap.Int64("segmentID", segmentID)) - // try add partition - if sd.collection.GetLoadType() == loadTypeCollection { - sd.collection.AddPartition(insertData.PartitionID) - } - segment, err := segments.NewSegment(sd.collection, segmentID, insertData.PartitionID, sd.collectionID, sd.vchannelName, segments.SegmentTypeGrowing, 0, insertData.StartPosition, insertData.StartPosition) if err != nil { log.Error("failed to create new segment", zap.Error(err)) diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index aaa42c227..a30b8e71e 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -49,6 +49,7 @@ type DelegatorSuite struct { suite.Suite collectionID int64 + partitionIDs []int64 replicaID int64 vchannelName string version int64 @@ -71,6 +72,7 @@ func (s *DelegatorSuite) TearDownSuite() { func (s *DelegatorSuite) SetupTest() { s.collectionID = 1000 + s.partitionIDs = []int64{500, 501} s.replicaID = 65535 s.vchannelName = "rootcoord-dml_1000_v0" s.version = 2000 @@ -142,7 +144,9 @@ func (s *DelegatorSuite) SetupTest() { }, }, }, - }, &querypb.LoadMetaInfo{}) + }, &querypb.LoadMetaInfo{ + PartitionIDs: s.partitionIDs, + }) s.mq = &msgstream.MockMsgStream{} -- GitLab