diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index cdf71a3307d06352c4e6281dbf72591e340a9783..581ca74bbe050e2fcb023f5b16602950e415030b 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" "go.uber.org/atomic" @@ -269,8 +270,11 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { segment := resp.GetInfos()[0] indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID()) if err != nil { - log.Warn("failed to get index of segment", zap.Error(err)) - return err + if !errors.Is(err, merr.ErrIndexNotFound) { + log.Warn("failed to get index of segment", zap.Error(err)) + return err + } + indexes = nil } readableVersion := int64(0) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index dfc64a29920003283ec69c9784c6b5c7c48058ea..e6add071829164e861dad94c78f4395a4d16da66 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -158,6 +158,7 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) { switch testName { case "TestSubscribeChannelTask", "TestLoadSegmentTask", + "TestLoadSegmentTaskNotIndex", "TestLoadSegmentTaskFailed", "TestSegmentTaskStale", "TestTaskCanceled", @@ -453,6 +454,100 @@ func (suite *TaskSuite) TestLoadSegmentTask() { } } +func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() { + ctx := context.Background() + timeout := 10 * time.Second + targetNode := int64(3) + partition := int64(100) + channel := &datapb.VchannelInfo{ + CollectionID: suite.collection, + ChannelName: Params.CommonCfg.RootCoordDml.GetValue() + "-test", + } + + // Expect + suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{ + Name: "TestLoadSegmentTask", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, Name: "vec", DataType: schemapb.DataType_FloatVector}, + }, + }, nil) + suite.broker.EXPECT().DescribeIndex(mock.Anything, suite.collection).Return([]*indexpb.IndexInfo{ + { + CollectionID: suite.collection, + }, + }, nil) + for _, segment := range suite.loadSegments { + suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return(&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{ + { + ID: segment, + CollectionID: suite.collection, + PartitionID: partition, + InsertChannel: channel.ChannelName, + }}, + }, nil) + suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFound()) + } + suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil) + + // Test load segment task + suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: suite.collection, + ChannelName: channel.ChannelName, + })) + tasks := []Task{} + segments := make([]*datapb.SegmentInfo, 0) + for _, segment := range suite.loadSegments { + segments = append(segments, &datapb.SegmentInfo{ + ID: segment, + InsertChannel: channel.ChannelName, + PartitionID: 1, + }) + task, err := NewSegmentTask( + ctx, + timeout, + 0, + suite.collection, + suite.replica, + NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment), + ) + suite.NoError(err) + tasks = append(tasks, task) + err = suite.scheduler.Add(task) + suite.NoError(err) + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return(nil, segments, nil) + suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(1)) + segmentsNum := len(suite.loadSegments) + suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) + + // Process tasks + suite.dispatchAndWait(targetNode) + suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) + + // Process tasks done + // Dist contains channels + view := &meta.LeaderView{ + ID: targetNode, + CollectionID: suite.collection, + Segments: map[int64]*querypb.SegmentDist{}, + } + for _, segment := range suite.loadSegments { + view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0} + } + distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment { + return meta.SegmentFromInfo(info) + }) + suite.dist.LeaderViewManager.Update(targetNode, view) + suite.dist.SegmentDistManager.Update(targetNode, distSegments...) + suite.dispatchAndWait(targetNode) + suite.AssertTaskNum(0, 0, 0, 0) + + for _, task := range tasks { + suite.Equal(TaskStatusSucceeded, task.Status()) + suite.NoError(task.Err()) + } +} + func (suite *TaskSuite) TestLoadSegmentTaskFailed() { ctx := context.Background() timeout := 10 * time.Second