提交 86c72ee1 编写于 作者: Y Yu Kun

Merge remote-tracking branch 'upstream/branch-0.4.0' into branch-0.4.0


Former-commit-id: a85094a6af8b6af363450c61a9a9f60ca4941a8a
......@@ -5,6 +5,8 @@ Please mark all change in change log and use the ticket from JIRA.
# Milvus 0.4.0 (2019-07-28)
## Bug
- MS-119 - The problem of combining the log files
- MS-121 - The problem that user can't change the time zone
- MS-411 - Fix metric unittest linking error
- MS-412 - Fix gpu cache logical error
- MS-416 - ExecutionEngineImpl::GpuCache has not return value cause crash
......@@ -31,7 +33,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-453 - GPU search error when nprobe set more than 1024
- MS-474 - Create index hang if use branch-0.3.1 server config
- MS-510 - unittest out of memory and crashed
- MS-119 - The problem of combining the log files
- MS-507 - Dataset 10m-512, index type sq8,performance in-normal when set CPU_CACHE to 16 or 64
## Improvement
- MS-327 - Clean code for milvus
......@@ -114,6 +116,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-531 - Disable next version code
- MS-533 - Update resource_test to cover dump function
- MS-523 - Config file validation
- MS-539 - Remove old task code
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -3,11 +3,11 @@ server_config:
port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534
gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1
mode: single # milvus deployment type: single, cluster, read_only
time_zone: UTC+8 # Use the UTC-x or UTC+x to specify a time zone. eg. UTC+8 for China Standard Time
db_config:
db_path: @MILVUS_DB_PATH@ # milvus data storage path
db_slave_path: # secondry data storage path, split by semicolon
parallel_reduce: false # use multi-threads to reduce topk result
# URI format: dialect://username:password@host:port/database
# All parts except dialect are optional, but you MUST include the delimiters
......@@ -39,7 +39,6 @@ cache_config:
engine_config:
use_blas_threshold: 20
omp_thread_num: 0 # how many compute threads be used by engine, 0 means use all cpu core to compute
resource_config:
# resource list, length: 0~N
......
#pragma once
#include <memory>
#include "preprocessor.h"
namespace zilliz {
namespace knowhere {
class NormalizePreprocessor : public Preprocessor {
public:
DatasetPtr
Preprocess(const DatasetPtr &input) override;
private:
void
Normalize(float *arr, int64_t dimension);
};
using NormalizePreprocessorPtr = std::shared_ptr<NormalizePreprocessor>;
} // namespace knowhere
} // namespace zilliz
//#pragma once
//
//#include <memory>
//#include "preprocessor.h"
//
//
//namespace zilliz {
//namespace knowhere {
//
//class NormalizePreprocessor : public Preprocessor {
// public:
// DatasetPtr
// Preprocess(const DatasetPtr &input) override;
//
// private:
//
// void
// Normalize(float *arr, int64_t dimension);
//};
//
//
//using NormalizePreprocessorPtr = std::shared_ptr<NormalizePreprocessor>;
//
//
//} // namespace knowhere
//} // namespace zilliz
......@@ -27,8 +27,8 @@ class CPUKDTRNG : public VectorIndex {
Load(const BinarySet &index_array) override;
public:
PreprocessorPtr
BuildPreprocessor(const DatasetPtr &dataset, const Config &config) override;
//PreprocessorPtr
//BuildPreprocessor(const DatasetPtr &dataset, const Config &config) override;
int64_t Count() override;
int64_t Dimension() override;
......
......@@ -32,6 +32,12 @@ class IDMAP : public VectorIndex, public BasicIndex {
virtual int64_t *GetRawIds();
protected:
virtual void search_impl(int64_t n,
const float *data,
int64_t k,
float *distances,
int64_t *labels,
const Config &cfg);
std::mutex mutex_;
};
......@@ -49,6 +55,12 @@ class GPUIDMAP : public IDMAP, public GPUIndex {
VectorIndexPtr CopyGpuToGpu(const int64_t &device_id, const Config &config) override;
protected:
void search_impl(int64_t n,
const float *data,
int64_t k,
float *distances,
int64_t *labels,
const Config &cfg) override;
BinarySet SerializeImpl() override;
void LoadImpl(const BinarySet &index_binary) override;
};
......
......@@ -14,11 +14,23 @@ namespace knowhere {
KnowhereException::KnowhereException(const std::string &msg):msg(msg) {}
KnowhereException::KnowhereException(const std::string &m, const char *funcName, const char *file, int line) {
#ifdef DEBUG
int size = snprintf(nullptr, 0, "Error in %s at %s:%d: %s",
funcName, file, line, m.c_str());
msg.resize(size + 1);
snprintf(&msg[0], msg.size(), "Error in %s at %s:%d: %s",
funcName, file, line, m.c_str());
#else
std::string file_path(file);
auto const pos = file_path.find_last_of('/');
auto filename = file_path.substr(pos+1).c_str();
int size = snprintf(nullptr, 0, "Error in %s at %s:%d: %s",
funcName, filename, line, m.c_str());
msg.resize(size + 1);
snprintf(&msg[0], msg.size(), "Error in %s at %s:%d: %s",
funcName, filename, line, m.c_str());
#endif
}
const char *KnowhereException::what() const noexcept {
......
#include "knowhere/index/vector_index/definitions.h"
#include "knowhere/common/config.h"
#include "knowhere/index/preprocessor/normalize.h"
namespace zilliz {
namespace knowhere {
DatasetPtr
NormalizePreprocessor::Preprocess(const DatasetPtr &dataset) {
// TODO: wrap dataset->tensor
auto tensor = dataset->tensor()[0];
auto p_data = (float *)tensor->raw_mutable_data();
auto dimension = tensor->shape()[1];
auto rows = tensor->shape()[0];
#pragma omp parallel for
for (auto i = 0; i < rows; ++i) {
Normalize(&(p_data[i * dimension]), dimension);
}
}
void
NormalizePreprocessor::Normalize(float *arr, int64_t dimension) {
double vector_length = 0;
for (auto j = 0; j < dimension; j++) {
double val = arr[j];
vector_length += val * val;
}
vector_length = std::sqrt(vector_length);
if (vector_length < 1e-6) {
auto val = (float) (1.0 / std::sqrt((double) dimension));
for (int j = 0; j < dimension; j++) arr[j] = val;
} else {
for (int j = 0; j < dimension; j++) arr[j] = (float) (arr[j] / vector_length);
}
}
} // namespace knowhere
} // namespace zilliz
//
//#include "knowhere/index/vector_index/definitions.h"
//#include "knowhere/common/config.h"
//#include "knowhere/index/preprocessor/normalize.h"
//
//
//namespace zilliz {
//namespace knowhere {
//
//DatasetPtr
//NormalizePreprocessor::Preprocess(const DatasetPtr &dataset) {
// // TODO: wrap dataset->tensor
// auto tensor = dataset->tensor()[0];
// auto p_data = (float *)tensor->raw_mutable_data();
// auto dimension = tensor->shape()[1];
// auto rows = tensor->shape()[0];
//
//#pragma omp parallel for
// for (auto i = 0; i < rows; ++i) {
// Normalize(&(p_data[i * dimension]), dimension);
// }
//}
//
//void
//NormalizePreprocessor::Normalize(float *arr, int64_t dimension) {
// double vector_length = 0;
// for (auto j = 0; j < dimension; j++) {
// double val = arr[j];
// vector_length += val * val;
// }
// vector_length = std::sqrt(vector_length);
// if (vector_length < 1e-6) {
// auto val = (float) (1.0 / std::sqrt((double) dimension));
// for (int j = 0; j < dimension; j++) arr[j] = val;
// } else {
// for (int j = 0; j < dimension; j++) arr[j] = (float) (arr[j] / vector_length);
// }
//}
//
//} // namespace knowhere
//} // namespace zilliz
......@@ -9,7 +9,7 @@
#include "knowhere/index/vector_index/cpu_kdt_rng.h"
#include "knowhere/index/vector_index/definitions.h"
#include "knowhere/index/preprocessor/normalize.h"
//#include "knowhere/index/preprocessor/normalize.h"
#include "knowhere/index/vector_index/kdt_parameters.h"
#include "knowhere/adapter/sptag.h"
#include "knowhere/common/exception.h"
......@@ -60,20 +60,20 @@ CPUKDTRNG::Load(const BinarySet &binary_set) {
index_ptr_->LoadIndexFromMemory(index_blobs);
}
PreprocessorPtr
CPUKDTRNG::BuildPreprocessor(const DatasetPtr &dataset, const Config &config) {
return std::make_shared<NormalizePreprocessor>();
}
//PreprocessorPtr
//CPUKDTRNG::BuildPreprocessor(const DatasetPtr &dataset, const Config &config) {
// return std::make_shared<NormalizePreprocessor>();
//}
IndexModelPtr
CPUKDTRNG::Train(const DatasetPtr &origin, const Config &train_config) {
SetParameters(train_config);
DatasetPtr dataset = origin->Clone();
if (index_ptr_->GetDistCalcMethod() == SPTAG::DistCalcMethod::Cosine
&& preprocessor_) {
preprocessor_->Preprocess(dataset);
}
//if (index_ptr_->GetDistCalcMethod() == SPTAG::DistCalcMethod::Cosine
// && preprocessor_) {
// preprocessor_->Preprocess(dataset);
//}
auto vectorset = ConvertToVectorSet(dataset);
auto metaset = ConvertToMetadataSet(dataset);
......@@ -88,10 +88,10 @@ CPUKDTRNG::Add(const DatasetPtr &origin, const Config &add_config) {
SetParameters(add_config);
DatasetPtr dataset = origin->Clone();
if (index_ptr_->GetDistCalcMethod() == SPTAG::DistCalcMethod::Cosine
&& preprocessor_) {
preprocessor_->Preprocess(dataset);
}
//if (index_ptr_->GetDistCalcMethod() == SPTAG::DistCalcMethod::Cosine
// && preprocessor_) {
// preprocessor_->Preprocess(dataset);
//}
auto vectorset = ConvertToVectorSet(dataset);
auto metaset = ConvertToMetadataSet(dataset);
......
......@@ -280,15 +280,15 @@ void FaissGpuResourceMgr::InitResource() {
is_init = true;
std::cout << "InitResource" << std::endl;
//std::cout << "InitResource" << std::endl;
for(auto& device : devices_params_) {
auto& device_id = device.first;
std::cout << "Device Id: " << device_id << std::endl;
//std::cout << "Device Id: " << device_id << std::endl;
auto& device_param = device.second;
auto& bq = idle_map[device_id];
for (int64_t i = 0; i < device_param.resource_num; ++i) {
std::cout << "Resource Id: " << i << std::endl;
//std::cout << "Resource Id: " << i << std::endl;
auto raw_resource = std::make_shared<faiss::gpu::StandardGpuResources>();
// TODO(linxj): enable set pinned memory
......@@ -298,7 +298,7 @@ void FaissGpuResourceMgr::InitResource() {
bq.Put(res_wrapper);
}
}
std::cout << "End initResource" << std::endl;
//std::cout << "End initResource" << std::endl;
}
ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
......@@ -315,16 +315,6 @@ ResPtr FaissGpuResourceMgr::GetRes(const int64_t &device_id,
return nullptr;
}
//bool FaissGpuResourceMgr::GetRes(const int64_t &device_id,
// ResPtr &res,
// const int64_t &alloc_size) {
// InitResource();
//
// std::lock_guard<std::mutex> lk(res->mutex);
// AllocateTempMem(res, device_id, alloc_size);
// return true;
//}
void FaissGpuResourceMgr::MoveToIdle(const int64_t &device_id, const ResPtr &res) {
auto finder = idle_map.find(device_id);
if (finder != idle_map.end()) {
......
......@@ -50,7 +50,7 @@ DatasetPtr IDMAP::Search(const DatasetPtr &dataset, const Config &config) {
auto res_ids = (int64_t *) malloc(sizeof(int64_t) * elems);
auto res_dis = (float *) malloc(sizeof(float) * elems);
index_->search(rows, (float *) p_data, k, res_dis, res_ids);
search_impl(rows, (float *) p_data, k, res_dis, res_ids, Config());
auto id_buf = MakeMutableBufferSmart((uint8_t *) res_ids, sizeof(int64_t) * elems);
auto dist_buf = MakeMutableBufferSmart((uint8_t *) res_dis, sizeof(float) * elems);
......@@ -72,6 +72,11 @@ DatasetPtr IDMAP::Search(const DatasetPtr &dataset, const Config &config) {
return std::make_shared<Dataset>(array, nullptr);
}
void IDMAP::search_impl(int64_t n, const float *data, int64_t k, float *distances, int64_t *labels, const Config &cfg) {
index_->search(n, (float *) data, k, distances, labels);
}
void IDMAP::Add(const DatasetPtr &dataset, const Config &config) {
if (!index_) {
KNOWHERE_THROW_MSG("index not initialize");
......@@ -207,6 +212,7 @@ void GPUIDMAP::LoadImpl(const BinarySet &index_binary) {
if (auto res = FaissGpuResourceMgr::GetInstance().GetRes(gpu_id_) ){
ResScope rs(gpu_id_, res);
res_ = res;
auto device_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index);
index_.reset(device_index);
} else {
......@@ -230,5 +236,15 @@ int64_t *GPUIDMAP::GetRawIds() {
KNOWHERE_THROW_MSG("Not support");
}
void GPUIDMAP::search_impl(int64_t n,
const float *data,
int64_t k,
float *distances,
int64_t *labels,
const Config &cfg) {
ResScope rs(res_);
index_->search(n, (float *) data, k, distances, labels);
}
}
}
......@@ -12,6 +12,7 @@
#include "knowhere/index/vector_index/idmap.h"
#include "knowhere/adapter/structure.h"
#include "knowhere/index/vector_index/cloner.h"
#include "knowhere/common/exception.h"
#include "utils.h"
......@@ -65,19 +66,20 @@ void PrintResult(const DatasetPtr &result,
}
TEST_F(IDMAPTest, idmap_basic) {
assert(!xb.empty());
ASSERT_TRUE(!xb.empty());
Config Default_cfg;
index_->Train(Config::object{{"dim", dim}, {"metric_type", "L2"}});
index_->Add(base_dataset, Default_cfg);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dimension(), dim);
assert(index_->GetRawVectors() != nullptr);
assert(index_->GetRawIds() != nullptr);
ASSERT_TRUE(index_->GetRawVectors() != nullptr);
ASSERT_TRUE(index_->GetRawIds() != nullptr);
auto result = index_->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(result, nq, k);
PrintResult(result, nq, k);
index_->Seal();
auto binaryset = index_->Serialize();
auto new_index = std::make_shared<IDMAP>();
new_index->Load(binaryset);
......@@ -126,15 +128,15 @@ TEST_F(IDMAPTest, idmap_serialize) {
}
TEST_F(IDMAPTest, copy_test) {
assert(!xb.empty());
ASSERT_TRUE(!xb.empty());
Config Default_cfg;
index_->Train(Config::object{{"dim", dim}, {"metric_type", "L2"}});
index_->Add(base_dataset, Default_cfg);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dimension(), dim);
assert(index_->GetRawVectors() != nullptr);
assert(index_->GetRawIds() != nullptr);
ASSERT_TRUE(index_->GetRawVectors() != nullptr);
ASSERT_TRUE(index_->GetRawIds() != nullptr);
auto result = index_->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(result, nq, k);
//PrintResult(result, nq, k);
......@@ -151,8 +153,16 @@ TEST_F(IDMAPTest, copy_test) {
auto clone_index = CopyCpuToGpu(index_, device_id, Config());
auto clone_result = clone_index->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(clone_result, nq, k);
//assert(std::static_pointer_cast<GPUIDMAP>(clone_index)->GetRawVectors() != nullptr);
//assert(std::static_pointer_cast<GPUIDMAP>(clone_index)->GetRawIds() != nullptr);
ASSERT_THROW({ std::static_pointer_cast<GPUIDMAP>(clone_index)->GetRawVectors(); },
zilliz::knowhere::KnowhereException);
ASSERT_THROW({ std::static_pointer_cast<GPUIDMAP>(clone_index)->GetRawIds(); },
zilliz::knowhere::KnowhereException);
auto binary = clone_index->Serialize();
clone_index->Load(binary);
auto new_result = clone_index->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(new_result, nq, k);
auto clone_gpu_idx = clone_index->Clone();
auto clone_gpu_res = clone_gpu_idx->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(clone_gpu_res, nq, k);
......@@ -161,14 +171,13 @@ TEST_F(IDMAPTest, copy_test) {
auto host_index = CopyGpuToCpu(clone_index, Config());
auto host_result = host_index->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(host_result, nq, k);
assert(std::static_pointer_cast<IDMAP>(host_index)->GetRawVectors() != nullptr);
assert(std::static_pointer_cast<IDMAP>(host_index)->GetRawIds() != nullptr);
ASSERT_TRUE(std::static_pointer_cast<IDMAP>(host_index)->GetRawVectors() != nullptr);
ASSERT_TRUE(std::static_pointer_cast<IDMAP>(host_index)->GetRawIds() != nullptr);
// gpu to gpu
auto device_index = CopyCpuToGpu(index_, device_id, Config());
auto device_result = device_index->Search(query_dataset, Config::object{{"k", k}});
auto new_device_index = std::static_pointer_cast<GPUIDMAP>(device_index)->CopyGpuToGpu(device_id, Config());
auto device_result = new_device_index->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(device_result, nq, k);
//assert(std::static_pointer_cast<GPUIDMAP>(device_index)->GetRawVectors() != nullptr);
//assert(std::static_pointer_cast<GPUIDMAP>(device_index)->GetRawIds() != nullptr);
}
}
......@@ -394,8 +394,11 @@ TEST_F(GPURESTEST, gpu_ivf_resource_test) {
{
index_type = "GPUIVF";
index_ = IndexFactory(index_type);
index_ = std::make_shared<GPUIVF>(-1);
ASSERT_EQ(std::dynamic_pointer_cast<GPUIVF>(index_)->GetGpuDevice(), -1);
std::dynamic_pointer_cast<GPUIVF>(index_)->SetGpuDevice(device_id);
ASSERT_EQ(std::dynamic_pointer_cast<GPUIVF>(index_)->GetGpuDevice(), device_id);
auto preprocessor = index_->BuildPreprocessor(base_dataset, preprocess_cfg);
index_->set_preprocessor(preprocessor);
train_cfg = Config::object{{"nlist", 1638}, {"gpu_id", device_id}, {"metric_type", "L2"}};
......@@ -412,8 +415,9 @@ TEST_F(GPURESTEST, gpu_ivf_resource_test) {
if (i > search_count - 6 || i < 5)
tc.RecordSection("search once");
}
tc.RecordSection("search all");
tc.ElapseFromBegin("search all");
}
FaissGpuResourceMgr::GetInstance().Dump();
{
// IVF-Search
......@@ -430,7 +434,7 @@ TEST_F(GPURESTEST, gpu_ivf_resource_test) {
if (i > search_count - 6 || i < 5)
tc.RecordSection("search once");
}
tc.RecordSection("search all");
tc.ElapseFromBegin("search all");
}
}
......@@ -461,7 +465,7 @@ TEST_F(GPURESTEST, gpuivfsq) {
if (i > search_count - 6 || i < 5)
tc.RecordSection("search once");
}
tc.RecordSection("search all");
tc.ElapseFromBegin("search all");
}
{
......@@ -493,7 +497,7 @@ TEST_F(GPURESTEST, gpuivfsq) {
if (i > search_count - 6 || i < 5)
tc.RecordSection("search once");
}
tc.RecordSection("search all");
tc.ElapseFromBegin("search all");
delete cpu_index;
delete search_idx;
}
......
......@@ -8,6 +8,7 @@
#include <iostream>
#include <sstream>
#include "knowhere/common/exception.h"
#include "knowhere/index/vector_index/cpu_kdt_rng.h"
#include "knowhere/index/vector_index/definitions.h"
......@@ -125,6 +126,10 @@ TEST_P(KDTTest, kdt_serialize) {
auto result = new_index->Search(query_dataset, search_cfg);
AssertAnns(result, nq, k);
PrintResult(result, nq, k);
ASSERT_EQ(new_index->Count(), nb);
ASSERT_EQ(new_index->Dimension(), dim);
ASSERT_THROW({new_index->Clone();}, zilliz::knowhere::KnowhereException);
ASSERT_NO_THROW({new_index->Seal();});
{
int fileno = 0;
......
......@@ -7,6 +7,7 @@
#include <gtest/gtest.h>
#include <memory>
#include "knowhere/common/exception.h"
#include "knowhere/index/vector_index/gpu_ivf.h"
#include "knowhere/index/vector_index/nsg_index.h"
#include "knowhere/index/vector_index/nsg/nsg_io.h"
......@@ -71,6 +72,14 @@ TEST_P(NSGInterfaceTest, basic_test) {
auto new_result = new_index->Search(query_dataset, Config::object{{"k", k}});
AssertAnns(result, nq, k);
ASSERT_EQ(index_->Count(), nb);
ASSERT_EQ(index_->Dimension(), dim);
ASSERT_THROW({index_->Clone();}, zilliz::knowhere::KnowhereException);
ASSERT_NO_THROW({
index_->Add(base_dataset, Config());
index_->Seal();
});
{
//std::cout << "k = 1" << std::endl;
//new_index->Search(GenQuery(1), Config::object{{"k", 1}});
......
......@@ -303,13 +303,15 @@ Status ExecutionEngineImpl::Search(long n,
}
Status ExecutionEngineImpl::Cache() {
zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, index_);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize());
zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
return Status::OK();
}
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, index_);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize());
zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
return Status::OK();
}
......
......@@ -881,6 +881,7 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
res = filesToIndexQuery.store();
} //Scoped Connection
Status ret;
std::map<std::string, TableSchema> groups;
TableFileSchema table_file;
for (auto &resRow : res) {
......@@ -925,16 +926,17 @@ Status MySQLMetaImpl::FilesToIndex(TableFilesSchema &files) {
auto status = utils::GetTableFilePath(options_, table_file);
if(!status.ok()) {
return status;
ret = status;
}
files.push_back(table_file);
}
return ret;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO INDEX", e.what());
}
return Status::OK();
}
Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
......@@ -998,6 +1000,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
return status;
}
Status ret;
TableFileSchema table_file;
for (auto &resRow : res) {
......@@ -1031,7 +1034,7 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
auto status = utils::GetTableFilePath(options_, table_file);
if(!status.ok()) {
return status;
ret = status;
}
auto dateItr = files.find(table_file.date_);
......@@ -1041,11 +1044,11 @@ Status MySQLMetaImpl::FilesToSearch(const std::string &table_id,
files[table_file.date_].push_back(table_file);
}
return ret;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO SEARCH", e.what());
}
return Status::OK();
}
Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
......@@ -1083,6 +1086,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
res = filesToMergeQuery.store();
} //Scoped Connection
Status ret;
for (auto &resRow : res) {
TableFileSchema table_file;
table_file.file_size_ = resRow["file_size"];
......@@ -1120,7 +1124,7 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
auto status = utils::GetTableFilePath(options_, table_file);
if(!status.ok()) {
return status;
ret = status;
}
auto dateItr = files.find(table_file.date_);
......@@ -1131,11 +1135,11 @@ Status MySQLMetaImpl::FilesToMerge(const std::string &table_id,
files[table_file.date_].push_back(table_file);
}
return ret;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN FINDING TABLE FILES TO MERGE", e.what());
}
return Status::OK();
}
Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
......@@ -1165,7 +1169,8 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
getTableFileQuery << "SELECT id, engine_type, file_id, file_type, file_size, row_count, date, created_on " <<
"FROM TableFiles " <<
"WHERE table_id = " << quote << table_id << " AND " <<
"(" << idStr << ");";
"(" << idStr << ") AND " <<
"file_type <> " << std::to_string(TableFileSchema::TO_DELETE) << ";";
ENGINE_LOG_DEBUG << "MySQLMetaImpl::GetTableFiles: " << getTableFileQuery.str();
......@@ -1174,11 +1179,9 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
TableSchema table_schema;
table_schema.table_id_ = table_id;
auto status = DescribeTable(table_schema);
if (!status.ok()) {
return status;
}
DescribeTable(table_schema);
Status ret;
for (auto &resRow : res) {
TableFileSchema file_schema;
......@@ -1211,18 +1214,16 @@ Status MySQLMetaImpl::GetTableFiles(const std::string &table_id,
file_schema.dimension_ = table_schema.dimension_;
auto status = utils::GetTableFilePath(options_, file_schema);
if(!status.ok()) {
return status;
}
utils::GetTableFilePath(options_, file_schema);
table_files.emplace_back(file_schema);
}
return ret;
} catch (std::exception &e) {
return HandleException("GENERAL ERROR WHEN RETRIEVING TABLE FILES", e.what());
}
return Status::OK();
}
// PXU TODO: Support Swap
......
......@@ -53,7 +53,7 @@ class MySQLMetaImpl : public Meta {
Status UpdateTableIndex(const std::string &table_id, const TableIndex& index) override;
Status UpdateTableFlag(const std::string &table_id, int64_t flag);
Status UpdateTableFlag(const std::string &table_id, int64_t flag) override;
Status DescribeTableIndex(const std::string &table_id, TableIndex& index) override;
......
......@@ -603,6 +603,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
std::map<std::string, TableSchema> groups;
TableFileSchema table_file;
Status ret;
for (auto &file : selected) {
table_file.id_ = std::get<0>(file);
table_file.table_id_ = std::get<1>(file);
......@@ -616,7 +617,7 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
auto status = utils::GetTableFilePath(options_, table_file);
if(!status.ok()) {
return status;
ret = status;
}
auto groupItr = groups.find(table_file.table_id_);
if (groupItr == groups.end()) {
......@@ -635,11 +636,11 @@ Status SqliteMetaImpl::FilesToIndex(TableFilesSchema &files) {
files.push_back(table_file);
}
return ret;
} catch (std::exception &e) {
return HandleException("Encounter exception when iterate raw files", e.what());
}
return Status::OK();
}
Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
......@@ -695,6 +696,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
result = ConnectorPtr->select(select_columns, filter);
}
Status ret;
TableFileSchema table_file;
for (auto &file : result) {
table_file.id_ = std::get<0>(file);
......@@ -712,7 +714,7 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
auto status = utils::GetTableFilePath(options_, table_file);
if(!status.ok()) {
return status;
ret = status;
}
auto dateItr = files.find(table_file.date_);
......@@ -724,13 +726,12 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
if(files.empty()) {
ENGINE_LOG_ERROR << "No file to search for table: " << table_id;
}
return ret;
} catch (std::exception &e) {
return HandleException("Encounter exception when iterate index files", e.what());
}
return Status::OK();
}
Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
......@@ -761,6 +762,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
c(&TableFileSchema::table_id_) == table_id),
order_by(&TableFileSchema::file_size_).desc());
Status result;
for (auto &file : selected) {
TableFileSchema table_file;
table_file.file_size_ = std::get<4>(file);
......@@ -782,7 +784,7 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
auto status = utils::GetTableFilePath(options_, table_file);
if(!status.ok()) {
return status;
result = status;
}
auto dateItr = files.find(table_file.date_);
......@@ -791,11 +793,12 @@ Status SqliteMetaImpl::FilesToMerge(const std::string &table_id,
}
files[table_file.date_].push_back(table_file);
}
return result;
} catch (std::exception &e) {
return HandleException("Encounter exception when iterate merge files", e.what());
}
return Status::OK();
}
Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
......@@ -812,7 +815,8 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
&TableFileSchema::engine_type_,
&TableFileSchema::created_on_),
where(c(&TableFileSchema::table_id_) == table_id and
in(&TableFileSchema::id_, ids)
in(&TableFileSchema::id_, ids) and
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
TableSchema table_schema;
......@@ -822,6 +826,7 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
return status;
}
Status result;
for (auto &file : files) {
TableFileSchema file_schema;
file_schema.table_id_ = table_id;
......@@ -838,18 +843,15 @@ Status SqliteMetaImpl::GetTableFiles(const std::string& table_id,
file_schema.nlist_ = table_schema.nlist_;
file_schema.metric_type_ = table_schema.metric_type_;
auto status = utils::GetTableFilePath(options_, file_schema);
if(!status.ok()) {
return status;
}
utils::GetTableFilePath(options_, file_schema);
table_files.emplace_back(file_schema);
}
return result;
} catch (std::exception &e) {
return HandleException("Encounter exception when lookup table files", e.what());
}
return Status::OK();
}
// PXU TODO: Support Swap
......
......@@ -89,7 +89,6 @@ TaskScheduler::TaskDispatchWorker() {
return true;
}
#ifdef NEW_SCHEDULER
// TODO: Put task into Disk-TaskTable
auto task = TaskConvert(task_ptr);
auto disk_list = ResMgrInst::GetInstance()->GetDiskResources();
......@@ -98,16 +97,7 @@ TaskScheduler::TaskDispatchWorker() {
disk->task_table().Put(task);
}
}
#else
//execute task
ScheduleTaskPtr next_task = task_ptr->Execute();
if(next_task != nullptr) {
task_queue_.Put(next_task);
}
#endif
}
return true;
}
bool
......@@ -126,8 +116,6 @@ TaskScheduler::TaskWorker() {
task_queue_.Put(next_task);
}
}
return true;
}
}
......
......@@ -17,11 +17,6 @@ DeleteTask::DeleteTask(const DeleteContextPtr& context)
}
std::shared_ptr<IScheduleTask> DeleteTask::Execute() {
if(context_ != nullptr && context_->meta() != nullptr) {
context_->meta()->DeleteTableFiles(context_->table_id());
}
return nullptr;
}
......
......@@ -15,82 +15,13 @@ namespace zilliz {
namespace milvus {
namespace engine {
namespace {
void CollectFileMetrics(int file_type, size_t file_size) {
switch(file_type) {
case meta::TableFileSchema::RAW:
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
break;
}
default: {
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
break;
}
}
}
}
IndexLoadTask::IndexLoadTask()
: IScheduleTask(ScheduleTaskType::kIndexLoad) {
}
std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
server::TimeRecorder rc("");
//step 1: load index
ExecutionEnginePtr index_ptr = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType)file_->engine_type_,
(MetricType)file_->metric_type_,
file_->nlist_);
try {
auto stat = index_ptr->Load();
if(!stat.ok()) {
//typical error: file not available
ENGINE_LOG_ERROR << "Failed to load index file: file not available";
for(auto& context : search_contexts_) {
context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed
}
return nullptr;
}
} catch (std::exception& ex) {
//typical error: out of disk space or permition denied
std::string msg = "Failed to load index file: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
for(auto& context : search_contexts_) {
context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed
}
return nullptr;
}
size_t file_size = index_ptr->PhysicalSize();
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + std::to_string(file_->file_type_)
+ " size:" + std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
for(auto& context : search_contexts_) {
context->AccumLoadCost(span);
}
CollectFileMetrics(file_->file_type_, file_size);
//step 2: return search task for later execution
SearchTaskPtr task_ptr = std::make_shared<SearchTask>();
task_ptr->index_id_ = file_->id_;
task_ptr->file_type_ = file_->file_type_;
task_ptr->index_engine_ = index_ptr;
task_ptr->search_contexts_.swap(search_contexts_);
return std::static_pointer_cast<IScheduleTask>(task_ptr);
return nullptr;
}
}
......
......@@ -14,259 +14,12 @@ namespace zilliz {
namespace milvus {
namespace engine {
namespace {
static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 1000000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
bool NeedParallelReduce(uint64_t nq, uint64_t topk) {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode& db_config = config.GetConfig(server::CONFIG_DB);
bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
if(!need_parallel) {
return false;
}
return nq*topk >= PARALLEL_REDUCE_THRESHOLD;
}
void ParallelReduce(std::function<void(size_t, size_t)>& reduce_function, size_t max_index) {
size_t reduce_batch = PARALLEL_REDUCE_BATCH;
auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
if(thread_count > 0) {
reduce_batch = max_index/thread_count + 1;
}
ENGINE_LOG_DEBUG << "use " << thread_count <<
" thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
std::vector<std::shared_ptr<std::thread> > thread_array;
size_t from_index = 0;
while(from_index < max_index) {
size_t to_index = from_index + reduce_batch;
if(to_index > max_index) {
to_index = max_index;
}
auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
thread_array.push_back(reduce_thread);
from_index = to_index;
}
for(auto& thread_ptr : thread_array) {
thread_ptr->join();
}
}
}
SearchTask::SearchTask()
: IScheduleTask(ScheduleTaskType::kSearch) {
}
std::shared_ptr<IScheduleTask> SearchTask::Execute() {
if(index_engine_ == nullptr) {
return nullptr;
}
ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_<< " with "
<< search_contexts_.size() << " tasks";
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
server::CollectSearchTaskMetrics metrics(file_type_);
bool metric_l2 = (index_engine_->IndexMetricType() == MetricType::L2);
std::vector<long> output_ids;
std::vector<float> output_distence;
for(auto& context : search_contexts_) {
//step 1: allocate memory
auto inner_k = context->topk();
auto nprobe = context->nprobe();
output_ids.resize(inner_k*context->nq());
output_distence.resize(inner_k*context->nq());
try {
//step 2: search
index_engine_->Search(context->nq(), context->vectors(), inner_k, nprobe, output_distence.data(),
output_ids.data());
double span = rc.RecordSection("do search for context:" + context->Identity());
context->AccumSearchCost(span);
//step 3: cluster result
SearchContext::ResultSet result_set;
auto spec_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk();
SearchTask::ClusterResult(output_ids, output_distence, context->nq(), spec_k, result_set);
span = rc.RecordSection("cluster result for context:" + context->Identity());
context->AccumReduceCost(span);
//step 4: pick up topk result
SearchTask::TopkResult(result_set, inner_k, metric_l2, context->GetResult());
span = rc.RecordSection("reduce topk for context:" + context->Identity());
context->AccumReduceCost(span);
} catch (std::exception& ex) {
ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
continue;
}
//step 5: notify to send result to client
context->IndexSearchDone(index_id_);
}
rc.ElapseFromBegin("totally cost");
return nullptr;
}
Status SearchTask::ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distence,
uint64_t nq,
uint64_t topk,
SearchContext::ResultSet &result_set) {
if(output_ids.size() < nq*topk || output_distence.size() < nq*topk) {
std::string msg = "Invalid id array size: " + std::to_string(output_ids.size()) +
" distance array size: " + std::to_string(output_distence.size());
ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg);
}
result_set.clear();
result_set.resize(nq);
std::function<void(size_t, size_t)> reduce_worker = [&](size_t from_index, size_t to_index) {
for (auto i = from_index; i < to_index; i++) {
SearchContext::Id2DistanceMap id_distance;
id_distance.reserve(topk);
for (auto k = 0; k < topk; k++) {
uint64_t index = i * topk + k;
if(output_ids[index] < 0) {
continue;
}
id_distance.push_back(std::make_pair(output_ids[index], output_distence[index]));
}
result_set[i] = id_distance;
}
};
if(NeedParallelReduce(nq, topk)) {
ParallelReduce(reduce_worker, nq);
} else {
reduce_worker(0, nq);
}
return Status::OK();
}
Status SearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src,
SearchContext::Id2DistanceMap &distance_target,
uint64_t topk,
bool ascending) {
//Note: the score_src and score_target are already arranged by score in ascending order
if(distance_src.empty()) {
ENGINE_LOG_WARNING << "Empty distance source array";
return Status::OK();
}
if(distance_target.empty()) {
distance_target.swap(distance_src);
return Status::OK();
}
size_t src_count = distance_src.size();
size_t target_count = distance_target.size();
SearchContext::Id2DistanceMap distance_merged;
distance_merged.reserve(topk);
size_t src_index = 0, target_index = 0;
while(true) {
//all score_src items are merged, if score_merged.size() still less than topk
//move items from score_target to score_merged until score_merged.size() equal topk
if(src_index >= src_count) {
for(size_t i = target_index; i < target_count && distance_merged.size() < topk; ++i) {
distance_merged.push_back(distance_target[i]);
}
break;
}
//all score_target items are merged, if score_merged.size() still less than topk
//move items from score_src to score_merged until score_merged.size() equal topk
if(target_index >= target_count) {
for(size_t i = src_index; i < src_count && distance_merged.size() < topk; ++i) {
distance_merged.push_back(distance_src[i]);
}
break;
}
//compare score,
// if ascending = true, put smallest score to score_merged one by one
// else, put largest score to score_merged one by one
auto& src_pair = distance_src[src_index];
auto& target_pair = distance_target[target_index];
if(ascending){
if(src_pair.second > target_pair.second) {
distance_merged.push_back(target_pair);
target_index++;
} else {
distance_merged.push_back(src_pair);
src_index++;
}
} else {
if(src_pair.second < target_pair.second) {
distance_merged.push_back(target_pair);
target_index++;
} else {
distance_merged.push_back(src_pair);
src_index++;
}
}
//score_merged.size() already equal topk
if(distance_merged.size() >= topk) {
break;
}
}
distance_target.swap(distance_merged);
return Status::OK();
}
Status SearchTask::TopkResult(SearchContext::ResultSet &result_src,
uint64_t topk,
bool ascending,
SearchContext::ResultSet &result_target) {
if (result_target.empty()) {
result_target.swap(result_src);
return Status::OK();
}
if (result_src.size() != result_target.size()) {
std::string msg = "Invalid result set size";
ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg);
}
std::function<void(size_t, size_t)> ReduceWorker = [&](size_t from_index, size_t to_index) {
for (size_t i = from_index; i < to_index; i++) {
SearchContext::Id2DistanceMap &score_src = result_src[i];
SearchContext::Id2DistanceMap &score_target = result_target[i];
SearchTask::MergeResult(score_src, score_target, topk, ascending);
}
};
if(NeedParallelReduce(result_src.size(), topk)) {
ParallelReduce(ReduceWorker, result_src.size());
} else {
ReduceWorker(0, result_src.size());
}
return Status::OK();
return nullptr;
}
}
......
......@@ -19,22 +19,6 @@ public:
virtual std::shared_ptr<IScheduleTask> Execute() override;
static Status ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distence,
uint64_t nq,
uint64_t topk,
SearchContext::ResultSet &result_set);
static Status MergeResult(SearchContext::Id2DistanceMap &distance_src,
SearchContext::Id2DistanceMap &distance_target,
uint64_t topk,
bool ascending);
static Status TopkResult(SearchContext::ResultSet &result_src,
uint64_t topk,
bool ascending,
SearchContext::ResultSet &result_target);
public:
size_t index_id_ = 0;
int file_type_ = 0; //for metrics
......
......@@ -16,7 +16,6 @@
#include "utils/SignalUtil.h"
#include "utils/CommonUtil.h"
#include "utils/LogUtil.h"
INITIALIZE_EASYLOGGINGPP
......@@ -98,10 +97,8 @@ main(int argc, char *argv[]) {
}
}
zilliz::milvus::server::InitLog(log_config_file);
server::Server* server_ptr = server::Server::Instance();
server_ptr->Init(start_daemonized, pid_filename, config_filename);
server_ptr->Init(start_daemonized, pid_filename, config_filename, log_config_file);
return server_ptr->Start();
}
......
......@@ -7,6 +7,7 @@
#include "Server.h"
#include "server/grpc_impl/GrpcMilvusServer.h"
#include "utils/Log.h"
#include "utils/LogUtil.h"
#include "utils/SignalUtil.h"
#include "utils/TimeRecorder.h"
#include "metrics/Metrics.h"
......@@ -24,11 +25,12 @@
#include "metrics/Metrics.h"
#include "DBWrapper.h"
namespace zilliz {
namespace milvus {
namespace server {
Server*
Server *
Server::Instance() {
static Server server;
return &server;
......@@ -42,10 +44,14 @@ Server::~Server() {
}
void
Server::Init(int64_t daemonized, const std::string& pid_filename, const std::string& config_filename) {
Server::Init(int64_t daemonized,
const std::string &pid_filename,
const std::string &config_filename,
const std::string &log_config_file) {
daemonized_ = daemonized;
pid_filename_ = pid_filename;
config_filename_ = config_filename;
log_config_file_ = log_config_file;
}
void
......@@ -54,7 +60,7 @@ Server::Daemonize() {
return;
}
SERVER_LOG_INFO << "Milvus server run in daemonize mode";
std::cout << "Milvus server run in daemonize mode";
// std::string log_path(GetLogDirFullPath());
// log_path += "zdb_server.(INFO/WARNNING/ERROR/CRITICAL)";
......@@ -101,7 +107,7 @@ Server::Daemonize() {
// Change the working directory to root
int ret = chdir("/");
if(ret != 0){
if (ret != 0) {
return;
}
......@@ -110,7 +116,7 @@ Server::Daemonize() {
close(fd);
}
SERVER_LOG_INFO << "Redirect stdin/stdout/stderr to /dev/null";
std::cout << "Redirect stdin/stdout/stderr to /dev/null";
// Redirect stdin/stdout/stderr to /dev/null
stdin = fopen("/dev/null", "r");
......@@ -120,17 +126,17 @@ Server::Daemonize() {
if (!pid_filename_.empty()) {
pid_fd = open(pid_filename_.c_str(), O_RDWR | O_CREAT, 0640);
if (pid_fd < 0) {
SERVER_LOG_INFO << "Can't open filename: " + pid_filename_ + ", Error: " + strerror(errno);
std::cout << "Can't open filename: " + pid_filename_ + ", Error: " + strerror(errno);
exit(EXIT_FAILURE);
}
if (lockf(pid_fd, F_TLOCK, 0) < 0) {
SERVER_LOG_INFO << "Can't lock filename: " + pid_filename_ + ", Error: " + strerror(errno);
std::cout << "Can't lock filename: " + pid_filename_ + ", Error: " + strerror(errno);
exit(EXIT_FAILURE);
}
std::string pid_file_context = std::to_string(getpid());
ssize_t res = write(pid_fd, pid_file_context.c_str(), pid_file_context.size());
if(res != 0){
if (res != 0) {
return;
}
}
......@@ -146,7 +152,7 @@ Server::Start() {
do {
try {
// Read config file
if(LoadConfig() != SERVER_SUCCESS) {
if (LoadConfig() != SERVER_SUCCESS) {
return 1;
}
......@@ -154,6 +160,27 @@ Server::Start() {
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
std::string time_zone = server_config.GetValue(CONFIG_TIME_ZONE, "UTC+8");
if (time_zone.length() == 3) {
time_zone = "CUT";
} else {
int time_bias = std::stoi(time_zone.substr(3, std::string::npos));
if (time_bias == 0)
time_zone = "CUT";
else if (time_bias > 0) {
time_zone = "CUT" + std::to_string(-time_bias);
} else {
time_zone = "CUT+" + std::to_string(-time_bias);
}
}
if (setenv("TZ", time_zone.c_str(), 1) != 0) {
return -1;
}
tzset();
InitLog(log_config_file_);
// Handle Signal
signal(SIGINT, SignalUtil::HandleSignal);
signal(SIGHUP, SignalUtil::HandleSignal);
......@@ -164,12 +191,12 @@ Server::Start() {
std::cout << "Milvus server start successfully." << std::endl;
StartService();
} catch(std::exception& ex){
SERVER_LOG_ERROR << "Milvus server encounter exception: " << std::string(ex.what())
<< "Is another server instance running?";
} catch (std::exception &ex) {
std::cerr << "Milvus server encounter exception: " << std::string(ex.what())
<< "Is another server instance running?";
break;
}
} while(false);
} while (false);
Stop();
return 0;
......@@ -182,12 +209,12 @@ Server::Stop() {
// Unlock and close lockfile
if (pid_fd != -1) {
int ret = lockf(pid_fd, F_ULOCK, 0);
if(ret != 0){
if (ret != 0) {
std::cout << "Can't lock file: " << strerror(errno) << std::endl;
exit(0);
}
ret = close(pid_fd);
if(ret != 0){
if (ret != 0) {
std::cout << "Can't close file: " << strerror(errno) << std::endl;
exit(0);
}
......@@ -196,7 +223,7 @@ Server::Stop() {
// Try to delete lockfile
if (!pid_filename_.empty()) {
int ret = unlink(pid_filename_.c_str());
if(ret != 0){
if (ret != 0) {
std::cout << "Can't unlink file: " << strerror(errno) << std::endl;
exit(0);
}
......@@ -214,7 +241,7 @@ ErrorCode
Server::LoadConfig() {
ServerConfig::GetInstance().LoadConfigFile(config_filename_);
ErrorCode err = ServerConfig::GetInstance().ValidateConfig();
if(err != SERVER_SUCCESS){
if (err != SERVER_SUCCESS) {
exit(0);
}
......
......@@ -18,7 +18,7 @@ class Server {
public:
static Server* Instance();
void Init(int64_t daemonized, const std::string& pid_filename, const std::string& config_filename);
void Init(int64_t daemonized, const std::string& pid_filename, const std::string& config_filename, const std::string &log_config_file);
int Start();
void Stop();
......@@ -40,6 +40,7 @@ class Server {
int pid_fd = -1;
std::string pid_filename_;
std::string config_filename_;
std::string log_config_file_;
}; // Server
} // server
......
......@@ -97,32 +97,30 @@ ServerConfig::CheckServerConfig() {
std::string ip_address = server_config.GetValue(CONFIG_SERVER_ADDRESS, "127.0.0.1");
if (ValidationUtil::ValidateIpAddress(ip_address) != SERVER_SUCCESS) {
std::cerr << "Error: invalid server IP address: " << ip_address << std::endl;
std::cerr << "ERROR: invalid server IP address: " << ip_address << std::endl;
okay = false;
}
std::string port_str = server_config.GetValue(CONFIG_SERVER_PORT, "19530");
if (ValidationUtil::ValidateStringIsNumber(port_str) != SERVER_SUCCESS) {
std::cerr << "Error: port " << port_str << " is not a number" << std::endl;
std::cerr << "ERROR: port " << port_str << " is not a number" << std::endl;
okay = false;
}
else {
} else {
int32_t port = std::stol(port_str);
if (port < 1025 | port > 65534) {
std::cerr << "Error: port " << port_str << " out of range [1025, 65534]" << std::endl;
std::cerr << "ERROR: port " << port_str << " out of range [1025, 65534]" << std::endl;
okay = false;
}
}
std::string gpu_index_str = server_config.GetValue(CONFIG_GPU_INDEX, "0");
if (ValidationUtil::ValidateStringIsNumber(gpu_index_str) != SERVER_SUCCESS) {
std::cerr << "Error: gpu_index " << gpu_index_str << " is not a number" << std::endl;
std::cerr << "ERROR: gpu_index " << gpu_index_str << " is not a number" << std::endl;
okay = false;
}
else {
} else {
int32_t gpu_index = std::stol(gpu_index_str);
if (ValidationUtil::ValidateGpuIndex(gpu_index) != SERVER_SUCCESS) {
std::cerr << "Error: invalid gpu_index " << gpu_index_str << std::endl;
std::cerr << "ERROR: invalid gpu_index " << gpu_index_str << std::endl;
okay = false;
}
}
......@@ -133,6 +131,25 @@ ServerConfig::CheckServerConfig() {
okay = false;
}
std::string time_zone = server_config.GetValue(CONFIG_TIME_ZONE, "UTC+8");
int flag = 0;
if(time_zone.length() < 3)
flag = 1;
else if(time_zone.substr(0, 3) != "UTC")
flag = 1;
else if(time_zone.length() > 3){
try {
stoi(time_zone.substr(3, std::string::npos));
}
catch (std::invalid_argument &) {
flag = 1;
}
}
if(flag == 1){
std::cerr << "ERROR: time_zone " << time_zone << " is not in a right format" << std::endl;
okay = false;
}
return (okay ? SERVER_SUCCESS : SERVER_INVALID_ARGUMENT);
}
......@@ -163,33 +180,27 @@ ServerConfig::CheckDBConfig() {
okay = false;
}
std::string parallel_reduce_str = db_config.GetValue(CONFIG_DB_PARALLEL_REDUCE, "false");
if (ValidationUtil::ValidateStringIsBool(parallel_reduce_str) != SERVER_SUCCESS) {
std::cerr << "Error: invalid parallel_reduce config: " << parallel_reduce_str << std::endl;
okay = false;
}
std::string db_backend_url = db_config.GetValue(CONFIG_DB_URL);
if (ValidationUtil::ValidateDbURI(db_backend_url) != SERVER_SUCCESS) {
std::cerr << "Error: invalid db_backend_url " << db_backend_url << std::endl;
std::cerr << "ERROR: invalid db_backend_url: " << db_backend_url << std::endl;
okay = false;
}
std::string archive_disk_threshold_str = db_config.GetValue(CONFIG_DB_INSERT_BUFFER_SIZE, "0");
if (ValidationUtil::ValidateStringIsNumber(archive_disk_threshold_str) != SERVER_SUCCESS) {
std::cerr << "Error: archive_disk_threshold " << archive_disk_threshold_str << " is not a number" << std::endl;
std::cerr << "ERROR: archive_disk_threshold " << archive_disk_threshold_str << " is not a number" << std::endl;
okay = false;
}
std::string archive_days_threshold_str = db_config.GetValue(CONFIG_DB_INSERT_BUFFER_SIZE, "0");
if (ValidationUtil::ValidateStringIsNumber(archive_days_threshold_str) != SERVER_SUCCESS) {
std::cerr << "Error: archive_days_threshold " << archive_days_threshold_str << " is not a number" << std::endl;
std::cerr << "ERROR: archive_days_threshold " << archive_days_threshold_str << " is not a number" << std::endl;
okay = false;
}
std::string insert_buffer_size_str = db_config.GetValue(CONFIG_DB_INSERT_BUFFER_SIZE, "4");
if (ValidationUtil::ValidateStringIsNumber(insert_buffer_size_str) != SERVER_SUCCESS) {
std::cerr << "Error: insert_buffer_size " << insert_buffer_size_str << " is not a number" << std::endl;
std::cerr << "ERROR: insert_buffer_size " << insert_buffer_size_str << " is not a number" << std::endl;
okay = false;
}
else {
......@@ -198,7 +209,7 @@ ServerConfig::CheckDBConfig() {
unsigned long total_mem = 0, free_mem = 0;
CommonUtil::GetSystemMemInfo(total_mem, free_mem);
if (insert_buffer_size >= total_mem) {
std::cerr << "Error: insert_buffer_size exceed system memory" << std::endl;
std::cerr << "ERROR: insert_buffer_size exceed system memory" << std::endl;
okay = false;
}
}
......@@ -222,13 +233,13 @@ ServerConfig::CheckMetricConfig() {
std::string is_startup_str = metric_config.GetValue(CONFIG_METRIC_IS_STARTUP, "off");
if (ValidationUtil::ValidateStringIsBool(is_startup_str) != SERVER_SUCCESS) {
std::cerr << "Error: invalid is_startup config: " << is_startup_str << std::endl;
std::cerr << "ERROR: invalid is_startup config: " << is_startup_str << std::endl;
okay = false;
}
std::string port_str = metric_config.GetChild(CONFIG_PROMETHEUS).GetValue(CONFIG_METRIC_PROMETHEUS_PORT, "8080");
if (ValidationUtil::ValidateStringIsNumber(port_str) != SERVER_SUCCESS) {
std::cerr << "Error: port specified in prometheus_config " << port_str << " is not a number" << std::endl;
std::cerr << "ERROR: port specified in prometheus_config " << port_str << " is not a number" << std::endl;
okay = false;
}
......@@ -253,7 +264,7 @@ ServerConfig::CheckCacheConfig() {
std::string cpu_cache_capacity_str = cache_config.GetValue(CONFIG_CPU_CACHE_CAPACITY, "16");
if (ValidationUtil::ValidateStringIsNumber(cpu_cache_capacity_str) != SERVER_SUCCESS) {
std::cerr << "Error: cpu_cache_capacity " << cpu_cache_capacity_str << " is not a number" << std::endl;
std::cerr << "ERROR: cpu_cache_capacity " << cpu_cache_capacity_str << " is not a number" << std::endl;
okay = false;
}
else {
......@@ -262,7 +273,7 @@ ServerConfig::CheckCacheConfig() {
unsigned long total_mem = 0, free_mem = 0;
CommonUtil::GetSystemMemInfo(total_mem, free_mem);
if (cpu_cache_capacity >= total_mem) {
std::cerr << "Error: cpu_cache_capacity exceed system memory" << std::endl;
std::cerr << "ERROR: cpu_cache_capacity exceed system memory" << std::endl;
okay = false;
}
else if (cpu_cache_capacity > (double) total_mem * 0.9) {
......@@ -272,7 +283,7 @@ ServerConfig::CheckCacheConfig() {
uint64_t insert_buffer_size = (uint64_t) GetConfig(CONFIG_DB).GetInt32Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4);
insert_buffer_size *= GB;
if (insert_buffer_size + cpu_cache_capacity >= total_mem) {
std::cerr << "Error: sum of cpu_cache_capacity and insert_buffer_size exceed system memory" << std::endl;
std::cerr << "ERROR: sum of cpu_cache_capacity and insert_buffer_size exceed system memory" << std::endl;
okay = false;
}
}
......@@ -280,23 +291,23 @@ ServerConfig::CheckCacheConfig() {
std::string cpu_cache_free_percent_str = cache_config.GetValue(CACHE_FREE_PERCENT, "0.85");
double cpu_cache_free_percent;
if (ValidationUtil::ValidateStringIsDouble(cpu_cache_free_percent_str, cpu_cache_free_percent) != SERVER_SUCCESS) {
std::cerr << "Error: cpu_cache_free_percent " << cpu_cache_free_percent_str << " is not a double" << std::endl;
std::cerr << "ERROR: cpu_cache_free_percent " << cpu_cache_free_percent_str << " is not a double" << std::endl;
okay = false;
}
else if (cpu_cache_free_percent < std::numeric_limits<double>::epsilon() || cpu_cache_free_percent > 1.0) {
std::cerr << "Error: invalid cpu_cache_free_percent " << cpu_cache_free_percent_str << std::endl;
std::cerr << "ERROR: invalid cpu_cache_free_percent " << cpu_cache_free_percent_str << std::endl;
okay = false;
}
std::string insert_cache_immediately_str = cache_config.GetValue(CONFIG_INSERT_CACHE_IMMEDIATELY, "false");
if (ValidationUtil::ValidateStringIsBool(insert_cache_immediately_str) != SERVER_SUCCESS) {
std::cerr << "Error: invalid insert_cache_immediately config: " << insert_cache_immediately_str << std::endl;
std::cerr << "ERROR: invalid insert_cache_immediately config: " << insert_cache_immediately_str << std::endl;
okay = false;
}
std::string gpu_cache_capacity_str = cache_config.GetValue(CONFIG_GPU_CACHE_CAPACITY, "5");
if (ValidationUtil::ValidateStringIsNumber(gpu_cache_capacity_str) != SERVER_SUCCESS) {
std::cerr << "Error: gpu_cache_capacity " << gpu_cache_capacity_str << " is not a number" << std::endl;
std::cerr << "ERROR: gpu_cache_capacity " << gpu_cache_capacity_str << " is not a number" << std::endl;
okay = false;
}
else {
......@@ -305,11 +316,11 @@ ServerConfig::CheckCacheConfig() {
int gpu_index = GetConfig(CONFIG_SERVER).GetInt32Value(CONFIG_GPU_INDEX, 0);
size_t gpu_memory;
if (ValidationUtil::GetGpuMemory(gpu_index, gpu_memory) != SERVER_SUCCESS) {
std::cerr << "Error: could not get gpu memory for device " << gpu_index << std::endl;
std::cerr << "ERROR: could not get gpu memory for device " << gpu_index << std::endl;
okay = false;
}
else if (gpu_cache_capacity >= gpu_memory) {
std::cerr << "Error: gpu_cache_capacity " << gpu_cache_capacity
std::cerr << "ERROR: gpu_cache_capacity " << gpu_cache_capacity
<< " exceed total gpu memory " << gpu_memory << std::endl;
okay = false;
}
......@@ -321,11 +332,11 @@ ServerConfig::CheckCacheConfig() {
std::string gpu_cache_free_percent_str = cache_config.GetValue(GPU_CACHE_FREE_PERCENT, "0.85");
double gpu_cache_free_percent;
if (ValidationUtil::ValidateStringIsDouble(gpu_cache_free_percent_str, gpu_cache_free_percent) != SERVER_SUCCESS) {
std::cerr << "Error: gpu_cache_free_percent " << gpu_cache_free_percent_str << " is not a double" << std::endl;
std::cerr << "ERROR: gpu_cache_free_percent " << gpu_cache_free_percent_str << " is not a double" << std::endl;
okay = false;
}
else if (gpu_cache_free_percent < std::numeric_limits<double>::epsilon() || gpu_cache_free_percent > 1.0) {
std::cerr << "Error: invalid gpu_cache_free_percent " << gpu_cache_free_percent << std::endl;
std::cerr << "ERROR: invalid gpu_cache_free_percent " << gpu_cache_free_percent << std::endl;
okay = false;
}
......@@ -333,11 +344,11 @@ ServerConfig::CheckCacheConfig() {
for (std::string &gpu_id : conf_gpu_ids) {
if (ValidationUtil::ValidateStringIsNumber(gpu_id) != SERVER_SUCCESS) {
std::cerr << "Error: gpu_id " << gpu_id << " is not a number" << std::endl;
std::cerr << "ERROR: gpu_id " << gpu_id << " is not a number" << std::endl;
okay = false;
}
else if (ValidationUtil::ValidateGpuIndex(std::stol(gpu_id)) != SERVER_SUCCESS) {
std::cerr << "Error: gpu_id " << gpu_id << " is invalid" << std::endl;
std::cerr << "ERROR: gpu_id " << gpu_id << " is invalid" << std::endl;
okay = false;
}
}
......@@ -357,20 +368,19 @@ ServerConfig::CheckEngineConfig() {
std::string use_blas_threshold_str = engine_config.GetValue(CONFIG_DCBT, "20");
if (ValidationUtil::ValidateStringIsNumber(use_blas_threshold_str) != SERVER_SUCCESS) {
std::cerr << "Error: use_blas_threshold " << use_blas_threshold_str << " is not a number" << std::endl;
std::cerr << "ERROR: use_blas_threshold " << use_blas_threshold_str << " is not a number" << std::endl;
okay = false;
}
std::string omp_thread_num_str = engine_config.GetValue(CONFIG_OMP_THREAD_NUM, "0");
if (ValidationUtil::ValidateStringIsNumber(omp_thread_num_str) != SERVER_SUCCESS) {
std::cerr << "Error: omp_thread_num " << omp_thread_num_str << " is not a number" << std::endl;
std::cerr << "ERROR: omp_thread_num " << omp_thread_num_str << " is not a number" << std::endl;
okay = false;
}
else {
} else {
int32_t omp_thread = std::stol(omp_thread_num_str);
uint32_t sys_thread_cnt = 8;
if (omp_thread > CommonUtil::GetSystemAvailableThreads(sys_thread_cnt)) {
std::cerr << "Error: omp_thread_num " << omp_thread_num_str << " > system available thread "
std::cerr << "ERROR: omp_thread_num " << omp_thread_num_str << " > system available thread "
<< sys_thread_cnt << std::endl;
okay = false;
}
......@@ -428,7 +438,7 @@ ServerConfig::CheckResourceConfig() {
bool okay = true;
server::ConfigNode resource_config = GetConfig(CONFIG_RESOURCE);
if (resource_config.GetChildren().empty()) {
std::cerr << "Error: no context under resource" << std::endl;
std::cerr << "ERROR: no context under resource" << std::endl;
okay = false;
}
......@@ -452,23 +462,21 @@ ServerConfig::CheckResourceConfig() {
std::string device_id_str = resource_conf.GetValue(CONFIG_RESOURCE_DEVICE_ID, "0");
int32_t device_id = -1;
if (ValidationUtil::ValidateStringIsNumber(device_id_str) != SERVER_SUCCESS) {
std::cerr << "Error: device_id " << device_id_str << " is not a number" << std::endl;
std::cerr << "ERROR: device_id " << device_id_str << " is not a number" << std::endl;
okay = false;
}
else {
} else {
device_id = std::stol(device_id_str);
}
std::string enable_executor_str = resource_conf.GetValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, "off");
if (ValidationUtil::ValidateStringIsBool(enable_executor_str) != SERVER_SUCCESS) {
std::cerr << "Error: invalid enable_executor config: " << enable_executor_str << std::endl;
std::cerr << "ERROR: invalid enable_executor config: " << enable_executor_str << std::endl;
okay = false;
}
if (type == "DISK") {
hasDisk = true;
}
else if (type == "CPU") {
} else if (type == "CPU") {
hasCPU = true;
if (resource_conf.GetBoolValue(CONFIG_RESOURCE_ENABLE_EXECUTOR, false)) {
hasExecutor = true;
......@@ -484,32 +492,32 @@ ServerConfig::CheckResourceConfig() {
}
std::string gpu_resource_num_str = resource_conf.GetValue(CONFIG_RESOURCE_NUM, "2");
if (ValidationUtil::ValidateStringIsNumber(gpu_resource_num_str) != SERVER_SUCCESS) {
std::cerr << "Error: gpu_resource_num " << gpu_resource_num_str << " is not a number" << std::endl;
std::cerr << "ERROR: gpu_resource_num " << gpu_resource_num_str << " is not a number" << std::endl;
okay = false;
}
bool mem_valid = true;
std::string pinned_memory_str = resource_conf.GetValue(CONFIG_RESOURCE_PIN_MEMORY, "300");
if (ValidationUtil::ValidateStringIsNumber(pinned_memory_str) != SERVER_SUCCESS) {
std::cerr << "Error: pinned_memory " << pinned_memory_str << " is not a number" << std::endl;
std::cerr << "ERROR: pinned_memory " << pinned_memory_str << " is not a number" << std::endl;
okay = false;
mem_valid = false;
}
std::string temp_memory_str = resource_conf.GetValue(CONFIG_RESOURCE_TEMP_MEMORY, "300");
if (ValidationUtil::ValidateStringIsNumber(temp_memory_str) != SERVER_SUCCESS) {
std::cerr << "Error: temp_memory " << temp_memory_str << " is not a number" << std::endl;
std::cerr << "ERROR: temp_memory " << temp_memory_str << " is not a number" << std::endl;
okay = false;
mem_valid = false;
}
if (mem_valid) {
size_t gpu_memory;
if (ValidationUtil::GetGpuMemory(device_id, gpu_memory) != SERVER_SUCCESS) {
std::cerr << "Error: could not get gpu memory for device " << device_id << std::endl;
std::cerr << "ERROR: could not get gpu memory for device " << device_id << std::endl;
okay = false;
}
else {
size_t prealoc_mem = std::stol(pinned_memory_str) + std::stol(temp_memory_str);
if (prealoc_mem >= gpu_memory) {
std::cerr << "Error: sum of pinned_memory and temp_memory " << prealoc_mem
std::cerr << "ERROR: sum of pinned_memory and temp_memory " << prealoc_mem
<< " exceeds total gpu memory " << gpu_memory << " for device " << device_id << std::endl;
okay = false;
}
......@@ -537,7 +545,7 @@ ServerConfig::CheckResourceConfig() {
std::string speed_str = connection_conf.GetValue(CONFIG_SPEED_CONNECTIONS);
if (ValidationUtil::ValidateStringIsNumber(speed_str) != SERVER_SUCCESS) {
std::cerr << "Error: speed " << speed_str << " is not a number" << std::endl;
std::cerr << "ERROR: speed " << speed_str << " is not a number" << std::endl;
okay = false;
}
......@@ -545,18 +553,17 @@ ServerConfig::CheckResourceConfig() {
std::string delimiter = "===";
auto delimiter_pos = endpoint_str.find(delimiter);
if (delimiter_pos == std::string::npos) {
std::cerr << "Error: invalid endpoint format: " << endpoint_str << std::endl;
std::cerr << "ERROR: invalid endpoint format: " << endpoint_str << std::endl;
okay = false;
}
else {
} else {
std::string left_resource = endpoint_str.substr(0, delimiter_pos);
if (resource_list.find(left_resource) == resource_list.end()) {
std::cerr << "Error: left resource " << left_resource << " does not exist" << std::endl;
std::cerr << "ERROR: left resource " << left_resource << " does not exist" << std::endl;
okay = false;
}
std::string right_resource = endpoint_str.substr(delimiter_pos + delimiter.length(), endpoint_str.length());
if (resource_list.find(right_resource) == resource_list.end()) {
std::cerr << "Error: right resource " << right_resource << " does not exist" << std::endl;
std::cerr << "ERROR: right resource " << right_resource << " does not exist" << std::endl;
okay = false;
}
}
......
......@@ -19,6 +19,7 @@ static const char* CONFIG_SERVER_ADDRESS = "address";
static const char* CONFIG_SERVER_PORT = "port";
static const char* CONFIG_CLUSTER_MODE = "mode";
static const char* CONFIG_GPU_INDEX = "gpu_index";
static const char* CONFIG_TIME_ZONE = "time_zone";
static const char* CONFIG_DB = "db_config";
static const char* CONFIG_DB_URL = "db_backend_url";
......
......@@ -16,7 +16,7 @@ namespace grpc {
using namespace ::milvus;
namespace {
const std::map<ErrorCode, ::milvus::grpc::ErrorCode> &ErrorMap() {
::milvus::grpc::ErrorCode ErrorMap(ErrorCode code) {
static const std::map<ErrorCode, ::milvus::grpc::ErrorCode> code_map = {
{SERVER_UNEXPECTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
{SERVER_UNSUPPORTED_ERROR, ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR},
......@@ -40,8 +40,9 @@ namespace {
{SERVER_INVALID_ROWRECORD_ARRAY, ::milvus::grpc::ErrorCode::ILLEGAL_ROWRECORD},
{SERVER_INVALID_TOPK, ::milvus::grpc::ErrorCode::ILLEGAL_TOPK},
{SERVER_INVALID_NPROBE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_INVALID_INDEX_METRIC_TYPE,::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_INVALID_INDEX_NLIST, ::milvus::grpc::ErrorCode::ILLEGAL_NLIST},
{SERVER_INVALID_INDEX_METRIC_TYPE,::milvus::grpc::ErrorCode::ILLEGAL_METRIC_TYPE},
{SERVER_INVALID_INDEX_FILE_SIZE, ::milvus::grpc::ErrorCode::ILLEGAL_ARGUMENT},
{SERVER_ILLEGAL_VECTOR_ID, ::milvus::grpc::ErrorCode::ILLEGAL_VECTOR_ID},
{SERVER_ILLEGAL_SEARCH_RESULT, ::milvus::grpc::ErrorCode::ILLEGAL_SEARCH_RESULT},
{SERVER_CACHE_ERROR, ::milvus::grpc::ErrorCode::CACHE_FAILED},
......@@ -49,7 +50,11 @@ namespace {
{SERVER_BUILD_INDEX_ERROR, ::milvus::grpc::ErrorCode::BUILD_INDEX_ERROR},
};
return code_map;
if(code_map.find(code) != code_map.end()) {
return code_map.at(code);
} else {
return ::milvus::grpc::ErrorCode::UNEXPECTED_ERROR;
}
}
}
......@@ -115,7 +120,7 @@ void GrpcRequestScheduler::ExecTask(BaseTaskPtr &task_ptr, ::milvus::grpc::Statu
ErrorCode err = task_ptr->ErrorID();
if (err != SERVER_SUCCESS) {
grpc_status->set_reason(task_ptr->ErrorMsg());
grpc_status->set_error_code(ErrorMap().at(err));
grpc_status->set_error_code(ErrorMap(err));
}
}
}
......
......@@ -463,12 +463,12 @@ InsertTask::OnExecute() {
bool user_provide_ids = !insert_param_->row_id_array().empty();
//user already provided id before, all insert action require user id
if((table_info.flag_ & engine::meta::FLAG_MASK_HAS_USERID) && !user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are user defined, please provide id for this batch");
return SetError(SERVER_ILLEGAL_VECTOR_ID, "Table vector ids are user defined, please provide id for this batch");
}
//user didn't provided id before, no need to provide user id
if((table_info.flag_ & engine::meta::FLAG_MASK_NO_USERID) && user_provide_ids) {
return SetError(SERVER_INVALID_ARGUMENT, "Table vector ids are auto generated, no need to provide id for this batch");
return SetError(SERVER_ILLEGAL_VECTOR_ID, "Table vector ids are auto generated, no need to provide id for this batch");
}
rc.RecordSection("check validation");
......@@ -484,12 +484,12 @@ InsertTask::OnExecute() {
// TODO: change to one dimension array in protobuf or use multiple-thread to copy the data
for (size_t i = 0; i < insert_param_->row_record_array_size(); i++) {
if (insert_param_->row_record_array(i).vector_data().empty()) {
return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record float array is empty");
return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array data is empty");
}
uint64_t vec_dim = insert_param_->row_record_array(i).vector_data().size();
if (vec_dim != table_info.dimension_) {
ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
std::string error_msg = "Invalid rowrecord dimension: " + std::to_string(vec_dim)
std::string error_msg = "Invalid row record dimension: " + std::to_string(vec_dim)
+ " vs. table dimension:" +
std::to_string(table_info.dimension_);
return SetError(error_code, error_msg);
......@@ -631,12 +631,12 @@ SearchTask::OnExecute() {
std::vector<float> vec_f(record_array_size * table_info.dimension_, 0);
for (size_t i = 0; i < record_array_size; i++) {
if (search_param_->query_record_array(i).vector_data().empty()) {
return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Query record float array is empty");
return SetError(SERVER_INVALID_ROWRECORD_ARRAY, "Row record array data is empty");
}
uint64_t query_vec_dim = search_param_->query_record_array(i).vector_data().size();
if (query_vec_dim != table_info.dimension_) {
ErrorCode error_code = SERVER_INVALID_VECTOR_DIMENSION;
std::string error_msg = "Invalid rowrecord dimension: " + std::to_string(query_vec_dim)
std::string error_msg = "Invalid row record dimension: " + std::to_string(query_vec_dim)
+ " vs. table dimension:" + std::to_string(table_info.dimension_);
return SetError(error_code, error_msg);
}
......
......@@ -5,16 +5,16 @@
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>
#include <vector>
#include "db/engine/EngineFactory.h"
#include "db/engine/ExecutionEngineImpl.h"
#include "server/ServerConfig.h"
#include <vector>
#include "utils.h"
using namespace zilliz::milvus;
TEST(EngineTest, FACTORY_TEST) {
TEST_F(EngineTest, FACTORY_TEST) {
{
auto engine_ptr = engine::EngineFactory::Build(
512,
......@@ -76,7 +76,7 @@ TEST(EngineTest, FACTORY_TEST) {
}
}
TEST(EngineTest, ENGINE_IMPL_TEST) {
TEST_F(EngineTest, ENGINE_IMPL_TEST) {
uint16_t dimension = 64;
std::string file_path = "/tmp/milvus_index_1";
auto engine_ptr = engine::EngineFactory::Build(
......@@ -105,19 +105,19 @@ TEST(EngineTest, ENGINE_IMPL_TEST) {
ASSERT_EQ(engine_ptr->Dimension(), dimension);
ASSERT_EQ(engine_ptr->Count(), ids.size());
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
config.AddSequenceItem(server::CONFIG_GPU_IDS, "0");
status = engine_ptr->CopyToGpu(0);
//ASSERT_TRUE(status.ok());
auto new_engine = engine_ptr->Clone();
ASSERT_EQ(new_engine->Dimension(), dimension);
ASSERT_EQ(new_engine->Count(), ids.size());
status = new_engine->CopyToCpu();
//ASSERT_TRUE(status.ok());
auto engine_build = new_engine->BuildIndex("/tmp/milvus_index_2", engine::EngineType::FAISS_IVFSQ8);
//ASSERT_TRUE(status.ok());
// server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
// config.AddSequenceItem(server::CONFIG_GPU_IDS, "0");
//
// status = engine_ptr->CopyToGpu(0);
// //ASSERT_TRUE(status.ok());
//
// auto new_engine = engine_ptr->Clone();
// ASSERT_EQ(new_engine->Dimension(), dimension);
// ASSERT_EQ(new_engine->Count(), ids.size());
// status = new_engine->CopyToCpu();
// //ASSERT_TRUE(status.ok());
//
// auto engine_build = new_engine->BuildIndex("/tmp/milvus_index_2", engine::EngineType::FAISS_IVFSQ8);
// //ASSERT_TRUE(status.ok());
}
......@@ -55,9 +55,6 @@ void BuildVectors(int64_t n, std::vector<float> &vectors) {
}
TEST_F(MemManagerTest, VECTOR_SOURCE_TEST) {
std::shared_ptr<engine::meta::SqliteMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
engine::meta::TableSchema table_schema = BuildTableSchema();
auto status = impl_->CreateTable(table_schema);
ASSERT_TRUE(status.ok());
......@@ -96,16 +93,10 @@ TEST_F(MemManagerTest, VECTOR_SOURCE_TEST) {
vector_ids = source.GetVectorIds();
ASSERT_EQ(vector_ids.size(), 100);
status = impl_->DropAll();
ASSERT_TRUE(status.ok());
}
TEST_F(MemManagerTest, MEM_TABLE_FILE_TEST) {
std::shared_ptr<engine::meta::SqliteMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
auto options = engine::OptionsFactory::Build();
auto options = GetOptions();
engine::meta::TableSchema table_schema = BuildTableSchema();
auto status = impl_->CreateTable(table_schema);
......@@ -143,15 +134,10 @@ TEST_F(MemManagerTest, MEM_TABLE_FILE_TEST) {
ASSERT_EQ(vector_ids.size(), n_max - n_100);
ASSERT_TRUE(mem_table_file.IsFull());
status = impl_->DropAll();
ASSERT_TRUE(status.ok());
}
TEST_F(MemManagerTest, MEM_TABLE_TEST) {
std::shared_ptr<engine::meta::SqliteMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
auto options = engine::OptionsFactory::Build();
auto options = GetOptions();
engine::meta::TableSchema table_schema = BuildTableSchema();
auto status = impl_->CreateTable(table_schema);
......@@ -211,9 +197,6 @@ TEST_F(MemManagerTest, MEM_TABLE_TEST) {
status = mem_table.Serialize();
ASSERT_TRUE(status.ok());
status = impl_->DropAll();
ASSERT_TRUE(status.ok());
}
TEST_F(MemManagerTest2, SERIAL_INSERT_SEARCH_TEST) {
......
......@@ -101,8 +101,7 @@ TEST_F(MetaTest, TABLE_FILE_TEST) {
meta::TableFilesSchema files;
status = impl_->GetTableFiles(table_file.table_id_, ids, files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), 1UL);
ASSERT_TRUE(files[0].file_type_ == meta::TableFileSchema::TO_DELETE);
ASSERT_EQ(files.size(), 0UL);
}
TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
......@@ -150,8 +149,6 @@ TEST_F(MetaTest, ARCHIVE_TEST_DAYS) {
for(auto& file : files_get) {
if (days[i] < days_num) {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::NEW);
} else {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::TO_DELETE);
}
i++;
}
......@@ -195,9 +192,7 @@ TEST_F(MetaTest, ARCHIVE_TEST_DISK) {
ASSERT_TRUE(status.ok());
for(auto& file : files_get) {
if (i < 5) {
ASSERT_TRUE(file.file_type_ == meta::TableFileSchema::TO_DELETE);
} else {
if (i >= 5) {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::NEW);
}
++i;
......@@ -277,38 +272,31 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
meta::TableFilesSchema files;
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatePartionedTableFilesSchema dated_files;
status = impl_->FilesToMerge(table.table_id_, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), raw_files_cnt);
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatesT dates = {table_file.date_};
std::vector<size_t> ids;
status = impl_->FilesToSearch(table_id, ids, dates, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
ids.push_back(size_t(9999999999));
status = impl_->FilesToSearch(table_id, ids, dates, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),0);
std::vector<int> file_types;
......@@ -336,6 +324,12 @@ TEST_F(MetaTest, TABLE_FILES_TEST) {
status = impl_->DeleteTableFiles(table_id);
ASSERT_TRUE(status.ok());
status = impl_->CreateTableFile(table_file);
table_file.file_type_ = meta::TableFileSchema::NEW;
status = impl_->UpdateTableFile(table_file);
status = impl_->CleanUp();
ASSERT_TRUE(status.ok());
status = impl_->DeleteTable(table_id);
ASSERT_TRUE(status.ok());
......
......@@ -128,6 +128,14 @@ TEST(DBMiscTest, UTILS_TEST) {
ASSERT_TRUE(boost::filesystem::exists(path));
}
options.slave_paths.push_back("/");
status = engine::utils::CreateTablePath(options, TABLE_NAME);
ASSERT_FALSE(status.ok());
options.path = "/";
status = engine::utils::CreateTablePath(options, TABLE_NAME);
ASSERT_FALSE(status.ok());
engine::meta::TableFileSchema file;
file.id_ = 50;
file.table_id_ = TABLE_NAME;
......@@ -142,6 +150,4 @@ TEST(DBMiscTest, UTILS_TEST) {
status = engine::utils::DeleteTableFilePath(options, file);
ASSERT_TRUE(status.ok());
}
\ No newline at end of file
......@@ -108,9 +108,7 @@ TEST_F(MySqlMetaTest, TABLE_FILE_TEST) {
std::vector<size_t> ids = {table_file.id_};
meta::TableFilesSchema files;
status = impl_->GetTableFiles(table_file.table_id_, ids, files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), 1UL);
ASSERT_TRUE(files[0].file_type_ == meta::TableFileSchema::TO_DELETE);
ASSERT_EQ(files.size(), 0UL);
}
TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
......@@ -159,8 +157,6 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DAYS) {
for(auto& file : files_get) {
if (days[i] < days_num) {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::NEW);
} else {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::TO_DELETE);
}
i++;
}
......@@ -219,9 +215,7 @@ TEST_F(MySqlMetaTest, ARCHIVE_TEST_DISK) {
ASSERT_TRUE(status.ok());
for(auto& file : files_get) {
if (i < 5) {
ASSERT_TRUE(file.file_type_ == meta::TableFileSchema::TO_DELETE);
} else {
if (i >= 5) {
ASSERT_EQ(file.file_type_, meta::TableFileSchema::NEW);
}
++i;
......@@ -302,38 +296,31 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
meta::TableFilesSchema files;
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatePartionedTableFilesSchema dated_files;
status = impl_->FilesToMerge(table.table_id_, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(), raw_files_cnt);
status = impl_->FilesToIndex(files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(files.size(), to_index_files_cnt);
meta::DatesT dates = {table_file.date_};
std::vector<size_t> ids;
status = impl_->FilesToSearch(table_id, ids, dates, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
status = impl_->FilesToSearch(table_id, ids, meta::DatesT(), dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),
to_index_files_cnt+raw_files_cnt+index_files_cnt);
ids.push_back(size_t(9999999999));
status = impl_->FilesToSearch(table_id, ids, dates, dated_files);
ASSERT_TRUE(status.ok());
ASSERT_EQ(dated_files[table_file.date_].size(),0);
std::vector<int> file_types;
......@@ -364,7 +351,7 @@ TEST_F(MySqlMetaTest, TABLE_FILES_TEST) {
status = impl_->DeleteTable(table_id);
ASSERT_TRUE(status.ok());
status = impl_->CleanUpFilesWithTTL(1UL);
status = impl_->CleanUpFilesWithTTL(0UL);
ASSERT_TRUE(status.ok());
}
......
......@@ -10,6 +10,8 @@
#include <gtest/gtest.h>
#include <cmath>
#include <vector>
#include <src/scheduler/task/SearchTask.h>
using namespace zilliz::milvus;
......@@ -114,23 +116,23 @@ TEST(DBSearchTest, TOPK_TEST) {
std::vector<long> target_ids;
std::vector<float> target_distence;
engine::SearchContext::ResultSet src_result;
auto status = engine::SearchTask::ClusterResult(target_ids, target_distence, NQ, TOP_K, src_result);
auto status = engine::XSearchTask::ClusterResult(target_ids, target_distence, NQ, TOP_K, src_result);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(src_result.empty());
BuildResult(NQ, TOP_K, ascending, target_ids, target_distence);
status = engine::SearchTask::ClusterResult(target_ids, target_distence, NQ, TOP_K, src_result);
status = engine::XSearchTask::ClusterResult(target_ids, target_distence, NQ, TOP_K, src_result);
ASSERT_TRUE(status.ok());
ASSERT_EQ(src_result.size(), NQ);
engine::SearchContext::ResultSet target_result;
status = engine::SearchTask::TopkResult(target_result, TOP_K, ascending, target_result);
status = engine::XSearchTask::TopkResult(target_result, TOP_K, ascending, target_result);
ASSERT_TRUE(status.ok());
status = engine::SearchTask::TopkResult(target_result, TOP_K, ascending, src_result);
status = engine::XSearchTask::TopkResult(target_result, TOP_K, ascending, src_result);
ASSERT_FALSE(status.ok());
status = engine::SearchTask::TopkResult(src_result, TOP_K, ascending, target_result);
status = engine::XSearchTask::TopkResult(src_result, TOP_K, ascending, target_result);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(src_result.empty());
ASSERT_EQ(target_result.size(), NQ);
......@@ -140,10 +142,10 @@ TEST(DBSearchTest, TOPK_TEST) {
uint64_t wrong_topk = TOP_K - 10;
BuildResult(NQ, wrong_topk, ascending, src_ids, src_distence);
status = engine::SearchTask::ClusterResult(src_ids, src_distence, NQ, wrong_topk, src_result);
status = engine::XSearchTask::ClusterResult(src_ids, src_distence, NQ, wrong_topk, src_result);
ASSERT_TRUE(status.ok());
status = engine::SearchTask::TopkResult(src_result, TOP_K, ascending, target_result);
status = engine::XSearchTask::TopkResult(src_result, TOP_K, ascending, target_result);
ASSERT_TRUE(status.ok());
for(uint64_t i = 0; i < NQ; i++) {
ASSERT_EQ(target_result[i].size(), TOP_K);
......@@ -152,7 +154,7 @@ TEST(DBSearchTest, TOPK_TEST) {
wrong_topk = TOP_K + 10;
BuildResult(NQ, wrong_topk, ascending, src_ids, src_distence);
status = engine::SearchTask::TopkResult(src_result, TOP_K, ascending, target_result);
status = engine::XSearchTask::TopkResult(src_result, TOP_K, ascending, target_result);
ASSERT_TRUE(status.ok());
for(uint64_t i = 0; i < NQ; i++) {
ASSERT_EQ(target_result[i].size(), TOP_K);
......@@ -170,15 +172,15 @@ TEST(DBSearchTest, MERGE_TEST) {
uint64_t src_count = 5, target_count = 8;
BuildResult(1, src_count, ascending, src_ids, src_distence);
BuildResult(1, target_count, ascending, target_ids, target_distence);
auto status = engine::SearchTask::ClusterResult(src_ids, src_distence, 1, src_count, src_result);
auto status = engine::XSearchTask::ClusterResult(src_ids, src_distence, 1, src_count, src_result);
ASSERT_TRUE(status.ok());
status = engine::SearchTask::ClusterResult(target_ids, target_distence, 1, target_count, target_result);
status = engine::XSearchTask::ClusterResult(target_ids, target_distence, 1, target_count, target_result);
ASSERT_TRUE(status.ok());
{
engine::SearchContext::Id2DistanceMap src = src_result[0];
engine::SearchContext::Id2DistanceMap target = target_result[0];
status = engine::SearchTask::MergeResult(src, target, 10, ascending);
status = engine::XSearchTask::MergeResult(src, target, 10, ascending);
ASSERT_TRUE(status.ok());
ASSERT_EQ(target.size(), 10);
CheckResult(src_result[0], target_result[0], target, ascending);
......@@ -187,7 +189,7 @@ TEST(DBSearchTest, MERGE_TEST) {
{
engine::SearchContext::Id2DistanceMap src = src_result[0];
engine::SearchContext::Id2DistanceMap target;
status = engine::SearchTask::MergeResult(src, target, 10, ascending);
status = engine::XSearchTask::MergeResult(src, target, 10, ascending);
ASSERT_TRUE(status.ok());
ASSERT_EQ(target.size(), src_count);
ASSERT_TRUE(src.empty());
......@@ -197,7 +199,7 @@ TEST(DBSearchTest, MERGE_TEST) {
{
engine::SearchContext::Id2DistanceMap src = src_result[0];
engine::SearchContext::Id2DistanceMap target = target_result[0];
status = engine::SearchTask::MergeResult(src, target, 30, ascending);
status = engine::XSearchTask::MergeResult(src, target, 30, ascending);
ASSERT_TRUE(status.ok());
ASSERT_EQ(target.size(), src_count + target_count);
CheckResult(src_result[0], target_result[0], target, ascending);
......@@ -206,7 +208,7 @@ TEST(DBSearchTest, MERGE_TEST) {
{
engine::SearchContext::Id2DistanceMap target = src_result[0];
engine::SearchContext::Id2DistanceMap src = target_result[0];
status = engine::SearchTask::MergeResult(src, target, 30, ascending);
status = engine::XSearchTask::MergeResult(src, target, 30, ascending);
ASSERT_TRUE(status.ok());
ASSERT_EQ(target.size(), src_count + target_count);
CheckResult(src_result[0], target_result[0], target, ascending);
......@@ -229,7 +231,7 @@ TEST(DBSearchTest, PARALLEL_CLUSTER_TEST) {
BuildResult(nq, topk, ascending, target_ids, target_distence);
rc.RecordSection("build id/dietance map");
auto status = engine::SearchTask::ClusterResult(target_ids, target_distence, nq, topk, src_result);
auto status = engine::XSearchTask::ClusterResult(target_ids, target_distence, nq, topk, src_result);
ASSERT_TRUE(status.ok());
ASSERT_EQ(src_result.size(), nq);
......@@ -269,14 +271,14 @@ TEST(DBSearchTest, PARALLEL_TOPK_TEST) {
server::TimeRecorder rc("DoCluster");
BuildResult(nq, topk, ascending, target_ids, target_distence);
auto status = engine::SearchTask::ClusterResult(target_ids, target_distence, nq, topk, src_result);
auto status = engine::XSearchTask::ClusterResult(target_ids, target_distence, nq, topk, src_result);
rc.RecordSection("cluster result");
BuildResult(nq, insufficient_topk, ascending, insufficient_ids, insufficient_distence);
status = engine::SearchTask::ClusterResult(target_ids, target_distence, nq, insufficient_topk, insufficient_result);
status = engine::XSearchTask::ClusterResult(target_ids, target_distence, nq, insufficient_topk, insufficient_result);
rc.RecordSection("cluster result");
engine::SearchTask::TopkResult(insufficient_result, topk, ascending, src_result);
engine::XSearchTask::TopkResult(insufficient_result, topk, ascending, src_result);
ASSERT_TRUE(status.ok());
rc.RecordSection("topk");
......
......@@ -47,6 +47,12 @@ void BaseTest::InitLog() {
void BaseTest::SetUp() {
InitLog();
zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(0, 1024*1024*200, 1024*1024*300, 2);
}
void BaseTest::TearDown() {
zilliz::knowhere::FaissGpuResourceMgr::GetInstance().Free();
}
engine::Options BaseTest::GetOptions() {
......@@ -60,8 +66,6 @@ engine::Options BaseTest::GetOptions() {
void DBTest::SetUp() {
BaseTest::SetUp();
zilliz::knowhere::FaissGpuResourceMgr::GetInstance().InitDevice(0, 1024*1024*200, 1024*1024*300, 2);
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
config.AddSequenceItem(server::CONFIG_GPU_IDS, "0");
......@@ -87,7 +91,7 @@ void DBTest::TearDown() {
db_->DropAll();
delete db_;
zilliz::knowhere::FaissGpuResourceMgr::GetInstance().Free();
BaseTest::TearDown();
engine::ResMgrInst::GetInstance()->Stop();
engine::SchedInst::GetInstance()->Stop();
......@@ -116,6 +120,8 @@ void MetaTest::SetUp() {
void MetaTest::TearDown() {
impl_->DropAll();
BaseTest::TearDown();
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
}
......@@ -144,6 +150,8 @@ void MySqlMetaTest::SetUp() {
void MySqlMetaTest::TearDown() {
impl_->DropAll();
BaseTest::TearDown();
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
}
......
......@@ -37,6 +37,7 @@ protected:
void InitLog();
virtual void SetUp() override;
virtual void TearDown() override;
virtual zilliz::milvus::engine::Options GetOptions();
};
......@@ -55,6 +56,10 @@ class DBTest2 : public DBTest {
virtual zilliz::milvus::engine::Options GetOptions() override;
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class EngineTest : public DBTest {
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class MetaTest : public BaseTest {
protected:
......@@ -82,7 +87,7 @@ class MySqlMetaTest : public BaseTest {
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class MemManagerTest : public BaseTest {
class MemManagerTest : public MetaTest {
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......
server_config:
address: 0.0.0.0
address: 0.0.0.0 # milvus server ip address (IPv4)
port: 19530 # the port milvus listen to, default: 19530, range: 1025 ~ 65534
gpu_index: 0 # the gpu milvus use, default: 0, range: 0 ~ gpu number - 1
mode: single # milvus deployment type: single, cluster, read_only
db_config:
db_path: /tmp/milvus # milvus data storage path
#URI format: dialect://username:password@host:port/database
#All parts except dialect are optional, but you MUST include the delimiters
#Currently supports mysql or sqlite
db_backend_url: mysql://root:1234@:/test # meta database uri
index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB
archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB
archive_days_threshold: 30 # files older than x days will be archived, unit: day
db_slave_path: # secondry data storage path, split by semicolon
metric_config:
is_startup: off # if monitoring start: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # following are prometheus configure
port: 8080 # the port prometheus use to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address
push_gateway_port: 9091 # push method configure: push gateway port
# URI format: dialect://username:password@host:port/database
# All parts except dialect are optional, but you MUST include the delimiters
# Currently dialect supports mysql or sqlite
db_backend_url: sqlite://:@:/
archive_disk_threshold: 0 # triger archive action if storage size exceed this value, 0 means no limit, unit: GB
archive_days_threshold: 0 # files older than x days will be archived, 0 means no limit, unit: day
insert_buffer_size: 4 # maximum insert buffer size allowed, default: 4, unit: GB, should be at least 1 GB.
# the sum of insert_buffer_size and cpu_cache_capacity should be less than total memory, unit: GB
license_config: # license configure
license_path: "/tmp/milvus/system.license" # license file path
metric_config:
is_startup: off # if monitoring start: on, off
collector: prometheus # metrics collector: prometheus
prometheus_config: # following are prometheus configure
port: 8080 # the port prometheus use to fetch metrics
push_gateway_ip_address: 127.0.0.1 # push method configure: push gateway ip address
push_gateway_port: 9091 # push method configure: push gateway port
cache_config: # cache configure
cpu_cache_capacity: 16 # how many memory are used as cache, unit: GB, range: 0 ~ less than total memory
cache_config:
cpu_cache_capacity: 16 # how many memory are used as cache, unit: GB, range: 0 ~ less than total memory
cpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
insert_cache_immediately: false # insert data will be load into cache immediately for hot query
gpu_cache_capacity: 5 # how many memory are used as cache in gpu, unit: GB, RANGE: 0 ~ less than total memory
gpu_cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specify how much memory should be kept, range: greater than zero ~ 1.0
gpu_ids: # gpu id
- 0
- 1
engine_config:
nprobe: 3000
\ No newline at end of file
use_blas_threshold: 20
resource_config:
# resource list, length: 0~N
# please set a DISK resource and a CPU resource least, or system will not return query result.
#
# example:
# resource_name: # resource name, just using in connections below
# type: DISK # resource type, optional: DISK/CPU/GPU
# device_id: 0
# enable_executor: false # if is enable executor, optional: true, false
resources:
ssda:
type: DISK
device_id: 0
enable_executor: false
cpu:
type: CPU
device_id: 0
enable_executor: false
gpu0:
type: GPU
device_id: 0
enable_executor: true
gpu_resource_num: 2
pinned_memory: 300
temp_memory: 300
# connection list, length: 0~N
# example:
# connection_name:
# speed: 100 # unit: MS/s
# endpoint: ===
connections:
io:
speed: 500
endpoint: ssda===cpu
pcie0:
speed: 11000
endpoint: cpu===gpu0
......@@ -101,7 +101,8 @@ TEST(ConfigTest, SERVER_CONFIG_TEST) {
err = server::ServerConfig::GetInstance().ValidateConfig();
ASSERT_EQ(err, SERVER_SUCCESS);
server::ConfigNode node1 = config.GetConfig("server_config");
const server::ServerConfig& config_const = config;
server::ConfigNode node1 = config_const.GetConfig("server_config");
server::ConfigNode& node2 = config.GetConfig("cache_config");
node1.Combine(node2);
......
......@@ -158,7 +158,17 @@ TEST(UtilTest, LOG_TEST) {
ASSERT_EQ(fname, "log_config.conf");
}
TEST(UtilTest, VALIDATE_TABLENAME_TEST) {
TEST(UtilTest, TIMERECORDER_TEST) {
for(int64_t log_level = 0; log_level <= 6; log_level++) {
if(log_level == 5) {
continue; //skip fatal
}
server::TimeRecorder rc("time", log_level);
rc.RecordSection("end");
}
}
TEST(ValidationUtilTest, VALIDATE_TABLENAME_TEST) {
std::string table_name = "Normal123_";
ErrorCode res = server::ValidationUtil::ValidateTableName(table_name);
ASSERT_EQ(res, SERVER_SUCCESS);
......@@ -192,7 +202,7 @@ TEST(UtilTest, VALIDATE_TABLENAME_TEST) {
ASSERT_EQ(res, SERVER_INVALID_TABLE_NAME);
}
TEST(UtilTest, VALIDATE_DIMENSIONTEST) {
TEST(ValidationUtilTest, VALIDATE_DIMENSION_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableDimension(-1), SERVER_INVALID_VECTOR_DIMENSION);
ASSERT_EQ(server::ValidationUtil::ValidateTableDimension(0), SERVER_INVALID_VECTOR_DIMENSION);
ASSERT_EQ(server::ValidationUtil::ValidateTableDimension(16385), SERVER_INVALID_VECTOR_DIMENSION);
......@@ -200,7 +210,7 @@ TEST(UtilTest, VALIDATE_DIMENSIONTEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableDimension(1), SERVER_SUCCESS);
}
TEST(UtilTest, VALIDATE_INDEX_TEST) {
TEST(ValidationUtilTest, VALIDATE_INDEX_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType((int)engine::EngineType::INVALID), SERVER_INVALID_INDEX_TYPE);
for(int i = 1; i <= (int)engine::EngineType::MAX_VALUE; i++) {
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexType(i), SERVER_SUCCESS);
......@@ -218,14 +228,14 @@ TEST(UtilTest, VALIDATE_INDEX_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateTableIndexMetricType(2), SERVER_SUCCESS);
}
TEST(ValidationUtilTest, ValidateTopkTest) {
TEST(ValidationUtilTest, VALIDATE_TOPK_TEST) {
engine::meta::TableSchema schema;
ASSERT_EQ(server::ValidationUtil::ValidateSearchTopk(10, schema), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateSearchTopk(65536, schema), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateSearchTopk(0, schema), SERVER_SUCCESS);
}
TEST(ValidationUtilTest, ValidateNprobeTest) {
TEST(ValidationUtilTest, VALIDATE_NPROBE_TEST) {
engine::meta::TableSchema schema;
schema.nlist_ = 100;
ASSERT_EQ(server::ValidationUtil::ValidateSearchNprobe(10, schema), SERVER_SUCCESS);
......@@ -233,7 +243,7 @@ TEST(ValidationUtilTest, ValidateNprobeTest) {
ASSERT_NE(server::ValidationUtil::ValidateSearchNprobe(101, schema), SERVER_SUCCESS);
}
TEST(ValidationUtilTest, ValidateGpuTest) {
TEST(ValidationUtilTest, VALIDATE_GPU_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateGpuIndex(0), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateGpuIndex(100), SERVER_SUCCESS);
......@@ -242,12 +252,33 @@ TEST(ValidationUtilTest, ValidateGpuTest) {
ASSERT_NE(server::ValidationUtil::GetGpuMemory(100, memory), SERVER_SUCCESS);
}
TEST(UtilTest, TIMERECORDER_TEST) {
for(int64_t log_level = 0; log_level <= 6; log_level++) {
if(log_level == 5) {
continue; //skip fatal
}
server::TimeRecorder rc("time", log_level);
rc.RecordSection("end");
}
TEST(ValidationUtilTest, VALIDATE_IPADDRESS_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateIpAddress("127.0.0.1"), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateIpAddress("not ip"), SERVER_SUCCESS);
}
TEST(ValidationUtilTest, VALIDATE_NUMBER_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateStringIsNumber("1234"), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateStringIsNumber("not number"), SERVER_SUCCESS);
}
TEST(ValidationUtilTest, VALIDATE_BOOL_TEST) {
std::string str = "true";
ASSERT_EQ(server::ValidationUtil::ValidateStringIsBool(str), SERVER_SUCCESS);
str = "not bool";
ASSERT_NE(server::ValidationUtil::ValidateStringIsBool(str), SERVER_SUCCESS);
}
TEST(ValidationUtilTest, VALIDATE_DOUBLE_TEST) {
double ret = 0.0;
ASSERT_EQ(server::ValidationUtil::ValidateStringIsDouble("2.5", ret), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateStringIsDouble("not double", ret), SERVER_SUCCESS);
}
TEST(ValidationUtilTest, VALIDATE_DBURI_TEST) {
ASSERT_EQ(server::ValidationUtil::ValidateDbURI("sqlite://:@:/"), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateDbURI("xxx://:@:/"), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateDbURI("not uri"), SERVER_SUCCESS);
ASSERT_EQ(server::ValidationUtil::ValidateDbURI("mysql://root:123456@127.0.0.1:3303/milvus"), SERVER_SUCCESS);
ASSERT_NE(server::ValidationUtil::ValidateDbURI("mysql://root:123456@127.0.0.1:port/milvus"), SERVER_SUCCESS);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册