From 4f5466dbe8be83a4a0fb2af60a0badb481b74f3c Mon Sep 17 00:00:00 2001 From: wxyu Date: Thu, 22 Aug 2019 11:56:45 +0800 Subject: [PATCH] MS-400 Add timestamp record in task state change function Former-commit-id: 062045c8e5cbd0b0c71a05e0f5650ea943e4b3fa --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/ResourceMgr.cpp | 11 ++ cpp/src/scheduler/ResourceMgr.h | 3 + cpp/src/scheduler/TaskTable.cpp | 167 +++++++++++++----------- cpp/src/scheduler/TaskTable.h | 90 +++++++++---- cpp/src/scheduler/resource/Resource.cpp | 4 +- cpp/src/scheduler/task/SearchTask.cpp | 1 - 7 files changed, 176 insertions(+), 101 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 4e671387..9291e5a1 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -38,6 +38,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-390 - Update resource construct function - MS-391 - Add PushTaskToNeighbourHasExecutor action - MS-394 - Update scheduler unittest +- MS-400 - Add timestamp record in task state change function ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 1c8e7607..44ebbfad 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -93,6 +93,17 @@ ResourceMgr::Dump() { return str; } +std::string +ResourceMgr::DumpTaskTables() { + std::stringstream ss; + ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl; + for (auto &resource : resources_) { + ss << resource->Dump() << std::endl; + ss << resource->task_table().Dump() << std::endl; + } + return ss.str(); +} + void ResourceMgr::event_process() { while (running_) { diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 5d273a4f..c3f900b2 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -70,6 +70,9 @@ public: std::string Dump(); + std::string + DumpTaskTables(); + private: void event_process(); diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 7ef033c5..fb3c3177 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -8,12 +8,88 @@ #include "event/TaskTableUpdatedEvent.h" #include #include +#include namespace zilliz { namespace milvus { namespace engine { +uint64_t +get_now_timestamp() { + std::chrono::time_point now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto millis = std::chrono::duration_cast(duration).count(); + return millis; +} + +bool +TaskTableItem::Load() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::START) { + state = TaskTableItemState::LOADING; + lock.unlock(); + timestamp.load = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Loaded() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::LOADING) { + state = TaskTableItemState::LOADED; + lock.unlock(); + timestamp.loaded = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Execute() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::LOADED) { + state = TaskTableItemState::EXECUTING; + lock.unlock(); + timestamp.execute = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Executed() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::EXECUTING) { + state = TaskTableItemState::EXECUTED; + lock.unlock(); + timestamp.executed = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Move() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::LOADED) { + state = TaskTableItemState::MOVING; + lock.unlock(); + timestamp.move = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Moved() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::MOVING) { + state = TaskTableItemState::MOVED; + lock.unlock(); + timestamp.moved = get_now_timestamp(); + return true; + } + return false; +} + void TaskTable::Put(TaskPtr task) { @@ -59,78 +135,6 @@ TaskTable::Clear() { // table_.erase(table_.begin(), iterator); } -bool -TaskTable::Move(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::LOADED) { - task->state = TaskTableItemState::MOVING; - return true; - } - return false; -} - -bool -TaskTable::Moved(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::MOVING) { - task->state = TaskTableItemState::MOVED; - return true; - } - return false; -} - -bool -TaskTable::Load(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::START) { - task->state = TaskTableItemState::LOADING; - return true; - } - return false; -} - -bool -TaskTable::Loaded(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::LOADING) { - task->state = TaskTableItemState::LOADED; - return true; - } - return false; -} - -bool -TaskTable::Execute(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::LOADED) { - task->state = TaskTableItemState::EXECUTING; - return true; - } - return false; -} - -bool -TaskTable::Executed(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::EXECUTING) { - task->state = TaskTableItemState::EXECUTED; - return true; - } - return false; -} - std::string ToString(TaskTableItemState state) { switch (state) { @@ -146,12 +150,27 @@ ToString(TaskTableItemState state) { } } +std::string +ToString(const TaskTimestamp ×tamp) { + std::stringstream ss; + ss << ""; + return ss.str(); +} + std::string TaskTable::Dump() { std::stringstream ss; for (auto &item : table_) { - ss << "<" << item->id; - ss << ", " << ToString(item->state); + ss << "id; + ss << ", state=" << ToString(item->state); + ss << ", timestamp=" << ToString(item->timestamp); ss << ">" << std::endl; } return ss.str(); diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 8a482d05..45cb9a81 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -28,6 +28,16 @@ enum class TaskTableItemState { MOVED, // moved, termination state }; +struct TaskTimestamp { + uint64_t start = 0; + uint64_t move = 0; + uint64_t moved = 0; + uint64_t load = 0; + uint64_t loaded = 0; + uint64_t execute = 0; + uint64_t executed = 0; +}; + struct TaskTableItem { TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {} @@ -39,8 +49,27 @@ struct TaskTableItem { TaskPtr task; // the task; TaskTableItemState state; // the state; std::mutex mutex; + TaskTimestamp timestamp; uint8_t priority; // just a number, meaningless; + + bool + Load(); + + bool + Loaded(); + + bool + Execute(); + + bool + Executed(); + + bool + Move(); + + bool + Moved(); }; using TaskTableItemPtr = std::shared_ptr; @@ -111,55 +140,68 @@ public: public: /******** Action ********/ - /* - * Move a task; - * Set state moving; - * Called by scheduler; - */ // TODO: bool to Status - bool - Move(uint64_t index); - - /* - * Move task finished; - * Set state moved; - * Called by scheduler; - */ - bool - Moved(uint64_t index); - /* * Load a task; * Set state loading; * Called by loader; */ - bool - Load(uint64_t index); + inline bool + Load(uint64_t index) { + return table_[index]->Load(); + } /* * Load task finished; * Set state loaded; * Called by loader; */ - bool - Loaded(uint64_t index); + inline bool + Loaded(uint64_t index) { + return table_[index]->Loaded(); + } /* * Execute a task; * Set state executing; * Called by executor; */ - bool - Execute(uint64_t index); + inline bool + Execute(uint64_t index) { + return table_[index]->Execute(); + } /* * Execute task finished; * Set state executed; * Called by executor; */ - bool - Executed(uint64_t index); + inline bool + Executed(uint64_t index){ + return table_[index]->Executed(); + } + + /* + * Move a task; + * Set state moving; + * Called by scheduler; + */ + + inline bool + Move(uint64_t index){ + return table_[index]->Move(); + } + + /* + * Move task finished; + * Set state moved; + * Called by scheduler; + */ + inline bool + Moved(uint64_t index){ + return table_[index]->Moved(); + } public: /* diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 0bb886fd..77bcb28f 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -112,7 +112,7 @@ void Resource::loader_function() { } LoadFile(task_item->task); // TODO: wrapper loaded - task_item->state = TaskTableItemState::LOADED; + task_item->Loaded(); if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); @@ -138,7 +138,7 @@ void Resource::executor_function() { break; } Process(task_item->task); - task_item->state = TaskTableItemState::EXECUTED; + task_item->Executed(); if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index 98cbdb21..ea482bcb 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -113,7 +113,6 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { if (type == LoadType::DISK2CPU) { index_engine_->Load(); } else if (type == LoadType::CPU2GPU) { - index_engine_->Load(); index_engine_->CopyToGpu(device_id); } else if (type == LoadType::GPU2CPU) { index_engine_->CopyToCpu(); -- GitLab