From e757727e487b9f34380c376f8e76f5255c58fb7b Mon Sep 17 00:00:00 2001 From: wxyu Date: Sat, 7 Sep 2019 14:54:32 +0800 Subject: [PATCH] MS-508 Update normal_test in scheduler Former-commit-id: 343dae4a96cac46259094de9c1d025de3b3db53e --- cpp/CHANGELOG.md | 1 + cpp/src/scheduler/Scheduler.cpp | 26 +++++++++------- cpp/src/scheduler/Scheduler.h | 1 + cpp/src/scheduler/task/SearchTask.cpp | 13 ++++---- cpp/src/scheduler/task/SearchTask.h | 1 + cpp/src/scheduler/task/Task.h | 1 + cpp/src/scheduler/task/TestTask.cpp | 11 ++++--- cpp/unittest/scheduler/normal_test.cpp | 41 ++++++++++++-------------- 8 files changed, 54 insertions(+), 41 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 78a5e835..cf202ce8 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -89,6 +89,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-488 - Improve code format in scheduler - MS-502 - Update tasktable_test in scheduler - MS-504 - Update node_test in scheduler +- MS-508 - Update normal_test in scheduler ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index dcd17e31..c77b3764 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -108,6 +108,7 @@ void Scheduler::OnFinishTask(const EventPtr &event) { } +// TODO: refactor the function void Scheduler::OnLoadCompleted(const EventPtr &event) { auto load_completed_event = std::static_pointer_cast(event); @@ -120,18 +121,23 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) { auto task = load_completed_event->task_table_item_->task; auto search_task = std::static_pointer_cast(task); - auto location = search_task->index_engine_->GetLocation(); bool moved = false; - for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) { - auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location); - if (index != nullptr) { - moved = true; - auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i); - Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource); - break; + // to support test task, REFACTOR + if (auto index_engine = search_task->index_engine_) { + auto location = index_engine->GetLocation(); + + for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) { + auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location); + if (index != nullptr) { + moved = true; + auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i); + Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource); + break; + } } } + if (not moved) { Action::PushTaskToNeighbourRandomly(task, resource); } @@ -147,7 +153,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { // step 1: calculate shortest path per resource, from disk to compute resource auto compute_resources = res_mgr_.lock()->GetComputeResource(); std::vector> paths; - std::vector transport_costs; + std::vector transport_costs; for (auto &res : compute_resources) { std::vector path; uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path); @@ -176,7 +182,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) { task->path() = task_path; } - if(self->name() == task->path().Last()) { + if (self->name() == task->path().Last()) { self->WakeupLoader(); } else { auto next_res_name = task->path().Next(); diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index e2d51ee3..c2a36069 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -21,6 +21,7 @@ namespace milvus { namespace engine { +// TODO: refactor, not friendly to unittest, logical in framework code class Scheduler { public: explicit diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index cb006771..869f22cb 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -83,11 +83,14 @@ CollectFileMetrics(int file_type, size_t file_size) { XSearchTask::XSearchTask(TableFileSchemaPtr file) : Task(TaskType::SearchTask), file_(file) { - index_engine_ = EngineFactory::Build(file_->dimension_, - file_->location_, - (EngineType) file_->engine_type_, - (MetricType) file_->metric_type_, - file_->nlist_); + if (file_) { + index_engine_ = EngineFactory::Build(file_->dimension_, + file_->location_, + (EngineType) file_->engine_type_, + (MetricType) file_->metric_type_, + file_->nlist_); + } + } void diff --git a/cpp/src/scheduler/task/SearchTask.h b/cpp/src/scheduler/task/SearchTask.h index b45eea48..2370e263 100644 --- a/cpp/src/scheduler/task/SearchTask.h +++ b/cpp/src/scheduler/task/SearchTask.h @@ -12,6 +12,7 @@ namespace zilliz { namespace milvus { namespace engine { +// TODO: rewrite class XSearchTask : public Task { public: explicit diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index 01b2c8eb..53893aac 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -34,6 +34,7 @@ class Task; using TaskPtr = std::shared_ptr; +// TODO: re-design class Task { public: explicit diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp index 15f60baa..1078da58 100644 --- a/cpp/src/scheduler/task/TestTask.cpp +++ b/cpp/src/scheduler/task/TestTask.cpp @@ -13,7 +13,7 @@ namespace milvus { namespace engine { -TestTask::TestTask(TableFileSchemaPtr& file) : XSearchTask(file) {} +TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {} void TestTask::Load(LoadType type, uint8_t device_id) { @@ -22,9 +22,12 @@ TestTask::Load(LoadType type, uint8_t device_id) { void TestTask::Execute() { - std::lock_guard lock(mutex_); - exec_count_++; - done_ = true; + { + std::lock_guard lock(mutex_); + exec_count_++; + done_ = true; + } + cv_.notify_one(); } void diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index 576ed3ee..c679a356 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -2,6 +2,7 @@ #include "scheduler/ResourceMgr.h" #include "scheduler/Scheduler.h" #include "scheduler/task/TestTask.h" +#include "scheduler/tasklabel/DefaultLabel.h" #include "scheduler/SchedInst.h" #include "utils/Log.h" #include @@ -9,48 +10,44 @@ using namespace zilliz::milvus::engine; -TEST(normal_test, test1) { + +TEST(normal_test, inst_test) { // ResourceMgr only compose resources, provide unified event -// auto res_mgr = std::make_shared(); auto res_mgr = ResMgrInst::GetInstance(); - auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false)); - auto cpu = res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0)); - auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false)); - auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false)); - auto IO = Connection("IO", 500.0); - auto PCIE = Connection("IO", 11000.0); - res_mgr->Connect(disk, cpu, IO); - res_mgr->Connect(cpu, gpu1, PCIE); - res_mgr->Connect(cpu, gpu2, PCIE); + res_mgr->Add(ResourceFactory::Create("disk", "DISK", 0, true, false)); + res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true)); - res_mgr->Start(); + auto IO = Connection("IO", 500.0); + res_mgr->Connect("disk", "cpu", IO); -// auto scheduler = new Scheduler(res_mgr); auto scheduler = SchedInst::GetInstance(); + + res_mgr->Start(); scheduler->Start(); const uint64_t NUM_TASK = 1000; std::vector> tasks; TableFileSchemaPtr dummy = nullptr; - for (uint64_t i = 0; i < NUM_TASK; ++i) { - if (auto observe = disk.lock()) { + auto disks = res_mgr->GetDiskResources(); + ASSERT_FALSE(disks.empty()); + if (auto observe = disks[0].lock()) { + for (uint64_t i = 0; i < NUM_TASK; ++i) { auto task = std::make_shared(dummy); + task->label() = std::make_shared(); tasks.push_back(task); observe->task_table().Put(task); } } - sleep(1); + for (auto &task : tasks) { + task->Wait(); + ASSERT_EQ(task->load_count_, 1); + ASSERT_EQ(task->exec_count_, 1); + } scheduler->Stop(); res_mgr->Stop(); - auto pcpu = cpu.lock(); - for (uint64_t i = 0; i < NUM_TASK; ++i) { - auto task = std::static_pointer_cast(pcpu->task_table()[i]->task); - ASSERT_EQ(task->load_count_, 1); - ASSERT_EQ(task->exec_count_, 1); - } } -- GitLab