Unverified commit 442c73a5, authored by Ten Thousand Leaves, committed by GitHub

Support passing channel names to DataNode (#16424)

/kind feature

issue: #15604
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
Parent 88d11b95
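In outline, this change threads the collection's virtual channel (vChannel) names from the Proxy down to the DataNode: Proxy.Import resolves the collection ID and its vChannels and attaches them to the ImportRequest, RootCoord's importManager copies them onto each datapb.ImportTask, and the DataNode receives them with the task. The following sketch models the new Proxy-side step with stand-in types; metaCache, channelMgr, and attachChannelNames are hypothetical simplifications, not Milvus APIs:

package main

import "fmt"

// Stand-ins for the Proxy's globalMetaCache and channel manager; these
// hypothetical types only model the lookups the real code performs.
type metaCache map[string]int64    // collection name -> collection ID
type channelMgr map[int64][]string // collection ID -> vChannel names

type importRequest struct {
	CollectionName string
	ChannelNames   []string // new field added by this commit
	Files          []string
}

// attachChannelNames mirrors the new steps in Proxy.Import: resolve the
// collection ID, look up its vChannels, and set req.ChannelNames.
func attachChannelNames(req *importRequest, mc metaCache, cm channelMgr) error {
	collID, ok := mc[req.CollectionName]
	if !ok {
		return fmt.Errorf("collection ID not found: %s", req.CollectionName)
	}
	chNames, ok := cm[collID]
	if !ok {
		return fmt.Errorf("get vChannels failed: collection %d", collID)
	}
	req.ChannelNames = chNames
	return nil
}

func main() {
	mc := metaCache{"import_collection": 42}
	cm := channelMgr{42: {"dml-ch-0", "dml-ch-1"}}
	req := &importRequest{CollectionName: "import_collection", Files: []string{"f1.json"}}
	if err := attachChannelNames(req, mc, cm); err != nil {
		panic(err)
	}
	fmt.Println(req.ChannelNames) // [dml-ch-0 dml-ch-1]
}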
@@ -287,54 +287,54 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"dUsersFailure\020!\022\022\n\rDDRequestRace\020\350\007*X\n\nI"
"ndexState\022\022\n\016IndexStateNone\020\000\022\014\n\010Unissue"
"d\020\001\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n\n\006Fai"
"led\020\004*s\n\014SegmentState\022\024\n\020SegmentStateNon"
"e\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Sealed"
"\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013\n\007Dropped"
"\020\006*\261\n\n\007MsgType\022\r\n\tUndefined\020\000\022\024\n\020CreateC"
"ollection\020d\022\022\n\016DropCollection\020e\022\021\n\rHasCo"
"llection\020f\022\026\n\022DescribeCollection\020g\022\023\n\017Sh"
"owCollections\020h\022\024\n\020GetSystemConfigs\020i\022\022\n"
"\016LoadCollection\020j\022\025\n\021ReleaseCollection\020k"
"\022\017\n\013CreateAlias\020l\022\r\n\tDropAlias\020m\022\016\n\nAlte"
"rAlias\020n\022\024\n\017CreatePartition\020\310\001\022\022\n\rDropPa"
"rtition\020\311\001\022\021\n\014HasPartition\020\312\001\022\026\n\021Describ"
"ePartition\020\313\001\022\023\n\016ShowPartitions\020\314\001\022\023\n\016Lo"
"adPartitions\020\315\001\022\026\n\021ReleasePartitions\020\316\001\022"
"\021\n\014ShowSegments\020\372\001\022\024\n\017DescribeSegment\020\373\001"
"\022\021\n\014LoadSegments\020\374\001\022\024\n\017ReleaseSegments\020\375"
"\001\022\024\n\017HandoffSegments\020\376\001\022\030\n\023LoadBalanceSe"
"gments\020\377\001\022\025\n\020DescribeSegments\020\200\002\022\020\n\013Crea"
"teIndex\020\254\002\022\022\n\rDescribeIndex\020\255\002\022\016\n\tDropIn"
"dex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006Delete\020\221\003\022\n\n\005Flus"
"h\020\222\003\022\013\n\006Search\020\364\003\022\021\n\014SearchResult\020\365\003\022\022\n\r"
"GetIndexState\020\366\003\022\032\n\025GetIndexBuildProgres"
"s\020\367\003\022\034\n\027GetCollectionStatistics\020\370\003\022\033\n\026Ge"
"tPartitionStatistics\020\371\003\022\r\n\010Retrieve\020\372\003\022\023"
"\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmChannels\020\374"
"\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022WatchQueryCh"
"annels\020\376\003\022\030\n\023RemoveQueryChannels\020\377\003\022\035\n\030S"
"ealedSegmentsChangeInfo\020\200\004\022\027\n\022WatchDelta"
"Channels\020\201\004\022\020\n\013SegmentInfo\020\330\004\022\017\n\nSystemI"
"nfo\020\331\004\022\024\n\017GetRecoveryInfo\020\332\004\022\024\n\017GetSegme"
"ntState\020\333\004\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNodeSt"
"ats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tRequestID\020\263\t\022\017"
"\n\nRequestTSO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022\026\n"
"\021SegmentStatistics\020\266\t\022\025\n\020SegmentFlushDon"
"e\020\267\t\022\017\n\nDataNodeTt\020\270\t\022\025\n\020CreateCredentia"
"l\020\334\013\022\022\n\rGetCredential\020\335\013\022\025\n\020DeleteCreden"
"tial\020\336\013\022\025\n\020UpdateCredential\020\337\013\022\026\n\021ListCr"
"edUsernames\020\340\013*\"\n\007DslType\022\007\n\003Dsl\020\000\022\016\n\nBo"
"olExprV1\020\001*B\n\017CompactionState\022\021\n\rUndefie"
"dState\020\000\022\r\n\tExecuting\020\001\022\r\n\tCompleted\020\002*X"
"\n\020ConsistencyLevel\022\n\n\006Strong\020\000\022\013\n\007Sessio"
"n\020\001\022\013\n\007Bounded\020\002\022\016\n\nEventually\020\003\022\016\n\nCust"
"omized\020\004*\227\001\n\013ImportState\022\021\n\rImportPendin"
"g\020\000\022\020\n\014ImportFailed\020\001\022\021\n\rImportStarted\020\002"
"\022\024\n\020ImportDownloaded\020\003\022\020\n\014ImportParsed\020\004"
"\022\023\n\017ImportPersisted\020\005\022\023\n\017ImportCompleted"
"\020\006BW\n\016io.milvus.grpcB\013CommonProtoP\001Z3git"
"hub.com/milvus-io/milvus/internal/proto/"
"commonpb\240\001\001b\006proto3"
"led\020\004*\202\001\n\014SegmentState\022\024\n\020SegmentStateNo"
"ne\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Seale"
"d\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005\022\013\n\007Droppe"
"d\020\006\022\r\n\tImporting\020\007*\261\n\n\007MsgType\022\r\n\tUndefi"
"ned\020\000\022\024\n\020CreateCollection\020d\022\022\n\016DropColle"
"ction\020e\022\021\n\rHasCollection\020f\022\026\n\022DescribeCo"
"llection\020g\022\023\n\017ShowCollections\020h\022\024\n\020GetSy"
"stemConfigs\020i\022\022\n\016LoadCollection\020j\022\025\n\021Rel"
"easeCollection\020k\022\017\n\013CreateAlias\020l\022\r\n\tDro"
"pAlias\020m\022\016\n\nAlterAlias\020n\022\024\n\017CreatePartit"
"ion\020\310\001\022\022\n\rDropPartition\020\311\001\022\021\n\014HasPartiti"
"on\020\312\001\022\026\n\021DescribePartition\020\313\001\022\023\n\016ShowPar"
"titions\020\314\001\022\023\n\016LoadPartitions\020\315\001\022\026\n\021Relea"
"sePartitions\020\316\001\022\021\n\014ShowSegments\020\372\001\022\024\n\017De"
"scribeSegment\020\373\001\022\021\n\014LoadSegments\020\374\001\022\024\n\017R"
"eleaseSegments\020\375\001\022\024\n\017HandoffSegments\020\376\001\022"
"\030\n\023LoadBalanceSegments\020\377\001\022\025\n\020DescribeSeg"
"ments\020\200\002\022\020\n\013CreateIndex\020\254\002\022\022\n\rDescribeIn"
"dex\020\255\002\022\016\n\tDropIndex\020\256\002\022\013\n\006Insert\020\220\003\022\013\n\006D"
"elete\020\221\003\022\n\n\005Flush\020\222\003\022\013\n\006Search\020\364\003\022\021\n\014Sea"
"rchResult\020\365\003\022\022\n\rGetIndexState\020\366\003\022\032\n\025GetI"
"ndexBuildProgress\020\367\003\022\034\n\027GetCollectionSta"
"tistics\020\370\003\022\033\n\026GetPartitionStatistics\020\371\003\022"
"\r\n\010Retrieve\020\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017W"
"atchDmChannels\020\374\003\022\025\n\020RemoveDmChannels\020\375\003"
"\022\027\n\022WatchQueryChannels\020\376\003\022\030\n\023RemoveQuery"
"Channels\020\377\003\022\035\n\030SealedSegmentsChangeInfo\020"
"\200\004\022\027\n\022WatchDeltaChannels\020\201\004\022\020\n\013SegmentIn"
"fo\020\330\004\022\017\n\nSystemInfo\020\331\004\022\024\n\017GetRecoveryInf"
"o\020\332\004\022\024\n\017GetSegmentState\020\333\004\022\r\n\010TimeTick\020\260"
"\t\022\023\n\016QueryNodeStats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016"
"\n\tRequestID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017Alloc"
"ateSegment\020\265\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n"
"\020SegmentFlushDone\020\267\t\022\017\n\nDataNodeTt\020\270\t\022\025\n"
"\020CreateCredential\020\334\013\022\022\n\rGetCredential\020\335\013"
"\022\025\n\020DeleteCredential\020\336\013\022\025\n\020UpdateCredent"
"ial\020\337\013\022\026\n\021ListCredUsernames\020\340\013*\"\n\007DslTyp"
"e\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017Compaction"
"State\022\021\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022"
"\r\n\tCompleted\020\002*X\n\020ConsistencyLevel\022\n\n\006St"
"rong\020\000\022\013\n\007Session\020\001\022\013\n\007Bounded\020\002\022\016\n\nEven"
"tually\020\003\022\016\n\nCustomized\020\004*\227\001\n\013ImportState"
"\022\021\n\rImportPending\020\000\022\020\n\014ImportFailed\020\001\022\021\n"
"\rImportStarted\020\002\022\024\n\020ImportDownloaded\020\003\022\020"
"\n\014ImportParsed\020\004\022\023\n\017ImportPersisted\020\005\022\023\n"
"\017ImportCompleted\020\006BW\n\016io.milvus.grpcB\013Co"
"mmonProtoP\001Z3github.com/milvus-io/milvus"
"/internal/proto/commonpb\240\001\001b\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
};
@@ -351,7 +351,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 3259,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 3275,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 8, 0,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 8, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
@@ -438,6 +438,7 @@ bool SegmentState_IsValid(int value) {
case 4:
case 5:
case 6:
case 7:
return true;
default:
return false;
......
@@ -192,12 +192,13 @@ enum SegmentState : int {
Flushed = 4,
Flushing = 5,
Dropped = 6,
Importing = 7,
SegmentState_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
SegmentState_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool SegmentState_IsValid(int value);
constexpr SegmentState SegmentState_MIN = SegmentStateNone;
constexpr SegmentState SegmentState_MAX = Dropped;
constexpr SegmentState SegmentState_MAX = Importing;
constexpr int SegmentState_ARRAYSIZE = SegmentState_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* SegmentState_descriptor();
......
This diff is collapsed.
@@ -13317,13 +13317,31 @@ class ImportRequest :
// accessors -------------------------------------------------------
enum : int {
kFilesFieldNumber = 4,
kOptionsFieldNumber = 5,
kChannelNamesFieldNumber = 3,
kFilesFieldNumber = 5,
kOptionsFieldNumber = 6,
kCollectionNameFieldNumber = 1,
kPartitionNameFieldNumber = 2,
kRowBasedFieldNumber = 3,
kRowBasedFieldNumber = 4,
};
// repeated string files = 4;
// repeated string channel_names = 3;
int channel_names_size() const;
void clear_channel_names();
const std::string& channel_names(int index) const;
std::string* mutable_channel_names(int index);
void set_channel_names(int index, const std::string& value);
void set_channel_names(int index, std::string&& value);
void set_channel_names(int index, const char* value);
void set_channel_names(int index, const char* value, size_t size);
std::string* add_channel_names();
void add_channel_names(const std::string& value);
void add_channel_names(std::string&& value);
void add_channel_names(const char* value);
void add_channel_names(const char* value, size_t size);
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>& channel_names() const;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>* mutable_channel_names();
// repeated string files = 5;
int files_size() const;
void clear_files();
const std::string& files(int index) const;
@@ -13340,7 +13358,7 @@ class ImportRequest :
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>& files() const;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>* mutable_files();
// repeated .milvus.proto.common.KeyValuePair options = 5;
// repeated .milvus.proto.common.KeyValuePair options = 6;
int options_size() const;
void clear_options();
::milvus::proto::common::KeyValuePair* mutable_options(int index);
@@ -13373,7 +13391,7 @@ class ImportRequest :
std::string* release_partition_name();
void set_allocated_partition_name(std::string* partition_name);
// bool row_based = 3;
// bool row_based = 4;
void clear_row_based();
bool row_based() const;
void set_row_based(bool value);
@@ -13383,6 +13401,7 @@ class ImportRequest :
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string> channel_names_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string> files_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::common::KeyValuePair > options_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr collection_name_;
@@ -26713,7 +26732,72 @@ inline void ImportRequest::set_allocated_partition_name(std::string* partition_n
// @@protoc_insertion_point(field_set_allocated:milvus.proto.milvus.ImportRequest.partition_name)
}
// bool row_based = 3;
// repeated string channel_names = 3;
inline int ImportRequest::channel_names_size() const {
return channel_names_.size();
}
inline void ImportRequest::clear_channel_names() {
channel_names_.Clear();
}
inline const std::string& ImportRequest::channel_names(int index) const {
// @@protoc_insertion_point(field_get:milvus.proto.milvus.ImportRequest.channel_names)
return channel_names_.Get(index);
}
inline std::string* ImportRequest::mutable_channel_names(int index) {
// @@protoc_insertion_point(field_mutable:milvus.proto.milvus.ImportRequest.channel_names)
return channel_names_.Mutable(index);
}
inline void ImportRequest::set_channel_names(int index, const std::string& value) {
// @@protoc_insertion_point(field_set:milvus.proto.milvus.ImportRequest.channel_names)
channel_names_.Mutable(index)->assign(value);
}
inline void ImportRequest::set_channel_names(int index, std::string&& value) {
// @@protoc_insertion_point(field_set:milvus.proto.milvus.ImportRequest.channel_names)
channel_names_.Mutable(index)->assign(std::move(value));
}
inline void ImportRequest::set_channel_names(int index, const char* value) {
GOOGLE_DCHECK(value != nullptr);
channel_names_.Mutable(index)->assign(value);
// @@protoc_insertion_point(field_set_char:milvus.proto.milvus.ImportRequest.channel_names)
}
inline void ImportRequest::set_channel_names(int index, const char* value, size_t size) {
channel_names_.Mutable(index)->assign(
reinterpret_cast<const char*>(value), size);
// @@protoc_insertion_point(field_set_pointer:milvus.proto.milvus.ImportRequest.channel_names)
}
inline std::string* ImportRequest::add_channel_names() {
// @@protoc_insertion_point(field_add_mutable:milvus.proto.milvus.ImportRequest.channel_names)
return channel_names_.Add();
}
inline void ImportRequest::add_channel_names(const std::string& value) {
channel_names_.Add()->assign(value);
// @@protoc_insertion_point(field_add:milvus.proto.milvus.ImportRequest.channel_names)
}
inline void ImportRequest::add_channel_names(std::string&& value) {
channel_names_.Add(std::move(value));
// @@protoc_insertion_point(field_add:milvus.proto.milvus.ImportRequest.channel_names)
}
inline void ImportRequest::add_channel_names(const char* value) {
GOOGLE_DCHECK(value != nullptr);
channel_names_.Add()->assign(value);
// @@protoc_insertion_point(field_add_char:milvus.proto.milvus.ImportRequest.channel_names)
}
inline void ImportRequest::add_channel_names(const char* value, size_t size) {
channel_names_.Add()->assign(reinterpret_cast<const char*>(value), size);
// @@protoc_insertion_point(field_add_pointer:milvus.proto.milvus.ImportRequest.channel_names)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>&
ImportRequest::channel_names() const {
// @@protoc_insertion_point(field_list:milvus.proto.milvus.ImportRequest.channel_names)
return channel_names_;
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField<std::string>*
ImportRequest::mutable_channel_names() {
// @@protoc_insertion_point(field_mutable_list:milvus.proto.milvus.ImportRequest.channel_names)
return &channel_names_;
}
// bool row_based = 4;
inline void ImportRequest::clear_row_based() {
row_based_ = false;
}
@@ -26727,7 +26811,7 @@ inline void ImportRequest::set_row_based(bool value) {
// @@protoc_insertion_point(field_set:milvus.proto.milvus.ImportRequest.row_based)
}
// repeated string files = 4;
// repeated string files = 5;
inline int ImportRequest::files_size() const {
return files_.size();
}
@@ -26792,7 +26876,7 @@ ImportRequest::mutable_files() {
return &files_;
}
// repeated .milvus.proto.common.KeyValuePair options = 5;
// repeated .milvus.proto.common.KeyValuePair options = 6;
inline int ImportRequest::options_size() const {
return options_.size();
}
@@ -1017,14 +1017,14 @@ func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*da
log.Info("picking a free dataNode",
zap.Any("all dataNodes", nodes),
zap.Int64("picking free dataNode with ID", dnID))
s.cluster.Import(ctx, dnID, itr)
s.cluster.Import(s.ctx, dnID, itr)
} else {
// No DataNodes are available, choose a still working DataNode randomly.
dnID := nodes[rand.Intn(len(nodes))]
log.Info("all dataNodes are busy, picking a random dataNode still",
zap.Any("all dataNodes", nodes),
zap.Int64("picking dataNode with ID", dnID))
s.cluster.Import(ctx, dnID, itr)
s.cluster.Import(s.ctx, dnID, itr)
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
......
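Note on the hunk above: s.cluster.Import is now called with the server-scoped s.ctx rather than the per-request ctx, presumably so the task dispatched to the DataNode is not canceled when the incoming RPC returns; the commit message does not state the motivation.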
@@ -768,7 +768,12 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*commonpb.Status, error) {
log.Info("receive import request")
log.Info("receive import request",
zap.Int64("task ID", req.GetImportTask().GetTaskId()),
zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
zap.Any("channel names", req.GetImportTask().GetChannelNames()),
zap.Any("working dataNodes", req.WorkingNodes))
if !node.isHealthy() {
log.Warn("DataNode.Import failed",
......
@@ -63,6 +63,7 @@ enum SegmentState {
Flushed = 4;
Flushing = 5;
Dropped = 6;
Importing = 7;
}
message Status {
......
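The new Importing state also surfaces in the regenerated Go bindings. A minimal check, assuming the standard protoc-gen-go output at the go_package path embedded in the descriptor above:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
)

func main() {
	s := commonpb.SegmentState_Importing
	fmt.Println(int32(s), s.String()) // prints: 7 Importing
}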
@@ -241,6 +241,7 @@ message SaveBinlogPathsRequest {
repeated FieldBinlog field2StatslogPaths = 8;
repeated FieldBinlog deltalogs = 9;
bool dropped = 10;
bool importing = 11;
}
message CheckPoint {
@@ -433,10 +434,11 @@ message ImportTask {
common.Status status = 1;
int64 collection_id = 2; // target collection ID
int64 partition_id = 3; // target partition ID
bool row_based = 4; // the file is row-based or column-based
int64 task_id = 5; // id of the task
repeated string files = 6; // file paths to be imported
repeated common.KeyValuePair infos = 7; // more informations about the task, bucket, etc.
repeated string channel_names = 4; // target channel names of the collection.
bool row_based = 5; // the file is row-based or column-based
int64 task_id = 6; // id of the task
repeated string files = 7; // file paths to be imported
repeated common.KeyValuePair infos = 8; // extra information about the task, bucket, etc.
}
message ImportTaskState {
@@ -448,16 +450,17 @@ message ImportTaskState {
}
message ImportTaskInfo {
int64 id = 1; // Task ID.
int64 request_id = 2; // Request ID of the import task.
int64 datanode_id = 3; // ID of DataNode that processes the task.
int64 collection_id = 4; // Collection ID for the import task.
int64 partition_id = 5; // Partition ID for the import task.
string bucket = 6; // Bucket for the import task.
bool row_based = 7; // Boolean indicating whether import files are row-based or column-based.
repeated string files = 8; // A list of files to import.
int64 create_ts = 9; // Timestamp when the import task is created.
ImportTaskState state = 10; // State of the import task.
int64 id = 1; // Task ID.
int64 request_id = 2; // Request ID of the import task.
int64 datanode_id = 3; // ID of DataNode that processes the task.
int64 collection_id = 4; // Collection ID for the import task.
int64 partition_id = 5; // Partition ID for the import task.
repeated string channel_names = 6; // Names of channels for the collection.
string bucket = 7; // Bucket for the import task.
bool row_based = 8; // Boolean indicating whether import files are row-based or column-based.
repeated string files = 9; // A list of files to import.
int64 create_ts = 10; // Timestamp when the import task is created.
ImportTaskState state = 11; // State of the import task.
}
message ImportTaskResponse {
......
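For illustration, the per-task copy that importManager performs (see the sendOutTasks hunk further down) can be written as a small helper. toImportTask is hypothetical; the getters exist on the regenerated datapb types:

package sketch

import "github.com/milvus-io/milvus/internal/proto/datapb"

// toImportTask mirrors the inline construction in importManager.sendOutTasks.
func toImportTask(info *datapb.ImportTaskInfo) *datapb.ImportTask {
	return &datapb.ImportTask{
		CollectionId: info.GetCollectionId(),
		PartitionId:  info.GetPartitionId(),
		ChannelNames: info.GetChannelNames(), // new: channel names now ride along to the DataNode
		RowBased:     info.GetRowBased(),
		TaskId:       info.GetId(),
		Files:        info.GetFiles(),
	}
}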
@@ -804,11 +804,12 @@ message GetFlushStateResponse {
}
message ImportRequest {
string collection_name = 1; // target collection
string partition_name = 2; // target partition
bool row_based = 3; // the file is row-based or column-based
repeated string files = 4; // file paths to be imported
repeated common.KeyValuePair options = 5; // import options, bucket, etc.
string collection_name = 1; // target collection
string partition_name = 2; // target partition
repeated string channel_names = 3; // channel names for the collection
bool row_based = 4; // the file is row-based or column-based
repeated string files = 5; // file paths to be imported
repeated common.KeyValuePair options = 6; // import options, bucket, etc.
}
message ImportResponse {
......
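A caller's view of the renumbered message, as a sketch against the regenerated milvuspb bindings (channel_names is filled in by the Proxy itself, as the next hunk shows, so clients normally leave it unset; names and values below are hypothetical):

package sketch

import (
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
)

func newImportRequest() *milvuspb.ImportRequest {
	return &milvuspb.ImportRequest{
		CollectionName: "my_collection",       // target collection
		PartitionName:  "_default",            // target partition (hypothetical name)
		RowBased:       true,                  // files are row-based
		Files:          []string{"rows.json"}, // file paths to import (hypothetical)
		// ChannelNames (field 3) is set by the Proxy from the collection's vChannels.
		Options: []*commonpb.KeyValuePair{
			{Key: "bucket", Value: "my-bucket"}, // "bucket" option per the field comment
		},
	}
}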
@@ -3921,14 +3921,42 @@ func unhealthyStatus() *commonpb.Status {
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
log.Info("received import request")
resp := &milvuspb.ImportResponse{}
resp := &milvuspb.ImportResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
return resp, nil
}
resp, err := node.rootCoord.Import(ctx, req)
log.Info("received import response", zap.String("collectionName", req.GetCollectionName()), zap.Any("resp", resp), zap.Error(err))
// Get collection ID and then channel names.
collID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
if err != nil {
log.Error("collection ID not found",
zap.String("collection name", req.GetCollectionName()),
zap.Error(err))
resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
resp.Status.Reason = err.Error()
return resp, err
}
chNames, err := node.chMgr.getVChannels(collID)
if err != nil {
log.Error("get vChannels failed",
zap.Int64("collection ID", collID),
zap.Error(err))
resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
resp.Status.Reason = err.Error()
return resp, err
}
req.ChannelNames = chNames
// Call rootCoord to finish import.
resp, err = node.rootCoord.Import(ctx, req)
log.Info("received import response",
zap.String("collection name", req.GetCollectionName()),
zap.Any("resp", resp),
zap.Error(err))
return resp, err
}
......
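With this change, Proxy.Import fails fast, returning ErrorCode_UnexpectedError and the error, when the collection name cannot be resolved or its vChannels cannot be fetched, before RootCoord is contacted at all; the tests added below exercise both failure paths.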
@@ -1597,6 +1597,45 @@ func TestProxy(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
wg.Add(1)
t.Run("test import", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.ImportRequest{
CollectionName: collectionName,
Files: []string{"f1", "f2", "f3"},
}
proxy.stateCode.Store(internalpb.StateCode_Healthy)
resp, err := proxy.Import(context.TODO(), req)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Nil(t, err)
})
wg.Add(1)
t.Run("test import collection ID not found", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.ImportRequest{
CollectionName: "bad_collection_name",
Files: []string{"f1", "f2", "f3"},
}
proxy.stateCode.Store(internalpb.StateCode_Healthy)
resp, err := proxy.Import(context.TODO(), req)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
assert.Error(t, err)
})
wg.Add(1)
t.Run("test import get vChannel fail", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.ImportRequest{
CollectionName: "bad_collection_name",
Files: []string{"f1", "f2", "f3"},
}
proxy.stateCode.Store(internalpb.StateCode_Healthy)
resp, err := proxy.Import(context.TODO(), req)
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
assert.Error(t, err)
})
wg.Add(1)
t.Run("release collection", func(t *testing.T) {
defer wg.Done()
@@ -1610,6 +1649,7 @@ func TestProxy(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, "", resp.Reason)
// release dql message stream
resp, err = proxy.ReleaseDQLMessageStream(ctx, &proxypb.ReleaseDQLMessageStreamRequest{
@@ -1619,6 +1659,7 @@ func TestProxy(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, "", resp.Reason)
})
wg.Add(1)
@@ -1881,6 +1922,7 @@ func TestProxy(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, "", resp.Reason)
// invalidate meta cache
resp, err = proxy.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{
@@ -2933,38 +2975,66 @@ func TestProxy_GetComponentStates_state_code(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, states.Status.ErrorCode)
}
func TestProxy__Import(t *testing.T) {
req := &milvuspb.ImportRequest{
CollectionName: "dummy",
}
rootCoord := &RootCoordMock{}
rootCoord.state.Store(internalpb.StateCode_Healthy)
t.Run("test import", func(t *testing.T) {
proxy := &Proxy{rootCoord: rootCoord}
func TestProxy_Import(t *testing.T) {
rc := NewRootCoordMock()
rc.Start()
defer rc.Stop()
err := InitMetaCache(rc)
assert.NoError(t, err)
rc.CreateCollection(context.TODO(), &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropCollection,
MsgID: 100,
Timestamp: 100,
},
CollectionName: "import_collection",
})
localMsg := true
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory := dependency.NewDefaultFactory(localMsg)
proxy, err := NewProxy(ctx, factory)
assert.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
t.Run("test import get vChannel failed", func(t *testing.T) {
defer wg.Done()
proxy.stateCode.Store(internalpb.StateCode_Healthy)
resp, err := proxy.Import(context.TODO(), req)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Nil(t, err)
proxy.chMgr = newChannelsMgrImpl(nil, nil, nil, nil, nil)
resp, err := proxy.Import(context.TODO(),
&milvuspb.ImportRequest{
CollectionName: "import_collection",
})
assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
assert.Error(t, err)
})
wg.Add(1)
t.Run("test import with unhealthy", func(t *testing.T) {
proxy := &Proxy{rootCoord: rootCoord}
defer wg.Done()
req := &milvuspb.ImportRequest{
CollectionName: "dummy",
}
proxy.stateCode.Store(internalpb.StateCode_Abnormal)
resp, err := proxy.Import(context.TODO(), req)
assert.EqualValues(t, unhealthyStatus(), resp.Status)
assert.Nil(t, err)
assert.NoError(t, err)
})
resp, err := rc.DropCollection(context.TODO(), &milvuspb.DropCollectionRequest{
CollectionName: "import_collection",
})
wg.Wait()
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
rc.Stop()
}
func TestProxy__GetImportState(t *testing.T) {
func TestProxy_GetImportState(t *testing.T) {
req := &milvuspb.GetImportStateRequest{
Task: 1,
}
rootCoord := &RootCoordMock{}
rootCoord.state.Store(internalpb.StateCode_Healthy)
t.Run("test get import state", func(t *testing.T) {
proxy := &Proxy{rootCoord: rootCoord}
proxy.stateCode.Store(internalpb.StateCode_Healthy)
......
@@ -113,9 +113,11 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
if task.GetState().GetStateCode() == commonpb.ImportState_ImportFailed {
continue
}
// TODO: Use ImportTaskInfo directly.
it := &datapb.ImportTask{
CollectionId: task.GetCollectionId(),
PartitionId: task.GetPartitionId(),
ChannelNames: task.GetChannelNames(),
RowBased: task.GetRowBased(),
TaskId: task.GetId(),
Files: task.GetFiles(),
@@ -146,7 +148,7 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
task.DatanodeId = resp.GetDatanodeId()
log.Debug("import task successfully assigned to DataNode",
zap.Int64("task ID", it.GetTaskId()),
zap.Int64("DataNode ID", task.GetDatanodeId()))
zap.Int64("dataNode ID", task.GetDatanodeId()))
// Add new working dataNode to busyNodes.
m.busyNodes[resp.GetDatanodeId()] = true
@@ -252,6 +254,7 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
Id: m.nextTaskID,
RequestId: reqID,
CollectionId: cID,
ChannelNames: req.ChannelNames,
Bucket: bucket,
RowBased: req.GetRowBased(),
Files: []string{req.GetFiles()[i]},
@@ -274,6 +277,7 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
Id: m.nextTaskID,
RequestId: reqID,
CollectionId: cID,
ChannelNames: req.ChannelNames,
Bucket: bucket,
RowBased: req.GetRowBased(),
Files: req.GetFiles(),
......
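The two importJob hunks above differ in how files map to tasks: one branch creates a task per file (Files: []string{req.GetFiles()[i]}), the other a single task carrying all files, presumably the row-based versus column-based cases. A hypothetical sketch of that split:

package sketch

// splitFiles models how importJob appears to fan files out into tasks:
// one task per file when row-based, one task for all files otherwise.
func splitFiles(rowBased bool, files []string) [][]string {
	if !rowBased {
		return [][]string{files}
	}
	tasks := make([][]string, 0, len(files))
	for _, f := range files {
		tasks = append(tasks, []string{f})
	}
	return tasks
}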
@@ -611,7 +611,7 @@ type RootCoord interface {
// error is always nil
Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error)
// Check import task state from datanode
// GetImportState checks import task state from datanode
//
// ctx is the context to control request deadline and cancellation
// req contains the request params, including a task id
@@ -621,7 +621,7 @@ type RootCoord interface {
// error is always nil
GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error)
// Report impot task state to rootcoord
// ReportImport reports import task state to rootCoord
//
// ctx is the context to control request deadline and cancellation
// req contains the import results, including imported row count and an id list of generated segments
......