/******************************************************************************* * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ #include #include #include "Action.h" namespace zilliz { namespace milvus { namespace engine { void next(std::list &neighbours, std::list::iterator &it) { it++; if (neighbours.end() == it) { it = neighbours.begin(); } } // TODO: this function called with only on tasks, so it will always push task to first neighbour void push_task_round_robin(TaskTable &self_task_table, std::list &neighbours) { CacheMgr cache; auto it = neighbours.begin(); if (it == neighbours.end()) return; auto indexes = PickToMove(self_task_table, cache, self_task_table.Size()); for (auto index : indexes) { if (self_task_table.Move(index)) { auto task = self_task_table.Get(index)->task; // task = task->Clone(); (*it)->task_table().Put(task); next(neighbours, it); } } } void push_task_randomly(TaskTable &self_task_table, std::vector &neighbours) { std::random_device rd; std::mt19937 mt(rd()); std::uniform_int_distribution dist(0, neighbours.size() - 1); CacheMgr cache; auto indexes = PickToMove(self_task_table, cache, self_task_table.Size()); for (auto index : indexes) { if (self_task_table.Move(index)) { auto task = self_task_table.Get(index)->task; neighbours[dist(mt)]->task_table().Put(task); } } } void Action::PushTaskToNeighbour(const ResourceWPtr &res) { auto self = res.lock(); if (not self) return; std::list neighbours; for (auto &neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; auto resource = std::static_pointer_cast(node); neighbours.emplace_back(resource); } push_task_round_robin(self->task_table(), neighbours); } void Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) { auto self = res.lock(); if (not self) return; std::list l_neighbours; std::vector v_neighbours; for (auto &neighbour_node : self->GetNeighbours()) { auto node = neighbour_node.neighbour_node.lock(); if (not node) continue; auto resource = std::static_pointer_cast(node); if (resource->HasExecutor()) { l_neighbours.push_back(resource); v_neighbours.push_back(resource); } } // push_task_round_robin(self->task_table(), l_neighbours); push_task_randomly(self->task_table(), v_neighbours); } } } }