提交 dd29a72d 编写于 作者: G groot

format server code


Former-commit-id: 2fb1177818acab220d905b9051e87bf8b636240d
上级 a42ade88
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "Config.h" #include "server/Config.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
...@@ -23,12 +23,13 @@ ...@@ -23,12 +23,13 @@
#include <stdlib.h> #include <stdlib.h>
#include <iostream> #include <iostream>
#include <algorithm> #include <algorithm>
#include <vector>
#include <string>
#include "config/ConfigMgr.h" #include "config/ConfigMgr.h"
#include "utils/CommonUtil.h" #include "utils/CommonUtil.h"
#include "utils/ValidationUtil.h" #include "utils/ValidationUtil.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
...@@ -71,11 +72,11 @@ Config::LoadConfigFile(const std::string &filename) { ...@@ -71,11 +72,11 @@ Config::LoadConfigFile(const std::string &filename) {
} }
void void
Config::PrintConfigSection(const std::string& config_node_name) { Config::PrintConfigSection(const std::string &config_node_name) {
std::cout << std::endl; std::cout << std::endl;
std::cout << config_node_name << ":" << std::endl; std::cout << config_node_name << ":" << std::endl;
if (config_map_.find(config_node_name) != config_map_.end()) { if (config_map_.find(config_node_name) != config_map_.end()) {
for (auto item: config_map_[config_node_name]) { for (auto item : config_map_[config_node_name]) {
std::cout << item.first << ": " << item.second << std::endl; std::cout << item.first << ": " << item.second << std::endl;
} }
} }
...@@ -182,7 +183,7 @@ Config::CheckDBConfigBufferSize(const std::string &value) { ...@@ -182,7 +183,7 @@ Config::CheckDBConfigBufferSize(const std::string &value) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid DB config buffer_size: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid DB config buffer_size: " + value);
} else { } else {
int64_t buffer_size = std::stoi(value) * GB; int64_t buffer_size = std::stoi(value) * GB;
unsigned long total_mem = 0, free_mem = 0; uint64_t total_mem = 0, free_mem = 0;
CommonUtil::GetSystemMemInfo(total_mem, free_mem); CommonUtil::GetSystemMemInfo(total_mem, free_mem);
if (buffer_size >= total_mem) { if (buffer_size >= total_mem) {
return Status(SERVER_INVALID_ARGUMENT, "DB config buffer_size exceed system memory: " + value); return Status(SERVER_INVALID_ARGUMENT, "DB config buffer_size exceed system memory: " + value);
...@@ -205,7 +206,7 @@ Config::CheckDBConfigBuildIndexGPU(const std::string &value) { ...@@ -205,7 +206,7 @@ Config::CheckDBConfigBuildIndexGPU(const std::string &value) {
} }
Status Status
Config::CheckMetricConfigAutoBootup(const std::string& value) { Config::CheckMetricConfigAutoBootup(const std::string &value) {
if (!ValidationUtil::ValidateStringIsBool(value).ok()) { if (!ValidationUtil::ValidateStringIsBool(value).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config auto_bootup: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config auto_bootup: " + value);
} }
...@@ -213,7 +214,7 @@ Config::CheckMetricConfigAutoBootup(const std::string& value) { ...@@ -213,7 +214,7 @@ Config::CheckMetricConfigAutoBootup(const std::string& value) {
} }
Status Status
Config::CheckMetricConfigCollector(const std::string& value) { Config::CheckMetricConfigCollector(const std::string &value) {
if (value != "prometheus") { if (value != "prometheus") {
return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config collector: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config collector: " + value);
} }
...@@ -221,7 +222,7 @@ Config::CheckMetricConfigCollector(const std::string& value) { ...@@ -221,7 +222,7 @@ Config::CheckMetricConfigCollector(const std::string& value) {
} }
Status Status
Config::CheckMetricConfigPrometheusPort(const std::string& value) { Config::CheckMetricConfigPrometheusPort(const std::string &value) {
if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config prometheus_port: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid metric config prometheus_port: " + value);
} }
...@@ -229,12 +230,12 @@ Config::CheckMetricConfigPrometheusPort(const std::string& value) { ...@@ -229,12 +230,12 @@ Config::CheckMetricConfigPrometheusPort(const std::string& value) {
} }
Status Status
Config::CheckCacheConfigCpuMemCapacity(const std::string& value) { Config::CheckCacheConfigCpuMemCapacity(const std::string &value) {
if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cpu_mem_capacity: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cpu_mem_capacity: " + value);
} else { } else {
uint64_t cpu_cache_capacity = std::stoi(value) * GB; uint64_t cpu_cache_capacity = std::stoi(value) * GB;
unsigned long total_mem = 0, free_mem = 0; uint64_t total_mem = 0, free_mem = 0;
CommonUtil::GetSystemMemInfo(total_mem, free_mem); CommonUtil::GetSystemMemInfo(total_mem, free_mem);
if (cpu_cache_capacity >= total_mem) { if (cpu_cache_capacity >= total_mem) {
return Status(SERVER_INVALID_ARGUMENT, "Cache config cpu_mem_capacity exceed system memory: " + value); return Status(SERVER_INVALID_ARGUMENT, "Cache config cpu_mem_capacity exceed system memory: " + value);
...@@ -254,7 +255,7 @@ Config::CheckCacheConfigCpuMemCapacity(const std::string& value) { ...@@ -254,7 +255,7 @@ Config::CheckCacheConfigCpuMemCapacity(const std::string& value) {
} }
Status Status
Config::CheckCacheConfigCpuMemThreshold(const std::string& value) { Config::CheckCacheConfigCpuMemThreshold(const std::string &value) {
if (!ValidationUtil::ValidateStringIsFloat(value).ok()) { if (!ValidationUtil::ValidateStringIsFloat(value).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cpu_mem_threshold: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cpu_mem_threshold: " + value);
} else { } else {
...@@ -267,7 +268,7 @@ Config::CheckCacheConfigCpuMemThreshold(const std::string& value) { ...@@ -267,7 +268,7 @@ Config::CheckCacheConfigCpuMemThreshold(const std::string& value) {
} }
Status Status
Config::CheckCacheConfigGpuMemCapacity(const std::string& value) { Config::CheckCacheConfigGpuMemCapacity(const std::string &value) {
if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) {
std::cerr << "ERROR: gpu_cache_capacity " << value << " is not a number" << std::endl; std::cerr << "ERROR: gpu_cache_capacity " << value << " is not a number" << std::endl;
} else { } else {
...@@ -290,7 +291,7 @@ Config::CheckCacheConfigGpuMemCapacity(const std::string& value) { ...@@ -290,7 +291,7 @@ Config::CheckCacheConfigGpuMemCapacity(const std::string& value) {
} }
Status Status
Config::CheckCacheConfigGpuMemThreshold(const std::string& value) { Config::CheckCacheConfigGpuMemThreshold(const std::string &value) {
if (!ValidationUtil::ValidateStringIsFloat(value).ok()) { if (!ValidationUtil::ValidateStringIsFloat(value).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config gpu_mem_threshold: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config gpu_mem_threshold: " + value);
} else { } else {
...@@ -303,7 +304,7 @@ Config::CheckCacheConfigGpuMemThreshold(const std::string& value) { ...@@ -303,7 +304,7 @@ Config::CheckCacheConfigGpuMemThreshold(const std::string& value) {
} }
Status Status
Config::CheckCacheConfigCacheInsertData(const std::string& value) { Config::CheckCacheConfigCacheInsertData(const std::string &value) {
if (!ValidationUtil::ValidateStringIsBool(value).ok()) { if (!ValidationUtil::ValidateStringIsBool(value).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cache_insert_data: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid cache config cache_insert_data: " + value);
} }
...@@ -311,7 +312,7 @@ Config::CheckCacheConfigCacheInsertData(const std::string& value) { ...@@ -311,7 +312,7 @@ Config::CheckCacheConfigCacheInsertData(const std::string& value) {
} }
Status Status
Config::CheckEngineConfigBlasThreshold(const std::string& value) { Config::CheckEngineConfigBlasThreshold(const std::string &value) {
if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid engine config blas threshold: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid engine config blas threshold: " + value);
} }
...@@ -319,7 +320,7 @@ Config::CheckEngineConfigBlasThreshold(const std::string& value) { ...@@ -319,7 +320,7 @@ Config::CheckEngineConfigBlasThreshold(const std::string& value) {
} }
Status Status
Config::CheckEngineConfigOmpThreadNum(const std::string& value) { Config::CheckEngineConfigOmpThreadNum(const std::string &value) {
if (!ValidationUtil::ValidateStringIsNumber(value).ok()) { if (!ValidationUtil::ValidateStringIsNumber(value).ok()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid engine config omp_thread_num: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid engine config omp_thread_num: " + value);
} else { } else {
...@@ -333,7 +334,7 @@ Config::CheckEngineConfigOmpThreadNum(const std::string& value) { ...@@ -333,7 +334,7 @@ Config::CheckEngineConfigOmpThreadNum(const std::string& value) {
} }
Status Status
Config::CheckResourceConfigMode(const std::string& value) { Config::CheckResourceConfigMode(const std::string &value) {
if (value != "simple") { if (value != "simple") {
return Status(SERVER_INVALID_ARGUMENT, "Invalid resource config mode: " + value); return Status(SERVER_INVALID_ARGUMENT, "Invalid resource config mode: " + value);
} }
...@@ -341,7 +342,7 @@ Config::CheckResourceConfigMode(const std::string& value) { ...@@ -341,7 +342,7 @@ Config::CheckResourceConfigMode(const std::string& value) {
} }
Status Status
Config::CheckResourceConfigPool(const std::vector<std::string>& value) { Config::CheckResourceConfigPool(const std::vector<std::string> &value) {
if (value.empty()) { if (value.empty()) {
return Status(SERVER_INVALID_ARGUMENT, "Invalid resource config pool"); return Status(SERVER_INVALID_ARGUMENT, "Invalid resource config pool");
} }
...@@ -632,52 +633,51 @@ Config::GetResourceConfigStrMode() { ...@@ -632,52 +633,51 @@ Config::GetResourceConfigStrMode() {
return value; return value;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
Status Status
Config::GetServerConfigAddress(std::string& value) { Config::GetServerConfigAddress(std::string &value) {
value = GetServerConfigStrAddress(); value = GetServerConfigStrAddress();
return CheckServerConfigAddress(value); return CheckServerConfigAddress(value);
} }
Status Status
Config::GetServerConfigPort(std::string& value) { Config::GetServerConfigPort(std::string &value) {
value = GetServerConfigStrPort(); value = GetServerConfigStrPort();
return CheckServerConfigPort(value); return CheckServerConfigPort(value);
} }
Status Status
Config::GetServerConfigMode(std::string& value) { Config::GetServerConfigMode(std::string &value) {
value = GetServerConfigStrMode(); value = GetServerConfigStrMode();
return CheckServerConfigMode(value); return CheckServerConfigMode(value);
} }
Status Status
Config::GetServerConfigTimeZone(std::string& value) { Config::GetServerConfigTimeZone(std::string &value) {
value = GetServerConfigStrTimeZone(); value = GetServerConfigStrTimeZone();
return CheckServerConfigTimeZone(value); return CheckServerConfigTimeZone(value);
} }
Status Status
Config::GetDBConfigPath(std::string& value) { Config::GetDBConfigPath(std::string &value) {
value = GetDBConfigStrPath(); value = GetDBConfigStrPath();
return CheckDBConfigPath(value); return CheckDBConfigPath(value);
} }
Status Status
Config::GetDBConfigSlavePath(std::string& value) { Config::GetDBConfigSlavePath(std::string &value) {
value = GetDBConfigStrSlavePath(); value = GetDBConfigStrSlavePath();
return Status::OK(); return Status::OK();
} }
Status Status
Config::GetDBConfigBackendUrl(std::string& value) { Config::GetDBConfigBackendUrl(std::string &value) {
value = GetDBConfigStrBackendUrl(); value = GetDBConfigStrBackendUrl();
return CheckDBConfigBackendUrl(value); return CheckDBConfigBackendUrl(value);
} }
Status Status
Config::GetDBConfigArchiveDiskThreshold(int32_t& value) { Config::GetDBConfigArchiveDiskThreshold(int32_t &value) {
std::string str = GetDBConfigStrArchiveDiskThreshold(); std::string str = GetDBConfigStrArchiveDiskThreshold();
Status s = CheckDBConfigArchiveDiskThreshold(str); Status s = CheckDBConfigArchiveDiskThreshold(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -686,7 +686,7 @@ Config::GetDBConfigArchiveDiskThreshold(int32_t& value) { ...@@ -686,7 +686,7 @@ Config::GetDBConfigArchiveDiskThreshold(int32_t& value) {
} }
Status Status
Config::GetDBConfigArchiveDaysThreshold(int32_t& value) { Config::GetDBConfigArchiveDaysThreshold(int32_t &value) {
std::string str = GetDBConfigStrArchiveDaysThreshold(); std::string str = GetDBConfigStrArchiveDaysThreshold();
Status s = CheckDBConfigArchiveDaysThreshold(str); Status s = CheckDBConfigArchiveDaysThreshold(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -695,7 +695,7 @@ Config::GetDBConfigArchiveDaysThreshold(int32_t& value) { ...@@ -695,7 +695,7 @@ Config::GetDBConfigArchiveDaysThreshold(int32_t& value) {
} }
Status Status
Config::GetDBConfigBufferSize(int32_t& value) { Config::GetDBConfigBufferSize(int32_t &value) {
std::string str = GetDBConfigStrBufferSize(); std::string str = GetDBConfigStrBufferSize();
Status s = CheckDBConfigBufferSize(str); Status s = CheckDBConfigBufferSize(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -704,7 +704,7 @@ Config::GetDBConfigBufferSize(int32_t& value) { ...@@ -704,7 +704,7 @@ Config::GetDBConfigBufferSize(int32_t& value) {
} }
Status Status
Config::GetDBConfigBuildIndexGPU(int32_t& value) { Config::GetDBConfigBuildIndexGPU(int32_t &value) {
std::string str = GetDBConfigStrBuildIndexGPU(); std::string str = GetDBConfigStrBuildIndexGPU();
Status s = CheckDBConfigBuildIndexGPU(str); Status s = CheckDBConfigBuildIndexGPU(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -713,7 +713,7 @@ Config::GetDBConfigBuildIndexGPU(int32_t& value) { ...@@ -713,7 +713,7 @@ Config::GetDBConfigBuildIndexGPU(int32_t& value) {
} }
Status Status
Config::GetMetricConfigAutoBootup(bool& value) { Config::GetMetricConfigAutoBootup(bool &value) {
std::string str = GetMetricConfigStrAutoBootup(); std::string str = GetMetricConfigStrAutoBootup();
Status s = CheckMetricConfigPrometheusPort(str); Status s = CheckMetricConfigPrometheusPort(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -723,19 +723,19 @@ Config::GetMetricConfigAutoBootup(bool& value) { ...@@ -723,19 +723,19 @@ Config::GetMetricConfigAutoBootup(bool& value) {
} }
Status Status
Config::GetMetricConfigCollector(std::string& value) { Config::GetMetricConfigCollector(std::string &value) {
value = GetMetricConfigStrCollector(); value = GetMetricConfigStrCollector();
return Status::OK(); return Status::OK();
} }
Status Status
Config::GetMetricConfigPrometheusPort(std::string& value) { Config::GetMetricConfigPrometheusPort(std::string &value) {
value = GetMetricConfigStrPrometheusPort(); value = GetMetricConfigStrPrometheusPort();
return CheckMetricConfigPrometheusPort(value); return CheckMetricConfigPrometheusPort(value);
} }
Status Status
Config::GetCacheConfigCpuMemCapacity(int32_t& value) { Config::GetCacheConfigCpuMemCapacity(int32_t &value) {
std::string str = GetCacheConfigStrCpuMemCapacity(); std::string str = GetCacheConfigStrCpuMemCapacity();
Status s = CheckCacheConfigCpuMemCapacity(str); Status s = CheckCacheConfigCpuMemCapacity(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -744,7 +744,7 @@ Config::GetCacheConfigCpuMemCapacity(int32_t& value) { ...@@ -744,7 +744,7 @@ Config::GetCacheConfigCpuMemCapacity(int32_t& value) {
} }
Status Status
Config::GetCacheConfigCpuMemThreshold(float& value) { Config::GetCacheConfigCpuMemThreshold(float &value) {
std::string str = GetCacheConfigStrCpuMemThreshold(); std::string str = GetCacheConfigStrCpuMemThreshold();
Status s = CheckCacheConfigCpuMemThreshold(str); Status s = CheckCacheConfigCpuMemThreshold(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -753,7 +753,7 @@ Config::GetCacheConfigCpuMemThreshold(float& value) { ...@@ -753,7 +753,7 @@ Config::GetCacheConfigCpuMemThreshold(float& value) {
} }
Status Status
Config::GetCacheConfigGpuMemCapacity(int32_t& value) { Config::GetCacheConfigGpuMemCapacity(int32_t &value) {
std::string str = GetCacheConfigStrGpuMemCapacity(); std::string str = GetCacheConfigStrGpuMemCapacity();
Status s = CheckCacheConfigGpuMemCapacity(str); Status s = CheckCacheConfigGpuMemCapacity(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -762,7 +762,7 @@ Config::GetCacheConfigGpuMemCapacity(int32_t& value) { ...@@ -762,7 +762,7 @@ Config::GetCacheConfigGpuMemCapacity(int32_t& value) {
} }
Status Status
Config::GetCacheConfigGpuMemThreshold(float& value) { Config::GetCacheConfigGpuMemThreshold(float &value) {
std::string str = GetCacheConfigStrGpuMemThreshold(); std::string str = GetCacheConfigStrGpuMemThreshold();
Status s = CheckCacheConfigGpuMemThreshold(str); Status s = CheckCacheConfigGpuMemThreshold(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -771,7 +771,7 @@ Config::GetCacheConfigGpuMemThreshold(float& value) { ...@@ -771,7 +771,7 @@ Config::GetCacheConfigGpuMemThreshold(float& value) {
} }
Status Status
Config::GetCacheConfigCacheInsertData(bool& value) { Config::GetCacheConfigCacheInsertData(bool &value) {
std::string str = GetCacheConfigStrCacheInsertData(); std::string str = GetCacheConfigStrCacheInsertData();
Status s = CheckCacheConfigCacheInsertData(str); Status s = CheckCacheConfigCacheInsertData(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -781,7 +781,7 @@ Config::GetCacheConfigCacheInsertData(bool& value) { ...@@ -781,7 +781,7 @@ Config::GetCacheConfigCacheInsertData(bool& value) {
} }
Status Status
Config::GetEngineConfigBlasThreshold(int32_t& value) { Config::GetEngineConfigBlasThreshold(int32_t &value) {
std::string str = GetEngineConfigStrBlasThreshold(); std::string str = GetEngineConfigStrBlasThreshold();
Status s = CheckEngineConfigBlasThreshold(str); Status s = CheckEngineConfigBlasThreshold(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -790,7 +790,7 @@ Config::GetEngineConfigBlasThreshold(int32_t& value) { ...@@ -790,7 +790,7 @@ Config::GetEngineConfigBlasThreshold(int32_t& value) {
} }
Status Status
Config::GetEngineConfigOmpThreadNum(int32_t& value) { Config::GetEngineConfigOmpThreadNum(int32_t &value) {
std::string str = GetEngineConfigStrOmpThreadNum(); std::string str = GetEngineConfigStrOmpThreadNum();
Status s = CheckEngineConfigOmpThreadNum(str); Status s = CheckEngineConfigOmpThreadNum(str);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -799,13 +799,13 @@ Config::GetEngineConfigOmpThreadNum(int32_t& value) { ...@@ -799,13 +799,13 @@ Config::GetEngineConfigOmpThreadNum(int32_t& value) {
} }
Status Status
Config::GetResourceConfigMode(std::string& value) { Config::GetResourceConfigMode(std::string &value) {
value = GetResourceConfigStrMode(); value = GetResourceConfigStrMode();
return CheckResourceConfigMode(value); return CheckResourceConfigMode(value);
} }
Status Status
Config::GetResourceConfigPool(std::vector<std::string>& value) { Config::GetResourceConfigPool(std::vector<std::string> &value) {
ConfigNode resource_config = GetConfigNode(CONFIG_RESOURCE); ConfigNode resource_config = GetConfigNode(CONFIG_RESOURCE);
value = resource_config.GetSequence(CONFIG_RESOURCE_POOL); value = resource_config.GetSequence(CONFIG_RESOURCE_POOL);
return CheckResourceConfigPool(value); return CheckResourceConfigPool(value);
...@@ -814,7 +814,7 @@ Config::GetResourceConfigPool(std::vector<std::string>& value) { ...@@ -814,7 +814,7 @@ Config::GetResourceConfigPool(std::vector<std::string>& value) {
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
/* server config */ /* server config */
Status Status
Config::SetServerConfigAddress(const std::string& value) { Config::SetServerConfigAddress(const std::string &value) {
Status s = CheckServerConfigAddress(value); Status s = CheckServerConfigAddress(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, value); SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_ADDRESS, value);
...@@ -822,7 +822,7 @@ Config::SetServerConfigAddress(const std::string& value) { ...@@ -822,7 +822,7 @@ Config::SetServerConfigAddress(const std::string& value) {
} }
Status Status
Config::SetServerConfigPort(const std::string& value) { Config::SetServerConfigPort(const std::string &value) {
Status s = CheckServerConfigPort(value); Status s = CheckServerConfigPort(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_PORT, value); SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_PORT, value);
...@@ -830,7 +830,7 @@ Config::SetServerConfigPort(const std::string& value) { ...@@ -830,7 +830,7 @@ Config::SetServerConfigPort(const std::string& value) {
} }
Status Status
Config::SetServerConfigMode(const std::string& value) { Config::SetServerConfigMode(const std::string &value) {
Status s = CheckServerConfigMode(value); Status s = CheckServerConfigMode(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_MODE, value); SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_MODE, value);
...@@ -838,7 +838,7 @@ Config::SetServerConfigMode(const std::string& value) { ...@@ -838,7 +838,7 @@ Config::SetServerConfigMode(const std::string& value) {
} }
Status Status
Config::SetServerConfigTimeZone(const std::string& value) { Config::SetServerConfigTimeZone(const std::string &value) {
Status s = CheckServerConfigTimeZone(value); Status s = CheckServerConfigTimeZone(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, value); SetConfigValueInMem(CONFIG_SERVER, CONFIG_SERVER_TIME_ZONE, value);
...@@ -847,7 +847,7 @@ Config::SetServerConfigTimeZone(const std::string& value) { ...@@ -847,7 +847,7 @@ Config::SetServerConfigTimeZone(const std::string& value) {
/* db config */ /* db config */
Status Status
Config::SetDBConfigPath(const std::string& value) { Config::SetDBConfigPath(const std::string &value) {
Status s = CheckDBConfigPath(value); Status s = CheckDBConfigPath(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_PATH, value); SetConfigValueInMem(CONFIG_DB, CONFIG_DB_PATH, value);
...@@ -855,7 +855,7 @@ Config::SetDBConfigPath(const std::string& value) { ...@@ -855,7 +855,7 @@ Config::SetDBConfigPath(const std::string& value) {
} }
Status Status
Config::SetDBConfigSlavePath(const std::string& value) { Config::SetDBConfigSlavePath(const std::string &value) {
Status s = CheckDBConfigSlavePath(value); Status s = CheckDBConfigSlavePath(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_SLAVE_PATH, value); SetConfigValueInMem(CONFIG_DB, CONFIG_DB_SLAVE_PATH, value);
...@@ -863,7 +863,7 @@ Config::SetDBConfigSlavePath(const std::string& value) { ...@@ -863,7 +863,7 @@ Config::SetDBConfigSlavePath(const std::string& value) {
} }
Status Status
Config::SetDBConfigBackendUrl(const std::string& value) { Config::SetDBConfigBackendUrl(const std::string &value) {
Status s = CheckDBConfigBackendUrl(value); Status s = CheckDBConfigBackendUrl(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BACKEND_URL, value); SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BACKEND_URL, value);
...@@ -871,7 +871,7 @@ Config::SetDBConfigBackendUrl(const std::string& value) { ...@@ -871,7 +871,7 @@ Config::SetDBConfigBackendUrl(const std::string& value) {
} }
Status Status
Config::SetDBConfigArchiveDiskThreshold(const std::string& value) { Config::SetDBConfigArchiveDiskThreshold(const std::string &value) {
Status s = CheckDBConfigArchiveDiskThreshold(value); Status s = CheckDBConfigArchiveDiskThreshold(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, value); SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DISK_THRESHOLD, value);
...@@ -879,7 +879,7 @@ Config::SetDBConfigArchiveDiskThreshold(const std::string& value) { ...@@ -879,7 +879,7 @@ Config::SetDBConfigArchiveDiskThreshold(const std::string& value) {
} }
Status Status
Config::SetDBConfigArchiveDaysThreshold(const std::string& value) { Config::SetDBConfigArchiveDaysThreshold(const std::string &value) {
Status s = CheckDBConfigArchiveDaysThreshold(value); Status s = CheckDBConfigArchiveDaysThreshold(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value); SetConfigValueInMem(CONFIG_DB, CONFIG_DB_ARCHIVE_DAYS_THRESHOLD, value);
...@@ -887,7 +887,7 @@ Config::SetDBConfigArchiveDaysThreshold(const std::string& value) { ...@@ -887,7 +887,7 @@ Config::SetDBConfigArchiveDaysThreshold(const std::string& value) {
} }
Status Status
Config::SetDBConfigBufferSize(const std::string& value) { Config::SetDBConfigBufferSize(const std::string &value) {
Status s = CheckDBConfigBufferSize(value); Status s = CheckDBConfigBufferSize(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUFFER_SIZE, value); SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUFFER_SIZE, value);
...@@ -895,7 +895,7 @@ Config::SetDBConfigBufferSize(const std::string& value) { ...@@ -895,7 +895,7 @@ Config::SetDBConfigBufferSize(const std::string& value) {
} }
Status Status
Config::SetDBConfigBuildIndexGPU(const std::string& value) { Config::SetDBConfigBuildIndexGPU(const std::string &value) {
Status s = CheckDBConfigBuildIndexGPU(value); Status s = CheckDBConfigBuildIndexGPU(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, value); SetConfigValueInMem(CONFIG_DB, CONFIG_DB_BUILD_INDEX_GPU, value);
...@@ -904,7 +904,7 @@ Config::SetDBConfigBuildIndexGPU(const std::string& value) { ...@@ -904,7 +904,7 @@ Config::SetDBConfigBuildIndexGPU(const std::string& value) {
/* metric config */ /* metric config */
Status Status
Config::SetMetricConfigAutoBootup(const std::string& value) { Config::SetMetricConfigAutoBootup(const std::string &value) {
Status s = CheckMetricConfigAutoBootup(value); Status s = CheckMetricConfigAutoBootup(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_AUTO_BOOTUP, value); SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_AUTO_BOOTUP, value);
...@@ -912,7 +912,7 @@ Config::SetMetricConfigAutoBootup(const std::string& value) { ...@@ -912,7 +912,7 @@ Config::SetMetricConfigAutoBootup(const std::string& value) {
} }
Status Status
Config::SetMetricConfigCollector(const std::string& value) { Config::SetMetricConfigCollector(const std::string &value) {
Status s = CheckMetricConfigCollector(value); Status s = CheckMetricConfigCollector(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_COLLECTOR, value); SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_COLLECTOR, value);
...@@ -920,7 +920,7 @@ Config::SetMetricConfigCollector(const std::string& value) { ...@@ -920,7 +920,7 @@ Config::SetMetricConfigCollector(const std::string& value) {
} }
Status Status
Config::SetMetricConfigPrometheusPort(const std::string& value) { Config::SetMetricConfigPrometheusPort(const std::string &value) {
Status s = CheckMetricConfigPrometheusPort(value); Status s = CheckMetricConfigPrometheusPort(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_PROMETHEUS_PORT, value); SetConfigValueInMem(CONFIG_DB, CONFIG_METRIC_PROMETHEUS_PORT, value);
...@@ -929,7 +929,7 @@ Config::SetMetricConfigPrometheusPort(const std::string& value) { ...@@ -929,7 +929,7 @@ Config::SetMetricConfigPrometheusPort(const std::string& value) {
/* cache config */ /* cache config */
Status Status
Config::SetCacheConfigCpuMemCapacity(const std::string& value) { Config::SetCacheConfigCpuMemCapacity(const std::string &value) {
Status s = CheckCacheConfigCpuMemCapacity(value); Status s = CheckCacheConfigCpuMemCapacity(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CPU_MEM_CAPACITY, value); SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CPU_MEM_CAPACITY, value);
...@@ -937,7 +937,7 @@ Config::SetCacheConfigCpuMemCapacity(const std::string& value) { ...@@ -937,7 +937,7 @@ Config::SetCacheConfigCpuMemCapacity(const std::string& value) {
} }
Status Status
Config::SetCacheConfigCpuMemThreshold(const std::string& value) { Config::SetCacheConfigCpuMemThreshold(const std::string &value) {
Status s = CheckCacheConfigCpuMemThreshold(value); Status s = CheckCacheConfigCpuMemThreshold(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CPU_MEM_THRESHOLD, value); SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CPU_MEM_THRESHOLD, value);
...@@ -945,7 +945,7 @@ Config::SetCacheConfigCpuMemThreshold(const std::string& value) { ...@@ -945,7 +945,7 @@ Config::SetCacheConfigCpuMemThreshold(const std::string& value) {
} }
Status Status
Config::SetCacheConfigGpuMemCapacity(const std::string& value) { Config::SetCacheConfigGpuMemCapacity(const std::string &value) {
Status s = CheckCacheConfigGpuMemCapacity(value); Status s = CheckCacheConfigGpuMemCapacity(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_GPU_MEM_CAPACITY, value); SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_GPU_MEM_CAPACITY, value);
...@@ -953,7 +953,7 @@ Config::SetCacheConfigGpuMemCapacity(const std::string& value) { ...@@ -953,7 +953,7 @@ Config::SetCacheConfigGpuMemCapacity(const std::string& value) {
} }
Status Status
Config::SetCacheConfigGpuMemThreshold(const std::string& value) { Config::SetCacheConfigGpuMemThreshold(const std::string &value) {
Status s = CheckCacheConfigGpuMemThreshold(value); Status s = CheckCacheConfigGpuMemThreshold(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_GPU_MEM_THRESHOLD, value); SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_GPU_MEM_THRESHOLD, value);
...@@ -961,7 +961,7 @@ Config::SetCacheConfigGpuMemThreshold(const std::string& value) { ...@@ -961,7 +961,7 @@ Config::SetCacheConfigGpuMemThreshold(const std::string& value) {
} }
Status Status
Config::SetCacheConfigCacheInsertData(const std::string& value) { Config::SetCacheConfigCacheInsertData(const std::string &value) {
Status s = CheckCacheConfigCacheInsertData(value); Status s = CheckCacheConfigCacheInsertData(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CACHE_INSERT_DATA, value); SetConfigValueInMem(CONFIG_DB, CONFIG_CACHE_CACHE_INSERT_DATA, value);
...@@ -970,7 +970,7 @@ Config::SetCacheConfigCacheInsertData(const std::string& value) { ...@@ -970,7 +970,7 @@ Config::SetCacheConfigCacheInsertData(const std::string& value) {
/* engine config */ /* engine config */
Status Status
Config::SetEngineConfigBlasThreshold(const std::string& value) { Config::SetEngineConfigBlasThreshold(const std::string &value) {
Status s = CheckEngineConfigBlasThreshold(value); Status s = CheckEngineConfigBlasThreshold(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_ENGINE_BLAS_THRESHOLD, value); SetConfigValueInMem(CONFIG_DB, CONFIG_ENGINE_BLAS_THRESHOLD, value);
...@@ -978,7 +978,7 @@ Config::SetEngineConfigBlasThreshold(const std::string& value) { ...@@ -978,7 +978,7 @@ Config::SetEngineConfigBlasThreshold(const std::string& value) {
} }
Status Status
Config::SetEngineConfigOmpThreadNum(const std::string& value) { Config::SetEngineConfigOmpThreadNum(const std::string &value) {
Status s = CheckEngineConfigOmpThreadNum(value); Status s = CheckEngineConfigOmpThreadNum(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_ENGINE_OMP_THREAD_NUM, value); SetConfigValueInMem(CONFIG_DB, CONFIG_ENGINE_OMP_THREAD_NUM, value);
...@@ -987,13 +987,13 @@ Config::SetEngineConfigOmpThreadNum(const std::string& value) { ...@@ -987,13 +987,13 @@ Config::SetEngineConfigOmpThreadNum(const std::string& value) {
/* resource config */ /* resource config */
Status Status
Config::SetResourceConfigMode(const std::string& value) { Config::SetResourceConfigMode(const std::string &value) {
Status s = CheckResourceConfigMode(value); Status s = CheckResourceConfigMode(value);
if (!s.ok()) return s; if (!s.ok()) return s;
SetConfigValueInMem(CONFIG_DB, CONFIG_RESOURCE_MODE, value); SetConfigValueInMem(CONFIG_DB, CONFIG_RESOURCE_MODE, value);
return Status::OK(); return Status::OK();
} }
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
...@@ -17,136 +17,137 @@ ...@@ -17,136 +17,137 @@
#pragma once #pragma once
#include <vector>
#include <string>
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include "yaml-cpp/yaml.h" #include <yaml-cpp/yaml.h>
#include "utils/Status.h" #include "utils/Status.h"
#include "config/ConfigNode.h" #include "config/ConfigNode.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
/* server config */ /* server config */
static const char* CONFIG_SERVER = "server_config"; static const char *CONFIG_SERVER = "server_config";
static const char* CONFIG_SERVER_ADDRESS = "address"; static const char *CONFIG_SERVER_ADDRESS = "address";
static const char* CONFIG_SERVER_ADDRESS_DEFAULT = "127.0.0.1"; static const char *CONFIG_SERVER_ADDRESS_DEFAULT = "127.0.0.1";
static const char* CONFIG_SERVER_PORT = "port"; static const char *CONFIG_SERVER_PORT = "port";
static const char* CONFIG_SERVER_PORT_DEFAULT = "19530"; static const char *CONFIG_SERVER_PORT_DEFAULT = "19530";
static const char* CONFIG_SERVER_MODE = "mode"; static const char *CONFIG_SERVER_MODE = "mode";
static const char* CONFIG_SERVER_MODE_DEFAULT = "single"; static const char *CONFIG_SERVER_MODE_DEFAULT = "single";
static const char* CONFIG_SERVER_TIME_ZONE = "time_zone"; static const char *CONFIG_SERVER_TIME_ZONE = "time_zone";
static const char* CONFIG_SERVER_TIME_ZONE_DEFAULT = "UTC+8"; static const char *CONFIG_SERVER_TIME_ZONE_DEFAULT = "UTC+8";
/* db config */ /* db config */
static const char* CONFIG_DB = "db_config"; static const char *CONFIG_DB = "db_config";
static const char* CONFIG_DB_PATH = "path"; static const char *CONFIG_DB_PATH = "path";
static const char* CONFIG_DB_PATH_DEFAULT = "/tmp/milvus"; static const char *CONFIG_DB_PATH_DEFAULT = "/tmp/milvus";
static const char* CONFIG_DB_SLAVE_PATH = "slave_path"; static const char *CONFIG_DB_SLAVE_PATH = "slave_path";
static const char* CONFIG_DB_SLAVE_PATH_DEFAULT = ""; static const char *CONFIG_DB_SLAVE_PATH_DEFAULT = "";
static const char* CONFIG_DB_BACKEND_URL = "backend_url"; static const char *CONFIG_DB_BACKEND_URL = "backend_url";
static const char* CONFIG_DB_BACKEND_URL_DEFAULT = "sqlite://:@:/"; static const char *CONFIG_DB_BACKEND_URL_DEFAULT = "sqlite://:@:/";
static const char* CONFIG_DB_ARCHIVE_DISK_THRESHOLD = "archive_disk_threshold"; static const char *CONFIG_DB_ARCHIVE_DISK_THRESHOLD = "archive_disk_threshold";
static const char* CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT = "0"; static const char *CONFIG_DB_ARCHIVE_DISK_THRESHOLD_DEFAULT = "0";
static const char* CONFIG_DB_ARCHIVE_DAYS_THRESHOLD = "archive_days_threshold"; static const char *CONFIG_DB_ARCHIVE_DAYS_THRESHOLD = "archive_days_threshold";
static const char* CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT = "0"; static const char *CONFIG_DB_ARCHIVE_DAYS_THRESHOLD_DEFAULT = "0";
static const char* CONFIG_DB_BUFFER_SIZE = "buffer_size"; static const char *CONFIG_DB_BUFFER_SIZE = "buffer_size";
static const char* CONFIG_DB_BUFFER_SIZE_DEFAULT = "4"; static const char *CONFIG_DB_BUFFER_SIZE_DEFAULT = "4";
static const char* CONFIG_DB_BUILD_INDEX_GPU = "build_index_gpu"; static const char *CONFIG_DB_BUILD_INDEX_GPU = "build_index_gpu";
static const char* CONFIG_DB_BUILD_INDEX_GPU_DEFAULT = "0"; static const char *CONFIG_DB_BUILD_INDEX_GPU_DEFAULT = "0";
/* cache config */ /* cache config */
static const char* CONFIG_CACHE = "cache_config"; static const char *CONFIG_CACHE = "cache_config";
static const char* CONFIG_CACHE_CPU_MEM_CAPACITY = "cpu_mem_capacity"; static const char *CONFIG_CACHE_CPU_MEM_CAPACITY = "cpu_mem_capacity";
static const char* CONFIG_CACHE_CPU_MEM_CAPACITY_DEFAULT = "16"; static const char *CONFIG_CACHE_CPU_MEM_CAPACITY_DEFAULT = "16";
static const char* CONFIG_CACHE_GPU_MEM_CAPACITY = "gpu_mem_capacity"; static const char *CONFIG_CACHE_GPU_MEM_CAPACITY = "gpu_mem_capacity";
static const char* CONFIG_CACHE_GPU_MEM_CAPACITY_DEFAULT = "0"; static const char *CONFIG_CACHE_GPU_MEM_CAPACITY_DEFAULT = "0";
static const char* CONFIG_CACHE_CPU_MEM_THRESHOLD = "cpu_mem_threshold"; static const char *CONFIG_CACHE_CPU_MEM_THRESHOLD = "cpu_mem_threshold";
static const char* CONFIG_CACHE_CPU_MEM_THRESHOLD_DEFAULT = "0.85"; static const char *CONFIG_CACHE_CPU_MEM_THRESHOLD_DEFAULT = "0.85";
static const char* CONFIG_CACHE_GPU_MEM_THRESHOLD = "gpu_mem_threshold"; static const char *CONFIG_CACHE_GPU_MEM_THRESHOLD = "gpu_mem_threshold";
static const char* CONFIG_CACHE_GPU_MEM_THRESHOLD_DEFAULT = "0.85"; static const char *CONFIG_CACHE_GPU_MEM_THRESHOLD_DEFAULT = "0.85";
static const char* CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data"; static const char *CONFIG_CACHE_CACHE_INSERT_DATA = "cache_insert_data";
static const char* CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT = "false"; static const char *CONFIG_CACHE_CACHE_INSERT_DATA_DEFAULT = "false";
/* metric config */ /* metric config */
static const char* CONFIG_METRIC = "metric_config"; static const char *CONFIG_METRIC = "metric_config";
static const char* CONFIG_METRIC_AUTO_BOOTUP = "auto_bootup"; static const char *CONFIG_METRIC_AUTO_BOOTUP = "auto_bootup";
static const char* CONFIG_METRIC_AUTO_BOOTUP_DEFAULT = "false"; static const char *CONFIG_METRIC_AUTO_BOOTUP_DEFAULT = "false";
static const char* CONFIG_METRIC_COLLECTOR = "collector"; static const char *CONFIG_METRIC_COLLECTOR = "collector";
static const char* CONFIG_METRIC_COLLECTOR_DEFAULT = "prometheus"; static const char *CONFIG_METRIC_COLLECTOR_DEFAULT = "prometheus";
static const char* CONFIG_METRIC_PROMETHEUS = "prometheus_config"; static const char *CONFIG_METRIC_PROMETHEUS = "prometheus_config";
static const char* CONFIG_METRIC_PROMETHEUS_PORT = "port"; static const char *CONFIG_METRIC_PROMETHEUS_PORT = "port";
static const char* CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT = "8080"; static const char *CONFIG_METRIC_PROMETHEUS_PORT_DEFAULT = "8080";
/* engine config */ /* engine config */
static const char* CONFIG_ENGINE = "engine_config"; static const char *CONFIG_ENGINE = "engine_config";
static const char* CONFIG_ENGINE_BLAS_THRESHOLD = "blas_threshold"; static const char *CONFIG_ENGINE_BLAS_THRESHOLD = "blas_threshold";
static const char* CONFIG_ENGINE_BLAS_THRESHOLD_DEFAULT = "20"; static const char *CONFIG_ENGINE_BLAS_THRESHOLD_DEFAULT = "20";
static const char* CONFIG_ENGINE_OMP_THREAD_NUM = "omp_thread_num"; static const char *CONFIG_ENGINE_OMP_THREAD_NUM = "omp_thread_num";
static const char* CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0"; static const char *CONFIG_ENGINE_OMP_THREAD_NUM_DEFAULT = "0";
/* resource config */ /* resource config */
static const char* CONFIG_RESOURCE = "resource_config"; static const char *CONFIG_RESOURCE = "resource_config";
static const char* CONFIG_RESOURCE_MODE = "mode"; static const char *CONFIG_RESOURCE_MODE = "mode";
static const char* CONFIG_RESOURCE_MODE_DEFAULT = "simple"; static const char *CONFIG_RESOURCE_MODE_DEFAULT = "simple";
static const char* CONFIG_RESOURCE_POOL = "pool"; static const char *CONFIG_RESOURCE_POOL = "pool";
class Config { class Config {
public: public:
static Config& GetInstance(); static Config &GetInstance();
Status LoadConfigFile(const std::string& filename); Status LoadConfigFile(const std::string &filename);
void PrintAll(); void PrintAll();
private: private:
ConfigNode& GetConfigNode(const std::string& name); ConfigNode &GetConfigNode(const std::string &name);
Status GetConfigValueInMem(const std::string& parent_key, Status GetConfigValueInMem(const std::string &parent_key,
const std::string& child_key, const std::string &child_key,
std::string& value); std::string &value);
void SetConfigValueInMem(const std::string& parent_key, void SetConfigValueInMem(const std::string &parent_key,
const std::string& child_key, const std::string &child_key,
const std::string& value); const std::string &value);
void PrintConfigSection(const std::string& config_node_name); void PrintConfigSection(const std::string &config_node_name);
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
/* server config */ /* server config */
Status CheckServerConfigAddress(const std::string& value); Status CheckServerConfigAddress(const std::string &value);
Status CheckServerConfigPort(const std::string& value); Status CheckServerConfigPort(const std::string &value);
Status CheckServerConfigMode(const std::string& value); Status CheckServerConfigMode(const std::string &value);
Status CheckServerConfigTimeZone(const std::string& value); Status CheckServerConfigTimeZone(const std::string &value);
/* db config */ /* db config */
Status CheckDBConfigPath(const std::string& value); Status CheckDBConfigPath(const std::string &value);
Status CheckDBConfigSlavePath(const std::string& value); Status CheckDBConfigSlavePath(const std::string &value);
Status CheckDBConfigBackendUrl(const std::string& value); Status CheckDBConfigBackendUrl(const std::string &value);
Status CheckDBConfigArchiveDiskThreshold(const std::string& value); Status CheckDBConfigArchiveDiskThreshold(const std::string &value);
Status CheckDBConfigArchiveDaysThreshold(const std::string& value); Status CheckDBConfigArchiveDaysThreshold(const std::string &value);
Status CheckDBConfigBufferSize(const std::string& value); Status CheckDBConfigBufferSize(const std::string &value);
Status CheckDBConfigBuildIndexGPU(const std::string& value); Status CheckDBConfigBuildIndexGPU(const std::string &value);
/* metric config */ /* metric config */
Status CheckMetricConfigAutoBootup(const std::string& value); Status CheckMetricConfigAutoBootup(const std::string &value);
Status CheckMetricConfigCollector(const std::string& value); Status CheckMetricConfigCollector(const std::string &value);
Status CheckMetricConfigPrometheusPort(const std::string& value); Status CheckMetricConfigPrometheusPort(const std::string &value);
/* cache config */ /* cache config */
Status CheckCacheConfigCpuMemCapacity(const std::string& value); Status CheckCacheConfigCpuMemCapacity(const std::string &value);
Status CheckCacheConfigCpuMemThreshold(const std::string& value); Status CheckCacheConfigCpuMemThreshold(const std::string &value);
Status CheckCacheConfigGpuMemCapacity(const std::string& value); Status CheckCacheConfigGpuMemCapacity(const std::string &value);
Status CheckCacheConfigGpuMemThreshold(const std::string& value); Status CheckCacheConfigGpuMemThreshold(const std::string &value);
Status CheckCacheConfigCacheInsertData(const std::string& value); Status CheckCacheConfigCacheInsertData(const std::string &value);
/* engine config */ /* engine config */
Status CheckEngineConfigBlasThreshold(const std::string& value); Status CheckEngineConfigBlasThreshold(const std::string &value);
Status CheckEngineConfigOmpThreadNum(const std::string& value); Status CheckEngineConfigOmpThreadNum(const std::string &value);
/* resource config */ /* resource config */
Status CheckResourceConfigMode(const std::string& value); Status CheckResourceConfigMode(const std::string &value);
Status CheckResourceConfigPool(const std::vector<std::string>& value); Status CheckResourceConfigPool(const std::vector<std::string> &value);
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
/* server config */ /* server config */
...@@ -185,81 +186,81 @@ class Config { ...@@ -185,81 +186,81 @@ class Config {
public: public:
/* server config */ /* server config */
Status GetServerConfigAddress(std::string& value); Status GetServerConfigAddress(std::string &value);
Status GetServerConfigPort(std::string& value); Status GetServerConfigPort(std::string &value);
Status GetServerConfigMode(std::string& value); Status GetServerConfigMode(std::string &value);
Status GetServerConfigTimeZone(std::string& value); Status GetServerConfigTimeZone(std::string &value);
/* db config */ /* db config */
Status GetDBConfigPath(std::string& value); Status GetDBConfigPath(std::string &value);
Status GetDBConfigSlavePath(std::string& value); Status GetDBConfigSlavePath(std::string &value);
Status GetDBConfigBackendUrl(std::string& value); Status GetDBConfigBackendUrl(std::string &value);
Status GetDBConfigArchiveDiskThreshold(int32_t& value); Status GetDBConfigArchiveDiskThreshold(int32_t &value);
Status GetDBConfigArchiveDaysThreshold(int32_t& value); Status GetDBConfigArchiveDaysThreshold(int32_t &value);
Status GetDBConfigBufferSize(int32_t& value); Status GetDBConfigBufferSize(int32_t &value);
Status GetDBConfigBuildIndexGPU(int32_t& value); Status GetDBConfigBuildIndexGPU(int32_t &value);
/* metric config */ /* metric config */
Status GetMetricConfigAutoBootup(bool& value); Status GetMetricConfigAutoBootup(bool &value);
Status GetMetricConfigCollector(std::string& value); Status GetMetricConfigCollector(std::string &value);
Status GetMetricConfigPrometheusPort(std::string& value); Status GetMetricConfigPrometheusPort(std::string &value);
/* cache config */ /* cache config */
Status GetCacheConfigCpuMemCapacity(int32_t& value); Status GetCacheConfigCpuMemCapacity(int32_t &value);
Status GetCacheConfigCpuMemThreshold(float& value); Status GetCacheConfigCpuMemThreshold(float &value);
Status GetCacheConfigGpuMemCapacity(int32_t& value); Status GetCacheConfigGpuMemCapacity(int32_t &value);
Status GetCacheConfigGpuMemThreshold(float& value); Status GetCacheConfigGpuMemThreshold(float &value);
Status GetCacheConfigCacheInsertData(bool& value); Status GetCacheConfigCacheInsertData(bool &value);
/* engine config */ /* engine config */
Status GetEngineConfigBlasThreshold(int32_t& value); Status GetEngineConfigBlasThreshold(int32_t &value);
Status GetEngineConfigOmpThreadNum(int32_t& value); Status GetEngineConfigOmpThreadNum(int32_t &value);
/* resource config */ /* resource config */
Status GetResourceConfigMode(std::string& value); Status GetResourceConfigMode(std::string &value);
Status GetResourceConfigPool(std::vector<std::string>& value); Status GetResourceConfigPool(std::vector<std::string> &value);
public: public:
/* server config */ /* server config */
Status SetServerConfigAddress(const std::string& value); Status SetServerConfigAddress(const std::string &value);
Status SetServerConfigPort(const std::string& value); Status SetServerConfigPort(const std::string &value);
Status SetServerConfigMode(const std::string& value); Status SetServerConfigMode(const std::string &value);
Status SetServerConfigTimeZone(const std::string& value); Status SetServerConfigTimeZone(const std::string &value);
/* db config */ /* db config */
Status SetDBConfigPath(const std::string& value); Status SetDBConfigPath(const std::string &value);
Status SetDBConfigSlavePath(const std::string& value); Status SetDBConfigSlavePath(const std::string &value);
Status SetDBConfigBackendUrl(const std::string& value); Status SetDBConfigBackendUrl(const std::string &value);
Status SetDBConfigArchiveDiskThreshold(const std::string& value); Status SetDBConfigArchiveDiskThreshold(const std::string &value);
Status SetDBConfigArchiveDaysThreshold(const std::string& value); Status SetDBConfigArchiveDaysThreshold(const std::string &value);
Status SetDBConfigBufferSize(const std::string& value); Status SetDBConfigBufferSize(const std::string &value);
Status SetDBConfigBuildIndexGPU(const std::string& value); Status SetDBConfigBuildIndexGPU(const std::string &value);
/* metric config */ /* metric config */
Status SetMetricConfigAutoBootup(const std::string& value); Status SetMetricConfigAutoBootup(const std::string &value);
Status SetMetricConfigCollector(const std::string& value); Status SetMetricConfigCollector(const std::string &value);
Status SetMetricConfigPrometheusPort(const std::string& value); Status SetMetricConfigPrometheusPort(const std::string &value);
/* cache config */ /* cache config */
Status SetCacheConfigCpuMemCapacity(const std::string& value); Status SetCacheConfigCpuMemCapacity(const std::string &value);
Status SetCacheConfigCpuMemThreshold(const std::string& value); Status SetCacheConfigCpuMemThreshold(const std::string &value);
Status SetCacheConfigGpuMemCapacity(const std::string& value); Status SetCacheConfigGpuMemCapacity(const std::string &value);
Status SetCacheConfigGpuMemThreshold(const std::string& value); Status SetCacheConfigGpuMemThreshold(const std::string &value);
Status SetCacheConfigCacheInsertData(const std::string& value); Status SetCacheConfigCacheInsertData(const std::string &value);
/* engine config */ /* engine config */
Status SetEngineConfigBlasThreshold(const std::string& value); Status SetEngineConfigBlasThreshold(const std::string &value);
Status SetEngineConfigOmpThreadNum(const std::string& value); Status SetEngineConfigOmpThreadNum(const std::string &value);
/* resource config */ /* resource config */
Status SetResourceConfigMode(const std::string& value); Status SetResourceConfigMode(const std::string &value);
private: private:
std::unordered_map<std::string, std::unordered_map<std::string, std::string>> config_map_; std::unordered_map<std::string, std::unordered_map<std::string, std::string>> config_map_;
std::mutex mutex_; std::mutex mutex_;
}; };
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
...@@ -16,13 +16,14 @@ ...@@ -16,13 +16,14 @@
// under the License. // under the License.
#include "DBWrapper.h" #include "server/DBWrapper.h"
#include "Config.h" #include "Config.h"
#include "db/DBFactory.h" #include "db/DBFactory.h"
#include "utils/CommonUtil.h" #include "utils/CommonUtil.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/StringHelpFunctions.h" #include "utils/StringHelpFunctions.h"
#include <string>
#include <omp.h> #include <omp.h>
#include <faiss/utils.h> #include <faiss/utils.h>
...@@ -31,11 +32,11 @@ namespace milvus { ...@@ -31,11 +32,11 @@ namespace milvus {
namespace server { namespace server {
DBWrapper::DBWrapper() { DBWrapper::DBWrapper() {
} }
Status DBWrapper::StartService() { Status
Config& config = Config::GetInstance(); DBWrapper::StartService() {
Config &config = Config::GetInstance();
Status s; Status s;
//db config //db config
engine::DBOptions opt; engine::DBOptions opt;
...@@ -65,15 +66,13 @@ Status DBWrapper::StartService() { ...@@ -65,15 +66,13 @@ Status DBWrapper::StartService() {
if (mode == "single") { if (mode == "single") {
opt.mode_ = engine::DBOptions::MODE::SINGLE; opt.mode_ = engine::DBOptions::MODE::SINGLE;
} } else if (mode == "cluster") {
else if (mode == "cluster") {
opt.mode_ = engine::DBOptions::MODE::CLUSTER; opt.mode_ = engine::DBOptions::MODE::CLUSTER;
} } else if (mode == "read_only") {
else if (mode == "read_only") {
opt.mode_ = engine::DBOptions::MODE::READ_ONLY; opt.mode_ = engine::DBOptions::MODE::READ_ONLY;
} } else {
else { std::cerr << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']"
std::cerr << "ERROR: mode specified in server_config is not one of ['single', 'cluster', 'read_only']" << std::endl; << std::endl;
kill(0, SIGUSR1); kill(0, SIGUSR1);
} }
...@@ -86,8 +85,8 @@ Status DBWrapper::StartService() { ...@@ -86,8 +85,8 @@ Status DBWrapper::StartService() {
SERVER_LOG_DEBUG << "Specify openmp thread number: " << omp_thread; SERVER_LOG_DEBUG << "Specify openmp thread number: " << omp_thread;
} else { } else {
uint32_t sys_thread_cnt = 8; uint32_t sys_thread_cnt = 8;
if(CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) { if (CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) {
omp_thread = (int32_t)ceil(sys_thread_cnt*0.5); omp_thread = (int32_t) ceil(sys_thread_cnt * 0.5);
omp_set_num_threads(omp_thread); omp_set_num_threads(omp_thread);
} }
} }
...@@ -116,14 +115,14 @@ Status DBWrapper::StartService() { ...@@ -116,14 +115,14 @@ Status DBWrapper::StartService() {
//create db root folder //create db root folder
Status status = CommonUtil::CreateDirectory(opt.meta_.path_); Status status = CommonUtil::CreateDirectory(opt.meta_.path_);
if(!status.ok()) { if (!status.ok()) {
std::cerr << "ERROR! Failed to create database root path: " << opt.meta_.path_ << std::endl; std::cerr << "ERROR! Failed to create database root path: " << opt.meta_.path_ << std::endl;
kill(0, SIGUSR1); kill(0, SIGUSR1);
} }
for(auto& path : opt.meta_.slave_paths_) { for (auto &path : opt.meta_.slave_paths_) {
status = CommonUtil::CreateDirectory(path); status = CommonUtil::CreateDirectory(path);
if(!status.ok()) { if (!status.ok()) {
std::cerr << "ERROR! Failed to create database slave path: " << path << std::endl; std::cerr << "ERROR! Failed to create database slave path: " << path << std::endl;
kill(0, SIGUSR1); kill(0, SIGUSR1);
} }
...@@ -132,7 +131,7 @@ Status DBWrapper::StartService() { ...@@ -132,7 +131,7 @@ Status DBWrapper::StartService() {
//create db instance //create db instance
try { try {
db_ = engine::DBFactory::Build(opt); db_ = engine::DBFactory::Build(opt);
} catch(std::exception& ex) { } catch (std::exception &ex) {
std::cerr << "ERROR! Failed to open database: " << ex.what() << std::endl; std::cerr << "ERROR! Failed to open database: " << ex.what() << std::endl;
kill(0, SIGUSR1); kill(0, SIGUSR1);
} }
...@@ -142,14 +141,15 @@ Status DBWrapper::StartService() { ...@@ -142,14 +141,15 @@ Status DBWrapper::StartService() {
return Status::OK(); return Status::OK();
} }
Status DBWrapper::StopService() { Status
if(db_) { DBWrapper::StopService() {
if (db_) {
db_->Stop(); db_->Stop();
} }
return Status::OK(); return Status::OK();
} }
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
\ No newline at end of file
...@@ -27,12 +27,12 @@ namespace milvus { ...@@ -27,12 +27,12 @@ namespace milvus {
namespace server { namespace server {
class DBWrapper { class DBWrapper {
private: private:
DBWrapper(); DBWrapper();
~DBWrapper() = default; ~DBWrapper() = default;
public: public:
static DBWrapper& GetInstance() { static DBWrapper &GetInstance() {
static DBWrapper wrapper; static DBWrapper wrapper;
return wrapper; return wrapper;
} }
...@@ -48,10 +48,10 @@ public: ...@@ -48,10 +48,10 @@ public:
return db_; return db_;
} }
private: private:
engine::DBPtr db_; engine::DBPtr db_;
}; };
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "server/Server.h"
#include <thread> #include <thread>
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
...@@ -24,7 +26,6 @@ ...@@ -24,7 +26,6 @@
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include "Server.h"
#include "server/grpc_impl/GrpcServer.h" #include "server/grpc_impl/GrpcServer.h"
#include "server/Config.h" #include "server/Config.h"
#include "utils/Log.h" #include "utils/Log.h"
...@@ -36,7 +37,6 @@ ...@@ -36,7 +37,6 @@
#include "wrapper/KnowhereResource.h" #include "wrapper/KnowhereResource.h"
#include "DBWrapper.h" #include "DBWrapper.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
...@@ -116,7 +116,7 @@ Server::Daemonize() { ...@@ -116,7 +116,7 @@ Server::Daemonize() {
} }
// Close all open fd // Close all open fd
for (long fd = sysconf(_SC_OPEN_MAX); fd > 0; fd--) { for (int64_t fd = sysconf(_SC_OPEN_MAX); fd > 0; fd--) {
close(fd); close(fd);
} }
...@@ -172,9 +172,9 @@ Server::Start() { ...@@ -172,9 +172,9 @@ Server::Start() {
time_zone = "CUT"; time_zone = "CUT";
} else { } else {
int time_bias = std::stoi(time_zone.substr(3, std::string::npos)); int time_bias = std::stoi(time_zone.substr(3, std::string::npos));
if (time_bias == 0) if (time_bias == 0) {
time_zone = "CUT"; time_zone = "CUT";
else if (time_bias > 0) { } else if (time_bias > 0) {
time_zone = "CUT" + std::to_string(-time_bias); time_zone = "CUT" + std::to_string(-time_bias);
} else { } else {
time_zone = "CUT+" + std::to_string(-time_bias); time_zone = "CUT+" + std::to_string(-time_bias);
...@@ -194,7 +194,6 @@ Server::Start() { ...@@ -194,7 +194,6 @@ Server::Start() {
StartService(); StartService();
std::cout << "Milvus server start successfully." << std::endl; std::cout << "Milvus server start successfully." << std::endl;
} catch (std::exception &ex) { } catch (std::exception &ex) {
std::cerr << "Milvus server encounter exception: " << ex.what(); std::cerr << "Milvus server encounter exception: " << ex.what();
} }
...@@ -232,10 +231,9 @@ Server::Stop() { ...@@ -232,10 +231,9 @@ Server::Stop() {
std::cerr << "Milvus server is closed!" << std::endl; std::cerr << "Milvus server is closed!" << std::endl;
} }
ErrorCode ErrorCode
Server::LoadConfig() { Server::LoadConfig() {
Config& config = Config::GetInstance(); Config &config = Config::GetInstance();
Status s = config.LoadConfigFile(config_filename_); Status s = config.LoadConfigFile(config_filename_);
if (!s.ok()) { if (!s.ok()) {
std::cerr << "Failed to load config file: " << config_filename_ << std::endl; std::cerr << "Failed to load config file: " << config_filename_ << std::endl;
...@@ -260,6 +258,6 @@ Server::StopService() { ...@@ -260,6 +258,6 @@ Server::StopService() {
engine::KnowhereResource::Finalize(); engine::KnowhereResource::Finalize();
} }
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#include <cstdint> #include <cstdint>
#include <string> #include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
...@@ -58,6 +57,6 @@ class Server { ...@@ -58,6 +57,6 @@ class Server {
std::string log_config_file_; std::string log_config_file_;
}; // Server }; // Server
} // server } // namespace server
} // sql } // namespace milvus
} // zilliz } // namespace zilliz
...@@ -16,10 +16,12 @@ ...@@ -16,10 +16,12 @@
// under the License. // under the License.
#include "GrpcRequestHandler.h" #include "server/grpc_impl/GrpcRequestHandler.h"
#include "GrpcRequestTask.h" #include "server/grpc_impl/GrpcRequestTask.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
#include <vector>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
...@@ -29,7 +31,6 @@ namespace grpc { ...@@ -29,7 +31,6 @@ namespace grpc {
GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, GrpcRequestHandler::CreateTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableSchema *request, const ::milvus::grpc::TableSchema *request,
::milvus::grpc::Status *response) { ::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = CreateTableTask::Create(request); BaseTaskPtr task_ptr = CreateTableTask::Create(request);
GrpcRequestScheduler::ExecTask(task_ptr, response); GrpcRequestScheduler::ExecTask(task_ptr, response);
return ::grpc::Status::OK; return ::grpc::Status::OK;
...@@ -39,7 +40,6 @@ GrpcRequestHandler::CreateTable(::grpc::ServerContext *context, ...@@ -39,7 +40,6 @@ GrpcRequestHandler::CreateTable(::grpc::ServerContext *context,
GrpcRequestHandler::HasTable(::grpc::ServerContext *context, GrpcRequestHandler::HasTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::BoolReply *response) { ::milvus::grpc::BoolReply *response) {
bool has_table = false; bool has_table = false;
BaseTaskPtr task_ptr = HasTableTask::Create(request->table_name(), has_table); BaseTaskPtr task_ptr = HasTableTask::Create(request->table_name(), has_table);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
...@@ -61,9 +61,8 @@ GrpcRequestHandler::DropTable(::grpc::ServerContext *context, ...@@ -61,9 +61,8 @@ GrpcRequestHandler::DropTable(::grpc::ServerContext *context,
::grpc::Status ::grpc::Status
GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context,
const ::milvus::grpc::IndexParam *request, const ::milvus::grpc::IndexParam *request,
::milvus::grpc::Status *response) { ::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = CreateIndexTask::Create(request); BaseTaskPtr task_ptr = CreateIndexTask::Create(request);
GrpcRequestScheduler::ExecTask(task_ptr, response); GrpcRequestScheduler::ExecTask(task_ptr, response);
return ::grpc::Status::OK; return ::grpc::Status::OK;
...@@ -71,9 +70,8 @@ GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context, ...@@ -71,9 +70,8 @@ GrpcRequestHandler::CreateIndex(::grpc::ServerContext *context,
::grpc::Status ::grpc::Status
GrpcRequestHandler::Insert(::grpc::ServerContext *context, GrpcRequestHandler::Insert(::grpc::ServerContext *context,
const ::milvus::grpc::InsertParam *request, const ::milvus::grpc::InsertParam *request,
::milvus::grpc::VectorIds *response) { ::milvus::grpc::VectorIds *response) {
BaseTaskPtr task_ptr = InsertTask::Create(request, response); BaseTaskPtr task_ptr = InsertTask::Create(request, response);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
...@@ -86,7 +84,6 @@ GrpcRequestHandler::Insert(::grpc::ServerContext *context, ...@@ -86,7 +84,6 @@ GrpcRequestHandler::Insert(::grpc::ServerContext *context,
GrpcRequestHandler::Search(::grpc::ServerContext *context, GrpcRequestHandler::Search(::grpc::ServerContext *context,
const ::milvus::grpc::SearchParam *request, const ::milvus::grpc::SearchParam *request,
::milvus::grpc::TopKQueryResultList *response) { ::milvus::grpc::TopKQueryResultList *response) {
std::vector<std::string> file_id_array; std::vector<std::string> file_id_array;
BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response); BaseTaskPtr task_ptr = SearchTask::Create(request, file_id_array, response);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
...@@ -100,7 +97,6 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context, ...@@ -100,7 +97,6 @@ GrpcRequestHandler::Search(::grpc::ServerContext *context,
GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context, GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context,
const ::milvus::grpc::SearchInFilesParam *request, const ::milvus::grpc::SearchInFilesParam *request,
::milvus::grpc::TopKQueryResultList *response) { ::milvus::grpc::TopKQueryResultList *response) {
std::vector<std::string> file_id_array; std::vector<std::string> file_id_array;
for (int i = 0; i < request->file_id_array_size(); i++) { for (int i = 0; i < request->file_id_array_size(); i++) {
file_id_array.push_back(request->file_id_array(i)); file_id_array.push_back(request->file_id_array(i));
...@@ -118,7 +114,6 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context, ...@@ -118,7 +114,6 @@ GrpcRequestHandler::SearchInFiles(::grpc::ServerContext *context,
GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context, GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::TableSchema *response) { ::milvus::grpc::TableSchema *response) {
BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), response); BaseTaskPtr task_ptr = DescribeTableTask::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
...@@ -129,9 +124,8 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context, ...@@ -129,9 +124,8 @@ GrpcRequestHandler::DescribeTable(::grpc::ServerContext *context,
::grpc::Status ::grpc::Status
GrpcRequestHandler::CountTable(::grpc::ServerContext *context, GrpcRequestHandler::CountTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::TableRowCount *response) { ::milvus::grpc::TableRowCount *response) {
int64_t row_count = 0; int64_t row_count = 0;
BaseTaskPtr task_ptr = CountTableTask::Create(request->table_name(), row_count); BaseTaskPtr task_ptr = CountTableTask::Create(request->table_name(), row_count);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
...@@ -146,7 +140,6 @@ GrpcRequestHandler::CountTable(::grpc::ServerContext *context, ...@@ -146,7 +140,6 @@ GrpcRequestHandler::CountTable(::grpc::ServerContext *context,
GrpcRequestHandler::ShowTables(::grpc::ServerContext *context, GrpcRequestHandler::ShowTables(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request, const ::milvus::grpc::Command *request,
::milvus::grpc::TableNameList *response) { ::milvus::grpc::TableNameList *response) {
BaseTaskPtr task_ptr = ShowTablesTask::Create(response); BaseTaskPtr task_ptr = ShowTablesTask::Create(response);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
...@@ -157,9 +150,8 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext *context, ...@@ -157,9 +150,8 @@ GrpcRequestHandler::ShowTables(::grpc::ServerContext *context,
::grpc::Status ::grpc::Status
GrpcRequestHandler::Cmd(::grpc::ServerContext *context, GrpcRequestHandler::Cmd(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request, const ::milvus::grpc::Command *request,
::milvus::grpc::StringReply *response) { ::milvus::grpc::StringReply *response) {
std::string result; std::string result;
BaseTaskPtr task_ptr = CmdTask::Create(request->cmd(), result); BaseTaskPtr task_ptr = CmdTask::Create(request->cmd(), result);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
...@@ -172,8 +164,8 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context, ...@@ -172,8 +164,8 @@ GrpcRequestHandler::Cmd(::grpc::ServerContext *context,
::grpc::Status ::grpc::Status
GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context, GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context,
const ::milvus::grpc::DeleteByRangeParam *request, const ::milvus::grpc::DeleteByRangeParam *request,
::milvus::grpc::Status *response) { ::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DeleteByRangeTask::Create(request); BaseTaskPtr task_ptr = DeleteByRangeTask::Create(request);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
...@@ -184,8 +176,8 @@ GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context, ...@@ -184,8 +176,8 @@ GrpcRequestHandler::DeleteByRange(::grpc::ServerContext *context,
::grpc::Status ::grpc::Status
GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context, GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) { ::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = PreloadTableTask::Create(request->table_name()); BaseTaskPtr task_ptr = PreloadTableTask::Create(request->table_name());
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
...@@ -196,8 +188,8 @@ GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context, ...@@ -196,8 +188,8 @@ GrpcRequestHandler::PreloadTable(::grpc::ServerContext *context,
::grpc::Status ::grpc::Status
GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context, GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::IndexParam *response) { ::milvus::grpc::IndexParam *response) {
BaseTaskPtr task_ptr = DescribeIndexTask::Create(request->table_name(), response); BaseTaskPtr task_ptr = DescribeIndexTask::Create(request->table_name(), response);
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
...@@ -208,8 +200,8 @@ GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context, ...@@ -208,8 +200,8 @@ GrpcRequestHandler::DescribeIndex(::grpc::ServerContext *context,
::grpc::Status ::grpc::Status
GrpcRequestHandler::DropIndex(::grpc::ServerContext *context, GrpcRequestHandler::DropIndex(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) { ::milvus::grpc::Status *response) {
BaseTaskPtr task_ptr = DropIndexTask::Create(request->table_name()); BaseTaskPtr task_ptr = DropIndexTask::Create(request->table_name());
::milvus::grpc::Status grpc_status; ::milvus::grpc::Status grpc_status;
GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status); GrpcRequestScheduler::ExecTask(task_ptr, &grpc_status);
...@@ -218,8 +210,7 @@ GrpcRequestHandler::DropIndex(::grpc::ServerContext *context, ...@@ -218,8 +210,7 @@ GrpcRequestHandler::DropIndex(::grpc::ServerContext *context,
return ::grpc::Status::OK; return ::grpc::Status::OK;
} }
} // namespace grpc
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
}
\ No newline at end of file
...@@ -20,15 +20,15 @@ ...@@ -20,15 +20,15 @@
#include <cstdint> #include <cstdint>
#include <string> #include <string>
#include "milvus.grpc.pb.h" #include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "status.pb.h" #include "grpc/gen-status/status.pb.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
namespace grpc { namespace grpc {
class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service { class GrpcRequestHandler final : public ::milvus::grpc::MilvusService::Service {
public: public:
/** /**
* @brief Create table method * @brief Create table method
* *
...@@ -103,8 +103,7 @@ public: ...@@ -103,8 +103,7 @@ public:
*/ */
::grpc::Status ::grpc::Status
CreateIndex(::grpc::ServerContext *context, CreateIndex(::grpc::ServerContext *context,
const ::milvus::grpc::IndexParam *request, ::milvus::grpc::Status *response) override; const ::milvus::grpc::IndexParam *request, ::milvus::grpc::Status *response) override;
/** /**
* @brief Insert vector array to table * @brief Insert vector array to table
...@@ -123,8 +122,8 @@ public: ...@@ -123,8 +122,8 @@ public:
*/ */
::grpc::Status ::grpc::Status
Insert(::grpc::ServerContext *context, Insert(::grpc::ServerContext *context,
const ::milvus::grpc::InsertParam *request, const ::milvus::grpc::InsertParam *request,
::milvus::grpc::VectorIds *response) override; ::milvus::grpc::VectorIds *response) override;
/** /**
* @brief Query vector * @brief Query vector
...@@ -213,8 +212,8 @@ public: ...@@ -213,8 +212,8 @@ public:
*/ */
::grpc::Status ::grpc::Status
CountTable(::grpc::ServerContext *context, CountTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::TableRowCount *response) override; ::milvus::grpc::TableRowCount *response) override;
/** /**
* @brief List all tables in database * @brief List all tables in database
...@@ -253,8 +252,8 @@ public: ...@@ -253,8 +252,8 @@ public:
*/ */
::grpc::Status ::grpc::Status
Cmd(::grpc::ServerContext *context, Cmd(::grpc::ServerContext *context,
const ::milvus::grpc::Command *request, const ::milvus::grpc::Command *request,
::milvus::grpc::StringReply *response) override; ::milvus::grpc::StringReply *response) override;
/** /**
* @brief delete table by range * @brief delete table by range
...@@ -291,8 +290,8 @@ public: ...@@ -291,8 +290,8 @@ public:
*/ */
::grpc::Status ::grpc::Status
PreloadTable(::grpc::ServerContext *context, PreloadTable(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) override; ::milvus::grpc::Status *response) override;
/** /**
* @brief Describe index * @brief Describe index
...@@ -310,8 +309,8 @@ public: ...@@ -310,8 +309,8 @@ public:
*/ */
::grpc::Status ::grpc::Status
DescribeIndex(::grpc::ServerContext *context, DescribeIndex(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::IndexParam *response) override; ::milvus::grpc::IndexParam *response) override;
/** /**
* @brief Drop index * @brief Drop index
...@@ -329,12 +328,12 @@ public: ...@@ -329,12 +328,12 @@ public:
*/ */
::grpc::Status ::grpc::Status
DropIndex(::grpc::ServerContext *context, DropIndex(::grpc::ServerContext *context,
const ::milvus::grpc::TableName *request, const ::milvus::grpc::TableName *request,
::milvus::grpc::Status *response) override; ::milvus::grpc::Status *response) override;
}; };
}
} } // namespace grpc
} } // namespace server
} } // namespace milvus
} // namespace zilliz
...@@ -15,101 +15,107 @@ ...@@ -15,101 +15,107 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "GrpcRequestScheduler.h" #include "server/grpc_impl/GrpcRequestScheduler.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "src/grpc/gen-status/status.pb.h" #include "grpc/gen-status/status.pb.h"
#include <utility>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
namespace grpc { namespace grpc {
using namespace ::milvus;
namespace { namespace {
::milvus::grpc::ErrorCode ErrorMap(ErrorCode code) { ::milvus::grpc::ErrorCode
static const std::map<ErrorCode, ::milvus::grpc::ErrorCode> code_map = { ErrorMap(ErrorCode code) {
{SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, static const std::map<ErrorCode, ::milvus::grpc::ErrorCode> code_map = {
{SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, {SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, {SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, {SERVER_NULL_POINTER, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND}, {SERVER_INVALID_ARGUMENT, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, {SERVER_FILE_NOT_FOUND, ::milvus::grpc::ErrorCode::FILE_NOT_FOUND},
{SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR}, {SERVER_NOT_IMPLEMENT, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER}, {SERVER_BLOCKING_QUEUE_EMPTY, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE}, {SERVER_CANNOT_CREATE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FOLDER},
{SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER}, {SERVER_CANNOT_CREATE_FILE, ::milvus::grpc::ErrorCode::CANNOT_CREATE_FILE},
{SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE}, {SERVER_CANNOT_DELETE_FOLDER, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FOLDER},
{SERVER_TABLE_NOT_EXIST, ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS}, {SERVER_CANNOT_DELETE_FILE, ::milvus::grpc::ErrorCode::CANNOT_DELETE_FILE},
{SERVER_INVALID_TABLE_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME}, {SERVER_TABLE_NOT_EXIST, ::milvus::grpc::ErrorCode::TABLE_NOT_EXISTS},
{SERVER_INVALID_TABLE_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, {SERVER_INVALID_TABLE_NAME, ::milvus::grpc::ErrorCode::ILLEGAL_TABLE_NAME},
{SERVER_INVALID_TIME_RANGE, ::milvus::grpc::ErrorCode::ILLEGAL_RANGE}, {SERVER_INVALID_TABLE_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION},
{SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION}, {SERVER_INVALID_TIME_RANGE, ::milvus::grpc::ErrorCode::ILLEGAL_RANGE},
{SERVER_INVALID_VECTOR_DIMENSION, ::milvus::grpc::ErrorCode::ILLEGAL_DIMENSION},
{SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE},
{SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, {SERVER_INVALID_INDEX_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_INDEX_TYPE},
{SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD}, {SERVER_INVALID_ROWRECORD, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
{SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK}, {SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
{SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, {SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK},
{SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST}, {SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_INVALID_INDEX_METRIC_TYPE,::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE}, {SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST},
{SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT}, {SERVER_INVALID_INDEX_METRIC_TYPE, ::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE},
{SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID}, {SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT}, {SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID},
{SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED}, {SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT},
{DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED}, {SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED},
{SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR}, {DB_META_TRANSACTION_FAILED, ::milvus::grpc::ErrorCode::META_FAILED},
{SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY}, {SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR},
}; {SERVER_OUT_OF_MEMORY, ::milvus::grpc::ErrorCode::OUT_OF_MEMORY},
};
if(code_map.find(code) != code_map.end()) {
return code_map.at(code); if (code_map.find(code) != code_map.end()) {
} else { return code_map.at(code);
return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR; } else {
} return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR;
} }
} }
} // namespace
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GrpcBaseTask::GrpcBaseTask(const std::string &task_group, bool async) GrpcBaseTask::GrpcBaseTask(const std::string &task_group, bool async)
: task_group_(task_group), : task_group_(task_group),
async_(async), async_(async),
done_(false) { done_(false) {
} }
GrpcBaseTask::~GrpcBaseTask() { GrpcBaseTask::~GrpcBaseTask() {
WaitToFinish(); WaitToFinish();
} }
Status GrpcBaseTask::Execute() { Status
GrpcBaseTask::Execute() {
status_ = OnExecute(); status_ = OnExecute();
Done(); Done();
return status_; return status_;
} }
void GrpcBaseTask::Done() { void
GrpcBaseTask::Done() {
done_ = true; done_ = true;
finish_cond_.notify_all(); finish_cond_.notify_all();
} }
Status GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string &error_msg) { Status
GrpcBaseTask::SetStatus(ErrorCode error_code, const std::string &error_msg) {
status_ = Status(error_code, error_msg); status_ = Status(error_code, error_msg);
SERVER_LOG_ERROR << error_msg; SERVER_LOG_ERROR << error_msg;
return status_; return status_;
} }
Status GrpcBaseTask::WaitToFinish() { Status
GrpcBaseTask::WaitToFinish() {
std::unique_lock<std::mutex> lock(finish_mtx_); std::unique_lock<std::mutex> lock(finish_mtx_);
finish_cond_.wait(lock, [this] { return done_; }); finish_cond_.wait(lock, [this] {
return done_;
});
return status_; return status_;
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GrpcRequestScheduler::GrpcRequestScheduler() GrpcRequestScheduler::GrpcRequestScheduler()
: stopped_(false) { : stopped_(false) {
Start(); Start();
} }
...@@ -117,7 +123,8 @@ GrpcRequestScheduler::~GrpcRequestScheduler() { ...@@ -117,7 +123,8 @@ GrpcRequestScheduler::~GrpcRequestScheduler() {
Stop(); Stop();
} }
void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) { void
GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status) {
if (task_ptr == nullptr) { if (task_ptr == nullptr) {
return; return;
} }
...@@ -127,7 +134,7 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu ...@@ -127,7 +134,7 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu
if (!task_ptr->IsAsync()) { if (!task_ptr->IsAsync()) {
task_ptr->WaitToFinish(); task_ptr->WaitToFinish();
const Status& status = task_ptr->status(); const Status &status = task_ptr->status();
if (!status.ok()) { if (!status.ok()) {
grpc_status->set_reason(status.message()); grpc_status->set_reason(status.message());
grpc_status->set_error_code(ErrorMap(status.code())); grpc_status->set_error_code(ErrorMap(status.code()));
...@@ -135,7 +142,8 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu ...@@ -135,7 +142,8 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu
} }
} }
void GrpcRequestScheduler::Start() { void
GrpcRequestScheduler::Start() {
if (!stopped_) { if (!stopped_) {
return; return;
} }
...@@ -143,7 +151,8 @@ void GrpcRequestScheduler::Start() { ...@@ -143,7 +151,8 @@ void GrpcRequestScheduler::Start() {
stopped_ = false; stopped_ = false;
} }
void GrpcRequestScheduler::Stop() { void
GrpcRequestScheduler::Stop() {
if (stopped_) { if (stopped_) {
return; return;
} }
...@@ -168,7 +177,8 @@ void GrpcRequestScheduler::Stop() { ...@@ -168,7 +177,8 @@ void GrpcRequestScheduler::Stop() {
SERVER_LOG_INFO << "Scheduler stopped"; SERVER_LOG_INFO << "Scheduler stopped";
} }
Status GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { Status
GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
if (task_ptr == nullptr) { if (task_ptr == nullptr) {
return Status::OK(); return Status::OK();
} }
...@@ -186,8 +196,8 @@ Status GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) { ...@@ -186,8 +196,8 @@ Status GrpcRequestScheduler::ExecuteTask(const BaseTaskPtr &task_ptr) {
return task_ptr->WaitToFinish();//sync execution return task_ptr->WaitToFinish();//sync execution
} }
void
void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
if (task_queue == nullptr) { if (task_queue == nullptr) {
return; return;
} }
...@@ -210,7 +220,8 @@ void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) { ...@@ -210,7 +220,8 @@ void GrpcRequestScheduler::TakeTaskToExecute(TaskQueuePtr task_queue) {
} }
} }
Status GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { Status
GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
std::lock_guard<std::mutex> lock(queue_mtx_); std::lock_guard<std::mutex> lock(queue_mtx_);
std::string group_name = task_ptr->TaskGroup(); std::string group_name = task_ptr->TaskGroup();
...@@ -230,7 +241,7 @@ Status GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) { ...@@ -230,7 +241,7 @@ Status GrpcRequestScheduler::PutTaskToQueue(const BaseTaskPtr &task_ptr) {
return Status::OK(); return Status::OK();
} }
} } // namespace grpc
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
...@@ -19,12 +19,14 @@ ...@@ -19,12 +19,14 @@
#include "utils/Status.h" #include "utils/Status.h"
#include "utils/BlockingQueue.h" #include "utils/BlockingQueue.h"
#include "status.grpc.pb.h" #include "grpc/gen-status/status.grpc.pb.h"
#include "status.pb.h" #include "grpc/gen-status/status.pb.h"
#include <map> #include <map>
#include <vector> #include <vector>
#include <thread> #include <thread>
#include <memory>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -32,30 +34,36 @@ namespace server { ...@@ -32,30 +34,36 @@ namespace server {
namespace grpc { namespace grpc {
class GrpcBaseTask { class GrpcBaseTask {
protected: protected:
GrpcBaseTask(const std::string &task_group, bool async = false); explicit GrpcBaseTask(const std::string &task_group, bool async = false);
virtual ~GrpcBaseTask(); virtual ~GrpcBaseTask();
public: public:
Status Execute(); Status Execute();
void Done(); void Done();
Status WaitToFinish(); Status WaitToFinish();
std::string TaskGroup() const { return task_group_; } std::string TaskGroup() const {
return task_group_;
}
const Status& status() const { return status_; } const Status &status() const {
return status_;
}
bool IsAsync() const { return async_; } bool IsAsync() const {
return async_;
}
protected: protected:
virtual Status OnExecute() = 0; virtual Status OnExecute() = 0;
Status SetStatus(ErrorCode error_code, const std::string &msg); Status SetStatus(ErrorCode error_code, const std::string &msg);
protected: protected:
mutable std::mutex finish_mtx_; mutable std::mutex finish_mtx_;
std::condition_variable finish_cond_; std::condition_variable finish_cond_;
...@@ -71,7 +79,7 @@ using TaskQueuePtr = std::shared_ptr<TaskQueue>; ...@@ -71,7 +79,7 @@ using TaskQueuePtr = std::shared_ptr<TaskQueue>;
using ThreadPtr = std::shared_ptr<std::thread>; using ThreadPtr = std::shared_ptr<std::thread>;
class GrpcRequestScheduler { class GrpcRequestScheduler {
public: public:
static GrpcRequestScheduler &GetInstance() { static GrpcRequestScheduler &GetInstance() {
static GrpcRequestScheduler scheduler; static GrpcRequestScheduler scheduler;
return scheduler; return scheduler;
...@@ -85,7 +93,7 @@ public: ...@@ -85,7 +93,7 @@ public:
static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status); static void ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Status *grpc_status);
protected: protected:
GrpcRequestScheduler(); GrpcRequestScheduler();
virtual ~GrpcRequestScheduler(); virtual ~GrpcRequestScheduler();
...@@ -94,7 +102,7 @@ protected: ...@@ -94,7 +102,7 @@ protected:
Status PutTaskToQueue(const BaseTaskPtr &task_ptr); Status PutTaskToQueue(const BaseTaskPtr &task_ptr);
private: private:
mutable std::mutex queue_mtx_; mutable std::mutex queue_mtx_;
std::map<std::string, TaskQueuePtr> task_groups_; std::map<std::string, TaskQueuePtr> task_groups_;
...@@ -104,7 +112,7 @@ private: ...@@ -104,7 +112,7 @@ private:
bool stopped_; bool stopped_;
}; };
} } // namespace grpc
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
...@@ -15,22 +15,24 @@ ...@@ -15,22 +15,24 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "server/grpc_impl/GrpcRequestTask.h"
#include <string.h> #include <string.h>
#include <map>
#include <vector>
#include <string>
//#include <gperftools/profiler.h>
#include "GrpcRequestTask.h" #include "server/Server.h"
#include "server/DBWrapper.h"
#include "utils/CommonUtil.h" #include "utils/CommonUtil.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h" #include "utils/ValidationUtil.h"
#include "../DBWrapper.h"
#include "version.h"
#include "GrpcServer.h" #include "GrpcServer.h"
#include "db/Utils.h" #include "db/Utils.h"
#include "scheduler/SchedInst.h" #include "scheduler/SchedInst.h"
//#include <gperftools/profiler.h> #include "../../../version.h"
#include "server/Server.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -45,86 +47,87 @@ using DB_META = zilliz::milvus::engine::meta::Meta; ...@@ -45,86 +47,87 @@ using DB_META = zilliz::milvus::engine::meta::Meta;
using DB_DATE = zilliz::milvus::engine::meta::DateT; using DB_DATE = zilliz::milvus::engine::meta::DateT;
namespace { namespace {
engine::EngineType EngineType(int type) { engine::EngineType
static std::map<int, engine::EngineType> map_type = { EngineType(int type) {
{0, engine::EngineType::INVALID}, static std::map<int, engine::EngineType> map_type = {
{1, engine::EngineType::FAISS_IDMAP}, {0, engine::EngineType::INVALID},
{2, engine::EngineType::FAISS_IVFFLAT}, {1, engine::EngineType::FAISS_IDMAP},
{3, engine::EngineType::FAISS_IVFSQ8}, {2, engine::EngineType::FAISS_IVFFLAT},
}; {3, engine::EngineType::FAISS_IVFSQ8},
};
if (map_type.find(type) == map_type.end()) {
return engine::EngineType::INVALID; if (map_type.find(type) == map_type.end()) {
} return engine::EngineType::INVALID;
return map_type[type];
} }
int IndexType(engine::EngineType type) { return map_type[type];
static std::map<engine::EngineType, int> map_type = { }
{engine::EngineType::INVALID, 0},
{engine::EngineType::FAISS_IDMAP, 1},
{engine::EngineType::FAISS_IVFFLAT, 2},
{engine::EngineType::FAISS_IVFSQ8, 3},
};
if (map_type.find(type) == map_type.end()) { int
return 0; IndexType(engine::EngineType type) {
} static std::map<engine::EngineType, int> map_type = {
{engine::EngineType::INVALID, 0},
{engine::EngineType::FAISS_IDMAP, 1},
{engine::EngineType::FAISS_IVFFLAT, 2},
{engine::EngineType::FAISS_IVFSQ8, 3},
};
return map_type[type]; if (map_type.find(type) == map_type.end()) {
return 0;
} }
constexpr long DAY_SECONDS = 24 * 60 * 60; return map_type[type];
}
Status
ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array,
std::vector<DB_DATE> &dates) {
dates.clear();
for (auto &range : range_array) {
time_t tt_start, tt_end;
tm tm_start, tm_end;
if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) {
return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
}
if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) { constexpr int64_t DAY_SECONDS = 24 * 60 * 60;
return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
}
long days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) / Status
DAY_SECONDS; ConvertTimeRangeToDBDates(const std::vector<::milvus::grpc::Range> &range_array,
if (days == 0) { std::vector<DB_DATE> &dates) {
return Status(SERVER_INVALID_TIME_RANGE, dates.clear();
"Invalid time range: " + range.start_value() + " to " + range.end_value()); for (auto &range : range_array) {
} time_t tt_start, tt_end;
tm tm_start, tm_end;
if (!CommonUtil::TimeStrToTime(range.start_value(), tt_start, tm_start)) {
return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
}
//range: [start_day, end_day) if (!CommonUtil::TimeStrToTime(range.end_value(), tt_end, tm_end)) {
for (long i = 0; i < days; i++) { return Status(SERVER_INVALID_TIME_RANGE, "Invalid time range: " + range.start_value());
time_t tt_day = tt_start + DAY_SECONDS * i; }
tm tm_day;
CommonUtil::ConvertTime(tt_day, tm_day);
long date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 + int64_t days = (tt_end > tt_start) ? (tt_end - tt_start) / DAY_SECONDS : (tt_start - tt_end) /
tm_day.tm_mday;//according to db logic DAY_SECONDS;
dates.push_back(date); if (days == 0) {
} return Status(SERVER_INVALID_TIME_RANGE,
"Invalid time range: " + range.start_value() + " to " + range.end_value());
} }
return Status::OK(); //range: [start_day, end_day)
for (int64_t i = 0; i < days; i++) {
time_t tt_day = tt_start + DAY_SECONDS * i;
tm tm_day;
CommonUtil::ConvertTime(tt_day, tm_day);
int64_t date = tm_day.tm_year * 10000 + tm_day.tm_mon * 100 +
tm_day.tm_mday;//according to db logic
dates.push_back(date);
}
} }
return Status::OK();
} }
} // namespace
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema *schema) CreateTableTask::CreateTableTask(const ::milvus::grpc::TableSchema *schema)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
schema_(schema) { schema_(schema) {
} }
BaseTaskPtr BaseTaskPtr
CreateTableTask::Create(const ::milvus::grpc::TableSchema *schema) { CreateTableTask::Create(const ::milvus::grpc::TableSchema *schema) {
if(schema == nullptr) { if (schema == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!"; SERVER_LOG_ERROR << "grpc input is null!";
return nullptr; return nullptr;
} }
...@@ -168,12 +171,11 @@ CreateTableTask::OnExecute() { ...@@ -168,12 +171,11 @@ CreateTableTask::OnExecute() {
status = DBWrapper::DB()->CreateTable(table_info); status = DBWrapper::DB()->CreateTable(table_info);
if (!status.ok()) { if (!status.ok()) {
//table could exist //table could exist
if(status.code() == DB_ALREADY_EXIST) { if (status.code() == DB_ALREADY_EXIST) {
return Status(SERVER_INVALID_TABLE_NAME, status.message()); return Status(SERVER_INVALID_TABLE_NAME, status.message());
} }
return status; return status;
} }
} catch (std::exception &ex) { } catch (std::exception &ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what()); return Status(SERVER_UNEXPECTED_ERROR, ex.what());
} }
...@@ -185,9 +187,9 @@ CreateTableTask::OnExecute() { ...@@ -185,9 +187,9 @@ CreateTableTask::OnExecute() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema *schema) DescribeTableTask::DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema *schema)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name), table_name_(table_name),
schema_(schema) { schema_(schema) {
} }
BaseTaskPtr BaseTaskPtr
...@@ -218,7 +220,6 @@ DescribeTableTask::OnExecute() { ...@@ -218,7 +220,6 @@ DescribeTableTask::OnExecute() {
schema_->set_dimension(table_info.dimension_); schema_->set_dimension(table_info.dimension_);
schema_->set_index_file_size(table_info.index_file_size_); schema_->set_index_file_size(table_info.index_file_size_);
schema_->set_metric_type(table_info.metric_type_); schema_->set_metric_type(table_info.metric_type_);
} catch (std::exception &ex) { } catch (std::exception &ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what()); return Status(SERVER_UNEXPECTED_ERROR, ex.what());
} }
...@@ -230,13 +231,13 @@ DescribeTableTask::OnExecute() { ...@@ -230,13 +231,13 @@ DescribeTableTask::OnExecute() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam *index_param) CreateIndexTask::CreateIndexTask(const ::milvus::grpc::IndexParam *index_param)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
index_param_(index_param) { index_param_(index_param) {
} }
BaseTaskPtr BaseTaskPtr
CreateIndexTask::Create(const ::milvus::grpc::IndexParam *index_param) { CreateIndexTask::Create(const ::milvus::grpc::IndexParam *index_param) {
if(index_param == nullptr) { if (index_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!"; SERVER_LOG_ERROR << "grpc input is null!";
return nullptr; return nullptr;
} }
...@@ -295,10 +296,9 @@ CreateIndexTask::OnExecute() { ...@@ -295,10 +296,9 @@ CreateIndexTask::OnExecute() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
HasTableTask::HasTableTask(const std::string &table_name, bool &has_table) HasTableTask::HasTableTask(const std::string &table_name, bool &has_table)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name), table_name_(table_name),
has_table_(has_table) { has_table_(has_table) {
} }
BaseTaskPtr BaseTaskPtr
...@@ -333,9 +333,8 @@ HasTableTask::OnExecute() { ...@@ -333,9 +333,8 @@ HasTableTask::OnExecute() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DropTableTask::DropTableTask(const std::string &table_name) DropTableTask::DropTableTask(const std::string &table_name)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name) { table_name_(table_name) {
} }
BaseTaskPtr BaseTaskPtr
...@@ -385,9 +384,8 @@ DropTableTask::OnExecute() { ...@@ -385,9 +384,8 @@ DropTableTask::OnExecute() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList *table_name_list) ShowTablesTask::ShowTablesTask(::milvus::grpc::TableNameList *table_name_list)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_list_(table_name_list) { table_name_list_(table_name_list) {
} }
BaseTaskPtr BaseTaskPtr
...@@ -419,8 +417,8 @@ InsertTask::InsertTask(const ::milvus::grpc::InsertParam *insert_param, ...@@ -419,8 +417,8 @@ InsertTask::InsertTask(const ::milvus::grpc::InsertParam *insert_param,
BaseTaskPtr BaseTaskPtr
InsertTask::Create(const ::milvus::grpc::InsertParam *insert_param, InsertTask::Create(const ::milvus::grpc::InsertParam *insert_param,
::milvus::grpc::VectorIds *record_ids) { ::milvus::grpc::VectorIds *record_ids) {
if(insert_param == nullptr) { if (insert_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!"; SERVER_LOG_ERROR << "grpc input is null!";
return nullptr; return nullptr;
} }
...@@ -444,7 +442,7 @@ InsertTask::OnExecute() { ...@@ -444,7 +442,7 @@ InsertTask::OnExecute() {
if (!record_ids_->vector_id_array().empty()) { if (!record_ids_->vector_id_array().empty()) {
if (record_ids_->vector_id_array().size() != insert_param_->row_record_array_size()) { if (record_ids_->vector_id_array().size() != insert_param_->row_record_array_size()) {
return Status(SERVER_ILLEGAL_VECTOR_ID, return Status(SERVER_ILLEGAL_VECTOR_ID,
"Size of vector ids is not equal to row record array size"); "Size of vector ids is not equal to row record array size");
} }
} }
...@@ -455,7 +453,7 @@ InsertTask::OnExecute() { ...@@ -455,7 +453,7 @@ InsertTask::OnExecute() {
if (!status.ok()) { if (!status.ok()) {
if (status.code() == DB_NOT_FOUND) { if (status.code() == DB_NOT_FOUND) {
return Status(SERVER_TABLE_NOT_EXIST, return Status(SERVER_TABLE_NOT_EXIST,
"Table " + insert_param_->table_name() + " not exists"); "Table " + insert_param_->table_name() + " not exists");
} else { } else {
return status; return status;
} }
...@@ -465,19 +463,22 @@ InsertTask::OnExecute() { ...@@ -465,19 +463,22 @@ InsertTask::OnExecute() {
//all user provide id, or all internal id //all user provide id, or all internal id
bool user_provide_ids = !insert_param_->row_id_array().empty(); bool user_provide_ids = !insert_param_->row_id_array().empty();
//user already provided id before, all insert action require user id //user already provided id before, all insert action require user id
if((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) { if ((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) {
return Status(SERVER_ILLEGAL_VECTOR_ID, "Table vector ids are user defined, please provide id for this batch"); return Status(SERVER_ILLEGAL_VECTOR_ID,
"Table vector ids are user defined, please provide id for this batch");
} }
//user didn't provided id before, no need to provide user id //user didn't provided id before, no need to provide user id
if((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) { if ((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) {
return Status(SERVER_ILLEGAL_VECTOR_ID, "Table vector ids are auto generated, no need to provide id for this batch"); return Status(SERVER_ILLEGAL_VECTOR_ID,
"Table vector ids are auto generated, no need to provide id for this batch");
} }
rc.RecordSection("check validation"); rc.RecordSection("check validation");
#ifdef MILVUS_ENABLE_PROFILING #ifdef MILVUS_ENABLE_PROFILING
std::string fname = "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size()) + ".profiling"; std::string fname = "/tmp/insert_" + std::to_string(this->insert_param_->row_record_array_size())
+ ".profiling";
ProfilerStart(fname.c_str()); ProfilerStart(fname.c_str());
#endif #endif
...@@ -493,8 +494,8 @@ InsertTask::OnExecute() { ...@@ -493,8 +494,8 @@ InsertTask::OnExecute() {
if (vec_dim != table_info.dimension_) { if (vec_dim != table_info.dimension_) {
ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION; ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
std::string error_msg = "Invalid row record dimension: " + std::to_string(vec_dim) std::string error_msg = "Invalid row record dimension: " + std::to_string(vec_dim)
+ " vs. table dimension:" + + " vs. table dimension:" +
std::to_string(table_info.dimension_); std::to_string(table_info.dimension_);
return Status(error_code, error_msg); return Status(error_code, error_msg);
} }
memcpy(&vec_f[i * table_info.dimension_], memcpy(&vec_f[i * table_info.dimension_],
...@@ -507,10 +508,10 @@ InsertTask::OnExecute() { ...@@ -507,10 +508,10 @@ InsertTask::OnExecute() {
//step 5: insert vectors //step 5: insert vectors
auto vec_count = (uint64_t) insert_param_->row_record_array_size(); auto vec_count = (uint64_t) insert_param_->row_record_array_size();
std::vector<int64_t> vec_ids(insert_param_->row_id_array_size(), 0); std::vector<int64_t> vec_ids(insert_param_->row_id_array_size(), 0);
if(!insert_param_->row_id_array().empty()) { if (!insert_param_->row_id_array().empty()) {
const int64_t* src_data = insert_param_->row_id_array().data(); const int64_t *src_data = insert_param_->row_id_array().data();
int64_t* target_data = vec_ids.data(); int64_t *target_data = vec_ids.data();
memcpy(target_data, src_data, (size_t)(sizeof(int64_t)*insert_param_->row_id_array_size())); memcpy(target_data, src_data, (size_t) (sizeof(int64_t) * insert_param_->row_id_array_size()));
} }
status = DBWrapper::DB()->InsertVectors(insert_param_->table_name(), vec_count, vec_f.data(), vec_ids); status = DBWrapper::DB()->InsertVectors(insert_param_->table_name(), vec_count, vec_f.data(), vec_ids);
...@@ -525,13 +526,13 @@ InsertTask::OnExecute() { ...@@ -525,13 +526,13 @@ InsertTask::OnExecute() {
auto ids_size = record_ids_->vector_id_array_size(); auto ids_size = record_ids_->vector_id_array_size();
if (ids_size != vec_count) { if (ids_size != vec_count) {
std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return " std::string msg = "Add " + std::to_string(vec_count) + " vectors but only return "
+ std::to_string(ids_size) + " id"; + std::to_string(ids_size) + " id";
return Status(SERVER_ILLEGAL_VECTOR_ID, msg); return Status(SERVER_ILLEGAL_VECTOR_ID, msg);
} }
//step 6: update table flag //step 6: update table flag
user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID user_provide_ids ? table_info.flag_ |= engine::meta::FLAG_MASK_HAS_USERID
: table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID; : table_info.flag_ |= engine::meta::FLAG_MASK_NO_USERID;
status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_); status = DBWrapper::DB()->UpdateTableFlag(insert_param_->table_name(), table_info.flag_);
#ifdef MILVUS_ENABLE_PROFILING #ifdef MILVUS_ENABLE_PROFILING
...@@ -540,7 +541,6 @@ InsertTask::OnExecute() { ...@@ -540,7 +541,6 @@ InsertTask::OnExecute() {
rc.RecordSection("add vectors to engine"); rc.RecordSection("add vectors to engine");
rc.ElapseFromBegin("total cost"); rc.ElapseFromBegin("total cost");
} catch (std::exception &ex) { } catch (std::exception &ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what()); return Status(SERVER_UNEXPECTED_ERROR, ex.what());
} }
...@@ -556,19 +556,17 @@ SearchTask::SearchTask(const ::milvus::grpc::SearchParam *search_vector_infos, ...@@ -556,19 +556,17 @@ SearchTask::SearchTask(const ::milvus::grpc::SearchParam *search_vector_infos,
search_param_(search_vector_infos), search_param_(search_vector_infos),
file_id_array_(file_id_array), file_id_array_(file_id_array),
topk_result_list(response) { topk_result_list(response) {
} }
BaseTaskPtr BaseTaskPtr
SearchTask::Create(const ::milvus::grpc::SearchParam *search_vector_infos, SearchTask::Create(const ::milvus::grpc::SearchParam *search_vector_infos,
const std::vector<std::string> &file_id_array, const std::vector<std::string> &file_id_array,
::milvus::grpc::TopKQueryResultList *response) { ::milvus::grpc::TopKQueryResultList *response) {
if(search_vector_infos == nullptr) { if (search_vector_infos == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!"; SERVER_LOG_ERROR << "grpc input is null!";
return nullptr; return nullptr;
} }
return std::shared_ptr<GrpcBaseTask>(new SearchTask(search_vector_infos, file_id_array, return std::shared_ptr<GrpcBaseTask>(new SearchTask(search_vector_infos, file_id_array, response));
response));
} }
Status Status
...@@ -640,7 +638,7 @@ SearchTask::OnExecute() { ...@@ -640,7 +638,7 @@ SearchTask::OnExecute() {
if (query_vec_dim != table_info.dimension_) { if (query_vec_dim != table_info.dimension_) {
ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION; ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
std::string error_msg = "Invalid row record dimension: " + std::to_string(query_vec_dim) std::string error_msg = "Invalid row record dimension: " + std::to_string(query_vec_dim)
+ " vs. table dimension:" + std::to_string(table_info.dimension_); + " vs. table dimension:" + std::to_string(table_info.dimension_);
return Status(error_code, error_msg); return Status(error_code, error_msg);
} }
...@@ -655,16 +653,17 @@ SearchTask::OnExecute() { ...@@ -655,16 +653,17 @@ SearchTask::OnExecute() {
auto record_count = (uint64_t) search_param_->query_record_array().size(); auto record_count = (uint64_t) search_param_->query_record_array().size();
#ifdef MILVUS_ENABLE_PROFILING #ifdef MILVUS_ENABLE_PROFILING
std::string fname = "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size()) + ".profiling"; std::string fname = "/tmp/search_nq_" + std::to_string(this->search_param_->query_record_array_size())
+ ".profiling";
ProfilerStart(fname.c_str()); ProfilerStart(fname.c_str());
#endif #endif
if (file_id_array_.empty()) { if (file_id_array_.empty()) {
status = DBWrapper::DB()->Query(table_name_, (size_t) top_k, record_count, nprobe, status = DBWrapper::DB()->Query(table_name_, (size_t) top_k, record_count, nprobe,
vec_f.data(), dates, results); vec_f.data(), dates, results);
} else { } else {
status = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k, status = DBWrapper::DB()->Query(table_name_, file_id_array_, (size_t) top_k,
record_count, nprobe, vec_f.data(), dates, results); record_count, nprobe, vec_f.data(), dates, results);
} }
#ifdef MILVUS_ENABLE_PROFILING #ifdef MILVUS_ENABLE_PROFILING
...@@ -682,7 +681,7 @@ SearchTask::OnExecute() { ...@@ -682,7 +681,7 @@ SearchTask::OnExecute() {
if (results.size() != record_count) { if (results.size() != record_count) {
std::string msg = "Search " + std::to_string(record_count) + " vectors but only return " std::string msg = "Search " + std::to_string(record_count) + " vectors but only return "
+ std::to_string(results.size()) + " results"; + std::to_string(results.size()) + " results";
return Status(SERVER_ILLEGAL_SEARCH_RESULT, msg); return Status(SERVER_ILLEGAL_SEARCH_RESULT, msg);
} }
...@@ -699,8 +698,6 @@ SearchTask::OnExecute() { ...@@ -699,8 +698,6 @@ SearchTask::OnExecute() {
//step 8: print time cost percent //step 8: print time cost percent
rc.RecordSection("construct result and send"); rc.RecordSection("construct result and send");
rc.ElapseFromBegin("totally cost"); rc.ElapseFromBegin("totally cost");
} catch (std::exception &ex) { } catch (std::exception &ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what()); return Status(SERVER_UNEXPECTED_ERROR, ex.what());
} }
...@@ -710,10 +707,9 @@ SearchTask::OnExecute() { ...@@ -710,10 +707,9 @@ SearchTask::OnExecute() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CountTableTask::CountTableTask(const std::string &table_name, int64_t &row_count) CountTableTask::CountTableTask(const std::string &table_name, int64_t &row_count)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name), table_name_(table_name),
row_count_(row_count) { row_count_(row_count) {
} }
BaseTaskPtr BaseTaskPtr
...@@ -742,7 +738,6 @@ CountTableTask::OnExecute() { ...@@ -742,7 +738,6 @@ CountTableTask::OnExecute() {
row_count_ = (int64_t) row_count; row_count_ = (int64_t) row_count;
rc.ElapseFromBegin("total cost"); rc.ElapseFromBegin("total cost");
} catch (std::exception &ex) { } catch (std::exception &ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what()); return Status(SERVER_UNEXPECTED_ERROR, ex.what());
} }
...@@ -752,10 +747,9 @@ CountTableTask::OnExecute() { ...@@ -752,10 +747,9 @@ CountTableTask::OnExecute() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
CmdTask::CmdTask(const std::string &cmd, std::string &result) CmdTask::CmdTask(const std::string &cmd, std::string &result)
: GrpcBaseTask(PING_TASK_GROUP), : GrpcBaseTask(PING_TASK_GROUP),
cmd_(cmd), cmd_(cmd),
result_(result) { result_(result) {
} }
BaseTaskPtr BaseTaskPtr
...@@ -769,8 +763,7 @@ CmdTask::OnExecute() { ...@@ -769,8 +763,7 @@ CmdTask::OnExecute() {
result_ = MILVUS_VERSION; result_ = MILVUS_VERSION;
} else if (cmd_ == "tasktable") { } else if (cmd_ == "tasktable") {
result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables(); result_ = scheduler::ResMgrInst::GetInstance()->DumpTaskTables();
} } else {
else {
result_ = "OK"; result_ = "OK";
} }
...@@ -779,16 +772,17 @@ CmdTask::OnExecute() { ...@@ -779,16 +772,17 @@ CmdTask::OnExecute() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param) DeleteByRangeTask::DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
delete_by_range_param_(delete_by_range_param){ delete_by_range_param_(delete_by_range_param) {
} }
BaseTaskPtr BaseTaskPtr
DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param) { DeleteByRangeTask::Create(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param) {
if(delete_by_range_param == nullptr) { if (delete_by_range_param == nullptr) {
SERVER_LOG_ERROR << "grpc input is null!"; SERVER_LOG_ERROR << "grpc input is null!";
return nullptr; return nullptr;
} }
return std::shared_ptr<GrpcBaseTask>(new DeleteByRangeTask(delete_by_range_param)); return std::shared_ptr<GrpcBaseTask>(new DeleteByRangeTask(delete_by_range_param));
} }
...@@ -838,23 +832,21 @@ DeleteByRangeTask::OnExecute() { ...@@ -838,23 +832,21 @@ DeleteByRangeTask::OnExecute() {
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
} catch (std::exception &ex) { } catch (std::exception &ex) {
return Status(SERVER_UNEXPECTED_ERROR, ex.what()); return Status(SERVER_UNEXPECTED_ERROR, ex.what());
} }
return Status::OK(); return Status::OK();
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
PreloadTableTask::PreloadTableTask(const std::string &table_name) PreloadTableTask::PreloadTableTask(const std::string &table_name)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name) { table_name_(table_name) {
} }
BaseTaskPtr BaseTaskPtr
PreloadTableTask::Create(const std::string &table_name){ PreloadTableTask::Create(const std::string &table_name) {
return std::shared_ptr<GrpcBaseTask>(new PreloadTableTask(table_name)); return std::shared_ptr<GrpcBaseTask>(new PreloadTableTask(table_name));
} }
...@@ -889,12 +881,11 @@ DescribeIndexTask::DescribeIndexTask(const std::string &table_name, ...@@ -889,12 +881,11 @@ DescribeIndexTask::DescribeIndexTask(const std::string &table_name,
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name), table_name_(table_name),
index_param_(index_param) { index_param_(index_param) {
} }
BaseTaskPtr BaseTaskPtr
DescribeIndexTask::Create(const std::string &table_name, DescribeIndexTask::Create(const std::string &table_name,
::milvus::grpc::IndexParam *index_param){ ::milvus::grpc::IndexParam *index_param) {
return std::shared_ptr<GrpcBaseTask>(new DescribeIndexTask(table_name, index_param)); return std::shared_ptr<GrpcBaseTask>(new DescribeIndexTask(table_name, index_param));
} }
...@@ -932,11 +923,10 @@ DescribeIndexTask::OnExecute() { ...@@ -932,11 +923,10 @@ DescribeIndexTask::OnExecute() {
DropIndexTask::DropIndexTask(const std::string &table_name) DropIndexTask::DropIndexTask(const std::string &table_name)
: GrpcBaseTask(DDL_DML_TASK_GROUP), : GrpcBaseTask(DDL_DML_TASK_GROUP),
table_name_(table_name) { table_name_(table_name) {
} }
BaseTaskPtr BaseTaskPtr
DropIndexTask::Create(const std::string &table_name){ DropIndexTask::Create(const std::string &table_name) {
return std::shared_ptr<GrpcBaseTask>(new DropIndexTask(table_name)); return std::shared_ptr<GrpcBaseTask>(new DropIndexTask(table_name));
} }
...@@ -975,7 +965,7 @@ DropIndexTask::OnExecute() { ...@@ -975,7 +965,7 @@ DropIndexTask::OnExecute() {
return Status::OK(); return Status::OK();
} }
} } // namespace grpc
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
...@@ -16,15 +16,18 @@ ...@@ -16,15 +16,18 @@
// under the License. // under the License.
#pragma once #pragma once
#include "GrpcRequestScheduler.h"
#include "server/grpc_impl/GrpcRequestScheduler.h"
#include "utils/Status.h" #include "utils/Status.h"
#include "db/Types.h" #include "db/Types.h"
#include "milvus.grpc.pb.h" #include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "status.pb.h" #include "grpc/gen-status/status.pb.h"
#include <condition_variable> #include <condition_variable>
#include <memory> #include <memory>
#include <vector>
#include <string>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -33,138 +36,130 @@ namespace grpc { ...@@ -33,138 +36,130 @@ namespace grpc {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreateTableTask : public GrpcBaseTask { class CreateTableTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const ::milvus::grpc::TableSchema *schema); Create(const ::milvus::grpc::TableSchema *schema);
protected: protected:
explicit explicit CreateTableTask(const ::milvus::grpc::TableSchema *request);
CreateTableTask(const ::milvus::grpc::TableSchema *request);
Status Status
OnExecute() override; OnExecute() override;
private: private:
const ::milvus::grpc::TableSchema *schema_; const ::milvus::grpc::TableSchema *schema_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class HasTableTask : public GrpcBaseTask { class HasTableTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const std::string &table_name, bool &has_table); Create(const std::string &table_name, bool &has_table);
protected: protected:
HasTableTask(const std::string &request, bool &has_table); HasTableTask(const std::string &request, bool &has_table);
Status Status
OnExecute() override; OnExecute() override;
private:
private:
std::string table_name_; std::string table_name_;
bool &has_table_; bool &has_table_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DescribeTableTask : public GrpcBaseTask { class DescribeTableTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const std::string &table_name, ::milvus::grpc::TableSchema *schema); Create(const std::string &table_name, ::milvus::grpc::TableSchema *schema);
protected: protected:
DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema *schema); DescribeTableTask(const std::string &table_name, ::milvus::grpc::TableSchema *schema);
Status Status
OnExecute() override; OnExecute() override;
private:
private:
std::string table_name_; std::string table_name_;
::milvus::grpc::TableSchema *schema_; ::milvus::grpc::TableSchema *schema_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropTableTask : public GrpcBaseTask { class DropTableTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const std::string &table_name); Create(const std::string &table_name);
protected: protected:
explicit explicit DropTableTask(const std::string &table_name);
DropTableTask(const std::string &table_name);
Status Status
OnExecute() override; OnExecute() override;
private:
private:
std::string table_name_; std::string table_name_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CreateIndexTask : public GrpcBaseTask { class CreateIndexTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const ::milvus::grpc::IndexParam *index_Param); Create(const ::milvus::grpc::IndexParam *index_Param);
protected: protected:
explicit explicit CreateIndexTask(const ::milvus::grpc::IndexParam *index_Param);
CreateIndexTask(const ::milvus::grpc::IndexParam *index_Param);
Status Status
OnExecute() override; OnExecute() override;
private:
private:
const ::milvus::grpc::IndexParam *index_param_; const ::milvus::grpc::IndexParam *index_param_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class ShowTablesTask : public GrpcBaseTask { class ShowTablesTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(::milvus::grpc::TableNameList *table_name_list); Create(::milvus::grpc::TableNameList *table_name_list);
protected: protected:
explicit explicit ShowTablesTask(::milvus::grpc::TableNameList *table_name_list);
ShowTablesTask(::milvus::grpc::TableNameList *table_name_list);
Status Status
OnExecute() override; OnExecute() override;
private: private:
::milvus::grpc::TableNameList *table_name_list_; ::milvus::grpc::TableNameList *table_name_list_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class InsertTask : public GrpcBaseTask { class InsertTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const ::milvus::grpc::InsertParam *insert_Param, Create(const ::milvus::grpc::InsertParam *insert_Param,
::milvus::grpc::VectorIds *record_ids_); ::milvus::grpc::VectorIds *record_ids_);
protected: protected:
InsertTask(const ::milvus::grpc::InsertParam *insert_Param, InsertTask(const ::milvus::grpc::InsertParam *insert_Param,
::milvus::grpc::VectorIds *record_ids_); ::milvus::grpc::VectorIds *record_ids_);
Status Status
OnExecute() override; OnExecute() override;
private: private:
const ::milvus::grpc::InsertParam *insert_param_; const ::milvus::grpc::InsertParam *insert_param_;
::milvus::grpc::VectorIds *record_ids_; ::milvus::grpc::VectorIds *record_ids_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class SearchTask : public GrpcBaseTask { class SearchTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const ::milvus::grpc::SearchParam *search_param, Create(const ::milvus::grpc::SearchParam *search_param,
const std::vector<std::string> &file_id_array, const std::vector<std::string> &file_id_array,
::milvus::grpc::TopKQueryResultList *response); ::milvus::grpc::TopKQueryResultList *response);
protected: protected:
SearchTask(const ::milvus::grpc::SearchParam *search_param, SearchTask(const ::milvus::grpc::SearchParam *search_param,
const std::vector<std::string> &file_id_array, const std::vector<std::string> &file_id_array,
::milvus::grpc::TopKQueryResultList *response); ::milvus::grpc::TopKQueryResultList *response);
...@@ -172,7 +167,7 @@ protected: ...@@ -172,7 +167,7 @@ protected:
Status Status
OnExecute() override; OnExecute() override;
private: private:
const ::milvus::grpc::SearchParam *search_param_; const ::milvus::grpc::SearchParam *search_param_;
std::vector<std::string> file_id_array_; std::vector<std::string> file_id_array_;
::milvus::grpc::TopKQueryResultList *topk_result_list; ::milvus::grpc::TopKQueryResultList *topk_result_list;
...@@ -180,107 +175,106 @@ private: ...@@ -180,107 +175,106 @@ private:
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CountTableTask : public GrpcBaseTask { class CountTableTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const std::string &table_name, int64_t &row_count); Create(const std::string &table_name, int64_t &row_count);
protected: protected:
CountTableTask(const std::string &table_name, int64_t &row_count); CountTableTask(const std::string &table_name, int64_t &row_count);
Status Status
OnExecute() override; OnExecute() override;
private: private:
std::string table_name_; std::string table_name_;
int64_t &row_count_; int64_t &row_count_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class CmdTask : public GrpcBaseTask { class CmdTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const std::string &cmd, std::string &result); Create(const std::string &cmd, std::string &result);
protected: protected:
CmdTask(const std::string &cmd, std::string &result); CmdTask(const std::string &cmd, std::string &result);
Status Status
OnExecute() override; OnExecute() override;
private: private:
std::string cmd_; std::string cmd_;
std::string &result_; std::string &result_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteByRangeTask : public GrpcBaseTask { class DeleteByRangeTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param); Create(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param);
protected: protected:
DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param); explicit DeleteByRangeTask(const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param);
Status Status
OnExecute() override; OnExecute() override;
private: private:
const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param_; const ::milvus::grpc::DeleteByRangeParam *delete_by_range_param_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class PreloadTableTask : public GrpcBaseTask { class PreloadTableTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const std::string &table_name); Create(const std::string &table_name);
protected: protected:
PreloadTableTask(const std::string &table_name); explicit PreloadTableTask(const std::string &table_name);
Status Status
OnExecute() override; OnExecute() override;
private: private:
std::string table_name_; std::string table_name_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DescribeIndexTask : public GrpcBaseTask { class DescribeIndexTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const std::string &table_name, Create(const std::string &table_name,
::milvus::grpc::IndexParam *index_param); ::milvus::grpc::IndexParam *index_param);
protected: protected:
DescribeIndexTask(const std::string &table_name, DescribeIndexTask(const std::string &table_name,
::milvus::grpc::IndexParam *index_param); ::milvus::grpc::IndexParam *index_param);
Status Status
OnExecute() override; OnExecute() override;
private: private:
std::string table_name_; std::string table_name_;
::milvus::grpc::IndexParam *index_param_; ::milvus::grpc::IndexParam *index_param_;
}; };
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DropIndexTask : public GrpcBaseTask { class DropIndexTask : public GrpcBaseTask {
public: public:
static BaseTaskPtr static BaseTaskPtr
Create(const std::string &table_name); Create(const std::string &table_name);
protected: protected:
DropIndexTask(const std::string &table_name); explicit DropIndexTask(const std::string &table_name);
Status Status
OnExecute() override; OnExecute() override;
private: private:
std::string table_name_; std::string table_name_;
}; };
} } // namespace grpc
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
\ No newline at end of file
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "milvus.grpc.pb.h" #include "grpc/gen-milvus/milvus.grpc.pb.h"
#include "GrpcServer.h" #include "server/grpc_impl/GrpcServer.h"
#include "server/Config.h" #include "server/Config.h"
#include "server/DBWrapper.h" #include "server/DBWrapper.h"
#include "utils/Log.h" #include "utils/Log.h"
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include <random> #include <random>
#include <string> #include <string>
#include <thread> #include <thread>
#include <vector>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpcpp/channel.h> #include <grpcpp/channel.h>
...@@ -35,14 +36,12 @@ ...@@ -35,14 +36,12 @@
#include <grpcpp/create_channel.h> #include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h> #include <grpcpp/security/credentials.h>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
namespace grpc { namespace grpc {
constexpr int64_t MESSAGE_SIZE = -1;
constexpr long MESSAGE_SIZE = -1;
//this class is to check port occupation during server start //this class is to check port occupation during server start
class NoReusePortOption : public ::grpc::ServerBuilderOption { class NoReusePortOption : public ::grpc::ServerBuilderOption {
...@@ -52,11 +51,9 @@ class NoReusePortOption : public ::grpc::ServerBuilderOption { ...@@ -52,11 +51,9 @@ class NoReusePortOption : public ::grpc::ServerBuilderOption {
} }
void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>> *plugins) override { void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>> *plugins) override {
} }
}; };
void void
GrpcServer::Start() { GrpcServer::Start() {
thread_ptr_ = std::make_shared<std::thread>(&GrpcServer::StartService, this); thread_ptr_ = std::make_shared<std::thread>(&GrpcServer::StartService, this);
...@@ -117,7 +114,7 @@ GrpcServer::StopService() { ...@@ -117,7 +114,7 @@ GrpcServer::StopService() {
return Status::OK(); return Status::OK();
} }
} } // namespace grpc
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
\ No newline at end of file
...@@ -19,12 +19,12 @@ ...@@ -19,12 +19,12 @@
#include "utils/Status.h" #include "utils/Status.h"
#include <memory>
#include <cstdint> #include <cstdint>
#include <string> #include <string>
#include <thread> #include <thread>
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace server { namespace server {
...@@ -52,7 +52,7 @@ class GrpcServer { ...@@ -52,7 +52,7 @@ class GrpcServer {
std::shared_ptr<std::thread> thread_ptr_; std::shared_ptr<std::thread> thread_ptr_;
}; };
} } // namespace grpc
} } // namespace server
} } // namespace milvus
} } // namespace zilliz
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册