From 4f2c45ab41e4dd3524c30a0c732a2f0560236615 Mon Sep 17 00:00:00 2001 From: wxyu Date: Mon, 9 Sep 2019 15:27:26 +0800 Subject: [PATCH] MS-527 Update scheduler_test and enable it Former-commit-id: 96fdfdc0c46a0e4cf1438ad3e30cbfb2c42b4416 --- cpp/CHANGELOG.md | 1 + cpp/unittest/scheduler/scheduler_test.cpp | 450 +++++++++++----------- 2 files changed, 229 insertions(+), 222 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 83f1aa69..24689510 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -105,6 +105,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-520 - Update resource_test in scheduler - MS-524 - Add some unittest in event_test and resource_test - MS-525 - Disable parallel reduce in SearchTask +- MS-527 - Update scheduler_test and enable it ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp index b8eb9ce9..f176311e 100644 --- a/cpp/unittest/scheduler/scheduler_test.cpp +++ b/cpp/unittest/scheduler/scheduler_test.cpp @@ -6,6 +6,7 @@ #include "scheduler/Scheduler.h" #include #include +#include #include "cache/DataObj.h" #include "cache/GpuCacheMgr.h" #include "scheduler/task/TestTask.h" @@ -15,233 +16,238 @@ #include "wrapper/knowhere/vec_index.h" #include "scheduler/tasklabel/SpecResLabel.h" + namespace zilliz { namespace milvus { namespace engine { -//class MockVecIndex : public engine::VecIndex { -//public: -// virtual server::KnowhereError BuildAll(const long &nb, -// const float *xb, -// const long *ids, -// const engine::Config &cfg, -// const long &nt = 0, -// const float *xt = nullptr) { -// -// } -// -// engine::VecIndexPtr Clone() override { -// return zilliz::milvus::engine::VecIndexPtr(); -// } -// -// int64_t GetDeviceId() override { -// return 0; -// } -// -// engine::IndexType GetType() override { -// return engine::IndexType::INVALID; -// } -// -// virtual server::KnowhereError Add(const long &nb, -// const float *xb, -// const long *ids, -// const engine::Config &cfg = engine::Config()) { -// -// } -// -// virtual server::KnowhereError Search(const long &nq, -// const float *xq, -// float *dist, -// long *ids, -// const engine::Config &cfg = engine::Config()) { -// -// } -// -// engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override { -// -// } -// -// engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override { -// -// } -// -// virtual int64_t Dimension() { -// return dimension_; -// } -// -// virtual int64_t Count() { -// return ntotal_; -// } -// -// virtual zilliz::knowhere::BinarySet Serialize() { -// zilliz::knowhere::BinarySet binset; -// return binset; -// } -// -// virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) { -// -// } -// -//public: -// int64_t dimension_ = 512; -// int64_t ntotal_ = 0; -//}; -// -// -//class SchedulerTest : public testing::Test { -//protected: -// void -// SetUp() override { -// ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false); -// ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0); -// ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1); -// -// res_mgr_ = std::make_shared(); -// cpu_resource_ = res_mgr_->Add(std::move(cpu)); -// gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0)); -// gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1)); -// -// auto PCIE = Connection("IO", 11000.0); -// res_mgr_->Connect("cpu", "gpu0", PCIE); -// res_mgr_->Connect("cpu", "gpu1", PCIE); -// -// scheduler_ = std::make_shared(res_mgr_); -// -// res_mgr_->Start(); -// scheduler_->Start(); -// } -// -// void -// TearDown() override { -// scheduler_->Stop(); -// res_mgr_->Stop(); -// } -// -// ResourceWPtr cpu_resource_; -// ResourceWPtr gpu_resource_0_; -// ResourceWPtr gpu_resource_1_; -// -// ResourceMgrPtr res_mgr_; -// std::shared_ptr scheduler_; -//}; -// -//void -//insert_dummy_index_into_gpu_cache(uint64_t device_id) { -// MockVecIndex* mock_index = new MockVecIndex(); -// mock_index->ntotal_ = 1000; -// engine::VecIndexPtr index(mock_index); -// -// cache::DataObjPtr obj = std::make_shared(index); -// -// cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj); -//} -// -//TEST_F(SchedulerTest, OnCopyCompleted) { -// const uint64_t NUM = 10; -// std::vector> tasks; -// TableFileSchemaPtr dummy = std::make_shared(); -// dummy->location_ = "location"; -// -// insert_dummy_index_into_gpu_cache(1); -// -// for (uint64_t i = 0; i < NUM; ++i) { -// auto task = std::make_shared(dummy); -// task->label() = std::make_shared(); -// tasks.push_back(task); -// cpu_resource_.lock()->task_table().Put(task); -// } -// -// sleep(3); +class MockVecIndex : public engine::VecIndex { +public: + virtual ErrorCode BuildAll(const long &nb, + const float *xb, + const long *ids, + const engine::Config &cfg, + const long &nt = 0, + const float *xt = nullptr) { + + } + + engine::VecIndexPtr Clone() override { + return zilliz::milvus::engine::VecIndexPtr(); + } + + int64_t GetDeviceId() override { + return 0; + } + + engine::IndexType GetType() override { + return engine::IndexType::INVALID; + } + + virtual ErrorCode Add(const long &nb, + const float *xb, + const long *ids, + const engine::Config &cfg = engine::Config()) { + + } + + virtual ErrorCode Search(const long &nq, + const float *xq, + float *dist, + long *ids, + const engine::Config &cfg = engine::Config()) { + + } + + engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override { + + } + + engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override { + + } + + virtual int64_t Dimension() { + return dimension_; + } + + virtual int64_t Count() { + return ntotal_; + } + + virtual zilliz::knowhere::BinarySet Serialize() { + zilliz::knowhere::BinarySet binset; + return binset; + } + + virtual ErrorCode Load(const zilliz::knowhere::BinarySet &index_binary) { + + } + +public: + int64_t dimension_ = 512; + int64_t ntotal_ = 0; +}; + + +class SchedulerTest : public testing::Test { +protected: + void + SetUp() override { + server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE); + config.AddSequenceItem(server::CONFIG_GPU_IDS, "0"); + config.AddSequenceItem(server::CONFIG_GPU_IDS, "1"); + + ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false); + ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0); + ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1); + + res_mgr_ = std::make_shared(); + cpu_resource_ = res_mgr_->Add(std::move(cpu)); + gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0)); + gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1)); + + auto PCIE = Connection("IO", 11000.0); + res_mgr_->Connect("cpu", "gpu0", PCIE); + res_mgr_->Connect("cpu", "gpu1", PCIE); + + scheduler_ = std::make_shared(res_mgr_); + + res_mgr_->Start(); + scheduler_->Start(); + } + + void + TearDown() override { + scheduler_->Stop(); + res_mgr_->Stop(); + } + + ResourceWPtr cpu_resource_; + ResourceWPtr gpu_resource_0_; + ResourceWPtr gpu_resource_1_; + + ResourceMgrPtr res_mgr_; + std::shared_ptr scheduler_; +}; + +void +insert_dummy_index_into_gpu_cache(uint64_t device_id) { + MockVecIndex *mock_index = new MockVecIndex(); + mock_index->ntotal_ = 1000; + engine::VecIndexPtr index(mock_index); + + cache::DataObjPtr obj = std::make_shared(index); + + cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj); +} + +TEST_F(SchedulerTest, OnLoadCompleted) { + const uint64_t NUM = 10; + std::vector> tasks; + TableFileSchemaPtr dummy = std::make_shared(); + dummy->location_ = "location"; + + insert_dummy_index_into_gpu_cache(1); + + for (uint64_t i = 0; i < NUM; ++i) { + auto task = std::make_shared(dummy); + task->label() = std::make_shared(); + tasks.push_back(task); + cpu_resource_.lock()->task_table().Put(task); + } + + sleep(3); + ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); + +} + +TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) { + const uint64_t NUM = 10; + std::vector> tasks; + TableFileSchemaPtr dummy1 = std::make_shared(); + dummy1->location_ = "location"; + + tasks.clear(); + + for (uint64_t i = 0; i < NUM; ++i) { + auto task = std::make_shared(dummy1); + task->label() = std::make_shared(); + tasks.push_back(task); + cpu_resource_.lock()->task_table().Put(task); + } + + sleep(3); // ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); -// -//} -// -//TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) { -// const uint64_t NUM = 10; -// std::vector> tasks; -// TableFileSchemaPtr dummy1 = std::make_shared(); -// dummy1->location_ = "location"; -// -// tasks.clear(); -// -// for (uint64_t i = 0; i < NUM; ++i) { -// auto task = std::make_shared(dummy1); -// task->label() = std::make_shared(); -// tasks.push_back(task); -// cpu_resource_.lock()->task_table().Put(task); -// } -// -// sleep(3); -//// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); -//} -// -//class SchedulerTest2 : public testing::Test { -// protected: -// void -// SetUp() override { -// ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false); -// ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false); -// ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false); -// ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false); -// ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true); -// ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true); -// -// res_mgr_ = std::make_shared(); -// disk_ = res_mgr_->Add(std::move(disk)); -// cpu_0_ = res_mgr_->Add(std::move(cpu0)); -// cpu_1_ = res_mgr_->Add(std::move(cpu1)); -// cpu_2_ = res_mgr_->Add(std::move(cpu2)); -// gpu_0_ = res_mgr_->Add(std::move(gpu0)); -// gpu_1_ = res_mgr_->Add(std::move(gpu1)); -// auto IO = Connection("IO", 5.0); -// auto PCIE1 = Connection("PCIE", 11.0); -// auto PCIE2 = Connection("PCIE", 20.0); -// res_mgr_->Connect("disk", "cpu0", IO); -// res_mgr_->Connect("cpu0", "cpu1", IO); -// res_mgr_->Connect("cpu1", "cpu2", IO); -// res_mgr_->Connect("cpu0", "cpu2", IO); -// res_mgr_->Connect("cpu1", "gpu0", PCIE1); -// res_mgr_->Connect("cpu2", "gpu1", PCIE2); -// -// scheduler_ = std::make_shared(res_mgr_); -// -// res_mgr_->Start(); -// scheduler_->Start(); -// } -// -// void -// TearDown() override { -// scheduler_->Stop(); -// res_mgr_->Stop(); -// } -// -// ResourceWPtr disk_; -// ResourceWPtr cpu_0_; -// ResourceWPtr cpu_1_; -// ResourceWPtr cpu_2_; -// ResourceWPtr gpu_0_; -// ResourceWPtr gpu_1_; -// ResourceMgrPtr res_mgr_; -// -// std::shared_ptr scheduler_; -//}; -// -// -//TEST_F(SchedulerTest2, SpecifiedResourceTest) { -// const uint64_t NUM = 10; -// std::vector> tasks; -// TableFileSchemaPtr dummy = std::make_shared(); -// dummy->location_ = "location"; -// -// for (uint64_t i = 0; i < NUM; ++i) { -// std::shared_ptr task = std::make_shared(dummy); -// task->label() = std::make_shared(disk_); -// tasks.push_back(task); -// disk_.lock()->task_table().Put(task); -// } -// -//// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); -//} +} + +class SchedulerTest2 : public testing::Test { +protected: + void + SetUp() override { + ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false); + ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false); + ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false); + ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false); + ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true); + ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true); + + res_mgr_ = std::make_shared(); + disk_ = res_mgr_->Add(std::move(disk)); + cpu_0_ = res_mgr_->Add(std::move(cpu0)); + cpu_1_ = res_mgr_->Add(std::move(cpu1)); + cpu_2_ = res_mgr_->Add(std::move(cpu2)); + gpu_0_ = res_mgr_->Add(std::move(gpu0)); + gpu_1_ = res_mgr_->Add(std::move(gpu1)); + auto IO = Connection("IO", 5.0); + auto PCIE1 = Connection("PCIE", 11.0); + auto PCIE2 = Connection("PCIE", 20.0); + res_mgr_->Connect("disk", "cpu0", IO); + res_mgr_->Connect("cpu0", "cpu1", IO); + res_mgr_->Connect("cpu1", "cpu2", IO); + res_mgr_->Connect("cpu0", "cpu2", IO); + res_mgr_->Connect("cpu1", "gpu0", PCIE1); + res_mgr_->Connect("cpu2", "gpu1", PCIE2); + + scheduler_ = std::make_shared(res_mgr_); + + res_mgr_->Start(); + scheduler_->Start(); + } + + void + TearDown() override { + scheduler_->Stop(); + res_mgr_->Stop(); + } + + ResourceWPtr disk_; + ResourceWPtr cpu_0_; + ResourceWPtr cpu_1_; + ResourceWPtr cpu_2_; + ResourceWPtr gpu_0_; + ResourceWPtr gpu_1_; + ResourceMgrPtr res_mgr_; + + std::shared_ptr scheduler_; +}; + + +TEST_F(SchedulerTest2, SpecifiedResourceTest) { + const uint64_t NUM = 10; + std::vector> tasks; + TableFileSchemaPtr dummy = std::make_shared(); + dummy->location_ = "location"; + + for (uint64_t i = 0; i < NUM; ++i) { + std::shared_ptr task = std::make_shared(dummy); + task->label() = std::make_shared(disk_); + tasks.push_back(task); + disk_.lock()->task_table().Put(task); + } + +// ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); +} } } -- GitLab