diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 26b0226949ba6d29d6d2f65aebce4c15c2b657d4..189757cc14a61731a721755872d717628af59b84 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -10,6 +10,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-327 - Clean code for milvus ## New Feature +- MS-343 - Implement ResourceMgr ## Task - MS-297 - disable mysql unit test diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 63edcd3993f4ef0ae2514182d5876f28057b8532..ba4e3344808c2f9627392fc10c9972a4be12e523 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -16,6 +16,7 @@ aux_source_directory(db/insert db_insert_files) aux_source_directory(db/meta db_meta_files) aux_source_directory(metrics metrics_files) aux_source_directory(wrapper/knowhere knowhere_files) +aux_source_directory(scheduler new_scheduler_files) aux_source_directory(db/scheduler scheduler_files) aux_source_directory(db/scheduler/context scheduler_context_files) @@ -62,6 +63,7 @@ set(db_files ${db_insert_files} ${db_meta_files} ${db_scheduler_files} + ${new_scheduler_files} ${metrics_files} ${knowhere_files} ) diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d033bd9298c17d7c72797dceba10597f6a5ccfd5 --- /dev/null +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -0,0 +1,121 @@ + +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "ResourceMgr.h" +#include "db/Log.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +ResourceMgr::ResourceMgr() + : running_(false) { + +} + +ResourceWPtr +ResourceMgr::Add(ResourcePtr &&resource) { + ResourceWPtr ret(resource); + + std::lock_guard lck(resources_mutex_); + if(running_) { + ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource"; + return ret; + } + + resources_.emplace_back(resource); + + size_t index = resources_.size() - 1; + resource->RegisterOnStartUp([&] { + start_up_event_[index] = true; + event_cv_.notify_one(); + }); + resource->RegisterOnFinishTask([&] { + finish_task_event_[index] = true; + event_cv_.notify_one(); + }); + return ret; +} + +void +ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) { + if (auto observe_a = res1.lock()) { + if (auto observe_b = res2.lock()) { + observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); + } + } +} + +void +ResourceMgr::EventProcess() { + while (running_) { + std::unique_lock lock(resources_mutex_); + event_cv_.wait(lock, [this] { return !resources_.empty(); }); + + if(!running_) { + break; + } + + for (uint64_t i = 0; i < resources_.size(); ++i) { + ResourceWPtr res(resources_[i]); + if (start_up_event_[i]) { + on_start_up_(res); + start_up_event_[i] = false; + } + if (finish_task_event_[i]) { + on_finish_task_(res); + finish_task_event_[i] = false; + } + if (copy_completed_event_[i]) { + on_copy_completed_(res); + copy_completed_event_[i] = false; + } + if (task_table_updated_event_[i]) { + on_task_table_updated_(res); + task_table_updated_event_[i] = false; + } + } + } +} + +void +ResourceMgr::Start() { + std::lock_guard lck(resources_mutex_); + for (auto &resource : resources_) { + resource->Start(); + } + worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); + + running_ = true; +} + +void +ResourceMgr::Stop() { + std::lock_guard lck(resources_mutex_); + + running_ = false; + worker_thread_.join(); + + for (auto &resource : resources_) { + resource->Stop(); + } +} + +std::string +ResourceMgr::Dump() { + std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n"; + + for (uint64_t i = 0; i < resources_.size(); ++i) { + str += "Resource No." + std::to_string(i) + ":\n"; + str += resources_[i]->Dump(); + } + + return str; +} + +} +} +} diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index e1cc9110bf6f8c89b8c1ac41e9f6439e43d437b9..e7a765069553c7865ac385d4114562e2483aacb2 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -9,7 +9,10 @@ #include #include #include +#include +#include +#include "resource/Resource.h" namespace zilliz { namespace milvus { @@ -17,7 +20,7 @@ namespace engine { class ResourceMgr { public: - ResourceMgr() : running_(false) {} + ResourceMgr(); /******** Management Interface ********/ @@ -27,42 +30,24 @@ public: * Functions only modify bool variable, like event trigger; */ ResourceWPtr - Add(ResourcePtr &&resource) { - ResourceWPtr ret(resource); - resources_.emplace_back(resource); - -// resource->RegisterOnStartUp([] { -// start_up_event_[index] = true; -// }); -// resource.RegisterOnFinishTask([] { -// finish_task_event_[index] = true; -// }); - return ret; - } + Add(ResourcePtr &&resource); /* * Create connection between A and B; */ void - Connect(ResourceWPtr &A, ResourceWPtr &B, Connection &connection) { - if (auto observe_a = A.lock()) { - if (auto observe_b = B.lock()) { - observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); - } - } - } + Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection); /* * Synchronous start all resource; * Last, start event process thread; */ void - StartAll() { - for (auto &resource : resources_) { - resource->Start(); - } - worker_thread_ = std::thread(&ResourceMgr::EventProcess, this); - } + Start(); + + void + Stop(); + // TODO: add stats interface(low) @@ -89,13 +74,17 @@ public: * Register on copy task data completed event; */ void - RegisterOnCopyCompleted(std::function &func); + RegisterOnCopyCompleted(std::function &func) { + on_copy_completed_ = func; + } /* * Register on task table updated event; */ void - RegisterOnTaskTableUpdated(std::function &func); + RegisterOnTaskTableUpdated(std::function &func) { + on_task_table_updated_ = func; + } public: /******** Utlitity Functions ********/ @@ -105,23 +94,16 @@ public: private: void - EventProcess() { - while (running_) { - for (uint64_t i = 0; i < resources_.size(); ++i) { - if (start_up_event_[i]) { - on_start_up_(resources_[i]); - } - } - } - - } + EventProcess(); private: bool running_; std::vector resources_; + mutable std::mutex resources_mutex_; std::thread worker_thread_; + std::condition_variable event_cv_; std::vector start_up_event_; std::vector finish_task_event_; std::vector copy_completed_event_;