From dbae09935f2cd9cc5b65e835ab2c03cea6f0ef82 Mon Sep 17 00:00:00 2001 From: wxyu Date: Thu, 5 Sep 2019 19:58:54 +0800 Subject: [PATCH] MS-488 Improve code format in scheduler Former-commit-id: e9e4c2a0271b49cd16b25f0c4f712c9b54b10da6 --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Algorithm.cpp | 22 +-- cpp/src/scheduler/CacheMgr.h | 24 --- cpp/src/scheduler/ResourceMgr.cpp | 171 ++++++++----------- cpp/src/scheduler/ResourceMgr.h | 95 +++++------ cpp/src/scheduler/SchedInst.cpp | 2 +- cpp/src/scheduler/Scheduler.cpp | 6 +- cpp/src/scheduler/TaskTable.cpp | 30 ++-- cpp/src/scheduler/TaskTable.h | 11 +- cpp/src/scheduler/Utils.cpp | 7 +- cpp/src/scheduler/Utils.h | 1 + cpp/src/scheduler/resource/Connection.h | 1 + cpp/src/scheduler/resource/RegisterHandler.h | 24 --- cpp/src/scheduler/resource/Resource.cpp | 39 +++-- cpp/src/scheduler/resource/Resource.h | 65 +++---- cpp/src/scheduler/task/DeleteTask.cpp | 6 - cpp/src/scheduler/task/DeleteTask.h | 3 - cpp/src/scheduler/task/SearchTask.cpp | 10 -- cpp/src/scheduler/task/SearchTask.h | 3 - cpp/src/scheduler/task/Task.h | 5 - cpp/src/scheduler/task/TaskConvert.cpp | 1 - cpp/src/scheduler/task/TestTask.cpp | 9 - cpp/src/scheduler/task/TestTask.h | 3 - cpp/src/scheduler/tasklabel/TaskLabel.h | 1 + 24 files changed, 193 insertions(+), 347 deletions(-) delete mode 100644 cpp/src/scheduler/CacheMgr.h delete mode 100644 cpp/src/scheduler/resource/RegisterHandler.h diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index c9247d97..81333e55 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -83,6 +83,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-455 - Distribute tasks by minimal cost in scheduler - MS-460 - Put transport speed as weight when choosing neighbour to execute task - MS-459 - Add cache for pick function in tasktable +- MS-488 - Improve code format in scheduler ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp index b861151d..e1e8a1e7 100644 --- a/cpp/src/scheduler/Algorithm.cpp +++ b/cpp/src/scheduler/Algorithm.cpp @@ -20,12 +20,12 @@ ShortestPath(const ResourcePtr &src, std::vector> paths; - uint64_t num_of_resources = res_mgr->GetAllResouces().size(); + uint64_t num_of_resources = res_mgr->GetAllResources().size(); std::unordered_map id_name_map; std::unordered_map name_id_map; for (uint64_t i = 0; i < num_of_resources; ++i) { - id_name_map.insert(std::make_pair(i, res_mgr->GetAllResouces().at(i)->Name())); - name_id_map.insert(std::make_pair(res_mgr->GetAllResouces().at(i)->Name(), i)); + id_name_map.insert(std::make_pair(i, res_mgr->GetAllResources().at(i)->name())); + name_id_map.insert(std::make_pair(res_mgr->GetAllResources().at(i)->name(), i)); } std::vector > dis_matrix; @@ -40,23 +40,23 @@ ShortestPath(const ResourcePtr &src, std::vector vis(num_of_resources, false); std::vector dis(num_of_resources, MAXINT); - for (auto &res : res_mgr->GetAllResouces()) { + for (auto &res : res_mgr->GetAllResources()) { auto cur_node = std::static_pointer_cast(res); auto cur_neighbours = cur_node->GetNeighbours(); for (auto &neighbour : cur_neighbours) { auto neighbour_res = std::static_pointer_cast(neighbour.neighbour_node.lock()); - dis_matrix[name_id_map.at(res->Name())][name_id_map.at(neighbour_res->Name())] = + dis_matrix[name_id_map.at(res->name())][name_id_map.at(neighbour_res->name())] = neighbour.connection.transport_cost(); } } for (uint64_t i = 0; i < num_of_resources; ++i) { - dis[i] = dis_matrix[name_id_map.at(src->Name())][i]; + dis[i] = dis_matrix[name_id_map.at(src->name())][i]; } - vis[name_id_map.at(src->Name())] = true; + vis[name_id_map.at(src->name())] = true; std::vector parent(num_of_resources, -1); for (uint64_t i = 0; i < num_of_resources; ++i) { @@ -71,7 +71,7 @@ ShortestPath(const ResourcePtr &src, vis[temp] = true; if (i == 0) { - parent[temp] = name_id_map.at(src->Name()); + parent[temp] = name_id_map.at(src->name()); } for (uint64_t j = 0; j < num_of_resources; ++j) { @@ -82,15 +82,15 @@ ShortestPath(const ResourcePtr &src, } } - int64_t parent_idx = parent[name_id_map.at(dest->Name())]; + int64_t parent_idx = parent[name_id_map.at(dest->name())]; if (parent_idx != -1) { - path.push_back(dest->Name()); + path.push_back(dest->name()); } while (parent_idx != -1) { path.push_back(id_name_map.at(parent_idx)); parent_idx = parent[parent_idx]; } - return dis[name_id_map.at(dest->Name())]; + return dis[name_id_map.at(dest->name())]; } } diff --git a/cpp/src/scheduler/CacheMgr.h b/cpp/src/scheduler/CacheMgr.h deleted file mode 100644 index 321cbbb8..00000000 --- a/cpp/src/scheduler/CacheMgr.h +++ /dev/null @@ -1,24 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ -#pragma once - -#include - - -namespace zilliz { -namespace milvus { -namespace engine { - -// dummy cache_mgr -class CacheMgr { - -}; - -using CacheMgrPtr = std::shared_ptr; - -} -} -} diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 65373164..f87c5ea0 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -12,67 +12,31 @@ namespace zilliz { namespace milvus { namespace engine { -ResourceMgr::ResourceMgr() - : running_(false) { -} - -uint64_t -ResourceMgr::GetNumOfComputeResource() { - uint64_t count = 0; - for (auto &res : resources_) { - if (res->HasExecutor()) { - ++count; - } - } - return count; -} - -std::vector -ResourceMgr::GetComputeResource() { - std::vector result; +void +ResourceMgr::Start() { + std::lock_guard lck(resources_mutex_); for (auto &resource : resources_) { - if (resource->HasExecutor()) { - result.emplace_back(resource); - } - } - return result; -} - -uint64_t -ResourceMgr::GetNumGpuResource() const { - uint64_t num = 0; - for (auto &res : resources_) { - if (res->Type() == ResourceType::GPU) { - num++; - } + resource->Start(); } - return num; + running_ = true; + worker_thread_ = std::thread(&ResourceMgr::event_process, this); } -ResourcePtr -ResourceMgr::GetResource(ResourceType type, uint64_t device_id) { - for (auto &resource : resources_) { - if (resource->Type() == type && resource->DeviceId() == device_id) { - return resource; - } +void +ResourceMgr::Stop() { + { + std::lock_guard lock(event_mutex_); + running_ = false; + queue_.push(nullptr); + event_cv_.notify_one(); } - return nullptr; -} + worker_thread_.join(); -ResourcePtr -ResourceMgr::GetResourceByName(std::string name) { + std::lock_guard lck(resources_mutex_); for (auto &resource : resources_) { - if (resource->Name() == name) { - return resource; - } + resource->Stop(); } - return nullptr; -} - -std::vector -ResourceMgr::GetAllResouces() { - return resources_; } ResourceWPtr @@ -85,75 +49,85 @@ ResourceMgr::Add(ResourcePtr &&resource) { return ret; } - if (resource->Type() == ResourceType::DISK) { + resource->RegisterSubscriber(std::bind(&ResourceMgr::post_event, this, std::placeholders::_1)); + + if (resource->type() == ResourceType::DISK) { disk_resources_.emplace_back(ResourceWPtr(resource)); } resources_.emplace_back(resource); - size_t index = resources_.size() - 1; - resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1)); return ret; } void ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connection &connection) { - auto res1 = get_resource_by_name(name1); - auto res2 = get_resource_by_name(name2); + auto res1 = GetResource(name1); + auto res2 = GetResource(name2); if (res1 && res2) { res1->AddNeighbour(std::static_pointer_cast(res2), connection); + // TODO: enable when task balance supported // res2->AddNeighbour(std::static_pointer_cast(res1), connection); } } void -ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection) { - if (auto observe_a = res1.lock()) { - if (auto observe_b = res2.lock()) { - observe_a->AddNeighbour(std::static_pointer_cast(observe_b), connection); - observe_b->AddNeighbour(std::static_pointer_cast(observe_a), connection); - } - } +ResourceMgr::Clear() { + std::lock_guard lck(resources_mutex_); + disk_resources_.clear(); + resources_.clear(); } - -void -ResourceMgr::Start() { - std::lock_guard lck(resources_mutex_); +std::vector +ResourceMgr::GetComputeResource() { + std::vector result; for (auto &resource : resources_) { - resource->Start(); + if (resource->HasExecutor()) { + result.emplace_back(resource); + } } - running_ = true; - worker_thread_ = std::thread(&ResourceMgr::event_process, this); + return result; } -void -ResourceMgr::Stop() { - { - std::lock_guard lock(event_mutex_); - running_ = false; - queue_.push(nullptr); - event_cv_.notify_one(); +ResourcePtr +ResourceMgr::GetResource(ResourceType type, uint64_t device_id) { + for (auto &resource : resources_) { + if (resource->type() == type && resource->device_id() == device_id) { + return resource; + } } - worker_thread_.join(); + return nullptr; +} - std::lock_guard lck(resources_mutex_); +ResourcePtr +ResourceMgr::GetResource(const std::string &name) { for (auto &resource : resources_) { - resource->Stop(); + if (resource->name() == name) { + return resource; + } } + return nullptr; } -void -ResourceMgr::Clear() { - std::lock_guard lck(resources_mutex_); - disk_resources_.clear(); - resources_.clear(); +uint64_t +ResourceMgr::GetNumOfComputeResource() { + uint64_t count = 0; + for (auto &res : resources_) { + if (res->HasExecutor()) { + ++count; + } + } + return count; } -void -ResourceMgr::PostEvent(const EventPtr &event) { - std::lock_guard lock(event_mutex_); - queue_.emplace(event); - event_cv_.notify_one(); +uint64_t +ResourceMgr::GetNumGpuResource() const { + uint64_t num = 0; + for (auto &res : resources_) { + if (res->type() == ResourceType::GPU) { + num++; + } + } + return num; } std::string @@ -180,14 +154,13 @@ ResourceMgr::DumpTaskTables() { return ss.str(); } -ResourcePtr -ResourceMgr::get_resource_by_name(const std::string &name) { - for (auto &res : resources_) { - if (res->Name() == name) { - return res; - } +void +ResourceMgr::post_event(const EventPtr &event) { + { + std::lock_guard lock(event_mutex_); + queue_.emplace(event); } - return nullptr; + event_cv_.notify_one(); } void @@ -203,8 +176,6 @@ ResourceMgr::event_process() { break; } -// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event; - if (subscriber_) { subscriber_(event); } diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index da8f34f8..08a99eaa 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -22,78 +22,63 @@ namespace engine { class ResourceMgr { public: - ResourceMgr(); + ResourceMgr() = default; +public: /******** Management Interface ********/ + void + Start(); + + void + Stop(); + + ResourceWPtr + Add(ResourcePtr &&resource); + + void + Connect(const std::string &res1, const std::string &res2, Connection &connection); + + void + Clear(); + inline void RegisterSubscriber(std::function subscriber) { subscriber_ = std::move(subscriber); } - std::vector & +public: + /******** Management Interface ********/ + inline std::vector & GetDiskResources() { return disk_resources_; } - uint64_t - GetNumGpuResource() const; + // TODO: why return shared pointer + inline std::vector + GetAllResources() { + return resources_; + } + + std::vector + GetComputeResource(); ResourcePtr GetResource(ResourceType type, uint64_t device_id); ResourcePtr - GetResourceByName(std::string name); + GetResource(const std::string &name); - std::vector - GetAllResouces(); - - /* - * Return account of resource which enable executor; - */ uint64_t GetNumOfComputeResource(); - std::vector - GetComputeResource(); - - /* - * Add resource into Resource Management; - * Generate functions on events; - * Functions only modify bool variable, like event trigger; - */ - ResourceWPtr - Add(ResourcePtr &&resource); - - void - Connect(const std::string &res1, const std::string &res2, Connection &connection); - - /* - * Create connection between A and B; - */ - void - Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connection); - - /* - * Synchronous start all resource; - * Last, start event process thread; - */ - void - Start(); - - void - Stop(); - - void - Clear(); - - void - PostEvent(const EventPtr &event); + uint64_t + GetNumGpuResource() const; +public: // TODO: add stats interface(low) public: - /******** Utlitity Functions ********/ - + /******** Utility Functions ********/ std::string Dump(); @@ -101,26 +86,26 @@ public: DumpTaskTables(); private: - ResourcePtr - get_resource_by_name(const std::string &name); + void + post_event(const EventPtr &event); void event_process(); private: - std::queue queue_; - std::function subscriber_ = nullptr; - - bool running_; + bool running_ = false; std::vector disk_resources_; std::vector resources_; mutable std::mutex resources_mutex_; - std::thread worker_thread_; + std::queue queue_; + std::function subscriber_ = nullptr; std::mutex event_mutex_; std::condition_variable event_cv_; + std::thread worker_thread_; + }; using ResourceMgrPtr = std::shared_ptr; diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index e805c535..01fbe843 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -38,7 +38,7 @@ StartSchedulerService() { enable_loader, enable_executor)); - if (res.lock()->Type() == ResourceType::GPU) { + if (res.lock()->type() == ResourceType::GPU) { auto pinned_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_PIN_MEMORY, 300); auto temp_memory = resconf.GetInt64Value(server::CONFIG_RESOURCE_TEMP_MEMORY, 300); auto resource_num = resconf.GetInt64Value(server::CONFIG_RESOURCE_NUM, 2); diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index fa67eef4..dcd17e31 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -143,7 +143,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { auto task = load_completed_event->task_table_item_->task; // if this resource is disk, assign it to smallest cost resource - if (self->Type() == ResourceType::DISK) { + if (self->type() == ResourceType::DISK) { // step 1: calculate shortest path per resource, from disk to compute resource auto compute_resources = res_mgr_.lock()->GetComputeResource(); std::vector> paths; @@ -176,11 +176,11 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { task->path() = task_path; } - if(self->Name() == task->path().Last()) { + if(self->name() == task->path().Last()) { self->WakeupLoader(); } else { auto next_res_name = task->path().Next(); - auto next_res = res_mgr_.lock()->GetResourceByName(next_res_name); + auto next_res = res_mgr_.lock()->GetResource(next_res_name); load_completed_event->task_table_item_->Move(); next_res->task_table().Put(task); } diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 56d31e29..086bf068 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -6,6 +6,8 @@ #include "TaskTable.h" #include "event/TaskTableUpdatedEvent.h" +#include "Utils.h" + #include #include #include @@ -15,14 +17,6 @@ namespace zilliz { namespace milvus { namespace engine { -uint64_t -get_now_timestamp() { - std::chrono::time_point now = std::chrono::system_clock::now(); - auto duration = now.time_since_epoch(); - auto millis = std::chrono::duration_cast(duration).count(); - return millis; -} - std::string ToString(TaskTableItemState state) { switch (state) { @@ -64,7 +58,7 @@ TaskTableItem::Load() { if (state == TaskTableItemState::START) { state = TaskTableItemState::LOADING; lock.unlock(); - timestamp.load = get_now_timestamp(); + timestamp.load = get_current_timestamp(); return true; } return false; @@ -75,7 +69,7 @@ TaskTableItem::Loaded() { if (state == TaskTableItemState::LOADING) { state = TaskTableItemState::LOADED; lock.unlock(); - timestamp.loaded = get_now_timestamp(); + timestamp.loaded = get_current_timestamp(); return true; } return false; @@ -86,7 +80,7 @@ TaskTableItem::Execute() { if (state == TaskTableItemState::LOADED) { state = TaskTableItemState::EXECUTING; lock.unlock(); - timestamp.execute = get_now_timestamp(); + timestamp.execute = get_current_timestamp(); return true; } return false; @@ -97,8 +91,8 @@ TaskTableItem::Executed() { if (state == TaskTableItemState::EXECUTING) { state = TaskTableItemState::EXECUTED; lock.unlock(); - timestamp.executed = get_now_timestamp(); - timestamp.finish = get_now_timestamp(); + timestamp.executed = get_current_timestamp(); + timestamp.finish = get_current_timestamp(); return true; } return false; @@ -109,7 +103,7 @@ TaskTableItem::Move() { if (state == TaskTableItemState::LOADED) { state = TaskTableItemState::MOVING; lock.unlock(); - timestamp.move = get_now_timestamp(); + timestamp.move = get_current_timestamp(); return true; } return false; @@ -120,8 +114,8 @@ TaskTableItem::Moved() { if (state == TaskTableItemState::MOVING) { state = TaskTableItemState::MOVED; lock.unlock(); - timestamp.moved = get_now_timestamp(); - timestamp.finish = get_now_timestamp(); + timestamp.moved = get_current_timestamp(); + timestamp.finish = get_current_timestamp(); return true; } return false; @@ -177,7 +171,7 @@ TaskTable::Put(TaskPtr task) { item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; - item->timestamp.start = get_now_timestamp(); + item->timestamp.start = get_current_timestamp(); table_.push_back(item); if (subscriber_) { subscriber_(); @@ -192,7 +186,7 @@ TaskTable::Put(std::vector &tasks) { item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; - item->timestamp.start = get_now_timestamp(); + item->timestamp.start = get_current_timestamp(); table_.push_back(item); } if (subscriber_) { diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index ee6d3b56..f5c151f4 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -40,20 +40,17 @@ struct TaskTimestamp { }; struct TaskTableItem { - TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {} + TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex() {} TaskTableItem(const TaskTableItem &src) - : id(src.id), state(src.state), mutex(), priority(src.priority) {} + : id(src.id), state(src.state), mutex() {} uint64_t id; // auto increment from 0; - // TODO: add tag into task TaskPtr task; // the task; TaskTableItemState state; // the state; std::mutex mutex; TaskTimestamp timestamp; - uint8_t priority; // just a number, meaningless; - bool IsFinish(); @@ -113,7 +110,7 @@ public: Get(uint64_t index); /* - * TODO + * TODO(wxyu): BIG GC * Remove sequence task which is DONE or MOVED from front; * Called by ? */ @@ -135,6 +132,7 @@ public: Size() { return table_.size(); } + public: TaskTableItemPtr & operator[](uint64_t index) { @@ -225,7 +223,6 @@ public: Dump(); private: - // TODO: map better ? std::uint64_t id_ = 0; mutable std::mutex id_mutex_; std::deque table_; diff --git a/cpp/src/scheduler/Utils.cpp b/cpp/src/scheduler/Utils.cpp index 074c035e..597f02b5 100644 --- a/cpp/src/scheduler/Utils.cpp +++ b/cpp/src/scheduler/Utils.cpp @@ -4,16 +4,17 @@ * Proprietary and confidential. ******************************************************************************/ -#include #include "Utils.h" +#include + + namespace zilliz { namespace milvus { namespace engine { uint64_t -get_current_timestamp() -{ +get_current_timestamp() { std::chrono::time_point now = std::chrono::system_clock::now(); auto duration = now.time_since_epoch(); auto millis = std::chrono::duration_cast(duration).count(); diff --git a/cpp/src/scheduler/Utils.h b/cpp/src/scheduler/Utils.h index 7a5bf187..7f012bec 100644 --- a/cpp/src/scheduler/Utils.h +++ b/cpp/src/scheduler/Utils.h @@ -3,6 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ + #include diff --git a/cpp/src/scheduler/resource/Connection.h b/cpp/src/scheduler/resource/Connection.h index 83c9cc52..922cf2b4 100644 --- a/cpp/src/scheduler/resource/Connection.h +++ b/cpp/src/scheduler/resource/Connection.h @@ -15,6 +15,7 @@ namespace engine { class Connection { public: + // TODO: update construct function, speed: double->uint64_t Connection(std::string name, double speed) : name_(std::move(name)), speed_(speed) {} diff --git a/cpp/src/scheduler/resource/RegisterHandler.h b/cpp/src/scheduler/resource/RegisterHandler.h deleted file mode 100644 index 02c55da1..00000000 --- a/cpp/src/scheduler/resource/RegisterHandler.h +++ /dev/null @@ -1,24 +0,0 @@ -/******************************************************************************* - * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved - * Unauthorized copying of this file, via any medium is strictly prohibited. - * Proprietary and confidential. - ******************************************************************************/ - -#pragma once - -#include - -namespace zilliz { -namespace milvus { -namespace engine { - -class RegisterHandler { - public: - virtual void Exec() = 0; -}; - -using RegisterHandlerPtr = std::shared_ptr; - -} -} -} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index b4a6cb5b..9bb9b572 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -12,7 +12,8 @@ namespace zilliz { namespace milvus { namespace engine { -std::ostream &operator<<(std::ostream &out, const Resource &resource) { +std::ostream & +operator<<(std::ostream &out, const Resource &resource) { out << resource.Dump(); return out; } @@ -25,11 +26,9 @@ Resource::Resource(std::string name, : name_(std::move(name)), type_(type), device_id_(device_id), - running_(false), enable_loader_(enable_loader), - enable_executor_(enable_executor), - load_flag_(false), - exec_flag_(false) { + enable_executor_(enable_executor) { + // register subscriber in tasktable task_table_.RegisterSubscriber([&] { if (subscriber_) { auto event = std::make_shared(shared_from_this()); @@ -38,7 +37,8 @@ Resource::Resource(std::string name, }); } -void Resource::Start() { +void +Resource::Start() { running_ = true; if (enable_loader_) { loader_thread_ = std::thread(&Resource::loader_function, this); @@ -48,7 +48,8 @@ void Resource::Start() { } } -void Resource::Stop() { +void +Resource::Stop() { running_ = false; if (enable_loader_) { WakeupLoader(); @@ -60,11 +61,8 @@ void Resource::Stop() { } } -TaskTable &Resource::task_table() { - return task_table_; -} - -void Resource::WakeupLoader() { +void +Resource::WakeupLoader() { { std::lock_guard lock(load_mutex_); load_flag_ = true; @@ -72,7 +70,8 @@ void Resource::WakeupLoader() { load_cv_.notify_one(); } -void Resource::WakeupExecutor() { +void +Resource::WakeupExecutor() { { std::lock_guard lock(exec_mutex_); exec_flag_ = true; @@ -80,6 +79,15 @@ void Resource::WakeupExecutor() { exec_cv_.notify_one(); } +uint64_t +Resource::NumOfTaskToExec() { + uint64_t count = 0; + for (auto &task : task_table_) { + if (task->state == TaskTableItemState::LOADED) ++count; + } + return count; +} + TaskTableItemPtr Resource::pick_task_load() { auto indexes = task_table_.PickToLoad(10); for (auto index : indexes) { @@ -156,11 +164,6 @@ void Resource::executor_function() { } } -RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) { - // construct object each time. - return register_table_[type](); -} - } } } \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 9169a67c..707a7156 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -21,7 +21,6 @@ #include "../task/Task.h" #include "Connection.h" #include "Node.h" -#include "RegisterHandler.h" namespace zilliz { @@ -35,13 +34,6 @@ enum class ResourceType { GPU = 2 }; -enum class RegisterType { - START_UP, - ON_FINISH_TASK, - ON_COPY_COMPLETED, - ON_TASK_TABLE_UPDATED, -}; - class Resource : public Node, public std::enable_shared_from_this { public: /* @@ -68,56 +60,51 @@ class Resource : public Node, public std::enable_shared_from_this { void WakeupExecutor(); - public: - template - void Register_T(const RegisterType &type) { - register_table_.emplace(type, [] { return std::make_shared(); }); - } - - RegisterHandlerPtr - GetRegisterFunc(const RegisterType &type); - inline void RegisterSubscriber(std::function subscriber) { subscriber_ = std::move(subscriber); } + inline virtual std::string + Dump() const { + return ""; + } + + public: inline std::string - Name() const { + name() const { return name_; } inline ResourceType - Type() const { + type() const { return type_; } inline uint64_t - DeviceId() { + device_id() const { return device_id_; } - // TODO: better name? + TaskTable & + task_table() { + return task_table_; + } + +public: inline bool - HasLoader() { + HasLoader() const { return enable_loader_; } - // TODO: better name? inline bool - HasExecutor() { + HasExecutor() const { return enable_executor_; } // TODO: const uint64_t - NumOfTaskToExec() { - uint64_t count = 0; - for (auto &task : task_table_) { - if (task->state == TaskTableItemState::LOADED) ++count; - } - return count; - } + NumOfTaskToExec(); // TODO: need double ? inline uint64_t @@ -130,14 +117,6 @@ class Resource : public Node, public std::enable_shared_from_this { return total_task_; } - TaskTable & - task_table(); - - inline virtual std::string - Dump() const { - return ""; - } - friend std::ostream &operator<<(std::ostream &out, const Resource &resource); protected: @@ -198,6 +177,7 @@ class Resource : public Node, public std::enable_shared_from_this { protected: uint64_t device_id_; std::string name_; + private: ResourceType type_; @@ -206,17 +186,16 @@ class Resource : public Node, public std::enable_shared_from_this { uint64_t total_cost_ = 0; uint64_t total_task_ = 0; - std::map> register_table_; std::function subscriber_ = nullptr; - bool running_; + bool running_ = false; bool enable_loader_ = true; bool enable_executor_ = true; std::thread loader_thread_; std::thread executor_thread_; - bool load_flag_; - bool exec_flag_; + bool load_flag_ = false; + bool exec_flag_ = false; std::mutex load_mutex_; std::mutex exec_mutex_; std::condition_variable load_cv_; diff --git a/cpp/src/scheduler/task/DeleteTask.cpp b/cpp/src/scheduler/task/DeleteTask.cpp index 0e9c7dc3..69b5ed7f 100644 --- a/cpp/src/scheduler/task/DeleteTask.cpp +++ b/cpp/src/scheduler/task/DeleteTask.cpp @@ -24,12 +24,6 @@ XDeleteTask::Execute() { delete_context_ptr_->ResourceDone(); } -TaskPtr -XDeleteTask::Clone() { - auto task = std::make_shared(delete_context_ptr_); - return task; -} - } } } diff --git a/cpp/src/scheduler/task/DeleteTask.h b/cpp/src/scheduler/task/DeleteTask.h index 2b266243..4f990908 100644 --- a/cpp/src/scheduler/task/DeleteTask.h +++ b/cpp/src/scheduler/task/DeleteTask.h @@ -24,9 +24,6 @@ public: void Execute() override; - TaskPtr - Clone() override; - public: DeleteContextPtr delete_context_ptr_; }; diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index 3ddc2beb..329cb660 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -193,16 +193,6 @@ XSearchTask::Execute() { index_engine_ = nullptr; } -TaskPtr -XSearchTask::Clone() { - auto ret = std::make_shared(file_); - ret->index_id_ = index_id_; - ret->index_engine_ = index_engine_->Clone(); - ret->search_contexts_ = search_contexts_; - ret->metric_l2 = metric_l2; - return ret; -} - Status XSearchTask::ClusterResult(const std::vector &output_ids, const std::vector &output_distence, uint64_t nq, diff --git a/cpp/src/scheduler/task/SearchTask.h b/cpp/src/scheduler/task/SearchTask.h index fb09b29d..b45eea48 100644 --- a/cpp/src/scheduler/task/SearchTask.h +++ b/cpp/src/scheduler/task/SearchTask.h @@ -23,9 +23,6 @@ public: void Execute() override; - TaskPtr - Clone() override; - public: static Status ClusterResult(const std::vector &output_ids, const std::vector &output_distence, diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index 7431679e..01b2c8eb 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -68,14 +68,9 @@ public: virtual void Execute() = 0; - // TODO: dont use this method to support task move - virtual TaskPtr - Clone() = 0; - public: Path task_path_; std::vector search_contexts_; - ScheduleTaskPtr task_; TaskType type_; TaskLabelPtr label_ = nullptr; }; diff --git a/cpp/src/scheduler/task/TaskConvert.cpp b/cpp/src/scheduler/task/TaskConvert.cpp index 30a3a38b..e55f84f5 100644 --- a/cpp/src/scheduler/task/TaskConvert.cpp +++ b/cpp/src/scheduler/task/TaskConvert.cpp @@ -21,7 +21,6 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) { auto task = std::make_shared(load_task->file_); task->label() = std::make_shared(); task->search_contexts_ = load_task->search_contexts_; - task->task_ = schedule_task; return task; } case ScheduleTaskType::kDelete: { diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp index 391fd06a..15f60baa 100644 --- a/cpp/src/scheduler/task/TestTask.cpp +++ b/cpp/src/scheduler/task/TestTask.cpp @@ -27,15 +27,6 @@ TestTask::Execute() { done_ = true; } -TaskPtr -TestTask::Clone() { - TableFileSchemaPtr dummy = nullptr; - auto ret = std::make_shared(dummy); - ret->load_count_ = load_count_; - ret->exec_count_ = exec_count_; - return ret; -} - void TestTask::Wait() { std::unique_lock lock(mutex_); diff --git a/cpp/src/scheduler/task/TestTask.h b/cpp/src/scheduler/task/TestTask.h index dce50e9f..6e969e90 100644 --- a/cpp/src/scheduler/task/TestTask.h +++ b/cpp/src/scheduler/task/TestTask.h @@ -23,9 +23,6 @@ public: void Execute() override; - TaskPtr - Clone() override; - void Wait(); diff --git a/cpp/src/scheduler/tasklabel/TaskLabel.h b/cpp/src/scheduler/tasklabel/TaskLabel.h index 84fd5ee7..51602487 100644 --- a/cpp/src/scheduler/tasklabel/TaskLabel.h +++ b/cpp/src/scheduler/tasklabel/TaskLabel.h @@ -25,6 +25,7 @@ public: } protected: + explicit TaskLabel(TaskLabelType type) : type_(type) {} private: -- GitLab