From 0fbf4315023a9887c83cbf69a17b5cba494a68df Mon Sep 17 00:00:00 2001 From: wxyu Date: Wed, 21 Aug 2019 18:27:05 +0800 Subject: [PATCH] MS-391 Add PushTaskToNeighbourHasExecutor action Former-commit-id: bddcefc244cc06cfa908e9da9f6c5150ed53f87f --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Scheduler.cpp | 3 +- cpp/src/scheduler/action/Action.h | 5 ++ .../scheduler/action/PushTaskToNeighbour.cpp | 60 +++++++++++++++---- 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 434e9447..232ea03c 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -36,6 +36,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-384 - Add global instance of ResourceMgr and Scheduler - MS-389 - Add clone interface in Task - MS-390 - Update resource construct function +- MS-391 - Add PushTaskToNeighbourHasExecutor action ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 6eef6014..85fa9058 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -114,13 +114,14 @@ Scheduler::OnCopyCompleted(const EventPtr &event) { resource->WakeupExecutor(); if (resource->Type() == ResourceType::DISK) { Action::PushTaskToNeighbour(event->resource_); + } else { + Action::PushTaskToNeighbourHasExecutor(event->resource_); } } } void Scheduler::OnTaskTableUpdated(const EventPtr &event) { -// Action::PushTaskToNeighbour(event->resource_); if (auto resource = event->resource_.lock()) { resource->WakeupLoader(); } diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h index d72bbefc..71508871 100644 --- a/cpp/src/scheduler/action/Action.h +++ b/cpp/src/scheduler/action/Action.h @@ -20,6 +20,11 @@ public: static void PushTaskToNeighbour(const ResourceWPtr &self); + /* + * Push task to neighbour that has executor; + */ + static void + PushTaskToNeighbourHasExecutor(const ResourceWPtr &self); /* * Pull task From neighbour; diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 3c01fc49..5b9cb0f6 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -4,7 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ -#include +#include #include "Action.h" @@ -13,29 +13,65 @@ namespace milvus { namespace engine { void -push_task(const ResourcePtr &self, const ResourcePtr &other) { - auto &self_task_table = self->task_table(); - auto &other_task_table = other->task_table(); +next(std::list neighbours, std::list::iterator &it) { + it++; + if (neighbours.end() == it) { + it = neighbours.begin(); + } +} + + +void +push_task_round_robin(TaskTable &self_task_table, std::list &neighbours) { CacheMgr cache; - auto indexes = PickToMove(self_task_table, cache, 10); + auto it = neighbours.begin(); + if (it == neighbours.end()) return; + auto indexes = PickToMove(self_task_table, cache, self_task_table.Size()); + for (auto index : indexes) { if (self_task_table.Move(index)) { auto task = self_task_table.Get(index)->task; - other_task_table.Put(task); - // TODO: mark moved future + task = task->Clone(); + (*it)->task_table().Put(task); + next(neighbours, it); } } } void Action::PushTaskToNeighbour(const ResourceWPtr &res) { - if (auto self = res.lock()) { - for (auto &neighbour : self->GetNeighbours()) { - if (auto n = neighbour.neighbour_node.lock()) { - push_task(self, std::static_pointer_cast(n)); - } + auto self = res.lock(); + if (not self) return; + + std::list neighbours; + for (auto &neighbour_node : self->GetNeighbours()) { + auto node = neighbour_node.neighbour_node.lock(); + if (not node) continue; + + auto resource = std::static_pointer_cast(node); + neighbours.emplace_back(resource); + } + + push_task_round_robin(self->task_table(), neighbours); +} + +void +Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) { + auto self = res.lock(); + if (not self) return; + + std::list neighbours; + for (auto &neighbour_node : self->GetNeighbours()) { + auto node = neighbour_node.neighbour_node.lock(); + if (not node) continue; + + auto resource = std::static_pointer_cast(node); + if (resource->HasExecutor()) { + neighbours.emplace_back(resource); } } + + push_task_round_robin(self->task_table(), neighbours); } -- GitLab