diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 434e944732cf65022c1db9a22294edaecb730fdd..232ea03cfb4fbc9e5a6ec9d68675187d820bdb8b 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -36,6 +36,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-384 - Add global instance of ResourceMgr and Scheduler - MS-389 - Add clone interface in Task - MS-390 - Update resource construct function +- MS-391 - Add PushTaskToNeighbourHasExecutor action ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 6eef6014fb4ca2f4a6b02a2abda75b2b0b3f82e2..85fa90585c532aca9b1b63bc8f99689e0e62ae31 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -114,13 +114,14 @@ Scheduler::OnCopyCompleted(const EventPtr &event) { resource->WakeupExecutor(); if (resource->Type() == ResourceType::DISK) { Action::PushTaskToNeighbour(event->resource_); + } else { + Action::PushTaskToNeighbourHasExecutor(event->resource_); } } } void Scheduler::OnTaskTableUpdated(const EventPtr &event) { -// Action::PushTaskToNeighbour(event->resource_); if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h index d72bbefc8d7bb6ef7630493d85aefacae0781aa5..715088718563ca1e67fa11e41b559e68fd6b9a33 100644 --- a/cpp/src/scheduler/action/Action.h +++ b/cpp/src/scheduler/action/Action.h @@ -20,6 +20,11 @@ public: static void PushTaskToNeighbour(const ResourceWPtr &self); + /* + * Push task to neighbour that has executor; + */ + static void + PushTaskToNeighbourHasExecutor(const ResourceWPtr &self); /* * Pull task From neighbour; diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 3c01fc492840fd72ba34ce9959dadcda3985c7cd..5b9cb0f6ba17312a42432006aeb9e81011d401e4 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -4,7 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ -#include +#include #include "Action.h" @@ -13,29 +13,65 @@ namespace milvus { namespace engine { void -push_task(const ResourcePtr &self, const ResourcePtr &other) { - auto &self_task_table = self->task_table(); - auto &other_task_table = other->task_table(); +next(std::list neighbours, std::list::iterator &it) { + it++; + if (neighbours.end() == it) { + it = neighbours.begin(); + } +} + + +void +push_task_round_robin(TaskTable &self_task_table, std::list &neighbours) { CacheMgr cache; - auto indexes = PickToMove(self_task_table, cache, 10); + auto it = neighbours.begin(); + if (it == neighbours.end()) return; + auto indexes = PickToMove(self_task_table, cache, self_task_table.Size()); + for (auto index : indexes) { if (self_task_table.Move(index)) { auto task = self_task_table.Get(index)->task; - other_task_table.Put(task); - // TODO: mark moved future + task = task->Clone(); + (*it)->task_table().Put(task); + next(neighbours, it); } } } void Action::PushTaskToNeighbour(const ResourceWPtr &res) { - if (auto self = res.lock()) { - for (auto &neighbour : self->GetNeighbours()) { - if (auto n = neighbour.neighbour_node.lock()) { - push_task(self, std::static_pointer_cast(n)); - } + auto self = res.lock(); + if (not self) return; + + std::list neighbours; + for (auto &neighbour_node : self->GetNeighbours()) { + auto node = neighbour_node.neighbour_node.lock(); + if (not node) continue; + + auto resource = std::static_pointer_cast(node); + neighbours.emplace_back(resource); + } + + push_task_round_robin(self->task_table(), neighbours); +} + +void +Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) { + auto self = res.lock(); + if (not self) return; + + std::list neighbours; + for (auto &neighbour_node : self->GetNeighbours()) { + auto node = neighbour_node.neighbour_node.lock(); + if (not node) continue; + + auto resource = std::static_pointer_cast(node); + if (resource->HasExecutor()) { + neighbours.emplace_back(resource); } } + + push_task_round_robin(self->task_table(), neighbours); }