提交 d1d22056 编写于 作者: Y Yu Kun

MS-460 Put transport speed as weight when choosing neighbour to execute task


Former-commit-id: d82b330df300d269a3a11544fc5625ea317f9118
上级 600387fb
......@@ -75,6 +75,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-445 - Rename CopyCompleted to LoadCompleted
- MS-451 - Update server_config.template file, set GPU compute default
- MS-455 - Distribute tasks by minimal cost in scheduler
- MS-460 - Put transport speed as weight when choosing neighbour to execute task
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -43,14 +43,21 @@ StartSchedulerService() {
knowhere::FaissGpuResourceMgr::GetInstance().InitResource();
auto default_connection = Connection("default_connection", 500.0);
auto connections = config.GetSequence(server::CONFIG_RESOURCE_CONNECTIONS);
// auto default_connection = Connection("default_connection", 500.0);
auto connections = config.GetChild(server::CONFIG_RESOURCE_CONNECTIONS).GetChildren();
for (auto &conn : connections) {
auto &connect_name = conn.first;
auto &connect_conf = conn.second;
auto connect_speed = connect_conf.GetInt64Value(server::CONFIG_SPEED_CONNECTIONS);
auto connect_endpoint = connect_conf.GetValue(server::CONFIG_ENDPOINT_CONNECTIONS);
std::string delimiter = "===";
std::string left = conn.substr(0, conn.find(delimiter));
std::string right = conn.substr(conn.find(delimiter) + 3, conn.length());
std::string left = connect_endpoint.substr(0, connect_endpoint.find(delimiter));
std::string right = connect_endpoint.substr(connect_endpoint.find(delimiter) + 3,
connect_endpoint.length());
ResMgrInst::GetInstance()->Connect(left, right, default_connection);
auto connection = Connection(connect_name, connect_speed);
ResMgrInst::GetInstance()->Connect(left, right, connection);
}
ResMgrInst::GetInstance()->Start();
......
......@@ -28,17 +28,48 @@ get_neighbours(const ResourcePtr &self) {
return neighbours;
}
std::vector<std::pair<ResourcePtr, Connection>>
get_neighbours_with_connetion(const ResourcePtr &self) {
std::vector<std::pair<ResourcePtr, Connection>> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
// if (not resource->HasExecutor()) continue;
Connection conn = neighbour_node.connection;
neighbours.emplace_back(std::make_pair(resource, conn));
}
return neighbours;
}
void
Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
const ResourcePtr &self) {
auto neighbours = get_neighbours(self);
auto neighbours = get_neighbours_with_connetion(self);
if (not neighbours.empty()) {
std::vector<uint64_t > speeds;
uint64_t total_speed = 0;
for (auto &neighbour : neighbours) {
uint64_t speed = neighbour.second.speed();
speeds.emplace_back(speed);
total_speed += speed;
}
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1);
std::uniform_int_distribution<int> dist(0, total_speed);
uint64_t index = 0;
int64_t rd_speed = dist(mt);
for (uint64_t i = 0; i < speeds.size(); ++i) {
rd_speed -= speeds[i];
if (rd_speed <= 0) {
neighbours[i].first->task_table().Put(task);
return;
}
}
neighbours[dist(mt)]->task_table().Put(task);
} else {
//TODO: process
}
......
......@@ -57,6 +57,8 @@ static const char* CONFIG_RESOURCE_DEVICE_ID = "device_id";
static const char* CONFIG_RESOURCE_ENABLE_LOADER = "enable_loader";
static const char* CONFIG_RESOURCE_ENABLE_EXECUTOR = "enable_executor";
static const char* CONFIG_RESOURCE_CONNECTIONS = "connections";
static const char* CONFIG_SPEED_CONNECTIONS = "speed";
static const char* CONFIG_ENDPOINT_CONNECTIONS = "connections";
class ServerConfig {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册