diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 2fd6678f7d40b6ecd840bb59529e70a88b4c03c6..d93701e33e56f97919ebd5f7589ee89eba70d4c7 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -1235,6 +1235,32 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil zap.Int64("segmentID", segmentID), zap.Int("shard ID", shardID), zap.String("target channel name", targetChName)) + + // call report to notify the rootcoord update the segment id list for this task + // ignore the returned error, since even report failed the segments still can be cleaned + retry.Do(context.Background(), func() error { + importResult := &rootcoordpb.ImportResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + TaskId: req.GetImportTask().TaskId, + DatanodeId: Params.DataNodeCfg.GetNodeID(), + State: commonpb.ImportState_ImportStarted, + Segments: []int64{segmentID}, + AutoIds: make([]int64, 0), + RowCount: 0, + } + status, err := node.rootCoord.ReportImport(context.Background(), importResult) + if err != nil { + log.Error("fail to report import state to RootCoord", zap.Error(err)) + return err + } + if status != nil && status.ErrorCode != commonpb.ErrorCode_Success { + return errors.New(status.GetReason()) + } + return nil + }) + return segmentID, targetChName, nil } } diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index 61883ddc718ebd94cd4743f2ccf12d3830d5ba46..d3fe0d0c87c78b75dd340e1a63f2887394d2e4e2 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -595,7 +595,12 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im // Meta persist should be done before memory objs change. toPersistImportTaskInfo = cloneImportTaskInfo(v) toPersistImportTaskInfo.State.StateCode = ir.GetState() - toPersistImportTaskInfo.State.Segments = ir.GetSegments() + // if is started state, append the new created segment id + if v.GetState().GetStateCode() == commonpb.ImportState_ImportStarted { + toPersistImportTaskInfo.State.Segments = append(toPersistImportTaskInfo.State.Segments, ir.GetSegments()...) + } else { + toPersistImportTaskInfo.State.Segments = ir.GetSegments() + } toPersistImportTaskInfo.State.RowCount = ir.GetRowCount() toPersistImportTaskInfo.State.RowIds = ir.GetAutoIds() for _, kv := range ir.GetInfos() { @@ -606,6 +611,8 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im toPersistImportTaskInfo.Infos = append(toPersistImportTaskInfo.Infos, kv) } } + log.Info("importManager update task info", zap.Any("toPersistImportTaskInfo", toPersistImportTaskInfo)) + // Update task in task store. if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil { log.Error("failed to update import task",