diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index ce3463499b57f2487a128e1e280bae1720300e5e..f81fb32e7fde14f7614dc7f263f721ba6d8f21b0 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -26,9 +26,9 @@ #include "meta/SqliteMetaImpl.h" #include "metrics/Metrics.h" #include "scheduler/SchedInst.h" +#include "scheduler/job/BuildIndexJob.h" #include "scheduler/job/DeleteJob.h" #include "scheduler/job/SearchJob.h" -#include "scheduler/job/BuildIndexJob.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" @@ -51,7 +51,7 @@ constexpr uint64_t INDEX_ACTION_INTERVAL = 1; } // namespace -DBImpl::DBImpl(const DBOptions &options) +DBImpl::DBImpl(const DBOptions& options) : options_(options), shutting_down_(true), compact_thread_pool_(1, 1), index_thread_pool_(1, 1) { meta_ptr_ = MetaFactory::Build(options.meta_, options.mode_); mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_); @@ -111,7 +111,7 @@ DBImpl::DropAll() { } Status -DBImpl::CreateTable(meta::TableSchema &table_schema) { +DBImpl::CreateTable(meta::TableSchema& table_schema) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -122,7 +122,7 @@ DBImpl::CreateTable(meta::TableSchema &table_schema) { } Status -DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) { +DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -147,7 +147,7 @@ DBImpl::DeleteTable(const std::string &table_id, const meta::DatesT &dates) { } Status -DBImpl::DescribeTable(meta::TableSchema &table_schema) { +DBImpl::DescribeTable(meta::TableSchema& table_schema) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -158,7 +158,7 @@ DBImpl::DescribeTable(meta::TableSchema &table_schema) { } Status -DBImpl::HasTable(const std::string &table_id, bool &has_or_not) { +DBImpl::HasTable(const std::string& table_id, bool& has_or_not) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -167,7 +167,7 @@ DBImpl::HasTable(const std::string &table_id, bool &has_or_not) { } Status -DBImpl::AllTables(std::vector &table_schema_array) { +DBImpl::AllTables(std::vector& table_schema_array) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -176,7 +176,7 @@ DBImpl::AllTables(std::vector &table_schema_array) { } Status -DBImpl::PreloadTable(const std::string &table_id) { +DBImpl::PreloadTable(const std::string& table_id) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -195,11 +195,11 @@ DBImpl::PreloadTable(const std::string &table_id) { int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage(); int64_t available_size = cache_total - cache_usage; - for (auto &day_files : files) { - for (auto &file : day_files.second) { + for (auto& day_files : files) { + for (auto& file : day_files.second) { ExecutionEnginePtr engine = - EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_, - (MetricType) file.metric_type_, file.nlist_); + EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, + (MetricType)file.metric_type_, file.nlist_); if (engine == nullptr) { ENGINE_LOG_ERROR << "Invalid engine type"; return Status(DB_ERROR, "Invalid engine type"); @@ -212,7 +212,7 @@ DBImpl::PreloadTable(const std::string &table_id) { try { // step 1: load index engine->Load(true); - } catch (std::exception &ex) { + } catch (std::exception& ex) { std::string msg = "Pre-load table encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; return Status(DB_ERROR, msg); @@ -224,7 +224,7 @@ DBImpl::PreloadTable(const std::string &table_id) { } Status -DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { +DBImpl::UpdateTableFlag(const std::string& table_id, int64_t flag) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -233,7 +233,7 @@ DBImpl::UpdateTableFlag(const std::string &table_id, int64_t flag) { } Status -DBImpl::GetTableRowCount(const std::string &table_id, uint64_t &row_count) { +DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -261,7 +261,7 @@ DBImpl::InsertVectors(const std::string& table_id, uint64_t n, const float* vect } Status -DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { +DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) { { std::unique_lock lock(build_index_mutex_); @@ -316,7 +316,7 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { while (!file_ids.empty()) { ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times; - if (index.engine_type_ != (int) EngineType::FAISS_IDMAP) { + if (index.engine_type_ != (int)EngineType::FAISS_IDMAP) { status = meta_ptr_->UpdateTableFilesToIndex(table_id); } @@ -329,19 +329,19 @@ DBImpl::CreateIndex(const std::string &table_id, const TableIndex &index) { } Status -DBImpl::DescribeIndex(const std::string &table_id, TableIndex &index) { +DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) { return meta_ptr_->DescribeTableIndex(table_id, index); } Status -DBImpl::DropIndex(const std::string &table_id) { +DBImpl::DropIndex(const std::string& table_id) { ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; return meta_ptr_->DropTableIndex(table_id); } Status -DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, - QueryResults &results) { +DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, + QueryResults& results) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -353,8 +353,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr } Status -DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, - const meta::DatesT &dates, QueryResults &results) { +DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, + const meta::DatesT& dates, QueryResults& results) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -370,8 +370,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr } meta::TableFilesSchema file_id_array; - for (auto &day_files : files) { - for (auto &file : day_files.second) { + for (auto& day_files : files) { + for (auto& file : day_files.second) { file_id_array.push_back(file); } } @@ -383,8 +383,8 @@ DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t npr } Status -DBImpl::Query(const std::string &table_id, const std::vector &file_ids, uint64_t k, uint64_t nq, - uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) { +DBImpl::Query(const std::string& table_id, const std::vector& file_ids, uint64_t k, uint64_t nq, + uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -393,7 +393,7 @@ DBImpl::Query(const std::string &table_id, const std::vector &file_ // get specified files std::vector ids; - for (auto &id : file_ids) { + for (auto& id : file_ids) { meta::TableFileSchema table_file; table_file.table_id_ = table_id; std::string::size_type sz; @@ -407,8 +407,8 @@ DBImpl::Query(const std::string &table_id, const std::vector &file_ } meta::TableFilesSchema file_id_array; - for (auto &day_files : files_array) { - for (auto &file : day_files.second) { + for (auto& day_files : files_array) { + for (auto& file : day_files.second) { file_id_array.push_back(file); } } @@ -424,7 +424,7 @@ DBImpl::Query(const std::string &table_id, const std::vector &file_ } Status -DBImpl::Size(uint64_t &result) { +DBImpl::Size(uint64_t& result) { if (shutting_down_.load(std::memory_order_acquire)) { return Status(DB_ERROR, "Milsvus server is shutdown!"); } @@ -436,8 +436,8 @@ DBImpl::Size(uint64_t &result) { // internal methods /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// Status -DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &files, uint64_t k, uint64_t nq, - uint64_t nprobe, const float *vectors, const meta::DatesT &dates, QueryResults &results) { +DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files, uint64_t k, uint64_t nq, + uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) { server::CollectQueryMetrics metrics(nq); TimeRecorder rc(""); @@ -446,7 +446,7 @@ DBImpl::QueryAsync(const std::string &table_id, const meta::TableFilesSchema &fi ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size(); scheduler::SearchJobPtr job = std::make_shared(0, k, nq, nprobe, vectors); - for (auto &file : files) { + for (auto& file : files) { scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); job->AddIndexFile(file_ptr); } @@ -514,7 +514,7 @@ DBImpl::BackgroundTimerTask() { void DBImpl::WaitMergeFileFinish() { std::lock_guard lck(compact_result_mutex_); - for (auto &iter : compact_thread_results_) { + for (auto& iter : compact_thread_results_) { iter.wait(); } } @@ -522,7 +522,7 @@ DBImpl::WaitMergeFileFinish() { void DBImpl::WaitBuildIndexFinish() { std::lock_guard lck(index_result_mutex_); - for (auto &iter : index_thread_results_) { + for (auto& iter : index_thread_results_) { iter.wait(); } } @@ -563,7 +563,7 @@ DBImpl::MemSerialize() { std::lock_guard lck(mem_serialize_mutex_); std::set temp_table_ids; mem_mgr_->Serialize(temp_table_ids); - for (auto &id : temp_table_ids) { + for (auto& id : temp_table_ids) { compact_table_ids_.insert(id); } @@ -608,7 +608,7 @@ DBImpl::StartCompactionTask() { } Status -DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const meta::TableFilesSchema &files) { +DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) { ENGINE_LOG_DEBUG << "Merge files for table: " << table_id; // step 1: create table file @@ -625,13 +625,13 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m // step 2: merge files ExecutionEnginePtr index = - EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType) table_file.engine_type_, - (MetricType) table_file.metric_type_, table_file.nlist_); + EngineFactory::Build(table_file.dimension_, table_file.location_, (EngineType)table_file.engine_type_, + (MetricType)table_file.metric_type_, table_file.nlist_); meta::TableFilesSchema updated; int64_t index_size = 0; - for (auto &file : files) { + for (auto& file : files) { server::CollectMergeFilesMetrics metrics; index->Merge(file.location_); @@ -649,7 +649,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m // step 3: serialize to disk try { index->Serialize(); - } catch (std::exception &ex) { + } catch (std::exception& ex) { // typical error: out of disk space or permition denied std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; @@ -667,7 +667,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m // step 4: update table files state // if index type isn't IDMAP, set file type to TO_INDEX if file size execeed index_file_size // else set file type to RAW, no need to build index - if (table_file.engine_type_ != (int) EngineType::FAISS_IDMAP) { + if (table_file.engine_type_ != (int)EngineType::FAISS_IDMAP) { table_file.file_type_ = (index->PhysicalSize() >= table_file.index_file_size_) ? meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; } else { @@ -687,7 +687,7 @@ DBImpl::MergeFiles(const std::string &table_id, const meta::DateT &date, const m } Status -DBImpl::BackgroundMergeFiles(const std::string &table_id) { +DBImpl::BackgroundMergeFiles(const std::string& table_id) { meta::DatePartionedTableFilesSchema raw_files; auto status = meta_ptr_->FilesToMerge(table_id, raw_files); if (!status.ok()) { @@ -696,7 +696,7 @@ DBImpl::BackgroundMergeFiles(const std::string &table_id) { } bool has_merge = false; - for (auto &kv : raw_files) { + for (auto& kv : raw_files) { auto files = kv.second; if (files.size() < options_.merge_trigger_number_) { ENGINE_LOG_DEBUG << "Files number not greater equal than merge trigger number, skip merge action"; @@ -719,7 +719,7 @@ DBImpl::BackgroundCompaction(std::set table_ids) { ENGINE_LOG_TRACE << " Background compaction thread start"; Status status; - for (auto &table_id : table_ids) { + for (auto& table_id : table_ids) { status = BackgroundMergeFiles(table_id); if (!status.ok()) { ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString(); @@ -771,9 +771,9 @@ DBImpl::StartBuildIndexTask(bool force) { } Status -DBImpl::BuildIndex(const meta::TableFileSchema &file) { - ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType) file.engine_type_, - (MetricType) file.metric_type_, file.nlist_); +DBImpl::BuildIndex(const meta::TableFileSchema& file) { + ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_, + (MetricType)file.metric_type_, file.nlist_); if (to_index == nullptr) { ENGINE_LOG_ERROR << "Invalid engine type"; return Status(DB_ERROR, "Invalid engine type"); @@ -804,7 +804,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) { try { server::CollectBuildIndexMetrics metrics; - index = to_index->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_); + index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_); if (index == nullptr) { table_file.file_type_ = meta::TableFileSchema::TO_DELETE; status = meta_ptr_->UpdateTableFile(table_file); @@ -813,7 +813,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) { return status; } - } catch (std::exception &ex) { + } catch (std::exception& ex) { // typical error: out of gpu memory std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; @@ -839,7 +839,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) { // step 5: save index file try { index->Serialize(); - } catch (std::exception &ex) { + } catch (std::exception& ex) { // typical error: out of disk space or permition denied std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; @@ -882,7 +882,7 @@ DBImpl::BuildIndex(const meta::TableFileSchema &file) { status = meta_ptr_->UpdateTableFile(table_file); ENGINE_LOG_DEBUG << "Failed to update file to index, mark file: " << table_file.file_id_ << " to to_delete"; } - } catch (std::exception &ex) { + } catch (std::exception& ex) { std::string msg = "Build index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; return Status(DB_ERROR, msg); @@ -900,11 +900,10 @@ DBImpl::BackgroundBuildIndex() { meta_ptr_->FilesToIndex(to_index_files); Status status; - scheduler::BuildIndexJobPtr - job = std::make_shared(0, meta_ptr_, options_); + scheduler::BuildIndexJobPtr job = std::make_shared(0, meta_ptr_, options_); // step 2: put build index task to scheduler - for (auto &file : to_index_files) { + for (auto& file : to_index_files) { scheduler::TableFileSchemaPtr file_ptr = std::make_shared(file); job->AddToIndexFiles(file_ptr); } @@ -915,17 +914,17 @@ DBImpl::BackgroundBuildIndex() { ENGINE_LOG_ERROR << "Building index failed: " << status.ToString(); } -// for (auto &file : to_index_files) { -// status = BuildIndex(file); -// if (!status.ok()) { -// ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); -// } -// -// if (shutting_down_.load(std::memory_order_acquire)) { -// ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; -// break; -// } -// } + // for (auto &file : to_index_files) { + // status = BuildIndex(file); + // if (!status.ok()) { + // ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString(); + // } + // + // if (shutting_down_.load(std::memory_order_acquire)) { + // ENGINE_LOG_DEBUG << "Server will shutdown, skip build index action"; + // break; + // } + // } ENGINE_LOG_TRACE << "Background build index thread exit"; } diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index 2645a46df6a080333d8a18127f08fb54f9837240..ee63c2c6b7763ea102ec4650aa5590965ca0ed74 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -15,18 +15,17 @@ // specific language governing permissions and limitations // under the License. -#include #include "scheduler/TaskCreator.h" +#include +#include "SchedInst.h" #include "scheduler/tasklabel/BroadcastLabel.h" #include "tasklabel/DefaultLabel.h" -#include "SchedInst.h" - namespace milvus { namespace scheduler { std::vector -TaskCreator::Create(const JobPtr &job) { +TaskCreator::Create(const JobPtr& job) { switch (job->type()) { case JobType::SEARCH: { return Create(std::static_pointer_cast(job)); @@ -45,7 +44,7 @@ TaskCreator::Create(const JobPtr &job) { } std::vector -TaskCreator::Create(const SearchJobPtr &job) { +TaskCreator::Create(const SearchJobPtr& job) { std::vector tasks; for (auto& index_file : job->index_files()) { auto label = std::make_shared(); @@ -58,7 +57,7 @@ TaskCreator::Create(const SearchJobPtr &job) { } std::vector -TaskCreator::Create(const DeleteJobPtr &job) { +TaskCreator::Create(const DeleteJobPtr& job) { std::vector tasks; auto label = std::make_shared(); auto task = std::make_shared(job, label); @@ -69,12 +68,12 @@ TaskCreator::Create(const DeleteJobPtr &job) { } std::vector -TaskCreator::Create(const BuildIndexJobPtr &job) { +TaskCreator::Create(const BuildIndexJobPtr& job) { std::vector tasks; - //TODO(yukun): remove "disk" hardcode here + // TODO(yukun): remove "disk" hardcode here ResourcePtr res_ptr = ResMgrInst::GetInstance()->GetResource("disk"); - for (auto &to_index_file : job->to_index_files()) { + for (auto& to_index_file : job->to_index_files()) { auto label = std::make_shared(std::weak_ptr(res_ptr)); auto task = std::make_shared(to_index_file.second, label); task->job_ = job; diff --git a/cpp/src/scheduler/TaskCreator.h b/cpp/src/scheduler/TaskCreator.h index a178247aa25a66577e135e444715ae6c5ac9f283..ef71d9a3d37b90f3d6f4cbc4e8eb608b4e812145 100644 --- a/cpp/src/scheduler/TaskCreator.h +++ b/cpp/src/scheduler/TaskCreator.h @@ -30,9 +30,9 @@ #include "job/DeleteJob.h" #include "job/Job.h" #include "job/SearchJob.h" +#include "task/BuildIndexTask.h" #include "task/DeleteTask.h" #include "task/SearchTask.h" -#include "task/BuildIndexTask.h" #include "task/Task.h" namespace milvus { diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index f5184d47507da28be219369c1c42d86eb175aa23..53dd45faca0f5c57fc6f1973128b4b1b0a978c1f 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -22,14 +22,13 @@ #include "src/cache/GpuCacheMgr.h" #include "src/server/Config.h" - namespace milvus { namespace scheduler { std::vector -get_neighbours(const ResourcePtr &self) { +get_neighbours(const ResourcePtr& self) { std::vector neighbours; - for (auto &neighbour_node : self->GetNeighbours()) { + for (auto& neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; @@ -43,9 +42,9 @@ get_neighbours(const ResourcePtr &self) { } std::vector> -get_neighbours_with_connetion(const ResourcePtr &self) { +get_neighbours_with_connetion(const ResourcePtr& self) { std::vector> neighbours; - for (auto &neighbour_node : self->GetNeighbours()) { + for (auto& neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; @@ -59,12 +58,12 @@ get_neighbours_with_connetion(const ResourcePtr &self) { } void -Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) { +Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) { auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { std::vector speeds; uint64_t total_speed = 0; - for (auto &neighbour : neighbours) { + for (auto& neighbour : neighbours) { uint64_t speed = neighbour.second.speed(); speeds.emplace_back(speed); total_speed += speed; @@ -89,15 +88,15 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self } void -Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { +Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) { auto neighbours = get_neighbours(self); - for (auto &neighbour : neighbours) { + for (auto& neighbour : neighbours) { neighbour->task_table().Put(task); } } void -Action::PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest) { +Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { dest->task_table().Put(task); } @@ -139,7 +138,7 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr auto compute_resources = res_mgr.lock()->GetComputeResources(); std::vector> paths; std::vector transport_costs; - for (auto &res : compute_resources) { + for (auto& res : compute_resources) { std::vector path; uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path); transport_costs.push_back(transport_cost); @@ -166,17 +165,17 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); task->path() = task_path; } else if (task->job_.lock()->type() == JobType::BUILD) { - //step2: Read device id in config - //get build index gpu resource - server::Config &config = server::Config::GetInstance(); + // step2: Read device id in config + // get build index gpu resource + server::Config& config = server::Config::GetInstance(); int32_t build_index_gpu; Status stat = config.GetDBConfigBuildIndexGPU(build_index_gpu); bool find_gpu_res = false; for (uint64_t i = 0; i < compute_resources.size(); ++i) { if (res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu) != nullptr) { - if (compute_resources[i]->name() - == res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { + if (compute_resources[i]->name() == + res_mgr.lock()->GetResource(ResourceType::GPU, build_index_gpu)->name()) { find_gpu_res = true; Path task_path(paths[i], paths[i].size() - 1); task->path() = task_path; diff --git a/cpp/src/scheduler/job/BuildIndexJob.cpp b/cpp/src/scheduler/job/BuildIndexJob.cpp index bccc7a9327485d869a69a6deb7154c46f0f20125..423121c5fb130654be90c4bb5713a29bbd576451 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.cpp +++ b/cpp/src/scheduler/job/BuildIndexJob.cpp @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "BuildIndexJob.h" +#include "scheduler/job/BuildIndexJob.h" #include "utils/Log.h" +#include + namespace milvus { namespace scheduler { @@ -26,7 +28,7 @@ BuildIndexJob::BuildIndexJob(JobId id, engine::meta::MetaPtr meta_ptr, engine::D } bool -BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr &to_index_file) { +BuildIndexJob::AddToIndexFiles(const engine::meta::TableFileSchemaPtr& to_index_file) { std::unique_lock lock(mutex_); if (to_index_file == nullptr || to_index_files_.find(to_index_file->id_) != to_index_files_.end()) { return false; diff --git a/cpp/src/scheduler/job/BuildIndexJob.h b/cpp/src/scheduler/job/BuildIndexJob.h index cc50d70fc90dcc361a2cda72c142723aa258e2f3..b6ca462537d787b691705c4c89ba8762f98030c7 100644 --- a/cpp/src/scheduler/job/BuildIndexJob.h +++ b/cpp/src/scheduler/job/BuildIndexJob.h @@ -16,22 +16,21 @@ // under the License. #pragma once -#include -#include +#include +#include #include +#include +#include #include -#include -#include +#include #include -#include -#include -#include +#include +#include #include "Job.h" #include "db/meta/Meta.h" #include "scheduler/Definition.h" - namespace milvus { namespace scheduler { @@ -46,21 +45,21 @@ class BuildIndexJob : public Job { public: bool - AddToIndexFiles(const TableFileSchemaPtr &to_index_file); + AddToIndexFiles(const TableFileSchemaPtr& to_index_file); - Status & + Status& WaitBuildIndexFinish(); void BuildIndexDone(size_t to_index_id); public: - Status & + Status& GetStatus() { return status_; } - Id2ToIndexMap & + Id2ToIndexMap& to_index_files() { return to_index_files_; } diff --git a/cpp/src/scheduler/task/BuildIndexTask.cpp b/cpp/src/scheduler/task/BuildIndexTask.cpp index b37251092ed77273e8f246064fc546452baf2c8c..f2cebcac9e24b9b36d6003ad19ca20242cc0715c 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.cpp +++ b/cpp/src/scheduler/task/BuildIndexTask.cpp @@ -15,27 +15,26 @@ // specific language governing permissions and limitations // under the License. - -#include "BuildIndexTask.h" +#include "scheduler/task/BuildIndexTask.h" #include "db/engine/EngineFactory.h" #include "metrics/Metrics.h" #include "scheduler/job/BuildIndexJob.h" #include "utils/Log.h" #include "utils/TimeRecorder.h" +#include #include #include #include - namespace milvus { namespace scheduler { XBuildIndexTask::XBuildIndexTask(TableFileSchemaPtr file, TaskLabelPtr label) : Task(TaskType::BuildIndexTask, std::move(label)), file_(file) { if (file_) { - to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType) file_->engine_type_, - (MetricType) file_->metric_type_, file_->nlist_); + to_index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, (EngineType)file_->engine_type_, + (MetricType)file_->metric_type_, file_->nlist_); } } @@ -63,7 +62,7 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { error_msg = "Wrong load type"; stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); } - } catch (std::exception &ex) { + } catch (std::exception& ex) { // typical error: out of disk space or permition denied error_msg = "Failed to load to_index file: " + std::string(ex.what()); stat = Status(SERVER_UNEXPECTED_ERROR, error_msg); @@ -89,9 +88,9 @@ XBuildIndexTask::Load(milvus::scheduler::LoadType type, uint8_t device_id) { size_t file_size = to_index_engine_->PhysicalSize(); - std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + - std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + - " bytes from location: " + file_->location_ + " totally cost"; + std::string info = "Load file id:" + std::to_string(file_->id_) + + " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) + + " bytes from location: " + file_->location_ + " totally cost"; double span = rc.ElapseFromBegin(info); to_index_id_ = file_->id_; @@ -110,15 +109,14 @@ XBuildIndexTask::Execute() { if (auto job = job_.lock()) { auto build_index_job = std::static_pointer_cast(job); std::string location = file_->location_; - EngineType engine_type = (EngineType) file_->engine_type_; + EngineType engine_type = (EngineType)file_->engine_type_; std::shared_ptr index; // step 2: create table file engine::meta::TableFileSchema table_file; table_file.table_id_ = file_->table_id_; table_file.date_ = file_->date_; - table_file.file_type_ = - engine::meta::TableFileSchema::NEW_INDEX; + table_file.file_type_ = engine::meta::TableFileSchema::NEW_INDEX; engine::meta::MetaPtr meta_ptr = build_index_job->meta(); Status status = build_index_job->meta()->CreateTableFile(table_file); @@ -131,7 +129,7 @@ XBuildIndexTask::Execute() { // step 3: build index try { - index = to_index_engine_->BuildIndex(table_file.location_, (EngineType) table_file.engine_type_); + index = to_index_engine_->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_); if (index == nullptr) { table_file.file_type_ = engine::meta::TableFileSchema::TO_DELETE; status = meta_ptr->UpdateTableFile(table_file); @@ -140,7 +138,7 @@ XBuildIndexTask::Execute() { return; } - } catch (std::exception &ex) { + } catch (std::exception& ex) { std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; @@ -166,7 +164,7 @@ XBuildIndexTask::Execute() { // step 5: save index file try { index->Serialize(); - } catch (std::exception &ex) { + } catch (std::exception& ex) { // typical error: out of disk space or permition denied std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; @@ -197,7 +195,7 @@ XBuildIndexTask::Execute() { << " bytes" << " from file " << origin_file.file_id_; -// index->Cache(); + // index->Cache(); } else { // failed to update meta, mark the new file as to_delete, don't delete old file origin_file.file_type_ = engine::meta::TableFileSchema::TO_INDEX; diff --git a/cpp/src/scheduler/task/BuildIndexTask.h b/cpp/src/scheduler/task/BuildIndexTask.h index 84825921f90d9c5b6ef6769530f07c3238b16734..5c2aa69a009b73d30a76b1ed165ff2d9450493ce 100644 --- a/cpp/src/scheduler/task/BuildIndexTask.h +++ b/cpp/src/scheduler/task/BuildIndexTask.h @@ -21,7 +21,6 @@ #include "scheduler/Definition.h" #include "scheduler/job/BuildIndexJob.h" - namespace milvus { namespace scheduler { diff --git a/cpp/src/scheduler/tasklabel/SpecResLabel.h b/cpp/src/scheduler/tasklabel/SpecResLabel.h index 3738ead708467e4866a84cad14a93e08704ec5fb..db2989fbc244c92c128bcade16e5a62738563cc7 100644 --- a/cpp/src/scheduler/tasklabel/SpecResLabel.h +++ b/cpp/src/scheduler/tasklabel/SpecResLabel.h @@ -23,9 +23,9 @@ #include #include -//class Resource; +// class Resource; // -//using ResourceWPtr = std::weak_ptr; +// using ResourceWPtr = std::weak_ptr; namespace milvus { namespace scheduler {