提交 cfcf3256 编写于 作者: S starlord

refine code


Former-commit-id: e1348890cd447aa9e1cc1b63614f223b2ba58605
上级 cc8205af
...@@ -121,9 +121,15 @@ $ sudo apt-get install clang-tidy clang-format ...@@ -121,9 +121,15 @@ $ sudo apt-get install clang-tidy clang-format
$ rm cmake_build/CMakeCache.txt $ rm cmake_build/CMakeCache.txt
``` ```
Check code style
```shell ```shell
$ ./build.sh -l $ ./build.sh -l
``` ```
To format the code
```shell
$ cd cmake_build
$ make clang-format
```
##### Run unit test ##### Run unit test
......
...@@ -5,4 +5,5 @@ ...@@ -5,4 +5,5 @@
*thirdparty* *thirdparty*
*easylogging++* *easylogging++*
*SqliteMetaImpl.cpp *SqliteMetaImpl.cpp
*src/grpc* *src/grpc*
\ No newline at end of file *milvus/include*
\ No newline at end of file
...@@ -26,9 +26,9 @@ ...@@ -26,9 +26,9 @@
#include "meta/SqliteMetaImpl.h" #include "meta/SqliteMetaImpl.h"
#include "metrics/Metrics.h" #include "metrics/Metrics.h"
#include "scheduler/SchedInst.h" #include "scheduler/SchedInst.h"
#include "scheduler/job/BuildIndexJob.h"
#include "scheduler/job/DeleteJob.h" #include "scheduler/job/DeleteJob.h"
#include "scheduler/job/SearchJob.h" #include "scheduler/job/SearchJob.h"
#include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
...@@ -51,7 +51,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1; ...@@ -51,7 +51,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1;
} // namespace } // namespace
DBImpl::DBImpl(const DBOptions &options) DBImpl::DBImpl(const DBOptions& options)
: options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { : options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) {
meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_); meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_);
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
...@@ -111,7 +111,7 @@ DBImpl::DropAll() { ...@@ -111,7 +111,7 @@ DBImpl::DropAll() {
} }
Status Status
DBImpl::CreateTable(meta::TableSchema &table_schema) { DBImpl::CreateTable(meta::TableSchema& table_schema) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -122,7 +122,7 @@ DBImpl::CreateTable(meta::TableSchema &table_schema) { ...@@ -122,7 +122,7 @@ DBImpl::CreateTable(meta::TableSchema &table_schema) {
} }
Status Status
DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) { DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -147,7 +147,7 @@ DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) { ...@@ -147,7 +147,7 @@ DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) {
} }
Status Status
DBImpl::DescribeTable(meta::TableSchema &table_schema) { DBImpl::DescribeTable(meta::TableSchema& table_schema) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -158,7 +158,7 @@ DBImpl::DescribeTable(meta::TableSchema &table_schema) { ...@@ -158,7 +158,7 @@ DBImpl::DescribeTable(meta::TableSchema &table_schema) {
} }
Status Status
DBImpl::HasTable(const std::string &table_id, bool &has_or_not) { DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -167,7 +167,7 @@ DBImpl::HasTable(const std::string &table_id, bool &has_or_not) { ...@@ -167,7 +167,7 @@ DBImpl::HasTable(const std::string &table_id, bool &has_or_not) {
} }
Status Status
DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) { DBImpl::AllTables(std::vector<meta::TableSchema>& table_schema_array) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -176,7 +176,7 @@ DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) { ...@@ -176,7 +176,7 @@ DBImpl::AllTables(std::vector<meta::TableSchema> &table_schema_array) {
} }
Status Status
DBImpl::PreloadTable(const std::string &table_id) { DBImpl::PreloadTable(const std::string& table_id) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -195,11 +195,11 @@ DBImpl::PreloadTable(const std::string &table_id) { ...@@ -195,11 +195,11 @@ DBImpl::PreloadTable(const std::string &table_id) {
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t available_size = cache_total - cache_usage; int64_t available_size = cache_total - cache_usage;
for (auto &day_files : files) { for (auto& day_files : files) {
for (auto &file : day_files.second) { for (auto& file : day_files.second) {
ExecutionEnginePtr engine = ExecutionEnginePtr engine =
EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_, EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
(MetricType) file.metric_type_, file.nlist_); (MetricType)file.metric_type_, file.nlist_);
if (engine == nullptr) { if (engine == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type"; ENGINE_LOG_ERROR << "Invalid engine type";
return Status(DB_ERROR, "Invalid engine type"); return Status(DB_ERROR, "Invalid engine type");
...@@ -212,7 +212,7 @@ DBImpl::PreloadTable(const std::string &table_id) { ...@@ -212,7 +212,7 @@ DBImpl::PreloadTable(const std::string &table_id) {
try { try {
// step 1: load index // step 1: load index
engine->Load(true); engine->Load(true);
} catch (std::exception &ex) { } catch (std::exception& ex) {
std::string msg = "Pre-load table encounter exception: " + std::string(ex.what()); std::string msg = "Pre-load table encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg); return Status(DB_ERROR, msg);
...@@ -224,7 +224,7 @@ DBImpl::PreloadTable(const std::string &table_id) { ...@@ -224,7 +224,7 @@ DBImpl::PreloadTable(const std::string &table_id) {
} }
Status Status
DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -233,7 +233,7 @@ DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { ...@@ -233,7 +233,7 @@ DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) {
} }
Status Status
DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) { DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -261,7 +261,7 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect ...@@ -261,7 +261,7 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect
} }
Status Status
DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) {
{ {
std::unique_lock<std::mutex> lock(build_index_mutex_); std::unique_lock<std::mutex> lock(build_index_mutex_);
...@@ -316,7 +316,7 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { ...@@ -316,7 +316,7 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
while (!file_ids.empty()) { while (!file_ids.empty()) {
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times; ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
if (index.engine_type_ != (int) EngineType::FAISS_IDMAP) { if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) {
status = meta_ptr_->UpdateTableFilesToIndex(table_id); status = meta_ptr_->UpdateTableFilesToIndex(table_id);
} }
...@@ -329,19 +329,19 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { ...@@ -329,19 +329,19 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) {
} }
Status Status
DBImpl::DescribeIndex(const std::string &table_id, TableIndex &index) { DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) {
return meta_ptr_->DescribeTableIndex(table_id, index); return meta_ptr_->DescribeTableIndex(table_id, index);
} }
Status Status
DBImpl::DropIndex(const std::string &table_id) { DBImpl::DropIndex(const std::string& table_id) {
ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; ENGINE_LOG_DEBUG << "Drop index for table: " << table_id;
return meta_ptr_->DropTableIndex(table_id); return meta_ptr_->DropTableIndex(table_id);
} }
Status Status
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
QueryResults &results) { QueryResults& results) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -353,8 +353,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr ...@@ -353,8 +353,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr
} }
Status Status
DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
const meta::DatesT &dates, QueryResults &results) { const meta::DatesT& dates, QueryResults& results) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -370,8 +370,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr ...@@ -370,8 +370,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr
} }
meta::TableFilesSchema file_id_array; meta::TableFilesSchema file_id_array;
for (auto &day_files : files) { for (auto& day_files : files) {
for (auto &file : day_files.second) { for (auto& file : day_files.second) {
file_id_array.push_back(file); file_id_array.push_back(file);
} }
} }
...@@ -383,8 +383,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr ...@@ -383,8 +383,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr
} }
Status Status
DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ids, uint64_t k, uint64_t nq, DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids, uint64_t k, uint64_t nq,
uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) { uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -393,7 +393,7 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ ...@@ -393,7 +393,7 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_
// get specified files // get specified files
std::vector<size_t> ids; std::vector<size_t> ids;
for (auto &id : file_ids) { for (auto& id : file_ids) {
meta::TableFileSchema table_file; meta::TableFileSchema table_file;
table_file.table_id_ = table_id; table_file.table_id_ = table_id;
std::string::size_type sz; std::string::size_type sz;
...@@ -407,8 +407,8 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ ...@@ -407,8 +407,8 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_
} }
meta::TableFilesSchema file_id_array; meta::TableFilesSchema file_id_array;
for (auto &day_files : files_array) { for (auto& day_files : files_array) {
for (auto &file : day_files.second) { for (auto& file : day_files.second) {
file_id_array.push_back(file); file_id_array.push_back(file);
} }
} }
...@@ -424,7 +424,7 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_ ...@@ -424,7 +424,7 @@ DBImpl::Query(const std::string &table_id, const std::vector<std::string> &file_
} }
Status Status
DBImpl::Size(uint64_t &result) { DBImpl::Size(uint64_t& result) {
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status(DB_ERROR, "Milsvus server is shutdown!"); return Status(DB_ERROR, "Milsvus server is shutdown!");
} }
...@@ -436,8 +436,8 @@ DBImpl::Size(uint64_t &result) { ...@@ -436,8 +436,8 @@ DBImpl::Size(uint64_t &result) {
// internal methods // internal methods
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Status Status
DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files, uint64_t k, uint64_t nq, DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq,
uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) { uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) {
server::CollectQueryMetrics metrics(nq); server::CollectQueryMetrics metrics(nq);
TimeRecorder rc(""); TimeRecorder rc("");
...@@ -446,7 +446,7 @@ DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &fi ...@@ -446,7 +446,7 @@ DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &fi
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size()
<< " date range count: " << dates.size(); << " date range count: " << dates.size();
scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors); scheduler::SearchJobPtr job = std::make_shared<scheduler::SearchJob>(0, k, nq, nprobe, vectors);
for (auto &file : files) { for (auto& file : files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file); scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddIndexFile(file_ptr); job->AddIndexFile(file_ptr);
} }
...@@ -514,7 +514,7 @@ DBImpl::BackgroundTimerTask() { ...@@ -514,7 +514,7 @@ DBImpl::BackgroundTimerTask() {
void void
DBImpl::WaitMergeFileFinish() { DBImpl::WaitMergeFileFinish() {
std::lock_guard<std::mutex> lck(compact_result_mutex_); std::lock_guard<std::mutex> lck(compact_result_mutex_);
for (auto &iter : compact_thread_results_) { for (auto& iter : compact_thread_results_) {
iter.wait(); iter.wait();
} }
} }
...@@ -522,7 +522,7 @@ DBImpl::WaitMergeFileFinish() { ...@@ -522,7 +522,7 @@ DBImpl::WaitMergeFileFinish() {
void void
DBImpl::WaitBuildIndexFinish() { DBImpl::WaitBuildIndexFinish() {
std::lock_guard<std::mutex> lck(index_result_mutex_); std::lock_guard<std::mutex> lck(index_result_mutex_);
for (auto &iter : index_thread_results_) { for (auto& iter : index_thread_results_) {
iter.wait(); iter.wait();
} }
} }
...@@ -563,7 +563,7 @@ DBImpl::MemSerialize() { ...@@ -563,7 +563,7 @@ DBImpl::MemSerialize() {
std::lock_guard<std::mutex> lck(mem_serialize_mutex_); std::lock_guard<std::mutex> lck(mem_serialize_mutex_);
std::set<std::string> temp_table_ids; std::set<std::string> temp_table_ids;
mem_mgr_->Serialize(temp_table_ids); mem_mgr_->Serialize(temp_table_ids);
for (auto &id : temp_table_ids) { for (auto& id : temp_table_ids) {
compact_table_ids_.insert(id); compact_table_ids_.insert(id);
} }
...@@ -608,7 +608,7 @@ DBImpl::StartCompactionTask() { ...@@ -608,7 +608,7 @@ DBImpl::StartCompactionTask() {
} }
Status Status
DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const meta::TableFilesSchema &files) { DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) {
ENGINE_LOG_DEBUG << "Merge files for table: " << table_id; ENGINE_LOG_DEBUG << "Merge files for table: " << table_id;
// step 1: create table file // step 1: create table file
...@@ -625,13 +625,13 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m ...@@ -625,13 +625,13 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m
// step 2: merge files // step 2: merge files
ExecutionEnginePtr index = ExecutionEnginePtr index =
EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType) table_file.engine_type_, EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_,
(MetricType) table_file.metric_type_, table_file.nlist_); (MetricType)table_file.metric_type_, table_file.nlist_);
meta::TableFilesSchema updated; meta::TableFilesSchema updated;
int64_t index_size = 0; int64_t index_size = 0;
for (auto &file : files) { for (auto& file : files) {
server::CollectMergeFilesMetrics metrics; server::CollectMergeFilesMetrics metrics;
index->Merge(file.location_); index->Merge(file.location_);
...@@ -649,7 +649,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m ...@@ -649,7 +649,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m
// step 3: serialize to disk // step 3: serialize to disk
try { try {
index->Serialize(); index->Serialize();
} catch (std::exception &ex) { } catch (std::exception& ex) {
// typical error: out of disk space or permition denied // typical error: out of disk space or permition denied
std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
...@@ -667,7 +667,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m ...@@ -667,7 +667,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m
// step 4: update table files state // step 4: update table files state
// if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size // if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size
// else set file type to RAW, no need to build index // else set file type to RAW, no need to build index
if (table_file.engine_type_ != (int) EngineType::FAISS_IDMAP) { if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) {
table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX
: meta::TableFileSchema::RAW; : meta::TableFileSchema::RAW;
} else { } else {
...@@ -687,7 +687,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m ...@@ -687,7 +687,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m
} }
Status Status
DBImpl::BackgroundMergeFiles(const std::string &table_id) { DBImpl::BackgroundMergeFiles(const std::string& table_id) {
meta::DatePartionedTableFilesSchema raw_files; meta::DatePartionedTableFilesSchema raw_files;
auto status = meta_ptr_->FilesToMerge(table_id, raw_files); auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
if (!status.ok()) { if (!status.ok()) {
...@@ -696,7 +696,7 @@ DBImpl::BackgroundMergeFiles(const std::string &table_id) { ...@@ -696,7 +696,7 @@ DBImpl::BackgroundMergeFiles(const std::string &table_id) {
} }
bool has_merge = false; bool has_merge = false;
for (auto &kv : raw_files) { for (auto& kv : raw_files) {
auto files = kv.second; auto files = kv.second;
if (files.size() < options_.merge_trigger_number_) { if (files.size() < options_.merge_trigger_number_) {
ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action"; ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action";
...@@ -719,7 +719,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) { ...@@ -719,7 +719,7 @@ DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
ENGINE_LOG_TRACE << " Background compaction thread start"; ENGINE_LOG_TRACE << " Background compaction thread start";
Status status; Status status;
for (auto &table_id : table_ids) { for (auto& table_id : table_ids) {
status = BackgroundMergeFiles(table_id); status = BackgroundMergeFiles(table_id);
if (!status.ok()) { if (!status.ok()) {
ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString(); ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
...@@ -771,9 +771,9 @@ DBImpl::StartBuildIndexTask(bool force) { ...@@ -771,9 +771,9 @@ DBImpl::StartBuildIndexTask(bool force) {
} }
Status Status
DBImpl::BuildIndex(const meta::TableFileSchema &file) { DBImpl::BuildIndex(const meta::TableFileSchema& file) {
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_, ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_,
(MetricType) file.metric_type_, file.nlist_); (MetricType)file.metric_type_, file.nlist_);
if (to_index == nullptr) { if (to_index == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type"; ENGINE_LOG_ERROR << "Invalid engine type";
return Status(DB_ERROR, "Invalid engine type"); return Status(DB_ERROR, "Invalid engine type");
...@@ -804,7 +804,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) { ...@@ -804,7 +804,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) {
try { try {
server::CollectBuildIndexMetrics metrics; server::CollectBuildIndexMetrics metrics;
index = to_index->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_); index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
if (index == nullptr) { if (index == nullptr) {
table_file.file_type_ = meta::TableFileSchema::TO_DELETE; table_file.file_type_ = meta::TableFileSchema::TO_DELETE;
status = meta_ptr_->UpdateTableFile(table_file); status = meta_ptr_->UpdateTableFile(table_file);
...@@ -813,7 +813,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) { ...@@ -813,7 +813,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) {
return status; return status;
} }
} catch (std::exception &ex) { } catch (std::exception& ex) {
// typical error: out of gpu memory // typical error: out of gpu memory
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
...@@ -839,7 +839,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) { ...@@ -839,7 +839,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) {
// step 5: save index file // step 5: save index file
try { try {
index->Serialize(); index->Serialize();
} catch (std::exception &ex) { } catch (std::exception& ex) {
// typical error: out of disk space or permition denied // typical error: out of disk space or permition denied
std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
...@@ -882,7 +882,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) { ...@@ -882,7 +882,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) {
status = meta_ptr_->UpdateTableFile(table_file); status = meta_ptr_->UpdateTableFile(table_file);
ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete";
} }
} catch (std::exception &ex) { } catch (std::exception& ex) {
std::string msg = "Build index encounter exception: " + std::string(ex.what()); std::string msg = "Build index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
return Status(DB_ERROR, msg); return Status(DB_ERROR, msg);
...@@ -900,11 +900,10 @@ DBImpl::BackgroundBuildIndex() { ...@@ -900,11 +900,10 @@ DBImpl::BackgroundBuildIndex() {
meta_ptr_->FilesToIndex(to_index_files); meta_ptr_->FilesToIndex(to_index_files);
Status status; Status status;
scheduler::BuildIndexJobPtr scheduler::BuildIndexJobPtr job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
job = std::make_shared<scheduler::BuildIndexJob>(0, meta_ptr_, options_);
// step 2: put build index task to scheduler // step 2: put build index task to scheduler
for (auto &file : to_index_files) { for (auto& file : to_index_files) {
scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file); scheduler::TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
job->AddToIndexFiles(file_ptr); job->AddToIndexFiles(file_ptr);
} }
...@@ -915,17 +914,17 @@ DBImpl::BackgroundBuildIndex() { ...@@ -915,17 +914,17 @@ DBImpl::BackgroundBuildIndex() {
ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); ENGINE_LOG_ERROR << "Building index failed: " << status.ToString();
} }
// for (auto &file : to_index_files) { // for (auto &file : to_index_files) {
// status = BuildIndex(file); // status = BuildIndex(file);
// if (!status.ok()) { // if (!status.ok()) {
// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); // ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
// } // }
// //
// if (shutting_down_.load(std::memory_order_acquire)) { // if (shutting_down_.load(std::memory_order_acquire)) {
// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; // ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action";
// break; // break;
// } // }
// } // }
ENGINE_LOG_TRACE << "Background build index thread exit"; ENGINE_LOG_TRACE << "Background build index thread exit";
} }
......
...@@ -15,18 +15,17 @@ ...@@ -15,18 +15,17 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include <src/scheduler/tasklabel/SpecResLabel.h>
#include "scheduler/TaskCreator.h" #include "scheduler/TaskCreator.h"
#include <src/scheduler/tasklabel/SpecResLabel.h>
#include "SchedInst.h"
#include "scheduler/tasklabel/BroadcastLabel.h" #include "scheduler/tasklabel/BroadcastLabel.h"
#include "tasklabel/DefaultLabel.h" #include "tasklabel/DefaultLabel.h"
#include "SchedInst.h"
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
std::vector<TaskPtr> std::vector<TaskPtr>
TaskCreator::Create(const JobPtr &job) { TaskCreator::Create(const JobPtr& job) {
switch (job->type()) { switch (job->type()) {
case JobType::SEARCH: { case JobType::SEARCH: {
return Create(std::static_pointer_cast<SearchJob>(job)); return Create(std::static_pointer_cast<SearchJob>(job));
...@@ -45,7 +44,7 @@ TaskCreator::Create(const JobPtr &job) { ...@@ -45,7 +44,7 @@ TaskCreator::Create(const JobPtr &job) {
} }
std::vector<TaskPtr> std::vector<TaskPtr>
TaskCreator::Create(const SearchJobPtr &job) { TaskCreator::Create(const SearchJobPtr& job) {
std::vector<TaskPtr> tasks; std::vector<TaskPtr> tasks;
for (auto& index_file : job->index_files()) { for (auto& index_file : job->index_files()) {
auto label = std::make_shared<DefaultLabel>(); auto label = std::make_shared<DefaultLabel>();
...@@ -58,7 +57,7 @@ TaskCreator::Create(const SearchJobPtr &job) { ...@@ -58,7 +57,7 @@ TaskCreator::Create(const SearchJobPtr &job) {
} }
std::vector<TaskPtr> std::vector<TaskPtr>
TaskCreator::Create(const DeleteJobPtr &job) { TaskCreator::Create(const DeleteJobPtr& job) {
std::vector<TaskPtr> tasks; std::vector<TaskPtr> tasks;
auto label = std::make_shared<BroadcastLabel>(); auto label = std::make_shared<BroadcastLabel>();
auto task = std::make_shared<XDeleteTask>(job, label); auto task = std::make_shared<XDeleteTask>(job, label);
...@@ -69,12 +68,12 @@ TaskCreator::Create(const DeleteJobPtr &job) { ...@@ -69,12 +68,12 @@ TaskCreator::Create(const DeleteJobPtr &job) {
} }
std::vector<TaskPtr> std::vector<TaskPtr>
TaskCreator::Create(const BuildIndexJobPtr &job) { TaskCreator::Create(const BuildIndexJobPtr& job) {
std::vector<TaskPtr> tasks; std::vector<TaskPtr> tasks;
//TODO(yukun): remove "disk" hardcode here // TODO(yukun): remove "disk" hardcode here
ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk"); ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk");
for (auto &to_index_file : job->to_index_files()) { for (auto& to_index_file : job->to_index_files()) {
auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr)); auto label = std::make_shared<SpecResLabel>(std::weak_ptr<Resource>(res_ptr));
auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label); auto task = std::make_shared<XBuildIndexTask>(to_index_file.second, label);
task->job_ = job; task->job_ = job;
......
...@@ -30,9 +30,9 @@ ...@@ -30,9 +30,9 @@
#include "job/DeleteJob.h" #include "job/DeleteJob.h"
#include "job/Job.h" #include "job/Job.h"
#include "job/SearchJob.h" #include "job/SearchJob.h"
#include "task/BuildIndexTask.h"
#include "task/DeleteTask.h" #include "task/DeleteTask.h"
#include "task/SearchTask.h" #include "task/SearchTask.h"
#include "task/BuildIndexTask.h"
#include "task/Task.h" #include "task/Task.h"
namespace milvus { namespace milvus {
......
...@@ -22,14 +22,13 @@ ...@@ -22,14 +22,13 @@
#include "src/cache/GpuCacheMgr.h" #include "src/cache/GpuCacheMgr.h"
#include "src/server/Config.h" #include "src/server/Config.h"
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
std::vector<ResourcePtr> std::vector<ResourcePtr>
get_neighbours(const ResourcePtr &self) { get_neighbours(const ResourcePtr& self) {
std::vector<ResourcePtr> neighbours; std::vector<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) { for (auto& neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock(); auto node = neighbour_node.neighbour_node.lock();
if (not node) if (not node)
continue; continue;
...@@ -43,9 +42,9 @@ get_neighbours(const ResourcePtr &self) { ...@@ -43,9 +42,9 @@ get_neighbours(const ResourcePtr &self) {
} }
std::vector<std::pair<ResourcePtr, Connection>> std::vector<std::pair<ResourcePtr, Connection>>
get_neighbours_with_connetion(const ResourcePtr &self) { get_neighbours_with_connetion(const ResourcePtr& self) {
std::vector<std::pair<ResourcePtr, Connection>> neighbours; std::vector<std::pair<ResourcePtr, Connection>> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) { for (auto& neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock(); auto node = neighbour_node.neighbour_node.lock();
if (not node) if (not node)
continue; continue;
...@@ -59,12 +58,12 @@ get_neighbours_with_connetion(const ResourcePtr &self) { ...@@ -59,12 +58,12 @@ get_neighbours_with_connetion(const ResourcePtr &self) {
} }
void void
Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) { Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) {
auto neighbours = get_neighbours_with_connetion(self); auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) { if (not neighbours.empty()) {
std::vector<uint64_t> speeds; std::vector<uint64_t> speeds;
uint64_t total_speed = 0; uint64_t total_speed = 0;
for (auto &neighbour : neighbours) { for (auto& neighbour : neighbours) {
uint64_t speed = neighbour.second.speed(); uint64_t speed = neighbour.second.speed();
speeds.emplace_back(speed); speeds.emplace_back(speed);
total_speed += speed; total_speed += speed;
...@@ -89,15 +88,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self ...@@ -89,15 +88,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self
} }
void void
Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) {
auto neighbours = get_neighbours(self); auto neighbours = get_neighbours(self);
for (auto &neighbour : neighbours) { for (auto& neighbour : neighbours) {
neighbour->task_table().Put(task); neighbour->task_table().Put(task);
} }
} }
void void
Action::PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest) { Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
dest->task_table().Put(task); dest->task_table().Put(task);
} }
...@@ -139,7 +138,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr ...@@ -139,7 +138,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
auto compute_resources = res_mgr.lock()->GetComputeResources(); auto compute_resources = res_mgr.lock()->GetComputeResources();
std::vector<std::vector<std::string>> paths; std::vector<std::vector<std::string>> paths;
std::vector<uint64_t> transport_costs; std::vector<uint64_t> transport_costs;
for (auto &res : compute_resources) { for (auto& res : compute_resources) {
std::vector<std::string> path; std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path); uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path);
transport_costs.push_back(transport_cost); transport_costs.push_back(transport_cost);
...@@ -166,17 +165,17 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr ...@@ -166,17 +165,17 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path; task->path() = task_path;
} else if (task->job_.lock()->type() == JobType::BUILD) { } else if (task->job_.lock()->type() == JobType::BUILD) {
//step2: Read device id in config // step2: Read device id in config
//get build index gpu resource // get build index gpu resource
server::Config &config = server::Config::GetInstance(); server::Config& config = server::Config::GetInstance();
int32_t build_index_gpu; int32_t build_index_gpu;
Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu); Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu);
bool find_gpu_res = false; bool find_gpu_res = false;
for (uint64_t i = 0; i < compute_resources.size(); ++i) { for (uint64_t i = 0; i < compute_resources.size(); ++i) {
if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) {
if (compute_resources[i]->name() if (compute_resources[i]->name() ==
== res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) {
find_gpu_res = true; find_gpu_res = true;
Path task_path(paths[i], paths[i].size() - 1); Path task_path(paths[i], paths[i].size() - 1);
task->path() = task_path; task->path() = task_path;
......
...@@ -15,9 +15,11 @@ ...@@ -15,9 +15,11 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "BuildIndexJob.h" #include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h" #include "utils/Log.h"
#include <utility>
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
...@@ -26,7 +28,7 @@ BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::D ...@@ -26,7 +28,7 @@ BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::D
} }
bool bool
BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file) { BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr& to_index_file) {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) { if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) {
return false; return false;
......
...@@ -16,22 +16,21 @@ ...@@ -16,22 +16,21 @@
// under the License. // under the License.
#pragma once #pragma once
#include <string> #include <condition_variable>
#include <vector> #include <deque>
#include <list> #include <list>
#include <memory>
#include <mutex>
#include <queue> #include <queue>
#include <deque> #include <string>
#include <unordered_map>
#include <thread> #include <thread>
#include <mutex> #include <unordered_map>
#include <condition_variable> #include <vector>
#include <memory>
#include "Job.h" #include "Job.h"
#include "db/meta/Meta.h" #include "db/meta/Meta.h"
#include "scheduler/Definition.h" #include "scheduler/Definition.h"
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
...@@ -46,21 +45,21 @@ class BuildIndexJob : public Job { ...@@ -46,21 +45,21 @@ class BuildIndexJob : public Job {
public: public:
bool bool
AddToIndexFiles(const TableFileSchemaPtr &to_index_file); AddToIndexFiles(const TableFileSchemaPtr& to_index_file);
Status & Status&
WaitBuildIndexFinish(); WaitBuildIndexFinish();
void void
BuildIndexDone(size_t to_index_id); BuildIndexDone(size_t to_index_id);
public: public:
Status & Status&
GetStatus() { GetStatus() {
return status_; return status_;
} }
Id2ToIndexMap & Id2ToIndexMap&
to_index_files() { to_index_files() {
return to_index_files_; return to_index_files_;
} }
......
...@@ -15,27 +15,26 @@ ...@@ -15,27 +15,26 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
#include "scheduler/task/BuildIndexTask.h"
#include "BuildIndexTask.h"
#include "db/engine/EngineFactory.h" #include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h" #include "metrics/Metrics.h"
#include "scheduler/job/BuildIndexJob.h" #include "scheduler/job/BuildIndexJob.h"
#include "utils/Log.h" #include "utils/Log.h"
#include "utils/TimeRecorder.h" #include "utils/TimeRecorder.h"
#include <memory>
#include <string> #include <string>
#include <thread> #include <thread>
#include <utility> #include <utility>
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label) XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label)
: Task(TaskType::BuildIndexTask, std::move(label)), file_(file) { : Task(TaskType::BuildIndexTask, std::move(label)), file_(file) {
if (file_) { if (file_) {
to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType) file_->engine_type_, to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType)file_->engine_type_,
(MetricType) file_->metric_type_, file_->nlist_); (MetricType)file_->metric_type_, file_->nlist_);
} }
} }
...@@ -63,7 +62,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { ...@@ -63,7 +62,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
error_msg = "Wrong load type"; error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
} }
} catch (std::exception &ex) { } catch (std::exception& ex) {
// typical error: out of disk space or permition denied // typical error: out of disk space or permition denied
error_msg = "Failed to load to_index file: " + std::string(ex.what()); error_msg = "Failed to load to_index file: " + std::string(ex.what());
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
...@@ -89,9 +88,9 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { ...@@ -89,9 +88,9 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) {
size_t file_size = to_index_engine_->PhysicalSize(); size_t file_size = to_index_engine_->PhysicalSize();
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + std::string info = "Load file id:" + std::to_string(file_->id_) +
std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
" bytes from location: " + file_->location_ + " totally cost"; " bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info); double span = rc.ElapseFromBegin(info);
to_index_id_ = file_->id_; to_index_id_ = file_->id_;
...@@ -110,15 +109,14 @@ XBuildIndexTask::Execute() { ...@@ -110,15 +109,14 @@ XBuildIndexTask::Execute() {
if (auto job = job_.lock()) { if (auto job = job_.lock()) {
auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job); auto build_index_job = std::static_pointer_cast<scheduler::BuildIndexJob>(job);
std::string location = file_->location_; std::string location = file_->location_;
EngineType engine_type = (EngineType) file_->engine_type_; EngineType engine_type = (EngineType)file_->engine_type_;
std::shared_ptr<engine::ExecutionEngine> index; std::shared_ptr<engine::ExecutionEngine> index;
// step 2: create table file // step 2: create table file
engine::meta::TableFileSchema table_file; engine::meta::TableFileSchema table_file;
table_file.table_id_ = file_->table_id_; table_file.table_id_ = file_->table_id_;
table_file.date_ = file_->date_; table_file.date_ = file_->date_;
table_file.file_type_ = table_file.file_type_ = engine::meta::TableFileSchema::NEW_INDEX;
engine::meta::TableFileSchema::NEW_INDEX;
engine::meta::MetaPtr meta_ptr = build_index_job->meta(); engine::meta::MetaPtr meta_ptr = build_index_job->meta();
Status status = build_index_job->meta()->CreateTableFile(table_file); Status status = build_index_job->meta()->CreateTableFile(table_file);
...@@ -131,7 +129,7 @@ XBuildIndexTask::Execute() { ...@@ -131,7 +129,7 @@ XBuildIndexTask::Execute() {
// step 3: build index // step 3: build index
try { try {
index = to_index_engine_->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_); index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_);
if (index == nullptr) { if (index == nullptr) {
table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE;
status = meta_ptr->UpdateTableFile(table_file); status = meta_ptr->UpdateTableFile(table_file);
...@@ -140,7 +138,7 @@ XBuildIndexTask::Execute() { ...@@ -140,7 +138,7 @@ XBuildIndexTask::Execute() {
return; return;
} }
} catch (std::exception &ex) { } catch (std::exception& ex) {
std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); std::string msg = "BuildIndex encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
...@@ -166,7 +164,7 @@ XBuildIndexTask::Execute() { ...@@ -166,7 +164,7 @@ XBuildIndexTask::Execute() {
// step 5: save index file // step 5: save index file
try { try {
index->Serialize(); index->Serialize();
} catch (std::exception &ex) { } catch (std::exception& ex) {
// typical error: out of disk space or permition denied // typical error: out of disk space or permition denied
std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); std::string msg = "Serialize index encounter exception: " + std::string(ex.what());
ENGINE_LOG_ERROR << msg; ENGINE_LOG_ERROR << msg;
...@@ -197,7 +195,7 @@ XBuildIndexTask::Execute() { ...@@ -197,7 +195,7 @@ XBuildIndexTask::Execute() {
<< " bytes" << " bytes"
<< " from file " << origin_file.file_id_; << " from file " << origin_file.file_id_;
// index->Cache(); // index->Cache();
} else { } else {
// failed to update meta, mark the new file as to_delete, don't delete old file // failed to update meta, mark the new file as to_delete, don't delete old file
origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX; origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX;
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include "scheduler/Definition.h" #include "scheduler/Definition.h"
#include "scheduler/job/BuildIndexJob.h" #include "scheduler/job/BuildIndexJob.h"
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
......
...@@ -23,9 +23,9 @@ ...@@ -23,9 +23,9 @@
#include <memory> #include <memory>
#include <string> #include <string>
//class Resource; // class Resource;
// //
//using ResourceWPtr = std::weak_ptr<Resource>; // using ResourceWPtr = std::weak_ptr<Resource>;
namespace milvus { namespace milvus {
namespace scheduler { namespace scheduler {
......
...@@ -132,7 +132,7 @@ ValidationUtil::ValidateTableIndexMetricType(int32_t metric_type) { ...@@ -132,7 +132,7 @@ ValidationUtil::ValidateTableIndexMetricType(int32_t metric_type) {
Status Status
ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchema& table_schema) { ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchema& table_schema) {
if (top_k <= 0 || top_k > 2048) { if (top_k <= 0 || top_k > 2048) {
std::string msg = "Invalid top k value: " + std::to_string(top_k); std::string msg = "Invalid top k value: " + std::to_string(top_k) + ", rational range [1, 2048]";
SERVER_LOG_ERROR << msg; SERVER_LOG_ERROR << msg;
return Status(SERVER_INVALID_TOPK, msg); return Status(SERVER_INVALID_TOPK, msg);
} }
...@@ -143,7 +143,8 @@ ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchem ...@@ -143,7 +143,8 @@ ValidationUtil::ValidateSearchTopk(int64_t top_k, const engine::meta::TableSchem
Status Status
ValidationUtil::ValidateSearchNprobe(int64_t nprobe, const engine::meta::TableSchema& table_schema) { ValidationUtil::ValidateSearchNprobe(int64_t nprobe, const engine::meta::TableSchema& table_schema) {
if (nprobe <= 0 || nprobe > table_schema.nlist_) { if (nprobe <= 0 || nprobe > table_schema.nlist_) {
std::string msg = "Invalid nprobe value: " + std::to_string(nprobe); std::string msg = "Invalid nprobe value: " + std::to_string(nprobe) + ", rational range [1, " +
std::to_string(table_schema.nlist_) + "]";
SERVER_LOG_ERROR << msg; SERVER_LOG_ERROR << msg;
return Status(SERVER_INVALID_NPROBE, msg); return Status(SERVER_INVALID_NPROBE, msg);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册