diff --git a/cpp/src/db/engine/ExecutionEngine.h b/cpp/src/db/engine/ExecutionEngine.h index f266c1c3e11b08a12ea8c390db6758b02bf4564e..9e63e02a2615422b52acf753764f221424e34c90 100644 --- a/cpp/src/db/engine/ExecutionEngine.h +++ b/cpp/src/db/engine/ExecutionEngine.h @@ -73,6 +73,8 @@ public: virtual EngineType IndexEngineType() const = 0; virtual MetricType IndexMetricType() const = 0; + + virtual std::string GetLocation() const = 0; }; using ExecutionEnginePtr = std::shared_ptr; diff --git a/cpp/src/db/engine/ExecutionEngineImpl.h b/cpp/src/db/engine/ExecutionEngineImpl.h index 4ccf97177ac890a02e168d4c7606214a6649bb0e..e5480322a2d27243a73cc3bab65649b156960de1 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.h +++ b/cpp/src/db/engine/ExecutionEngineImpl.h @@ -73,6 +73,8 @@ public: MetricType IndexMetricType() const override { return metric_type_; } + std::string GetLocation() const override { return location_; } + private: VecIndexPtr CreatetVecIndex(EngineType type); diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index b0af3890fc8d5b0f43743f81d50788eb9bff0408..649f840827a97a7ded46f4e9e40067b2d38a06c7 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -28,6 +28,27 @@ ResourceMgr::GetNumOfComputeResource() { return count; } +uint64_t +ResourceMgr::GetNumGpuResource() const { + uint64_t num = 0; + for (auto &res : resources_) { + if (res->Type() == ResourceType::GPU) { + num++; + } + } + return num; +} + +ResourcePtr +ResourceMgr::GetResource(ResourceType type, uint64_t device_id) { + for (auto &resource : resources_) { + if (resource->Type() == type && resource->DeviceId() == device_id) { + return resource; + } + } + return nullptr; +} + ResourceWPtr ResourceMgr::Add(ResourcePtr &&resource) { ResourceWPtr ret(resource); diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 40d1ab807b8ac60def5a5dda3bfe269c9d18dc7c..5083aa1b53f7345555b27358047fdc27cc27ff11 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -35,6 +35,12 @@ public: return disk_resources_; } + uint64_t + GetNumGpuResource() const; + + ResourcePtr + GetResource(ResourceType type, uint64_t device_id); + /* * Return account of resource which enable executor; */ diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 775cf76ba2a3695d90d728c5a77af29e45572380..a55a4566ff15be58985aa2e8c9d7f1b06b1dc30c 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -4,6 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ +#include #include "Scheduler.h" #include "Cost.h" #include "action/Action.h" @@ -116,7 +117,18 @@ Scheduler::OnCopyCompleted(const EventPtr &event) { switch (task_table_type) { case TaskLabelType::DEFAULT: { if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) { - Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource); + auto task = load_completed_event->task_table_item_->task; + auto search_task = std::static_pointer_cast(task); + auto location = search_task->index_engine_->GetLocation(); + + for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) { + auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location); + if (index != nullptr) { + auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i); + Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource); + } + } + } break; } diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h index 8315ecb0ffe16fa3482f524e845d9835682a259d..7c7b4c3bec10c98a8103e5306dd49e1326ab79aa 100644 --- a/cpp/src/scheduler/action/Action.h +++ b/cpp/src/scheduler/action/Action.h @@ -19,6 +19,9 @@ public: static void PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self); + + static void + PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest); }; diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index 6b2ee442674103fd10db24b29a7fcba1b94d2266..32cb9995bcbe05bd45a62a19d2a82335837b9e54 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -46,6 +46,10 @@ Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { } } +void +Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { + dest->task_table().Put(task); +} } } diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h index 4ca39b738bacb11a340e4864b12eb75d94d29077..01822f8d9f02f331dfc8160b3f59927eef06a760 100644 --- a/cpp/src/scheduler/resource/CpuResource.h +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -21,7 +21,7 @@ public: inline std::string Dump() const override { - return ""; + return ""; } friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource); diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h index 8dcb909e2599d116f7e911a0c8ff6a705dde7a92..8c70404e8374b9f13b0bc75affc6a2671d75aec9 100644 --- a/cpp/src/scheduler/resource/DiskResource.h +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -20,7 +20,7 @@ public: inline std::string Dump() const override { - return ""; + return ""; } friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource); diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h index 5db3c005ee3a71ebb58c3ca6b4f0bc91ddf2fe05..5ef47a127ddf345c145245d8a1b843998a388d2b 100644 --- a/cpp/src/scheduler/resource/GpuResource.h +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -20,7 +20,7 @@ public: inline std::string Dump() const override { - return ""; + return ""; } friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource); diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index e55f84dea27d90117f35712fa5c0a930ed665baa..76c0b007bf2f2631ec68165757f4b5fdeff02ddb 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -93,6 +93,11 @@ public: return type_; } + inline uint64_t + DeviceId() { + return device_id_; + } + // TODO: better name? inline bool HasLoader() { @@ -172,9 +177,8 @@ private: protected: uint64_t device_id_; - -private: std::string name_; +private: ResourceType type_; TaskTable task_table_; diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp index 29f2a94048e422644420364e35c851b1d8f81fa8..391fd06a4fa98703b9a7d6495b0c2a8783e8a8d7 100644 --- a/cpp/src/scheduler/task/TestTask.cpp +++ b/cpp/src/scheduler/task/TestTask.cpp @@ -4,6 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ +#include #include "TestTask.h" @@ -11,7 +12,8 @@ namespace zilliz { namespace milvus { namespace engine { -TestTask::TestTask() : Task(TaskType::TestTask) {} + +TestTask::TestTask(TableFileSchemaPtr& file) : XSearchTask(file) {} void TestTask::Load(LoadType type, uint8_t device_id) { @@ -27,7 +29,8 @@ TestTask::Execute() { TaskPtr TestTask::Clone() { - auto ret = std::make_shared(); + TableFileSchemaPtr dummy = nullptr; + auto ret = std::make_shared(dummy); ret->load_count_ = load_count_; ret->exec_count_ = exec_count_; return ret; diff --git a/cpp/src/scheduler/task/TestTask.h b/cpp/src/scheduler/task/TestTask.h index 5a466ec3394db3ac546251dbf400331dc2e0a7b3..dce50e9fcb558018dc01fdd7fe883f9c34dc9d0d 100644 --- a/cpp/src/scheduler/task/TestTask.h +++ b/cpp/src/scheduler/task/TestTask.h @@ -5,16 +5,16 @@ ******************************************************************************/ #pragma once -#include "Task.h" +#include "SearchTask.h" namespace zilliz { namespace milvus { namespace engine { -class TestTask : public Task { +class TestTask : public XSearchTask { public: - TestTask(); + TestTask(TableFileSchemaPtr& file); public: void diff --git a/cpp/unittest/scheduler/cost_test.cpp b/cpp/unittest/scheduler/cost_test.cpp index c53331d0f9dd4a96485d0901a7bb13238a109c7f..1a625d786ef5ed6ea2069b0690ea5ca056f942a6 100644 --- a/cpp/unittest/scheduler/cost_test.cpp +++ b/cpp/unittest/scheduler/cost_test.cpp @@ -10,8 +10,9 @@ class CostTest : public ::testing::Test { protected: void SetUp() override { + TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < 8; ++i) { - auto task = std::make_shared(); + auto task = std::make_shared(dummy); table_.Put(task); } table_.Get(0)->state = TaskTableItemState::INVALID; diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index e3e791c02c147a049cd51c876d52b0a0b2f3257a..576ed3ee2ab58db9e8684f20de74d0b6990a16e8 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -14,7 +14,7 @@ TEST(normal_test, test1) { // auto res_mgr = std::make_shared(); auto res_mgr = ResMgrInst::GetInstance(); auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false)); - auto cpu = res_mgr->Add(ResourceFactory::Create("cpu")); + auto cpu = res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0)); auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false)); auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false)); @@ -32,9 +32,11 @@ TEST(normal_test, test1) { const uint64_t NUM_TASK = 1000; std::vector> tasks; + TableFileSchemaPtr dummy = nullptr; + for (uint64_t i = 0; i < NUM_TASK; ++i) { if (auto observe = disk.lock()) { - auto task = std::make_shared(); + auto task = std::make_shared(dummy); tasks.push_back(task); observe->task_table().Put(task); } diff --git a/cpp/unittest/scheduler/resource_factory_test.cpp b/cpp/unittest/scheduler/resource_factory_test.cpp index 829fb46cdab2f10ba38ed0df1a0f2f04f3e39b43..fdd3a4d263204b60e9fe169c17a5f41e03a3579a 100644 --- a/cpp/unittest/scheduler/resource_factory_test.cpp +++ b/cpp/unittest/scheduler/resource_factory_test.cpp @@ -5,9 +5,9 @@ using namespace zilliz::milvus::engine; TEST(resource_factory_test, create) { - auto disk = ResourceFactory::Create("disk"); - auto cpu = ResourceFactory::Create("cpu"); - auto gpu = ResourceFactory::Create("gpu"); + auto disk = ResourceFactory::Create("ssd", "DISK", 0); + auto cpu = ResourceFactory::Create("cpu", "CPU", 0); + auto gpu = ResourceFactory::Create("gpu", "GPU", 0); ASSERT_TRUE(std::dynamic_pointer_cast(disk)); ASSERT_TRUE(std::dynamic_pointer_cast(cpu)); diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp index 2f7d58eb57d018dce2d0a0d2ba50bc39fe911b13..fd6017faddf26516fb488de2b199c27bf9a2ca7f 100644 --- a/cpp/unittest/scheduler/resource_test.cpp +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -22,9 +22,9 @@ class ResourceTest : public testing::Test { protected: void SetUp() override { - disk_resource_ = ResourceFactory::Create("disk"); - cpu_resource_ = ResourceFactory::Create("cpu"); - gpu_resource_ = ResourceFactory::Create("gpu"); + disk_resource_ = ResourceFactory::Create("ssd", "DISK", 0); + cpu_resource_ = ResourceFactory::Create("cpu", "CPU", 0); + gpu_resource_ = ResourceFactory::Create("gpu", "GPU", 0); resources_.push_back(disk_resource_); resources_.push_back(cpu_resource_); resources_.push_back(gpu_resource_); @@ -85,8 +85,9 @@ protected: TEST_F(ResourceTest, cpu_resource_test) { const uint64_t NUM = 100; std::vector> tasks; + TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { - auto task = std::make_shared(); + auto task = std::make_shared(dummy); tasks.push_back(task); cpu_resource_->task_table().Put(task); } @@ -113,8 +114,9 @@ TEST_F(ResourceTest, cpu_resource_test) { TEST_F(ResourceTest, gpu_resource_test) { const uint64_t NUM = 100; std::vector> tasks; + TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < NUM; ++i) { - auto task = std::make_shared(); + auto task = std::make_shared(dummy); tasks.push_back(task); gpu_resource_->task_table().Put(task); } diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp index 787ae59329c41cf69a99edc01e7c5dc51ea155b3..5335dc8de61b232f0dc57397c8c287d6f1586799 100644 --- a/cpp/unittest/scheduler/scheduler_test.cpp +++ b/cpp/unittest/scheduler/scheduler_test.cpp @@ -3,14 +3,159 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ - +#include "scheduler/Scheduler.h" #include - +#include +#include "cache/DataObj.h" +#include "cache/GpuCacheMgr.h" +#include "scheduler/task/TestTask.h" +#include "scheduler/ResourceFactory.h" +#include "scheduler/resource/Resource.h" +#include "utils/Error.h" +#include "wrapper/knowhere/vec_index.h" namespace zilliz { namespace milvus { namespace engine { +class MockVecIndex : public engine::VecIndex { +public: + virtual server::KnowhereError BuildAll(const long &nb, + const float *xb, + const long *ids, + const engine::Config &cfg, + const long &nt = 0, + const float *xt = nullptr) { + + } + + engine::VecIndexPtr Clone() override { + return zilliz::milvus::engine::VecIndexPtr(); + } + + int64_t GetDeviceId() override { + return 0; + } + + engine::IndexType GetType() override { + return engine::IndexType::INVALID; + } + + virtual server::KnowhereError Add(const long &nb, + const float *xb, + const long *ids, + const engine::Config &cfg = engine::Config()) { + + } + + virtual server::KnowhereError Search(const long &nq, + const float *xq, + float *dist, + long *ids, + const engine::Config &cfg = engine::Config()) { + + } + + engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override { + + } + + engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override { + + } + + virtual int64_t Dimension() { + return dimension_; + } + + virtual int64_t Count() { + return ntotal_; + } + + virtual zilliz::knowhere::BinarySet Serialize() { + zilliz::knowhere::BinarySet binset; + return binset; + } + + virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) { + + } + +public: + int64_t dimension_ = 512; + int64_t ntotal_ = 0; +}; + + +class SchedulerTest : public testing::Test { +protected: + void + SetUp() override { + ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false); + ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0); + ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1); + + res_mgr_ = std::make_shared(); + cpu_resource_ = res_mgr_->Add(std::move(cpu)); + gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0)); + gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1)); + + auto PCIE = Connection("IO", 11000.0); + res_mgr_->Connect("cpu", "gpu0", PCIE); + res_mgr_->Connect("cpu", "gpu1", PCIE); + + scheduler_ = std::make_shared(res_mgr_); + + res_mgr_->Start(); + scheduler_->Start(); + } + + void + TearDown() override { + scheduler_->Stop(); + res_mgr_->Stop(); + } + + ResourceWPtr cpu_resource_; + ResourceWPtr gpu_resource_0_; + ResourceWPtr gpu_resource_1_; + + ResourceMgrPtr res_mgr_; + std::shared_ptr scheduler_; + uint64_t load_count_ = 0; + std::mutex load_mutex_; + std::condition_variable cv_; +}; + +void +insert_dummy_index_into_gpu_cache(uint64_t device_id) { + MockVecIndex* mock_index = new MockVecIndex(); + mock_index->ntotal_ = 1000; + engine::VecIndexPtr index(mock_index); + + cache::DataObjPtr obj = std::make_shared(index); + + cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj); +} + +TEST_F(SchedulerTest, OnCopyCompleted) { + const uint64_t NUM = 10; + std::vector> tasks; + TableFileSchemaPtr dummy = std::make_shared(); + dummy->location_ = "location"; + + insert_dummy_index_into_gpu_cache(1); + + for (uint64_t i = 0; i < NUM; ++i) { + auto task = std::make_shared(dummy); + task->label() = std::make_shared(); + tasks.push_back(task); + cpu_resource_.lock()->task_table().Put(task); + } + + sleep(3); + ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM); +} } } diff --git a/cpp/unittest/scheduler/tasktable_test.cpp b/cpp/unittest/scheduler/tasktable_test.cpp index 2d9108cd68f84c30364b34938f28c027b4e393f3..5a1094b0ad2cb24bb4c3bcb412d99915dbb343ef 100644 --- a/cpp/unittest/scheduler/tasktable_test.cpp +++ b/cpp/unittest/scheduler/tasktable_test.cpp @@ -43,9 +43,10 @@ class TaskTableBaseTest : public ::testing::Test { protected: void SetUp() override { + TableFileSchemaPtr dummy = nullptr; invalid_task_ = nullptr; - task1_ = std::make_shared(); - task2_ = std::make_shared(); + task1_ = std::make_shared(dummy); + task2_ = std::make_shared(dummy); } TaskPtr invalid_task_; @@ -83,8 +84,9 @@ class TaskTableAdvanceTest : public ::testing::Test { protected: void SetUp() override { + TableFileSchemaPtr dummy = nullptr; for (uint64_t i = 0; i < 8; ++i) { - auto task = std::make_shared(); + auto task = std::make_shared(dummy); table1_.Put(task); }