提交 2cf61605 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-343 Implement ResourceMgr

See merge request megasearch/milvus!333

Former-commit-id: 16ed40c87e7519cbd3da2f94e1565f68ebd32bb6
......@@ -10,6 +10,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-327 - Clean code for milvus
## New Feature
- MS-343 - Implement ResourceMgr
## Task
- MS-297 - disable mysql unit test
......
......@@ -16,6 +16,7 @@ aux_source_directory(db/insert db_insert_files)
aux_source_directory(db/meta db_meta_files)
aux_source_directory(metrics metrics_files)
aux_source_directory(wrapper/knowhere knowhere_files)
aux_source_directory(scheduler new_scheduler_files)
aux_source_directory(db/scheduler scheduler_files)
aux_source_directory(db/scheduler/context scheduler_context_files)
......@@ -62,6 +63,7 @@ set(db_files
${db_insert_files}
${db_meta_files}
${db_scheduler_files}
${new_scheduler_files}
${metrics_files}
${knowhere_files}
)
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ResourceMgr.h"
#include "db/Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
ResourceMgr::ResourceMgr()
: running_(false) {
}
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
std::lock_guard<std::mutex> lck(resources_mutex_);
if(running_) {
ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
return ret;
}
resources_.emplace_back(resource);
size_t index = resources_.size() - 1;
resource->RegisterOnStartUp([&] {
start_up_event_[index] = true;
event_cv_.notify_one();
});
resource->RegisterOnFinishTask([&] {
finish_task_event_[index] = true;
event_cv_.notify_one();
});
return ret;
}
void
ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) {
if (auto observe_a = res1.lock()) {
if (auto observe_b = res2.lock()) {
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
}
}
}
void
ResourceMgr::EventProcess() {
while (running_) {
std::unique_lock <std::mutex> lock(resources_mutex_);
event_cv_.wait(lock, [this] { return !resources_.empty(); });
if(!running_) {
break;
}
for (uint64_t i = 0; i < resources_.size(); ++i) {
ResourceWPtr res(resources_[i]);
if (start_up_event_[i]) {
on_start_up_(res);
start_up_event_[i] = false;
}
if (finish_task_event_[i]) {
on_finish_task_(res);
finish_task_event_[i] = false;
}
if (copy_completed_event_[i]) {
on_copy_completed_(res);
copy_completed_event_[i] = false;
}
if (task_table_updated_event_[i]) {
on_task_table_updated_(res);
task_table_updated_event_[i] = false;
}
}
}
}
void
ResourceMgr::Start() {
std::lock_guard<std::mutex> lck(resources_mutex_);
for (auto &resource : resources_) {
resource->Start();
}
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
running_ = true;
}
void
ResourceMgr::Stop() {
std::lock_guard<std::mutex> lck(resources_mutex_);
running_ = false;
worker_thread_.join();
for (auto &resource : resources_) {
resource->Stop();
}
}
std::string
ResourceMgr::Dump() {
std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";
for (uint64_t i = 0; i < resources_.size(); ++i) {
str += "Resource No." + std::to_string(i) + ":\n";
str += resources_[i]->Dump();
}
return str;
}
}
}
}
......@@ -9,7 +9,10 @@
#include <string>
#include <vector>
#include <memory>
#include <mutex>
#include <condition_variable>
#include "resource/Resource.h"
namespace zilliz {
namespace milvus {
......@@ -17,7 +20,7 @@ namespace engine {
class ResourceMgr {
public:
ResourceMgr() : running_(false) {}
ResourceMgr();
/******** Management Interface ********/
......@@ -27,42 +30,24 @@ public:
* Functions only modify bool variable, like event trigger;
*/
ResourceWPtr
Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
resources_.emplace_back(resource);
// resource->RegisterOnStartUp([] {
// start_up_event_[index] = true;
// });
// resource.RegisterOnFinishTask([] {
// finish_task_event_[index] = true;
// });
return ret;
}
Add(ResourcePtr &&resource);
/*
* Create connection between A and B;
*/
void
Connect(ResourceWPtr &A, ResourceWPtr &B, Connection &connection) {
if (auto observe_a = A.lock()) {
if (auto observe_b = B.lock()) {
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
}
}
}
Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection);
/*
* Synchronous start all resource;
* Last, start event process thread;
*/
void
StartAll() {
for (auto &resource : resources_) {
resource->Start();
}
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
}
Start();
void
Stop();
// TODO: add stats interface(low)
......@@ -89,13 +74,17 @@ public:
* Register on copy task data completed event;
*/
void
RegisterOnCopyCompleted(std::function<void(ResourceWPtr)> &func);
RegisterOnCopyCompleted(std::function<void(ResourceWPtr)> &func) {
on_copy_completed_ = func;
}
/*
* Register on task table updated event;
*/
void
RegisterOnTaskTableUpdated(std::function<void(ResourceWPtr)> &func);
RegisterOnTaskTableUpdated(std::function<void(ResourceWPtr)> &func) {
on_task_table_updated_ = func;
}
public:
/******** Utlitity Functions ********/
......@@ -105,23 +94,16 @@ public:
private:
void
EventProcess() {
while (running_) {
for (uint64_t i = 0; i < resources_.size(); ++i) {
if (start_up_event_[i]) {
on_start_up_(resources_[i]);
}
}
}
}
EventProcess();
private:
bool running_;
std::vector<ResourcePtr> resources_;
mutable std::mutex resources_mutex_;
std::thread worker_thread_;
std::condition_variable event_cv_;
std::vector<bool> start_up_event_;
std::vector<bool> finish_task_event_;
std::vector<bool> copy_completed_event_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册