diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 6ea83b167afb4f3b75a6be2b4126916588caf679..78def9905bc1d01fbf51ee2793294207f6891288 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -38,6 +38,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-390 - Update resource construct function - MS-391 - Add PushTaskToNeighbourHasExecutor action - MS-394 - Update scheduler unittest +- MS-400 - Add timestamp record in task state change function ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 1c8e7607c36b148bc5d7d52884250b5f640a5904..44ebbfad44ae2679dabd074174426e2b21258057 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -93,6 +93,17 @@ ResourceMgr::Dump() { return str; } +std::string +ResourceMgr::DumpTaskTables() { + std::stringstream ss; + ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl; + for (auto &resource : resources_) { + ss << resource->Dump() << std::endl; + ss << resource->task_table().Dump() << std::endl; + } + return ss.str(); +} + void ResourceMgr::event_process() { while (running_) { diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 5d273a4f3881bfb11167babad8842c43cead9627..c3f900b2db25792c17ff61be20d6cef04933ef17 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -70,6 +70,9 @@ public: std::string Dump(); + std::string + DumpTaskTables(); + private: void event_process(); diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index 7ef033c500510a13667bfbbd0d8e8a7cfe3ae5ba..fb3c317747b87f971b7407efc8d957f1cc90cfe8 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -8,12 +8,88 @@ #include "event/TaskTableUpdatedEvent.h" #include #include +#include namespace zilliz { namespace milvus { namespace engine { +uint64_t +get_now_timestamp() { + std::chrono::time_point now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto millis = std::chrono::duration_cast(duration).count(); + return millis; +} + +bool +TaskTableItem::Load() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::START) { + state = TaskTableItemState::LOADING; + lock.unlock(); + timestamp.load = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Loaded() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::LOADING) { + state = TaskTableItemState::LOADED; + lock.unlock(); + timestamp.loaded = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Execute() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::LOADED) { + state = TaskTableItemState::EXECUTING; + lock.unlock(); + timestamp.execute = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Executed() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::EXECUTING) { + state = TaskTableItemState::EXECUTED; + lock.unlock(); + timestamp.executed = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Move() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::LOADED) { + state = TaskTableItemState::MOVING; + lock.unlock(); + timestamp.move = get_now_timestamp(); + return true; + } + return false; +} +bool +TaskTableItem::Moved() { + std::unique_lock lock(mutex); + if (state == TaskTableItemState::MOVING) { + state = TaskTableItemState::MOVED; + lock.unlock(); + timestamp.moved = get_now_timestamp(); + return true; + } + return false; +} + void TaskTable::Put(TaskPtr task) { @@ -59,78 +135,6 @@ TaskTable::Clear() { // table_.erase(table_.begin(), iterator); } -bool -TaskTable::Move(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::LOADED) { - task->state = TaskTableItemState::MOVING; - return true; - } - return false; -} - -bool -TaskTable::Moved(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::MOVING) { - task->state = TaskTableItemState::MOVED; - return true; - } - return false; -} - -bool -TaskTable::Load(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::START) { - task->state = TaskTableItemState::LOADING; - return true; - } - return false; -} - -bool -TaskTable::Loaded(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::LOADING) { - task->state = TaskTableItemState::LOADED; - return true; - } - return false; -} - -bool -TaskTable::Execute(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::LOADED) { - task->state = TaskTableItemState::EXECUTING; - return true; - } - return false; -} - -bool -TaskTable::Executed(uint64_t index) { - auto &task = table_[index]; - - std::lock_guard lock(task->mutex); - if (task->state == TaskTableItemState::EXECUTING) { - task->state = TaskTableItemState::EXECUTED; - return true; - } - return false; -} - std::string ToString(TaskTableItemState state) { switch (state) { @@ -146,12 +150,27 @@ ToString(TaskTableItemState state) { } } +std::string +ToString(const TaskTimestamp ×tamp) { + std::stringstream ss; + ss << ""; + return ss.str(); +} + std::string TaskTable::Dump() { std::stringstream ss; for (auto &item : table_) { - ss << "<" << item->id; - ss << ", " << ToString(item->state); + ss << "id; + ss << ", state=" << ToString(item->state); + ss << ", timestamp=" << ToString(item->timestamp); ss << ">" << std::endl; } return ss.str(); diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 8a482d0579a386f2bbbc59ca59c7df3e581938e8..45cb9a815e05bd9ed5df0730f3121bbd28851232 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -28,6 +28,16 @@ enum class TaskTableItemState { MOVED, // moved, termination state }; +struct TaskTimestamp { + uint64_t start = 0; + uint64_t move = 0; + uint64_t moved = 0; + uint64_t load = 0; + uint64_t loaded = 0; + uint64_t execute = 0; + uint64_t executed = 0; +}; + struct TaskTableItem { TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {} @@ -39,8 +49,27 @@ struct TaskTableItem { TaskPtr task; // the task; TaskTableItemState state; // the state; std::mutex mutex; + TaskTimestamp timestamp; uint8_t priority; // just a number, meaningless; + + bool + Load(); + + bool + Loaded(); + + bool + Execute(); + + bool + Executed(); + + bool + Move(); + + bool + Moved(); }; using TaskTableItemPtr = std::shared_ptr; @@ -111,55 +140,68 @@ public: public: /******** Action ********/ - /* - * Move a task; - * Set state moving; - * Called by scheduler; - */ // TODO: bool to Status - bool - Move(uint64_t index); - - /* - * Move task finished; - * Set state moved; - * Called by scheduler; - */ - bool - Moved(uint64_t index); - /* * Load a task; * Set state loading; * Called by loader; */ - bool - Load(uint64_t index); + inline bool + Load(uint64_t index) { + return table_[index]->Load(); + } /* * Load task finished; * Set state loaded; * Called by loader; */ - bool - Loaded(uint64_t index); + inline bool + Loaded(uint64_t index) { + return table_[index]->Loaded(); + } /* * Execute a task; * Set state executing; * Called by executor; */ - bool - Execute(uint64_t index); + inline bool + Execute(uint64_t index) { + return table_[index]->Execute(); + } /* * Execute task finished; * Set state executed; * Called by executor; */ - bool - Executed(uint64_t index); + inline bool + Executed(uint64_t index){ + return table_[index]->Executed(); + } + + /* + * Move a task; + * Set state moving; + * Called by scheduler; + */ + + inline bool + Move(uint64_t index){ + return table_[index]->Move(); + } + + /* + * Move task finished; + * Set state moved; + * Called by scheduler; + */ + inline bool + Moved(uint64_t index){ + return table_[index]->Moved(); + } public: /* diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 0bb886fdf04cf5cd2b12fcb81452c33d5fc04779..77bcb28f3576269708430441ea91fe1495f011bc 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -112,7 +112,7 @@ void Resource::loader_function() { } LoadFile(task_item->task); // TODO: wrapper loaded - task_item->state = TaskTableItemState::LOADED; + task_item->Loaded(); if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); @@ -138,7 +138,7 @@ void Resource::executor_function() { break; } Process(task_item->task); - task_item->state = TaskTableItemState::EXECUTED; + task_item->Executed(); if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index 98cbdb219b3343657e04806f4e985d994d90b4cb..ea482bcb7210a4c9ffdcfb3c1d21678bdb085b93 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -113,7 +113,6 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { if (type == LoadType::DISK2CPU) { index_engine_->Load(); } else if (type == LoadType::CPU2GPU) { - index_engine_->Load(); index_engine_->CopyToGpu(device_id); } else if (type == LoadType::GPU2CPU) { index_engine_->CopyToCpu();