提交 d8e6fca4 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-482 Change search stream transport to unary in grpc

See merge request megasearch/milvus!488

Former-commit-id: e7d576cf58f2154658fbdc1813ea76b42e31ee03
......@@ -83,6 +83,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-455 - Distribute tasks by minimal cost in scheduler
- MS-460 - Put transport speed as weight when choosing neighbour to execute task
- MS-459 - Add cache for pick function in tasktable
- MS-482 - Change search stream transport to unary in grpc
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -49,8 +49,8 @@ MilvusService::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& chan
, rpcmethod_DropTable_(MilvusService_method_names[2], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_CreateIndex_(MilvusService_method_names[3], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_Insert_(MilvusService_method_names[4], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_Search_(MilvusService_method_names[5], ::grpc::internal::RpcMethod::SERVER_STREAMING, channel)
, rpcmethod_SearchInFiles_(MilvusService_method_names[6], ::grpc::internal::RpcMethod::SERVER_STREAMING, channel)
, rpcmethod_Search_(MilvusService_method_names[5], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_SearchInFiles_(MilvusService_method_names[6], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_DescribeTable_(MilvusService_method_names[7], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_CountTable_(MilvusService_method_names[8], ::grpc::internal::RpcMethod::NORMAL_RPC, channel)
, rpcmethod_ShowTables_(MilvusService_method_names[9], ::grpc::internal::RpcMethod::SERVER_STREAMING, channel)
......@@ -201,36 +201,60 @@ void MilvusService::Stub::experimental_async::Insert(::grpc::ClientContext* cont
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::VectorIds>::Create(channel_.get(), cq, rpcmethod_Insert_, context, request, false);
}
::grpc::ClientReader< ::milvus::grpc::TopKQueryResult>* MilvusService::Stub::SearchRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchParam& request) {
return ::grpc_impl::internal::ClientReaderFactory< ::milvus::grpc::TopKQueryResult>::Create(channel_.get(), rpcmethod_Search_, context, request);
::grpc::Status MilvusService::Stub::Search(::grpc::ClientContext* context, const ::milvus::grpc::SearchParam& request, ::milvus::grpc::TopKQueryResultList* response) {
return ::grpc::internal::BlockingUnaryCall(channel_.get(), rpcmethod_Search_, context, request, response);
}
void MilvusService::Stub::experimental_async::Search(::grpc::ClientContext* context, ::milvus::grpc::SearchParam* request, ::grpc::experimental::ClientReadReactor< ::milvus::grpc::TopKQueryResult>* reactor) {
::grpc_impl::internal::ClientCallbackReaderFactory< ::milvus::grpc::TopKQueryResult>::Create(stub_->channel_.get(), stub_->rpcmethod_Search_, context, request, reactor);
void MilvusService::Stub::experimental_async::Search(::grpc::ClientContext* context, const ::milvus::grpc::SearchParam* request, ::milvus::grpc::TopKQueryResultList* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_Search_, context, request, response, std::move(f));
}
::grpc::ClientAsyncReader< ::milvus::grpc::TopKQueryResult>* MilvusService::Stub::AsyncSearchRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchParam& request, ::grpc::CompletionQueue* cq, void* tag) {
return ::grpc_impl::internal::ClientAsyncReaderFactory< ::milvus::grpc::TopKQueryResult>::Create(channel_.get(), cq, rpcmethod_Search_, context, request, true, tag);
void MilvusService::Stub::experimental_async::Search(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::TopKQueryResultList* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_Search_, context, request, response, std::move(f));
}
::grpc::ClientAsyncReader< ::milvus::grpc::TopKQueryResult>* MilvusService::Stub::PrepareAsyncSearchRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchParam& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncReaderFactory< ::milvus::grpc::TopKQueryResult>::Create(channel_.get(), cq, rpcmethod_Search_, context, request, false, nullptr);
void MilvusService::Stub::experimental_async::Search(::grpc::ClientContext* context, const ::milvus::grpc::SearchParam* request, ::milvus::grpc::TopKQueryResultList* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_Search_, context, request, response, reactor);
}
::grpc::ClientReader< ::milvus::grpc::TopKQueryResult>* MilvusService::Stub::SearchInFilesRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchInFilesParam& request) {
return ::grpc_impl::internal::ClientReaderFactory< ::milvus::grpc::TopKQueryResult>::Create(channel_.get(), rpcmethod_SearchInFiles_, context, request);
void MilvusService::Stub::experimental_async::Search(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::TopKQueryResultList* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_Search_, context, request, response, reactor);
}
void MilvusService::Stub::experimental_async::SearchInFiles(::grpc::ClientContext* context, ::milvus::grpc::SearchInFilesParam* request, ::grpc::experimental::ClientReadReactor< ::milvus::grpc::TopKQueryResult>* reactor) {
::grpc_impl::internal::ClientCallbackReaderFactory< ::milvus::grpc::TopKQueryResult>::Create(stub_->channel_.get(), stub_->rpcmethod_SearchInFiles_, context, request, reactor);
::grpc::ClientAsyncResponseReader< ::milvus::grpc::TopKQueryResultList>* MilvusService::Stub::AsyncSearchRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchParam& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::TopKQueryResultList>::Create(channel_.get(), cq, rpcmethod_Search_, context, request, true);
}
::grpc::ClientAsyncReader< ::milvus::grpc::TopKQueryResult>* MilvusService::Stub::AsyncSearchInFilesRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchInFilesParam& request, ::grpc::CompletionQueue* cq, void* tag) {
return ::grpc_impl::internal::ClientAsyncReaderFactory< ::milvus::grpc::TopKQueryResult>::Create(channel_.get(), cq, rpcmethod_SearchInFiles_, context, request, true, tag);
::grpc::ClientAsyncResponseReader< ::milvus::grpc::TopKQueryResultList>* MilvusService::Stub::PrepareAsyncSearchRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchParam& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::TopKQueryResultList>::Create(channel_.get(), cq, rpcmethod_Search_, context, request, false);
}
::grpc::ClientAsyncReader< ::milvus::grpc::TopKQueryResult>* MilvusService::Stub::PrepareAsyncSearchInFilesRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchInFilesParam& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncReaderFactory< ::milvus::grpc::TopKQueryResult>::Create(channel_.get(), cq, rpcmethod_SearchInFiles_, context, request, false, nullptr);
::grpc::Status MilvusService::Stub::SearchInFiles(::grpc::ClientContext* context, const ::milvus::grpc::SearchInFilesParam& request, ::milvus::grpc::TopKQueryResultList* response) {
return ::grpc::internal::BlockingUnaryCall(channel_.get(), rpcmethod_SearchInFiles_, context, request, response);
}
void MilvusService::Stub::experimental_async::SearchInFiles(::grpc::ClientContext* context, const ::milvus::grpc::SearchInFilesParam* request, ::milvus::grpc::TopKQueryResultList* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_SearchInFiles_, context, request, response, std::move(f));
}
void MilvusService::Stub::experimental_async::SearchInFiles(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::TopKQueryResultList* response, std::function<void(::grpc::Status)> f) {
::grpc_impl::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_SearchInFiles_, context, request, response, std::move(f));
}
void MilvusService::Stub::experimental_async::SearchInFiles(::grpc::ClientContext* context, const ::milvus::grpc::SearchInFilesParam* request, ::milvus::grpc::TopKQueryResultList* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_SearchInFiles_, context, request, response, reactor);
}
void MilvusService::Stub::experimental_async::SearchInFiles(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::milvus::grpc::TopKQueryResultList* response, ::grpc::experimental::ClientUnaryReactor* reactor) {
::grpc_impl::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_SearchInFiles_, context, request, response, reactor);
}
::grpc::ClientAsyncResponseReader< ::milvus::grpc::TopKQueryResultList>* MilvusService::Stub::AsyncSearchInFilesRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchInFilesParam& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::TopKQueryResultList>::Create(channel_.get(), cq, rpcmethod_SearchInFiles_, context, request, true);
}
::grpc::ClientAsyncResponseReader< ::milvus::grpc::TopKQueryResultList>* MilvusService::Stub::PrepareAsyncSearchInFilesRaw(::grpc::ClientContext* context, const ::milvus::grpc::SearchInFilesParam& request, ::grpc::CompletionQueue* cq) {
return ::grpc_impl::internal::ClientAsyncResponseReaderFactory< ::milvus::grpc::TopKQueryResultList>::Create(channel_.get(), cq, rpcmethod_SearchInFiles_, context, request, false);
}
::grpc::Status MilvusService::Stub::DescribeTable(::grpc::ClientContext* context, const ::milvus::grpc::TableName& request, ::milvus::grpc::TableSchema* response) {
......@@ -473,13 +497,13 @@ MilvusService::Service::Service() {
std::mem_fn(&MilvusService::Service::Insert), this)));
AddMethod(new ::grpc::internal::RpcServiceMethod(
MilvusService_method_names[5],
::grpc::internal::RpcMethod::SERVER_STREAMING,
new ::grpc::internal::ServerStreamingHandler< MilvusService::Service, ::milvus::grpc::SearchParam, ::milvus::grpc::TopKQueryResult>(
::grpc::internal::RpcMethod::NORMAL_RPC,
new ::grpc::internal::RpcMethodHandler< MilvusService::Service, ::milvus::grpc::SearchParam, ::milvus::grpc::TopKQueryResultList>(
std::mem_fn(&MilvusService::Service::Search), this)));
AddMethod(new ::grpc::internal::RpcServiceMethod(
MilvusService_method_names[6],
::grpc::internal::RpcMethod::SERVER_STREAMING,
new ::grpc::internal::ServerStreamingHandler< MilvusService::Service, ::milvus::grpc::SearchInFilesParam, ::milvus::grpc::TopKQueryResult>(
::grpc::internal::RpcMethod::NORMAL_RPC,
new ::grpc::internal::RpcMethodHandler< MilvusService::Service, ::milvus::grpc::SearchInFilesParam, ::milvus::grpc::TopKQueryResultList>(
std::mem_fn(&MilvusService::Service::SearchInFiles), this)));
AddMethod(new ::grpc::internal::RpcServiceMethod(
MilvusService_method_names[7],
......@@ -561,17 +585,17 @@ MilvusService::Service::~Service() {
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
::grpc::Status MilvusService::Service::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request, ::grpc::ServerWriter< ::milvus::grpc::TopKQueryResult>* writer) {
::grpc::Status MilvusService::Service::Search(::grpc::ServerContext* context, const ::milvus::grpc::SearchParam* request, ::milvus::grpc::TopKQueryResultList* response) {
(void) context;
(void) request;
(void) writer;
(void) response;
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
::grpc::Status MilvusService::Service::SearchInFiles(::grpc::ServerContext* context, const ::milvus::grpc::SearchInFilesParam* request, ::grpc::ServerWriter< ::milvus::grpc::TopKQueryResult>* writer) {
::grpc::Status MilvusService::Service::SearchInFiles(::grpc::ServerContext* context, const ::milvus::grpc::SearchInFilesParam* request, ::milvus::grpc::TopKQueryResultList* response) {
(void) context;
(void) request;
(void) writer;
(void) response;
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}
......
......@@ -48,7 +48,7 @@ struct TableStruct_milvus_2eproto {
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::AuxillaryParseTableField aux[]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[17]
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[18]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::FieldMetadata field_metadata[];
static const ::PROTOBUF_NAMESPACE_ID::internal::SerializationTable serialization_table[];
......@@ -105,6 +105,9 @@ extern TableSchemaDefaultTypeInternal _TableSchema_default_instance_;
class TopKQueryResult;
class TopKQueryResultDefaultTypeInternal;
extern TopKQueryResultDefaultTypeInternal _TopKQueryResult_default_instance_;
class TopKQueryResultList;
class TopKQueryResultListDefaultTypeInternal;
extern TopKQueryResultListDefaultTypeInternal _TopKQueryResultList_default_instance_;
class VectorIds;
class VectorIdsDefaultTypeInternal;
extern VectorIdsDefaultTypeInternal _VectorIds_default_instance_;
......@@ -127,6 +130,7 @@ template<> ::milvus::grpc::TableName* Arena::CreateMaybeMessage<::milvus::grpc::
template<> ::milvus::grpc::TableRowCount* Arena::CreateMaybeMessage<::milvus::grpc::TableRowCount>(Arena*);
template<> ::milvus::grpc::TableSchema* Arena::CreateMaybeMessage<::milvus::grpc::TableSchema>(Arena*);
template<> ::milvus::grpc::TopKQueryResult* Arena::CreateMaybeMessage<::milvus::grpc::TopKQueryResult>(Arena*);
template<> ::milvus::grpc::TopKQueryResultList* Arena::CreateMaybeMessage<::milvus::grpc::TopKQueryResultList>(Arena*);
template<> ::milvus::grpc::VectorIds* Arena::CreateMaybeMessage<::milvus::grpc::VectorIds>(Arena*);
PROTOBUF_NAMESPACE_CLOSE
namespace milvus {
......@@ -1610,10 +1614,9 @@ class TopKQueryResult :
// accessors -------------------------------------------------------
enum : int {
kQueryResultArraysFieldNumber = 2,
kStatusFieldNumber = 1,
kQueryResultArraysFieldNumber = 1,
};
// repeated .milvus.grpc.QueryResult query_result_arrays = 2;
// repeated .milvus.grpc.QueryResult query_result_arrays = 1;
int query_result_arrays_size() const;
void clear_query_result_arrays();
::milvus::grpc::QueryResult* mutable_query_result_arrays(int index);
......@@ -1624,6 +1627,144 @@ class TopKQueryResult :
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::QueryResult >&
query_result_arrays() const;
// @@protoc_insertion_point(class_scope:milvus.grpc.TopKQueryResult)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::QueryResult > query_result_arrays_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
};
// -------------------------------------------------------------------
class TopKQueryResultList :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.grpc.TopKQueryResultList) */ {
public:
TopKQueryResultList();
virtual ~TopKQueryResultList();
TopKQueryResultList(const TopKQueryResultList& from);
TopKQueryResultList(TopKQueryResultList&& from) noexcept
: TopKQueryResultList() {
*this = ::std::move(from);
}
inline TopKQueryResultList& operator=(const TopKQueryResultList& from) {
CopyFrom(from);
return *this;
}
inline TopKQueryResultList& operator=(TopKQueryResultList&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const TopKQueryResultList& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const TopKQueryResultList* internal_default_instance() {
return reinterpret_cast<const TopKQueryResultList*>(
&_TopKQueryResultList_default_instance_);
}
static constexpr int kIndexInFileMessages =
10;
friend void swap(TopKQueryResultList& a, TopKQueryResultList& b) {
a.Swap(&b);
}
inline void Swap(TopKQueryResultList* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline TopKQueryResultList* New() const final {
return CreateMaybeMessage<TopKQueryResultList>(nullptr);
}
TopKQueryResultList* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<TopKQueryResultList>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const TopKQueryResultList& from);
void MergeFrom(const TopKQueryResultList& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
#else
bool MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final;
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final;
::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(TopKQueryResultList* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "milvus.grpc.TopKQueryResultList";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_milvus_2eproto);
return ::descriptor_table_milvus_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kTopkQueryResultFieldNumber = 2,
kStatusFieldNumber = 1,
};
// repeated .milvus.grpc.TopKQueryResult topk_query_result = 2;
int topk_query_result_size() const;
void clear_topk_query_result();
::milvus::grpc::TopKQueryResult* mutable_topk_query_result(int index);
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::TopKQueryResult >*
mutable_topk_query_result();
const ::milvus::grpc::TopKQueryResult& topk_query_result(int index) const;
::milvus::grpc::TopKQueryResult* add_topk_query_result();
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::TopKQueryResult >&
topk_query_result() const;
// .milvus.grpc.Status status = 1;
bool has_status() const;
void clear_status();
......@@ -1632,12 +1773,12 @@ class TopKQueryResult :
::milvus::grpc::Status* mutable_status();
void set_allocated_status(::milvus::grpc::Status* status);
// @@protoc_insertion_point(class_scope:milvus.grpc.TopKQueryResult)
// @@protoc_insertion_point(class_scope:milvus.grpc.TopKQueryResultList)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::QueryResult > query_result_arrays_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::TopKQueryResult > topk_query_result_;
::milvus::grpc::Status* status_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
......@@ -1686,7 +1827,7 @@ class StringReply :
&_StringReply_default_instance_);
}
static constexpr int kIndexInFileMessages =
10;
11;
friend void swap(StringReply& a, StringReply& b) {
a.Swap(&b);
......@@ -1833,7 +1974,7 @@ class BoolReply :
&_BoolReply_default_instance_);
}
static constexpr int kIndexInFileMessages =
11;
12;
friend void swap(BoolReply& a, BoolReply& b) {
a.Swap(&b);
......@@ -1974,7 +2115,7 @@ class TableRowCount :
&_TableRowCount_default_instance_);
}
static constexpr int kIndexInFileMessages =
12;
13;
friend void swap(TableRowCount& a, TableRowCount& b) {
a.Swap(&b);
......@@ -2115,7 +2256,7 @@ class Command :
&_Command_default_instance_);
}
static constexpr int kIndexInFileMessages =
13;
14;
friend void swap(Command& a, Command& b) {
a.Swap(&b);
......@@ -2252,7 +2393,7 @@ class Index :
&_Index_default_instance_);
}
static constexpr int kIndexInFileMessages =
14;
15;
friend void swap(Index& a, Index& b) {
a.Swap(&b);
......@@ -2397,7 +2538,7 @@ class IndexParam :
&_IndexParam_default_instance_);
}
static constexpr int kIndexInFileMessages =
15;
16;
friend void swap(IndexParam& a, IndexParam& b) {
a.Swap(&b);
......@@ -2541,7 +2682,7 @@ class DeleteByRangeParam :
&_DeleteByRangeParam_default_instance_);
}
static constexpr int kIndexInFileMessages =
16;
17;
friend void swap(DeleteByRangeParam& a, DeleteByRangeParam& b) {
a.Swap(&b);
......@@ -3467,33 +3608,67 @@ inline void QueryResult::set_distance(double value) {
// TopKQueryResult
// repeated .milvus.grpc.QueryResult query_result_arrays = 1;
inline int TopKQueryResult::query_result_arrays_size() const {
return query_result_arrays_.size();
}
inline void TopKQueryResult::clear_query_result_arrays() {
query_result_arrays_.Clear();
}
inline ::milvus::grpc::QueryResult* TopKQueryResult::mutable_query_result_arrays(int index) {
// @@protoc_insertion_point(field_mutable:milvus.grpc.TopKQueryResult.query_result_arrays)
return query_result_arrays_.Mutable(index);
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::QueryResult >*
TopKQueryResult::mutable_query_result_arrays() {
// @@protoc_insertion_point(field_mutable_list:milvus.grpc.TopKQueryResult.query_result_arrays)
return &query_result_arrays_;
}
inline const ::milvus::grpc::QueryResult& TopKQueryResult::query_result_arrays(int index) const {
// @@protoc_insertion_point(field_get:milvus.grpc.TopKQueryResult.query_result_arrays)
return query_result_arrays_.Get(index);
}
inline ::milvus::grpc::QueryResult* TopKQueryResult::add_query_result_arrays() {
// @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResult.query_result_arrays)
return query_result_arrays_.Add();
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::QueryResult >&
TopKQueryResult::query_result_arrays() const {
// @@protoc_insertion_point(field_list:milvus.grpc.TopKQueryResult.query_result_arrays)
return query_result_arrays_;
}
// -------------------------------------------------------------------
// TopKQueryResultList
// .milvus.grpc.Status status = 1;
inline bool TopKQueryResult::has_status() const {
inline bool TopKQueryResultList::has_status() const {
return this != internal_default_instance() && status_ != nullptr;
}
inline const ::milvus::grpc::Status& TopKQueryResult::status() const {
inline const ::milvus::grpc::Status& TopKQueryResultList::status() const {
const ::milvus::grpc::Status* p = status_;
// @@protoc_insertion_point(field_get:milvus.grpc.TopKQueryResult.status)
// @@protoc_insertion_point(field_get:milvus.grpc.TopKQueryResultList.status)
return p != nullptr ? *p : *reinterpret_cast<const ::milvus::grpc::Status*>(
&::milvus::grpc::_Status_default_instance_);
}
inline ::milvus::grpc::Status* TopKQueryResult::release_status() {
// @@protoc_insertion_point(field_release:milvus.grpc.TopKQueryResult.status)
inline ::milvus::grpc::Status* TopKQueryResultList::release_status() {
// @@protoc_insertion_point(field_release:milvus.grpc.TopKQueryResultList.status)
::milvus::grpc::Status* temp = status_;
status_ = nullptr;
return temp;
}
inline ::milvus::grpc::Status* TopKQueryResult::mutable_status() {
inline ::milvus::grpc::Status* TopKQueryResultList::mutable_status() {
if (status_ == nullptr) {
auto* p = CreateMaybeMessage<::milvus::grpc::Status>(GetArenaNoVirtual());
status_ = p;
}
// @@protoc_insertion_point(field_mutable:milvus.grpc.TopKQueryResult.status)
// @@protoc_insertion_point(field_mutable:milvus.grpc.TopKQueryResultList.status)
return status_;
}
inline void TopKQueryResult::set_allocated_status(::milvus::grpc::Status* status) {
inline void TopKQueryResultList::set_allocated_status(::milvus::grpc::Status* status) {
::PROTOBUF_NAMESPACE_ID::Arena* message_arena = GetArenaNoVirtual();
if (message_arena == nullptr) {
delete reinterpret_cast< ::PROTOBUF_NAMESPACE_ID::MessageLite*>(status_);
......@@ -3509,37 +3684,37 @@ inline void TopKQueryResult::set_allocated_status(::milvus::grpc::Status* status
}
status_ = status;
// @@protoc_insertion_point(field_set_allocated:milvus.grpc.TopKQueryResult.status)
// @@protoc_insertion_point(field_set_allocated:milvus.grpc.TopKQueryResultList.status)
}
// repeated .milvus.grpc.QueryResult query_result_arrays = 2;
inline int TopKQueryResult::query_result_arrays_size() const {
return query_result_arrays_.size();
// repeated .milvus.grpc.TopKQueryResult topk_query_result = 2;
inline int TopKQueryResultList::topk_query_result_size() const {
return topk_query_result_.size();
}
inline void TopKQueryResult::clear_query_result_arrays() {
query_result_arrays_.Clear();
inline void TopKQueryResultList::clear_topk_query_result() {
topk_query_result_.Clear();
}
inline ::milvus::grpc::QueryResult* TopKQueryResult::mutable_query_result_arrays(int index) {
// @@protoc_insertion_point(field_mutable:milvus.grpc.TopKQueryResult.query_result_arrays)
return query_result_arrays_.Mutable(index);
inline ::milvus::grpc::TopKQueryResult* TopKQueryResultList::mutable_topk_query_result(int index) {
// @@protoc_insertion_point(field_mutable:milvus.grpc.TopKQueryResultList.topk_query_result)
return topk_query_result_.Mutable(index);
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::QueryResult >*
TopKQueryResult::mutable_query_result_arrays() {
// @@protoc_insertion_point(field_mutable_list:milvus.grpc.TopKQueryResult.query_result_arrays)
return &query_result_arrays_;
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::TopKQueryResult >*
TopKQueryResultList::mutable_topk_query_result() {
// @@protoc_insertion_point(field_mutable_list:milvus.grpc.TopKQueryResultList.topk_query_result)
return &topk_query_result_;
}
inline const ::milvus::grpc::QueryResult& TopKQueryResult::query_result_arrays(int index) const {
// @@protoc_insertion_point(field_get:milvus.grpc.TopKQueryResult.query_result_arrays)
return query_result_arrays_.Get(index);
inline const ::milvus::grpc::TopKQueryResult& TopKQueryResultList::topk_query_result(int index) const {
// @@protoc_insertion_point(field_get:milvus.grpc.TopKQueryResultList.topk_query_result)
return topk_query_result_.Get(index);
}
inline ::milvus::grpc::QueryResult* TopKQueryResult::add_query_result_arrays() {
// @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResult.query_result_arrays)
return query_result_arrays_.Add();
inline ::milvus::grpc::TopKQueryResult* TopKQueryResultList::add_topk_query_result() {
// @@protoc_insertion_point(field_add:milvus.grpc.TopKQueryResultList.topk_query_result)
return topk_query_result_.Add();
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::QueryResult >&
TopKQueryResult::query_result_arrays() const {
// @@protoc_insertion_point(field_list:milvus.grpc.TopKQueryResult.query_result_arrays)
return query_result_arrays_;
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::grpc::TopKQueryResult >&
TopKQueryResultList::topk_query_result() const {
// @@protoc_insertion_point(field_list:milvus.grpc.TopKQueryResultList.topk_query_result)
return topk_query_result_;
}
// -------------------------------------------------------------------
......@@ -4116,6 +4291,8 @@ inline void DeleteByRangeParam::set_allocated_table_name(std::string* table_name
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// @@protoc_insertion_point(namespace_scope)
......
......@@ -84,8 +84,15 @@ message QueryResult {
* @brief TopK query result
*/
message TopKQueryResult {
repeated QueryResult query_result_arrays = 1;
}
/**
* @brief List of topK query result
*/
message TopKQueryResultList {
Status status = 1;
repeated QueryResult query_result_arrays = 2;
repeated TopKQueryResult topk_query_result = 2;
}
/**
......@@ -211,7 +218,7 @@ service MilvusService {
*
* @return query result array.
*/
rpc Search(SearchParam) returns (stream TopKQueryResult) {}
rpc Search(SearchParam) returns (TopKQueryResultList) {}
/**
* @brief Internal use query interface
......@@ -225,7 +232,7 @@ service MilvusService {
*
* @return query result array.
*/
rpc SearchInFiles(SearchInFilesParam) returns (stream TopKQueryResult) {}
rpc SearchInFiles(SearchInFilesParam) returns (TopKQueryResultList) {}
/**
* @brief Get table schema
......
......@@ -21,50 +21,67 @@ std::mutex SchedInst::mutex_;
void
StartSchedulerService() {
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
for (auto &resource : resources) {
auto &resname = resource.first;
auto &resconf = resource.second;
auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
try {
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
if (config.GetChildren().empty()) throw "resource_config null exception";
auto resources = config.GetChild(server::CONFIG_RESOURCES).GetChildren();
if (resources.empty()) throw "Children of resource_config null exception";
for (auto &resource : resources) {
auto &resname = resource.first;
auto &resconf = resource.second;
auto type = resconf.GetValue(server::CONFIG_RESOURCE_TYPE);
// auto memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_MEMORY);
auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
auto res = ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname,
type,
device_id,
enable_loader,
enable_executor));
if (res.lock()->Type() == ResourceType::GPU) {
auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY, 300);
auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY, 300);
auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM, 2);
pinned_memory = 1024 * 1024 * pinned_memory;
temp_memory = 1024 * 1024 * temp_memory;
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id, pinned_memory, temp_memory, resource_num);
auto device_id = resconf.GetInt64Value(server::CONFIG_RESOURCE_DEVICE_ID);
auto enable_loader = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_LOADER);
auto enable_executor = resconf.GetBoolValue(server::CONFIG_RESOURCE_ENABLE_EXECUTOR);
auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY);
auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY);
auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM);
auto res = ResMgrInst::GetInstance()->Add(ResourceFactory::Create(resname,
type,
device_id,
enable_loader,
enable_executor));
if (res.lock()->Type() == ResourceType::GPU) {
auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY, 300);
auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY, 300);
auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM, 2);
pinned_memory = 1024 * 1024 * pinned_memory;
temp_memory = 1024 * 1024 * temp_memory;
knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(device_id,
pinned_memory,
temp_memory,
resource_num);
}
}
}
knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
// auto default_connection = Connection("default_connection", 500.0);
auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
for (auto &conn : connections) {
auto &connect_name = conn.first;
auto &connect_conf = conn.second;
auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
if(connections.empty()) throw "connections config null exception";
for (auto &conn : connections) {
auto &connect_name = conn.first;
auto &connect_conf = conn.second;
auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
std::string delimiter = "===";
std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
connect_endpoint.length());
std::string delimiter = "===";
std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
connect_endpoint.length());
auto connection = Connection(connect_name, connect_speed);
ResMgrInst::GetInstance()->Connect(left, right, connection);
auto connection = Connection(connect_name, connect_speed);
ResMgrInst::GetInstance()->Connect(left, right, connection);
}
} catch (const char* msg) {
SERVER_LOG_ERROR << msg;
exit(-1);
}
ResMgrInst::GetInstance()->Start();
......
......@@ -163,6 +163,7 @@ XSearchTask::Execute() {
double span = rc.RecordSection("do search for context:" + context->Identity());
context->AccumSearchCost(span);
//step 3: cluster result
SearchContext::ResultSet result_set;
auto spec_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk();
......@@ -176,7 +177,6 @@ XSearchTask::Execute() {
span = rc.RecordSection("reduce topk for context:" + context->Identity());
context->AccumReduceCost(span);
} catch (std::exception &ex) {
ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
......
......@@ -240,16 +240,16 @@ ClientProxy::Search(const std::string &table_name,
}
//step 3: search vectors
std::vector<::milvus::grpc::TopKQueryResult> result_array;
Status status = client_ptr_->Search(result_array, search_param);
::milvus::grpc::TopKQueryResultList topk_query_result_list;
Status status = client_ptr_->Search(topk_query_result_list, search_param);
//step 4: convert result array
for (auto &grpc_topk_result : result_array) {
for (uint64_t i = 0; i < topk_query_result_list.topk_query_result_size(); ++i) {
TopKQueryResult result;
for (size_t i = 0; i < grpc_topk_result.query_result_arrays_size(); i++) {
for (uint64_t j = 0; j < topk_query_result_list.topk_query_result(i).query_result_arrays_size(); ++j) {
QueryResult query_result;
query_result.id = grpc_topk_result.query_result_arrays(i).id();
query_result.distance = grpc_topk_result.query_result_arrays(i).distance();
query_result.id = topk_query_result_list.topk_query_result(i).query_result_arrays(j).id();
query_result.distance = topk_query_result_list.topk_query_result(i).query_result_arrays(j).distance();
result.query_result_arrays.emplace_back(query_result);
}
......
......@@ -121,28 +121,21 @@ GrpcClient::Insert(::milvus::grpc::VectorIds& vector_ids,
}
Status
GrpcClient::Search(std::vector<::milvus::grpc::TopKQueryResult>& result_array,
const ::milvus::grpc::SearchParam& search_param) {
GrpcClient::Search(::milvus::grpc::TopKQueryResultList& topk_query_result_list,
const ::milvus::grpc::SearchParam &search_param) {
::milvus::grpc::TopKQueryResult query_result;
ClientContext context;
std::unique_ptr<ClientReader<::milvus::grpc::TopKQueryResult> > reader(
stub_->Search(&context, search_param));
while (reader->Read(&query_result)) {
result_array.emplace_back(query_result);
}
::grpc::Status grpc_status = reader->Finish();
::grpc::Status grpc_status = stub_->Search(&context, search_param, &topk_query_result_list);
if (!grpc_status.ok()) {
std::cerr << "SearchVector rpc failed!" << std::endl;
std::cerr << grpc_status.error_message() << std::endl;
return Status(StatusCode::RPCFailed, grpc_status.error_message());
}
if (query_result.status().error_code() != grpc::SUCCESS) {
std::cerr << query_result.status().reason() << std::endl;
if (topk_query_result_list.status().error_code() != grpc::SUCCESS) {
std::cerr << topk_query_result_list.status().reason() << std::endl;
return Status(StatusCode::ServerFailed,
query_result.status().reason());
topk_query_result_list.status().reason());
}
return Status::OK();
......
......@@ -50,8 +50,8 @@ public:
Status& status);
Status
Search(std::vector<grpc::TopKQueryResult>& result_array,
const grpc::SearchParam& search_param);
Search(::milvus::grpc::TopKQueryResultList& topk_query_result_list,
const grpc::SearchParam &search_param);
Status
DescribeTable(grpc::TableSchema& grpc_schema,
......
......@@ -25,6 +25,7 @@
#include <grpcpp/security/credentials.h>
#include <grpcpp/grpcpp.h>
namespace zilliz {
namespace milvus {
namespace server {
......@@ -36,11 +37,11 @@ constexpr long MESSAGE_SIZE = -1;
class NoReusePortOption : public ::grpc::ServerBuilderOption {
public:
void UpdateArguments(::grpc::ChannelArguments* args) override {
void UpdateArguments(::grpc::ChannelArguments *args) override {
args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
}
void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>*
void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>> *
plugins) override {}
};
......@@ -78,6 +79,7 @@ GrpcMilvusServer::StartService() {
server = builder.BuildAndStart();
server->Wait();
}
void
......
......@@ -72,11 +72,11 @@ GrpcRequestHandler::Insert(::grpc::ServerContext *context,
::grpc::Status
GrpcRequestHandler::Search(::grpc::ServerContext *context,
const ::milvus::grpc::SearchParam *request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) {
const ::milvus::grpc::SearchParam *request,
::milvus::grpc::TopKQueryResultList *response) {
std::vector<std::string> file_id_array;
BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, writer);
BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
if (grpc_status.error_code() != SERVER_SUCCESS) {
......@@ -89,15 +89,15 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context,
::grpc::Status
GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context,
const ::milvus::grpc::SearchInFilesParam *request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) {
const ::milvus::grpc::SearchInFilesParam *request,
::milvus::grpc::TopKQueryResultList *response) {
std::vector<std::string> file_id_array;
for(int i = 0; i < request->file_id_array_size(); i++) {
for (int i = 0; i < request->file_id_array_size(); i++) {
file_id_array.push_back(request->file_id_array(i));
}
::milvus::grpc::SearchInFilesParam *request_mutable = const_cast<::milvus::grpc::SearchInFilesParam *>(request);
BaseTaskPtr task_ptr = SearchTask::Create(request_mutable->mutable_search_param(), file_id_array, writer);
BaseTaskPtr task_ptr = SearchTask::Create(request_mutable->mutable_search_param(), file_id_array, response);
::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
if (grpc_status.error_code() != SERVER_SUCCESS) {
......
......@@ -136,8 +136,8 @@ public:
*/
::grpc::Status
Search(::grpc::ServerContext *context,
const ::milvus::grpc::SearchParam *request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) override;
const ::milvus::grpc::SearchParam *request,
::milvus::grpc::TopKQueryResultList *response) override;
/**
* @brief Internal use query interface
......@@ -161,8 +161,8 @@ public:
*/
::grpc::Status
SearchInFiles(::grpc::ServerContext *context,
const ::milvus::grpc::SearchInFilesParam *request,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) override;
const ::milvus::grpc::SearchInFilesParam *request,
::milvus::grpc::TopKQueryResultList *response) override;
/**
* @brief Get table schema
......
......@@ -547,25 +547,25 @@ InsertTask::OnExecute() {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
SearchTask::SearchTask(const ::milvus::grpc::SearchParam *search_vector_infos,
const std::vector<std::string> &file_id_array,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer)
: GrpcBaseTask(DQL_TASK_GROUP),
search_param_(search_vector_infos),
file_id_array_(file_id_array),
writer_(writer) {
const std::vector<std::string> &file_id_array,
::milvus::grpc::TopKQueryResultList *response)
: GrpcBaseTask(DQL_TASK_GROUP),
search_param_(search_vector_infos),
file_id_array_(file_id_array),
topk_result_list(response) {
}
BaseTaskPtr
SearchTask::Create(const ::milvus::grpc::SearchParam *search_vector_infos,
const std::vector<std::string> &file_id_array,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer) {
::milvus::grpc::TopKQueryResultList *response) {
if(search_vector_infos == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!";
return nullptr;
}
return std::shared_ptr<GrpcBaseTask>(new SearchTask(search_vector_infos, file_id_array,
writer));
response));
}
ServerError
......@@ -683,18 +683,13 @@ SearchTask::OnExecute() {
rc.ElapseFromBegin("do search");
//step 7: construct result array
for (uint64_t i = 0; i < record_count; i++) {
auto &result = results[i];
const auto &record = search_param_->query_record_array(i);
::milvus::grpc::TopKQueryResult grpc_topk_result;
for (auto &result : results) {
::milvus::grpc::TopKQueryResult *topk_query_result = topk_result_list->add_topk_query_result();
for (auto &pair : result) {
::milvus::grpc::QueryResult *grpc_result = grpc_topk_result.add_query_result_arrays();
::milvus::grpc::QueryResult *grpc_result = topk_query_result->add_query_result_arrays();
grpc_result->set_id(pair.first);
grpc_result->set_distance(pair.second);
}
if (!writer_->Write(grpc_topk_result)) {
return SetError(SERVER_WRITE_ERROR, "Write topk result failed!");
}
}
#ifdef MILVUS_ENABLE_PROFILING
......
......@@ -150,12 +150,12 @@ public:
static BaseTaskPtr
Create(const ::milvus::grpc::SearchParam *search_param,
const std::vector<std::string> &file_id_array,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer);
::milvus::grpc::TopKQueryResultList *response);
protected:
SearchTask(const ::milvus::grpc::SearchParam *search_param,
const std::vector<std::string> &file_id_array,
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer);
const std::vector<std::string> &file_id_array,
::milvus::grpc::TopKQueryResultList *response);
ServerError
OnExecute() override;
......@@ -163,7 +163,7 @@ protected:
private:
const ::milvus::grpc::SearchParam *search_param_;
std::vector<std::string> file_id_array_;
::grpc::ServerWriter<::milvus::grpc::TopKQueryResult> *writer_;
::milvus::grpc::TopKQueryResultList *topk_result_list;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册