From 0af53d1e2e85f6306b737e8c4c8cbccb85f52474 Mon Sep 17 00:00:00 2001 From: groot Date: Wed, 18 Jan 2023 10:33:43 +0800 Subject: [PATCH] Fix bulkinsert report bug (#21778) Signed-off-by: yhmo --- internal/rootcoord/import_manager.go | 4 +-- internal/rootcoord/root_coord.go | 12 +++++---- internal/rootcoord/root_coord_test.go | 37 +++++++++++++-------------- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index e819f52ab..c7c4164b9 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -154,7 +154,7 @@ func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) { log.Debug("import manager context done, exit check flipTaskStateLoop") return case <-flipPersistedTicker.C: - log.Debug("start trying to flip ImportPersisted task") + // log.Debug("start trying to flip ImportPersisted task") if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil { log.Error("failed to flip ImportPersisted task", zap.Error(err)) } @@ -786,7 +786,7 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse // other in-progress tasks as failed, when `load2Mem` is set to `true`. // loadFromTaskStore instead returns a list of all import tasks if `load2Mem` is set to `false`. func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskInfo, error) { - log.Debug("import manager starts loading from Etcd") + // log.Debug("import manager starts loading from Etcd") _, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath.GetValue()) if err != nil { log.Error("import manager failed to load from Etcd", zap.Error(err)) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index c02ec34cd..bdc98551a 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1793,14 +1793,16 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( log.Info("an import task has failed, marking DataNode available and resending import task", zap.Int64("task ID", ir.GetTaskId())) resendTaskFunc() - } else if ir.GetState() != commonpb.ImportState_ImportPersisted { - log.Debug("unexpected import task state reported, return immediately (this should not happen)", - zap.Any("task ID", ir.GetTaskId()), - zap.Any("import state", ir.GetState())) + } else if ir.GetState() == commonpb.ImportState_ImportCompleted { + // When a DataNode completes importing, remove this DataNode from the busy node list and send out import tasks again. + log.Info("an import task has completed, marking DataNode available and resending import task", + zap.Int64("task ID", ir.GetTaskId())) resendTaskFunc() - } else { + } else if ir.GetState() == commonpb.ImportState_ImportPersisted { // Here ir.GetState() == commonpb.ImportState_ImportPersisted // Seal these import segments, so they can be auto-flushed later. + log.Info("an import task turns to persisted state, flush segments to be sealed", + zap.Any("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments())) if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil { log.Error("failed to call Flush on bulk insert segments", zap.Int64("task ID", ir.GetTaskId())) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 9b6cdd640..a3bb21551 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1192,24 +1192,7 @@ func TestCore_ReportImport(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - t.Run("report import started state", func(t *testing.T) { - ctx := context.Background() - c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) - c.importManager.loadFromTaskStore(true) - c.importManager.sendOutTasks(ctx) - resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ - TaskId: 100, - State: commonpb.ImportState_ImportStarted, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - // Change the state back. - err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending) - assert.NoError(t, err) - }) - - t.Run("report persisted import", func(t *testing.T) { + testFunc := func(state commonpb.ImportState) { ctx := context.Background() c := newTestCore( withHealthyCode(), @@ -1224,13 +1207,29 @@ func TestCore_ReportImport(t *testing.T) { resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ TaskId: 100, - State: commonpb.ImportState_ImportPersisted, + State: state, }) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) // Change the state back. err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending) assert.NoError(t, err) + } + + t.Run("report import started state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportStarted) + }) + + t.Run("report import persisted state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportPersisted) + }) + + t.Run("report import completed state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportCompleted) + }) + + t.Run("report import failed state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportFailed) }) } -- GitLab