diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 6f1c49e9f4cbc8b92a5094c4d444f766f7a3f33e..47c5575eff9751f4160953c460f6b96cffb1d512 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -16,6 +16,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-350 - Remove knowhere submodule - MS-354 - Add task class and interface in scheduler - MS-355 - Add copy interface in ExcutionEngine +- MS-357 - Add minimum schedule function ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Cost.cpp b/cpp/src/scheduler/Cost.cpp index 0f7c30c6a7de7870dbad84123d9cfac27e30b9a8..14b56d98a5b469223c2e7d25bbfa0bbdbd7d7b10 100644 --- a/cpp/src/scheduler/Cost.cpp +++ b/cpp/src/scheduler/Cost.cpp @@ -12,7 +12,7 @@ namespace milvus { namespace engine { std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) { +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) { std::vector indexes; return indexes; } diff --git a/cpp/src/scheduler/Cost.h b/cpp/src/scheduler/Cost.h index 98de9d8fc14adcebc1db9e6dae546321f0d9b96d..85414337bc6c61b99abb881c9df37da34b14eb4f 100644 --- a/cpp/src/scheduler/Cost.h +++ b/cpp/src/scheduler/Cost.h @@ -23,7 +23,7 @@ namespace engine { * call from scheduler; */ std::vector -PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit); +PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit); /* diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index c5ae9281660995bc03fad118530b9589665a4e5b..d4ba1ac4a6e5f759b3c96b79b94cbc124b57fa5d 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -5,6 +5,7 @@ ******************************************************************************/ #include "Scheduler.h" +#include "Cost.h" namespace zilliz { @@ -12,33 +13,55 @@ namespace milvus { namespace engine { void -StartUpEvent::Process() { - +push_task(ResourcePtr &self, ResourcePtr &other) { + auto self_task_table = self->task_table(); + auto other_task_table = other->task_table(); + if (!other_task_table.Empty()) { + CacheMgr cache; + auto indexes = PickToMove(self_task_table, cache, 1); + for (auto index : indexes) { + if (self_task_table.Move(index)) { + auto task = self_task_table.Get(index).task; + other_task_table.Put(task); + // TODO: mark moved future + other->WakeupLoader(); + other->WakeupExecutor(); + } + } + } } void -FinishTaskEvent::Process() { -// for (nei : res->neighbours) { -// tasks = cost(nei->task_table(), nei->connection, limit = 3) -// res->task_table()->PutTasks(tasks); -// } -// res->WakeUpExec(); +schedule(const ResourceWPtr &res) { + if (auto self = res.lock()) { + for (auto &nei : self->GetNeighbours()) { + if (auto n = nei.neighbour_node.lock()) { + auto neighbour = std::static_pointer_cast(n); + push_task(self, neighbour); + } + } + + } } void -CopyCompletedEvent::Process() { - +StartUpEvent::Process() { + schedule(resource_); } void -TaskTableUpdatedEvent::Process() { - +FinishTaskEvent::Process() { + schedule(resource_); } +void +CopyCompletedEvent::Process() { + schedule(resource_); +} void -Scheduler::Start() { - worker_thread_ = std::thread(&Scheduler::worker_thread_, this); +TaskTableUpdatedEvent::Process() { + schedule(resource_); } std::string @@ -46,14 +69,6 @@ Scheduler::Dump() { return std::string(); } -void -Scheduler::worker_function() { - while (running_) { - auto event = event_queue_.front(); - event->Process(); - } -} - } } } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 152937a1d2e7b14da6720ed6e4b9e476f86e2855..4f7c714c8642a1d4ec1c4ba24980e717e3fa6743 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -27,7 +27,7 @@ public: virtual void Process() = 0; -private: +protected: ResourceWPtr resource_; }; @@ -86,7 +86,9 @@ public: } void - Start(); + Start() { + worker_thread_ = std::thread(&Scheduler::worker_thread_, this); + } public: /******** Events ********/ @@ -138,7 +140,12 @@ private: * Called by worker_thread_; */ void - worker_function(); + worker_function() { + while (running_) { + auto event = event_queue_.front(); + event->Process(); + } + } private: bool running_; diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index dc286d8f74cd381c663aafd1d10230e73e41973e..c99d484919574a105111f43912d98ecdf76ad2dc 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -75,7 +75,22 @@ public: */ void Clear(); - + + /* + * Return true if task table empty, otherwise false; + */ + inline bool + Empty() { + return table_.empty(); + } + + /* + * Return size of task table; + */ + inline size_t + Size() { + return table_.size(); + } public: