提交 031561d8 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-346 add .cpp to solve compile error

See merge request megasearch/milvus!340

Former-commit-id: a377e3d4de53eda8f5e47a425a38c92ce87c81fa
...@@ -6,6 +6,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -6,6 +6,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-336 - Scheduler interface - MS-336 - Scheduler interface
- MS-344 - Add TaskTable Test - MS-344 - Add TaskTable Test
- MS-345 - Add Node Test - MS-345 - Add Node Test
- MS-346 - Add some implementation of scheduler to solve compile error
## Bug ## Bug
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Cost.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::vector<uint64_t>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {
std::vector<uint64_t> indexes;
return indexes;
}
std::vector<uint64_t>
PickToLoad(const TaskTable &task_table, uint64_t limit) {
std::vector<uint64_t> indexes;
return indexes;
}
std::vector<uint64_t>
PickToExecute(const TaskTable &task_table, uint64_t limit) {
std::vector<uint64_t> indexes;
return indexes;
}
}
}
}
...@@ -7,8 +7,10 @@ ...@@ -7,8 +7,10 @@
#include <vector> #include <vector>
#include "Task.h" #include "Task.h"
#include "TaskTable.h"
#include "CacheMgr.h" #include "CacheMgr.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
...@@ -20,8 +22,8 @@ namespace engine { ...@@ -20,8 +22,8 @@ namespace engine {
* select tasks to move; * select tasks to move;
* call from scheduler; * call from scheduler;
*/ */
std::vector<TaskPtr> std::vector<uint64_t>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {} PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit);
/* /*
...@@ -29,16 +31,16 @@ PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) ...@@ -29,16 +31,16 @@ PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit)
* call from resource; * call from resource;
* I DONT SURE NEED THIS; * I DONT SURE NEED THIS;
*/ */
std::vector<TaskPtr> std::vector<uint64_t>
PickToLoad(TaskTable task_table, uint64_t limit) {} PickToLoad(const TaskTable &task_table, uint64_t limit);
/* /*
* select task to execute; * select task to execute;
* call from resource; * call from resource;
* I DONT SURE NEED THIS; * I DONT SURE NEED THIS;
*/ */
std::vector<TaskPtr> std::vector<uint64_t>
PickToExecute(TaskTable task_table, uint64_t limit) {} PickToExecute(const TaskTable &task_table, uint64_t limit);
} }
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ResourceFactory.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::shared_ptr<Resource>
ResourceFactory::Create(const std::string &name, const std::string &alias) {
if (name == "disk") {
return std::make_shared<CpuResource>(alias);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias);
} else if (name == "gpu") {
return std::make_shared<CpuResource>(alias);
} else {
return nullptr;
}
}
}
}
}
...@@ -21,17 +21,7 @@ namespace engine { ...@@ -21,17 +21,7 @@ namespace engine {
class ResourceFactory { class ResourceFactory {
public: public:
static std::shared_ptr<Resource> static std::shared_ptr<Resource>
Create(const std::string &name, const std::string &alias = "") { Create(const std::string &name, const std::string &alias = "");
if (name == "disk") {
return std::make_shared<CpuResource>(alias);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias);
} else if (name == "gpu") {
return std::make_shared<CpuResource>(alias);
} else {
return nullptr;
}
}
}; };
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Scheduler.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
StartUpEvent::Process() {
}
void
FinishTaskEvent::Process() {
// for (nei : res->neighbours) {
// tasks = cost(nei->task_table(), nei->connection, limit = 3)
// res->task_table()->PutTasks(tasks);
// }
// res->WakeUpExec();
}
void
CopyCompletedEvent::Process() {
}
void
TaskTableUpdatedEvent::Process() {
}
void
Scheduler::Start() {
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
}
std::string
Scheduler::Dump() {
return std::string();
}
void
Scheduler::worker_function() {
while (running_) {
auto event = event_queue_.front();
event->Process();
}
}
}
}
}
...@@ -10,6 +10,9 @@ ...@@ -10,6 +10,9 @@
#include <thread> #include <thread>
#include <queue> #include <queue>
#include "resource/Resource.h"
#include "ResourceMgr.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -18,8 +21,7 @@ namespace engine { ...@@ -18,8 +21,7 @@ namespace engine {
class Event { class Event {
public: public:
explicit explicit
Event(ResourceWPtr &resource) Event(ResourceWPtr &resource) : resource_(resource) {}
: resource_(resource) {}
public: public:
virtual void virtual void
...@@ -34,8 +36,7 @@ using EventPtr = std::shared_ptr<Event>; ...@@ -34,8 +36,7 @@ using EventPtr = std::shared_ptr<Event>;
class StartUpEvent : public Event { class StartUpEvent : public Event {
public: public:
explicit explicit
StartUpEvent(ResourceWPtr &resource) StartUpEvent(ResourceWPtr &resource) : Event(resource) {}
: Event(resource) {}
public: public:
void void
...@@ -45,25 +46,17 @@ public: ...@@ -45,25 +46,17 @@ public:
class FinishTaskEvent : public Event { class FinishTaskEvent : public Event {
public: public:
explicit explicit
FinishTaskEvent(ResourceWPtr &resource) FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {}
: Event(resource) {}
public: public:
void void
Process() override { Process() override;
// for (nei : res->neighbours) {
// tasks = cost(nei->task_table(), nei->connection, limit = 3)
// res->task_table()->PutTasks(tasks);
// }
// res->WakeUpExec();
}
}; };
class CopyCompletedEvent : public Event { class CopyCompletedEvent : public Event {
public: public:
explicit explicit
CopyCompletedEvent(ResourceWPtr &resource) CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {}
: Event(resource) {}
public: public:
void void
...@@ -73,8 +66,7 @@ public: ...@@ -73,8 +66,7 @@ public:
class TaskTableUpdatedEvent : public Event { class TaskTableUpdatedEvent : public Event {
public: public:
explicit explicit
TaskTableUpdatedEvent(ResourceWPtr &resource) TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {}
: Event(resource) {}
public: public:
void void
...@@ -94,16 +86,16 @@ public: ...@@ -94,16 +86,16 @@ public:
} }
void void
Start() {} Start();
public:
/******** Events ********/ /******** Events ********/
/* /*
* Process start up events; * Process start up events;
*/ */
void inline void
OnStartUp(ResourceWPtr &resource) { OnStartUp(ResourceWPtr &resource) {
// call from res_mgr, non-blocking, if queue size over limit, exception!
auto event = std::make_shared<StartUpEvent>(resource); auto event = std::make_shared<StartUpEvent>(resource);
event_queue_.push(event); event_queue_.push(event);
} }
...@@ -111,20 +103,29 @@ public: ...@@ -111,20 +103,29 @@ public:
/* /*
* Process finish task events; * Process finish task events;
*/ */
void inline void
OnFinishTask(ResourceWPtr); OnFinishTask(ResourceWPtr &resource) {
auto event = std::make_shared<FinishTaskEvent>(resource);
event_queue_.push(event);
}
/* /*
* Process copy completed events; * Process copy completed events;
*/ */
void inline void
OnCopyCompleted(ResourceWPtr); OnCopyCompleted(ResourceWPtr &resource) {
auto event = std::make_shared<CopyCompletedEvent>(resource);
event_queue_.push(event);
}
/* /*
* Process task table updated events; * Process task table updated events;
*/ */
void inline void
OnTaskTableUpdated(ResourceWPtr); OnTaskTableUpdated(ResourceWPtr &resource) {
auto event = std::make_shared<TaskTableUpdatedEvent>(resource);
event_queue_.push(event);
}
public: public:
...@@ -133,13 +134,11 @@ public: ...@@ -133,13 +134,11 @@ public:
private: private:
/*
* Called by worker_thread_;
*/
void void
worker_function() { worker_function();
while (running_) {
auto event = event_queue_.front();
event->Process();
}
}
private: private:
bool running_; bool running_;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "TaskTable.h"
#include <vector>
namespace zilliz {
namespace milvus {
namespace engine {
TaskTable::TaskTable(std::vector<TaskPtr> &&tasks) {
}
void
TaskTable::Put(TaskPtr task) {
}
void
TaskTable::Put(std::vector<TaskPtr> &tasks) {
}
TaskTableItem &
TaskTable::Get(uint64_t index) {
return table_[index];
}
void
TaskTable::Clear() {
// find first task is NOT (done or moved), erase from begin to it;
// auto iterator = table_.begin();
// while (iterator->state == TaskTableItemState::EXECUTED or
// iterator->state == TaskTableItemState::MOVED)
// iterator++;
// table_.erase(table_.begin(), iterator);
}
bool
TaskTable::Move(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task.mutex);
if (task.state == TaskTableItemState::START) {
task.state = TaskTableItemState::LOADING;
return true;
}
return false;
}
bool
TaskTable::Moved(uint64_t index) {
return false;
}
bool
TaskTable::Load(uint64_t index) {
return false;
}
bool
TaskTable::Loaded(uint64_t index) {
return false;
}
bool
TaskTable::Execute(uint64_t index) {
return false;
}
bool
TaskTable::Executed(uint64_t index) {
return false;
}
std::string
TaskTable::Dump() {
return std::string();
}
}
}
}
...@@ -47,26 +47,26 @@ public: ...@@ -47,26 +47,26 @@ public:
TaskTable() = default; TaskTable() = default;
explicit explicit
TaskTable(std::vector<TaskPtr> &&tasks) {} TaskTable(std::vector<TaskPtr> &&tasks);
/* /*
* Put one task; * Put one task;
*/ */
void void
Put(TaskPtr task) {} Put(TaskPtr task);
/* /*
* Put tasks back of task table; * Put tasks back of task table;
* Called by DBImpl; * Called by DBImpl;
*/ */
void void
Put(std::vector<TaskPtr> &tasks) {} Put(std::vector<TaskPtr> &tasks);
/* /*
* Return task table item reference; * Return task table item reference;
*/ */
TaskTableItem & TaskTableItem &
Get(uint64_t index) {} Get(uint64_t index);
/* /*
* TODO * TODO
...@@ -74,14 +74,7 @@ public: ...@@ -74,14 +74,7 @@ public:
* Called by ? * Called by ?
*/ */
void void
Clear() { Clear();
// find first task is NOT (done or moved), erase from begin to it;
// auto iterator = table_.begin();
// while (iterator->state == TaskTableItemState::EXECUTED or
// iterator->state == TaskTableItemState::MOVED)
// iterator++;
// table_.erase(table_.begin(), iterator);
}
public: public:
...@@ -95,16 +88,7 @@ public: ...@@ -95,16 +88,7 @@ public:
// TODO: bool to Status // TODO: bool to Status
bool bool
Move(uint64_t index) { Move(uint64_t index);
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task.mutex);
if (task.state == TaskTableItemState::START) {
task.state = TaskTableItemState::LOADING;
return true;
}
return false;
}
/* /*
* Move task finished; * Move task finished;
...@@ -112,7 +96,7 @@ public: ...@@ -112,7 +96,7 @@ public:
* Called by scheduler; * Called by scheduler;
*/ */
bool bool
Moved(uint64_t index) {} Moved(uint64_t index);
/* /*
* Load a task; * Load a task;
...@@ -120,7 +104,7 @@ public: ...@@ -120,7 +104,7 @@ public:
* Called by loader; * Called by loader;
*/ */
bool bool
Load(uint64_t index) {} Load(uint64_t index);
/* /*
* Load task finished; * Load task finished;
...@@ -128,7 +112,7 @@ public: ...@@ -128,7 +112,7 @@ public:
* Called by loader; * Called by loader;
*/ */
bool bool
Loaded(uint64_t index) {} Loaded(uint64_t index);
/* /*
* Execute a task; * Execute a task;
...@@ -136,7 +120,7 @@ public: ...@@ -136,7 +120,7 @@ public:
* Called by executor; * Called by executor;
*/ */
bool bool
Execute(uint64_t index) {} Execute(uint64_t index);
/* /*
* Execute task finished; * Execute task finished;
...@@ -144,7 +128,7 @@ public: ...@@ -144,7 +128,7 @@ public:
* Called by executor; * Called by executor;
*/ */
bool bool
Executed(uint64_t index) {} Executed(uint64_t index);
public: public:
/* /*
......
...@@ -143,11 +143,11 @@ private: ...@@ -143,11 +143,11 @@ private:
*/ */
TaskPtr TaskPtr
pick_task_load() { pick_task_load() {
auto tasks = PickToLoad(task_table_, 3); auto indexes = PickToLoad(task_table_, 3);
for (uint64_t i = 0; i < tasks.size(); ++i) { for (auto index : indexes) {
// try to set one task loading, then return // try to set one task loading, then return
if (task_table_.Load(i)) if (task_table_.Load(index))
return task_table_.Get(i).task; return task_table_.Get(index).task;
// else try next // else try next
} }
return nullptr; return nullptr;
...@@ -159,11 +159,11 @@ private: ...@@ -159,11 +159,11 @@ private:
*/ */
TaskPtr TaskPtr
pick_task_execute() { pick_task_execute() {
auto tasks = PickToExecute(task_table_, 3); auto indexes = PickToExecute(task_table_, 3);
for (uint64_t i = 0; i < tasks.size(); ++i) { for (auto index : indexes) {
// try to set one task executing, then return // try to set one task executing, then return
if (task_table_.Execute(i)) if (task_table_.Execute(index))
return task_table_.Get(i).task; return task_table_.Get(index).task;
// else try next // else try next
} }
return nullptr; return nullptr;
......
...@@ -21,7 +21,7 @@ TEST(normal_test, DISABLED_test1) { ...@@ -21,7 +21,7 @@ TEST(normal_test, DISABLED_test1) {
res_mgr->Connect(cpu, gpu1, PCIE); res_mgr->Connect(cpu, gpu1, PCIE);
res_mgr->Connect(cpu, gpu2, PCIE); res_mgr->Connect(cpu, gpu2, PCIE);
res_mgr->StartAll(); res_mgr->Start();
auto task1 = std::make_shared<Task>("123456789"); auto task1 = std::make_shared<Task>("123456789");
auto task2 = std::make_shared<Task>("222222222"); auto task2 = std::make_shared<Task>("222222222");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册