提交 4f5466db 编写于 作者: W wxyu

MS-400 Add timestamp record in task state change function


Former-commit-id: 062045c8e5cbd0b0c71a05e0f5650ea943e4b3fa
上级 598f9f30
...@@ -38,6 +38,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -38,6 +38,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-390 - Update resource construct function - MS-390 - Update resource construct function
- MS-391 - Add PushTaskToNeighbourHasExecutor action - MS-391 - Add PushTaskToNeighbourHasExecutor action
- MS-394 - Update scheduler unittest - MS-394 - Update scheduler unittest
- MS-400 - Add timestamp record in task state change function
## New Feature ## New Feature
- MS-343 - Implement ResourceMgr - MS-343 - Implement ResourceMgr
......
...@@ -93,6 +93,17 @@ ResourceMgr::Dump() { ...@@ -93,6 +93,17 @@ ResourceMgr::Dump() {
return str; return str;
} }
std::string
ResourceMgr::DumpTaskTables() {
std::stringstream ss;
ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
for (auto &resource : resources_) {
ss << resource->Dump() << std::endl;
ss << resource->task_table().Dump() << std::endl;
}
return ss.str();
}
void void
ResourceMgr::event_process() { ResourceMgr::event_process() {
while (running_) { while (running_) {
......
...@@ -70,6 +70,9 @@ public: ...@@ -70,6 +70,9 @@ public:
std::string std::string
Dump(); Dump();
std::string
DumpTaskTables();
private: private:
void void
event_process(); event_process();
......
...@@ -8,12 +8,88 @@ ...@@ -8,12 +8,88 @@
#include "event/TaskTableUpdatedEvent.h" #include "event/TaskTableUpdatedEvent.h"
#include <vector> #include <vector>
#include <sstream> #include <sstream>
#include <ctime>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
uint64_t
get_now_timestamp() {
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
return millis;
}
bool
TaskTableItem::Load() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::START) {
state = TaskTableItemState::LOADING;
lock.unlock();
timestamp.load = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Loaded() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::LOADING) {
state = TaskTableItemState::LOADED;
lock.unlock();
timestamp.loaded = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Execute() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::LOADED) {
state = TaskTableItemState::EXECUTING;
lock.unlock();
timestamp.execute = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Executed() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::EXECUTING) {
state = TaskTableItemState::EXECUTED;
lock.unlock();
timestamp.executed = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Move() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::LOADED) {
state = TaskTableItemState::MOVING;
lock.unlock();
timestamp.move = get_now_timestamp();
return true;
}
return false;
}
bool
TaskTableItem::Moved() {
std::unique_lock<std::mutex> lock(mutex);
if (state == TaskTableItemState::MOVING) {
state = TaskTableItemState::MOVED;
lock.unlock();
timestamp.moved = get_now_timestamp();
return true;
}
return false;
}
void void
TaskTable::Put(TaskPtr task) { TaskTable::Put(TaskPtr task) {
...@@ -59,78 +135,6 @@ TaskTable::Clear() { ...@@ -59,78 +135,6 @@ TaskTable::Clear() {
// table_.erase(table_.begin(), iterator); // table_.erase(table_.begin(), iterator);
} }
bool
TaskTable::Move(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADED) {
task->state = TaskTableItemState::MOVING;
return true;
}
return false;
}
bool
TaskTable::Moved(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::MOVING) {
task->state = TaskTableItemState::MOVED;
return true;
}
return false;
}
bool
TaskTable::Load(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::START) {
task->state = TaskTableItemState::LOADING;
return true;
}
return false;
}
bool
TaskTable::Loaded(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADING) {
task->state = TaskTableItemState::LOADED;
return true;
}
return false;
}
bool
TaskTable::Execute(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADED) {
task->state = TaskTableItemState::EXECUTING;
return true;
}
return false;
}
bool
TaskTable::Executed(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::EXECUTING) {
task->state = TaskTableItemState::EXECUTED;
return true;
}
return false;
}
std::string std::string
ToString(TaskTableItemState state) { ToString(TaskTableItemState state) {
switch (state) { switch (state) {
...@@ -146,12 +150,27 @@ ToString(TaskTableItemState state) { ...@@ -146,12 +150,27 @@ ToString(TaskTableItemState state) {
} }
} }
std::string
ToString(const TaskTimestamp &timestamp) {
std::stringstream ss;
ss << "<start=" << timestamp.start;
ss << ", load=" << timestamp.load;
ss << ", loaded=" << timestamp.loaded;
ss << ", execute=" << timestamp.execute;
ss << ", executed=" << timestamp.executed;
ss << ", move=" << timestamp.move;
ss << ", moved=" << timestamp.moved;
ss << ">";
return ss.str();
}
std::string std::string
TaskTable::Dump() { TaskTable::Dump() {
std::stringstream ss; std::stringstream ss;
for (auto &item : table_) { for (auto &item : table_) {
ss << "<" << item->id; ss << "<id=" << item->id;
ss << ", " << ToString(item->state); ss << ", state=" << ToString(item->state);
ss << ", timestamp=" << ToString(item->timestamp);
ss << ">" << std::endl; ss << ">" << std::endl;
} }
return ss.str(); return ss.str();
......
...@@ -28,6 +28,16 @@ enum class TaskTableItemState { ...@@ -28,6 +28,16 @@ enum class TaskTableItemState {
MOVED, // moved, termination state MOVED, // moved, termination state
}; };
struct TaskTimestamp {
uint64_t start = 0;
uint64_t move = 0;
uint64_t moved = 0;
uint64_t load = 0;
uint64_t loaded = 0;
uint64_t execute = 0;
uint64_t executed = 0;
};
struct TaskTableItem { struct TaskTableItem {
TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {} TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {}
...@@ -39,8 +49,27 @@ struct TaskTableItem { ...@@ -39,8 +49,27 @@ struct TaskTableItem {
TaskPtr task; // the task; TaskPtr task; // the task;
TaskTableItemState state; // the state; TaskTableItemState state; // the state;
std::mutex mutex; std::mutex mutex;
TaskTimestamp timestamp;
uint8_t priority; // just a number, meaningless; uint8_t priority; // just a number, meaningless;
bool
Load();
bool
Loaded();
bool
Execute();
bool
Executed();
bool
Move();
bool
Moved();
}; };
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>; using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
...@@ -111,55 +140,68 @@ public: ...@@ -111,55 +140,68 @@ public:
public: public:
/******** Action ********/ /******** Action ********/
/*
* Move a task;
* Set state moving;
* Called by scheduler;
*/
// TODO: bool to Status // TODO: bool to Status
bool
Move(uint64_t index);
/*
* Move task finished;
* Set state moved;
* Called by scheduler;
*/
bool
Moved(uint64_t index);
/* /*
* Load a task; * Load a task;
* Set state loading; * Set state loading;
* Called by loader; * Called by loader;
*/ */
bool inline bool
Load(uint64_t index); Load(uint64_t index) {
return table_[index]->Load();
}
/* /*
* Load task finished; * Load task finished;
* Set state loaded; * Set state loaded;
* Called by loader; * Called by loader;
*/ */
bool inline bool
Loaded(uint64_t index); Loaded(uint64_t index) {
return table_[index]->Loaded();
}
/* /*
* Execute a task; * Execute a task;
* Set state executing; * Set state executing;
* Called by executor; * Called by executor;
*/ */
bool inline bool
Execute(uint64_t index); Execute(uint64_t index) {
return table_[index]->Execute();
}
/* /*
* Execute task finished; * Execute task finished;
* Set state executed; * Set state executed;
* Called by executor; * Called by executor;
*/ */
bool inline bool
Executed(uint64_t index); Executed(uint64_t index){
return table_[index]->Executed();
}
/*
* Move a task;
* Set state moving;
* Called by scheduler;
*/
inline bool
Move(uint64_t index){
return table_[index]->Move();
}
/*
* Move task finished;
* Set state moved;
* Called by scheduler;
*/
inline bool
Moved(uint64_t index){
return table_[index]->Moved();
}
public: public:
/* /*
......
...@@ -112,7 +112,7 @@ void Resource::loader_function() { ...@@ -112,7 +112,7 @@ void Resource::loader_function() {
} }
LoadFile(task_item->task); LoadFile(task_item->task);
// TODO: wrapper loaded // TODO: wrapper loaded
task_item->state = TaskTableItemState::LOADED; task_item->Loaded();
if (subscriber_) { if (subscriber_) {
auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item); auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event)); subscriber_(std::static_pointer_cast<Event>(event));
...@@ -138,7 +138,7 @@ void Resource::executor_function() { ...@@ -138,7 +138,7 @@ void Resource::executor_function() {
break; break;
} }
Process(task_item->task); Process(task_item->task);
task_item->state = TaskTableItemState::EXECUTED; task_item->Executed();
if (subscriber_) { if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item); auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event)); subscriber_(std::static_pointer_cast<Event>(event));
......
...@@ -113,7 +113,6 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { ...@@ -113,7 +113,6 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
if (type == LoadType::DISK2CPU) { if (type == LoadType::DISK2CPU) {
index_engine_->Load(); index_engine_->Load();
} else if (type == LoadType::CPU2GPU) { } else if (type == LoadType::CPU2GPU) {
index_engine_->Load();
index_engine_->CopyToGpu(device_id); index_engine_->CopyToGpu(device_id);
} else if (type == LoadType::GPU2CPU) { } else if (type == LoadType::GPU2CPU) {
index_engine_->CopyToCpu(); index_engine_->CopyToCpu();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册