From a68fa43ddd238af805a767b2d233acd30ff548e3 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Fri, 9 Aug 2019 15:46:32 +0800 Subject: [PATCH] grpc and thrift server run concurrently Former-commit-id: 4abfccaf6ee598bbb7cc47d48f943b8ca9726f12 --- cpp/cmake/DefineOptions.cmake | 2 +- cpp/cmake/ThirdPartyPackages.cmake | 4 +- cpp/scripts/start_server.sh | 2 +- cpp/src/CMakeLists.txt | 51 ++++++---- cpp/src/server/Server.cpp | 11 ++- ...{MilvusServer.cpp => GrpcMilvusServer.cpp} | 12 +-- .../{MilvusServer.h => GrpcMilvusServer.h} | 2 +- ...uestHandler.cpp => GrpcRequestHandler.cpp} | 92 +++++++++---------- ...{RequestHandler.h => GrpcRequestHandler.h} | 2 +- ...Scheduler.cpp => GrpcRequestScheduler.cpp} | 28 +++--- ...uestScheduler.h => GrpcRequestScheduler.h} | 18 ++-- .../{RequestTask.cpp => GrpcRequestTask.cpp} | 44 ++++----- .../{RequestTask.h => GrpcRequestTask.h} | 22 ++--- cpp/src/server/thrift_impl/RequestHandler.cpp | 30 +++--- 14 files changed, 166 insertions(+), 154 deletions(-) rename cpp/src/server/grpc_impl/{MilvusServer.cpp => GrpcMilvusServer.cpp} (92%) rename cpp/src/server/grpc_impl/{MilvusServer.h => GrpcMilvusServer.h} (95%) rename cpp/src/server/grpc_impl/{RequestHandler.cpp => GrpcRequestHandler.cpp} (54%) rename cpp/src/server/grpc_impl/{RequestHandler.h => GrpcRequestHandler.h} (98%) rename cpp/src/server/grpc_impl/{RequestScheduler.cpp => GrpcRequestScheduler.cpp} (90%) rename cpp/src/server/grpc_impl/{RequestScheduler.h => GrpcRequestScheduler.h} (83%) rename cpp/src/server/grpc_impl/{RequestTask.cpp => GrpcRequestTask.cpp} (95%) rename cpp/src/server/grpc_impl/{RequestTask.h => GrpcRequestTask.h} (91%) diff --git a/cpp/cmake/DefineOptions.cmake b/cpp/cmake/DefineOptions.cmake index d0624bd0..af89dccb 100644 --- a/cpp/cmake/DefineOptions.cmake +++ b/cpp/cmake/DefineOptions.cmake @@ -90,7 +90,7 @@ define_option(MILVUS_WITH_SQLITE_ORM "Build with SQLite ORM library" ON) define_option(MILVUS_WITH_MYSQLPP "Build with MySQL++" ON) -define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" OFF) +define_option(MILVUS_WITH_THRIFT "Build with Apache Thrift library" ON) define_option(MILVUS_WITH_YAMLCPP "Build with yaml-cpp library" ON) diff --git a/cpp/cmake/ThirdPartyPackages.cmake b/cpp/cmake/ThirdPartyPackages.cmake index 6846275e..ade4b4a8 100644 --- a/cpp/cmake/ThirdPartyPackages.cmake +++ b/cpp/cmake/ThirdPartyPackages.cmake @@ -2640,7 +2640,7 @@ macro(build_grpc) add_dependencies(grpc_protoc grpc_ep) endmacro() -if(NOT MILVUS_WITH_THRIFT STREQUAL "ON") +#if(NOT MILVUS_WITH_THRIFT STREQUAL "ON") resolve_dependency(GRPC) get_target_property(GRPC_INCLUDE_DIR grpc INTERFACE_INCLUDE_DIRECTORIES) @@ -2651,4 +2651,4 @@ if(NOT MILVUS_WITH_THRIFT STREQUAL "ON") include_directories(SYSTEM ${GRPC_THIRD_PARTY_DIR}/protobuf/src) link_directories(SYSTEM ${GRPC_PROTOBUF_LIB_DIR}) -endif() +#endif() diff --git a/cpp/scripts/start_server.sh b/cpp/scripts/start_server.sh index 312cef86..72e120f2 100755 --- a/cpp/scripts/start_server.sh +++ b/cpp/scripts/start_server.sh @@ -1,4 +1,4 @@ #!/bin/bash -../bin/milvus_server -c ../conf/server_config.yaml -l ../conf/log_config.conf +../bin/milvus_grpc_server -c ../conf/server_config.yaml -l ../conf/log_config.conf diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 50e09e8c..b84dde00 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -4,7 +4,6 @@ # Proprietary and confidential. #------------------------------------------------------------------------------- - aux_source_directory(cache cache_files) aux_source_directory(config config_files) aux_source_directory(server server_files) @@ -78,17 +77,17 @@ include_directories(/usr/include/mysql) include_directories(grpc/gen-status) include_directories(grpc/gen-milvus) -if (MILVUS_WITH_THRIFT STREQUAL "ON") - set(client_lib +#if (MILVUS_WITH_THRIFT STREQUAL "ON") + set(client_thrift_lib thrift) -else() - set(client_lib +#else() + set(client_grpc_lib grpcpp_channelz grpc++ grpc grpc_protobuf grpc_protoc) -endif() +#endif() set(third_party_libs knowhere @@ -100,7 +99,8 @@ set(third_party_libs lapack easyloggingpp sqlite - ${client_lib} + ${client_thrift_lib} + ${client_grpc_lib} yaml-cpp prometheus-cpp-push prometheus-cpp-pull @@ -197,7 +197,7 @@ set(knowhere_libs tbb ) -if (MILVUS_WITH_THRIFT STREQUAL "ON") +#if (MILVUS_WITH_THRIFT STREQUAL "ON") add_executable(milvus_thrift_server ${config_files} ${server_files} @@ -206,7 +206,7 @@ if (MILVUS_WITH_THRIFT STREQUAL "ON") ${thrift_service_files} ${metrics_files} ) -else() +#else() add_executable(milvus_grpc_server ${config_files} ${server_files} @@ -215,7 +215,15 @@ else() ${grpc_service_files} ${metrics_files} ) -endif() +#endif() + add_executable(milvus_server + ${config_files} + ${server_files} + ${thriftserver_files} + ${grpcserver_files} + ${utils_files} + ${thrift_service_files} + ${metrics_files}) if (ENABLE_LICENSE STREQUAL "ON") add_executable(get_sys_info ${get_sys_info_files}) @@ -224,25 +232,28 @@ if (ENABLE_LICENSE STREQUAL "ON") target_link_libraries(get_sys_info ${license_libs} license_check ${third_party_libs}) target_link_libraries(license_generator ${license_libs} ${third_party_libs}) - if(MILVUS_WITH_THRIFT STREQUAL "ON") +# if(MILVUS_WITH_THRIFT STREQUAL "ON") target_link_libraries(milvus_thrift_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) - else() +# else() target_link_libraries(milvus_grpc_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) - endif() +# endif() + target_link_libraries(milvus_server ${server_libs} license_check ${knowhere_libs} ${third_party_libs}) + else () - if(MILVUS_WITH_THRIFT STREQUAL "ON") +# if(MILVUS_WITH_THRIFT STREQUAL "ON") target_link_libraries(milvus_thrift_server ${server_libs} ${knowhere_libs} ${third_party_libs}) - else() +# else() target_link_libraries(milvus_grpc_server ${server_libs} ${knowhere_libs} ${third_party_libs}) - endif() - +# endif() + target_link_libraries(milvus_server ${server_libs} ${knowhere_libs} ${third_party_libs}) endif() -if (MILVUS_WITH_THRIFT STREQUAL "ON") +#if (MILVUS_WITH_THRIFT STREQUAL "ON") install(TARGETS milvus_thrift_server DESTINATION bin) -else() +#else() install(TARGETS milvus_grpc_server DESTINATION bin) -endif() +#endif() + install(TARGETS milvus_server DESTINATION bin) install(FILES ${KNOWHERE_BUILD_DIR}/lib/${CMAKE_SHARED_LIBRARY_PREFIX}tbb${CMAKE_SHARED_LIBRARY_SUFFIX} diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 4bfe1f76..13218faa 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -5,11 +5,11 @@ //////////////////////////////////////////////////////////////////////////////// #include "Server.h" //#include "ServerConfig.h" -#ifdef MILVUS_ENABLE_THRIFT +//#ifdef MILVUS_ENABLE_THRIFT #include "server/thrift_impl/MilvusServer.h" -#else -#include "server/grpc_impl/MilvusServer.h" -#endif +//#else +#include "server/grpc_impl/GrpcMilvusServer.h" +//#endif #include "utils/Log.h" #include "utils/SignalUtil.h" @@ -225,11 +225,12 @@ Server::LoadConfig() { void Server::StartService() { MilvusServer::StartService(); + GrpcMilvusServer::StartService(); } void Server::StopService() { - MilvusServer::StopService(); + GrpcMilvusServer::StopService(); } } diff --git a/cpp/src/server/grpc_impl/MilvusServer.cpp b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp similarity index 92% rename from cpp/src/server/grpc_impl/MilvusServer.cpp rename to cpp/src/server/grpc_impl/GrpcMilvusServer.cpp index bd8cfd7f..98631a5d 100644 --- a/cpp/src/server/grpc_impl/MilvusServer.cpp +++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.cpp @@ -4,12 +4,12 @@ * Proprietary and confidential. ******************************************************************************/ #include "milvus.grpc.pb.h" -#include "MilvusServer.h" +#include "GrpcMilvusServer.h" #include "../ServerConfig.h" #include "../DBWrapper.h" #include "utils/Log.h" #include "faiss/utils.h" -#include "RequestHandler.h" +#include "GrpcRequestHandler.h" #include #include @@ -34,7 +34,7 @@ static std::unique_ptr server; constexpr long MESSAGE_SIZE = -1; void -MilvusServer::StartService() { +GrpcMilvusServer::StartService() { if (server != nullptr){ std::cout << "stopservice!\n"; StopService(); @@ -44,7 +44,7 @@ MilvusServer::StartService() { ConfigNode server_config = config.GetConfig(CONFIG_SERVER); ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE); std::string address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1"); - int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19530); + int32_t port = server_config.GetInt32Value(CONFIG_SERVER_PORT, 19531); faiss::distance_compute_blas_threshold = engine_config.GetInt32Value(CONFIG_DCBT, 20); @@ -60,7 +60,7 @@ MilvusServer::StartService() { builder.SetDefaultCompressionAlgorithm(GRPC_COMPRESS_STREAM_GZIP); builder.SetDefaultCompressionLevel(GRPC_COMPRESS_LEVEL_HIGH); - RequestHandler service; + GrpcRequestHandler service; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); @@ -71,7 +71,7 @@ MilvusServer::StartService() { } void -MilvusServer::StopService() { +GrpcMilvusServer::StopService() { if (server != nullptr) { server->Shutdown(); } diff --git a/cpp/src/server/grpc_impl/MilvusServer.h b/cpp/src/server/grpc_impl/GrpcMilvusServer.h similarity index 95% rename from cpp/src/server/grpc_impl/MilvusServer.h rename to cpp/src/server/grpc_impl/GrpcMilvusServer.h index 82dcd64c..4a3eec68 100644 --- a/cpp/src/server/grpc_impl/MilvusServer.h +++ b/cpp/src/server/grpc_impl/GrpcMilvusServer.h @@ -11,7 +11,7 @@ namespace zilliz { namespace milvus { namespace server { -class MilvusServer { +class GrpcMilvusServer { public: static void StartService(); diff --git a/cpp/src/server/grpc_impl/RequestHandler.cpp b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp similarity index 54% rename from cpp/src/server/grpc_impl/RequestHandler.cpp rename to cpp/src/server/grpc_impl/GrpcRequestHandler.cpp index 7a582f91..3d9fd013 100644 --- a/cpp/src/server/grpc_impl/RequestHandler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -4,8 +4,8 @@ * Proprietary and confidential. ******************************************************************************/ -#include "RequestHandler.h" -#include "RequestTask.h" +#include "GrpcRequestHandler.h" +#include "GrpcRequestTask.h" #include "utils/TimeRecorder.h" namespace zilliz { @@ -13,24 +13,24 @@ namespace milvus { namespace server { ::grpc::Status -RequestHandler::CreateTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableSchema *request, - ::milvus::grpc::Status *response) { +GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableSchema *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = CreateTableTask::Create(*request); - RequestScheduler::ExecTask(task_ptr, response); + GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::HasTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::BoolReply *response) { +GrpcRequestHandler::HasTable(::grpc::ServerContext *context, + const ::milvus::grpc::TableName *request, + ::milvus::grpc::BoolReply *response) { bool has_table = false; BaseTaskPtr task_ptr = HasTableTask::Create(request->table_name(), has_table); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->set_bool_reply(has_table); response->mutable_status()->set_reason(grpc_status.reason()); response->mutable_status()->set_error_code(grpc_status.error_code()); @@ -38,47 +38,47 @@ RequestHandler::HasTable(::grpc::ServerContext *context, } ::grpc::Status -RequestHandler::DropTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::Status* response) { +GrpcRequestHandler::DropTable(::grpc::ServerContext* context, + const ::milvus::grpc::TableName* request, + ::milvus::grpc::Status* response) { BaseTaskPtr task_ptr = DropTableTask::Create(request->table_name()); - RequestScheduler::ExecTask(task_ptr, response); + GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::BuildIndex(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::Status* response) { +GrpcRequestHandler::BuildIndex(::grpc::ServerContext* context, + const ::milvus::grpc::TableName* request, + ::milvus::grpc::Status* response) { BaseTaskPtr task_ptr = BuildIndexTask::Create(request->table_name()); - RequestScheduler::ExecTask(task_ptr, response); + GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::InsertVector(::grpc::ServerContext* context, - const ::milvus::grpc::InsertInfos* request, - ::milvus::grpc::VectorIds* response) { +GrpcRequestHandler::InsertVector(::grpc::ServerContext* context, + const ::milvus::grpc::InsertInfos* request, + ::milvus::grpc::VectorIds* response) { BaseTaskPtr task_ptr = InsertVectorTask::Create(*request, *response); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->mutable_status()->set_reason(grpc_status.reason()); response->mutable_status()->set_error_code(grpc_status.error_code()); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::SearchVector(::grpc::ServerContext* context, - const ::milvus::grpc::SearchVectorInfos* request, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { +GrpcRequestHandler::SearchVector(::grpc::ServerContext* context, + const ::milvus::grpc::SearchVectorInfos* request, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { std::vector file_id_array; BaseTaskPtr task_ptr = SearchVectorTask::Create(*request, file_id_array, *writer); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); if (grpc_status.error_code() != SERVER_SUCCESS) { ::grpc::Status status(::grpc::INVALID_ARGUMENT, grpc_status.reason()); return status; @@ -88,14 +88,14 @@ RequestHandler::SearchVector(::grpc::ServerContext* context, } ::grpc::Status -RequestHandler::SearchVectorInFiles(::grpc::ServerContext* context, - const ::milvus::grpc::SearchVectorInFilesInfos* request, - ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { +GrpcRequestHandler::SearchVectorInFiles(::grpc::ServerContext* context, + const ::milvus::grpc::SearchVectorInFilesInfos* request, + ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>* writer) { std::vector file_id_array; BaseTaskPtr task_ptr = SearchVectorTask::Create(request->search_vector_infos(), file_id_array, *writer); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); if (grpc_status.error_code() != SERVER_SUCCESS) { ::grpc::Status status(::grpc::INVALID_ARGUMENT, grpc_status.reason()); return status; @@ -105,27 +105,27 @@ RequestHandler::SearchVectorInFiles(::grpc::ServerContext* context, } ::grpc::Status -RequestHandler::DescribeTable(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::TableSchema* response) { +GrpcRequestHandler::DescribeTable(::grpc::ServerContext* context, + const ::milvus::grpc::TableName* request, + ::milvus::grpc::TableSchema* response) { BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), *response); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->mutable_table_name()->mutable_status()->set_error_code(grpc_status.error_code()); response->mutable_table_name()->mutable_status()->set_reason(grpc_status.reason()); return ::grpc::Status::OK; } ::grpc::Status -RequestHandler::GetTableRowCount(::grpc::ServerContext* context, - const ::milvus::grpc::TableName* request, - ::milvus::grpc::TableRowCount* response) { +GrpcRequestHandler::GetTableRowCount(::grpc::ServerContext* context, + const ::milvus::grpc::TableName* request, + ::milvus::grpc::TableRowCount* response) { int64_t row_count = 0; BaseTaskPtr task_ptr = GetTableRowCountTask::Create(request->table_name(), row_count); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->set_table_row_count(row_count); response->mutable_status()->set_reason(grpc_status.reason()); response->mutable_status()->set_error_code(grpc_status.error_code()); @@ -133,13 +133,13 @@ RequestHandler::GetTableRowCount(::grpc::ServerContext* context, } ::grpc::Status -RequestHandler::ShowTables(::grpc::ServerContext* context, - const ::milvus::grpc::Command* request, - ::grpc::ServerWriter<::milvus::grpc::TableName>* writer) { +GrpcRequestHandler::ShowTables(::grpc::ServerContext* context, + const ::milvus::grpc::Command* request, + ::grpc::ServerWriter<::milvus::grpc::TableName>* writer) { BaseTaskPtr task_ptr = ShowTablesTask::Create(*writer); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); if (grpc_status.error_code() != SERVER_SUCCESS) { ::grpc::Status status(::grpc::UNKNOWN, grpc_status.reason()); return status; @@ -149,14 +149,14 @@ RequestHandler::ShowTables(::grpc::ServerContext* context, } ::grpc::Status -RequestHandler::Ping(::grpc::ServerContext* context, - const ::milvus::grpc::Command* request, - ::milvus::grpc::ServerStatus* response) { +GrpcRequestHandler::Ping(::grpc::ServerContext* context, + const ::milvus::grpc::Command* request, + ::milvus::grpc::ServerStatus* response) { std::string result; BaseTaskPtr task_ptr = PingTask::Create(request->cmd(), result); ::milvus::grpc::Status grpc_status; - RequestScheduler::ExecTask(task_ptr, &grpc_status); + GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); response->set_info(result); response->mutable_status()->set_reason(grpc_status.reason()); response->mutable_status()->set_error_code(grpc_status.error_code()); diff --git a/cpp/src/server/grpc_impl/RequestHandler.h b/cpp/src/server/grpc_impl/GrpcRequestHandler.h similarity index 98% rename from cpp/src/server/grpc_impl/RequestHandler.h rename to cpp/src/server/grpc_impl/GrpcRequestHandler.h index f098d19b..34c8f749 100644 --- a/cpp/src/server/grpc_impl/RequestHandler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.h @@ -14,7 +14,7 @@ namespace zilliz { namespace milvus { namespace server { -class RequestHandler final : public ::milvus::grpc::MilvusService::Service { +class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service { public: /** * @brief Create table method diff --git a/cpp/src/server/grpc_impl/RequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp similarity index 90% rename from cpp/src/server/grpc_impl/RequestScheduler.cpp rename to cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp index 5358906c..43b3d6d7 100644 --- a/cpp/src/server/grpc_impl/RequestScheduler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp @@ -3,7 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#include "RequestScheduler.h" +#include "GrpcRequestScheduler.h" #include "utils/Log.h" #include "src/grpc/gen-status/status.pb.h" @@ -50,7 +50,7 @@ namespace { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -BaseTask::BaseTask(const std::string& task_group, bool async) +GrpcBaseTask::GrpcBaseTask(const std::string& task_group, bool async) : task_group_(task_group), async_(async), done_(false), @@ -58,12 +58,12 @@ BaseTask::BaseTask(const std::string& task_group, bool async) } -BaseTask::~BaseTask() { +GrpcBaseTask::~GrpcBaseTask() { WaitToFinish(); } ServerError -BaseTask::Execute() { +GrpcBaseTask::Execute() { error_code_ = OnExecute(); done_ = true; finish_cond_.notify_all(); @@ -71,7 +71,7 @@ BaseTask::Execute() { } ServerError -BaseTask::SetError(ServerError error_code, const std::string& error_msg) { +GrpcBaseTask::SetError(ServerError error_code, const std::string& error_msg) { error_code_ = error_code; error_msg_ = error_msg; @@ -80,7 +80,7 @@ BaseTask::SetError(ServerError error_code, const std::string& error_msg) { } ServerError -BaseTask::WaitToFinish() { +GrpcBaseTask::WaitToFinish() { std::unique_lock lock(finish_mtx_); finish_cond_.wait(lock, [this] { return done_; }); @@ -88,22 +88,22 @@ BaseTask::WaitToFinish() { } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -RequestScheduler::RequestScheduler() +GrpcRequestScheduler::GrpcRequestScheduler() : stopped_(false) { Start(); } -RequestScheduler::~RequestScheduler() { +GrpcRequestScheduler::~GrpcRequestScheduler() { Stop(); } void -RequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *grpc_status) { +GrpcRequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *grpc_status) { if(task_ptr == nullptr) { return; } - RequestScheduler& scheduler = RequestScheduler::GetInstance(); + GrpcRequestScheduler& scheduler = GrpcRequestScheduler::GetInstance(); scheduler.ExecuteTask(task_ptr); if(!task_ptr->IsAsync()) { @@ -117,7 +117,7 @@ RequestScheduler::ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status *grpc_s } void -RequestScheduler::Start() { +GrpcRequestScheduler::Start() { if(!stopped_) { return; } @@ -126,7 +126,7 @@ RequestScheduler::Start() { } void -RequestScheduler::Stop() { +GrpcRequestScheduler::Stop() { if(stopped_) { return; } @@ -152,7 +152,7 @@ RequestScheduler::Stop() { } ServerError -RequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) { +GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr& task_ptr) { if(task_ptr == nullptr) { return SERVER_NULL_POINTER; } @@ -196,7 +196,7 @@ namespace { } ServerError -RequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) { +GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr& task_ptr) { std::lock_guard lock(queue_mtx_); std::string group_name = task_ptr->TaskGroup(); diff --git a/cpp/src/server/grpc_impl/RequestScheduler.h b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h similarity index 83% rename from cpp/src/server/grpc_impl/RequestScheduler.h rename to cpp/src/server/grpc_impl/GrpcRequestScheduler.h index 4b8d0da0..ca878980 100644 --- a/cpp/src/server/grpc_impl/RequestScheduler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h @@ -17,10 +17,10 @@ namespace zilliz { namespace milvus { namespace server { -class BaseTask { +class GrpcBaseTask { protected: - BaseTask(const std::string& task_group, bool async = false); - virtual ~BaseTask(); + GrpcBaseTask(const std::string& task_group, bool async = false); + virtual ~GrpcBaseTask(); public: ServerError @@ -59,15 +59,15 @@ protected: std::string error_msg_; }; -using BaseTaskPtr = std::shared_ptr; +using BaseTaskPtr = std::shared_ptr; using TaskQueue = BlockingQueue; using TaskQueuePtr = std::shared_ptr; using ThreadPtr = std::shared_ptr; -class RequestScheduler { +class GrpcRequestScheduler { public: - static RequestScheduler& GetInstance() { - static RequestScheduler scheduler; + static GrpcRequestScheduler& GetInstance() { + static GrpcRequestScheduler scheduler; return scheduler; } @@ -81,8 +81,8 @@ public: ExecTask(BaseTaskPtr& task_ptr, ::milvus::grpc::Status* grpc_status); protected: - RequestScheduler(); - virtual ~RequestScheduler(); + GrpcRequestScheduler(); + virtual ~GrpcRequestScheduler(); ServerError PutTaskToQueue(const BaseTaskPtr& task_ptr); diff --git a/cpp/src/server/grpc_impl/RequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp similarity index 95% rename from cpp/src/server/grpc_impl/RequestTask.cpp rename to cpp/src/server/grpc_impl/GrpcRequestTask.cpp index c27daaa9..975bf4a8 100644 --- a/cpp/src/server/grpc_impl/RequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -3,7 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ -#include "RequestTask.h" +#include "GrpcRequestTask.h" #include "../ServerConfig.h" #include "utils/CommonUtil.h" #include "utils/Log.h" @@ -11,7 +11,7 @@ #include "utils/ValidationUtil.h" #include "../DBWrapper.h" #include "version.h" -#include "MilvusServer.h" +#include "GrpcMilvusServer.h" #include "src/server/Server.h" @@ -100,7 +100,7 @@ namespace { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema& schema) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), schema_(schema) { } @@ -109,7 +109,7 @@ BaseTaskPtr CreateTableTask::Create(const ::milvus::grpc::TableSchema& schema) { // BaseTaskPtr create_table_task_ptr = std::make_shared(schema); // return create_table_task_ptr; - return std::shared_ptr(new CreateTableTask(schema)); + return std::shared_ptr(new CreateTableTask(schema)); } ServerError @@ -158,14 +158,14 @@ CreateTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema& schema) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), schema_(schema) { } BaseTaskPtr DescribeTableTask::Create(const std::string& table_name, ::milvus::grpc::TableSchema& schema) { - return std::shared_ptr(new DescribeTableTask(table_name, schema)); + return std::shared_ptr(new DescribeTableTask(table_name, schema)); } ServerError @@ -204,13 +204,13 @@ DescribeTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// BuildIndexTask::BuildIndexTask(const std::string& table_name) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) { } BaseTaskPtr BuildIndexTask::Create(const std::string& table_name) { - return std::shared_ptr(new BuildIndexTask(table_name)); + return std::shared_ptr(new BuildIndexTask(table_name)); } ServerError @@ -250,7 +250,7 @@ BuildIndexTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// HasTableTask::HasTableTask(const std::string& table_name, bool& has_table) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), has_table_(has_table) { @@ -258,7 +258,7 @@ HasTableTask::HasTableTask(const std::string& table_name, bool& has_table) BaseTaskPtr HasTableTask::Create(const std::string& table_name, bool& has_table) { - return std::shared_ptr(new HasTableTask(table_name, has_table)); + return std::shared_ptr(new HasTableTask(table_name, has_table)); } ServerError @@ -288,14 +288,14 @@ HasTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DropTableTask::DropTableTask(const std::string& table_name) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) { } BaseTaskPtr DropTableTask::Create(const std::string& table_name) { - return std::shared_ptr(new DropTableTask(table_name)); + return std::shared_ptr(new DropTableTask(table_name)); } ServerError @@ -340,14 +340,14 @@ DropTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ShowTablesTask::ShowTablesTask(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), writer_(writer) { } BaseTaskPtr ShowTablesTask::Create(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer) { - return std::shared_ptr(new ShowTablesTask(writer)); + return std::shared_ptr(new ShowTablesTask(writer)); } ServerError @@ -371,7 +371,7 @@ ShowTablesTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// InsertVectorTask::InsertVectorTask(const ::milvus::grpc::InsertInfos& insert_infos, ::milvus::grpc::VectorIds& record_ids) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), insert_infos_(insert_infos), record_ids_(record_ids) { record_ids_.Clear(); @@ -380,7 +380,7 @@ InsertVectorTask::InsertVectorTask(const ::milvus::grpc::InsertInfos& insert_inf BaseTaskPtr InsertVectorTask::Create(const ::milvus::grpc::InsertInfos& insert_infos, ::milvus::grpc::VectorIds& record_ids) { - return std::shared_ptr(new InsertVectorTask(insert_infos, record_ids)); + return std::shared_ptr(new InsertVectorTask(insert_infos, record_ids)); } ServerError @@ -477,7 +477,7 @@ InsertVectorTask::OnExecute() { SearchVectorTask::SearchVectorTask(const ::milvus::grpc::SearchVectorInfos& search_vector_infos, const std::vector& file_id_array, ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer) - : BaseTask(DQL_TASK_GROUP), + : GrpcBaseTask(DQL_TASK_GROUP), search_vector_infos_(search_vector_infos), file_id_array_(file_id_array), writer_(writer) { @@ -488,7 +488,7 @@ BaseTaskPtr SearchVectorTask::Create(const ::milvus::grpc::SearchVectorInfos& search_vector_infos, const std::vector& file_id_array, ::grpc::ServerWriter<::milvus::grpc::TopKQueryResult>& writer) { - return std::shared_ptr(new SearchVectorTask(search_vector_infos, file_id_array, + return std::shared_ptr(new SearchVectorTask(search_vector_infos, file_id_array, writer)); } @@ -630,7 +630,7 @@ SearchVectorTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count) - : BaseTask(DDL_DML_TASK_GROUP), + : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), row_count_(row_count) { @@ -638,7 +638,7 @@ GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_ BaseTaskPtr GetTableRowCountTask::Create(const std::string& table_name, int64_t& row_count) { - return std::shared_ptr(new GetTableRowCountTask(table_name, row_count)); + return std::shared_ptr(new GetTableRowCountTask(table_name, row_count)); } ServerError @@ -673,7 +673,7 @@ GetTableRowCountTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// PingTask::PingTask(const std::string& cmd, std::string& result) - : BaseTask(PING_TASK_GROUP), + : GrpcBaseTask(PING_TASK_GROUP), cmd_(cmd), result_(result) { @@ -681,7 +681,7 @@ PingTask::PingTask(const std::string& cmd, std::string& result) BaseTaskPtr PingTask::Create(const std::string& cmd, std::string& result) { - return std::shared_ptr(new PingTask(cmd, result)); + return std::shared_ptr(new PingTask(cmd, result)); } ServerError diff --git a/cpp/src/server/grpc_impl/RequestTask.h b/cpp/src/server/grpc_impl/GrpcRequestTask.h similarity index 91% rename from cpp/src/server/grpc_impl/RequestTask.h rename to cpp/src/server/grpc_impl/GrpcRequestTask.h index 1bca8e0a..cb75bf7e 100644 --- a/cpp/src/server/grpc_impl/RequestTask.h +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.h @@ -4,7 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ #pragma once -#include "RequestScheduler.h" +#include "GrpcRequestScheduler.h" #include "utils/Error.h" #include "db/Types.h" @@ -19,7 +19,7 @@ namespace milvus { namespace server { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class CreateTableTask : public BaseTask { +class CreateTableTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const ::milvus::grpc::TableSchema& schema); @@ -36,7 +36,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class HasTableTask : public BaseTask { +class HasTableTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name, bool& has_table); @@ -54,7 +54,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class DescribeTableTask : public BaseTask { +class DescribeTableTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name, ::milvus::grpc::TableSchema& schema); @@ -72,7 +72,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class DropTableTask : public BaseTask { +class DropTableTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name); @@ -90,7 +90,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class BuildIndexTask : public BaseTask { +class BuildIndexTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name); @@ -108,7 +108,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class ShowTablesTask : public BaseTask { +class ShowTablesTask : public GrpcBaseTask { public: static BaseTaskPtr Create(::grpc::ServerWriter< ::milvus::grpc::TableName>& writer); @@ -125,7 +125,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class InsertVectorTask : public BaseTask { +class InsertVectorTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const ::milvus::grpc::InsertInfos& insert_infos, @@ -144,7 +144,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class SearchVectorTask : public BaseTask { +class SearchVectorTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const ::milvus::grpc::SearchVectorInfos& searchVectorInfos, @@ -166,7 +166,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class GetTableRowCountTask : public BaseTask { +class GetTableRowCountTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& table_name, int64_t& row_count); @@ -183,7 +183,7 @@ private: }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -class PingTask : public BaseTask { +class PingTask : public GrpcBaseTask { public: static BaseTaskPtr Create(const std::string& cmd, std::string& result); diff --git a/cpp/src/server/thrift_impl/RequestHandler.cpp b/cpp/src/server/thrift_impl/RequestHandler.cpp index 50740431..e7c8e8b1 100644 --- a/cpp/src/server/thrift_impl/RequestHandler.cpp +++ b/cpp/src/server/thrift_impl/RequestHandler.cpp @@ -47,18 +47,18 @@ RequestHandler::BuildIndex(const std::string &table_name) { void RequestHandler::AddVector(std::vector &_return, - const std::string &table_name, - const std::vector &record_array) { + const std::string &table_name, + const std::vector &record_array) { BaseTaskPtr task_ptr = AddVectorTask::Create(table_name, record_array, _return); RequestScheduler::ExecTask(task_ptr); } void RequestHandler::SearchVector(std::vector &_return, - const std::string &table_name, - const std::vector &query_record_array, - const std::vector &query_range_array, - const int64_t topk) { + const std::string &table_name, + const std::vector &query_record_array, + const std::vector &query_range_array, + const int64_t topk) { // SERVER_LOG_DEBUG << "Entering RequestHandler::SearchVector"; BaseTaskPtr task_ptr = SearchVectorTask1::Create(table_name, std::vector(), query_record_array, query_range_array, topk, _return); @@ -67,10 +67,10 @@ RequestHandler::SearchVector(std::vector &_return, void RequestHandler::SearchVector2(std::vector & _return, - const std::string& table_name, - const std::vector & query_record_array, - const std::vector & query_range_array, - const int64_t topk) { + const std::string& table_name, + const std::vector & query_record_array, + const std::vector & query_range_array, + const int64_t topk) { BaseTaskPtr task_ptr = SearchVectorTask2::Create(table_name, std::vector(), query_record_array, query_range_array, topk, _return); RequestScheduler::ExecTask(task_ptr); @@ -78,11 +78,11 @@ RequestHandler::SearchVector2(std::vector & _return, void RequestHandler::SearchVectorInFiles(std::vector<::milvus::thrift::TopKQueryResult> &_return, - const std::string& table_name, - const std::vector &file_id_array, - const std::vector<::milvus::thrift::RowRecord> &query_record_array, - const std::vector<::milvus::thrift::Range> &query_range_array, - const int64_t topk) { + const std::string& table_name, + const std::vector &file_id_array, + const std::vector<::milvus::thrift::RowRecord> &query_record_array, + const std::vector<::milvus::thrift::Range> &query_range_array, + const int64_t topk) { // SERVER_LOG_DEBUG << "Entering RequestHandler::SearchVectorInFiles. file_id_array size = " << std::to_string(file_id_array.size()); BaseTaskPtr task_ptr = SearchVectorTask1::Create(table_name, file_id_array, query_record_array, query_range_array, topk, _return); -- GitLab