提交 244c73f8 编写于 作者: Y Yu Kun

MS-455 Distribute tasks by minimal cost in scheduler


Former-commit-id: 8f04fd244af18f84182f855d7052a46ec2b13e54
上级 b16e044e
......@@ -70,15 +70,15 @@ resource_config:
type: GPU
memory: 6
device_id: 0
enable_loader: true
enable_executor: true
enable_loader: false
enable_executor: false
gtx1660:
type: GPU
memory: 6
device_id: 1
enable_loader: true
enable_executor: true
enable_loader: false
enable_executor: false
# connection list, length: 0~N
# format: -${resource_name}===${resource_name}
......
......@@ -10,13 +10,89 @@ namespace zilliz {
namespace milvus {
namespace engine {
std::vector<std::string>
ShortestPath(const ResourcePtr &src, const ResourcePtr& dest) {
auto node = std::static_pointer_cast<Node>(src);
auto neighbours = node->GetNeighbours();
for (auto &neighbour : neighbours) {
neighbour.connection.speed()
constexpr uint64_t MAXINT = 99999;
uint64_t
ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string> &path) {
std::vector<std::vector<std::string>> paths;
uint64_t num_of_resources = res_mgr->GetAllResouces().size();
uint64_t src_id, dest_id;
std::unordered_map<uint64_t, std::string> id_name_map;
std::unordered_map<std::string, uint64_t> name_id_map;
for (auto i = 0; i < num_of_resources; ++i) {
id_name_map.insert(std::make_pair(i, res_mgr->GetAllResouces().at(i)->Name()));
name_id_map.insert(std::make_pair(res_mgr->GetAllResouces().at(i)->Name(), i));
}
std::vector<std::vector<uint64_t> > dis_matrix;
dis_matrix.resize(num_of_resources);
for (auto i = 0; i < num_of_resources; ++i) {
dis_matrix[i].resize(num_of_resources);
for (auto j = 0; j < num_of_resources; ++j) {
dis_matrix[i][j] = MAXINT;
}
dis_matrix[i][i] = 0;
}
std::vector<bool> vis(num_of_resources, false);
std::vector<uint64_t> dis(num_of_resources, MAXINT);
for (auto &res : res_mgr->GetAllResouces()) {
auto cur_node = std::static_pointer_cast<Node>(res);
auto cur_neighbours = cur_node->GetNeighbours();
for (auto &neighbour : cur_neighbours) {
auto neighbour_res = std::static_pointer_cast<Resource>(neighbour.neighbour_node.lock());
dis_matrix[name_id_map.at(res->Name())][name_id_map.at(neighbour_res->Name())] =
neighbour.connection.transport_cost();
}
}
for (uint64_t i = 0; i < num_of_resources; ++i) {
dis[i] = dis_matrix[name_id_map.at(src->Name())][i];
}
vis[name_id_map.at(src->Name())] = true;
std::vector<int64_t> parent(num_of_resources, -1);
for (uint64_t i = 0; i < num_of_resources; ++i) {
uint64_t minn = MAXINT;
uint64_t temp;
for (auto j = 0; j < num_of_resources; ++j) {
if (!vis[j] && dis[j] < minn) {
minn = dis[j];
temp = j;
}
}
vis[temp] = true;
if (i == 0) {
parent[temp] = name_id_map.at(src->Name());
}
for (uint64_t j = 0; j < num_of_resources; ++j) {
if (!vis[j] && dis_matrix[temp][j] != MAXINT && dis_matrix[temp][j] + dis[temp] < dis[j]) {
dis[j] = dis_matrix[temp][j] + dis[temp];
parent[j] = temp;
}
}
}
int64_t parent_idx = parent[name_id_map.at(dest->Name())];
if (parent_idx != -1) {
path.push_back(dest->Name());
}
while (parent_idx != -1) {
path.push_back(id_name_map.at(parent_idx));
parent_idx = parent[parent_idx];
}
// result.push_back(id_name_map.at(parent_idx));
return dis[name_id_map.at(dest->Name())];
}
}
......
......@@ -5,6 +5,7 @@
******************************************************************************/
#include "resource/Resource.h"
#include "ResourceMgr.h"
#include <vector>
#include <string>
......@@ -13,8 +14,11 @@ namespace zilliz {
namespace milvus {
namespace engine {
std::vector<std::string>
ShortestPath(const ResourcePtr &src, const ResourcePtr& dest);
uint64_t
ShortestPath(const ResourcePtr &src,
const ResourcePtr &dest,
const ResourceMgrPtr &res_mgr,
std::vector<std::string>& path);
}
}
......
......@@ -30,7 +30,13 @@ ResourceMgr::GetNumOfComputeResource() {
std::vector<ResourcePtr>
ResourceMgr::GetComputeResource() {
// TODO
std::vector<ResourcePtr > result;
for (auto &resource : resources_) {
if (resource->HasExecutor()) {
result.emplace_back(resource);
}
}
return result;
}
uint64_t
......@@ -54,6 +60,21 @@ ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
return nullptr;
}
ResourcePtr
ResourceMgr::GetResourceByName(std::string name) {
for (auto &resource : resources_) {
if (resource->Name() == name) {
return resource;
}
}
return nullptr;
}
std::vector<ResourcePtr>
ResourceMgr::GetAllResouces() {
return resources_;
}
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
......
......@@ -41,6 +41,12 @@ public:
ResourcePtr
GetResource(ResourceType type, uint64_t device_id);
ResourcePtr
GetResourceByName(std::string name);
std::vector<ResourcePtr>
GetAllResouces();
/*
* Return account of resource which enable executor;
*/
......
......@@ -5,6 +5,7 @@
******************************************************************************/
#include <src/cache/GpuCacheMgr.h>
#include "event/LoadCompletedEvent.h"
#include "Scheduler.h"
#include "Cost.h"
#include "action/Action.h"
......@@ -138,37 +139,50 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
}
break;
}
case TaskLabelType::SPECIAL_RESOURCE: {
case TaskLabelType::SPECIFIED_RESOURCE: {
auto self = event->resource_.lock();
auto task = load_completed_event->task_table_item_->task;
// if this resource is disk, assign it to smallest cost resource
if (self->Type() == ResourceType::DISK) {
// step 1:
// calculate shortest path per resource, from disk to compute resource
// calculate by transport_cost
// step 1: calculate shortest path per resource, from disk to compute resource
auto compute_resources = res_mgr_.lock()->GetComputeResource();
std::vector<std::vector<std::string>> paths;
std::vector<uint64_t > transport_costs;
for (auto res : compute_resources) {
std::vector<std::string> path = ShortestPath(self, res);
std::vector<std::string> path;
uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
transport_costs.push_back(transport_cost);
paths.emplace_back(path);
}
// step 2:
// select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
std::vector<uint64_t> costs;
for (auto res : compute_resources) {
uint64_t cost = res->TaskAvgCost() * res->NumOfTaskToExec() + transport_cost;
costs.emplace_back(cost);
uint64_t min_cost = std::numeric_limits<uint64_t>::max();
uint64_t min_cost_idx;
for (uint64_t i = 0; i < compute_resources.size(); ++i) {
uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
+ transport_costs[i];
costs.push_back(cost);
if (min_cost > cost) {
min_cost = cost;
min_cost_idx = i;
}
path, cost
// step 3:
// set path in task
}
// step 3: set path in task
Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
task->path() = task_path;
}
// do or move
auto load_event = std::static_pointer_cast<CopyCompletedEvent>(event);
auto path = (load_event->task_table_item_->task->Path);
if(self->Name() == task->path().Last()) {
self->WakeupLoader();
} else {
auto next_res_name = task->path().Next();
auto next_res = res_mgr_.lock()->GetResourceByName(next_res_name);
// task->Move();
next_res->task_table().Put(task);
}
break;
}
case TaskLabelType::BROADCAST: {
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <chrono>
#include "Utils.h"
namespace zilliz {
......@@ -11,11 +12,12 @@ namespace milvus {
namespace engine {
uint64_t
get_now_timestamp(); {
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
return millis;
get_current_timestamp()
{
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
return millis;
}
}
......
......@@ -11,7 +11,7 @@ namespace milvus {
namespace engine {
uint64_t
get_now_timestamp();
get_current_timestamp();
}
}
......
......@@ -28,6 +28,11 @@ public:
return speed_;
}
uint64_t
transport_cost() {
return 1024 / speed_;
}
public:
std::string
Dump() const {
......
......@@ -140,9 +140,9 @@ void Resource::executor_function() {
break;
}
auto start = get_now_timestamp();
auto start = get_current_timestamp();
Process(task_item->task);
auto finish = get_now_timestamp();
auto finish = get_current_timestamp();
++total_task_;
total_cost_ += finish - start;
......
......@@ -199,8 +199,8 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
TaskTable task_table_;
uint64_t total_cost_ = 0;
uint64_t total_task_ = 0;
uint64_t total_cost_ = 10;
uint64_t total_task_ = 10;
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
std::function<void(EventPtr)> subscriber_ = nullptr;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <vector>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
class Path {
public:
Path() = default;
Path(std::vector<std::string>& path, uint64_t index) : path_(path), index_(index) {}
void
push_back(const std::string &str) {
path_.push_back(str);
}
std::vector<std::string>
Dump() {
return path_;
}
std::string &
Next() {
--index_;
return path_[index_];
}
std::string &
Last() {
if (!path_.empty()) {
return path_[0];
} else {
std::string str;
return str;
}
}
public:
std::string &
operator[](uint64_t index) {
return path_[index];
}
std::vector<std::string>::iterator begin() { return path_.begin(); }
std::vector<std::string>::iterator end() { return path_.end(); }
public:
std::vector<std::string> path_;
uint64_t index_ = 0;
};
}
}
}
\ No newline at end of file
......@@ -8,6 +8,7 @@
#include "db/scheduler/context/SearchContext.h"
#include "db/scheduler/task/IScheduleTask.h"
#include "scheduler/tasklabel/TaskLabel.h"
#include "Path.h"
#include <string>
#include <memory>
......@@ -47,9 +48,9 @@ public:
/*
* Transport path;
*/
inline std::vector<std::string>&
inline Path&
path() {
return path_;
return task_path_;
}
/*
......@@ -72,7 +73,7 @@ public:
Clone() = 0;
public:
std::vector<std::string> path_;
Path task_path_;
std::vector<SearchContextPtr> search_contexts_;
ScheduleTaskPtr task_;
TaskType type_;
......
......@@ -22,24 +22,24 @@ namespace engine {
class SpecResLabel : public TaskLabel {
public:
SpecResLabel(const ResourceWPtr &resource)
: TaskLabel(TaskLabelType::SPECIAL_RESOURCE), resource_(resource) {}
: TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {}
inline ResourceWPtr &
resource() const {
resource() {
return resource_;
}
inline std::string &
resource_name() const {
resource_name() {
return resource_name_;
}
private:
ResourceWPtr resource_;
std::string resource_name_;
}
};
using SpecResLabelPtr = std::make_shared<SpecResLabel>;
using SpecResLabelPtr = std::shared_ptr<SpecResLabel>();
}
}
......
......@@ -13,7 +13,7 @@ namespace engine {
enum class TaskLabelType {
DEFAULT, // means can be executed in any resource
SPECIAL_RESOURCE, // means must executing in special resource
SPECIFIED_RESOURCE, // means must executing in special resource
BROADCAST, // means all enable-executor resource must execute task
};
......
......@@ -42,5 +42,5 @@ add_subdirectory(server)
add_subdirectory(db)
add_subdirectory(knowhere)
add_subdirectory(metrics)
#add_subdirectory(scheduler)
add_subdirectory(scheduler)
#add_subdirectory(storage)
\ No newline at end of file
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <gtest/gtest.h>
#include "scheduler/resource/Resource.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/ResourceFactory.h"
#include "scheduler/Algorithm.h"
namespace zilliz {
namespace milvus {
namespace engine {
class AlgorithmTest : public testing::Test {
protected:
void
SetUp() override {
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, true);
ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1);
ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2);
ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0);
ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1);
res_mgr_ = std::make_shared<ResourceMgr>();
disk_ = res_mgr_->Add(std::move(disk));
cpu_0_ = res_mgr_->Add(std::move(cpu0));
cpu_1_ = res_mgr_->Add(std::move(cpu1));
cpu_2_ = res_mgr_->Add(std::move(cpu2));
gpu_0_ = res_mgr_->Add(std::move(gpu0));
gpu_1_ = res_mgr_->Add(std::move(gpu1));
auto IO = Connection("IO", 5.0);
auto PCIE = Connection("PCIE", 11.0);
res_mgr_->Connect("disk", "cpu0", IO);
res_mgr_->Connect("cpu0", "cpu1", IO);
res_mgr_->Connect("cpu1", "cpu2", IO);
res_mgr_->Connect("cpu0", "cpu2", IO);
res_mgr_->Connect("cpu1", "gpu0", PCIE);
res_mgr_->Connect("cpu2", "gpu1", PCIE);
}
ResourceWPtr disk_;
ResourceWPtr cpu_0_;
ResourceWPtr cpu_1_;
ResourceWPtr cpu_2_;
ResourceWPtr gpu_0_;
ResourceWPtr gpu_1_;
ResourceMgrPtr res_mgr_;
};
TEST_F(AlgorithmTest, ShortestPath_test) {
std::vector<std::string> sp;
uint64_t cost;
cost = ShortestPath(disk_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_0_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(disk_.lock(), disk_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_0_.lock(), disk_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
std::cout << "************************************\n";
cost = ShortestPath(cpu_2_.lock(), gpu_0_.lock(), res_mgr_, sp);
while (!sp.empty()) {
std::cout << sp[sp.size() - 1] << std::endl;
sp.pop_back();
}
}
}
}
}
\ No newline at end of file
......@@ -30,7 +30,7 @@ protected:
resources_.push_back(gpu_resource_);
auto subscriber = [&](EventPtr event) {
if (event->Type() == EventType::COPY_COMPLETED) {
if (event->Type() == EventType::LOAD_COMPLETED) {
std::lock_guard<std::mutex> lock(load_mutex_);
++load_count_;
cv_.notify_one();
......
......@@ -13,6 +13,7 @@
#include "scheduler/resource/Resource.h"
#include "utils/Error.h"
#include "wrapper/knowhere/vec_index.h"
#include "scheduler/tasklabel/SpecResLabel.h"
namespace zilliz {
namespace milvus {
......@@ -122,9 +123,6 @@ protected:
ResourceMgrPtr res_mgr_;
std::shared_ptr<Scheduler> scheduler_;
uint64_t load_count_ = 0;
std::mutex load_mutex_;
std::condition_variable cv_;
};
void
......@@ -157,6 +155,74 @@ TEST_F(SchedulerTest, OnCopyCompleted) {
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
class SchedulerTest2 : public testing::Test {
protected:
void
SetUp() override {
ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);
res_mgr_ = std::make_shared<ResourceMgr>();
disk_ = res_mgr_->Add(std::move(disk));
cpu_0_ = res_mgr_->Add(std::move(cpu0));
cpu_1_ = res_mgr_->Add(std::move(cpu1));
cpu_2_ = res_mgr_->Add(std::move(cpu2));
gpu_0_ = res_mgr_->Add(std::move(gpu0));
gpu_1_ = res_mgr_->Add(std::move(gpu1));
auto IO = Connection("IO", 5.0);
auto PCIE1 = Connection("PCIE", 11.0);
auto PCIE2 = Connection("PCIE", 20.0);
res_mgr_->Connect("disk", "cpu0", IO);
res_mgr_->Connect("cpu0", "cpu1", IO);
res_mgr_->Connect("cpu1", "cpu2", IO);
res_mgr_->Connect("cpu0", "cpu2", IO);
res_mgr_->Connect("cpu1", "gpu0", PCIE1);
res_mgr_->Connect("cpu2", "gpu1", PCIE2);
scheduler_ = std::make_shared<Scheduler>(res_mgr_);
res_mgr_->Start();
scheduler_->Start();
}
void
TearDown() override {
scheduler_->Stop();
res_mgr_->Stop();
}
ResourceWPtr disk_;
ResourceWPtr cpu_0_;
ResourceWPtr cpu_1_;
ResourceWPtr cpu_2_;
ResourceWPtr gpu_0_;
ResourceWPtr gpu_1_;
ResourceMgrPtr res_mgr_;
std::shared_ptr<Scheduler> scheduler_;
};
TEST_F(SchedulerTest2, SpecifiedResourceTest) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
dummy->location_ = "location";
for (uint64_t i = 0; i < NUM; ++i) {
std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
task->label() = std::make_shared<SpecResLabel>(disk_);
tasks.push_back(task);
disk_.lock()->task_table().Put(task);
}
// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册