提交 0fbf4315 编写于 作者: W wxyu

MS-391 Add PushTaskToNeighbourHasExecutor action


Former-commit-id: bddcefc244cc06cfa908e9da9f6c5150ed53f87f
上级 109eb6ea
......@@ -36,6 +36,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-384 - Add global instance of ResourceMgr and Scheduler
- MS-389 - Add clone interface in Task
- MS-390 - Update resource construct function
- MS-391 - Add PushTaskToNeighbourHasExecutor action
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -114,13 +114,14 @@ Scheduler::OnCopyCompleted(const EventPtr &event) {
resource->WakeupExecutor();
if (resource->Type() == ResourceType::DISK) {
Action::PushTaskToNeighbour(event->resource_);
} else {
Action::PushTaskToNeighbourHasExecutor(event->resource_);
}
}
}
void
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
// Action::PushTaskToNeighbour(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
......
......@@ -20,6 +20,11 @@ public:
static void
PushTaskToNeighbour(const ResourceWPtr &self);
/*
* Push task to neighbour that has executor;
*/
static void
PushTaskToNeighbourHasExecutor(const ResourceWPtr &self);
/*
* Pull task From neighbour;
......
......@@ -4,7 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include <list>
#include "Action.h"
......@@ -13,29 +13,65 @@ namespace milvus {
namespace engine {
void
push_task(const ResourcePtr &self, const ResourcePtr &other) {
auto &self_task_table = self->task_table();
auto &other_task_table = other->task_table();
next(std::list<ResourcePtr> neighbours, std::list<ResourcePtr>::iterator &it) {
it++;
if (neighbours.end() == it) {
it = neighbours.begin();
}
}
void
push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighbours) {
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 10);
auto it = neighbours.begin();
if (it == neighbours.end()) return;
auto indexes = PickToMove(self_task_table, cache, self_task_table.Size());
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
other_task_table.Put(task);
// TODO: mark moved future
task = task->Clone();
(*it)->task_table().Put(task);
next(neighbours, it);
}
}
}
void
Action::PushTaskToNeighbour(const ResourceWPtr &res) {
if (auto self = res.lock()) {
for (auto &neighbour : self->GetNeighbours()) {
if (auto n = neighbour.neighbour_node.lock()) {
push_task(self, std::static_pointer_cast<Resource>(n));
}
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
neighbours.emplace_back(resource);
}
push_task_round_robin(self->task_table(), neighbours);
}
void
Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) {
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
if (resource->HasExecutor()) {
neighbours.emplace_back(resource);
}
}
push_task_round_robin(self->task_table(), neighbours);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册