From d1d220561b0ba5bd7f9f2868f430e683ca1adbb2 Mon Sep 17 00:00:00 2001 From: Yu Kun Date: Mon, 2 Sep 2019 17:11:26 +0800 Subject: [PATCH] MS-460 Put transport speed as weight when choosing neighbour to execute task Former-commit-id: d82b330df300d269a3a11544fc5625ea317f9118 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/SchedInst.cpp | 17 ++++++--- .../scheduler/action/PushTaskToNeighbour.cpp | 37 +++++++++++++++++-- cpp/src/server/ServerConfig.h | 2 + 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 9c5fc5b7..f69311ad 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -75,6 +75,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-445 - Rename CopyCompleted to LoadCompleted - MS-451 - Update server_config.template file, set GPU compute default - MS-455 - Distribute tasks by minimal cost in scheduler +- MS-460 - Put transport speed as weight when choosing neighbour to execute task ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 3ee8cbfd..43204f09 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -43,14 +43,21 @@ StartSchedulerService() { knowhere::FaissGpuResourceMgr::GetInstance().InitResource(); - auto default_connection = Connection("default_connection", 500.0); - auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS); +// auto default_connection = Connection("default_connection", 500.0); + auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren(); for (auto &conn : connections) { + auto &connect_name = conn.first; + auto &connect_conf = conn.second; + auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS); + auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS); + std::string delimiter = "==="; - std::string left = conn.substr(0, conn.find(delimiter)); - std::string right = conn.substr(conn.find(delimiter) + 3, conn.length()); + std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter)); + std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3, + connect_endpoint.length()); - ResMgrInst::GetInstance()->Connect(left, right, default_connection); + auto connection = Connection(connect_name, connect_speed); + ResMgrInst::GetInstance()->Connect(left, right, connection); } ResMgrInst::GetInstance()->Start(); diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 1939cbc1..200f6214 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -28,17 +28,48 @@ get_neighbours(const ResourcePtr &self) { return neighbours; } +std::vector> +get_neighbours_with_connetion(const ResourcePtr &self) { + std::vector> neighbours; + for (auto &neighbour_node : self->GetNeighbours()) { + auto node = neighbour_node.neighbour_node.lock(); + if (not node) continue; + + auto resource = std::static_pointer_cast(node); +// if (not resource->HasExecutor()) continue; + Connection conn = neighbour_node.connection; + neighbours.emplace_back(std::make_pair(resource, conn)); + } + return neighbours; +} + void Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) { - auto neighbours = get_neighbours(self); + auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { + std::vector speeds; + uint64_t total_speed = 0; + for (auto &neighbour : neighbours) { + uint64_t speed = neighbour.second.speed(); + speeds.emplace_back(speed); + total_speed += speed; + } + std::random_device rd; std::mt19937 mt(rd()); - std::uniform_int_distribution dist(0, neighbours.size() - 1); + std::uniform_int_distribution dist(0, total_speed); + uint64_t index = 0; + int64_t rd_speed = dist(mt); + for (uint64_t i = 0; i < speeds.size(); ++i) { + rd_speed -= speeds[i]; + if (rd_speed <= 0) { + neighbours[i].first->task_table().Put(task); + return; + } + } - neighbours[dist(mt)]->task_table().Put(task); } else { //TODO: process } diff --git a/cpp/src/server/ServerConfig.h b/cpp/src/server/ServerConfig.h index f8f6deea..4b4a80e8 100644 --- a/cpp/src/server/ServerConfig.h +++ b/cpp/src/server/ServerConfig.h @@ -57,6 +57,8 @@ static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id"; static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader"; static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor"; static const char* CONFIG_RESOURCE_CONNECTIONS = "connections"; +static const char* CONFIG_SPEED_CONNECTIONS = "speed"; +static const char* CONFIG_ENDPOINT_CONNECTIONS = "connections"; class ServerConfig { -- GitLab