提交 950e0735 编写于 作者: W wxyu

MS-357 Add minimum schedule function


Former-commit-id: b289ccc81dcffc7afa226185d51ae48da37fd3a9
上级 3bec8a1a
......@@ -16,6 +16,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-350 - Remove knowhere submodule
- MS-354 - Add task class and interface in scheduler
- MS-355 - Add copy interface in ExcutionEngine
- MS-357 - Add minimum schedule function
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -12,7 +12,7 @@ namespace milvus {
namespace engine {
std::vector<uint64_t>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) {
std::vector<uint64_t> indexes;
return indexes;
}
......
......@@ -23,7 +23,7 @@ namespace engine {
* call from scheduler;
*/
std::vector<uint64_t>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit);
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit);
/*
......
......@@ -5,6 +5,7 @@
******************************************************************************/
#include "Scheduler.h"
#include "Cost.h"
namespace zilliz {
......@@ -12,33 +13,55 @@ namespace milvus {
namespace engine {
void
StartUpEvent::Process() {
push_task(ResourcePtr &self, ResourcePtr &other) {
auto self_task_table = self->task_table();
auto other_task_table = other->task_table();
if (!other_task_table.Empty()) {
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 1);
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index).task;
other_task_table.Put(task);
// TODO: mark moved future
other->WakeupLoader();
other->WakeupExecutor();
}
}
}
}
void
FinishTaskEvent::Process() {
// for (nei : res->neighbours) {
// tasks = cost(nei->task_table(), nei->connection, limit = 3)
// res->task_table()->PutTasks(tasks);
// }
// res->WakeUpExec();
schedule(const ResourceWPtr &res) {
if (auto self = res.lock()) {
for (auto &nei : self->GetNeighbours()) {
if (auto n = nei.neighbour_node.lock()) {
auto neighbour = std::static_pointer_cast<Resource>(n);
push_task(self, neighbour);
}
}
}
}
void
CopyCompletedEvent::Process() {
StartUpEvent::Process() {
schedule(resource_);
}
void
TaskTableUpdatedEvent::Process() {
FinishTaskEvent::Process() {
schedule(resource_);
}
void
CopyCompletedEvent::Process() {
schedule(resource_);
}
void
Scheduler::Start() {
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
TaskTableUpdatedEvent::Process() {
schedule(resource_);
}
std::string
......@@ -46,14 +69,6 @@ Scheduler::Dump() {
return std::string();
}
void
Scheduler::worker_function() {
while (running_) {
auto event = event_queue_.front();
event->Process();
}
}
}
}
}
......@@ -27,7 +27,7 @@ public:
virtual void
Process() = 0;
private:
protected:
ResourceWPtr resource_;
};
......@@ -86,7 +86,9 @@ public:
}
void
Start();
Start() {
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
}
public:
/******** Events ********/
......@@ -138,7 +140,12 @@ private:
* Called by worker_thread_;
*/
void
worker_function();
worker_function() {
while (running_) {
auto event = event_queue_.front();
event->Process();
}
}
private:
bool running_;
......
......@@ -75,7 +75,22 @@ public:
*/
void
Clear();
/*
* Return true if task table empty, otherwise false;
*/
inline bool
Empty() {
return table_.empty();
}
/*
* Return size of task table;
*/
inline size_t
Size() {
return table_.size();
}
public:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册