diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 667399cc5d2b4267c72a23fb32e19d3f515a239b..09929ed8d0225f7c661ab7027ec1cda54d8412e2 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -147,6 +147,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-539 - Remove old task code - MS-546 - Add simple mode resource_config - MS-570 - Add prometheus docker-compose file +- MS-576 - Scheduler refactor ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 6aa4501e29372323dfa1d01f21b6878289847a1a..ff66e6ac217e1445d4e91615ae4707c1988eb5b1 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -16,7 +16,7 @@ // under the License. -#include +#include "src/cache/GpuCacheMgr.h" #include "event/LoadCompletedEvent.h" #include "Scheduler.h" #include "action/Action.h" @@ -33,6 +33,14 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) if (auto mgr = res_mgr_.lock()) { mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); } + event_register_.insert(std::make_pair(static_cast(EventType::START_UP), + std::bind(&Scheduler::OnStartUp, this, std::placeholders::_1))); + event_register_.insert(std::make_pair(static_cast(EventType::LOAD_COMPLETED), + std::bind(&Scheduler::OnLoadCompleted, this, std::placeholders::_1))); + event_register_.insert(std::make_pair(static_cast(EventType::TASK_TABLE_UPDATED), + std::bind(&Scheduler::OnTaskTableUpdated, this, std::placeholders::_1))); + event_register_.insert(std::make_pair(static_cast(EventType::FINISH_TASK), + std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1))); } @@ -84,40 +92,8 @@ Scheduler::worker_function() { void Scheduler::Process(const EventPtr &event) { - switch (event->Type()) { - case EventType::START_UP: { - OnStartUp(event); - break; - } - case EventType::LOAD_COMPLETED: { - OnLoadCompleted(event); - break; - } - case EventType::FINISH_TASK: { - OnFinishTask(event); - break; - } - case EventType::TASK_TABLE_UPDATED: { - OnTaskTableUpdated(event); - break; - } - default: { - // TODO: logging - break; - } - } -} - - -void -Scheduler::OnStartUp(const EventPtr &event) { - if (auto resource = event->resource_.lock()) { - resource->WakeupLoader(); - } -} - -void -Scheduler::OnFinishTask(const EventPtr &event) { + auto process_event = event_register_.at(static_cast(event->Type())); + process_event(event); } // TODO: refactor the function @@ -130,79 +106,11 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { auto task_table_type = load_completed_event->task_table_item_->task->label()->Type(); switch (task_table_type) { case TaskLabelType::DEFAULT: { - if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) { - auto task = load_completed_event->task_table_item_->task; - auto search_task = std::static_pointer_cast(task); - bool moved = false; - - // to support test task, REFACTOR - if (auto index_engine = search_task->index_engine_) { - auto location = index_engine->GetLocation(); - - for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) { - auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location); - if (index != nullptr) { - moved = true; - auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i); - Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource); - break; - } - } - } - - if (not moved) { - Action::PushTaskToNeighbourRandomly(task, resource); - } - } + Action::DefaultLabelTaskScheduler(res_mgr_, resource, load_completed_event); break; } case TaskLabelType::SPECIFIED_RESOURCE: { - // support next version -// auto self = event->resource_.lock(); -// auto task = load_completed_event->task_table_item_->task; -// -// // if this resource is disk, assign it to smallest cost resource -// if (self->type() == ResourceType::DISK) { -// // step 1: calculate shortest path per resource, from disk to compute resource -// auto compute_resources = res_mgr_.lock()->GetComputeResources(); -// std::vector> paths; -// std::vector transport_costs; -// for (auto &res : compute_resources) { -// std::vector path; -// uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path); -// transport_costs.push_back(transport_cost); -// paths.emplace_back(path); -// } -// -// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost -// uint64_t min_cost = std::numeric_limits::max(); -// uint64_t min_cost_idx = 0; -// for (uint64_t i = 0; i < compute_resources.size(); ++i) { -// if (compute_resources[i]->TotalTasks() == 0) { -// min_cost_idx = i; -// break; -// } -// uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() -// + transport_costs[i]; -// if (min_cost > cost) { -// min_cost = cost; -// min_cost_idx = i; -// } -// } -// -// // step 3: set path in task -// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); -// task->path() = task_path; -// } -// -// if (self->name() == task->path().Last()) { -// self->WakeupLoader(); -// } else { -// auto next_res_name = task->path().Next(); -// auto next_res = res_mgr_.lock()->GetResource(next_res_name); -// load_completed_event->task_table_item_->Move(); -// next_res->task_table().Put(task); -// } + Action::SpecifiedResourceLabelTaskScheduler(res_mgr_, resource, load_completed_event); break; } case TaskLabelType::BROADCAST: { @@ -216,6 +124,17 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { } } +void +Scheduler::OnStartUp(const EventPtr &event) { + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } +} + +void +Scheduler::OnFinishTask(const EventPtr &event) { +} + void Scheduler::OnTaskTableUpdated(const EventPtr &event) { if (auto resource = event->resource_.lock()) { diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 0de80d5cc5084524858573e991dc955959175d37..feb1374a8bb923fecbf8c725dce6a82904fd4284 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -122,6 +122,8 @@ private: private: bool running_; + std::unordered_map> event_register_; + ResourceMgrWPtr res_mgr_; std::queue event_queue_; std::thread worker_thread_; diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h index 4af8e1cfeefeb7b5a31fb9dd3f0228b55b61bdda..57fc8fe686e248d539c755199ec16c51eee52d8a 100644 --- a/cpp/src/scheduler/action/Action.h +++ b/cpp/src/scheduler/action/Action.h @@ -18,6 +18,7 @@ #pragma once #include "../resource/Resource.h" +#include "../ResourceMgr.h" namespace zilliz { @@ -34,6 +35,15 @@ public: static void PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest); + + static void + DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr event); + + static void + SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, + ResourcePtr resource, + std::shared_ptr event); + }; diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 05f997e96d4fda8b3c4926084817e3da5585d948..76beaa3e07adf9948701199a28f161cadc8a7812 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -18,6 +18,8 @@ #include #include +#include "../Algorithm.h" +#include "src/cache/GpuCacheMgr.h" #include "Action.h" @@ -101,6 +103,84 @@ Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { dest->task_table().Put(task); } +void +Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, + ResourcePtr resource, + std::shared_ptr event) { + if (not resource->HasExecutor() && event->task_table_item_->Move()) { + auto task = event->task_table_item_->task; + auto search_task = std::static_pointer_cast(task); + bool moved = false; + + //to support test task, REFACTOR + if (auto index_engine = search_task->index_engine_) { + auto location = index_engine->GetLocation(); + + for (auto i = 0; i < res_mgr.lock()->GetNumGpuResource(); ++i) { + auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location); + if (index != nullptr) { + moved = true; + auto dest_resource = res_mgr.lock()->GetResource(ResourceType::GPU, i); + PushTaskToResource(event->task_table_item_->task, dest_resource); + break; + } + } + } + + if (not moved) { + PushTaskToNeighbourRandomly(task, resource); + } + } +} + +void +Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, + ResourcePtr resource, + std::shared_ptr event) { + auto task = event->task_table_item_->task; + if (resource->type() == ResourceType::DISK) { + // step 1: calculate shortest path per resource, from disk to compute resource + auto compute_resources = res_mgr.lock()->GetComputeResources(); + std::vector> paths; + std::vector transport_costs; + for (auto &res : compute_resources) { + std::vector path; + uint64_t transport_cost = ShortestPath(resource, res, res_mgr.lock(), path); + transport_costs.push_back(transport_cost); + paths.emplace_back(path); + } + + // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost + uint64_t min_cost = std::numeric_limits::max(); + uint64_t min_cost_idx = 0; + for (uint64_t i = 0; i < compute_resources.size(); ++i) { + if (compute_resources[i]->TotalTasks() == 0) { + min_cost_idx = i; + break; + } + uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec() + + transport_costs[i]; + if (min_cost > cost) { + min_cost = cost; + min_cost_idx = i; + } + } + + // step 3: set path in task + Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1); + task->path() = task_path; + } + + if (resource->name() == task->path().Last()) { + resource->WakeupLoader(); + } else { + auto next_res_name = task->path().Next(); + auto next_res = res_mgr.lock()->GetResource(next_res_name); + event->task_table_item_->Move(); + next_res->task_table().Put(task); + } +} + } } } diff --git a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp index ff3c6edaf599dce8bbb2671e7b758800adef4ba2..6b1194615065f6b6f2310589f96e6f775ceff4e9 100644 --- a/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp +++ b/cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp @@ -39,7 +39,7 @@ constexpr int64_t BATCH_ROW_COUNT = 100000; constexpr int64_t NQ = 5; constexpr int64_t TOP_K = 10; constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different -constexpr int64_t ADD_VECTOR_LOOP = 10; +constexpr int64_t ADD_VECTOR_LOOP = 1; constexpr int64_t SECONDS_EACH_HOUR = 3600; #define BLOCK_SPLITER std::cout << "===========================================" << std::endl;