未验证 提交 0af53d1e 编写于 作者: G groot 提交者: GitHub

Fix bulkinsert report bug (#21778)

Signed-off-by: Nyhmo <yihua.mo@zilliz.com>
上级 13dc837c
......@@ -154,7 +154,7 @@ func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) {
log.Debug("import manager context done, exit check flipTaskStateLoop")
return
case <-flipPersistedTicker.C:
log.Debug("start trying to flip ImportPersisted task")
// log.Debug("start trying to flip ImportPersisted task")
if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil {
log.Error("failed to flip ImportPersisted task", zap.Error(err))
}
......@@ -786,7 +786,7 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse
// other in-progress tasks as failed, when `load2Mem` is set to `true`.
// loadFromTaskStore instead returns a list of all import tasks if `load2Mem` is set to `false`.
func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskInfo, error) {
log.Debug("import manager starts loading from Etcd")
// log.Debug("import manager starts loading from Etcd")
_, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath.GetValue())
if err != nil {
log.Error("import manager failed to load from Etcd", zap.Error(err))
......
......@@ -1793,14 +1793,16 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
log.Info("an import task has failed, marking DataNode available and resending import task",
zap.Int64("task ID", ir.GetTaskId()))
resendTaskFunc()
} else if ir.GetState() != commonpb.ImportState_ImportPersisted {
log.Debug("unexpected import task state reported, return immediately (this should not happen)",
zap.Any("task ID", ir.GetTaskId()),
zap.Any("import state", ir.GetState()))
} else if ir.GetState() == commonpb.ImportState_ImportCompleted {
// When a DataNode completes importing, remove this DataNode from the busy node list and send out import tasks again.
log.Info("an import task has completed, marking DataNode available and resending import task",
zap.Int64("task ID", ir.GetTaskId()))
resendTaskFunc()
} else {
} else if ir.GetState() == commonpb.ImportState_ImportPersisted {
// Here ir.GetState() == commonpb.ImportState_ImportPersisted
// Seal these import segments, so they can be auto-flushed later.
log.Info("an import task turns to persisted state, flush segments to be sealed",
zap.Any("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments()))
if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil {
log.Error("failed to call Flush on bulk insert segments",
zap.Int64("task ID", ir.GetTaskId()))
......
......@@ -1192,24 +1192,7 @@ func TestCore_ReportImport(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("report import started state", func(t *testing.T) {
ctx := context.Background()
c := newTestCore(withHealthyCode())
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
c.importManager.loadFromTaskStore(true)
c.importManager.sendOutTasks(ctx)
resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{
TaskId: 100,
State: commonpb.ImportState_ImportStarted,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
// Change the state back.
err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending)
assert.NoError(t, err)
})
t.Run("report persisted import", func(t *testing.T) {
testFunc := func(state commonpb.ImportState) {
ctx := context.Background()
c := newTestCore(
withHealthyCode(),
......@@ -1224,13 +1207,29 @@ func TestCore_ReportImport(t *testing.T) {
resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{
TaskId: 100,
State: commonpb.ImportState_ImportPersisted,
State: state,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
// Change the state back.
err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending)
assert.NoError(t, err)
}
t.Run("report import started state", func(t *testing.T) {
testFunc(commonpb.ImportState_ImportStarted)
})
t.Run("report import persisted state", func(t *testing.T) {
testFunc(commonpb.ImportState_ImportPersisted)
})
t.Run("report import completed state", func(t *testing.T) {
testFunc(commonpb.ImportState_ImportCompleted)
})
t.Run("report import failed state", func(t *testing.T) {
testFunc(commonpb.ImportState_ImportFailed)
})
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册