diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index f69311adbb0d12ee0121f69801d5ccb8940601e5..d0d353cfa7b3881285db2b524bc2740e1e46496c 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -76,6 +76,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-451 - Update server_config.template file, set GPU compute default - MS-455 - Distribute tasks by minimal cost in scheduler - MS-460 - Put transport speed as weight when choosing neighbour to execute task +- MS-459 - Add cache for pick function in tasktable ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 6bf056e4711c4fca8ff0cfc0399652e9bc6de109..8a381b9dbfedf2e443722b09e97c6121079e41ea 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -163,7 +163,7 @@ Status DBImpl::PreloadTable(const std::string &table_id) { //step 1: load index engine->Load(true); } catch (std::exception &ex) { - std::string msg = "load to cache exception" + std::string(ex.what()); + std::string msg = "Pre-load table encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; return Status::Error(msg); } @@ -198,8 +198,6 @@ Status DBImpl::InsertVectors(const std::string& table_id_, Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float *vectors, QueryResults &results) { - server::CollectQueryMetrics metrics(nq); - meta::DatesT dates = {utils::GetDate()}; Status result = Query(table_id, k, nq, nprobe, vectors, dates, results); @@ -208,7 +206,7 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq, uint6 Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) { - ENGINE_LOG_DEBUG << "Query by vectors " << table_id; + ENGINE_LOG_DEBUG << "Query by dates for table: " << table_id; //get all table files from table meta::DatePartionedTableFilesSchema files; @@ -232,7 +230,7 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq, uint6 Status DBImpl::Query(const std::string& table_id, const std::vector& file_ids, uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors, const meta::DatesT& dates, QueryResults& results) { - ENGINE_LOG_DEBUG << "Query by file ids"; + ENGINE_LOG_DEBUG << "Query by file ids for table: " << table_id; //get specified files std::vector ids; @@ -274,7 +272,7 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch server::TimeRecorder rc(""); //step 1: get files to search - ENGINE_LOG_DEBUG << "Engine query begin, index file count:" << files.size() << " date range count:" << dates.size(); + ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size(); SearchContextPtr context = std::make_shared(k, nq, nprobe, vectors); for (auto &file : files) { TableFileSchemaPtr file_ptr = std::make_shared(file); @@ -300,11 +298,11 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch double search_percent = search_cost/total_cost; double reduce_percent = reduce_cost/total_cost; - ENGINE_LOG_DEBUG << "Engine load index totally cost:" << load_info << " percent: " << load_percent*100 << "%"; - ENGINE_LOG_DEBUG << "Engine search index totally cost:" << search_info << " percent: " << search_percent*100 << "%"; - ENGINE_LOG_DEBUG << "Engine reduce topk totally cost:" << reduce_info << " percent: " << reduce_percent*100 << "%"; + ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%"; + ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%"; + ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%"; } else { - ENGINE_LOG_DEBUG << "Engine load cost:" << load_info + ENGINE_LOG_DEBUG << "Engine load cost: " << load_info << " search cost: " << search_info << " reduce cost: " << reduce_info; } @@ -413,7 +411,7 @@ void DBImpl::StartCompactionTask() { Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, const meta::TableFilesSchema& files) { - ENGINE_LOG_DEBUG << "Merge files for table " << table_id; + ENGINE_LOG_DEBUG << "Merge files for table: " << table_id; //step 1: create table file meta::TableFileSchema table_file; @@ -453,7 +451,7 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, index->Serialize(); } catch (std::exception& ex) { //typical error: out of disk space or permition denied - std::string msg = "Serialize merged index encounter exception" + std::string(ex.what()); + std::string msg = "Serialize merged index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; table_file.file_type_ = meta::TableFileSchema::TO_DELETE; @@ -508,7 +506,7 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) { MergeFiles(table_id, kv.first, kv.second); if (shutting_down_.load(std::memory_order_acquire)){ - ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table " << table_id; + ENGINE_LOG_DEBUG << "Server will shutdown, skip merge action for table: " << table_id; break; } } @@ -574,7 +572,7 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) TableIndex old_index; auto status = DescribeIndex(table_id, old_index); if(!status.ok()) { - ENGINE_LOG_ERROR << "Failed to get table index info"; + ENGINE_LOG_ERROR << "Failed to get table index info for table: " << table_id; return status; } @@ -584,7 +582,7 @@ Status DBImpl::CreateIndex(const std::string& table_id, const TableIndex& index) status = meta_ptr_->UpdateTableIndexParam(table_id, index); if (!status.ok()) { - ENGINE_LOG_ERROR << "Failed to update table index info"; + ENGINE_LOG_ERROR << "Failed to update table index info for table: " << table_id; return status; } } @@ -632,7 +630,7 @@ Status DBImpl::DescribeIndex(const std::string& table_id, TableIndex& index) { } Status DBImpl::DropIndex(const std::string& table_id) { - ENGINE_LOG_DEBUG << "drop index for table: " << table_id; + ENGINE_LOG_DEBUG << "Drop index for table: " << table_id; return meta_ptr_->DropTableIndex(table_id); } @@ -656,7 +654,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { table_file.file_type_ = meta::TableFileSchema::NEW_INDEX; //for multi-db-path, distribute index file averagely to each path Status status = meta_ptr_->CreateTableFile(table_file); if (!status.ok()) { - ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString(); + ENGINE_LOG_ERROR << "Failed to create table file: " << status.ToString(); return status; } @@ -668,7 +666,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { index = to_index->BuildIndex(table_file.location_, (EngineType)table_file.engine_type_); } catch (std::exception& ex) { //typical error: out of gpu memory - std::string msg = "BuildIndex encounter exception" + std::string(ex.what()); + std::string msg = "BuildIndex encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; table_file.file_type_ = meta::TableFileSchema::TO_DELETE; @@ -693,7 +691,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { index->Serialize(); } catch (std::exception& ex) { //typical error: out of disk space or permition denied - std::string msg = "Serialize index encounter exception" + std::string(ex.what()); + std::string msg = "Serialize index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; table_file.file_type_ = meta::TableFileSchema::TO_DELETE; @@ -736,7 +734,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { } } catch (std::exception& ex) { - std::string msg = "Build index encounter exception" + std::string(ex.what()); + std::string msg = "Build index encounter exception: " + std::string(ex.what()); ENGINE_LOG_ERROR << msg; return Status::Error(msg); } @@ -745,7 +743,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { } void DBImpl::BackgroundBuildIndex() { - ENGINE_LOG_TRACE << " Background build index thread start"; + ENGINE_LOG_TRACE << "Background build index thread start"; std::unique_lock lock(build_index_mutex_); meta::TableFilesSchema to_index_files; @@ -764,7 +762,7 @@ void DBImpl::BackgroundBuildIndex() { } } - ENGINE_LOG_TRACE << " Background build index thread exit"; + ENGINE_LOG_TRACE << "Background build index thread exit"; } Status DBImpl::DropAll() { diff --git a/cpp/src/scheduler/Cost.cpp b/cpp/src/scheduler/Cost.cpp deleted file mode 100644 index 724a717d2f32f0475f8d119cbf4fb3ffb8d79092..0000000000000000000000000000000000000000 --- a/cpp/src/scheduler/Cost.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ - -#include "Cost.h" - - -namespace zilliz { -namespace milvus { -namespace engine { - -std::vector -PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) { - std::vector indexes; - for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { - if (task_table[i]->state == TaskTableItemState::LOADED) { - indexes.push_back(i); - ++count; - } - } - return indexes; -} - - -std::vector -PickToLoad(TaskTable &task_table, uint64_t limit) { - std::vector indexes; - for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { - if (task_table[i]->state == TaskTableItemState::START) { - indexes.push_back(i); - ++count; - } - } - return indexes; -} - - -std::vector -PickToExecute(TaskTable &task_table, uint64_t limit) { - std::vector indexes; - for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) { - if (task_table[i]->state == TaskTableItemState::LOADED) { - indexes.push_back(i); - ++count; - } - } - return indexes; -} - -} -} -} diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h deleted file mode 100644 index 76f16d4d1dcbe1b8f4f89ba80a66f34bb9115d64..0000000000000000000000000000000000000000 --- a/cpp/src/scheduler/Cost.h +++ /dev/null @@ -1,48 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ -#pragma once - -#include -#include "task/Task.h" -#include "TaskTable.h" -#include "CacheMgr.h" - - -namespace zilliz { -namespace milvus { -namespace engine { - -// TODO: Policy interface -// TODO: collect statistics - -/* - * select tasks to move; - * call from scheduler; - */ -std::vector -PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit); - - -/* - * select task to load - * call from resource; - * I DONT SURE NEED THIS; - */ -std::vector -PickToLoad(TaskTable &task_table, uint64_t limit); - -/* - * select task to execute; - * call from resource; - * I DONT SURE NEED THIS; - */ -std::vector -PickToExecute(TaskTable &task_table, uint64_t limit); - - -} -} -} diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 83c3e9864c5286bac20e830b6c4532f1389abb72..fa67eef489c61d0767f34c9d8d0bbc0a24db6dc0 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -7,7 +7,6 @@ #include #include "event/LoadCompletedEvent.h" #include "Scheduler.h" -#include "Cost.h" #include "action/Action.h" #include "Algorithm.h" diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 2d309d591ca34319b7468725bd9b946b831c3003..56d31e299a8e174397778d14db638454469eb9d8 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -53,6 +53,11 @@ ToString(const TaskTimestamp ×tamp) { return ss.str(); } +bool +TaskTableItem::IsFinish() { + return state == TaskTableItemState::MOVED || state == TaskTableItemState::EXECUTED; +} + bool TaskTableItem::Load() { std::unique_lock lock(mutex); @@ -133,6 +138,38 @@ TaskTableItem::Dump() { return ss.str(); } +std::vector +TaskTable::PickToLoad(uint64_t limit) { + std::vector indexes; + bool cross = false; + for (uint64_t i = last_finish_, count = 0; i < table_.size() && count < limit; ++i) { + if (not cross && table_[i]->IsFinish()) { + last_finish_ = i; + } else if (table_[i]->state == TaskTableItemState::START) { + cross = true; + indexes.push_back(i); + ++count; + } + } + return indexes; +} + +std::vector +TaskTable::PickToExecute(uint64_t limit) { + std::vector indexes; + bool cross = false; + for (uint64_t i = last_finish_, count = 0; i < table_.size() && count < limit; ++i) { + if (not cross && table_[i]->IsFinish()) { + last_finish_ = i; + } else if (table_[i]->state == TaskTableItemState::LOADED) { + cross = true; + indexes.push_back(i); + ++count; + } + } + return indexes; +} + void TaskTable::Put(TaskPtr task) { std::lock_guard lock(id_mutex_); diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 886259957cd992e68623668d5d51c7e3540cc914..ee6d3b56cb6a16a2e690d39c01a58a54af4b588f 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -54,6 +54,9 @@ struct TaskTableItem { uint8_t priority; // just a number, meaningless; + bool + IsFinish(); + bool Load(); @@ -141,6 +144,13 @@ public: std::deque::iterator begin() { return table_.begin(); } std::deque::iterator end() { return table_.end(); } +public: + std::vector + PickToLoad(uint64_t limit); + + std::vector + PickToExecute(uint64_t limit); + public: /******** Action ********/ @@ -182,7 +192,7 @@ public: * Called by executor; */ inline bool - Executed(uint64_t index){ + Executed(uint64_t index) { return table_[index]->Executed(); } @@ -193,7 +203,7 @@ public: */ inline bool - Move(uint64_t index){ + Move(uint64_t index) { return table_[index]->Move(); } @@ -203,7 +213,7 @@ public: * Called by scheduler; */ inline bool - Moved(uint64_t index){ + Moved(uint64_t index) { return table_[index]->Moved(); } @@ -220,6 +230,9 @@ private: mutable std::mutex id_mutex_; std::deque table_; std::function subscriber_ = nullptr; + + // cache last finish avoid Pick task from begin always + uint64_t last_finish_ = 0; }; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 8d82d4821cb2e043d74fbb1cf00120cde0f85e1e..b4a6cb5b66faac3a3c7a7e73b9473675ac48fda0 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -81,7 +81,7 @@ void Resource::WakeupExecutor() { } TaskTableItemPtr Resource::pick_task_load() { - auto indexes = PickToLoad(task_table_, 10); + auto indexes = task_table_.PickToLoad(10); for (auto index : indexes) { // try to set one task loading, then return if (task_table_.Load(index)) @@ -92,7 +92,7 @@ TaskTableItemPtr Resource::pick_task_load() { } TaskTableItemPtr Resource::pick_task_execute() { - auto indexes = PickToExecute(task_table_, 3); + auto indexes = task_table_.PickToExecute(3); for (auto index : indexes) { // try to set one task executing, then return if (task_table_.Execute(index)) diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 5395eafb42b547fb86f42922e1dd3cd09d831d39..9169a67cf9e79395b2b1ce2e181c7b6b0f6c8e3a 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -19,7 +19,6 @@ #include "../event/TaskTableUpdatedEvent.h" #include "../TaskTable.h" #include "../task/Task.h" -#include "../Cost.h" #include "Connection.h" #include "Node.h" #include "RegisterHandler.h" diff --git a/cpp/unittest/scheduler/cost_test.cpp b/cpp/unittest/scheduler/cost_test.cpp deleted file mode 100644 index 1a625d786ef5ed6ea2069b0690ea5ca056f942a6..0000000000000000000000000000000000000000 --- a/cpp/unittest/scheduler/cost_test.cpp +++ /dev/null @@ -1,49 +0,0 @@ -#include "scheduler/TaskTable.h" -#include "scheduler/Cost.h" -#include -#include "scheduler/task/TestTask.h" - - -using namespace zilliz::milvus::engine; - -class CostTest : public ::testing::Test { -protected: - void - SetUp() override { - TableFileSchemaPtr dummy = nullptr; - for (uint64_t i = 0; i < 8; ++i) { - auto task = std::make_shared(dummy); - table_.Put(task); - } - table_.Get(0)->state = TaskTableItemState::INVALID; - table_.Get(1)->state = TaskTableItemState::START; - table_.Get(2)->state = TaskTableItemState::LOADING; - table_.Get(3)->state = TaskTableItemState::LOADED; - table_.Get(4)->state = TaskTableItemState::EXECUTING; - table_.Get(5)->state = TaskTableItemState::EXECUTED; - table_.Get(6)->state = TaskTableItemState::MOVING; - table_.Get(7)->state = TaskTableItemState::MOVED; - } - - - TaskTable table_; -}; - -TEST_F(CostTest, pick_to_move) { - CacheMgr cache; - auto indexes = PickToMove(table_, cache, 10); - ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 3); -} - -TEST_F(CostTest, pick_to_load) { - auto indexes = PickToLoad(table_, 10); - ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 1); -} - -TEST_F(CostTest, pick_to_executed) { - auto indexes = PickToExecute(table_, 10); - ASSERT_EQ(indexes.size(), 1); - ASSERT_EQ(indexes[0], 3); -}