diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index e819f52ab92f8b58e791c34bfda6d82107593447..c7c4164b99cc867e780fe8a1484c976abdc4e424 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -154,7 +154,7 @@ func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) { log.Debug("import manager context done, exit check flipTaskStateLoop") return case <-flipPersistedTicker.C: - log.Debug("start trying to flip ImportPersisted task") + // log.Debug("start trying to flip ImportPersisted task") if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil { log.Error("failed to flip ImportPersisted task", zap.Error(err)) } @@ -786,7 +786,7 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse // other in-progress tasks as failed, when `load2Mem` is set to `true`. // loadFromTaskStore instead returns a list of all import tasks if `load2Mem` is set to `false`. func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskInfo, error) { - log.Debug("import manager starts loading from Etcd") + // log.Debug("import manager starts loading from Etcd") _, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath.GetValue()) if err != nil { log.Error("import manager failed to load from Etcd", zap.Error(err)) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index c02ec34cdc98d48e802d67bfb877e5e3b249a56a..bdc98551ada1b6794dcbf2ed71eeea4f86b37a71 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1793,14 +1793,16 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( log.Info("an import task has failed, marking DataNode available and resending import task", zap.Int64("task ID", ir.GetTaskId())) resendTaskFunc() - } else if ir.GetState() != commonpb.ImportState_ImportPersisted { - log.Debug("unexpected import task state reported, return immediately (this should not happen)", - zap.Any("task ID", ir.GetTaskId()), - zap.Any("import state", ir.GetState())) + } else if ir.GetState() == commonpb.ImportState_ImportCompleted { + // When a DataNode completes importing, remove this DataNode from the busy node list and send out import tasks again. + log.Info("an import task has completed, marking DataNode available and resending import task", + zap.Int64("task ID", ir.GetTaskId())) resendTaskFunc() - } else { + } else if ir.GetState() == commonpb.ImportState_ImportPersisted { // Here ir.GetState() == commonpb.ImportState_ImportPersisted // Seal these import segments, so they can be auto-flushed later. + log.Info("an import task turns to persisted state, flush segments to be sealed", + zap.Any("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments())) if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil { log.Error("failed to call Flush on bulk insert segments", zap.Int64("task ID", ir.GetTaskId())) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 9b6cdd6407434498991fb677e52f1800630564a0..a3bb21551a440c492a4741179c7450676c88d847 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1192,24 +1192,7 @@ func TestCore_ReportImport(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - t.Run("report import started state", func(t *testing.T) { - ctx := context.Background() - c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) - c.importManager.loadFromTaskStore(true) - c.importManager.sendOutTasks(ctx) - resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ - TaskId: 100, - State: commonpb.ImportState_ImportStarted, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - // Change the state back. - err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending) - assert.NoError(t, err) - }) - - t.Run("report persisted import", func(t *testing.T) { + testFunc := func(state commonpb.ImportState) { ctx := context.Background() c := newTestCore( withHealthyCode(), @@ -1224,13 +1207,29 @@ func TestCore_ReportImport(t *testing.T) { resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ TaskId: 100, - State: commonpb.ImportState_ImportPersisted, + State: state, }) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) // Change the state back. err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending) assert.NoError(t, err) + } + + t.Run("report import started state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportStarted) + }) + + t.Run("report import persisted state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportPersisted) + }) + + t.Run("report import completed state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportCompleted) + }) + + t.Run("report import failed state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportFailed) }) }