diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 76dd3a8f7b0861f6b88c7edfb4f03a4d297147ed..2a9146dc07bd367487f4ada81b05f161dd4815c4 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -58,6 +58,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-418 - Update server_config.template file, set CPU compute only default - MS-419 - Move index_file_size from IndexParam to TableSchema - MS-421 - Add TaskLabel in scheduler +- MS-422 - Support DeleteTask in Multi-GpuResource case ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 96e2589382442ba55108bf4526f06e650e452aef..b0af3890fc8d5b0f43743f81d50788eb9bff0408 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -54,7 +54,7 @@ ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connect auto res2 = get_resource_by_name(name2); if (res1 && res2) { res1->AddNeighbour(std::static_pointer_cast(res2), connection); - res2->AddNeighbour(std::static_pointer_cast(res1), connection); +// res2->AddNeighbour(std::static_pointer_cast(res1), connection); } } diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 164ea430d1fca0d160cc9aa2d8e0f516da0db433..775cf76ba2a3695d90d728c5a77af29e45572380 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -108,12 +108,25 @@ Scheduler::OnFinishTask(const EventPtr &event) { void Scheduler::OnCopyCompleted(const EventPtr &event) { + auto load_completed_event = std::static_pointer_cast(event); if (auto resource = event->resource_.lock()) { resource->WakeupExecutor(); - if (resource->Type() == ResourceType::DISK) { - Action::PushTaskToNeighbour(event->resource_); - } else { - Action::PushTaskToNeighbourHasExecutor(event->resource_); + + auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); + switch (task_table_type) { + case TaskLabelType::DEFAULT: { + if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) { + Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource); + } + break; + } + case TaskLabelType::BROADCAST: { + Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); + break; + } + default: { + break; + } } } } diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h index 715088718563ca1e67fa11e41b559e68fd6b9a33..8315ecb0ffe16fa3482f524e845d9835682a259d 100644 --- a/cpp/src/scheduler/action/Action.h +++ b/cpp/src/scheduler/action/Action.h @@ -14,23 +14,11 @@ namespace engine { class Action { public: - /* - * Push task to neighbour; - */ static void - PushTaskToNeighbour(const ResourceWPtr &self); + PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self); - /* - * Push task to neighbour that has executor; - */ static void - PushTaskToNeighbourHasExecutor(const ResourceWPtr &self); - - /* - * Pull task From neighbour; - */ - static void - PullTaskFromNeighbour(const ResourceWPtr &self); + PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self); }; diff --git a/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp b/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp deleted file mode 100644 index b1ac97b6e48a9343f6a8a5e009706a0d30269fc9..0000000000000000000000000000000000000000 --- a/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ - -#include "Action.h" - - -namespace zilliz { -namespace milvus { -namespace engine { - -void -Action::PullTaskFromNeighbour(const ResourceWPtr &self) { - // TODO: implement -} - - -} -} -} - - diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 9afeac688a5ff1ce9126c1e619d99d5427467110..6b2ee442674103fd10db24b29a7fcba1b94d2266 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -13,54 +13,9 @@ namespace zilliz { namespace milvus { namespace engine { -void -next(std::list &neighbours, std::list::iterator &it) { - it++; - if (neighbours.end() == it) { - it = neighbours.begin(); - } -} - -// TODO: this function called with only on tasks, so it will always push task to first neighbour -void -push_task_round_robin(TaskTable &self_task_table, std::list &neighbours) { - CacheMgr cache; - auto it = neighbours.begin(); - if (it == neighbours.end()) return; - auto indexes = PickToMove(self_task_table, cache, self_task_table.Size()); - - for (auto index : indexes) { - if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index)->task; -// task = task->Clone(); - (*it)->task_table().Put(task); - next(neighbours, it); - } - } -} - -void -push_task_randomly(TaskTable &self_task_table, std::vector &neighbours) { - std::random_device rd; - std::mt19937 mt(rd()); - std::uniform_int_distribution dist(0, neighbours.size() - 1); - CacheMgr cache; - - auto indexes = PickToMove(self_task_table, cache, self_task_table.Size()); - for (auto index : indexes) { - if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index)->task; - neighbours[dist(mt)]->task_table().Put(task); - } - } -} - -void -Action::PushTaskToNeighbour(const ResourceWPtr &res) { - auto self = res.lock(); - if (not self) return; - - std::list neighbours; +std::vector +get_neighbours(const ResourcePtr &self) { + std::vector neighbours; for (auto &neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; @@ -68,30 +23,27 @@ Action::PushTaskToNeighbour(const ResourceWPtr &res) { auto resource = std::static_pointer_cast(node); neighbours.emplace_back(resource); } - - push_task_round_robin(self->task_table(), neighbours); + return neighbours; } + void -Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) { - auto self = res.lock(); - if (not self) return; +Action::PushTaskToNeighbourRandomly(const TaskPtr &task, + const ResourcePtr &self) { + auto neighbours = get_neighbours(self); + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0, neighbours.size() - 1); - std::list l_neighbours; - std::vector v_neighbours; - for (auto &neighbour_node : self->GetNeighbours()) { - auto node = neighbour_node.neighbour_node.lock(); - if (not node) continue; + neighbours[dist(mt)]->task_table().Put(task); +} - auto resource = std::static_pointer_cast(node); - if (resource->HasExecutor()) { - l_neighbours.push_back(resource); - v_neighbours.push_back(resource); - } +void +Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { + auto neighbours = get_neighbours(self); + for (auto &neighbour : neighbours) { + neighbour->task_table().Put(task); } - -// push_task_round_robin(self->task_table(), l_neighbours); - push_task_randomly(self->task_table(), v_neighbours); }