Commit e83c1b0f authored by peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-517 Update resource_mgr_test in scheduler

See merge request megasearch/milvus!508

Former-commit-id: 73ea60f9a850d8d900f572cfa0c58e123ae3cf81
@@ -97,6 +97,8 @@ Please mark all change in change log and use the ticket from JIRA.
 - MS-504 - Update node_test in scheduler
 - MS-505 - Install core unit test and add to coverage
 - MS-508 - Update normal_test in scheduler
+- MS-511 - Update resource_test in scheduler
+- MS-517 - Update resource_mgr_test in scheduler
 ## New Feature
 - MS-343 - Implement ResourceMgr
......
@@ -59,7 +59,7 @@ ResourceMgr::Add(ResourcePtr &&resource) {
     return ret;
 }
 
-void
+bool
 ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connection &connection) {
     auto res1 = GetResource(name1);
     auto res2 = GetResource(name2);
@@ -67,7 +67,9 @@ ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connect
         res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
         // TODO: enable when task balance supported
         // res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
+        return true;
     }
+    return false;
 }
 
 void
@@ -78,7 +80,7 @@ ResourceMgr::Clear() {
 }
 
 std::vector<ResourcePtr>
-ResourceMgr::GetComputeResource() {
+ResourceMgr::GetComputeResources() {
     std::vector<ResourcePtr> result;
     for (auto &resource : resources_) {
         if (resource->HasExecutor()) {
@@ -109,7 +111,12 @@ ResourceMgr::GetResource(const std::string &name) {
 }
 
 uint64_t
-ResourceMgr::GetNumOfComputeResource() {
+ResourceMgr::GetNumOfResource() const {
+    return resources_.size();
+}
+
+uint64_t
+ResourceMgr::GetNumOfComputeResource() const {
     uint64_t count = 0;
     for (auto &res : resources_) {
         if (res->HasExecutor()) {
......
@@ -35,7 +35,7 @@ public:
     ResourceWPtr
     Add(ResourcePtr &&resource);
 
-    void
+    bool
     Connect(const std::string &res1, const std::string &res2, Connection &connection);
 
     void
@@ -60,7 +60,7 @@ public:
     }
 
     std::vector<ResourcePtr>
-    GetComputeResource();
+    GetComputeResources();
 
     ResourcePtr
     GetResource(ResourceType type, uint64_t device_id);
@@ -69,7 +69,10 @@ public:
     GetResource(const std::string &name);
 
     uint64_t
-    GetNumOfComputeResource();
+    GetNumOfResource() const;
+
+    uint64_t
+    GetNumOfComputeResource() const;
 
     uint64_t
     GetNumGpuResource() const;
......
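
For orientation, a minimal sketch of how the reworked API reads at a call site (a sketch only, not code from this commit; the manager variable and resource names are assumptions): Connect() now reports whether both endpoints exist instead of failing silently, GetNumOfResource() counts every registered resource, and GetNumOfComputeResource() counts only executor-enabled ones.

    // Hypothetical call site, assuming resources named "disk" and "cpu"
    // were previously Add()ed to mgr.
    ResourceMgr mgr;
    Connection io("io", 500.0);
    if (!mgr.Connect("disk", "cpu", io)) {
        // at least one of the names is unknown; surface the wiring error here
    }
    uint64_t total = mgr.GetNumOfResource();          // all resources
    uint64_t compute = mgr.GetNumOfComputeResource(); // executor-enabled only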
@@ -151,7 +151,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
             // if this resource is disk, assign it to smallest cost resource
             if (self->type() == ResourceType::DISK) {
                 // step 1: calculate shortest path per resource, from disk to compute resource
-                auto compute_resources = res_mgr_.lock()->GetComputeResource();
+                auto compute_resources = res_mgr_.lock()->GetComputeResources();
                 std::vector<std::vector<std::string>> paths;
                 std::vector<uint64_t> transport_costs;
                 for (auto &res : compute_resources) {
......
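
The comments above spell out the policy: compute a shortest path from the disk resource to every compute resource, then hand the task to the destination whose transport cost is lowest. A stand-alone sketch of that selection step (illustrative only, not the Scheduler's actual code; the function name is an assumption):

    #include <cstdint>
    #include <vector>

    // Given one transport cost per candidate compute resource, return the
    // index of the cheapest candidate, or -1 when there are no candidates.
    int64_t PickCheapestPath(const std::vector<uint64_t> &transport_costs) {
        int64_t best = -1;
        for (size_t i = 0; i < transport_costs.size(); ++i) {
            if (best < 0 || transport_costs[i] < transport_costs[static_cast<size_t>(best)]) {
                best = static_cast<int64_t>(i);
            }
        }
        return best;
    }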
@@ -31,7 +31,8 @@ namespace engine {
 enum class ResourceType {
     DISK = 0,
     CPU = 1,
-    GPU = 2
+    GPU = 2,
+    TEST = 3,
 };
 
 class Resource : public Node, public std::enable_shared_from_this<Resource> {
@@ -126,7 +127,6 @@ public:
              bool enable_loader,
              bool enable_executor);
 
-    // TODO: SearchContextPtr to TaskPtr
     /*
      * Implementation by inherit class;
      * Blocking function;
@@ -142,11 +142,6 @@ public:
     Process(TaskPtr task) = 0;
 
 private:
-    /*
-     * These function should move to cost.h ???
-     * COST.H ???
-     */
-
     /*
      * Pick one task to load;
      * Order by start time;
......
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

#include "TestResource.h"

namespace zilliz {
namespace milvus {
namespace engine {

std::ostream &operator<<(std::ostream &out, const TestResource &resource) {
    out << resource.Dump();
    return out;
}

TestResource::TestResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor)
    : Resource(std::move(name), ResourceType::TEST, device_id, enable_loader, enable_executor) {
}

void TestResource::LoadFile(TaskPtr task) {
    task->Load(LoadType::TEST, 0);
}

void TestResource::Process(TaskPtr task) {
    task->Execute();
}

}
}
}
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#pragma once

#include "Resource.h"

namespace zilliz {
namespace milvus {
namespace engine {

class TestResource : public Resource {
public:
    explicit
    TestResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);

    inline std::string
    Dump() const override {
        return "<TestResource, name=" + name_ + ">";
    }

    friend std::ostream &operator<<(std::ostream &out, const TestResource &resource);

protected:
    void
    LoadFile(TaskPtr task) override;

    void
    Process(TaskPtr task) override;
};

}
}
}
@@ -22,6 +22,7 @@ enum class LoadType {
     DISK2CPU,
     CPU2GPU,
     GPU2CPU,
+    TEST,
 };
 
 enum class TaskType {
......
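
The new LoadType::TEST value pairs with the TestResource added above: its LoadFile() calls task->Load(LoadType::TEST, 0), so a TestTask can run through the full load/execute pipeline without any real disk, CPU, or GPU transfer.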
@@ -46,5 +46,5 @@ add_subdirectory(server)
 add_subdirectory(db)
 add_subdirectory(knowhere)
 add_subdirectory(metrics)
-#add_subdirectory(scheduler)
+add_subdirectory(scheduler)
 #add_subdirectory(storage)
\ No newline at end of file
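
With add_subdirectory(scheduler) restored, the scheduler unit tests (including the new TestResource and resource_mgr_test sources above) are built again as part of the unit-test suite.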
@@ -11,7 +11,7 @@
 
 using namespace zilliz::milvus::engine;
 
-TEST(normal_test, inst_test) {
+TEST(normal_test, DISABLED_inst_test) {
     // ResourceMgr only compose resources, provide unified event
     auto res_mgr = ResMgrInst::GetInstance();
......
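
googletest treats any test whose name begins with DISABLED_ as disabled: it still compiles, is skipped by default, and can be forced to run with --gtest_also_run_disabled_tests. Renaming inst_test this way parks the test without deleting it.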
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "scheduler/resource/CpuResource.h"
#include "scheduler/resource/GpuResource.h"
#include "scheduler/resource/DiskResource.h"
#include "scheduler/resource/TestResource.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/ResourceMgr.h"
#include <gtest/gtest.h>
namespace zilliz {
namespace milvus {
namespace engine {
/************ ResourceMgrBaseTest ************/
class ResourceMgrBaseTest : public testing::Test {
 protected:
    void
    SetUp() override {
        empty_mgr_ = std::make_shared<ResourceMgr>();
        mgr1_ = std::make_shared<ResourceMgr>();
        disk_res = std::make_shared<DiskResource>("disk", 0, true, false);
        cpu_res = std::make_shared<CpuResource>("cpu", 1, true, false);
        gpu_res = std::make_shared<GpuResource>("gpu", 2, true, true);
        mgr1_->Add(ResourcePtr(disk_res));
        mgr1_->Add(ResourcePtr(cpu_res));
        mgr1_->Add(ResourcePtr(gpu_res));
    }

    void
    TearDown() override {
    }

    ResourceMgrPtr empty_mgr_;
    ResourceMgrPtr mgr1_;
    ResourcePtr disk_res;
    ResourcePtr cpu_res;
    ResourcePtr gpu_res;
};

TEST_F(ResourceMgrBaseTest, add) {
    auto resource = std::make_shared<TestResource>("test", 0, true, true);
    auto ret = empty_mgr_->Add(ResourcePtr(resource));
    ASSERT_EQ(ret.lock(), resource);
}

TEST_F(ResourceMgrBaseTest, add_disk) {
    auto resource = std::make_shared<DiskResource>("disk", 0, true, true);
    auto ret = empty_mgr_->Add(ResourcePtr(resource));
    ASSERT_EQ(ret.lock(), resource);
}

TEST_F(ResourceMgrBaseTest, connect) {
    auto resource1 = std::make_shared<TestResource>("resource1", 0, true, true);
    auto resource2 = std::make_shared<TestResource>("resource2", 2, true, true);
    empty_mgr_->Add(resource1);
    empty_mgr_->Add(resource2);
    Connection io("io", 500.0);
    ASSERT_TRUE(empty_mgr_->Connect("resource1", "resource2", io));
}

TEST_F(ResourceMgrBaseTest, invalid_connect) {
    auto resource1 = std::make_shared<TestResource>("resource1", 0, true, true);
    auto resource2 = std::make_shared<TestResource>("resource2", 2, true, true);
    empty_mgr_->Add(resource1);
    empty_mgr_->Add(resource2);
    Connection io("io", 500.0);
    ASSERT_FALSE(empty_mgr_->Connect("xx", "yy", io));
}

TEST_F(ResourceMgrBaseTest, clear) {
    ASSERT_EQ(mgr1_->GetNumOfResource(), 3);
    mgr1_->Clear();
    ASSERT_EQ(mgr1_->GetNumOfResource(), 0);
}

TEST_F(ResourceMgrBaseTest, get_disk_resources) {
    auto disks = mgr1_->GetDiskResources();
    ASSERT_EQ(disks.size(), 1);
    ASSERT_EQ(disks[0].lock(), disk_res);
}

TEST_F(ResourceMgrBaseTest, get_all_resources) {
    bool disk = false, cpu = false, gpu = false;
    auto resources = mgr1_->GetAllResources();
    ASSERT_EQ(resources.size(), 3);
    for (auto &res : resources) {
        if (res->type() == ResourceType::DISK) disk = true;
        if (res->type() == ResourceType::CPU) cpu = true;
        if (res->type() == ResourceType::GPU) gpu = true;
    }
    ASSERT_TRUE(disk);
    ASSERT_TRUE(cpu);
    ASSERT_TRUE(gpu);
}

TEST_F(ResourceMgrBaseTest, get_compute_resources) {
    auto compute_resources = mgr1_->GetComputeResources();
    ASSERT_EQ(compute_resources.size(), 1);
    ASSERT_EQ(compute_resources[0], gpu_res);
}

TEST_F(ResourceMgrBaseTest, get_resource_by_type_and_deviceid) {
    auto cpu = mgr1_->GetResource(ResourceType::CPU, 1);
    ASSERT_EQ(cpu, cpu_res);

    auto invalid = mgr1_->GetResource(ResourceType::GPU, 1);
    ASSERT_EQ(invalid, nullptr);
}

TEST_F(ResourceMgrBaseTest, get_resource_by_name) {
    auto disk = mgr1_->GetResource("disk");
    ASSERT_EQ(disk, disk_res);

    auto invalid = mgr1_->GetResource("invalid");
    ASSERT_EQ(invalid, nullptr);
}

TEST_F(ResourceMgrBaseTest, get_num_of_resource) {
    ASSERT_EQ(empty_mgr_->GetNumOfResource(), 0);
    ASSERT_EQ(mgr1_->GetNumOfResource(), 3);
}

TEST_F(ResourceMgrBaseTest, get_num_of_compute_resource) {
    ASSERT_EQ(empty_mgr_->GetNumOfComputeResource(), 0);
    ASSERT_EQ(mgr1_->GetNumOfComputeResource(), 1);
}

TEST_F(ResourceMgrBaseTest, get_num_of_gpu_resource) {
    ASSERT_EQ(empty_mgr_->GetNumGpuResource(), 0);
    ASSERT_EQ(mgr1_->GetNumGpuResource(), 1);
}

TEST_F(ResourceMgrBaseTest, dump) {
    ASSERT_FALSE(mgr1_->Dump().empty());
}

TEST_F(ResourceMgrBaseTest, dump_tasktables) {
    ASSERT_FALSE(mgr1_->DumpTaskTables().empty());
}

/************ ResourceMgrAdvanceTest ************/
class ResourceMgrAdvanceTest : public testing::Test {
 protected:
    void
    SetUp() override {
        mgr1_ = std::make_shared<ResourceMgr>();
        disk_res = std::make_shared<DiskResource>("disk", 0, true, false);
        mgr1_->Add(ResourcePtr(disk_res));
        mgr1_->Start();
    }

    void
    TearDown() override {
        mgr1_->Stop();
    }

    ResourceMgrPtr mgr1_;
    ResourcePtr disk_res;
};

TEST_F(ResourceMgrAdvanceTest, register_subscriber) {
    bool flag = false;
    auto callback = [&](EventPtr event) {
        flag = true;
    };
    mgr1_->RegisterSubscriber(callback);
    TableFileSchemaPtr dummy = nullptr;
    disk_res->task_table().Put(std::make_shared<TestTask>(dummy));
    sleep(1);
    ASSERT_TRUE(flag);
}
}
}
}
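
One note on register_subscriber above: a plain bool written from the event thread and read after sleep(1) is formally a data race, and a fixed one-second wait is either too slow or too flaky. A hedged alternative sketch (not part of this commit; the helper name is an assumption) uses an atomic flag and bounded polling:

    #include <atomic>
    #include <chrono>
    #include <thread>

    // Wait until `flag` becomes true or `timeout` elapses; returns the final value.
    // Usage sketch: declare `std::atomic<bool> flag{false};`, set it in the
    // subscriber via flag.store(true), then
    // ASSERT_TRUE(WaitForFlag(flag, std::chrono::seconds(2)));
    bool WaitForFlag(const std::atomic<bool> &flag, std::chrono::milliseconds timeout) {
        auto deadline = std::chrono::steady_clock::now() + timeout;
        while (!flag.load() && std::chrono::steady_clock::now() < deadline) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        return flag.load();
    }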
@@ -8,6 +8,7 @@
 #include "scheduler/resource/DiskResource.h"
 #include "scheduler/resource/CpuResource.h"
 #include "scheduler/resource/GpuResource.h"
+#include "scheduler/resource/TestResource.h"
 #include "scheduler/task/Task.h"
 #include "scheduler/task/TestTask.h"
 #include "scheduler/ResourceFactory.h"
@@ -18,7 +19,71 @@ namespace zilliz {
 namespace milvus {
 namespace engine {
 
-class ResourceTest : public testing::Test {
+/************ ResourceBaseTest ************/
+class ResourceBaseTest : public testing::Test {
+ protected:
+    void
+    SetUp() override {
+        only_loader_ = std::make_shared<DiskResource>(name1, id1, true, false);
+        only_executor_ = std::make_shared<CpuResource>(name2, id2, false, true);
+        both_enable_ = std::make_shared<GpuResource>(name3, id3, true, true);
+        both_disable_ = std::make_shared<TestResource>(name4, id4, false, false);
+    }
+
+    const std::string name1 = "only_loader_";
+    const std::string name2 = "only_executor_";
+    const std::string name3 = "both_enable_";
+    const std::string name4 = "both_disable_";
+
+    const uint64_t id1 = 1;
+    const uint64_t id2 = 2;
+    const uint64_t id3 = 3;
+    const uint64_t id4 = 4;
+
+    ResourcePtr only_loader_ = nullptr;
+    ResourcePtr only_executor_ = nullptr;
+    ResourcePtr both_enable_ = nullptr;
+    ResourcePtr both_disable_ = nullptr;
+};
+
+TEST_F(ResourceBaseTest, name) {
+    ASSERT_EQ(only_loader_->name(), name1);
+    ASSERT_EQ(only_executor_->name(), name2);
+    ASSERT_EQ(both_enable_->name(), name3);
+    ASSERT_EQ(both_disable_->name(), name4);
+}
+
+TEST_F(ResourceBaseTest, type) {
+    ASSERT_EQ(only_loader_->type(), ResourceType::DISK);
+    ASSERT_EQ(only_executor_->type(), ResourceType::CPU);
+    ASSERT_EQ(both_enable_->type(), ResourceType::GPU);
+    ASSERT_EQ(both_disable_->type(), ResourceType::TEST);
+}
+
+TEST_F(ResourceBaseTest, device_id) {
+    ASSERT_EQ(only_loader_->device_id(), id1);
+    ASSERT_EQ(only_executor_->device_id(), id2);
+    ASSERT_EQ(both_enable_->device_id(), id3);
+    ASSERT_EQ(both_disable_->device_id(), id4);
+}
+
+TEST_F(ResourceBaseTest, has_loader) {
+    ASSERT_TRUE(only_loader_->HasLoader());
+    ASSERT_FALSE(only_executor_->HasLoader());
+    ASSERT_TRUE(both_enable_->HasLoader());
+    ASSERT_FALSE(both_disable_->HasLoader());
+}
+
+TEST_F(ResourceBaseTest, has_executor) {
+    ASSERT_FALSE(only_loader_->HasExecutor());
+    ASSERT_TRUE(only_executor_->HasExecutor());
+    ASSERT_TRUE(both_enable_->HasExecutor());
+    ASSERT_FALSE(both_disable_->HasExecutor());
+}
+
+/************ ResourceAdvanceTest ************/
+class ResourceAdvanceTest : public testing::Test {
  protected:
     void
     SetUp() override {
@@ -31,14 +96,18 @@ protected:
         auto subscriber = [&](EventPtr event) {
             if (event->Type() == EventType::LOAD_COMPLETED) {
-                std::lock_guard<std::mutex> lock(load_mutex_);
-                ++load_count_;
+                {
+                    std::lock_guard<std::mutex> lock(load_mutex_);
+                    ++load_count_;
+                }
                 cv_.notify_one();
             }
 
             if (event->Type() == EventType::FINISH_TASK) {
-                std::lock_guard<std::mutex> lock(load_mutex_);
-                ++exec_count_;
+                {
+                    std::lock_guard<std::mutex> lock(load_mutex_);
+                    ++exec_count_;
+                }
                 cv_.notify_one();
             }
         };
@@ -82,7 +151,32 @@ protected:
     std::condition_variable cv_;
 };
 
-TEST_F(ResourceTest, cpu_resource_test) {
+TEST_F(ResourceAdvanceTest, disk_resource_test) {
+    const uint64_t NUM = 100;
+    std::vector<std::shared_ptr<TestTask>> tasks;
+    TableFileSchemaPtr dummy = nullptr;
+    for (uint64_t i = 0; i < NUM; ++i) {
+        auto task = std::make_shared<TestTask>(dummy);
+        tasks.push_back(task);
+        disk_resource_->task_table().Put(task);
+    }
+
+    disk_resource_->WakeupLoader();
+    WaitLoader(NUM);
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        ASSERT_EQ(tasks[i]->load_count_, 0);
+    }
+
+    disk_resource_->WakeupExecutor();
+    WaitExecutor(NUM);
+
+    for (uint64_t i = 0; i < NUM; ++i) {
+        ASSERT_EQ(tasks[i]->exec_count_, 0);
+    }
+}
+
+TEST_F(ResourceAdvanceTest, cpu_resource_test) {
     const uint64_t NUM = 100;
     std::vector<std::shared_ptr<TestTask>> tasks;
     TableFileSchemaPtr dummy = nullptr;
@@ -94,8 +188,6 @@ TEST_F(ResourceTest, cpu_resource_test) {
     cpu_resource_->WakeupLoader();
     WaitLoader(NUM);
 
-//    std::cout << "after WakeupLoader" << std::endl;
-//    std::cout << cpu_resource_->task_table().Dump();
 
     for (uint64_t i = 0; i < NUM; ++i) {
         ASSERT_EQ(tasks[i]->load_count_, 1);
@@ -103,15 +195,13 @@ TEST_F(ResourceTest, cpu_resource_test) {
     cpu_resource_->WakeupExecutor();
     WaitExecutor(NUM);
 
-//    std::cout << "after WakeupExecutor" << std::endl;
-//    std::cout << cpu_resource_->task_table().Dump();
 
     for (uint64_t i = 0; i < NUM; ++i) {
         ASSERT_EQ(tasks[i]->exec_count_, 1);
     }
 }
 
-TEST_F(ResourceTest, gpu_resource_test) {
+TEST_F(ResourceAdvanceTest, gpu_resource_test) {
     const uint64_t NUM = 100;
     std::vector<std::shared_ptr<TestTask>> tasks;
     TableFileSchemaPtr dummy = nullptr;
@@ -123,8 +213,6 @@ TEST_F(ResourceTest, gpu_resource_test) {
     gpu_resource_->WakeupLoader();
     WaitLoader(NUM);
 
-//    std::cout << "after WakeupLoader" << std::endl;
-//    std::cout << cpu_resource_->task_table().Dump();
 
     for (uint64_t i = 0; i < NUM; ++i) {
         ASSERT_EQ(tasks[i]->load_count_, 1);
@@ -132,8 +220,6 @@ TEST_F(ResourceTest, gpu_resource_test) {
     gpu_resource_->WakeupExecutor();
     WaitExecutor(NUM);
 
-//    std::cout << "after WakeupExecutor" << std::endl;
-//    std::cout << cpu_resource_->task_table().Dump();
 
     for (uint64_t i = 0; i < NUM; ++i) {
         ASSERT_EQ(tasks[i]->exec_count_, 1);
......
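
A detail worth calling out in the subscriber diff above: the lock_guard is now confined to an inner block so that cv_.notify_one() executes after the mutex is released. Notifying while still holding the mutex the waiter must reacquire makes the woken thread block immediately, so signaling outside the critical section is the standard condition-variable pattern.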
@@ -19,229 +19,229 @@ namespace zilliz {
 namespace milvus {
 namespace engine {

[This hunk comments out the entire remaining body of scheduler_test.cpp: in the new revision every line below carries a leading "//". The pre-change code is shown once, uncommented, for readability.]

class MockVecIndex : public engine::VecIndex {
public:
    virtual server::KnowhereError BuildAll(const long &nb,
                                           const float *xb,
                                           const long *ids,
                                           const engine::Config &cfg,
                                           const long &nt = 0,
                                           const float *xt = nullptr) {
    }

    engine::VecIndexPtr Clone() override {
        return zilliz::milvus::engine::VecIndexPtr();
    }

    int64_t GetDeviceId() override {
        return 0;
    }

    engine::IndexType GetType() override {
        return engine::IndexType::INVALID;
    }

    virtual server::KnowhereError Add(const long &nb,
                                      const float *xb,
                                      const long *ids,
                                      const engine::Config &cfg = engine::Config()) {
    }

    virtual server::KnowhereError Search(const long &nq,
                                         const float *xq,
                                         float *dist,
                                         long *ids,
                                         const engine::Config &cfg = engine::Config()) {
    }

    engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
    }

    engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
    }

    virtual int64_t Dimension() {
        return dimension_;
    }

    virtual int64_t Count() {
        return ntotal_;
    }

    virtual zilliz::knowhere::BinarySet Serialize() {
        zilliz::knowhere::BinarySet binset;
        return binset;
    }

    virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) {
    }

public:
    int64_t dimension_ = 512;
    int64_t ntotal_ = 0;
};

class SchedulerTest : public testing::Test {
 protected:
    void
    SetUp() override {
        ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
        ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
        ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);

        res_mgr_ = std::make_shared<ResourceMgr>();
        cpu_resource_ = res_mgr_->Add(std::move(cpu));
        gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
        gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));

        auto PCIE = Connection("IO", 11000.0);
        res_mgr_->Connect("cpu", "gpu0", PCIE);
        res_mgr_->Connect("cpu", "gpu1", PCIE);

        scheduler_ = std::make_shared<Scheduler>(res_mgr_);

        res_mgr_->Start();
        scheduler_->Start();
    }

    void
    TearDown() override {
        scheduler_->Stop();
        res_mgr_->Stop();
    }

    ResourceWPtr cpu_resource_;
    ResourceWPtr gpu_resource_0_;
    ResourceWPtr gpu_resource_1_;

    ResourceMgrPtr res_mgr_;
    std::shared_ptr<Scheduler> scheduler_;
};

void
insert_dummy_index_into_gpu_cache(uint64_t device_id) {
    MockVecIndex *mock_index = new MockVecIndex();
    mock_index->ntotal_ = 1000;
    engine::VecIndexPtr index(mock_index);

    cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);

    cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
}

TEST_F(SchedulerTest, OnCopyCompleted) {
    const uint64_t NUM = 10;
    std::vector<std::shared_ptr<TestTask>> tasks;
    TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
    dummy->location_ = "location";

    insert_dummy_index_into_gpu_cache(1);

    for (uint64_t i = 0; i < NUM; ++i) {
        auto task = std::make_shared<TestTask>(dummy);
        task->label() = std::make_shared<DefaultLabel>();
        tasks.push_back(task);
        cpu_resource_.lock()->task_table().Put(task);
    }

    sleep(3);
    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}

TEST_F(SchedulerTest, PushTaskToNeighbourRandomlyTest) {
    const uint64_t NUM = 10;
    std::vector<std::shared_ptr<TestTask>> tasks;
    TableFileSchemaPtr dummy1 = std::make_shared<meta::TableFileSchema>();
    dummy1->location_ = "location";

    tasks.clear();

    for (uint64_t i = 0; i < NUM; ++i) {
        auto task = std::make_shared<TestTask>(dummy1);
        task->label() = std::make_shared<DefaultLabel>();
        tasks.push_back(task);
        cpu_resource_.lock()->task_table().Put(task);
    }

    sleep(3);
//    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}

class SchedulerTest2 : public testing::Test {
 protected:
    void
    SetUp() override {
        ResourcePtr disk = ResourceFactory::Create("disk", "DISK", 0, true, false);
        ResourcePtr cpu0 = ResourceFactory::Create("cpu0", "CPU", 0, true, false);
        ResourcePtr cpu1 = ResourceFactory::Create("cpu1", "CPU", 1, true, false);
        ResourcePtr cpu2 = ResourceFactory::Create("cpu2", "CPU", 2, true, false);
        ResourcePtr gpu0 = ResourceFactory::Create("gpu0", "GPU", 0, true, true);
        ResourcePtr gpu1 = ResourceFactory::Create("gpu1", "GPU", 1, true, true);

        res_mgr_ = std::make_shared<ResourceMgr>();
        disk_ = res_mgr_->Add(std::move(disk));
        cpu_0_ = res_mgr_->Add(std::move(cpu0));
        cpu_1_ = res_mgr_->Add(std::move(cpu1));
        cpu_2_ = res_mgr_->Add(std::move(cpu2));
        gpu_0_ = res_mgr_->Add(std::move(gpu0));
        gpu_1_ = res_mgr_->Add(std::move(gpu1));
        auto IO = Connection("IO", 5.0);
        auto PCIE1 = Connection("PCIE", 11.0);
        auto PCIE2 = Connection("PCIE", 20.0);
        res_mgr_->Connect("disk", "cpu0", IO);
        res_mgr_->Connect("cpu0", "cpu1", IO);
        res_mgr_->Connect("cpu1", "cpu2", IO);
        res_mgr_->Connect("cpu0", "cpu2", IO);
        res_mgr_->Connect("cpu1", "gpu0", PCIE1);
        res_mgr_->Connect("cpu2", "gpu1", PCIE2);

        scheduler_ = std::make_shared<Scheduler>(res_mgr_);

        res_mgr_->Start();
        scheduler_->Start();
    }

    void
    TearDown() override {
        scheduler_->Stop();
        res_mgr_->Stop();
    }

    ResourceWPtr disk_;
    ResourceWPtr cpu_0_;
    ResourceWPtr cpu_1_;
    ResourceWPtr cpu_2_;
    ResourceWPtr gpu_0_;
    ResourceWPtr gpu_1_;
    ResourceMgrPtr res_mgr_;

    std::shared_ptr<Scheduler> scheduler_;
};

TEST_F(SchedulerTest2, SpecifiedResourceTest) {
    const uint64_t NUM = 10;
    std::vector<std::shared_ptr<TestTask>> tasks;
    TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
    dummy->location_ = "location";

    for (uint64_t i = 0; i < NUM; ++i) {
        std::shared_ptr<TestTask> task = std::make_shared<TestTask>(dummy);
        task->label() = std::make_shared<SpecResLabel>(disk_);
        tasks.push_back(task);
        disk_.lock()->task_table().Put(task);
    }

//    ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
}
}
......