From 2b132a9cefe151967357a932e385aca2f0590eb3 Mon Sep 17 00:00:00 2001 From: starlord Date: Thu, 26 Sep 2019 20:25:57 +0800 Subject: [PATCH] format server code Former-commit-id: 5502293863810d216a6c8c46a28eb76bfa30bd12 --- cpp/src/server/Config.cpp | 134 ++++----- cpp/src/server/Config.h | 273 ++++++++--------- cpp/src/server/DBWrapper.cpp | 44 +-- cpp/src/server/DBWrapper.h | 14 +- cpp/src/server/Server.cpp | 20 +- cpp/src/server/Server.h | 7 +- .../server/grpc_impl/GrpcRequestHandler.cpp | 57 ++-- cpp/src/server/grpc_impl/GrpcRequestHandler.h | 43 ++- .../server/grpc_impl/GrpcRequestScheduler.cpp | 143 ++++----- .../server/grpc_impl/GrpcRequestScheduler.h | 42 +-- cpp/src/server/grpc_impl/GrpcRequestTask.cpp | 274 +++++++++--------- cpp/src/server/grpc_impl/GrpcRequestTask.h | 130 ++++----- cpp/src/server/grpc_impl/GrpcServer.cpp | 19 +- cpp/src/server/grpc_impl/GrpcServer.h | 10 +- 14 files changed, 599 insertions(+), 611 deletions(-) diff --git a/cpp/src/server/Config.cpp b/cpp/src/server/Config.cpp index 8ee2f7c6..0dbf4e1d 100644 --- a/cpp/src/server/Config.cpp +++ b/cpp/src/server/Config.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "Config.h" +#include "server/Config.h" #include #include @@ -23,12 +23,13 @@ #include #include #include +#include +#include #include "config/ConfigMgr.h" #include "utils/CommonUtil.h" #include "utils/ValidationUtil.h" - namespace zilliz { namespace milvus { namespace server { @@ -71,11 +72,11 @@ Config::LoadConfigFile(const std::string &filename) { } void -Config::PrintConfigSection(const std::string& config_node_name) { +Config::PrintConfigSection(const std::string &config_node_name) { std::cout << std::endl; std::cout << config_node_name << ":" << std::endl; if (config_map_.find(config_node_name) != config_map_.end()) { - for (auto item: config_map_[config_node_name]) { + for (auto item : config_map_[config_node_name]) { std::cout << item.first << ": " << item.second << std::endl; } } @@ -182,7 +183,7 @@ Config::CheckDBConfigBufferSize(const std::string &value) { return Status(SERVER_INVALID_ARGUMENT, "Invalid DB config buffer_size: " + value); } else { int64_t buffer_size = std::stoi(value) * GB; - unsigned long total_mem = 0, free_mem = 0; + uint64_t total_mem = 0, free_mem = 0; CommonUtil::GetSystemMemInfo(total_mem, free_mem); if (buffer_size >= total_mem) { return Status(SERVER_INVALID_ARGUMENT, "DB config buffer_size exceed system memory: " + value); @@ -205,7 +206,7 @@ Config::CheckDBConfigBuildIndexGPU(const std::string &value) { } Status -Config::CheckMetricConfigAutoBootup(const std::string& value) { +Config::CheckMetricConfigAutoBootup(const std::string &value) { if (!ValidationUtil::ValidateStringIsBool(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config auto_bootup: " + value); } @@ -213,7 +214,7 @@ Config::CheckMetricConfigAutoBootup(const std::string& value) { } Status -Config::CheckMetricConfigCollector(const std::string& value) { +Config::CheckMetricConfigCollector(const std::string &value) { if (value != "prometheus") { return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config collector: " + value); } @@ -221,7 +222,7 @@ Config::CheckMetricConfigCollector(const std::string& value) { } Status -Config::CheckMetricConfigPrometheusPort(const std::string& value) { +Config::CheckMetricConfigPrometheusPort(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config prometheus_port: " + value); } @@ -229,12 +230,12 @@ Config::CheckMetricConfigPrometheusPort(const std::string& value) { } Status -Config::CheckCacheConfigCpuMemCapacity(const std::string& value) { +Config::CheckCacheConfigCpuMemCapacity(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cpu_mem_capacity: " + value); } else { uint64_t cpu_cache_capacity = std::stoi(value) * GB; - unsigned long total_mem = 0, free_mem = 0; + uint64_t total_mem = 0, free_mem = 0; CommonUtil::GetSystemMemInfo(total_mem, free_mem); if (cpu_cache_capacity >= total_mem) { return Status(SERVER_INVALID_ARGUMENT, "Cache config cpu_mem_capacity exceed system memory: " + value); @@ -254,7 +255,7 @@ Config::CheckCacheConfigCpuMemCapacity(const std::string& value) { } Status -Config::CheckCacheConfigCpuMemThreshold(const std::string& value) { +Config::CheckCacheConfigCpuMemThreshold(const std::string &value) { if (!ValidationUtil::ValidateStringIsFloat(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cpu_mem_threshold: " + value); } else { @@ -267,7 +268,7 @@ Config::CheckCacheConfigCpuMemThreshold(const std::string& value) { } Status -Config::CheckCacheConfigGpuMemCapacity(const std::string& value) { +Config::CheckCacheConfigGpuMemCapacity(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { std::cerr << "ERROR: gpu_cache_capacity " << value << " is not a number" << std::endl; } else { @@ -290,7 +291,7 @@ Config::CheckCacheConfigGpuMemCapacity(const std::string& value) { } Status -Config::CheckCacheConfigGpuMemThreshold(const std::string& value) { +Config::CheckCacheConfigGpuMemThreshold(const std::string &value) { if (!ValidationUtil::ValidateStringIsFloat(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config gpu_mem_threshold: " + value); } else { @@ -303,7 +304,7 @@ Config::CheckCacheConfigGpuMemThreshold(const std::string& value) { } Status -Config::CheckCacheConfigCacheInsertData(const std::string& value) { +Config::CheckCacheConfigCacheInsertData(const std::string &value) { if (!ValidationUtil::ValidateStringIsBool(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cache_insert_data: " + value); } @@ -311,7 +312,7 @@ Config::CheckCacheConfigCacheInsertData(const std::string& value) { } Status -Config::CheckEngineConfigBlasThreshold(const std::string& value) { +Config::CheckEngineConfigBlasThreshold(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid engine config blas threshold: " + value); } @@ -319,7 +320,7 @@ Config::CheckEngineConfigBlasThreshold(const std::string& value) { } Status -Config::CheckEngineConfigOmpThreadNum(const std::string& value) { +Config::CheckEngineConfigOmpThreadNum(const std::string &value) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid engine config omp_thread_num: " + value); } else { @@ -333,7 +334,7 @@ Config::CheckEngineConfigOmpThreadNum(const std::string& value) { } Status -Config::CheckResourceConfigMode(const std::string& value) { +Config::CheckResourceConfigMode(const std::string &value) { if (value != "simple") { return Status(SERVER_INVALID_ARGUMENT, "Invalid resource config mode: " + value); } @@ -341,7 +342,7 @@ Config::CheckResourceConfigMode(const std::string& value) { } Status -Config::CheckResourceConfigPool(const std::vector& value) { +Config::CheckResourceConfigPool(const std::vector &value) { if (value.empty()) { return Status(SERVER_INVALID_ARGUMENT, "Invalid resource config pool"); } @@ -632,52 +633,51 @@ Config::GetResourceConfigStrMode() { return value; } - //////////////////////////////////////////////////////////////////////////////// Status -Config::GetServerConfigAddress(std::string& value) { +Config::GetServerConfigAddress(std::string &value) { value = GetServerConfigStrAddress(); return CheckServerConfigAddress(value); } Status -Config::GetServerConfigPort(std::string& value) { +Config::GetServerConfigPort(std::string &value) { value = GetServerConfigStrPort(); return CheckServerConfigPort(value); } Status -Config::GetServerConfigMode(std::string& value) { +Config::GetServerConfigMode(std::string &value) { value = GetServerConfigStrMode(); return CheckServerConfigMode(value); } Status -Config::GetServerConfigTimeZone(std::string& value) { +Config::GetServerConfigTimeZone(std::string &value) { value = GetServerConfigStrTimeZone(); return CheckServerConfigTimeZone(value); } Status -Config::GetDBConfigPath(std::string& value) { +Config::GetDBConfigPath(std::string &value) { value = GetDBConfigStrPath(); return CheckDBConfigPath(value); } Status -Config::GetDBConfigSlavePath(std::string& value) { +Config::GetDBConfigSlavePath(std::string &value) { value = GetDBConfigStrSlavePath(); return Status::OK(); } Status -Config::GetDBConfigBackendUrl(std::string& value) { +Config::GetDBConfigBackendUrl(std::string &value) { value = GetDBConfigStrBackendUrl(); return CheckDBConfigBackendUrl(value); } Status -Config::GetDBConfigArchiveDiskThreshold(int32_t& value) { +Config::GetDBConfigArchiveDiskThreshold(int32_t &value) { std::string str = GetDBConfigStrArchiveDiskThreshold(); Status s = CheckDBConfigArchiveDiskThreshold(str); if (!s.ok()) return s; @@ -686,7 +686,7 @@ Config::GetDBConfigArchiveDiskThreshold(int32_t& value) { } Status -Config::GetDBConfigArchiveDaysThreshold(int32_t& value) { +Config::GetDBConfigArchiveDaysThreshold(int32_t &value) { std::string str = GetDBConfigStrArchiveDaysThreshold(); Status s = CheckDBConfigArchiveDaysThreshold(str); if (!s.ok()) return s; @@ -695,7 +695,7 @@ Config::GetDBConfigArchiveDaysThreshold(int32_t& value) { } Status -Config::GetDBConfigBufferSize(int32_t& value) { +Config::GetDBConfigBufferSize(int32_t &value) { std::string str = GetDBConfigStrBufferSize(); Status s = CheckDBConfigBufferSize(str); if (!s.ok()) return s; @@ -704,7 +704,7 @@ Config::GetDBConfigBufferSize(int32_t& value) { } Status -Config::GetDBConfigBuildIndexGPU(int32_t& value) { +Config::GetDBConfigBuildIndexGPU(int32_t &value) { std::string str = GetDBConfigStrBuildIndexGPU(); Status s = CheckDBConfigBuildIndexGPU(str); if (!s.ok()) return s; @@ -713,7 +713,7 @@ Config::GetDBConfigBuildIndexGPU(int32_t& value) { } Status -Config::GetMetricConfigAutoBootup(bool& value) { +Config::GetMetricConfigAutoBootup(bool &value) { std::string str = GetMetricConfigStrAutoBootup(); Status s = CheckMetricConfigPrometheusPort(str); if (!s.ok()) return s; @@ -723,19 +723,19 @@ Config::GetMetricConfigAutoBootup(bool& value) { } Status -Config::GetMetricConfigCollector(std::string& value) { +Config::GetMetricConfigCollector(std::string &value) { value = GetMetricConfigStrCollector(); return Status::OK(); } Status -Config::GetMetricConfigPrometheusPort(std::string& value) { +Config::GetMetricConfigPrometheusPort(std::string &value) { value = GetMetricConfigStrPrometheusPort(); return CheckMetricConfigPrometheusPort(value); } Status -Config::GetCacheConfigCpuMemCapacity(int32_t& value) { +Config::GetCacheConfigCpuMemCapacity(int32_t &value) { std::string str = GetCacheConfigStrCpuMemCapacity(); Status s = CheckCacheConfigCpuMemCapacity(str); if (!s.ok()) return s; @@ -744,7 +744,7 @@ Config::GetCacheConfigCpuMemCapacity(int32_t& value) { } Status -Config::GetCacheConfigCpuMemThreshold(float& value) { +Config::GetCacheConfigCpuMemThreshold(float &value) { std::string str = GetCacheConfigStrCpuMemThreshold(); Status s = CheckCacheConfigCpuMemThreshold(str); if (!s.ok()) return s; @@ -753,7 +753,7 @@ Config::GetCacheConfigCpuMemThreshold(float& value) { } Status -Config::GetCacheConfigGpuMemCapacity(int32_t& value) { +Config::GetCacheConfigGpuMemCapacity(int32_t &value) { std::string str = GetCacheConfigStrGpuMemCapacity(); Status s = CheckCacheConfigGpuMemCapacity(str); if (!s.ok()) return s; @@ -762,7 +762,7 @@ Config::GetCacheConfigGpuMemCapacity(int32_t& value) { } Status -Config::GetCacheConfigGpuMemThreshold(float& value) { +Config::GetCacheConfigGpuMemThreshold(float &value) { std::string str = GetCacheConfigStrGpuMemThreshold(); Status s = CheckCacheConfigGpuMemThreshold(str); if (!s.ok()) return s; @@ -771,7 +771,7 @@ Config::GetCacheConfigGpuMemThreshold(float& value) { } Status -Config::GetCacheConfigCacheInsertData(bool& value) { +Config::GetCacheConfigCacheInsertData(bool &value) { std::string str = GetCacheConfigStrCacheInsertData(); Status s = CheckCacheConfigCacheInsertData(str); if (!s.ok()) return s; @@ -781,7 +781,7 @@ Config::GetCacheConfigCacheInsertData(bool& value) { } Status -Config::GetEngineConfigBlasThreshold(int32_t& value) { +Config::GetEngineConfigBlasThreshold(int32_t &value) { std::string str = GetEngineConfigStrBlasThreshold(); Status s = CheckEngineConfigBlasThreshold(str); if (!s.ok()) return s; @@ -790,7 +790,7 @@ Config::GetEngineConfigBlasThreshold(int32_t& value) { } Status -Config::GetEngineConfigOmpThreadNum(int32_t& value) { +Config::GetEngineConfigOmpThreadNum(int32_t &value) { std::string str = GetEngineConfigStrOmpThreadNum(); Status s = CheckEngineConfigOmpThreadNum(str); if (!s.ok()) return s; @@ -799,13 +799,13 @@ Config::GetEngineConfigOmpThreadNum(int32_t& value) { } Status -Config::GetResourceConfigMode(std::string& value) { +Config::GetResourceConfigMode(std::string &value) { value = GetResourceConfigStrMode(); return CheckResourceConfigMode(value); } Status -Config::GetResourceConfigPool(std::vector& value) { +Config::GetResourceConfigPool(std::vector &value) { ConfigNode resource_config = GetConfigNode(CONFIG_RESOURCE); value = resource_config.GetSequence(CONFIG_RESOURCE_POOL); return CheckResourceConfigPool(value); @@ -814,7 +814,7 @@ Config::GetResourceConfigPool(std::vector& value) { /////////////////////////////////////////////////////////////////////////////// /* server config */ Status -Config::SetServerConfigAddress(const std::string& value) { +Config::SetServerConfigAddress(const std::string &value) { Status s = CheckServerConfigAddress(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, value); @@ -822,7 +822,7 @@ Config::SetServerConfigAddress(const std::string& value) { } Status -Config::SetServerConfigPort(const std::string& value) { +Config::SetServerConfigPort(const std::string &value) { Status s = CheckServerConfigPort(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_PORT, value); @@ -830,7 +830,7 @@ Config::SetServerConfigPort(const std::string& value) { } Status -Config::SetServerConfigMode(const std::string& value) { +Config::SetServerConfigMode(const std::string &value) { Status s = CheckServerConfigMode(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_MODE, value); @@ -838,7 +838,7 @@ Config::SetServerConfigMode(const std::string& value) { } Status -Config::SetServerConfigTimeZone(const std::string& value) { +Config::SetServerConfigTimeZone(const std::string &value) { Status s = CheckServerConfigTimeZone(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, value); @@ -847,7 +847,7 @@ Config::SetServerConfigTimeZone(const std::string& value) { /* db config */ Status -Config::SetDBConfigPath(const std::string& value) { +Config::SetDBConfigPath(const std::string &value) { Status s = CheckDBConfigPath(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_PATH, value); @@ -855,7 +855,7 @@ Config::SetDBConfigPath(const std::string& value) { } Status -Config::SetDBConfigSlavePath(const std::string& value) { +Config::SetDBConfigSlavePath(const std::string &value) { Status s = CheckDBConfigSlavePath(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_SLAVE_PATH, value); @@ -863,7 +863,7 @@ Config::SetDBConfigSlavePath(const std::string& value) { } Status -Config::SetDBConfigBackendUrl(const std::string& value) { +Config::SetDBConfigBackendUrl(const std::string &value) { Status s = CheckDBConfigBackendUrl(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BACKEND_URL, value); @@ -871,7 +871,7 @@ Config::SetDBConfigBackendUrl(const std::string& value) { } Status -Config::SetDBConfigArchiveDiskThreshold(const std::string& value) { +Config::SetDBConfigArchiveDiskThreshold(const std::string &value) { Status s = CheckDBConfigArchiveDiskThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, value); @@ -879,7 +879,7 @@ Config::SetDBConfigArchiveDiskThreshold(const std::string& value) { } Status -Config::SetDBConfigArchiveDaysThreshold(const std::string& value) { +Config::SetDBConfigArchiveDaysThreshold(const std::string &value) { Status s = CheckDBConfigArchiveDaysThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value); @@ -887,7 +887,7 @@ Config::SetDBConfigArchiveDaysThreshold(const std::string& value) { } Status -Config::SetDBConfigBufferSize(const std::string& value) { +Config::SetDBConfigBufferSize(const std::string &value) { Status s = CheckDBConfigBufferSize(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUFFER_SIZE, value); @@ -895,7 +895,7 @@ Config::SetDBConfigBufferSize(const std::string& value) { } Status -Config::SetDBConfigBuildIndexGPU(const std::string& value) { +Config::SetDBConfigBuildIndexGPU(const std::string &value) { Status s = CheckDBConfigBuildIndexGPU(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, value); @@ -904,7 +904,7 @@ Config::SetDBConfigBuildIndexGPU(const std::string& value) { /* metric config */ Status -Config::SetMetricConfigAutoBootup(const std::string& value) { +Config::SetMetricConfigAutoBootup(const std::string &value) { Status s = CheckMetricConfigAutoBootup(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_AUTO_BOOTUP, value); @@ -912,7 +912,7 @@ Config::SetMetricConfigAutoBootup(const std::string& value) { } Status -Config::SetMetricConfigCollector(const std::string& value) { +Config::SetMetricConfigCollector(const std::string &value) { Status s = CheckMetricConfigCollector(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_COLLECTOR, value); @@ -920,7 +920,7 @@ Config::SetMetricConfigCollector(const std::string& value) { } Status -Config::SetMetricConfigPrometheusPort(const std::string& value) { +Config::SetMetricConfigPrometheusPort(const std::string &value) { Status s = CheckMetricConfigPrometheusPort(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_PROMETHEUS_PORT, value); @@ -929,7 +929,7 @@ Config::SetMetricConfigPrometheusPort(const std::string& value) { /* cache config */ Status -Config::SetCacheConfigCpuMemCapacity(const std::string& value) { +Config::SetCacheConfigCpuMemCapacity(const std::string &value) { Status s = CheckCacheConfigCpuMemCapacity(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CPU_MEM_CAPACITY, value); @@ -937,7 +937,7 @@ Config::SetCacheConfigCpuMemCapacity(const std::string& value) { } Status -Config::SetCacheConfigCpuMemThreshold(const std::string& value) { +Config::SetCacheConfigCpuMemThreshold(const std::string &value) { Status s = CheckCacheConfigCpuMemThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CPU_MEM_THRESHOLD, value); @@ -945,7 +945,7 @@ Config::SetCacheConfigCpuMemThreshold(const std::string& value) { } Status -Config::SetCacheConfigGpuMemCapacity(const std::string& value) { +Config::SetCacheConfigGpuMemCapacity(const std::string &value) { Status s = CheckCacheConfigGpuMemCapacity(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_GPU_MEM_CAPACITY, value); @@ -953,7 +953,7 @@ Config::SetCacheConfigGpuMemCapacity(const std::string& value) { } Status -Config::SetCacheConfigGpuMemThreshold(const std::string& value) { +Config::SetCacheConfigGpuMemThreshold(const std::string &value) { Status s = CheckCacheConfigGpuMemThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_GPU_MEM_THRESHOLD, value); @@ -961,7 +961,7 @@ Config::SetCacheConfigGpuMemThreshold(const std::string& value) { } Status -Config::SetCacheConfigCacheInsertData(const std::string& value) { +Config::SetCacheConfigCacheInsertData(const std::string &value) { Status s = CheckCacheConfigCacheInsertData(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CACHE_INSERT_DATA, value); @@ -970,7 +970,7 @@ Config::SetCacheConfigCacheInsertData(const std::string& value) { /* engine config */ Status -Config::SetEngineConfigBlasThreshold(const std::string& value) { +Config::SetEngineConfigBlasThreshold(const std::string &value) { Status s = CheckEngineConfigBlasThreshold(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_ENGINE_BLAS_THRESHOLD, value); @@ -978,7 +978,7 @@ Config::SetEngineConfigBlasThreshold(const std::string& value) { } Status -Config::SetEngineConfigOmpThreadNum(const std::string& value) { +Config::SetEngineConfigOmpThreadNum(const std::string &value) { Status s = CheckEngineConfigOmpThreadNum(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_ENGINE_OMP_THREAD_NUM, value); @@ -987,13 +987,13 @@ Config::SetEngineConfigOmpThreadNum(const std::string& value) { /* resource config */ Status -Config::SetResourceConfigMode(const std::string& value) { +Config::SetResourceConfigMode(const std::string &value) { Status s = CheckResourceConfigMode(value); if (!s.ok()) return s; SetConfigValueInMem(CONFIG_DB, CONFIG_RESOURCE_MODE, value); return Status::OK(); } -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/Config.h b/cpp/src/server/Config.h index 956c0810..f2d1b969 100644 --- a/cpp/src/server/Config.h +++ b/cpp/src/server/Config.h @@ -17,136 +17,137 @@ #pragma once +#include +#include #include #include -#include "yaml-cpp/yaml.h" +#include + #include "utils/Status.h" #include "config/ConfigNode.h" - namespace zilliz { namespace milvus { namespace server { /* server config */ -static const char* CONFIG_SERVER = "server_config"; -static const char* CONFIG_SERVER_ADDRESS = "address"; -static const char* CONFIG_SERVER_ADDRESS_DEFAULT = "127.0.0.1"; -static const char* CONFIG_SERVER_PORT = "port"; -static const char* CONFIG_SERVER_PORT_DEFAULT = "19530"; -static const char* CONFIG_SERVER_MODE = "mode"; -static const char* CONFIG_SERVER_MODE_DEFAULT = "single"; -static const char* CONFIG_SERVER_TIME_ZONE = "time_zone"; -static const char* CONFIG_SERVER_TIME_ZONE_DEFAULT = "UTC+8"; +static const char *CONFIG_SERVER = "server_config"; +static const char *CONFIG_SERVER_ADDRESS = "address"; +static const char *CONFIG_SERVER_ADDRESS_DEFAULT = "127.0.0.1"; +static const char *CONFIG_SERVER_PORT = "port"; +static const char *CONFIG_SERVER_PORT_DEFAULT = "19530"; +static const char *CONFIG_SERVER_MODE = "mode"; +static const char *CONFIG_SERVER_MODE_DEFAULT = "single"; +static const char *CONFIG_SERVER_TIME_ZONE = "time_zone"; +static const char *CONFIG_SERVER_TIME_ZONE_DEFAULT = "UTC+8"; /* db config */ -static const char* CONFIG_DB = "db_config"; -static const char* CONFIG_DB_PATH = "path"; -static const char* CONFIG_DB_PATH_DEFAULT = "/tmp/milvus"; -static const char* CONFIG_DB_SLAVE_PATH = "slave_path"; -static const char* CONFIG_DB_SLAVE_PATH_DEFAULT = ""; -static const char* CONFIG_DB_BACKEND_URL = "backend_url"; -static const char* CONFIG_DB_BACKEND_URL_DEFAULT = "sqlite://:@:/"; -static const char* CONFIG_DB_ARCHIVE_DISK_THRESHOLD = "archive_disk_threshold"; -static const char* CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT = "0"; -static const char* CONFIG_DB_ARCHIVE_DAYS_THRESHOLD = "archive_days_threshold"; -static const char* CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT = "0"; -static const char* CONFIG_DB_BUFFER_SIZE = "buffer_size"; -static const char* CONFIG_DB_BUFFER_SIZE_DEFAULT = "4"; -static const char* CONFIG_DB_BUILD_INDEX_GPU = "build_index_gpu"; -static const char* CONFIG_DB_BUILD_INDEX_GPU_DEFAULT = "0"; +static const char *CONFIG_DB = "db_config"; +static const char *CONFIG_DB_PATH = "path"; +static const char *CONFIG_DB_PATH_DEFAULT = "/tmp/milvus"; +static const char *CONFIG_DB_SLAVE_PATH = "slave_path"; +static const char *CONFIG_DB_SLAVE_PATH_DEFAULT = ""; +static const char *CONFIG_DB_BACKEND_URL = "backend_url"; +static const char *CONFIG_DB_BACKEND_URL_DEFAULT = "sqlite://:@:/"; +static const char *CONFIG_DB_ARCHIVE_DISK_THRESHOLD = "archive_disk_threshold"; +static const char *CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT = "0"; +static const char *CONFIG_DB_ARCHIVE_DAYS_THRESHOLD = "archive_days_threshold"; +static const char *CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT = "0"; +static const char *CONFIG_DB_BUFFER_SIZE = "buffer_size"; +static const char *CONFIG_DB_BUFFER_SIZE_DEFAULT = "4"; +static const char *CONFIG_DB_BUILD_INDEX_GPU = "build_index_gpu"; +static const char *CONFIG_DB_BUILD_INDEX_GPU_DEFAULT = "0"; /* cache config */ -static const char* CONFIG_CACHE = "cache_config"; -static const char* CONFIG_CACHE_CPU_MEM_CAPACITY = "cpu_mem_capacity"; -static const char* CONFIG_CACHE_CPU_MEM_CAPACITY_DEFAULT = "16"; -static const char* CONFIG_CACHE_GPU_MEM_CAPACITY = "gpu_mem_capacity"; -static const char* CONFIG_CACHE_GPU_MEM_CAPACITY_DEFAULT = "0"; -static const char* CONFIG_CACHE_CPU_MEM_THRESHOLD = "cpu_mem_threshold"; -static const char* CONFIG_CACHE_CPU_MEM_THRESHOLD_DEFAULT = "0.85"; -static const char* CONFIG_CACHE_GPU_MEM_THRESHOLD = "gpu_mem_threshold"; -static const char* CONFIG_CACHE_GPU_MEM_THRESHOLD_DEFAULT = "0.85"; -static const char* CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data"; -static const char* CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT = "false"; +static const char *CONFIG_CACHE = "cache_config"; +static const char *CONFIG_CACHE_CPU_MEM_CAPACITY = "cpu_mem_capacity"; +static const char *CONFIG_CACHE_CPU_MEM_CAPACITY_DEFAULT = "16"; +static const char *CONFIG_CACHE_GPU_MEM_CAPACITY = "gpu_mem_capacity"; +static const char *CONFIG_CACHE_GPU_MEM_CAPACITY_DEFAULT = "0"; +static const char *CONFIG_CACHE_CPU_MEM_THRESHOLD = "cpu_mem_threshold"; +static const char *CONFIG_CACHE_CPU_MEM_THRESHOLD_DEFAULT = "0.85"; +static const char *CONFIG_CACHE_GPU_MEM_THRESHOLD = "gpu_mem_threshold"; +static const char *CONFIG_CACHE_GPU_MEM_THRESHOLD_DEFAULT = "0.85"; +static const char *CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data"; +static const char *CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT = "false"; /* metric config */ -static const char* CONFIG_METRIC = "metric_config"; -static const char* CONFIG_METRIC_AUTO_BOOTUP = "auto_bootup"; -static const char* CONFIG_METRIC_AUTO_BOOTUP_DEFAULT = "false"; -static const char* CONFIG_METRIC_COLLECTOR = "collector"; -static const char* CONFIG_METRIC_COLLECTOR_DEFAULT = "prometheus"; -static const char* CONFIG_METRIC_PROMETHEUS = "prometheus_config"; -static const char* CONFIG_METRIC_PROMETHEUS_PORT = "port"; -static const char* CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT = "8080"; +static const char *CONFIG_METRIC = "metric_config"; +static const char *CONFIG_METRIC_AUTO_BOOTUP = "auto_bootup"; +static const char *CONFIG_METRIC_AUTO_BOOTUP_DEFAULT = "false"; +static const char *CONFIG_METRIC_COLLECTOR = "collector"; +static const char *CONFIG_METRIC_COLLECTOR_DEFAULT = "prometheus"; +static const char *CONFIG_METRIC_PROMETHEUS = "prometheus_config"; +static const char *CONFIG_METRIC_PROMETHEUS_PORT = "port"; +static const char *CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT = "8080"; /* engine config */ -static const char* CONFIG_ENGINE = "engine_config"; -static const char* CONFIG_ENGINE_BLAS_THRESHOLD = "blas_threshold"; -static const char* CONFIG_ENGINE_BLAS_THRESHOLD_DEFAULT = "20"; -static const char* CONFIG_ENGINE_OMP_THREAD_NUM = "omp_thread_num"; -static const char* CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0"; +static const char *CONFIG_ENGINE = "engine_config"; +static const char *CONFIG_ENGINE_BLAS_THRESHOLD = "blas_threshold"; +static const char *CONFIG_ENGINE_BLAS_THRESHOLD_DEFAULT = "20"; +static const char *CONFIG_ENGINE_OMP_THREAD_NUM = "omp_thread_num"; +static const char *CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0"; /* resource config */ -static const char* CONFIG_RESOURCE = "resource_config"; -static const char* CONFIG_RESOURCE_MODE = "mode"; -static const char* CONFIG_RESOURCE_MODE_DEFAULT = "simple"; -static const char* CONFIG_RESOURCE_POOL = "pool"; - +static const char *CONFIG_RESOURCE = "resource_config"; +static const char *CONFIG_RESOURCE_MODE = "mode"; +static const char *CONFIG_RESOURCE_MODE_DEFAULT = "simple"; +static const char *CONFIG_RESOURCE_POOL = "pool"; class Config { public: - static Config& GetInstance(); - Status LoadConfigFile(const std::string& filename); + static Config &GetInstance(); + Status LoadConfigFile(const std::string &filename); void PrintAll(); private: - ConfigNode& GetConfigNode(const std::string& name); + ConfigNode &GetConfigNode(const std::string &name); - Status GetConfigValueInMem(const std::string& parent_key, - const std::string& child_key, - std::string& value); + Status GetConfigValueInMem(const std::string &parent_key, + const std::string &child_key, + std::string &value); - void SetConfigValueInMem(const std::string& parent_key, - const std::string& child_key, - const std::string& value); + void SetConfigValueInMem(const std::string &parent_key, + const std::string &child_key, + const std::string &value); - void PrintConfigSection(const std::string& config_node_name); + void PrintConfigSection(const std::string &config_node_name); /////////////////////////////////////////////////////////////////////////// /* server config */ - Status CheckServerConfigAddress(const std::string& value); - Status CheckServerConfigPort(const std::string& value); - Status CheckServerConfigMode(const std::string& value); - Status CheckServerConfigTimeZone(const std::string& value); + Status CheckServerConfigAddress(const std::string &value); + Status CheckServerConfigPort(const std::string &value); + Status CheckServerConfigMode(const std::string &value); + Status CheckServerConfigTimeZone(const std::string &value); /* db config */ - Status CheckDBConfigPath(const std::string& value); - Status CheckDBConfigSlavePath(const std::string& value); - Status CheckDBConfigBackendUrl(const std::string& value); - Status CheckDBConfigArchiveDiskThreshold(const std::string& value); - Status CheckDBConfigArchiveDaysThreshold(const std::string& value); - Status CheckDBConfigBufferSize(const std::string& value); - Status CheckDBConfigBuildIndexGPU(const std::string& value); + Status CheckDBConfigPath(const std::string &value); + Status CheckDBConfigSlavePath(const std::string &value); + Status CheckDBConfigBackendUrl(const std::string &value); + Status CheckDBConfigArchiveDiskThreshold(const std::string &value); + Status CheckDBConfigArchiveDaysThreshold(const std::string &value); + Status CheckDBConfigBufferSize(const std::string &value); + Status CheckDBConfigBuildIndexGPU(const std::string &value); /* metric config */ - Status CheckMetricConfigAutoBootup(const std::string& value); - Status CheckMetricConfigCollector(const std::string& value); - Status CheckMetricConfigPrometheusPort(const std::string& value); + Status CheckMetricConfigAutoBootup(const std::string &value); + Status CheckMetricConfigCollector(const std::string &value); + Status CheckMetricConfigPrometheusPort(const std::string &value); /* cache config */ - Status CheckCacheConfigCpuMemCapacity(const std::string& value); - Status CheckCacheConfigCpuMemThreshold(const std::string& value); - Status CheckCacheConfigGpuMemCapacity(const std::string& value); - Status CheckCacheConfigGpuMemThreshold(const std::string& value); - Status CheckCacheConfigCacheInsertData(const std::string& value); + Status CheckCacheConfigCpuMemCapacity(const std::string &value); + Status CheckCacheConfigCpuMemThreshold(const std::string &value); + Status CheckCacheConfigGpuMemCapacity(const std::string &value); + Status CheckCacheConfigGpuMemThreshold(const std::string &value); + Status CheckCacheConfigCacheInsertData(const std::string &value); /* engine config */ - Status CheckEngineConfigBlasThreshold(const std::string& value); - Status CheckEngineConfigOmpThreadNum(const std::string& value); + Status CheckEngineConfigBlasThreshold(const std::string &value); + Status CheckEngineConfigOmpThreadNum(const std::string &value); /* resource config */ - Status CheckResourceConfigMode(const std::string& value); - Status CheckResourceConfigPool(const std::vector& value); + Status CheckResourceConfigMode(const std::string &value); + Status CheckResourceConfigPool(const std::vector &value); /////////////////////////////////////////////////////////////////////////// /* server config */ @@ -185,81 +186,81 @@ class Config { public: /* server config */ - Status GetServerConfigAddress(std::string& value); - Status GetServerConfigPort(std::string& value); - Status GetServerConfigMode(std::string& value); - Status GetServerConfigTimeZone(std::string& value); + Status GetServerConfigAddress(std::string &value); + Status GetServerConfigPort(std::string &value); + Status GetServerConfigMode(std::string &value); + Status GetServerConfigTimeZone(std::string &value); /* db config */ - Status GetDBConfigPath(std::string& value); - Status GetDBConfigSlavePath(std::string& value); - Status GetDBConfigBackendUrl(std::string& value); - Status GetDBConfigArchiveDiskThreshold(int32_t& value); - Status GetDBConfigArchiveDaysThreshold(int32_t& value); - Status GetDBConfigBufferSize(int32_t& value); - Status GetDBConfigBuildIndexGPU(int32_t& value); + Status GetDBConfigPath(std::string &value); + Status GetDBConfigSlavePath(std::string &value); + Status GetDBConfigBackendUrl(std::string &value); + Status GetDBConfigArchiveDiskThreshold(int32_t &value); + Status GetDBConfigArchiveDaysThreshold(int32_t &value); + Status GetDBConfigBufferSize(int32_t &value); + Status GetDBConfigBuildIndexGPU(int32_t &value); /* metric config */ - Status GetMetricConfigAutoBootup(bool& value); - Status GetMetricConfigCollector(std::string& value); - Status GetMetricConfigPrometheusPort(std::string& value); + Status GetMetricConfigAutoBootup(bool &value); + Status GetMetricConfigCollector(std::string &value); + Status GetMetricConfigPrometheusPort(std::string &value); /* cache config */ - Status GetCacheConfigCpuMemCapacity(int32_t& value); - Status GetCacheConfigCpuMemThreshold(float& value); - Status GetCacheConfigGpuMemCapacity(int32_t& value); - Status GetCacheConfigGpuMemThreshold(float& value); - Status GetCacheConfigCacheInsertData(bool& value); + Status GetCacheConfigCpuMemCapacity(int32_t &value); + Status GetCacheConfigCpuMemThreshold(float &value); + Status GetCacheConfigGpuMemCapacity(int32_t &value); + Status GetCacheConfigGpuMemThreshold(float &value); + Status GetCacheConfigCacheInsertData(bool &value); /* engine config */ - Status GetEngineConfigBlasThreshold(int32_t& value); - Status GetEngineConfigOmpThreadNum(int32_t& value); + Status GetEngineConfigBlasThreshold(int32_t &value); + Status GetEngineConfigOmpThreadNum(int32_t &value); /* resource config */ - Status GetResourceConfigMode(std::string& value); - Status GetResourceConfigPool(std::vector& value); + Status GetResourceConfigMode(std::string &value); + Status GetResourceConfigPool(std::vector &value); public: /* server config */ - Status SetServerConfigAddress(const std::string& value); - Status SetServerConfigPort(const std::string& value); - Status SetServerConfigMode(const std::string& value); - Status SetServerConfigTimeZone(const std::string& value); + Status SetServerConfigAddress(const std::string &value); + Status SetServerConfigPort(const std::string &value); + Status SetServerConfigMode(const std::string &value); + Status SetServerConfigTimeZone(const std::string &value); /* db config */ - Status SetDBConfigPath(const std::string& value); - Status SetDBConfigSlavePath(const std::string& value); - Status SetDBConfigBackendUrl(const std::string& value); - Status SetDBConfigArchiveDiskThreshold(const std::string& value); - Status SetDBConfigArchiveDaysThreshold(const std::string& value); - Status SetDBConfigBufferSize(const std::string& value); - Status SetDBConfigBuildIndexGPU(const std::string& value); + Status SetDBConfigPath(const std::string &value); + Status SetDBConfigSlavePath(const std::string &value); + Status SetDBConfigBackendUrl(const std::string &value); + Status SetDBConfigArchiveDiskThreshold(const std::string &value); + Status SetDBConfigArchiveDaysThreshold(const std::string &value); + Status SetDBConfigBufferSize(const std::string &value); + Status SetDBConfigBuildIndexGPU(const std::string &value); /* metric config */ - Status SetMetricConfigAutoBootup(const std::string& value); - Status SetMetricConfigCollector(const std::string& value); - Status SetMetricConfigPrometheusPort(const std::string& value); + Status SetMetricConfigAutoBootup(const std::string &value); + Status SetMetricConfigCollector(const std::string &value); + Status SetMetricConfigPrometheusPort(const std::string &value); /* cache config */ - Status SetCacheConfigCpuMemCapacity(const std::string& value); - Status SetCacheConfigCpuMemThreshold(const std::string& value); - Status SetCacheConfigGpuMemCapacity(const std::string& value); - Status SetCacheConfigGpuMemThreshold(const std::string& value); - Status SetCacheConfigCacheInsertData(const std::string& value); + Status SetCacheConfigCpuMemCapacity(const std::string &value); + Status SetCacheConfigCpuMemThreshold(const std::string &value); + Status SetCacheConfigGpuMemCapacity(const std::string &value); + Status SetCacheConfigGpuMemThreshold(const std::string &value); + Status SetCacheConfigCacheInsertData(const std::string &value); /* engine config */ - Status SetEngineConfigBlasThreshold(const std::string& value); - Status SetEngineConfigOmpThreadNum(const std::string& value); + Status SetEngineConfigBlasThreshold(const std::string &value); + Status SetEngineConfigOmpThreadNum(const std::string &value); /* resource config */ - Status SetResourceConfigMode(const std::string& value); + Status SetResourceConfigMode(const std::string &value); private: std::unordered_map> config_map_; std::mutex mutex_; }; -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/DBWrapper.cpp b/cpp/src/server/DBWrapper.cpp index 16b13a12..b142f31d 100644 --- a/cpp/src/server/DBWrapper.cpp +++ b/cpp/src/server/DBWrapper.cpp @@ -16,13 +16,14 @@ // under the License. -#include "DBWrapper.h" +#include "server/DBWrapper.h" #include "Config.h" #include "db/DBFactory.h" #include "utils/CommonUtil.h" #include "utils/Log.h" #include "utils/StringHelpFunctions.h" +#include #include #include @@ -31,11 +32,11 @@ namespace milvus { namespace server { DBWrapper::DBWrapper() { - } -Status DBWrapper::StartService() { - Config& config = Config::GetInstance(); +Status +DBWrapper::StartService() { + Config &config = Config::GetInstance(); Status s; //db config engine::DBOptions opt; @@ -65,15 +66,13 @@ Status DBWrapper::StartService() { if (mode == "single") { opt.mode_ = engine::DBOptions::MODE::SINGLE; - } - else if (mode == "cluster") { + } else if (mode == "cluster") { opt.mode_ = engine::DBOptions::MODE::CLUSTER; - } - else if (mode == "read_only") { + } else if (mode == "read_only") { opt.mode_ = engine::DBOptions::MODE::READ_ONLY; - } - else { - std::cerr << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']" << std::endl; + } else { + std::cerr << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']" + << std::endl; kill(0, SIGUSR1); } @@ -86,8 +85,8 @@ Status DBWrapper::StartService() { SERVER_LOG_DEBUG << "Specify openmp thread number: " << omp_thread; } else { uint32_t sys_thread_cnt = 8; - if(CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) { - omp_thread = (int32_t)ceil(sys_thread_cnt*0.5); + if (CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) { + omp_thread = (int32_t) ceil(sys_thread_cnt * 0.5); omp_set_num_threads(omp_thread); } } @@ -116,14 +115,14 @@ Status DBWrapper::StartService() { //create db root folder Status status = CommonUtil::CreateDirectory(opt.meta_.path_); - if(!status.ok()) { + if (!status.ok()) { std::cerr << "ERROR! Failed to create database root path: " << opt.meta_.path_ << std::endl; kill(0, SIGUSR1); } - for(auto& path : opt.meta_.slave_paths_) { + for (auto &path : opt.meta_.slave_paths_) { status = CommonUtil::CreateDirectory(path); - if(!status.ok()) { + if (!status.ok()) { std::cerr << "ERROR! Failed to create database slave path: " << path << std::endl; kill(0, SIGUSR1); } @@ -132,7 +131,7 @@ Status DBWrapper::StartService() { //create db instance try { db_ = engine::DBFactory::Build(opt); - } catch(std::exception& ex) { + } catch (std::exception &ex) { std::cerr << "ERROR! Failed to open database: " << ex.what() << std::endl; kill(0, SIGUSR1); } @@ -142,14 +141,15 @@ Status DBWrapper::StartService() { return Status::OK(); } -Status DBWrapper::StopService() { - if(db_) { +Status +DBWrapper::StopService() { + if (db_) { db_->Stop(); } return Status::OK(); } -} -} -} \ No newline at end of file +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/DBWrapper.h b/cpp/src/server/DBWrapper.h index 99b8f905..4648f4b5 100644 --- a/cpp/src/server/DBWrapper.h +++ b/cpp/src/server/DBWrapper.h @@ -27,12 +27,12 @@ namespace milvus { namespace server { class DBWrapper { -private: + private: DBWrapper(); ~DBWrapper() = default; -public: - static DBWrapper& GetInstance() { + public: + static DBWrapper &GetInstance() { static DBWrapper wrapper; return wrapper; } @@ -48,10 +48,10 @@ public: return db_; } -private: + private: engine::DBPtr db_; }; -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/Server.cpp b/cpp/src/server/Server.cpp index 57081845..33651463 100644 --- a/cpp/src/server/Server.cpp +++ b/cpp/src/server/Server.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "server/Server.h" + #include #include #include @@ -24,7 +26,6 @@ #include #include -#include "Server.h" #include "server/grpc_impl/GrpcServer.h" #include "server/Config.h" #include "utils/Log.h" @@ -36,7 +37,6 @@ #include "wrapper/KnowhereResource.h" #include "DBWrapper.h" - namespace zilliz { namespace milvus { namespace server { @@ -116,7 +116,7 @@ Server::Daemonize() { } // Close all open fd - for (long fd = sysconf(_SC_OPEN_MAX); fd > 0; fd--) { + for (int64_t fd = sysconf(_SC_OPEN_MAX); fd > 0; fd--) { close(fd); } @@ -172,9 +172,9 @@ Server::Start() { time_zone = "CUT"; } else { int time_bias = std::stoi(time_zone.substr(3, std::string::npos)); - if (time_bias == 0) + if (time_bias == 0) { time_zone = "CUT"; - else if (time_bias > 0) { + } else if (time_bias > 0) { time_zone = "CUT" + std::to_string(-time_bias); } else { time_zone = "CUT+" + std::to_string(-time_bias); @@ -194,7 +194,6 @@ Server::Start() { StartService(); std::cout << "Milvus server start successfully." << std::endl; - } catch (std::exception &ex) { std::cerr << "Milvus server encounter exception: " << ex.what(); } @@ -232,10 +231,9 @@ Server::Stop() { std::cerr << "Milvus server is closed!" << std::endl; } - ErrorCode Server::LoadConfig() { - Config& config = Config::GetInstance(); + Config &config = Config::GetInstance(); Status s = config.LoadConfigFile(config_filename_); if (!s.ok()) { std::cerr << "Failed to load config file: " << config_filename_ << std::endl; @@ -260,6 +258,6 @@ Server::StopService() { engine::KnowhereResource::Finalize(); } -} -} -} +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/Server.h b/cpp/src/server/Server.h index 8507552b..9bd0ce13 100644 --- a/cpp/src/server/Server.h +++ b/cpp/src/server/Server.h @@ -22,7 +22,6 @@ #include #include - namespace zilliz { namespace milvus { namespace server { @@ -58,6 +57,6 @@ class Server { std::string log_config_file_; }; // Server -} // server -} // sql -} // zilliz +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp index c85d1b2f..a64422aa 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.cpp @@ -16,10 +16,12 @@ // under the License. -#include "GrpcRequestHandler.h" -#include "GrpcRequestTask.h" +#include "server/grpc_impl/GrpcRequestHandler.h" +#include "server/grpc_impl/GrpcRequestTask.h" #include "utils/TimeRecorder.h" +#include + namespace zilliz { namespace milvus { namespace server { @@ -29,7 +31,6 @@ namespace grpc { GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, const ::milvus::grpc::TableSchema *request, ::milvus::grpc::Status *response) { - BaseTaskPtr task_ptr = CreateTableTask::Create(request); GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; @@ -39,7 +40,6 @@ GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, GrpcRequestHandler::HasTable(::grpc::ServerContext *context, const ::milvus::grpc::TableName *request, ::milvus::grpc::BoolReply *response) { - bool has_table = false; BaseTaskPtr task_ptr = HasTableTask::Create(request->table_name(), has_table); ::milvus::grpc::Status grpc_status; @@ -61,9 +61,8 @@ GrpcRequestHandler::DropTable(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, - const ::milvus::grpc::IndexParam *request, - ::milvus::grpc::Status *response) { - + const ::milvus::grpc::IndexParam *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = CreateIndexTask::Create(request); GrpcRequestScheduler::ExecTask(task_ptr, response); return ::grpc::Status::OK; @@ -71,9 +70,8 @@ GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::Insert(::grpc::ServerContext *context, - const ::milvus::grpc::InsertParam *request, - ::milvus::grpc::VectorIds *response) { - + const ::milvus::grpc::InsertParam *request, + ::milvus::grpc::VectorIds *response) { BaseTaskPtr task_ptr = InsertTask::Create(request, response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -86,7 +84,6 @@ GrpcRequestHandler::Insert(::grpc::ServerContext *context, GrpcRequestHandler::Search(::grpc::ServerContext *context, const ::milvus::grpc::SearchParam *request, ::milvus::grpc::TopKQueryResultList *response) { - std::vector file_id_array; BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response); ::milvus::grpc::Status grpc_status; @@ -100,7 +97,6 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context, GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context, const ::milvus::grpc::SearchInFilesParam *request, ::milvus::grpc::TopKQueryResultList *response) { - std::vector file_id_array; for (int i = 0; i < request->file_id_array_size(); i++) { file_id_array.push_back(request->file_id_array(i)); @@ -118,7 +114,6 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context, GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context, const ::milvus::grpc::TableName *request, ::milvus::grpc::TableSchema *response) { - BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -129,9 +124,8 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::CountTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::TableRowCount *response) { - + const ::milvus::grpc::TableName *request, + ::milvus::grpc::TableRowCount *response) { int64_t row_count = 0; BaseTaskPtr task_ptr = CountTableTask::Create(request->table_name(), row_count); ::milvus::grpc::Status grpc_status; @@ -146,7 +140,6 @@ GrpcRequestHandler::CountTable(::grpc::ServerContext *context, GrpcRequestHandler::ShowTables(::grpc::ServerContext *context, const ::milvus::grpc::Command *request, ::milvus::grpc::TableNameList *response) { - BaseTaskPtr task_ptr = ShowTablesTask::Create(response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -157,9 +150,8 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::Cmd(::grpc::ServerContext *context, - const ::milvus::grpc::Command *request, - ::milvus::grpc::StringReply *response) { - + const ::milvus::grpc::Command *request, + ::milvus::grpc::StringReply *response) { std::string result; BaseTaskPtr task_ptr = CmdTask::Create(request->cmd(), result); ::milvus::grpc::Status grpc_status; @@ -172,8 +164,8 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context, - const ::milvus::grpc::DeleteByRangeParam *request, - ::milvus::grpc::Status *response) { + const ::milvus::grpc::DeleteByRangeParam *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = DeleteByRangeTask::Create(request); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -184,8 +176,8 @@ GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::Status *response) { + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = PreloadTableTask::Create(request->table_name()); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -196,8 +188,8 @@ GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::IndexParam *response) { + const ::milvus::grpc::TableName *request, + ::milvus::grpc::IndexParam *response) { BaseTaskPtr task_ptr = DescribeIndexTask::Create(request->table_name(), response); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -208,8 +200,8 @@ GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context, ::grpc::Status GrpcRequestHandler::DropIndex(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::Status *response) { + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) { BaseTaskPtr task_ptr = DropIndexTask::Create(request->table_name()); ::milvus::grpc::Status grpc_status; GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); @@ -218,8 +210,7 @@ GrpcRequestHandler::DropIndex(::grpc::ServerContext *context, return ::grpc::Status::OK; } - -} -} -} -} \ No newline at end of file +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestHandler.h b/cpp/src/server/grpc_impl/GrpcRequestHandler.h index a8a70eb8..549e1d9d 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestHandler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestHandler.h @@ -20,15 +20,15 @@ #include #include -#include "milvus.grpc.pb.h" -#include "status.pb.h" +#include "grpc/gen-milvus/milvus.grpc.pb.h" +#include "grpc/gen-status/status.pb.h" namespace zilliz { namespace milvus { namespace server { namespace grpc { class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service { -public: + public: /** * @brief Create table method * @@ -103,8 +103,7 @@ public: */ ::grpc::Status CreateIndex(::grpc::ServerContext *context, - const ::milvus::grpc::IndexParam *request, ::milvus::grpc::Status *response) override; - + const ::milvus::grpc::IndexParam *request, ::milvus::grpc::Status *response) override; /** * @brief Insert vector array to table @@ -123,8 +122,8 @@ public: */ ::grpc::Status Insert(::grpc::ServerContext *context, - const ::milvus::grpc::InsertParam *request, - ::milvus::grpc::VectorIds *response) override; + const ::milvus::grpc::InsertParam *request, + ::milvus::grpc::VectorIds *response) override; /** * @brief Query vector @@ -213,8 +212,8 @@ public: */ ::grpc::Status CountTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::TableRowCount *response) override; + const ::milvus::grpc::TableName *request, + ::milvus::grpc::TableRowCount *response) override; /** * @brief List all tables in database @@ -253,8 +252,8 @@ public: */ ::grpc::Status Cmd(::grpc::ServerContext *context, - const ::milvus::grpc::Command *request, - ::milvus::grpc::StringReply *response) override; + const ::milvus::grpc::Command *request, + ::milvus::grpc::StringReply *response) override; /** * @brief delete table by range @@ -291,8 +290,8 @@ public: */ ::grpc::Status PreloadTable(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::Status *response) override; + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) override; /** * @brief Describe index @@ -310,8 +309,8 @@ public: */ ::grpc::Status DescribeIndex(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::IndexParam *response) override; + const ::milvus::grpc::TableName *request, + ::milvus::grpc::IndexParam *response) override; /** * @brief Drop index @@ -329,12 +328,12 @@ public: */ ::grpc::Status DropIndex(::grpc::ServerContext *context, - const ::milvus::grpc::TableName *request, - ::milvus::grpc::Status *response) override; - + const ::milvus::grpc::TableName *request, + ::milvus::grpc::Status *response) override; }; -} -} -} -} + +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp index fb8daa35..4c58195f 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.cpp @@ -15,101 +15,107 @@ // specific language governing permissions and limitations // under the License. -#include "GrpcRequestScheduler.h" +#include "server/grpc_impl/GrpcRequestScheduler.h" #include "utils/Log.h" -#include "src/grpc/gen-status/status.pb.h" +#include "grpc/gen-status/status.pb.h" + +#include namespace zilliz { namespace milvus { namespace server { namespace grpc { -using namespace ::milvus; - namespace { - ::milvus::grpc::ErrorCode ErrorMap(ErrorCode code) { - static const std::map code_map = { - {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND}, - {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, - {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER}, - {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE}, - {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER}, - {SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE}, - {SERVER_TABLE_NOT_EXIST, ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS}, - {SERVER_INVALID_TABLE_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME}, - {SERVER_INVALID_TABLE_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, - {SERVER_INVALID_TIME_RANGE, ::milvus::grpc::ErrorCode::ILLEGAL_RANGE}, - {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, - - {SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE}, - {SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, - {SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, - {SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK}, - {SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST}, - {SERVER_INVALID_INDEX_METRIC_TYPE,::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE}, - {SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, - {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID}, - {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT}, - {SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED}, - {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED}, - {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR}, - {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY}, - }; - - if(code_map.find(code) != code_map.end()) { - return code_map.at(code); - } else { - return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR; - } +::milvus::grpc::ErrorCode +ErrorMap(ErrorCode code) { + static const std::map code_map = { + {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND}, + {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, + {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER}, + {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE}, + {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER}, + {SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE}, + {SERVER_TABLE_NOT_EXIST, ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS}, + {SERVER_INVALID_TABLE_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME}, + {SERVER_INVALID_TABLE_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, + {SERVER_INVALID_TIME_RANGE, ::milvus::grpc::ErrorCode::ILLEGAL_RANGE}, + {SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, + + {SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE}, + {SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, + {SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, + {SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK}, + {SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST}, + {SERVER_INVALID_INDEX_METRIC_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE}, + {SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, + {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID}, + {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT}, + {SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED}, + {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED}, + {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR}, + {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY}, + }; + + if (code_map.find(code) != code_map.end()) { + return code_map.at(code); + } else { + return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR; } } +} // namespace //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// GrpcBaseTask::GrpcBaseTask(const std::string &task_group, bool async) - : task_group_(task_group), - async_(async), - done_(false) { - + : task_group_(task_group), + async_(async), + done_(false) { } GrpcBaseTask::~GrpcBaseTask() { WaitToFinish(); } -Status GrpcBaseTask::Execute() { +Status +GrpcBaseTask::Execute() { status_ = OnExecute(); Done(); return status_; } -void GrpcBaseTask::Done() { +void +GrpcBaseTask::Done() { done_ = true; finish_cond_.notify_all(); } -Status GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string &error_msg) { +Status +GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string &error_msg) { status_ = Status(error_code, error_msg); SERVER_LOG_ERROR << error_msg; return status_; } -Status GrpcBaseTask::WaitToFinish() { +Status +GrpcBaseTask::WaitToFinish() { std::unique_lock lock(finish_mtx_); - finish_cond_.wait(lock, [this] { return done_; }); + finish_cond_.wait(lock, [this] { + return done_; + }); return status_; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// GrpcRequestScheduler::GrpcRequestScheduler() - : stopped_(false) { + : stopped_(false) { Start(); } @@ -117,7 +123,8 @@ GrpcRequestScheduler::~GrpcRequestScheduler() { Stop(); } -void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { +void +GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { if (task_ptr == nullptr) { return; } @@ -127,7 +134,7 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu if (!task_ptr->IsAsync()) { task_ptr->WaitToFinish(); - const Status& status = task_ptr->status(); + const Status &status = task_ptr->status(); if (!status.ok()) { grpc_status->set_reason(status.message()); grpc_status->set_error_code(ErrorMap(status.code())); @@ -135,7 +142,8 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu } } -void GrpcRequestScheduler::Start() { +void +GrpcRequestScheduler::Start() { if (!stopped_) { return; } @@ -143,7 +151,8 @@ void GrpcRequestScheduler::Start() { stopped_ = false; } -void GrpcRequestScheduler::Stop() { +void +GrpcRequestScheduler::Stop() { if (stopped_) { return; } @@ -168,7 +177,8 @@ void GrpcRequestScheduler::Stop() { SERVER_LOG_INFO << "Scheduler stopped"; } -Status GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { +Status +GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { if (task_ptr == nullptr) { return Status::OK(); } @@ -186,8 +196,8 @@ Status GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { return task_ptr->WaitToFinish();//sync execution } - -void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { +void +GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { if (task_queue == nullptr) { return; } @@ -210,7 +220,8 @@ void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { } } -Status GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { +Status +GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { std::lock_guard lock(queue_mtx_); std::string group_name = task_ptr->TaskGroup(); @@ -230,7 +241,7 @@ Status GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { return Status::OK(); } -} -} -} -} +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h index e1217540..df5357a4 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestScheduler.h +++ b/cpp/src/server/grpc_impl/GrpcRequestScheduler.h @@ -19,12 +19,14 @@ #include "utils/Status.h" #include "utils/BlockingQueue.h" -#include "status.grpc.pb.h" -#include "status.pb.h" +#include "grpc/gen-status/status.grpc.pb.h" +#include "grpc/gen-status/status.pb.h" #include #include #include +#include +#include namespace zilliz { namespace milvus { @@ -32,30 +34,36 @@ namespace server { namespace grpc { class GrpcBaseTask { -protected: - GrpcBaseTask(const std::string &task_group, bool async = false); + protected: + explicit GrpcBaseTask(const std::string &task_group, bool async = false); virtual ~GrpcBaseTask(); -public: + public: Status Execute(); void Done(); Status WaitToFinish(); - std::string TaskGroup() const { return task_group_; } + std::string TaskGroup() const { + return task_group_; + } - const Status& status() const { return status_; } + const Status &status() const { + return status_; + } - bool IsAsync() const { return async_; } + bool IsAsync() const { + return async_; + } -protected: + protected: virtual Status OnExecute() = 0; Status SetStatus(ErrorCode error_code, const std::string &msg); -protected: + protected: mutable std::mutex finish_mtx_; std::condition_variable finish_cond_; @@ -71,7 +79,7 @@ using TaskQueuePtr = std::shared_ptr; using ThreadPtr = std::shared_ptr; class GrpcRequestScheduler { -public: + public: static GrpcRequestScheduler &GetInstance() { static GrpcRequestScheduler scheduler; return scheduler; @@ -85,7 +93,7 @@ public: static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status); -protected: + protected: GrpcRequestScheduler(); virtual ~GrpcRequestScheduler(); @@ -94,7 +102,7 @@ protected: Status PutTaskToQueue(const BaseTaskPtr &task_ptr); -private: + private: mutable std::mutex queue_mtx_; std::map task_groups_; @@ -104,7 +112,7 @@ private: bool stopped_; }; -} -} -} -} +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp index ca6b77e9..5702b80b 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.cpp +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.cpp @@ -15,22 +15,24 @@ // specific language governing permissions and limitations // under the License. +#include "server/grpc_impl/GrpcRequestTask.h" + #include +#include +#include +#include +//#include -#include "GrpcRequestTask.h" +#include "server/Server.h" +#include "server/DBWrapper.h" #include "utils/CommonUtil.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" #include "utils/ValidationUtil.h" -#include "../DBWrapper.h" -#include "version.h" #include "GrpcServer.h" #include "db/Utils.h" #include "scheduler/SchedInst.h" -//#include - -#include "server/Server.h" - +#include "../../../version.h" namespace zilliz { namespace milvus { @@ -45,86 +47,87 @@ using DB_META = zilliz::milvus::engine::meta::Meta; using DB_DATE = zilliz::milvus::engine::meta::DateT; namespace { - engine::EngineType EngineType(int type) { - static std::map map_type = { - {0, engine::EngineType::INVALID}, - {1, engine::EngineType::FAISS_IDMAP}, - {2, engine::EngineType::FAISS_IVFFLAT}, - {3, engine::EngineType::FAISS_IVFSQ8}, - }; - - if (map_type.find(type) == map_type.end()) { - return engine::EngineType::INVALID; - } - - return map_type[type]; +engine::EngineType +EngineType(int type) { + static std::map map_type = { + {0, engine::EngineType::INVALID}, + {1, engine::EngineType::FAISS_IDMAP}, + {2, engine::EngineType::FAISS_IVFFLAT}, + {3, engine::EngineType::FAISS_IVFSQ8}, + }; + + if (map_type.find(type) == map_type.end()) { + return engine::EngineType::INVALID; } - int IndexType(engine::EngineType type) { - static std::map map_type = { - {engine::EngineType::INVALID, 0}, - {engine::EngineType::FAISS_IDMAP, 1}, - {engine::EngineType::FAISS_IVFFLAT, 2}, - {engine::EngineType::FAISS_IVFSQ8, 3}, - }; + return map_type[type]; +} - if (map_type.find(type) == map_type.end()) { - return 0; - } +int +IndexType(engine::EngineType type) { + static std::map map_type = { + {engine::EngineType::INVALID, 0}, + {engine::EngineType::FAISS_IDMAP, 1}, + {engine::EngineType::FAISS_IVFFLAT, 2}, + {engine::EngineType::FAISS_IVFSQ8, 3}, + }; - return map_type[type]; + if (map_type.find(type) == map_type.end()) { + return 0; } - constexpr long DAY_SECONDS = 24 * 60 * 60; - - Status - ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array, - std::vector &dates) { - dates.clear(); - for (auto &range : range_array) { - time_t tt_start, tt_end; - tm tm_start, tm_end; - if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) { - return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); - } + return map_type[type]; +} - if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) { - return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); - } +constexpr int64_t DAY_SECONDS = 24 * 60 * 60; - long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / - DAY_SECONDS; - if (days == 0) { - return Status(SERVER_INVALID_TIME_RANGE, - "Invalid time range: " + range.start_value() + " to " + range.end_value()); - } +Status +ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array, + std::vector &dates) { + dates.clear(); + for (auto &range : range_array) { + time_t tt_start, tt_end; + tm tm_start, tm_end; + if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) { + return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); + } - //range: [start_day, end_day) - for (long i = 0; i < days; i++) { - time_t tt_day = tt_start + DAY_SECONDS * i; - tm tm_day; - CommonUtil::ConvertTime(tt_day, tm_day); + if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) { + return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value()); + } - long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + - tm_day.tm_mday;//according to db logic - dates.push_back(date); - } + int64_t days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / + DAY_SECONDS; + if (days == 0) { + return Status(SERVER_INVALID_TIME_RANGE, + "Invalid time range: " + range.start_value() + " to " + range.end_value()); } - return Status::OK(); + //range: [start_day, end_day) + for (int64_t i = 0; i < days; i++) { + time_t tt_day = tt_start + DAY_SECONDS * i; + tm tm_day; + CommonUtil::ConvertTime(tt_day, tm_day); + + int64_t date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + + tm_day.tm_mday;//according to db logic + dates.push_back(date); + } } + + return Status::OK(); } +} // namespace //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema *schema) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - schema_(schema) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + schema_(schema) { } BaseTaskPtr CreateTableTask::Create(const ::milvus::grpc::TableSchema *schema) { - if(schema == nullptr) { + if (schema == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } @@ -168,12 +171,11 @@ CreateTableTask::OnExecute() { status = DBWrapper::DB()->CreateTable(table_info); if (!status.ok()) { //table could exist - if(status.code() == DB_ALREADY_EXIST) { + if (status.code() == DB_ALREADY_EXIST) { return Status(SERVER_INVALID_TABLE_NAME, status.message()); } return status; } - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -185,9 +187,9 @@ CreateTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema *schema) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name), - schema_(schema) { + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name), + schema_(schema) { } BaseTaskPtr @@ -218,7 +220,6 @@ DescribeTableTask::OnExecute() { schema_->set_dimension(table_info.dimension_); schema_->set_index_file_size(table_info.index_file_size_); schema_->set_metric_type(table_info.metric_type_); - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -230,13 +231,13 @@ DescribeTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam *index_param) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - index_param_(index_param) { + : GrpcBaseTask(DDL_DML_TASK_GROUP), + index_param_(index_param) { } BaseTaskPtr CreateIndexTask::Create(const ::milvus::grpc::IndexParam *index_param) { - if(index_param == nullptr) { + if (index_param == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } @@ -295,10 +296,9 @@ CreateIndexTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// HasTableTask::HasTableTask(const std::string &table_name, bool &has_table) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name), - has_table_(has_table) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name), + has_table_(has_table) { } BaseTaskPtr @@ -333,9 +333,8 @@ HasTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DropTableTask::DropTableTask(const std::string &table_name) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name) { } BaseTaskPtr @@ -385,9 +384,8 @@ DropTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList *table_name_list) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_list_(table_name_list) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_list_(table_name_list) { } BaseTaskPtr @@ -419,8 +417,8 @@ InsertTask::InsertTask(const ::milvus::grpc::InsertParam *insert_param, BaseTaskPtr InsertTask::Create(const ::milvus::grpc::InsertParam *insert_param, - ::milvus::grpc::VectorIds *record_ids) { - if(insert_param == nullptr) { + ::milvus::grpc::VectorIds *record_ids) { + if (insert_param == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } @@ -444,7 +442,7 @@ InsertTask::OnExecute() { if (!record_ids_->vector_id_array().empty()) { if (record_ids_->vector_id_array().size() != insert_param_->row_record_array_size()) { return Status(SERVER_ILLEGAL_VECTOR_ID, - "Size of vector ids is not equal to row record array size"); + "Size of vector ids is not equal to row record array size"); } } @@ -455,7 +453,7 @@ InsertTask::OnExecute() { if (!status.ok()) { if (status.code() == DB_NOT_FOUND) { return Status(SERVER_TABLE_NOT_EXIST, - "Table " + insert_param_->table_name() + " not exists"); + "Table " + insert_param_->table_name() + " not exists"); } else { return status; } @@ -465,19 +463,22 @@ InsertTask::OnExecute() { //all user provide id, or all internal id bool user_provide_ids = !insert_param_->row_id_array().empty(); //user already provided id before, all insert action require user id - if((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) { - return Status(SERVER_ILLEGAL_VECTOR_ID, "Table vector ids are user defined, please provide id for this batch"); + if ((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) { + return Status(SERVER_ILLEGAL_VECTOR_ID, + "Table vector ids are user defined, please provide id for this batch"); } //user didn't provided id before, no need to provide user id - if((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) { - return Status(SERVER_ILLEGAL_VECTOR_ID, "Table vector ids are auto generated, no need to provide id for this batch"); + if ((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) { + return Status(SERVER_ILLEGAL_VECTOR_ID, + "Table vector ids are auto generated, no need to provide id for this batch"); } rc.RecordSection("check validation"); #ifdef MILVUS_ENABLE_PROFILING - std::string fname = "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + ".profiling"; + std::string fname = "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + + ".profiling"; ProfilerStart(fname.c_str()); #endif @@ -493,8 +494,8 @@ InsertTask::OnExecute() { if (vec_dim != table_info.dimension_) { ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION; std::string error_msg = "Invalid row record dimension: " + std::to_string(vec_dim) - + " vs. table dimension:" + - std::to_string(table_info.dimension_); + + " vs. table dimension:" + + std::to_string(table_info.dimension_); return Status(error_code, error_msg); } memcpy(&vec_f[i * table_info.dimension_], @@ -507,10 +508,10 @@ InsertTask::OnExecute() { //step 5: insert vectors auto vec_count = (uint64_t) insert_param_->row_record_array_size(); std::vector vec_ids(insert_param_->row_id_array_size(), 0); - if(!insert_param_->row_id_array().empty()) { - const int64_t* src_data = insert_param_->row_id_array().data(); - int64_t* target_data = vec_ids.data(); - memcpy(target_data, src_data, (size_t)(sizeof(int64_t)*insert_param_->row_id_array_size())); + if (!insert_param_->row_id_array().empty()) { + const int64_t *src_data = insert_param_->row_id_array().data(); + int64_t *target_data = vec_ids.data(); + memcpy(target_data, src_data, (size_t) (sizeof(int64_t) * insert_param_->row_id_array_size())); } status = DBWrapper::DB()->InsertVectors(insert_param_->table_name(), vec_count, vec_f.data(), vec_ids); @@ -525,13 +526,13 @@ InsertTask::OnExecute() { auto ids_size = record_ids_->vector_id_array_size(); if (ids_size != vec_count) { std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return " - + std::to_string(ids_size) + " id"; + + std::to_string(ids_size) + " id"; return Status(SERVER_ILLEGAL_VECTOR_ID, msg); } //step 6: update table flag user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID - : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID; + : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID; status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_); #ifdef MILVUS_ENABLE_PROFILING @@ -540,7 +541,6 @@ InsertTask::OnExecute() { rc.RecordSection("add vectors to engine"); rc.ElapseFromBegin("total cost"); - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -556,19 +556,17 @@ SearchTask::SearchTask(const ::milvus::grpc::SearchParam *search_vector_infos, search_param_(search_vector_infos), file_id_array_(file_id_array), topk_result_list(response) { - } BaseTaskPtr SearchTask::Create(const ::milvus::grpc::SearchParam *search_vector_infos, const std::vector &file_id_array, ::milvus::grpc::TopKQueryResultList *response) { - if(search_vector_infos == nullptr) { + if (search_vector_infos == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } - return std::shared_ptr(new SearchTask(search_vector_infos, file_id_array, - response)); + return std::shared_ptr(new SearchTask(search_vector_infos, file_id_array, response)); } Status @@ -640,7 +638,7 @@ SearchTask::OnExecute() { if (query_vec_dim != table_info.dimension_) { ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION; std::string error_msg = "Invalid row record dimension: " + std::to_string(query_vec_dim) - + " vs. table dimension:" + std::to_string(table_info.dimension_); + + " vs. table dimension:" + std::to_string(table_info.dimension_); return Status(error_code, error_msg); } @@ -655,16 +653,17 @@ SearchTask::OnExecute() { auto record_count = (uint64_t) search_param_->query_record_array().size(); #ifdef MILVUS_ENABLE_PROFILING - std::string fname = "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + ".profiling"; + std::string fname = "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + + ".profiling"; ProfilerStart(fname.c_str()); #endif if (file_id_array_.empty()) { status = DBWrapper::DB()->Query(table_name_, (size_t) top_k, record_count, nprobe, - vec_f.data(), dates, results); + vec_f.data(), dates, results); } else { status = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k, - record_count, nprobe, vec_f.data(), dates, results); + record_count, nprobe, vec_f.data(), dates, results); } #ifdef MILVUS_ENABLE_PROFILING @@ -682,7 +681,7 @@ SearchTask::OnExecute() { if (results.size() != record_count) { std::string msg = "Search " + std::to_string(record_count) + " vectors but only return " - + std::to_string(results.size()) + " results"; + + std::to_string(results.size()) + " results"; return Status(SERVER_ILLEGAL_SEARCH_RESULT, msg); } @@ -699,8 +698,6 @@ SearchTask::OnExecute() { //step 8: print time cost percent rc.RecordSection("construct result and send"); rc.ElapseFromBegin("totally cost"); - - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -710,10 +707,9 @@ SearchTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CountTableTask::CountTableTask(const std::string &table_name, int64_t &row_count) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name), - row_count_(row_count) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name), + row_count_(row_count) { } BaseTaskPtr @@ -742,7 +738,6 @@ CountTableTask::OnExecute() { row_count_ = (int64_t) row_count; rc.ElapseFromBegin("total cost"); - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } @@ -752,10 +747,9 @@ CountTableTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// CmdTask::CmdTask(const std::string &cmd, std::string &result) - : GrpcBaseTask(PING_TASK_GROUP), - cmd_(cmd), - result_(result) { - + : GrpcBaseTask(PING_TASK_GROUP), + cmd_(cmd), + result_(result) { } BaseTaskPtr @@ -769,8 +763,7 @@ CmdTask::OnExecute() { result_ = MILVUS_VERSION; } else if (cmd_ == "tasktable") { result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables(); - } - else { + } else { result_ = "OK"; } @@ -779,16 +772,17 @@ CmdTask::OnExecute() { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - delete_by_range_param_(delete_by_range_param){ + : GrpcBaseTask(DDL_DML_TASK_GROUP), + delete_by_range_param_(delete_by_range_param) { } BaseTaskPtr DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param) { - if(delete_by_range_param == nullptr) { + if (delete_by_range_param == nullptr) { SERVER_LOG_ERROR << "grpc input is null!"; return nullptr; } + return std::shared_ptr(new DeleteByRangeTask(delete_by_range_param)); } @@ -838,23 +832,21 @@ DeleteByRangeTask::OnExecute() { if (!status.ok()) { return status; } - } catch (std::exception &ex) { return Status(SERVER_UNEXPECTED_ERROR, ex.what()); } - + return Status::OK(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// PreloadTableTask::PreloadTableTask(const std::string &table_name) - : GrpcBaseTask(DDL_DML_TASK_GROUP), - table_name_(table_name) { - + : GrpcBaseTask(DDL_DML_TASK_GROUP), + table_name_(table_name) { } BaseTaskPtr -PreloadTableTask::Create(const std::string &table_name){ +PreloadTableTask::Create(const std::string &table_name) { return std::shared_ptr(new PreloadTableTask(table_name)); } @@ -889,12 +881,11 @@ DescribeIndexTask::DescribeIndexTask(const std::string &table_name, : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name), index_param_(index_param) { - } BaseTaskPtr DescribeIndexTask::Create(const std::string &table_name, - ::milvus::grpc::IndexParam *index_param){ + ::milvus::grpc::IndexParam *index_param) { return std::shared_ptr(new DescribeIndexTask(table_name, index_param)); } @@ -932,11 +923,10 @@ DescribeIndexTask::OnExecute() { DropIndexTask::DropIndexTask(const std::string &table_name) : GrpcBaseTask(DDL_DML_TASK_GROUP), table_name_(table_name) { - } BaseTaskPtr -DropIndexTask::Create(const std::string &table_name){ +DropIndexTask::Create(const std::string &table_name) { return std::shared_ptr(new DropIndexTask(table_name)); } @@ -975,7 +965,7 @@ DropIndexTask::OnExecute() { return Status::OK(); } -} -} -} -} +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcRequestTask.h b/cpp/src/server/grpc_impl/GrpcRequestTask.h index a0c4540c..4c8c038d 100644 --- a/cpp/src/server/grpc_impl/GrpcRequestTask.h +++ b/cpp/src/server/grpc_impl/GrpcRequestTask.h @@ -16,15 +16,18 @@ // under the License. #pragma once -#include "GrpcRequestScheduler.h" + +#include "server/grpc_impl/GrpcRequestScheduler.h" #include "utils/Status.h" #include "db/Types.h" -#include "milvus.grpc.pb.h" -#include "status.pb.h" +#include "grpc/gen-milvus/milvus.grpc.pb.h" +#include "grpc/gen-status/status.pb.h" #include #include +#include +#include namespace zilliz { namespace milvus { @@ -33,138 +36,130 @@ namespace grpc { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CreateTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::TableSchema *schema); -protected: - explicit - CreateTableTask(const ::milvus::grpc::TableSchema *request); + protected: + explicit CreateTableTask(const ::milvus::grpc::TableSchema *request); Status OnExecute() override; -private: + private: const ::milvus::grpc::TableSchema *schema_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class HasTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name, bool &has_table); -protected: + protected: HasTableTask(const std::string &request, bool &has_table); Status OnExecute() override; - -private: + private: std::string table_name_; bool &has_table_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DescribeTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name, ::milvus::grpc::TableSchema *schema); -protected: + protected: DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema *schema); Status OnExecute() override; - -private: + private: std::string table_name_; ::milvus::grpc::TableSchema *schema_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DropTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name); -protected: - explicit - DropTableTask(const std::string &table_name); + protected: + explicit DropTableTask(const std::string &table_name); Status OnExecute() override; - -private: + private: std::string table_name_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CreateIndexTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::IndexParam *index_Param); -protected: - explicit - CreateIndexTask(const ::milvus::grpc::IndexParam *index_Param); + protected: + explicit CreateIndexTask(const ::milvus::grpc::IndexParam *index_Param); Status OnExecute() override; - -private: + private: const ::milvus::grpc::IndexParam *index_param_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class ShowTablesTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(::milvus::grpc::TableNameList *table_name_list); -protected: - explicit - ShowTablesTask(::milvus::grpc::TableNameList *table_name_list); + protected: + explicit ShowTablesTask(::milvus::grpc::TableNameList *table_name_list); Status OnExecute() override; -private: + private: ::milvus::grpc::TableNameList *table_name_list_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class InsertTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::InsertParam *insert_Param, ::milvus::grpc::VectorIds *record_ids_); -protected: + protected: InsertTask(const ::milvus::grpc::InsertParam *insert_Param, - ::milvus::grpc::VectorIds *record_ids_); + ::milvus::grpc::VectorIds *record_ids_); Status OnExecute() override; -private: + private: const ::milvus::grpc::InsertParam *insert_param_; ::milvus::grpc::VectorIds *record_ids_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class SearchTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::SearchParam *search_param, const std::vector &file_id_array, ::milvus::grpc::TopKQueryResultList *response); -protected: + protected: SearchTask(const ::milvus::grpc::SearchParam *search_param, const std::vector &file_id_array, ::milvus::grpc::TopKQueryResultList *response); @@ -172,7 +167,7 @@ protected: Status OnExecute() override; -private: + private: const ::milvus::grpc::SearchParam *search_param_; std::vector file_id_array_; ::milvus::grpc::TopKQueryResultList *topk_result_list; @@ -180,107 +175,106 @@ private: //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CountTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name, int64_t &row_count); -protected: + protected: CountTableTask(const std::string &table_name, int64_t &row_count); Status OnExecute() override; -private: + private: std::string table_name_; int64_t &row_count_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class CmdTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &cmd, std::string &result); -protected: + protected: CmdTask(const std::string &cmd, std::string &result); Status OnExecute() override; -private: + private: std::string cmd_; std::string &result_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DeleteByRangeTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param); -protected: - DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param); + protected: + explicit DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param); Status OnExecute() override; -private: + private: const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class PreloadTableTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name); -protected: - PreloadTableTask(const std::string &table_name); + protected: + explicit PreloadTableTask(const std::string &table_name); Status OnExecute() override; -private: + private: std::string table_name_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DescribeIndexTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name, - ::milvus::grpc::IndexParam *index_param); + ::milvus::grpc::IndexParam *index_param); -protected: + protected: DescribeIndexTask(const std::string &table_name, - ::milvus::grpc::IndexParam *index_param); + ::milvus::grpc::IndexParam *index_param); Status OnExecute() override; -private: + private: std::string table_name_; ::milvus::grpc::IndexParam *index_param_; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class DropIndexTask : public GrpcBaseTask { -public: + public: static BaseTaskPtr Create(const std::string &table_name); -protected: - DropIndexTask(const std::string &table_name); + protected: + explicit DropIndexTask(const std::string &table_name); Status OnExecute() override; -private: + private: std::string table_name_; - }; -} -} -} -} \ No newline at end of file +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcServer.cpp b/cpp/src/server/grpc_impl/GrpcServer.cpp index 7e209215..065271dd 100644 --- a/cpp/src/server/grpc_impl/GrpcServer.cpp +++ b/cpp/src/server/grpc_impl/GrpcServer.cpp @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -#include "milvus.grpc.pb.h" -#include "GrpcServer.h" +#include "grpc/gen-milvus/milvus.grpc.pb.h" +#include "server/grpc_impl/GrpcServer.h" #include "server/Config.h" #include "server/DBWrapper.h" #include "utils/Log.h" @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -35,14 +36,12 @@ #include #include - namespace zilliz { namespace milvus { namespace server { namespace grpc { - -constexpr long MESSAGE_SIZE = -1; +constexpr int64_t MESSAGE_SIZE = -1; //this class is to check port occupation during server start class NoReusePortOption : public ::grpc::ServerBuilderOption { @@ -52,11 +51,9 @@ class NoReusePortOption : public ::grpc::ServerBuilderOption { } void UpdatePlugins(std::vector> *plugins) override { - } }; - void GrpcServer::Start() { thread_ptr_ = std::make_shared(&GrpcServer::StartService, this); @@ -117,7 +114,7 @@ GrpcServer::StopService() { return Status::OK(); } -} -} -} -} \ No newline at end of file +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/server/grpc_impl/GrpcServer.h b/cpp/src/server/grpc_impl/GrpcServer.h index a861692f..9101f144 100644 --- a/cpp/src/server/grpc_impl/GrpcServer.h +++ b/cpp/src/server/grpc_impl/GrpcServer.h @@ -19,12 +19,12 @@ #include "utils/Status.h" +#include #include #include #include #include - namespace zilliz { namespace milvus { namespace server { @@ -52,7 +52,7 @@ class GrpcServer { std::shared_ptr thread_ptr_; }; -} -} -} -} +} // namespace grpc +} // namespace server +} // namespace milvus +} // namespace zilliz -- GitLab