diff --git a/CHANGELOG.md b/CHANGELOG.md index 45f0be3ecf4847c16c7826db87a1a9001bd8720b..b28c7bd3a7ced8c4d41144ffff28c682eb626b39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#96 - Remove .a file in milvus/lib for docker-version - \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss - \#122 - Add unique id for Job +- \#130 - Set task state MOVED after resource copy it completed ## Task diff --git a/core/src/scheduler/JobMgr.cpp b/core/src/scheduler/JobMgr.cpp index e7b15a81858c2634428fc5d88aa3ba71243a617c..794f6a0f37984071e3dcb40b71d7c17dcbe3c4e6 100644 --- a/core/src/scheduler/JobMgr.cpp +++ b/core/src/scheduler/JobMgr.cpp @@ -91,7 +91,7 @@ JobMgr::worker_function() { // disk resources NEVER be empty. if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { for (auto& task : tasks) { - disk->task_table().Put(task); + disk->task_table().Put(task, nullptr); } } } diff --git a/core/src/scheduler/Scheduler.cpp b/core/src/scheduler/Scheduler.cpp index cba847c25e9115233577681dda9e6e30317838d5..8d2d4406f8ccc3e1d8f7a69f2426818ead8c6c30 100644 --- a/core/src/scheduler/Scheduler.cpp +++ b/core/src/scheduler/Scheduler.cpp @@ -120,7 +120,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) { if (resource->HasExecutor() == false) { load_completed_event->task_table_item_->Move(); } - Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); + Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_, resource); break; } default: { break; } diff --git a/core/src/scheduler/TaskTable.cpp b/core/src/scheduler/TaskTable.cpp index bd3dd466a9f1d9646d74c5ad869e26aea4033eae..425eb0ab062f75d8209d91a729733ee04dc449f4 100644 --- a/core/src/scheduler/TaskTable.cpp +++ b/core/src/scheduler/TaskTable.cpp @@ -264,8 +264,8 @@ TaskTable::PickToExecute(uint64_t limit) { } void -TaskTable::Put(TaskPtr task) { - auto item = std::make_shared(); +TaskTable::Put(TaskPtr task, TaskTableItemPtr from) { + auto item = std::make_shared(std::move(from)); item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; @@ -276,21 +276,6 @@ TaskTable::Put(TaskPtr task) { } } -void -TaskTable::Put(std::vector& tasks) { - for (auto& task : tasks) { - auto item = std::make_shared(); - item->id = id_++; - item->task = std::move(task); - item->state = TaskTableItemState::START; - item->timestamp.start = get_current_timestamp(); - table_.put(std::move(item)); - } - if (subscriber_) { - subscriber_(); - } -} - size_t TaskTable::TaskToExecute() { size_t count = 0; diff --git a/core/src/scheduler/TaskTable.h b/core/src/scheduler/TaskTable.h index 898141d02855dda98dbafbab491f8a3b43f06218..37e274734351693f683059f8473e103419e3aa5d 100644 --- a/core/src/scheduler/TaskTable.h +++ b/core/src/scheduler/TaskTable.h @@ -58,8 +58,12 @@ struct TaskTimestamp : public interface::dumpable { Dump() const override; }; +struct TaskTableItem; +using TaskTableItemPtr = std::shared_ptr; + struct TaskTableItem : public interface::dumpable { - TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() { + explicit TaskTableItem(TaskTableItemPtr f = nullptr) + : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex(), from(std::move(f)) { } TaskTableItem(const TaskTableItem& src) = delete; @@ -70,6 +74,7 @@ struct TaskTableItem : public interface::dumpable { TaskTableItemState state; // the state; std::mutex mutex; TaskTimestamp timestamp; + TaskTableItemPtr from; bool IsFinish(); @@ -96,8 +101,6 @@ struct TaskTableItem : public interface::dumpable { Dump() const override; }; -using TaskTableItemPtr = std::shared_ptr; - class TaskTable : public interface::dumpable { public: TaskTable() : table_(1ULL << 16ULL) { @@ -120,14 +123,7 @@ class TaskTable : public interface::dumpable { * Put one task; */ void - Put(TaskPtr task); - - /* - * Put tasks back of task table; - * Called by DBImpl; - */ - void - Put(std::vector& tasks); + Put(TaskPtr task, TaskTableItemPtr from = nullptr); size_t TaskToExecute(); diff --git a/core/src/scheduler/action/Action.h b/core/src/scheduler/action/Action.h index ff729100550a1e1a3b552192def54d65a85bf98f..f5f828cbf6b3632e7bcc59f848d6c05840562bd9 100644 --- a/core/src/scheduler/action/Action.h +++ b/core/src/scheduler/action/Action.h @@ -28,13 +28,13 @@ namespace scheduler { class Action { public: static void - PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self); + PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self); static void - PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self); + PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self); static void - PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest); + PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest); static void DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, diff --git a/core/src/scheduler/action/PushTaskToNeighbour.cpp b/core/src/scheduler/action/PushTaskToNeighbour.cpp index 6f74849eac0a1aa9c8d3e52a9d6765ade2788ed0..b8a4a1164b255585d57d0cf992aa2ac4c0f8a5e3 100644 --- a/core/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/core/src/scheduler/action/PushTaskToNeighbour.cpp @@ -59,7 +59,7 @@ get_neighbours_with_connetion(const ResourcePtr& self) { } void -Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) { +Action::PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self) { auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { std::vector speeds; @@ -78,7 +78,7 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self for (uint64_t i = 0; i < speeds.size(); ++i) { rd_speed -= speeds[i]; if (rd_speed <= 0) { - neighbours[i].first->task_table().Put(task); + neighbours[i].first->task_table().Put(task_item->task, task_item); return; } } @@ -89,22 +89,23 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self } void -Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) { +Action::PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self) { auto neighbours = get_neighbours(self); for (auto& neighbour : neighbours) { - neighbour->task_table().Put(task); + neighbour->task_table().Put(task_item->task, task_item); } } void -Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { - dest->task_table().Put(task); +Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest) { + dest->task_table().Put(task_item->task, task_item); } void Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event) { if (not resource->HasExecutor() && event->task_table_item_->Move()) { + auto task_item = event->task_table_item_; auto task = event->task_table_item_->task; auto search_task = std::static_pointer_cast(task); bool moved = false; @@ -119,7 +120,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res if (index != nullptr) { moved = true; auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i); - PushTaskToResource(event->task_table_item_->task, dest_resource); + PushTaskToResource(event->task_table_item_, dest_resource); break; } } @@ -127,7 +128,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res } if (not moved) { - PushTaskToNeighbourRandomly(task, resource); + PushTaskToNeighbourRandomly(task_item, resource); } } } @@ -135,6 +136,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res void Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event) { + auto task_item = event->task_table_item_; auto task = event->task_table_item_->task; if (resource->type() == ResourceType::DISK) { // step 1: calculate shortest path per resource, from disk to compute resource @@ -213,7 +215,7 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou // next_res->task_table().Put(task); // } event->task_table_item_->Move(); - next_res->task_table().Put(task); + next_res->task_table().Put(task, task_item); } } diff --git a/core/src/scheduler/resource/Resource.cpp b/core/src/scheduler/resource/Resource.cpp index 8e1059226216c92d94d23d999318ce8af3e74463..8cf03275f7b0a6bf75f96a516508a4514df0f4c8 100644 --- a/core/src/scheduler/resource/Resource.cpp +++ b/core/src/scheduler/resource/Resource.cpp @@ -180,6 +180,10 @@ Resource::loader_function() { } LoadFile(task_item->task); task_item->Loaded(); + if (task_item->from) { + task_item->from->Moved(); + task_item->from = nullptr; + } if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/core/unittest/scheduler/test_tasktable.cpp b/core/unittest/scheduler/test_tasktable.cpp index 601bd2431d821c5b9ff445684ac764b22f752292..28a2e29c98433501967d45d4f3d163e02374e545 100644 --- a/core/unittest/scheduler/test_tasktable.cpp +++ b/core/unittest/scheduler/test_tasktable.cpp @@ -193,16 +193,13 @@ TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) { TEST_F(TaskTableBaseTest, PUT_BATCH) { std::vector tasks{task1_, task2_}; - empty_table_.Put(tasks); + for (auto& task : tasks) { + empty_table_.Put(task); + } ASSERT_EQ(empty_table_.at(0)->task, task1_); ASSERT_EQ(empty_table_.at(1)->task, task2_); } -TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) { - std::vector tasks{}; - empty_table_.Put(tasks); -} - TEST_F(TaskTableBaseTest, SIZE) { ASSERT_EQ(empty_table_.size(), 0); empty_table_.Put(task1_);