From 3fa2c1e69a690588d890c153c09dc502e2e8fc51 Mon Sep 17 00:00:00 2001 From: wxyu Date: Wed, 30 Oct 2019 12:34:09 +0800 Subject: [PATCH] Set task state MOVED after resource copy it completed Former-commit-id: a97e306b62f6a0a7a06c881e93a973ad75b8ac9d --- CHANGELOG.md | 1 + core/src/scheduler/JobMgr.cpp | 2 +- core/src/scheduler/Scheduler.cpp | 2 +- core/src/scheduler/TaskTable.cpp | 19 ++---------------- core/src/scheduler/TaskTable.h | 18 +++++++---------- core/src/scheduler/action/Action.h | 6 +++--- .../scheduler/action/PushTaskToNeighbour.cpp | 20 ++++++++++--------- core/src/scheduler/job/Job.cpp | 2 +- core/src/scheduler/resource/Resource.cpp | 4 ++++ core/unittest/scheduler/test_tasktable.cpp | 9 +++------ 10 files changed, 34 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 785b7c89..b506f6d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Please mark all change in change log and use the ticket from JIRA. - \#96 - Remove .a file in milvus/lib for docker-version - \#118 - Using shared_ptr instead of weak_ptr to avoid performance loss - \#122 - Add unique id for Job +- \#130 - Set task state MOVED after resource copy it completed ## Feature - \#115 - Using new structure for tasktable diff --git a/core/src/scheduler/JobMgr.cpp b/core/src/scheduler/JobMgr.cpp index e7b15a81..794f6a0f 100644 --- a/core/src/scheduler/JobMgr.cpp +++ b/core/src/scheduler/JobMgr.cpp @@ -91,7 +91,7 @@ JobMgr::worker_function() { // disk resources NEVER be empty. if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { for (auto& task : tasks) { - disk->task_table().Put(task); + disk->task_table().Put(task, nullptr); } } } diff --git a/core/src/scheduler/Scheduler.cpp b/core/src/scheduler/Scheduler.cpp index cba847c2..8d2d4406 100644 --- a/core/src/scheduler/Scheduler.cpp +++ b/core/src/scheduler/Scheduler.cpp @@ -120,7 +120,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) { if (resource->HasExecutor() == false) { load_completed_event->task_table_item_->Move(); } - Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource); + Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_, resource); break; } default: { break; } diff --git a/core/src/scheduler/TaskTable.cpp b/core/src/scheduler/TaskTable.cpp index bd3dd466..425eb0ab 100644 --- a/core/src/scheduler/TaskTable.cpp +++ b/core/src/scheduler/TaskTable.cpp @@ -264,8 +264,8 @@ TaskTable::PickToExecute(uint64_t limit) { } void -TaskTable::Put(TaskPtr task) { - auto item = std::make_shared(); +TaskTable::Put(TaskPtr task, TaskTableItemPtr from) { + auto item = std::make_shared(std::move(from)); item->id = id_++; item->task = std::move(task); item->state = TaskTableItemState::START; @@ -276,21 +276,6 @@ TaskTable::Put(TaskPtr task) { } } -void -TaskTable::Put(std::vector& tasks) { - for (auto& task : tasks) { - auto item = std::make_shared(); - item->id = id_++; - item->task = std::move(task); - item->state = TaskTableItemState::START; - item->timestamp.start = get_current_timestamp(); - table_.put(std::move(item)); - } - if (subscriber_) { - subscriber_(); - } -} - size_t TaskTable::TaskToExecute() { size_t count = 0; diff --git a/core/src/scheduler/TaskTable.h b/core/src/scheduler/TaskTable.h index 898141d0..37e27473 100644 --- a/core/src/scheduler/TaskTable.h +++ b/core/src/scheduler/TaskTable.h @@ -58,8 +58,12 @@ struct TaskTimestamp : public interface::dumpable { Dump() const override; }; +struct TaskTableItem; +using TaskTableItemPtr = std::shared_ptr; + struct TaskTableItem : public interface::dumpable { - TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() { + explicit TaskTableItem(TaskTableItemPtr f = nullptr) + : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex(), from(std::move(f)) { } TaskTableItem(const TaskTableItem& src) = delete; @@ -70,6 +74,7 @@ struct TaskTableItem : public interface::dumpable { TaskTableItemState state; // the state; std::mutex mutex; TaskTimestamp timestamp; + TaskTableItemPtr from; bool IsFinish(); @@ -96,8 +101,6 @@ struct TaskTableItem : public interface::dumpable { Dump() const override; }; -using TaskTableItemPtr = std::shared_ptr; - class TaskTable : public interface::dumpable { public: TaskTable() : table_(1ULL << 16ULL) { @@ -120,14 +123,7 @@ class TaskTable : public interface::dumpable { * Put one task; */ void - Put(TaskPtr task); - - /* - * Put tasks back of task table; - * Called by DBImpl; - */ - void - Put(std::vector& tasks); + Put(TaskPtr task, TaskTableItemPtr from = nullptr); size_t TaskToExecute(); diff --git a/core/src/scheduler/action/Action.h b/core/src/scheduler/action/Action.h index ff729100..f5f828cb 100644 --- a/core/src/scheduler/action/Action.h +++ b/core/src/scheduler/action/Action.h @@ -28,13 +28,13 @@ namespace scheduler { class Action { public: static void - PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self); + PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self); static void - PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self); + PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self); static void - PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest); + PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest); static void DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, diff --git a/core/src/scheduler/action/PushTaskToNeighbour.cpp b/core/src/scheduler/action/PushTaskToNeighbour.cpp index 6f74849e..b8a4a116 100644 --- a/core/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/core/src/scheduler/action/PushTaskToNeighbour.cpp @@ -59,7 +59,7 @@ get_neighbours_with_connetion(const ResourcePtr& self) { } void -Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self) { +Action::PushTaskToNeighbourRandomly(TaskTableItemPtr task_item, const ResourcePtr& self) { auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { std::vector speeds; @@ -78,7 +78,7 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self for (uint64_t i = 0; i < speeds.size(); ++i) { rd_speed -= speeds[i]; if (rd_speed <= 0) { - neighbours[i].first->task_table().Put(task); + neighbours[i].first->task_table().Put(task_item->task, task_item); return; } } @@ -89,22 +89,23 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self } void -Action::PushTaskToAllNeighbour(const TaskPtr& task, const ResourcePtr& self) { +Action::PushTaskToAllNeighbour(TaskTableItemPtr task_item, const ResourcePtr& self) { auto neighbours = get_neighbours(self); for (auto& neighbour : neighbours) { - neighbour->task_table().Put(task); + neighbour->task_table().Put(task_item->task, task_item); } } void -Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { - dest->task_table().Put(task); +Action::PushTaskToResource(TaskTableItemPtr task_item, const ResourcePtr& dest) { + dest->task_table().Put(task_item->task, task_item); } void Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event) { if (not resource->HasExecutor() && event->task_table_item_->Move()) { + auto task_item = event->task_table_item_; auto task = event->task_table_item_->task; auto search_task = std::static_pointer_cast(task); bool moved = false; @@ -119,7 +120,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res if (index != nullptr) { moved = true; auto dest_resource = res_mgr->GetResource(ResourceType::GPU, i); - PushTaskToResource(event->task_table_item_->task, dest_resource); + PushTaskToResource(event->task_table_item_, dest_resource); break; } } @@ -127,7 +128,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res } if (not moved) { - PushTaskToNeighbourRandomly(task, resource); + PushTaskToNeighbourRandomly(task_item, resource); } } } @@ -135,6 +136,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res void Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr resource, std::shared_ptr event) { + auto task_item = event->task_table_item_; auto task = event->task_table_item_->task; if (resource->type() == ResourceType::DISK) { // step 1: calculate shortest path per resource, from disk to compute resource @@ -213,7 +215,7 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou // next_res->task_table().Put(task); // } event->task_table_item_->Move(); - next_res->task_table().Put(task); + next_res->task_table().Put(task, task_item); } } diff --git a/core/src/scheduler/job/Job.cpp b/core/src/scheduler/job/Job.cpp index 1199fe17..06a163b9 100644 --- a/core/src/scheduler/job/Job.cpp +++ b/core/src/scheduler/job/Job.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "Job.h" +#include "scheduler/job/Job.h" namespace milvus { namespace scheduler { diff --git a/core/src/scheduler/resource/Resource.cpp b/core/src/scheduler/resource/Resource.cpp index 8e105922..8cf03275 100644 --- a/core/src/scheduler/resource/Resource.cpp +++ b/core/src/scheduler/resource/Resource.cpp @@ -180,6 +180,10 @@ Resource::loader_function() { } LoadFile(task_item->task); task_item->Loaded(); + if (task_item->from) { + task_item->from->Moved(); + task_item->from = nullptr; + } if (subscriber_) { auto event = std::make_shared(shared_from_this(), task_item); subscriber_(std::static_pointer_cast(event)); diff --git a/core/unittest/scheduler/test_tasktable.cpp b/core/unittest/scheduler/test_tasktable.cpp index 601bd243..28a2e29c 100644 --- a/core/unittest/scheduler/test_tasktable.cpp +++ b/core/unittest/scheduler/test_tasktable.cpp @@ -193,16 +193,13 @@ TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) { TEST_F(TaskTableBaseTest, PUT_BATCH) { std::vector tasks{task1_, task2_}; - empty_table_.Put(tasks); + for (auto& task : tasks) { + empty_table_.Put(task); + } ASSERT_EQ(empty_table_.at(0)->task, task1_); ASSERT_EQ(empty_table_.at(1)->task, task2_); } -TEST_F(TaskTableBaseTest, PUT_EMPTY_BATCH) { - std::vector tasks{}; - empty_table_.Put(tasks); -} - TEST_F(TaskTableBaseTest, SIZE) { ASSERT_EQ(empty_table_.size(), 0); empty_table_.Put(task1_); -- GitLab