Unverified commit 0d54697d authored by Ten Thousand Leaves, committed by GitHub

Implement ReportImport related logic. (#16312)

/kind feature

issue: #15604
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
Parent baa745b8
......@@ -265,7 +265,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"ceID\030\004 \001(\003\"7\n\tMsgHeader\022*\n\004base\030\001 \001(\0132\034."
"milvus.proto.common.MsgBase\"M\n\014DMLMsgHea"
"der\022*\n\004base\030\001 \001(\0132\034.milvus.proto.common."
"MsgBase\022\021\n\tshardName\030\002 \001(\t*\272\004\n\tErrorCode"
"MsgBase\022\021\n\tshardName\030\002 \001(\t*\363\004\n\tErrorCode"
"\022\013\n\007Success\020\000\022\023\n\017UnexpectedError\020\001\022\021\n\rCo"
"nnectFailed\020\002\022\024\n\020PermissionDenied\020\003\022\027\n\023C"
"ollectionNotExists\020\004\022\023\n\017IllegalArgument\020"
......@@ -279,54 +279,56 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"\n\020CannotDeleteFile\020\024\022\023\n\017BuildIndexError\020"
"\025\022\020\n\014IllegalNLIST\020\026\022\025\n\021IllegalMetricType"
"\020\027\022\017\n\013OutOfMemory\020\030\022\021\n\rIndexNotExist\020\031\022\023"
"\n\017EmptyCollection\020\032\022\022\n\rDDRequestRace\020\350\007*"
"X\n\nIndexState\022\022\n\016IndexStateNone\020\000\022\014\n\010Uni"
"ssued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n\n"
"\006Failed\020\004*s\n\014SegmentState\022\024\n\020SegmentStat"
"eNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Se"
"aled\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013\n\007Dro"
"pped\020\006*\300\t\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n\020Cre"
"ateCollection\020d\022\022\n\016DropCollection\020e\022\021\n\rH"
"asCollection\020f\022\026\n\022DescribeCollection\020g\022\023"
"\n\017ShowCollections\020h\022\024\n\020GetSystemConfigs\020"
"i\022\022\n\016LoadCollection\020j\022\025\n\021ReleaseCollecti"
"on\020k\022\017\n\013CreateAlias\020l\022\r\n\tDropAlias\020m\022\016\n\n"
"AlterAlias\020n\022\024\n\017CreatePartition\020\310\001\022\022\n\rDr"
"opPartition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021Des"
"cribePartition\020\313\001\022\023\n\016ShowPartitions\020\314\001\022\023"
"\n\016LoadPartitions\020\315\001\022\026\n\021ReleasePartitions"
"\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegmen"
"t\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegmen"
"ts\020\375\001\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBalan"
"ceSegments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020\n\013"
"CreateIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\tDr"
"opIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005"
"Flush\020\222\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003"
"\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetIndexBuildPro"
"gress\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033"
"\n\026GetPartitionStatistics\020\371\003\022\r\n\010Retrieve\020"
"\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChanne"
"ls\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQue"
"ryChannels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022"
"\035\n\030SealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchD"
"eltaChannels\020\201\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSys"
"temInfo\020\331\004\022\024\n\017GetRecoveryInfo\020\332\004\022\024\n\017GetS"
"egmentState\020\333\004\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNo"
"deStats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tRequestID\020"
"\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017AllocateSegment\020\265"
"\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n\020SegmentFlus"
"hDone\020\267\t\022\017\n\nDataNodeTt\020\270\t*\"\n\007DslType\022\007\n\003"
"Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017CompactionState"
"\022\021\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tCo"
"mpleted\020\002*X\n\020ConsistencyLevel\022\n\n\006Strong\020"
"\000\022\013\n\007Session\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventuall"
"y\020\003\022\016\n\nCustomized\020\004*\227\001\n\013ImportState\022\021\n\rI"
"mportPending\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImpo"
"rtStarted\020\002\022\024\n\020ImportDownloaded\020\003\022\020\n\014Imp"
"ortParsed\020\004\022\023\n\017ImportPersisted\020\005\022\023\n\017Impo"
"rtCompleted\020\006BW\n\016io.milvus.grpcB\013CommonP"
"rotoP\001Z3github.com/milvus-io/milvus/inte"
"rnal/proto/commonpb\240\001\001b\006proto3"
"\n\017EmptyCollection\020\032\022\033\n\027UpdateImportTaskF"
"ailure\020\033\022\032\n\026CollectionNameNotFound\020\034\022\022\n\r"
"DDRequestRace\020\350\007*X\n\nIndexState\022\022\n\016IndexS"
"tateNone\020\000\022\014\n\010Unissued\020\001\022\016\n\nInProgress\020\002"
"\022\014\n\010Finished\020\003\022\n\n\006Failed\020\004*s\n\014SegmentSta"
"te\022\024\n\020SegmentStateNone\020\000\022\014\n\010NotExist\020\001\022\013"
"\n\007Growing\020\002\022\n\n\006Sealed\020\003\022\013\n\007Flushed\020\004\022\014\n\010"
"Flushing\020\005\022\013\n\007Dropped\020\006*\300\t\n\007MsgType\022\r\n\tU"
"ndefined\020\000\022\024\n\020CreateCollection\020d\022\022\n\016Drop"
"Collection\020e\022\021\n\rHasCollection\020f\022\026\n\022Descr"
"ibeCollection\020g\022\023\n\017ShowCollections\020h\022\024\n\020"
"GetSystemConfigs\020i\022\022\n\016LoadCollection\020j\022\025"
"\n\021ReleaseCollection\020k\022\017\n\013CreateAlias\020l\022\r"
"\n\tDropAlias\020m\022\016\n\nAlterAlias\020n\022\024\n\017CreateP"
"artition\020\310\001\022\022\n\rDropPartition\020\311\001\022\021\n\014HasPa"
"rtition\020\312\001\022\026\n\021DescribePartition\020\313\001\022\023\n\016Sh"
"owPartitions\020\314\001\022\023\n\016LoadPartitions\020\315\001\022\026\n\021"
"ReleasePartitions\020\316\001\022\021\n\014ShowSegments\020\372\001\022"
"\024\n\017DescribeSegment\020\373\001\022\021\n\014LoadSegments\020\374\001"
"\022\024\n\017ReleaseSegments\020\375\001\022\024\n\017HandoffSegment"
"s\020\376\001\022\030\n\023LoadBalanceSegments\020\377\001\022\025\n\020Descri"
"beSegments\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\rDescr"
"ibeIndex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Insert\020\220\003"
"\022\013\n\006Delete\020\221\003\022\n\n\005Flush\020\222\003\022\013\n\006Search\020\364\003\022\021"
"\n\014SearchResult\020\365\003\022\022\n\rGetIndexState\020\366\003\022\032\n"
"\025GetIndexBuildProgress\020\367\003\022\034\n\027GetCollecti"
"onStatistics\020\370\003\022\033\n\026GetPartitionStatistic"
"s\020\371\003\022\r\n\010Retrieve\020\372\003\022\023\n\016RetrieveResult\020\373\003"
"\022\024\n\017WatchDmChannels\020\374\003\022\025\n\020RemoveDmChanne"
"ls\020\375\003\022\027\n\022WatchQueryChannels\020\376\003\022\030\n\023Remove"
"QueryChannels\020\377\003\022\035\n\030SealedSegmentsChange"
"Info\020\200\004\022\027\n\022WatchDeltaChannels\020\201\004\022\020\n\013Segm"
"entInfo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017GetRecove"
"ryInfo\020\332\004\022\024\n\017GetSegmentState\020\333\004\022\r\n\010TimeT"
"ick\020\260\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\tLoadIndex"
"\020\262\t\022\016\n\tRequestID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017"
"AllocateSegment\020\265\t\022\026\n\021SegmentStatistics\020"
"\266\t\022\025\n\020SegmentFlushDone\020\267\t\022\017\n\nDataNodeTt\020"
"\270\t*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001*B"
"\n\017CompactionState\022\021\n\rUndefiedState\020\000\022\r\n\t"
"Executing\020\001\022\r\n\tCompleted\020\002*X\n\020Consistenc"
"yLevel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bound"
"ed\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\227\001\n"
"\013ImportState\022\021\n\rImportPending\020\000\022\020\n\014Impor"
"tFailed\020\001\022\021\n\rImportStarted\020\002\022\024\n\020ImportDo"
"wnloaded\020\003\022\020\n\014ImportParsed\020\004\022\023\n\017ImportPe"
"rsisted\020\005\022\023\n\017ImportCompleted\020\006BW\n\016io.mil"
"vus.grpcB\013CommonProtoP\001Z3github.com/milv"
"us-io/milvus/internal/proto/commonpb\240\001\001b"
"\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
};
......@@ -343,7 +345,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2950,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 3007,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 8, 0,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 8, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
......@@ -386,6 +388,8 @@ bool ErrorCode_IsValid(int value) {
case 24:
case 25:
case 26:
case 27:
case 28:
case 1000:
return true;
default:
......
......@@ -126,6 +126,8 @@ enum ErrorCode : int {
OutOfMemory = 24,
IndexNotExist = 25,
EmptyCollection = 26,
UpdateImportTaskFailure = 27,
CollectionNameNotFound = 28,
DDRequestRace = 1000,
ErrorCode_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
ErrorCode_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
......
......@@ -2267,7 +2267,8 @@ func TestImport(t *testing.T) {
defer closeTestServer(t, svr)
resp, err := svr.Import(svr.ctx, &datapb.ImportTask{
CollectionName: "dummy",
CollectionId: 100,
PartitionId: 100,
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.GetErrorCode())
......@@ -2280,7 +2281,8 @@ func TestImport(t *testing.T) {
closeTestServer(t, svr)
resp, err := svr.Import(svr.ctx, &datapb.ImportTask{
CollectionName: "dummy",
CollectionId: 100,
PartitionId: 100,
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode())
......
......@@ -969,7 +969,7 @@ func (s *Server) Import(ctx context.Context, req *datapb.ImportTask) (*datapb.Im
}
if s.isClosed() {
log.Warn("failed to import because of closed server", zap.String("collectionName", req.GetCollectionName()))
log.Warn("failed to import because of closed server", zap.Int64("collection ID", req.GetCollectionId()))
resp.Status.Reason = msgDataCoordIsUnhealthy(Params.DataCoordCfg.NodeID)
return resp, nil
}
......
......@@ -781,8 +781,8 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTask) (*comm
if !node.isHealthy() {
log.Warn("DataNode.Import failed",
zap.String("collectionName", req.GetCollectionName()),
zap.String("partitionName", req.GetPartitionName()),
zap.Int64("collection ID", req.GetCollectionId()),
zap.Int64("partition ID", req.GetPartitionId()),
zap.Int64("taskID", req.GetTaskId()),
zap.Error(errDataNodeIsUnhealthy(Params.DataNodeCfg.NodeID)))
......
......@@ -318,7 +318,8 @@ func TestDataNode(t *testing.T) {
t.Run("Test Import", func(t *testing.T) {
req := &datapb.ImportTask{
CollectionName: "dummy",
CollectionId: 100,
PartitionId: 100,
}
stat, err := node.Import(node.ctx, req)
assert.NoError(t, err)
......
......@@ -35,6 +35,8 @@ enum ErrorCode {
OutOfMemory = 24;
IndexNotExist = 25;
EmptyCollection = 26;
UpdateImportTaskFailure = 27;
CollectionNameNotFound = 28;
// internal error code.
DDRequestRace = 1000;
......
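The two new error codes above are returned by RootCoord's ReportImport handler (see the rootcoord changes further down). For context, a hedged sketch of how a caller of ReportImport might branch on them; the helper function, the wrapper program, and the wording of the errors are assumptions for illustration only, while the error codes themselves come from this change:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
)

// handleReportImportStatus is a hypothetical caller-side helper.
func handleReportImportStatus(status *commonpb.Status) error {
	switch status.GetErrorCode() {
	case commonpb.ErrorCode_Success:
		return nil
	case commonpb.ErrorCode_UpdateImportTaskFailure:
		// RootCoord could not update the task's state in the task store
		// (for example, the reported task ID was not found).
		return fmt.Errorf("update import task failed: %s", status.GetReason())
	case commonpb.ErrorCode_CollectionNameNotFound:
		// RootCoord could not reverse-map the task's collection ID to a name.
		return fmt.Errorf("collection name not found: %s", status.GetReason())
	default:
		return fmt.Errorf("report import failed: %s", status.GetReason())
	}
}

func main() {
	fmt.Println(handleReportImportStatus(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}))
}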
......@@ -420,8 +420,8 @@ message DropVirtualChannelResponse {
message ImportTask {
common.Status status = 1;
string collection_name = 2; // target collection
string partition_name = 3; // target partition
int64 collection_id = 2; // target collection ID
int64 partition_id = 3; // target partition ID
bool row_based = 4; // the file is row-based or column-based
int64 task_id = 5; // id of the task
repeated string files = 6; // file paths to be imported
......@@ -440,13 +440,13 @@ message ImportTaskInfo {
int64 id = 1; // Task ID.
int64 request_id = 2; // Request ID of the import task.
int64 datanode_id = 3; // ID of DataNode that processes the task.
string collection_id = 4; // Collection ID for the import task.
string partition_id = 5; // Partition ID for the import task.
int64 collection_id = 4; // Collection ID for the import task.
int64 partition_id = 5; // Partition ID for the import task.
string bucket = 6; // Bucket for the import task.
bool row_based = 7; // Boolean indicating whether import files are row-based or column-based.
repeated string files = 8; // A list of files to import.
int64 create_ts = 9; // Timestamp when the import task is created.
ImportTaskState state = 10; // State of the import task.
ImportTaskState state = 10; // State of the import task.
}
message ImportTaskResponse {
......
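For illustration, a minimal sketch of how the revised, ID-based fields of ImportTask and ImportTaskInfo fit together after this change; the field names follow the proto above, while the concrete values and the standalone program wrapper are illustrative assumptions:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/proto/datapb"
)

func main() {
	// Collection and partition are now referenced by int64 IDs instead of
	// string names; the values here are illustrative only.
	task := &datapb.ImportTask{
		CollectionId: 100,
		PartitionId:  200,
		RowBased:     true, // one row-based file per task
		TaskId:       1,
		Files:        []string{"f1"},
	}
	info := &datapb.ImportTaskInfo{
		Id:           task.GetTaskId(),
		CollectionId: task.GetCollectionId(), // int64, matching ImportTask
		PartitionId:  task.GetPartitionId(),  // int64, matching ImportTask
		RowBased:     task.GetRowBased(),
		Files:        task.GetFiles(),
	}
	fmt.Println(info.GetCollectionId(), info.GetPartitionId())
}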
......@@ -56,6 +56,7 @@ type importManager struct {
cancel context.CancelFunc // reserved
taskStore kv.MetaKv // Persistent task info storage.
// TODO: Make pendingTask a map to improve look up performance.
pendingTasks []*datapb.ImportTaskInfo // pending tasks
workingTasks map[int64]*datapb.ImportTaskInfo // in-progress tasks
pendingLock sync.RWMutex // lock pending task list
......@@ -88,33 +89,25 @@ func newImportManager(ctx context.Context, client kv.MetaKv, importService func(
func (m *importManager) init() error {
// Read tasks from etcd and save them as pendingTasks or workingTasks.
m.load()
// trigger Import() action to DataCoord
m.pushTasks()
m.sendOutTasks()
return nil
}
func (m *importManager) pushTasks() error {
// sendOutTasks pushes all pending tasks to DataCoord, gets DataCoord's response, and re-adds these tasks as working tasks.
func (m *importManager) sendOutTasks() error {
m.pendingLock.Lock()
defer m.pendingLock.Unlock()
// trigger Import() action to DataCoord
for {
if len(m.pendingTasks) == 0 {
log.Debug("import manger pending task list is empty")
break
}
// Trigger Import() action to DataCoord.
for len(m.pendingTasks) > 0 {
task := m.pendingTasks[0]
log.Debug("import manager send import task", zap.Int64("taskID", task.Id))
dbTask := &datapb.ImportTask{
CollectionName: task.GetCollectionId(),
PartitionName: task.GetPartitionId(),
RowBased: task.GetRowBased(),
TaskId: task.GetId(),
Files: task.GetFiles(),
it := &datapb.ImportTask{
CollectionId: task.GetCollectionId(),
PartitionId: task.GetPartitionId(),
RowBased: task.GetRowBased(),
TaskId: task.GetId(),
Files: task.GetFiles(),
Infos: []*commonpb.KeyValuePair{
{
Key: Bucket,
......@@ -123,14 +116,17 @@ func (m *importManager) pushTasks() error {
},
}
// call DataCoord.Import()
resp := m.callImportService(m.ctx, dbTask)
log.Debug("sending import task to DataCoord", zap.Int64("taskID", task.GetId()))
// Call DataCoord.Import().
resp := m.callImportService(m.ctx, it)
if resp.Status.ErrorCode == commonpb.ErrorCode_UnexpectedError {
log.Debug("import task is rejected", zap.Int64("task ID", dbTask.TaskId))
log.Debug("import task is rejected", zap.Int64("task ID", it.GetTaskId()))
break
}
task.DatanodeId = resp.GetDatanodeId()
log.Debug("import task is assigned", zap.Int64("task ID", dbTask.TaskId), zap.Int64("datanode id", task.DatanodeId))
log.Debug("import task successfully assigned to DataNode",
zap.Int64("task ID", it.GetTaskId()),
zap.Int64("DataNode ID", task.GetDatanodeId()))
// erase this task from the head of the pending list if callImportService succeeds
m.pendingTasks = m.pendingTasks[1:]
......@@ -139,11 +135,9 @@ func (m *importManager) pushTasks() error {
m.workingLock.Lock()
defer m.workingLock.Unlock()
log.Debug("import task was taken to execute", zap.Int64("task ID", dbTask.TaskId))
// TODO: Guard nil task state.
log.Debug("import task added as working task", zap.Int64("task ID", it.TaskId))
task.State.StateCode = commonpb.ImportState_ImportPending
m.workingTasks[task.Id] = task
m.workingTasks[task.GetId()] = task
m.updateImportTask(task)
}()
}
......@@ -167,7 +161,9 @@ func (m *importManager) genReqID() int64 {
return m.lastReqID
}
func (m *importManager) importJob(req *milvuspb.ImportRequest) *milvuspb.ImportResponse {
// importJob processes the import request, generates import tasks, sends these tasks to DataCoord, and returns
// immediately.
func (m *importManager) importJob(req *milvuspb.ImportRequest, cID int64) *milvuspb.ImportResponse {
if req == nil || len(req.Files) == 0 {
return &milvuspb.ImportResponse{
Status: &commonpb.Status{
......@@ -192,7 +188,9 @@ func (m *importManager) importJob(req *milvuspb.ImportRequest) *milvuspb.ImportR
},
}
log.Debug("import manager receive request", zap.String("collection", req.GetCollectionName()))
log.Debug("request received",
zap.String("collection name", req.GetCollectionName()),
zap.Int64("collection ID", cID))
func() {
m.pendingLock.Lock()
defer m.pendingLock.Unlock()
......@@ -225,14 +223,13 @@ func (m *importManager) importJob(req *milvuspb.ImportRequest) *milvuspb.ImportR
reqID := m.genReqID()
// convert import request to import tasks
if req.RowBased {
// for row-based, each file is a task
// For row-based importing, each file makes a task.
taskList := make([]int64, len(req.Files))
for i := 0; i < len(req.Files); i++ {
newTask := &datapb.ImportTaskInfo{
Id: m.nextTaskID,
RequestId: reqID,
CollectionId: req.GetCollectionName(),
PartitionId: req.GetPartitionName(),
CollectionId: cID,
Bucket: bucket,
RowBased: req.GetRowBased(),
Files: []string{req.GetFiles()[i]},
......@@ -241,20 +238,20 @@ func (m *importManager) importJob(req *milvuspb.ImportRequest) *milvuspb.ImportR
StateCode: commonpb.ImportState_ImportPending,
},
}
taskList[i] = newTask.GetId()
m.nextTaskID++
log.Info("new task created as pending task", zap.Int64("task ID", newTask.GetId()))
m.pendingTasks = append(m.pendingTasks, newTask)
m.saveImportTask(newTask)
}
log.Info("process row-based import request", zap.Int64("reqID", reqID), zap.Any("taskIDs", taskList))
log.Info("row-based import request processed", zap.Int64("reqID", reqID), zap.Any("taskIDs", taskList))
} else {
// TODO: Merge duplicated code :(
// for column-based, all files is a task
newTask := &datapb.ImportTaskInfo{
Id: m.nextTaskID,
RequestId: reqID,
CollectionId: req.GetCollectionName(),
PartitionId: req.GetPartitionName(),
CollectionId: cID,
Bucket: bucket,
RowBased: req.GetRowBased(),
Files: req.GetFiles(),
......@@ -264,55 +261,56 @@ func (m *importManager) importJob(req *milvuspb.ImportRequest) *milvuspb.ImportR
},
}
m.nextTaskID++
log.Info("new task created as pending task", zap.Int64("task ID", newTask.GetId()))
m.pendingTasks = append(m.pendingTasks, newTask)
m.saveImportTask(newTask)
log.Info("process column-based import request", zap.Int64("reqID", reqID), zap.Int64("taskID", newTask.Id))
log.Info("column-based import request processed", zap.Int64("reqID", reqID), zap.Int64("taskID", newTask.GetId()))
}
}()
m.pushTasks()
m.sendOutTasks()
return resp
}
func (m *importManager) updateTaskState(state *rootcoordpb.ImportResult) error {
if state == nil {
return errors.New("import task state is nil")
// updateTaskState updates the task's state in the task store according to the given ImportResult, and returns the ImportTaskInfo of
// the given task.
func (m *importManager) updateTaskState(ir *rootcoordpb.ImportResult) (*datapb.ImportTaskInfo, error) {
if ir == nil {
return nil, errors.New("import result is nil")
}
log.Debug("import manager update task state", zap.Int64("taskID", state.GetTaskId()))
log.Debug("import manager update task import result", zap.Int64("taskID", ir.GetTaskId()))
found := false
var v *datapb.ImportTaskInfo
func() {
m.workingLock.Lock()
defer m.workingLock.Unlock()
for k, v := range m.workingTasks {
if state.TaskId == k {
found = true
v.State.StateCode = state.GetState()
v.State.Segments = state.GetSegments()
v.State.RowCount = state.GetRowCount()
for _, kv := range state.GetInfos() {
if kv.GetKey() == FailedReason {
v.State.ErrorMessage = kv.GetValue()
break
}
ok := false
if v, ok = m.workingTasks[ir.TaskId]; ok {
found = true
v.State.StateCode = ir.GetState()
v.State.Segments = ir.GetSegments()
v.State.RowCount = ir.GetRowCount()
for _, kv := range ir.GetInfos() {
if kv.GetKey() == FailedReason {
v.State.ErrorMessage = kv.GetValue()
break
}
// Update task in task store.
m.updateImportTask(v)
}
// Update task in task store.
m.updateImportTask(v)
}
m.updateImportTask(v)
}()
if !found {
log.Debug("import manager update task state failed", zap.Int64("taskID", state.GetTaskId()))
return errors.New("failed to update import task, id not found: " + strconv.FormatInt(state.TaskId, 10))
log.Debug("import manager update task import result failed", zap.Int64("taskID", ir.GetTaskId()))
return nil, errors.New("failed to update import task, ID not found: " + strconv.FormatInt(ir.TaskId, 10))
}
return nil
return v, nil
}
func (m *importManager) getTaskState(id int64) *milvuspb.GetImportStateResponse {
// getTaskState looks for the task with the given ID and returns its import state.
func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse {
resp := &milvuspb.GetImportStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -320,15 +318,13 @@ func (m *importManager) getTaskState(id int64) *milvuspb.GetImportStateResponse
},
}
log.Debug("import manager get task state", zap.Int64("taskID", id))
log.Debug("getting import task state", zap.Int64("taskID", tID))
found := false
func() {
m.pendingLock.Lock()
defer m.pendingLock.Unlock()
for i := 0; i < len(m.pendingTasks); i++ {
if id == m.pendingTasks[i].Id {
if tID == m.pendingTasks[i].Id {
resp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
......@@ -338,7 +334,6 @@ func (m *importManager) getTaskState(id int64) *milvuspb.GetImportStateResponse
}
}
}()
if found {
return resp
}
......@@ -346,36 +341,33 @@ func (m *importManager) getTaskState(id int64) *milvuspb.GetImportStateResponse
func() {
m.workingLock.Lock()
defer m.workingLock.Unlock()
for k, v := range m.workingTasks {
if id == k {
found = true
resp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
resp.State = v.GetState().GetStateCode()
resp.RowCount = v.GetState().GetRowCount()
resp.IdList = v.GetState().GetRowIds()
resp.Infos = append(resp.Infos, &commonpb.KeyValuePair{Key: FailedReason, Value: v.GetState().GetErrorMessage()})
break
if v, ok := m.workingTasks[tID]; ok {
found = true
resp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
resp.State = v.GetState().GetStateCode()
resp.RowCount = v.GetState().GetRowCount()
resp.IdList = v.GetState().GetRowIds()
resp.Infos = append(resp.Infos, &commonpb.KeyValuePair{
Key: FailedReason,
Value: v.GetState().GetErrorMessage(),
})
}
}()
if !found {
log.Debug("import manager get task state failed", zap.Int64("taskID", id))
if found {
return resp
}
log.Debug("get import task state failed", zap.Int64("taskID", tID))
return resp
}
// load Loads task info from Etcd when RootCoord (re)starts.
// load loads task info from the task store when RootCoord (re)starts.
func (m *importManager) load() error {
log.Info("Import manager starts loading from Etcd")
log.Info("import manager starts loading from Etcd")
_, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath)
if err != nil {
log.Error("RootCoord Import manager failed to load from Etcd", zap.Error(err))
log.Error("import manager failed to load from Etcd", zap.Error(err))
return err
}
m.workingLock.Lock()
......@@ -385,17 +377,17 @@ func (m *importManager) load() error {
for i := range v {
ti := &datapb.ImportTaskInfo{}
if err := proto.Unmarshal([]byte(v[i]), ti); err != nil {
log.Error("Failed to unmarshal proto", zap.String("taskInfo", v[i]), zap.Error(err))
log.Error("failed to unmarshal proto", zap.String("taskInfo", v[i]), zap.Error(err))
// Ignore bad protos.
continue
}
// Put tasks back to pending or working task list, given their import states.
if ti.GetState().GetStateCode() == commonpb.ImportState_ImportPending {
log.Info("Task has been reloaded as a pending task", zap.Int64("TaskID", ti.Id))
log.Info("task has been reloaded as a pending task", zap.Int64("TaskID", ti.GetId()))
m.pendingTasks = append(m.pendingTasks, ti)
} else {
log.Info("Task has been reloaded as a working tasks", zap.Int64("TaskID", ti.Id))
m.workingTasks[ti.Id] = ti
log.Info("task has been reloaded as a working tasks", zap.Int64("TaskID", ti.GetId()))
m.workingTasks[ti.GetId()] = ti
}
}
return nil
......@@ -403,41 +395,51 @@ func (m *importManager) load() error {
// saveImportTask signs a lease and saves import task info into Etcd with this lease.
func (m *importManager) saveImportTask(task *datapb.ImportTaskInfo) error {
log.Info("Saving import task to Etcd", zap.Int64("Task ID", task.Id))
log.Debug("saving import task to Etcd", zap.Int64("Task ID", task.GetId()))
// TODO: Change default lease time and read it into config, once we figure out a proper value.
// Sign a lease.
leaseID, err := m.taskStore.Grant(10800) /*3 hours*/
if err != nil {
log.Error("Failed to grant lease from Etcd for data import.", zap.Int64("Task ID", task.Id), zap.Error(err))
log.Error("failed to grant lease from Etcd for data import",
zap.Int64("Task ID", task.GetId()),
zap.Error(err))
return err
}
log.Info("Lease granted for task", zap.Int64("Task ID", task.Id))
log.Debug("lease granted for task", zap.Int64("Task ID", task.GetId()))
var taskInfo []byte
if taskInfo, err = proto.Marshal(task); err != nil {
log.Error("Failed to marshall task proto", zap.Int64("Task ID", task.Id), zap.Error(err))
log.Error("failed to marshall task proto", zap.Int64("Task ID", task.GetId()), zap.Error(err))
return err
} else if err = m.taskStore.SaveWithLease(BuildImportTaskKey(task.Id), string(taskInfo), leaseID); err != nil {
log.Error("Failed to save import task info into Etcd", zap.Int64("Task ID", task.Id), zap.Error(err))
} else if err = m.taskStore.SaveWithLease(BuildImportTaskKey(task.GetId()), string(taskInfo), leaseID); err != nil {
log.Error("failed to save import task info into Etcd",
zap.Int64("task ID", task.GetId()),
zap.Error(err))
return err
}
log.Info("Task info successfully saved.", zap.Int64("Task ID", task.Id))
log.Debug("task info successfully saved", zap.Int64("Task ID", task.GetId()))
return nil
}
// updateImportTask updates the task info in Etcd according to task ID. It won't change the lease on the key.
func (m *importManager) updateImportTask(task *datapb.ImportTaskInfo) error {
log.Info("Updating import task.", zap.Int64("Task ID", task.Id))
if taskInfo, err := proto.Marshal(task); err != nil {
log.Error("Failed to marshall task proto.", zap.Int64("Task ID", task.Id), zap.Error(err))
func (m *importManager) updateImportTask(ti *datapb.ImportTaskInfo) error {
log.Debug("updating import task info in Etcd", zap.Int64("Task ID", ti.GetId()))
if taskInfo, err := proto.Marshal(ti); err != nil {
log.Error("failed to marshall task info proto", zap.Int64("Task ID", ti.GetId()), zap.Error(err))
return err
} else if err = m.taskStore.SaveWithIgnoreLease(BuildImportTaskKey(task.Id), string(taskInfo)); err != nil {
log.Error("Failed to update import task info in Etcd.", zap.Int64("Task ID", task.Id), zap.Error(err))
} else if err = m.taskStore.SaveWithIgnoreLease(BuildImportTaskKey(ti.GetId()), string(taskInfo)); err != nil {
log.Error("failed to update import task info info in Etcd", zap.Int64("Task ID", ti.GetId()), zap.Error(err))
return err
}
log.Info("Task info successfully updated.", zap.Int64("Task ID", task.Id))
log.Debug("task info successfully updated in Etcd", zap.Int64("Task ID", ti.GetId()))
return nil
}
// bringSegmentsOnline brings the segments online so that data in these segments become searchable.
func (m *importManager) bringSegmentsOnline(ti *datapb.ImportTaskInfo) {
log.Info("Bringing import tasks segments online!", zap.Int64("Task ID", ti.GetId()))
// TODO: Implement it.
}
// BuildImportTaskKey constructs and returns an Etcd key with given task ID.
func BuildImportTaskKey(taskID int64) string {
return fmt.Sprintf("%s%s%d", Params.RootCoordCfg.ImportTaskSubPath, delimiter, taskID)
......
......@@ -70,10 +70,11 @@ func TestImportManager_NewImportManager(t *testing.T) {
func TestImportManager_ImportJob(t *testing.T) {
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
colID := int64(100)
mockKv := &kv.MockMetaKV{}
mockKv.InMemKv = make(map[string]string)
mgr := newImportManager(context.TODO(), mockKv, nil)
resp := mgr.importJob(nil)
resp := mgr.importJob(nil, colID)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
rowReq := &milvuspb.ImportRequest{
......@@ -83,7 +84,7 @@ func TestImportManager_ImportJob(t *testing.T) {
Files: []string{"f1", "f2", "f3"},
}
resp = mgr.importJob(rowReq)
resp = mgr.importJob(rowReq, colID)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
colReq := &milvuspb.ImportRequest{
......@@ -108,12 +109,12 @@ func TestImportManager_ImportJob(t *testing.T) {
}
mgr = newImportManager(context.TODO(), mockKv, fn)
resp = mgr.importJob(rowReq)
resp = mgr.importJob(rowReq, colID)
assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks))
assert.Equal(t, 0, len(mgr.workingTasks))
mgr = newImportManager(context.TODO(), mockKv, fn)
resp = mgr.importJob(colReq)
resp = mgr.importJob(colReq, colID)
assert.Equal(t, 1, len(mgr.pendingTasks))
assert.Equal(t, 0, len(mgr.workingTasks))
......@@ -126,12 +127,12 @@ func TestImportManager_ImportJob(t *testing.T) {
}
mgr = newImportManager(context.TODO(), mockKv, fn)
resp = mgr.importJob(rowReq)
resp = mgr.importJob(rowReq, colID)
assert.Equal(t, 0, len(mgr.pendingTasks))
assert.Equal(t, len(rowReq.Files), len(mgr.workingTasks))
mgr = newImportManager(context.TODO(), mockKv, fn)
resp = mgr.importJob(colReq)
resp = mgr.importJob(colReq, colID)
assert.Equal(t, 0, len(mgr.pendingTasks))
assert.Equal(t, 1, len(mgr.workingTasks))
......@@ -153,13 +154,14 @@ func TestImportManager_ImportJob(t *testing.T) {
}
mgr = newImportManager(context.TODO(), mockKv, fn)
resp = mgr.importJob(rowReq)
resp = mgr.importJob(rowReq, colID)
assert.Equal(t, len(rowReq.Files)-2, len(mgr.pendingTasks))
assert.Equal(t, 2, len(mgr.workingTasks))
}
func TestImportManager_TaskState(t *testing.T) {
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
colID := int64(100)
mockKv := &kv.MockMetaKV{}
mockKv.InMemKv = make(map[string]string)
fn := func(ctx context.Context, req *datapb.ImportTask) *datapb.ImportTaskResponse {
......@@ -178,12 +180,12 @@ func TestImportManager_TaskState(t *testing.T) {
}
mgr := newImportManager(context.TODO(), mockKv, fn)
mgr.importJob(rowReq)
mgr.importJob(rowReq, colID)
state := &rootcoordpb.ImportResult{
TaskId: 10000,
}
err := mgr.updateTaskState(state)
_, err := mgr.updateTaskState(state)
assert.NotNil(t, err)
state = &rootcoordpb.ImportResult{
......@@ -201,8 +203,16 @@ func TestImportManager_TaskState(t *testing.T) {
},
},
}
err = mgr.updateTaskState(state)
assert.Nil(t, err)
ti, err := mgr.updateTaskState(state)
assert.NoError(t, err)
assert.Equal(t, int64(1), ti.GetId())
assert.Equal(t, int64(100), ti.GetCollectionId())
assert.Equal(t, int64(0), ti.GetPartitionId())
assert.Equal(t, true, ti.GetRowBased())
assert.Equal(t, []string{"f2"}, ti.GetFiles())
assert.Equal(t, commonpb.ImportState_ImportCompleted, ti.GetState().GetStateCode())
assert.Equal(t, int64(1000), ti.GetState().GetRowCount())
resp := mgr.getTaskState(10000)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
......
......@@ -84,7 +84,11 @@ func metricProxy(v int64) string {
return fmt.Sprintf("client_%d", v)
}
var Params paramtable.ComponentParam
var (
Params paramtable.ComponentParam
CheckCompleteIndexInterval = 3 * time.Minute
TaskTimeLimit = 3 * time.Hour
)
// Core root coordinator core
type Core struct {
......@@ -833,13 +837,13 @@ func (c *Core) SetIndexCoord(s types.IndexCoord) error {
log.Error("RootCoord failed to get index states from IndexCoord.", zap.Error(err))
return nil, err
}
log.Debug("got index states", zap.String("get index state result", res.String()))
if res.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("Get index states failed.",
zap.String("error_code", res.GetStatus().GetErrorCode().String()),
zap.String("reason", res.GetStatus().GetReason()))
return nil, fmt.Errorf(res.GetStatus().GetErrorCode().String())
}
log.Debug("Successfully got index states.")
return res.GetStates(), nil
}
return nil
......@@ -2230,7 +2234,7 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
return succStatus(), nil
}
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
// Import imports large files (json, numpy, etc.) on MinIO/S3 storage into Milvus storage.
func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.ImportResponse{
......@@ -2238,11 +2242,25 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
}, nil
}
log.Info("receive import request")
resp := c.importManager.importJob(req)
// Get collection/partition ID from collection/partition name.
var cID int64
var ok bool
if cID, ok = c.MetaTable.collName2ID[req.GetCollectionName()]; !ok {
log.Error("failed to find collection ID for collection name",
zap.String("collection name", req.GetCollectionName()))
return nil, fmt.Errorf("collection ID not found for collection name %s", req.GetCollectionName())
}
log.Info("receive import request",
zap.String("collection name", req.GetCollectionName()),
zap.Int64("collection ID", cID),
zap.String("partition name", req.GetPartitionName()),
zap.Int("# of files = ", len(req.GetFiles())),
)
resp := c.importManager.importJob(req, cID)
return resp, nil
}
// TODO: Implement this.
// Check import task state from datanode
func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
if code, ok := c.checkHealthy(); !ok {
......@@ -2261,21 +2279,39 @@ func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateR
return resp, nil
}
// Report impot task state to rootcoord
// ReportImport reports import task state to RootCoord.
func (c *Core) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult) (*commonpb.Status, error) {
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
log.Info("receive import state report")
err := c.importManager.updateTaskState(req)
// Upon receiving ReportImport request, update the related task's state in task store.
ti, err := c.importManager.updateTaskState(req)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure,
Reason: err.Error(),
}, nil
}
// Reverse look up collection name on collection ID.
var colName string
for k, v := range c.MetaTable.collName2ID {
if v == ti.GetCollectionId() {
colName = k
}
}
if colName == "" {
log.Error("Collection name not found for collection ID", zap.Int64("collection ID", ti.GetCollectionId()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_CollectionNameNotFound,
Reason: "Collection name not found for collection ID" + strconv.FormatInt(ti.GetCollectionId(), 10),
}, nil
}
// Start a loop to check segments' index states periodically.
c.wg.Add(1)
go c.CheckCompleteIndexLoop(ctx, ti, colName, req.Segments)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
......@@ -2300,7 +2336,7 @@ func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, co
if err != nil {
return 0, err
}
log.Debug("Got index description", zap.String("index_description", indexDescriptionResp.String()))
log.Debug("got index description", zap.String("index_description", indexDescriptionResp.String()))
// Check if the target index name exists.
matchIndexID := int64(-1)
......@@ -2315,6 +2351,7 @@ func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, co
if !foundIndexID {
return 0, fmt.Errorf("no index is created")
}
log.Debug("found match index ID", zap.Int64("match index ID", matchIndexID))
getIndexStatesRequest := &indexpb.GetIndexStatesRequest{
IndexBuildIDs: make([]UniqueID, 0),
......@@ -2331,6 +2368,9 @@ func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, co
}
segmentDesc, err := c.DescribeSegment(ctx, describeSegmentRequest)
if err != nil {
log.Error("Failed to describe segment",
zap.Int64("collection ID", collectionID),
zap.Int64("segment ID", segmentID))
return 0, err
}
if segmentDesc.IndexID == matchIndexID {
......@@ -2339,16 +2379,15 @@ func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, co
}
}
}
log.Debug("Proxy GetIndexState", zap.Int("IndexBuildIDs", len(getIndexStatesRequest.IndexBuildIDs)), zap.Error(err))
log.Debug("proxy GetIndexState", zap.Int("# of IndexBuildIDs", len(getIndexStatesRequest.IndexBuildIDs)), zap.Error(err))
// Return early on empty results.
if len(getIndexStatesRequest.IndexBuildIDs) == 0 {
log.Info("Empty index build IDs returned.", zap.String("collection name", collectionName), zap.Int64("collection ID", collectionID))
log.Info("empty index build IDs returned", zap.String("collection name", collectionName), zap.Int64("collection ID", collectionID))
return 0, nil
}
states, err := c.CallGetIndexStatesService(ctx, getIndexStatesRequest.IndexBuildIDs)
if err != nil {
log.Error("Failed to get index state in checkSegmentIndexStates.", zap.Error(err))
log.Error("failed to get index state in checkSegmentIndexStates", zap.Error(err))
return 0, err
}
......@@ -2359,7 +2398,7 @@ func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, co
ct++
}
}
log.Info("Segment indexing state checked.",
log.Info("segment indexing state checked",
zap.Int("# of checked segment", len(states)),
zap.Int("# of segments with complete index", ct),
zap.String("collection name", collectionName),
......@@ -2367,3 +2406,50 @@ func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, co
)
return ct, nil
}
// CheckCompleteIndexLoop checks index build states for an import task's segments and brings these segments online when
// the criteria are met. CheckCompleteIndexLoop does the check every CheckCompleteIndexInterval and exits when:
// (1) a certain percentage of indices are built, (2) the context is done, or (3) the task has expired.
func (c *Core) CheckCompleteIndexLoop(ctx context.Context, ti *datapb.ImportTaskInfo, colName string, segIDs []UniqueID) {
defer c.wg.Done()
ticker := time.NewTicker(CheckCompleteIndexInterval)
spent := time.Duration(time.Unix(time.Now().Unix()-ti.GetCreateTs(), 0).Nanosecond())
log.Info("reporting task time left",
zap.Int64("task ID", ti.GetId()),
zap.Int64("minutes remaining", int64((TaskTimeLimit-spent).Minutes())))
// TODO: Replace with real task time limit.
expireTicker := time.NewTicker(TaskTimeLimit - spent)
for {
select {
case <-c.ctx.Done():
log.Info("(in loop)context done, exiting CheckCompleteIndexLoop", zap.Int64("task ID", ti.GetId()))
return
case <-ticker.C:
log.Info("(in loop)check segments' index states", zap.Int64("task ID", ti.GetId()))
if ct, err := c.CountCompleteIndex(ctx, colName, ti.GetCollectionId(), segIDs); err == nil &&
segmentsOnlineReady(ct, len(segIDs)) {
log.Info("(in loop)segment indices are ready",
zap.Int64("task ID", ti.GetId()),
zap.Int("total # of segments", len(segIDs)),
zap.Int("# of segments with index ready", ct))
c.importManager.bringSegmentsOnline(ti)
return
}
case <-expireTicker.C:
log.Info("(in loop)task has expired, stop waiting for segment results", zap.Int64("task ID", ti.GetId()))
return
}
}
}
// segmentsOnlineReady returns true if the segments are ready to go online (a.k.a. searchable).
func segmentsOnlineReady(idxBuilt, segCount int) bool {
// Consider segments are ready when:
// (1) all but up to 2 segments have indices ready, or
// (2) over 85% of segments have indices ready.
if segCount-idxBuilt <= 2 || float64(idxBuilt)/float64(segCount) > 0.85 {
return true
}
return false
}
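To make the two thresholds concrete, a hypothetical test sketch in the same package; the segment counts below are illustrative only:

package rootcoord

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestSegmentsOnlineReadyThresholds is a hypothetical sketch exercising the
// "at most 2 missing" and "over 85% built" criteria.
func TestSegmentsOnlineReadyThresholds(t *testing.T) {
	assert.True(t, segmentsOnlineReady(8, 10))   // only 2 segments missing an index -> ready
	assert.False(t, segmentsOnlineReady(17, 20)) // 3 missing and ratio exactly 0.85 -> not ready
	assert.True(t, segmentsOnlineReady(26, 30))  // 4 missing but ratio ~0.867 > 0.85 -> ready
}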
......@@ -27,6 +27,8 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/kv"
......@@ -61,6 +63,8 @@ const (
returnUnsuccessfulStatus = "ReturnUnsuccessfulStatus"
)
var disabledIndexBuildID []int64
type ctxKey struct{}
type proxyMock struct {
......@@ -264,8 +268,10 @@ func (idx *indexMock) getFileArray() []string {
func (idx *indexMock) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
v := ctx.Value(ctxKey{}).(string)
if v == returnError {
log.Debug("(testing) simulating injected error")
return nil, fmt.Errorf("injected error")
} else if v == returnUnsuccessfulStatus {
log.Debug("(testing) simulating unsuccessful status")
return &indexpb.GetIndexStatesResponse{
Status: &commonpb.Status{
ErrorCode: 100,
......@@ -279,10 +285,23 @@ func (idx *indexMock) GetIndexStates(ctx context.Context, req *indexpb.GetIndexS
Reason: "all good",
},
}
for range req.IndexBuildIDs {
resp.States = append(resp.States, &indexpb.IndexInfo{
State: commonpb.IndexState_Finished,
})
log.Debug(fmt.Sprint("(testing) getting index state for index build IDs:", req.IndexBuildIDs))
log.Debug(fmt.Sprint("(testing) banned index build IDs:", disabledIndexBuildID))
for _, id := range req.IndexBuildIDs {
ban := false
for _, disabled := range disabledIndexBuildID {
if disabled == id {
ban = true
resp.States = append(resp.States, &indexpb.IndexInfo{
State: commonpb.IndexState_InProgress,
})
}
}
if !ban {
resp.States = append(resp.States, &indexpb.IndexInfo{
State: commonpb.IndexState_Finished,
})
}
}
return resp, nil
}
......@@ -605,6 +624,9 @@ func TestRootCoord_Base(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
CheckCompleteIndexInterval = 100 * time.Millisecond
TaskTimeLimit = 200 * time.Millisecond
coreFactory := msgstream.NewPmsFactory()
Params.Init()
Params.RootCoordCfg.DmlChannelNum = TestDMLChannelNum
......@@ -1063,7 +1085,7 @@ func TestRootCoord_Base(t *testing.T) {
assert.Nil(t, err)
partID := coll.PartitionIDs[1]
dm.mu.Lock()
dm.segs = []typeutil.UniqueID{1000, 1001, 1002}
dm.segs = []typeutil.UniqueID{1000, 1001, 1002, 1003, 1004, 1005}
dm.mu.Unlock()
req := &milvuspb.ShowSegmentsRequest{
......@@ -1073,7 +1095,7 @@ func TestRootCoord_Base(t *testing.T) {
Timestamp: 170,
SourceID: 170,
},
CollectionID: coll.ID,
CollectionID: coll.GetID(),
PartitionID: partID,
}
rsp, err := core.ShowSegments(ctx, req)
......@@ -1082,7 +1104,10 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, int64(1000), rsp.SegmentIDs[0])
assert.Equal(t, int64(1001), rsp.SegmentIDs[1])
assert.Equal(t, int64(1002), rsp.SegmentIDs[2])
assert.Equal(t, 3, len(rsp.SegmentIDs))
assert.Equal(t, int64(1003), rsp.SegmentIDs[3])
assert.Equal(t, int64(1004), rsp.SegmentIDs[4])
assert.Equal(t, int64(1005), rsp.SegmentIDs[5])
assert.Equal(t, 6, len(rsp.SegmentIDs))
})
wg.Add(1)
......@@ -1114,9 +1139,12 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
time.Sleep(100 * time.Millisecond)
files := im.getFileArray()
assert.Equal(t, 3*3, len(files))
assert.Equal(t, 6*3, len(files))
assert.ElementsMatch(t, files,
[]string{"file0-100", "file1-100", "file2-100",
"file0-100", "file1-100", "file2-100",
"file0-100", "file1-100", "file2-100",
"file0-100", "file1-100", "file2-100",
"file0-100", "file1-100", "file2-100",
"file0-100", "file1-100", "file2-100"})
collMeta, err = core.MetaTable.GetCollectionByName(collName, 0)
......@@ -1262,6 +1290,127 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, Params.CommonCfg.DefaultIndexName, rsp.IndexDescriptions[0].IndexName)
})
wg.Add(1)
t.Run("import", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.ImportRequest{
CollectionName: collName,
PartitionName: partName,
RowBased: true,
Files: []string{"f1", "f2", "f3"},
}
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.NoError(t, err)
core.MetaTable.collName2ID[collName] = coll.GetID()
rsp, err := core.Import(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
})
wg.Add(1)
t.Run("import w/ collection ID not found", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.ImportRequest{
CollectionName: "bad name",
PartitionName: partName,
RowBased: true,
Files: []string{"f1", "f2", "f3"},
}
_, err := core.Import(ctx, req)
assert.Error(t, err)
})
wg.Add(1)
t.Run("get import state", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.GetImportStateRequest{
Task: 0,
}
rsp, err := core.GetImportState(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
})
wg.Add(1)
t.Run("report import task timeout", func(t *testing.T) {
defer wg.Done()
coll, err := core.MetaTable.GetCollectionByName(collName, 0)
assert.Nil(t, err)
req := &rootcoordpb.ImportResult{
TaskId: 1,
RowCount: 100,
Segments: []int64{1003, 1004, 1005},
}
for _, segmentID := range []int64{1003, 1004, 1005} {
describeSegmentRequest := &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeSegment,
},
CollectionID: coll.ID,
SegmentID: segmentID,
}
segDesc, err := core.DescribeSegment(ctx, describeSegmentRequest)
assert.NoError(t, err)
disabledIndexBuildID = append(disabledIndexBuildID, segDesc.BuildID)
}
rsp, err := core.ReportImport(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
time.Sleep(500 * time.Millisecond)
})
wg.Add(1)
t.Run("report import update import task fail", func(t *testing.T) {
defer wg.Done()
// Case where report import request is nil.
resp, err := core.ReportImport(context.WithValue(ctx, ctxKey{}, ""), nil)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UpdateImportTaskFailure, resp.ErrorCode)
})
wg.Add(1)
t.Run("report import collection name not found", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.ImportRequest{
CollectionName: "new" + collName,
PartitionName: partName,
RowBased: true,
Files: []string{"f1", "f2", "f3"},
}
core.MetaTable.collName2ID["new"+collName] = 123
rsp, err := core.Import(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
delete(core.MetaTable.collName2ID, "new"+collName)
reqIR := &rootcoordpb.ImportResult{
TaskId: 3,
RowCount: 100,
Segments: []int64{1003, 1004, 1005},
}
// Case where the collection name can no longer be found when the import result is reported.
resp, err := core.ReportImport(context.WithValue(ctx, ctxKey{}, ""), reqIR)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_CollectionNameNotFound, resp.ErrorCode)
})
wg.Add(1)
t.Run("report import segments online ready", func(t *testing.T) {
defer wg.Done()
req := &rootcoordpb.ImportResult{
TaskId: 0,
RowCount: 100,
Segments: []int64{1000, 1001, 1002},
}
resp, err := core.ReportImport(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
time.Sleep(500 * time.Millisecond)
})
wg.Add(1)
t.Run("over ride index", func(t *testing.T) {
defer wg.Done()
......@@ -2145,43 +2294,6 @@ func TestRootCoord_Base(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
wg.Add(1)
t.Run("import", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.ImportRequest{
CollectionName: "c1",
PartitionName: "p1",
RowBased: true,
Files: []string{"f1", "f2", "f3"},
}
rsp, err := core.Import(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
})
wg.Add(1)
t.Run("get import state", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.GetImportStateRequest{
Task: 0,
}
rsp, err := core.GetImportState(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.Status.ErrorCode)
})
wg.Add(1)
t.Run("report import", func(t *testing.T) {
defer wg.Done()
req := &rootcoordpb.ImportResult{
TaskId: 0,
RowCount: 100,
}
rsp, err := core.ReportImport(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, rsp.ErrorCode)
})
wg.Add(1)
t.Run("get system info", func(t *testing.T) {
defer wg.Done()
......@@ -2418,6 +2530,7 @@ func TestRootCoord_Base(t *testing.T) {
assert.Nil(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, p2.Status.ErrorCode)
})
wg.Wait()
err = core.Stop()
assert.Nil(t, err)
......
......@@ -283,7 +283,7 @@ func (t *timetickSync) initSessions(sess []*sessionutil.Session) {
}
}
// StartWatch watch session change and process all channels' timetick msg
// StartWatch watches on session changes and processes timeTick messages of all channels.
func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
defer wg.Done()
......