diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 3f6b12cf1bcd15af14aa308bafcefc1f326be7e6..c9ebc41d37f60c5dc3587acec07102c86faf0186 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -44,6 +44,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-403 - Add GpuCacheMgr - MS-404 - Release index after search task done avoid memory increment continues - MS-405 - Add delete task support +- MS-408 - Add device_id in resource construct function ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceFactory.cpp b/cpp/src/scheduler/ResourceFactory.cpp index 82dae5eb251a8748c5f7d038abbe0451f6e4895b..71137c361a3eb72b042124b43cc3e7f6e80a3ffd 100644 --- a/cpp/src/scheduler/ResourceFactory.cpp +++ b/cpp/src/scheduler/ResourceFactory.cpp @@ -13,15 +13,16 @@ namespace engine { std::shared_ptr ResourceFactory::Create(const std::string &name, - const std::string &alias, + const std::string &type, + uint64_t device_id, bool enable_loader, bool enable_executor) { - if (name == "disk") { - return std::make_shared(alias, enable_loader, enable_executor); - } else if (name == "cpu") { - return std::make_shared(alias, enable_loader, enable_executor); - } else if (name == "gpu") { - return std::make_shared(alias, enable_loader, enable_executor); + if (type == "DISK") { + return std::make_shared(name, device_id, enable_loader, enable_executor); + } else if (type == "CPU") { + return std::make_shared(name, device_id, enable_loader, enable_executor); + } else if (type == "GPU") { + return std::make_shared(name, device_id, enable_loader, enable_executor); } else { return nullptr; } diff --git a/cpp/src/scheduler/ResourceFactory.h b/cpp/src/scheduler/ResourceFactory.h index c91fd72938085322e69400457c5957dabf549bc8..84549c62f1bb5fdf75ff134ea9a2ab0d0debafb3 100644 --- a/cpp/src/scheduler/ResourceFactory.h +++ b/cpp/src/scheduler/ResourceFactory.h @@ -22,7 +22,8 @@ class ResourceFactory { public: static std::shared_ptr Create(const std::string &name, - const std::string &alias = "", + const std::string &type, + uint64_t device_id, bool enable_loader = true, bool enable_executor = true); }; diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index f341821392e746c729289ae93a91b1f5b15278ff..0ef8d7b01f6ed40f0df17b18f26359965af3dd66 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -48,6 +48,16 @@ ResourceMgr::Add(ResourcePtr &&resource) { return ret; } +void +ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connection &connection) { + auto res1 = get_resource_by_name(name1); + auto res2 = get_resource_by_name(name2); + if (res1 && res2) { + res1->AddNeighbour(std::static_pointer_cast(res2), connection); + res2->AddNeighbour(std::static_pointer_cast(res1), connection); + } +} + void ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) { if (auto observe_a = res1.lock()) { @@ -116,6 +126,16 @@ ResourceMgr::DumpTaskTables() { return ss.str(); } +ResourcePtr +ResourceMgr::get_resource_by_name(const std::string &name) { + for (auto &res : resources_) { + if (res->Name() == name) { + return res; + } + } + return nullptr; +} + void ResourceMgr::event_process() { while (running_) { diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 54ff551b6b36fdf41bfe76a54b38fa41672d7d95..c695e70ee41d83a24bdb18e43704f153f9d23e78 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -49,6 +49,9 @@ public: ResourceWPtr Add(ResourcePtr &&resource); + void + Connect(const std::string &res1, const std::string &res2, Connection &connection); + /* * Create connection between A and B; */ @@ -80,6 +83,9 @@ public: DumpTaskTables(); private: + ResourcePtr + get_resource_by_name(const std::string &name); + void event_process(); diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 87083c0c6f341aa5f6bae73352d94de6a2947f2e..2d309d591ca34319b7468725bd9b946b831c3003 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -48,6 +48,7 @@ ToString(const TaskTimestamp ×tamp) { ss << ", executed=" << timestamp.executed; ss << ", move=" << timestamp.move; ss << ", moved=" << timestamp.moved; + ss << ", finish=" << timestamp.finish; ss << ">"; return ss.str(); } @@ -92,6 +93,7 @@ TaskTableItem::Executed() { state = TaskTableItemState::EXECUTED; lock.unlock(); timestamp.executed = get_now_timestamp(); + timestamp.finish = get_now_timestamp(); return true; } return false; @@ -114,6 +116,7 @@ TaskTableItem::Moved() { state = TaskTableItemState::MOVED; lock.unlock(); timestamp.moved = get_now_timestamp(); + timestamp.finish = get_now_timestamp(); return true; } return false; diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index cb91e01e9d04ad9f2e6c7e3615e8baf31ab6c035..886259957cd992e68623668d5d51c7e3540cc914 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -36,6 +36,7 @@ struct TaskTimestamp { uint64_t loaded = 0; uint64_t execute = 0; uint64_t executed = 0; + uint64_t finish = 0; }; struct TaskTableItem { diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index f253e4c6129aaf348aaba7f0c3225d98f9b52194..7c5855c11181a6b481e026608f6baaae9f17b75f 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -20,7 +20,7 @@ next(std::list &neighbours, std::list::iterator &it) { } } - +// TODO: this function called with only on tasks, so it will always push task to first neighbour void push_task_round_robin(TaskTable &self_task_table, std::list &neighbours) { CacheMgr cache; @@ -31,7 +31,7 @@ push_task_round_robin(TaskTable &self_task_table, std::list &neighb for (auto index : indexes) { if (self_task_table.Move(index)) { auto task = self_task_table.Get(index)->task; - task = task->Clone(); +// task = task->Clone(); (*it)->task_table().Put(task); next(neighbours, it); } diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp index 0eb6aeb4f52a999718c75f449f4f57e1bfe4fad7..9428e1816bc6938c7b42b5aad8210f1b766f83b8 100644 --- a/cpp/src/scheduler/resource/CpuResource.cpp +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -16,8 +16,8 @@ std::ostream &operator<<(std::ostream &out, const CpuResource &resource) { return out; } -CpuResource::CpuResource(std::string name, bool enable_loader, bool enable_executor) - : Resource(std::move(name), ResourceType::CPU, enable_loader, enable_executor) {} +CpuResource::CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor) + : Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) {} void CpuResource::LoadFile(TaskPtr task) { task->Load(LoadType::DISK2CPU, 0); diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h index 19f49c59b464de349701bf0a558152ffedb16154..4ca39b738bacb11a340e4864b12eb75d94d29077 100644 --- a/cpp/src/scheduler/resource/CpuResource.h +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -17,7 +17,7 @@ namespace engine { class CpuResource : public Resource { public: explicit - CpuResource(std::string name, bool enable_loader, bool enable_executor); + CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor); inline std::string Dump() const override { diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp index 3a6b0e5d6b51921c2b5a2b79d10c9a34b2e0ba6d..943e1e5efb0d5a7bfac8b16107ad17ec58d60096 100644 --- a/cpp/src/scheduler/resource/DiskResource.cpp +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -15,8 +15,8 @@ std::ostream &operator<<(std::ostream &out, const DiskResource &resource) { return out; } -DiskResource::DiskResource(std::string name, bool enable_loader, bool enable_executor) - : Resource(std::move(name), ResourceType::DISK, enable_loader, enable_executor) { +DiskResource::DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor) + : Resource(std::move(name), ResourceType::DISK, device_id, enable_loader, enable_executor) { } void DiskResource::LoadFile(TaskPtr task) { diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h index 11263a3298d1128633b3f9b84ac88d47d0e759a9..8dcb909e2599d116f7e911a0c8ff6a705dde7a92 100644 --- a/cpp/src/scheduler/resource/DiskResource.h +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -16,7 +16,7 @@ namespace engine { class DiskResource : public Resource { public: explicit - DiskResource(std::string name, bool enable_loader, bool enable_executor); + DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor); inline std::string Dump() const override { diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp index 85760ea6d278f53bc6cede44c593a278508026f8..38f1d624a67c6a6b36512abd84f538709b1a053b 100644 --- a/cpp/src/scheduler/resource/GpuResource.cpp +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -16,11 +16,11 @@ std::ostream &operator<<(std::ostream &out, const GpuResource &resource) { return out; } -GpuResource::GpuResource(std::string name, bool enable_loader, bool enable_executor) - : Resource(std::move(name), ResourceType::GPU, enable_loader, enable_executor) {} +GpuResource::GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor) + : Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) {} void GpuResource::LoadFile(TaskPtr task) { - task->Load(LoadType::CPU2GPU, 0); + task->Load(LoadType::CPU2GPU, device_id_); } void GpuResource::Process(TaskPtr task) { diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h index 95debd743298e4948d5969cc5e19bf67fc49111e..5db3c005ee3a71ebb58c3ca6b4f0bc91ddf2fe05 100644 --- a/cpp/src/scheduler/resource/GpuResource.h +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -16,7 +16,7 @@ namespace engine { class GpuResource : public Resource { public: explicit - GpuResource(std::string name, bool enable_loader, bool enable_executor); + GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor); inline std::string Dump() const override { diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 77bcb28f3576269708430441ea91fe1495f011bc..6789d00c8927ca59120c03e4730636ec32050a1b 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -18,10 +18,12 @@ std::ostream &operator<<(std::ostream &out, const Resource &resource) { Resource::Resource(std::string name, ResourceType type, + uint64_t device_id, bool enable_loader, bool enable_executor) : name_(std::move(name)), type_(type), + device_id_(device_id), running_(false), enable_loader_(enable_loader), enable_executor_(enable_executor), diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index ae78c225a019fbe10aad13fbf5f950e4e048f303..5a6ae28cbc3196a843abfcff63c0d64924958e61 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -82,6 +82,11 @@ public: subscriber_ = std::move(subscriber); } + inline std::string + Name() const { + return name_; + } + inline ResourceType Type() const { return type_; @@ -112,6 +117,7 @@ public: protected: Resource(std::string name, ResourceType type, + uint64_t device_id, bool enable_loader, bool enable_executor); @@ -163,6 +169,9 @@ private: void executor_function(); +protected: + uint64_t device_id_; + private: std::string name_; ResourceType type_;