Unverified commit 5fdef607, authored by groot, committed by GitHub

Add rpc interfaces for import (#15930)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
Parent d013134b
...@@ -169,7 +169,7 @@ static void InitDefaultsscc_info_Status_common_2eproto() {
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 0, InitDefaultsscc_info_Status_common_2eproto}, {}};
static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_common_2eproto[8];
static const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* file_level_enum_descriptors_common_2eproto[7];
static const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* file_level_enum_descriptors_common_2eproto[8];
static constexpr ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor const** file_level_service_descriptors_common_2eproto = nullptr;
const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_common_2eproto::offsets[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = {
...@@ -319,9 +319,11 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"\n\017CompactionState\022\021\n\rUndefiedState\020\000\022\r\n\t"
"Executing\020\001\022\r\n\tCompleted\020\002*X\n\020Consistenc"
"yLevel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bound"
"ed\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004B5Z3"
"github.com/milvus-io/milvus/internal/pro"
"to/commonpbb\006proto3"
"ed\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*\\\n\013"
"ImportState\022\021\n\rImportPending\020\000\022\023\n\017Import"
"Executing\020\001\022\023\n\017ImportCompleted\020\002\022\020\n\014Impo"
"rtFailed\020\003B5Z3github.com/milvus-io/milvu"
"s/internal/proto/commonpbb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
};
...@@ -338,7 +340,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2739,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2833,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 8, 0,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 8, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
...@@ -540,6 +542,22 @@ bool ConsistencyLevel_IsValid(int value) {
}
}
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ImportState_descriptor() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&descriptor_table_common_2eproto);
return file_level_enum_descriptors_common_2eproto[7];
}
bool ImportState_IsValid(int value) {
switch (value) {
case 0:
case 1:
case 2:
case 3:
return true;
default:
return false;
}
}
// ===================================================================
......
...@@ -367,6 +367,33 @@ inline bool ConsistencyLevel_Parse(
return ::PROTOBUF_NAMESPACE_ID::internal::ParseNamedEnum<ConsistencyLevel>(
ConsistencyLevel_descriptor(), name, value);
}
enum ImportState : int {
ImportPending = 0,
ImportExecuting = 1,
ImportCompleted = 2,
ImportFailed = 3,
ImportState_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
ImportState_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool ImportState_IsValid(int value);
constexpr ImportState ImportState_MIN = ImportPending;
constexpr ImportState ImportState_MAX = ImportFailed;
constexpr int ImportState_ARRAYSIZE = ImportState_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ImportState_descriptor();
template<typename T>
inline const std::string& ImportState_Name(T enum_t_value) {
static_assert(::std::is_same<T, ImportState>::value ||
::std::is_integral<T>::value,
"Incorrect type passed to function ImportState_Name.");
return ::PROTOBUF_NAMESPACE_ID::internal::NameOfEnum(
ImportState_descriptor(), enum_t_value);
}
inline bool ImportState_Parse(
const std::string& name, ImportState* value) {
return ::PROTOBUF_NAMESPACE_ID::internal::ParseNamedEnum<ImportState>(
ImportState_descriptor(), name, value);
}
// ===================================================================
class Status :
...@@ -2219,6 +2246,11 @@ template <>
inline const EnumDescriptor* GetEnumDescriptor< ::milvus::proto::common::ConsistencyLevel>() {
return ::milvus::proto::common::ConsistencyLevel_descriptor();
}
template <> struct is_proto_enum< ::milvus::proto::common::ImportState> : ::std::true_type {};
template <>
inline const EnumDescriptor* GetEnumDescriptor< ::milvus::proto::common::ImportState>() {
return ::milvus::proto::common::ImportState_descriptor();
}
PROTOBUF_NAMESPACE_CLOSE
......
This diff is collapsed.
This diff is collapsed.
...@@ -196,6 +196,10 @@ func (c *mockDataNodeClient) Compaction(ctx context.Context, req *datapb.Compact
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: "not implemented"}, nil
}
func (c *mockDataNodeClient) Import(ctx context.Context, in *milvuspb.ImportRequest) (*commonpb.Status, error) {
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
func (c *mockDataNodeClient) Stop() error {
c.state = internalpb.StateCode_Abnormal
return nil
......
...@@ -961,3 +961,38 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (s *Server) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
log.Info("receive import request")
resp := &milvuspb.ImportResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
return resp, nil
}
// Check import task state from datanode
func (s *Server) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
log.Info("receive get import state request")
resp := &milvuspb.GetImportStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
return resp, nil
}
// Report import task state to DataCoord
func (s *Server) CompleteImport(ctx context.Context, req *datapb.ImportResult) (*commonpb.Status, error) {
log.Info("receive complete import request")
resp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
return resp, nil
}
...@@ -777,3 +777,11 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *DataNode) Import(ctx context.Context, req *milvuspb.ImportRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, nil
}
...@@ -486,3 +486,45 @@ func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
}
return ret.(*datapb.DropVirtualChannelResponse), err
}
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (c *Client) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.(datapb.DataCoordClient).Import(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.ImportResponse), err
}
// Check import task state from datanode
func (c *Client) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.(datapb.DataCoordClient).GetImportState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.GetImportStateResponse), err
}
// Report import task state to DataCoord
func (c *Client) CompleteImport(ctx context.Context, req *datapb.ImportResult) (*commonpb.Status, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.(datapb.DataCoordClient).CompleteImport(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
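These wrappers only forward the request through the retrying grpcClient and return the raw response, so a caller still needs to check both the Go error and the embedded Status. Below is a hedged, caller-side sketch (not part of this change) of how the new Import wrapper might be used; the Importer interface, the timeout, the collection name, and the import paths are illustrative assumptions.

// Hypothetical usage sketch for the DataCoord Import client wrapper added above.
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
)

// Importer is the subset of the DataCoord client used by this sketch.
type Importer interface {
	Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error)
}

// importFiles submits an import request and returns the ids of the created tasks.
func importFiles(ctx context.Context, c Importer, collection string, files []string) ([]int64, error) {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	resp, err := c.Import(ctx, &milvuspb.ImportRequest{
		CollectionName: collection,
		RowBased:       true, // files are row-based (e.g. JSON rows)
		Files:          files,
	})
	if err != nil {
		return nil, err // transport or retry failure surfaced by ReCall
	}
	if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
		return nil, fmt.Errorf("import rejected: %s", resp.GetStatus().GetReason())
	}
	return resp.GetTasks(), nil
}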
...@@ -327,3 +327,18 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
return s.dataCoord.DropVirtualChannel(ctx, req)
}
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (s *Server) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
return s.dataCoord.Import(ctx, req)
}
// Check import task state from datanode
func (s *Server) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
return s.dataCoord.GetImportState(ctx, req)
}
// Report import task state to DataCoord
func (s *Server) CompleteImport(ctx context.Context, req *datapb.ImportResult) (*commonpb.Status, error) {
return s.dataCoord.CompleteImport(ctx, req)
}
...@@ -55,6 +55,8 @@ type MockDataCoord struct {
watchChannelsResp *datapb.WatchChannelsResponse
getFlushStateResp *milvuspb.GetFlushStateResponse
dropVChanResp *datapb.DropVirtualChannelResponse
importResp *milvuspb.ImportResponse
getImportStateResp *milvuspb.GetImportStateResponse
}
func (m *MockDataCoord) Init() error {
...@@ -164,6 +166,18 @@ func (m *MockDataCoord) DropVirtualChannel(ctx context.Context, req *datapb.Drop
return m.dropVChanResp, m.err
}
func (m *MockDataCoord) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
return m.importResp, m.err
}
func (m *MockDataCoord) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
return m.getImportStateResp, m.err
}
func (m *MockDataCoord) CompleteImport(ctx context.Context, req *datapb.ImportResult) (*commonpb.Status, error) {
return m.status, m.err
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
ctx := context.Background()
......
...@@ -181,3 +181,17 @@ func (c *Client) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*c
}
return ret.(*commonpb.Status), err
}
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (c *Client) Import(ctx context.Context, req *milvuspb.ImportRequest) (*commonpb.Status, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.(datapb.DataNodeClient).Import(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*commonpb.Status), err
}
...@@ -353,3 +353,7 @@ func (s *Server) GetMetrics(ctx context.Context, request *milvuspb.GetMetricsReq
func (s *Server) Compaction(ctx context.Context, request *datapb.CompactionPlan) (*commonpb.Status, error) {
return s.datanode.Compaction(ctx, request)
}
func (s *Server) Import(ctx context.Context, request *milvuspb.ImportRequest) (*commonpb.Status, error) {
return s.datanode.Import(ctx, request)
}
...@@ -111,6 +111,10 @@ func (m *MockDataNode) Compaction(ctx context.Context, req *datapb.CompactionPla
func (m *MockDataNode) SetEtcdClient(client *clientv3.Client) {
}
func (m *MockDataNode) Import(ctx context.Context, req *milvuspb.ImportRequest) (*commonpb.Status, error) {
return m.status, m.err
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type mockDataCoord struct {
types.DataCoord
......
...@@ -630,3 +630,11 @@ func (s *Server) SendSearchResult(ctx context.Context, results *internalpb.Searc
func (s *Server) SendRetrieveResult(ctx context.Context, results *internalpb.RetrieveResults) (*commonpb.Status, error) {
return s.proxy.SendRetrieveResult(ctx, results)
}
func (s *Server) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
return s.proxy.Import(ctx, req)
}
func (s *Server) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
return s.proxy.GetImportState(ctx, req)
}
...@@ -419,6 +419,18 @@ func (m *MockDataCoord) DropVirtualChannel(ctx context.Context, req *datapb.Drop
return &datapb.DropVirtualChannelResponse{}, nil
}
func (m *MockDataCoord) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
return nil, nil
}
func (m *MockDataCoord) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
return nil, nil
}
func (m *MockDataCoord) CompleteImport(ctx context.Context, req *datapb.ImportResult) (*commonpb.Status, error) {
return nil, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockProxy struct {
MockBase
...@@ -644,6 +656,14 @@ func (m *MockProxy) SendRetrieveResult(ctx context.Context, req *internalpb.Retr
return nil, nil
}
func (m *MockProxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
return nil, nil
}
func (m *MockProxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
return nil, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
ctx := context.Background()
......
...@@ -192,3 +192,10 @@ enum ConsistencyLevel {
Eventually = 3;
Customized = 4; // Users pass their own `guarantee_timestamp`.
}
enum ImportState {
ImportPending = 0;
ImportExecuting = 1;
ImportCompleted = 2;
ImportFailed = 3;
}
\ No newline at end of file
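For illustration only (not part of this diff), the generated Go counterpart of this enum can be used to decide whether an import task has reached a terminal state; the commonpb import path and constant names below assume the standard protoc-gen-go output for this package.

// Hypothetical helper built on the generated commonpb.ImportState enum.
package example

import "github.com/milvus-io/milvus/internal/proto/commonpb"

// importFinished reports whether an import task is in a terminal state.
func importFinished(state commonpb.ImportState) bool {
	switch state {
	case commonpb.ImportState_ImportCompleted, commonpb.ImportState_ImportFailed:
		return true
	default: // ImportPending or ImportExecuting: keep polling
		return false
	}
}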
...@@ -41,6 +41,11 @@ service DataCoord {
rpc WatchChannels(WatchChannelsRequest) returns (WatchChannelsResponse) {}
rpc GetFlushState(milvus.GetFlushStateRequest) returns (milvus.GetFlushStateResponse) {}
rpc DropVirtualChannel(DropVirtualChannelRequest) returns (DropVirtualChannelResponse) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
rpc Import(milvus.ImportRequest) returns (milvus.ImportResponse) {}
rpc GetImportState(milvus.GetImportStateRequest) returns (milvus.GetImportStateResponse) {}
rpc CompleteImport(ImportResult) returns (common.Status) {}
}
service DataNode {
...@@ -53,6 +58,9 @@ service DataNode {
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
rpc Compaction(CompactionPlan) returns (common.Status) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
rpc Import(milvus.ImportRequest) returns(common.Status) {}
}
message FlushRequest {
...@@ -411,3 +419,9 @@ message DropVirtualChannelSegment {
message DropVirtualChannelResponse {
common.Status status = 1;
}
message ImportResult {
common.Status status = 1;
repeated int64 segments = 2; // id array of new sealed segments
int64 row_count = 3; // how many rows are imported by this task
}
\ No newline at end of file
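As a rough sketch of how the new ImportResult message and the CompleteImport RPC fit together (the reporting path itself is not implemented in this change), a DataNode-side worker could report a finished task back to DataCoord along these lines; the types.DataCoord handle, segment ids, and row count are placeholders, not part of the commit.

// Hypothetical sketch: reporting import results to DataCoord via CompleteImport.
package example

import (
	"context"
	"fmt"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/types"
)

// reportImportDone tells DataCoord which sealed segments an import task produced.
func reportImportDone(ctx context.Context, dc types.DataCoord, segmentIDs []int64, rowCount int64) error {
	status, err := dc.CompleteImport(ctx, &datapb.ImportResult{
		Status:   &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
		Segments: segmentIDs, // ids of the new sealed segments
		RowCount: rowCount,   // rows imported by this task
	})
	if err != nil {
		return err
	}
	if status.GetErrorCode() != commonpb.ErrorCode_Success {
		return fmt.Errorf("CompleteImport failed: %s", status.GetReason())
	}
	return nil
}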
...@@ -56,6 +56,10 @@ service MilvusService {
rpc GetCompactionState(GetCompactionStateRequest) returns (GetCompactionStateResponse) {}
rpc ManualCompaction(ManualCompactionRequest) returns (ManualCompactionResponse) {}
rpc GetCompactionStateWithPlans(GetCompactionPlansRequest) returns (GetCompactionPlansResponse) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
rpc Import(ImportRequest) returns (ImportResponse) {}
rpc GetImportState(GetImportStateRequest) returns (GetImportStateResponse) {}
}
message CreateAliasRequest {
...@@ -783,6 +787,29 @@ message GetFlushStateResponse {
bool flushed = 2;
}
message ImportRequest {
string collection_name = 1; // target collection
string partition_name = 2; // target partition
bool row_based = 3; // the file is row-based or column-based
repeated string files = 4; // file paths to be imported
repeated common.KeyValuePair options = 5; // import options
}
message ImportResponse {
common.Status status = 1;
repeated int64 tasks = 2; // id array of import tasks
}
message GetImportStateRequest {
int64 task = 1; // id of an import task
}
message GetImportStateResponse {
common.Status status = 1;
common.ImportState state = 2; // current state of the import task
int64 row_count = 3; // if the task is finished, the number of rows imported; otherwise, the number of rows parsed so far
}
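To show how the Import/GetImportState pair is meant to be used end to end, here is a hedged polling sketch against the generated MilvusService gRPC client (not part of this change); the client handle, the poll interval, and the field accessors follow the usual protoc-gen-go conventions and are assumptions.

// Hypothetical polling loop over the new GetImportState RPC.
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
)

// waitForImport blocks until the given import task completes or fails.
func waitForImport(ctx context.Context, c milvuspb.MilvusServiceClient, taskID int64) (*milvuspb.GetImportStateResponse, error) {
	for {
		resp, err := c.GetImportState(ctx, &milvuspb.GetImportStateRequest{Task: taskID})
		if err != nil {
			return nil, err
		}
		if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
			return nil, fmt.Errorf("GetImportState failed: %s", resp.GetStatus().GetReason())
		}
		switch resp.GetState() {
		case commonpb.ImportState_ImportCompleted:
			return resp, nil // resp.RowCount holds the number of imported rows
		case commonpb.ImportState_ImportFailed:
			return resp, fmt.Errorf("import task %d failed", taskID)
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(2 * time.Second):
		}
	}
}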
service ProxyService {
rpc RegisterLink(RegisterLinkRequest) returns (RegisterLinkResponse) {}
}
...@@ -206,6 +206,18 @@ func (coord *DataCoordMock) DropVirtualChannel(ctx context.Context, req *datapb.
return &datapb.DropVirtualChannelResponse{}, nil
}
func (coord *DataCoordMock) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
return &milvuspb.ImportResponse{}, nil
}
func (coord *DataCoordMock) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
return &milvuspb.GetImportStateResponse{}, nil
}
func (coord *DataCoordMock) CompleteImport(ctx context.Context, req *datapb.ImportResult) (*commonpb.Status, error) {
return &commonpb.Status{}, nil
}
func NewDataCoordMock() *DataCoordMock {
return &DataCoordMock{
nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
......
...@@ -3939,3 +3939,27 @@ func unhealthyStatus() *commonpb.Status {
Reason: "proxy not healthy",
}
}
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
log.Info("received Import request")
resp := &milvuspb.ImportResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
return resp, nil
}
return resp, nil
}
// Check import task state from datanode
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
log.Info("received GetImportState request", zap.Int64("taskID", req.GetTask()))
resp := &milvuspb.GetImportStateResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
return resp, nil
}
return resp, nil
}
...@@ -68,6 +68,15 @@ type DataNode interface {
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
// Compaction will add a compaction task according to the request plan
Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error)
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
//
// ctx is the context to control request deadline and cancellation
// req contains the request params, including file paths and options
//
// The returned status indicates whether this operation succeeded or, if not, the failure reason;
// error is always nil
Import(ctx context.Context, req *milvuspb.ImportRequest) (*commonpb.Status, error)
}
// DataNodeComponent is used by grpc server of DataNode
...@@ -256,6 +265,35 @@ type DataCoord interface {
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error)
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
//
// ctx is the context to control request deadline and cancellation
// req contains the request params, including file paths and options
//
// The `Status` in the response struct `ImportResponse` indicates whether this operation succeeded or, if not, the failure reason;
// the `tasks` field in `ImportResponse` returns the ids of the created import tasks.
// error is always nil
Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error)
// Check import task state from the DataNode
//
// ctx is the context to control request deadline and cancellation
// req contains the request params, including a task id
//
// The `Status` in the response struct `GetImportStateResponse` indicates whether this operation succeeded or, if not, the failure reason;
// the `state` field in `GetImportStateResponse` returns the state of the import task.
// error is always nil
GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error)
// Report import task state to DataCoord
//
// ctx is the context to control request deadline and cancellation
// req contains the import results, including the imported row count and the ids of the generated segments
//
// response status contains the status/error code and the failing reason if any
// error is always nil
CompleteImport(ctx context.Context, req *datapb.ImportResult) (*commonpb.Status, error)
}
// DataCoordComponent defines the interface of DataCoord component.
...@@ -1030,6 +1068,26 @@ type ProxyComponent interface {
GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error)
// GetFlushState gets the flush state of multiple segments
GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error)
// Import data files (JSON, NumPy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
//
// ctx is the context to control request deadline and cancellation
// req contains the request params, including file paths and options
//
// The `Status` in the response struct `ImportResponse` indicates whether this operation succeeded or, if not, the failure reason;
// the `tasks` field in `ImportResponse` returns the ids of the created import tasks.
// error is always nil
Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error)
// Check import task state from the DataNode
//
// ctx is the context to control request deadline and cancellation
// req contains the request params, including a task id
//
// The `Status` in the response struct `GetImportStateResponse` indicates whether this operation succeeded or, if not, the failure reason;
// the `state` field in `GetImportStateResponse` returns the state of the import task.
// error is always nil
GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error)
}
// QueryNode is the interface `querynode` package implements
......
...@@ -118,3 +118,15 @@ func (m *DataCoordClient) GetFlushState(ctx context.Context, req *milvuspb.GetFl
func (m *DataCoordClient) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest, opts ...grpc.CallOption) (*datapb.DropVirtualChannelResponse, error) {
return &datapb.DropVirtualChannelResponse{}, m.Err
}
func (m *DataCoordClient) Import(ctx context.Context, req *milvuspb.ImportRequest, opts ...grpc.CallOption) (*milvuspb.ImportResponse, error) {
return &milvuspb.ImportResponse{}, m.Err
}
func (m *DataCoordClient) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest, opts ...grpc.CallOption) (*milvuspb.GetImportStateResponse, error) {
return &milvuspb.GetImportStateResponse{}, m.Err
}
func (m *DataCoordClient) CompleteImport(ctx context.Context, req *datapb.ImportResult, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
...@@ -54,3 +54,7 @@ func (m *DataNodeClient) GetMetrics(ctx context.Context, in *milvuspb.GetMetrics
func (m *DataNodeClient) Compaction(ctx context.Context, req *datapb.CompactionPlan, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
func (m *DataNodeClient) Import(ctx context.Context, req *milvuspb.ImportRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
...@@ -27,7 +27,7 @@ echo "mode: atomic" > ${FILE_COVERAGE_INFO}
echo "Running unittest under ./internal"
if [[ "$(uname -s)" == "Darwin" ]]; then
export MallocNanoZone=0
for d in $(go list ./internal... | grep -v -e vendor -e internal/querycoord -e /metricsinfo -e internal/proxy -e internal/querynode); do
for d in $(go list ./internal/... | grep -v -e vendor -e internal/querycoord -e /metricsinfo -e internal/proxy -e internal/querynode); do
go test -race -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
if [ -f profile.out ]; then
sed '1d' profile.out >> ${FILE_COVERAGE_INFO}
...@@ -35,7 +35,7 @@ if [[ "$(uname -s)" == "Darwin" ]]; then
fi
done
else
for d in $(go list ./internal... | grep -v vendor); do
for d in $(go list ./internal/... | grep -v vendor); do
go test -race -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d"
if [ -f profile.out ]; then
sed '1d' profile.out >> ${FILE_COVERAGE_INFO}
......