diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 5bb6053d49fcc837894354831fa66628152db1f8..8232bd77c0c4c60a84a9ac819f1972619ad89d13 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -23,6 +23,13 @@ Please mark all change in change log and use the ticket from JIRA. - MS-365 - Use tasktableitemptr instead in event - MS-366 - Implement TaskTable - MS-368 - Implement cost.cpp +- MS-371 - Add TaskTableUpdatedEvent +- MS-373 - Add resource test +- MS-374 - Add action definition +- MS-375 - Add Dump implementation for Event +- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task +- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable +- MS-378 - Debug and Update normal_test in scheduler unittest ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index a0625c836f28105d259839b0b721872d63058329..916aaa238a0debf8636a637ac4aaf2c953f1aa81 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -33,11 +33,7 @@ ResourceMgr::Add(ResourcePtr &&resource) { resources_.emplace_back(resource); size_t index = resources_.size() - 1; - resource->RegisterSubscriber([&](EventPtr event) { - queue_.emplace(event); - std::unique_lock lock(event_mutex_); - event_cv_.notify_one(); - }); + resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1)); return ret; } @@ -46,27 +42,11 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect if (auto observe_a = res1.lock()) { if (auto observe_b = res2.lock()) { observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); + observe_b->AddNeighbour(std::static_pointer_cast(observe_a), connection); } } } -void -ResourceMgr::EventProcess() { - while (running_) { - std::unique_lock lock(event_mutex_); - event_cv_.wait(lock, [this] { return !queue_.empty(); }); - - if (!running_) { - break; - } - - auto event = queue_.front(); - queue_.pop(); - if (subscriber_) { - subscriber_(event); - } - } -} void ResourceMgr::Start() { @@ -74,23 +54,33 @@ ResourceMgr::Start() { for (auto &resource : resources_) { resource->Start(); } - worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); - running_ = true; + worker_thread_ = std::thread(&ResourceMgr::event_process, this); } void ResourceMgr::Stop() { - std::lock_guard lck(resources_mutex_); - - running_ = false; + { + std::lock_guard lock(event_mutex_); + running_ = false; + queue_.push(nullptr); + event_cv_.notify_one(); + } worker_thread_.join(); + std::lock_guard lck(resources_mutex_); for (auto &resource : resources_) { resource->Stop(); } } +void +ResourceMgr::PostEvent(const EventPtr &event) { + std::unique_lock lock(event_mutex_); + queue_.emplace(event); + event_cv_.notify_one(); +} + std::string ResourceMgr::Dump() { std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n"; @@ -103,6 +93,26 @@ ResourceMgr::Dump() { return str; } +void +ResourceMgr::event_process() { + while (running_) { + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !queue_.empty(); }); + + auto event = queue_.front(); + if (event == nullptr) { + break; + } + +// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event; + + queue_.pop(); + if (subscriber_) { + subscriber_(event); + } + } +} + } } } diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index fc7744cd2bdb14d9ef5fa5d0de270da0393524a3..cb2e63193524194971ac9fac197f0b91146fccca 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -14,6 +14,7 @@ #include #include "resource/Resource.h" +#include "utils/Log.h" namespace zilliz { @@ -59,6 +60,8 @@ public: void Stop(); + void + PostEvent(const EventPtr& event); // TODO: add stats interface(low) @@ -70,7 +73,7 @@ public: private: void - EventProcess(); + event_process(); private: std::queue queue_; diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index c9d92ccb573f020031809a2b8f3c4e4ed84d7067..06dfa669dbb1c46ea0bc862da4f65896fa80ce66 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -4,69 +4,129 @@ * Proprietary and confidential. ******************************************************************************/ +#include #include "Scheduler.h" #include "Cost.h" +#include "action/Action.h" namespace zilliz { namespace milvus { namespace engine { +Scheduler::Scheduler(ResourceMgrWPtr res_mgr) + : running_(false), + res_mgr_(std::move(res_mgr)) { + if (auto mgr = res_mgr_.lock()) { + mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1)); + } +} + + void -push_task(ResourcePtr &self, ResourcePtr &other) { - auto self_task_table = self->task_table(); - auto other_task_table = other->task_table(); - if (!other_task_table.Empty()) { - CacheMgr cache; - auto indexes = PickToMove(self_task_table, cache, 1); - for (auto index : indexes) { - if (self_task_table.Move(index)) { - auto task = self_task_table.Get(index)->task; - other_task_table.Put(task); - // TODO: mark moved future - other->WakeupLoader(); - other->WakeupExecutor(); - } - } +Scheduler::Start() { + running_ = true; + worker_thread_ = std::thread(&Scheduler::worker_function, this); +} + +void +Scheduler::Stop() { + { + std::lock_guard lock(event_mutex_); + running_ = false; + event_queue_.push(nullptr); + event_cv_.notify_one(); } + worker_thread_.join(); +} + +void +Scheduler::PostEvent(const EventPtr &event) { + std::lock_guard lock(event_mutex_); + event_queue_.push(event); + event_cv_.notify_one(); +// SERVER_LOG_DEBUG << "Scheduler post " << *event; +} + +std::string +Scheduler::Dump() { + return std::string(); } void -schedule(const ResourceWPtr &res) { - if (auto self = res.lock()) { - for (auto &nei : self->GetNeighbours()) { - if (auto n = nei.neighbour_node.lock()) { - auto neighbour = std::static_pointer_cast(n); - push_task(self, neighbour); - } +Scheduler::worker_function() { + while (running_) { + std::unique_lock lock(event_mutex_); + event_cv_.wait(lock, [this] { return !event_queue_.empty(); }); + auto event = event_queue_.front(); + if (event == nullptr) { + break; } +// SERVER_LOG_DEBUG << "Scheduler process " << *event; + event_queue_.pop(); + Process(event); + } +} + +void +Scheduler::Process(const EventPtr &event) { + switch (event->Type()) { + case EventType::START_UP: { + OnStartUp(event); + break; + } + case EventType::COPY_COMPLETED: { + OnCopyCompleted(event); + break; + } + case EventType::FINISH_TASK: { + OnFinishTask(event); + break; + } + case EventType::TASK_TABLE_UPDATED: { + OnTaskTableUpdated(event); + break; + } + default: { + // TODO: logging + break; + } } } + void Scheduler::OnStartUp(const EventPtr &event) { - schedule(event->resource_); + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } } void Scheduler::OnFinishTask(const EventPtr &event) { - schedule(event->resource_); + if (auto resource = event->resource_.lock()) { + resource->WakeupExecutor(); + } } void Scheduler::OnCopyCompleted(const EventPtr &event) { - schedule(event->resource_); + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + resource->WakeupExecutor(); + if (resource->Type()== ResourceType::DISK) { + Action::PushTaskToNeighbour(event->resource_); + } + } } void Scheduler::OnTaskTableUpdated(const EventPtr &event) { - schedule(event->resource_); -} - -std::string -Scheduler::Dump() { - return std::string(); +// Action::PushTaskToNeighbour(event->resource_); + if (auto resource = event->resource_.lock()) { + resource->WakeupLoader(); + } } } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 5e50826238c985bf5b66b684ea55feb5588f8cc6..012a479a824aafac9aad46c82a1e6190fd09da54 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -13,6 +13,7 @@ #include "resource/Resource.h" #include "ResourceMgr.h" +#include "utils/Log.h" namespace zilliz { @@ -23,20 +24,32 @@ namespace engine { class Scheduler { public: explicit - Scheduler(ResourceMgrWPtr res_mgr) - : running_(false), - res_mgr_(std::move(res_mgr)) { -// res_mgr.Register(); -// res_mgr.Register(); -// res_mgr.Register(); -// res_mgr.Register(); - } + Scheduler(ResourceMgrWPtr res_mgr); + Scheduler(const Scheduler &) = delete; + Scheduler(Scheduler &&) = delete; + + /* + * Start worker thread; + */ + void + Start(); + + /* + * Stop worker thread, join it; + */ void - Start() { - worker_thread_ = std::thread(&Scheduler::worker_thread_, this); - } + Stop(); + /* + * Post event to scheduler event queue; + */ + void + PostEvent(const EventPtr &event); + + /* + * Dump as string; + */ std::string Dump(); @@ -45,24 +58,37 @@ private: /* * Process start up events; + * + * Actions: + * Pull task from neighbours; */ void OnStartUp(const EventPtr &event); /* * Process finish task events; + * + * Actions: + * Pull task from neighbours; */ void OnFinishTask(const EventPtr &event); /* * Process copy completed events; + * + * Actions: + * Mark task source MOVED; + * Pull task from neighbours; */ void OnCopyCompleted(const EventPtr &event); /* - * Process task table updated events; + * Process task table updated events, which happened on task_table->put; + * + * Actions: + * Push task to neighbours; */ void OnTaskTableUpdated(const EventPtr &event); @@ -72,40 +98,13 @@ private: * Dispatch event to event handler; */ void - Process(const EventPtr &event) { - switch (event->Type()) { - case EventType::START_UP: { - OnStartUp(event); - break; - } - case EventType::COPY_COMPLETED: { - OnCopyCompleted(event); - break; - } - case EventType::FINISH_TASK: { - OnFinishTask(event); - break; - } - case EventType::TASK_TABLE_UPDATED: { - OnTaskTableUpdated(event); - break; - } - default: { - break; - } - } - } + Process(const EventPtr &event); /* * Called by worker_thread_; */ void - worker_function() { - while (running_) { - auto event = event_queue_.front(); - Process(event); - } - } + worker_function(); private: bool running_; @@ -113,6 +112,8 @@ private: ResourceMgrWPtr res_mgr_; std::queue event_queue_; std::thread worker_thread_; + std::mutex event_mutex_; + std::condition_variable event_cv_; }; using SchedulerPtr = std::shared_ptr; diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 807d35be4ac907baa852956fb0b4446b440dc2ba..7ef033c500510a13667bfbbd0d8e8a7cfe3ae5ba 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -7,6 +7,7 @@ #include "TaskTable.h" #include "event/TaskTableUpdatedEvent.h" #include +#include namespace zilliz { @@ -16,9 +17,11 @@ namespace engine { void TaskTable::Put(TaskPtr task) { + std::lock_guard lock(id_mutex_); auto item = std::make_shared(); + item->id = id_++; item->task = std::move(task); - item->state = TaskTableItemState::LOADED; + item->state = TaskTableItemState::START; table_.push_back(item); if (subscriber_) { subscriber_(); @@ -27,10 +30,12 @@ TaskTable::Put(TaskPtr task) { void TaskTable::Put(std::vector &tasks) { + std::lock_guard lock(id_mutex_); for (auto &task : tasks) { auto item = std::make_shared(); + item->id = id_++; item->task = std::move(task); - item->state = TaskTableItemState::LOADED; + item->state = TaskTableItemState::START; table_.push_back(item); } if (subscriber_) { @@ -59,8 +64,8 @@ TaskTable::Move(uint64_t index) { auto &task = table_[index]; std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::START) { - task->state = TaskTableItemState::LOADING; + if (task->state == TaskTableItemState::LOADED) { + task->state = TaskTableItemState::MOVING; return true; } return false; @@ -126,9 +131,30 @@ TaskTable::Executed(uint64_t index) { return false; } +std::string +ToString(TaskTableItemState state) { + switch (state) { + case TaskTableItemState::INVALID: return "INVALID"; + case TaskTableItemState::START: return "START"; + case TaskTableItemState::LOADING: return "LOADING"; + case TaskTableItemState::LOADED: return "LOADED"; + case TaskTableItemState::EXECUTING: return "EXECUTING"; + case TaskTableItemState::EXECUTED: return "EXECUTED"; + case TaskTableItemState::MOVING: return "MOVING"; + case TaskTableItemState::MOVED: return "MOVED"; + default: return ""; + } +} + std::string TaskTable::Dump() { - return std::string(); + std::stringstream ss; + for (auto &item : table_) { + ss << "<" << item->id; + ss << ", " << ToString(item->state); + ss << ">" << std::endl; + } + return ss.str(); } } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 70a2899331239c14d7baaf055186a3823ae668a4..8a482d0579a386f2bbbc59ca59c7df3e581938e8 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -49,6 +49,9 @@ class TaskTable { public: TaskTable() = default; + TaskTable(const TaskTable &) = delete; + TaskTable(TaskTable &&) = delete; + inline void RegisterSubscriber(std::function subscriber) { subscriber_ = std::move(subscriber); @@ -167,6 +170,8 @@ public: private: // TODO: map better ? + std::uint64_t id_ = 0; + mutable std::mutex id_mutex_; std::deque table_; std::function subscriber_ = nullptr; }; diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h new file mode 100644 index 0000000000000000000000000000000000000000..d72bbefc8d7bb6ef7630493d85aefacae0781aa5 --- /dev/null +++ b/cpp/src/scheduler/action/Action.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved + * unauthorized copying of this file, via any medium is strictly prohibited. + * proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "../resource/Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class Action { +public: + /* + * Push task to neighbour; + */ + static void + PushTaskToNeighbour(const ResourceWPtr &self); + + + /* + * Pull task From neighbour; + */ + static void + PullTaskFromNeighbour(const ResourceWPtr &self); +}; + + +} +} +} diff --git a/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp b/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b1ac97b6e48a9343f6a8a5e009706a0d30269fc9 --- /dev/null +++ b/cpp/src/scheduler/action/PullTaskFromNeighbour.cpp @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Action.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +void +Action::PullTaskFromNeighbour(const ResourceWPtr &self) { + // TODO: implement +} + + +} +} +} + + diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c99f490f11dc4be4c8d4cf9d61561fa0456c768e --- /dev/null +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -0,0 +1,45 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Action.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +void +push_task(ResourcePtr &self, ResourcePtr &other) { + auto &self_task_table = self->task_table(); + auto &other_task_table = other->task_table(); + CacheMgr cache; + auto indexes = PickToMove(self_task_table, cache, 1); + for (auto index : indexes) { + if (self_task_table.Move(index)) { + auto task = self_task_table.Get(index)->task; + other_task_table.Put(task); + // TODO: mark moved future + } + } +} + +void +Action::PushTaskToNeighbour(const ResourceWPtr &res) { + if (auto self = res.lock()) { + for (auto &neighbour : self->GetNeighbours()) { + if (auto n = neighbour.neighbour_node.lock()) { + auto neighbour = std::static_pointer_cast(n); + push_task(self, neighbour); + } + } + } +} + + +} +} +} + diff --git a/cpp/src/scheduler/event/CopyCompletedEvent.h b/cpp/src/scheduler/event/CopyCompletedEvent.h index c84c59333ef149a15549915b97cd3a422c582f14..d2f5ddb0ff22f11e391bcb3a95605c0ad42d2c51 100644 --- a/cpp/src/scheduler/event/CopyCompletedEvent.h +++ b/cpp/src/scheduler/event/CopyCompletedEvent.h @@ -18,6 +18,14 @@ public: CopyCompletedEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::COPY_COMPLETED, std::move(resource)), task_table_item_(std::move(task_table_item)) {} + + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event); + public: TaskTableItemPtr task_table_item_; }; diff --git a/cpp/src/scheduler/event/Event.h b/cpp/src/scheduler/event/Event.h index 4b04d5404b5c83e91b73a91b8897a56c1668b2a9..788cfd6a73f2c958b500372952839a02ea12ca8c 100644 --- a/cpp/src/scheduler/event/Event.h +++ b/cpp/src/scheduler/event/Event.h @@ -5,6 +5,8 @@ ******************************************************************************/ #pragma once +#include + namespace zilliz { namespace milvus { namespace engine { @@ -30,6 +32,13 @@ public: return type_; } + inline virtual std::string + Dump() const { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const Event &event); + public: EventType type_; std::weak_ptr resource_; diff --git a/cpp/src/scheduler/event/EventDump.cpp b/cpp/src/scheduler/event/EventDump.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0d10f6f7b4fa4af451e4fffc613f1f1825ebf688 --- /dev/null +++ b/cpp/src/scheduler/event/EventDump.cpp @@ -0,0 +1,45 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "Event.h" +#include "StartUpEvent.h" +#include "CopyCompletedEvent.h" +#include "FinishTaskEvent.h" +#include "TaskTableUpdatedEvent.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +std::ostream &operator<<(std::ostream &out, const Event &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const StartUpEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event) { + out << event.Dump(); + return out; +} + +std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) { + out << event.Dump(); + return out; +} + +} +} +} diff --git a/cpp/src/scheduler/event/FinishTaskEvent.h b/cpp/src/scheduler/event/FinishTaskEvent.h index 2739bb2fcc87dd7d823ffb7576dcef736147d9f9..14daa9b532949570f50ba273108ac879f65ab792 100644 --- a/cpp/src/scheduler/event/FinishTaskEvent.h +++ b/cpp/src/scheduler/event/FinishTaskEvent.h @@ -18,6 +18,13 @@ public: : Event(EventType::FINISH_TASK, std::move(resource)), task_table_item_(std::move(task_table_item)) {} + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event); + public: TaskTableItemPtr task_table_item_; }; diff --git a/cpp/src/scheduler/event/StartUpEvent.h b/cpp/src/scheduler/event/StartUpEvent.h index 04bc462dccfd0b9a79d80d70eecdc7be567d7e2f..4b5ec78cd683f0695e26d8daffc47cd46aa3339d 100644 --- a/cpp/src/scheduler/event/StartUpEvent.h +++ b/cpp/src/scheduler/event/StartUpEvent.h @@ -17,6 +17,13 @@ public: explicit StartUpEvent(std::weak_ptr resource) : Event(EventType::START_UP, std::move(resource)) {} + + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const StartUpEvent &event); }; } diff --git a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h index 8658316222cde981bb4942d867d1aff3c7d321ca..f96c30674c5ec6d331fdf321d6bff7b3b6871b74 100644 --- a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h +++ b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h @@ -17,6 +17,13 @@ public: explicit TaskTableUpdatedEvent(std::weak_ptr resource) : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {} + + inline std::string + Dump() const override { + return ""; + } + + friend std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event); }; diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp index 32eb62704607c6a471b028fdb73384e9361f8bb1..11c7796187c3dfc0c982c45970e02d7f8ee534a7 100644 --- a/cpp/src/scheduler/resource/CpuResource.cpp +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -16,21 +16,11 @@ CpuResource::CpuResource(std::string name) : Resource(std::move(name), ResourceType::CPU) {} void CpuResource::LoadFile(TaskPtr task) { - //if (src.type == DISK) { - // fd = open(filename); - // content = fd.read(); - // close(fd); - //} else if (src.type == CPU) { - // memcpy(src, dest, len); - //} else if (src.type == GPU) { - // cudaMemcpyD2H(src, dest); - //} else { - // // unknown type, exception - //} + task->Load(LoadType::DISK2CPU, 0); } void CpuResource::Process(TaskPtr task) { - + task->Execute(); } } diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp index dcc0687ac46c414c1775782ab7f4c5480e5deb45..66cb72b06268bb6bc2917fe8279729abb595f10c 100644 --- a/cpp/src/scheduler/resource/DiskResource.cpp +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -12,7 +12,8 @@ namespace engine { DiskResource::DiskResource(std::string name) - : Resource(std::move(name), ResourceType::DISK) {} + : Resource(std::move(name), ResourceType::DISK, true, false) { +} void DiskResource::LoadFile(TaskPtr task) { diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp index 00d5df05b46fd70d9652a9ca618879d7c23f582d..df6827881cafcd250054308ae428d702b6fb1e02 100644 --- a/cpp/src/scheduler/resource/GpuResource.cpp +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -16,11 +16,11 @@ GpuResource::GpuResource(std::string name) : Resource(std::move(name), ResourceType::GPU) {} void GpuResource::LoadFile(TaskPtr task) { - + task->Load(LoadType::CPU2GPU, 0); } void GpuResource::Process(TaskPtr task) { - + task->Execute(); } } diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index ea6f71a3591632f82c04c6cb499f6f22a2d89979..ed5d57b7ddaf65f034c378c42273331b0fd51e78 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -10,10 +10,15 @@ namespace zilliz { namespace milvus { namespace engine { -Resource::Resource(std::string name, ResourceType type) +Resource::Resource(std::string name, + ResourceType type, + bool enable_loader, + bool enable_executor) : name_(std::move(name)), type_(type), running_(false), + enable_loader_(enable_loader), + enable_executor_(enable_executor), load_flag_(false), exec_flag_(false) { task_table_.RegisterSubscriber([&] { @@ -25,28 +30,43 @@ Resource::Resource(std::string name, ResourceType type) } void Resource::Start() { - loader_thread_ = std::thread(&Resource::loader_function, this); - executor_thread_ = std::thread(&Resource::executor_function, this); + running_ = true; + if (enable_loader_) { + loader_thread_ = std::thread(&Resource::loader_function, this); + } + if (enable_executor_) { + executor_thread_ = std::thread(&Resource::executor_function, this); + } } void Resource::Stop() { running_ = false; - WakeupLoader(); - WakeupExecutor(); + if (enable_loader_) { + WakeupLoader(); + loader_thread_.join(); + } + if (enable_executor_) { + WakeupExecutor(); + executor_thread_.join(); + } } TaskTable &Resource::task_table() { return task_table_; } -void Resource::WakeupExecutor() { - exec_cv_.notify_one(); -} - void Resource::WakeupLoader() { + std::lock_guard lock(load_mutex_); + load_flag_ = true; load_cv_.notify_one(); } +void Resource::WakeupExecutor() { + std::lock_guard lock(exec_mutex_); + exec_flag_ = true; + exec_cv_.notify_one(); +} + TaskTableItemPtr Resource::pick_task_load() { auto indexes = PickToLoad(task_table_, 3); for (auto index : indexes) { @@ -73,9 +93,12 @@ void Resource::loader_function() { while (running_) { std::unique_lock lock(load_mutex_); load_cv_.wait(lock, [&] { return load_flag_; }); + load_flag_ = false; auto task_item = pick_task_load(); if (task_item) { LoadFile(task_item->task); + // TODO: wrapper loaded + task_item->state = TaskTableItemState::LOADED; if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); @@ -85,7 +108,6 @@ void Resource::loader_function() { } void Resource::executor_function() { - GetRegisterFunc(RegisterType::START_UP)->Exec(); if (subscriber_) { auto event = std::make_shared(shared_from_this()); subscriber_(std::static_pointer_cast(event)); @@ -93,9 +115,11 @@ void Resource::executor_function() { while (running_) { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); + exec_flag_ = false; auto task_item = pick_task_execute(); if (task_item) { Process(task_item->task); + task_item->state = TaskTableItemState::EXECUTED; if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index ab7aab7bacb58b16afad00d46b376329bdcb5ce1..769661c67b97a308070c5ae68ba462b02f10fe2a 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -76,19 +76,22 @@ public: public: /* - * wake up executor; + * wake up loader; */ void - WakeupExecutor(); + WakeupLoader(); - /* - * wake up loader; + /* + * wake up executor; */ void - WakeupLoader(); + WakeupExecutor(); protected: - Resource(std::string name, ResourceType type); + Resource(std::string name, + ResourceType type, + bool enable_loader = true, + bool enable_executor = true); // TODO: SearchContextPtr to TaskPtr /* @@ -138,7 +141,6 @@ private: void executor_function(); - private: std::string name_; ResourceType type_; @@ -149,8 +151,8 @@ private: std::function subscriber_ = nullptr; bool running_; - bool loader_running_ = false; - bool executor_running_ = false; + bool enable_loader_ = true; + bool enable_executor_ = true; std::thread loader_thread_; std::thread executor_thread_; diff --git a/cpp/unittest/scheduler/CMakeLists.txt b/cpp/unittest/scheduler/CMakeLists.txt index d47022d317dad186f64dd2605addb58bc476677d..24210cb84de38d86df4d39830d84e4438da302e3 100644 --- a/cpp/unittest/scheduler/CMakeLists.txt +++ b/cpp/unittest/scheduler/CMakeLists.txt @@ -14,6 +14,8 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files) aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs) +aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs) aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs) @@ -36,6 +38,8 @@ include_directories(/usr/include/mysql) set(scheduler_test_src ${unittest_srcs} ${test_srcs} + ${scheduler_action_srcs} + ${scheduler_event_srcs} ${scheduler_resource_srcs} ${scheduler_task_srcs} ${scheduler_srcs} diff --git a/cpp/unittest/scheduler/cost_test.cpp b/cpp/unittest/scheduler/cost_test.cpp index 27f1c08254515e52cf1c4f69f30ef68619b55f19..d4c05257d113b4fc27000c4e8873b0e071e8cc04 100644 --- a/cpp/unittest/scheduler/cost_test.cpp +++ b/cpp/unittest/scheduler/cost_test.cpp @@ -9,7 +9,7 @@ class CostTest : public ::testing::Test { protected: void SetUp() override { - for (uint64_t i = 0; i < 7; ++i) { + for (uint64_t i = 0; i < 8; ++i) { auto task = std::make_shared(); table_.Put(task); } diff --git a/cpp/unittest/scheduler/node_test.cpp b/cpp/unittest/scheduler/node_test.cpp index a2249e715b2b16f462cc69656b54037433d68356..f0621043db9e10d198a2503c56f67634cf28760d 100644 --- a/cpp/unittest/scheduler/node_test.cpp +++ b/cpp/unittest/scheduler/node_test.cpp @@ -10,6 +10,8 @@ protected: SetUp() override { node1_ = std::make_shared(); node2_ = std::make_shared(); + node3_ = std::make_shared(); + node4_ = std::make_shared(); auto pcie = Connection("PCIe", 11.0); diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index af5d1b91f8d170c03c9ac6d41c3f8b29573e2abf..1123a3fb7ecb9c4b30e2edff52f8b42b421fa779 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -1,13 +1,14 @@ #include "scheduler/ResourceFactory.h" #include "scheduler/ResourceMgr.h" #include "scheduler/Scheduler.h" +#include "scheduler/task/TestTask.h" +#include "utils/Log.h" #include using namespace zilliz::milvus::engine; -TEST(normal_test, DISABLED_test1) { - +TEST(normal_test, test1) { // ResourceMgr only compose resources, provide unified event auto res_mgr = std::make_shared(); auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd")); @@ -23,17 +24,35 @@ TEST(normal_test, DISABLED_test1) { res_mgr->Start(); - auto task1 = std::make_shared(); - auto task2 = std::make_shared(); + auto scheduler = new Scheduler(res_mgr); + scheduler->Start(); + + auto task1 = std::make_shared(); + auto task2 = std::make_shared(); + auto task3 = std::make_shared(); + auto task4 = std::make_shared(); if (auto observe = disk.lock()) { observe->task_table().Put(task1); observe->task_table().Put(task2); - observe->task_table().Put(task1); - observe->task_table().Put(task1); + observe->task_table().Put(task3); + observe->task_table().Put(task4); + std::cout << "disk:" << std::endl; + std::cout << observe->task_table().Dump() << std::endl; } - auto scheduler = new Scheduler(res_mgr); - scheduler->Start(); + sleep(5); + + if (auto observe = disk.lock()) { + std::cout << "disk:" << std::endl; + std::cout << observe->task_table().Dump() << std::endl; + } + if (auto observe = cpu.lock()) { + std::cout << "cpu:" << std::endl; + std::cout << observe->task_table().Dump() << std::endl; + } + scheduler->Stop(); + res_mgr->Stop(); - while (true) sleep(1); + ASSERT_EQ(task1->load_count_, 1); + ASSERT_EQ(task1->exec_count_, 1); } diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index afb484b376775a1f94ca6f13e7900b67e9187247..0395856fea8e4cd68962fd7c48555a2921abc0fa 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -27,15 +27,28 @@ protected: gpu_resource_ = ResourceFactory::Create("gpu"); flag_ = false; - auto subscriber = [&](EventPtr) { + auto subscriber = [&](EventPtr event) { std::unique_lock lock(mutex_); - flag_ = true; - cv_.notify_one(); + if (event->Type() == EventType::COPY_COMPLETED || event->Type() == EventType::FINISH_TASK) { + flag_ = true; + cv_.notify_one(); + } }; disk_resource_->RegisterSubscriber(subscriber); cpu_resource_->RegisterSubscriber(subscriber); gpu_resource_->RegisterSubscriber(subscriber); + + disk_resource_->Start(); + cpu_resource_->Start(); + gpu_resource_->Start(); + } + + void + TearDown() override { + disk_resource_->Stop(); + cpu_resource_->Stop(); + gpu_resource_->Stop(); } void diff --git a/cpp/unittest/scheduler/tasktable_test.cpp b/cpp/unittest/scheduler/tasktable_test.cpp index a9ad9ebca575ae6507393bd837b05b178ed48fe4..f48db3af2d347e4417de51991a4399cb3e5a2237 100644 --- a/cpp/unittest/scheduler/tasktable_test.cpp +++ b/cpp/unittest/scheduler/tasktable_test.cpp @@ -45,8 +45,6 @@ protected: invalid_task_ = nullptr; task1_ = std::make_shared(); task2_ = std::make_shared(); - - empty_table_ = TaskTable(); } TaskPtr invalid_task_;