提交 4be665ee 编写于 作者: J jinhai

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-428 Add PushTaskByDataLocality in scheduler

See merge request megasearch/milvus!442

Former-commit-id: 74ecfc738facdc85c5d02c43b3706c40b9b29e64
...@@ -60,6 +60,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -60,6 +60,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-419 - Move index_file_size from IndexParam to TableSchema - MS-419 - Move index_file_size from IndexParam to TableSchema
- MS-421 - Add TaskLabel in scheduler - MS-421 - Add TaskLabel in scheduler
- MS-422 - Support DeleteTask in Multi-GpuResource case - MS-422 - Support DeleteTask in Multi-GpuResource case
- MS-428 - Add PushTaskByDataLocality in scheduler
## New Feature ## New Feature
- MS-343 - Implement ResourceMgr - MS-343 - Implement ResourceMgr
......
...@@ -73,6 +73,8 @@ public: ...@@ -73,6 +73,8 @@ public:
virtual EngineType IndexEngineType() const = 0; virtual EngineType IndexEngineType() const = 0;
virtual MetricType IndexMetricType() const = 0; virtual MetricType IndexMetricType() const = 0;
virtual std::string GetLocation() const = 0;
}; };
using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>; using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>;
......
...@@ -73,6 +73,8 @@ public: ...@@ -73,6 +73,8 @@ public:
MetricType IndexMetricType() const override { return metric_type_; } MetricType IndexMetricType() const override { return metric_type_; }
std::string GetLocation() const override { return location_; }
private: private:
VecIndexPtr CreatetVecIndex(EngineType type); VecIndexPtr CreatetVecIndex(EngineType type);
......
...@@ -28,6 +28,27 @@ ResourceMgr::GetNumOfComputeResource() { ...@@ -28,6 +28,27 @@ ResourceMgr::GetNumOfComputeResource() {
return count; return count;
} }
uint64_t
ResourceMgr::GetNumGpuResource() const {
uint64_t num = 0;
for (auto &res : resources_) {
if (res->Type() == ResourceType::GPU) {
num++;
}
}
return num;
}
ResourcePtr
ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
for (auto &resource : resources_) {
if (resource->Type() == type && resource->DeviceId() == device_id) {
return resource;
}
}
return nullptr;
}
ResourceWPtr ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) { ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource); ResourceWPtr ret(resource);
......
...@@ -35,6 +35,12 @@ public: ...@@ -35,6 +35,12 @@ public:
return disk_resources_; return disk_resources_;
} }
uint64_t
GetNumGpuResource() const;
ResourcePtr
GetResource(ResourceType type, uint64_t device_id);
/* /*
* Return account of resource which enable executor; * Return account of resource which enable executor;
*/ */
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
* Proprietary and confidential. * Proprietary and confidential.
******************************************************************************/ ******************************************************************************/
#include <src/cache/GpuCacheMgr.h>
#include "Scheduler.h" #include "Scheduler.h"
#include "Cost.h" #include "Cost.h"
#include "action/Action.h" #include "action/Action.h"
...@@ -116,7 +117,18 @@ Scheduler::OnCopyCompleted(const EventPtr &event) { ...@@ -116,7 +117,18 @@ Scheduler::OnCopyCompleted(const EventPtr &event) {
switch (task_table_type) { switch (task_table_type) {
case TaskLabelType::DEFAULT: { case TaskLabelType::DEFAULT: {
if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) { if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource); auto task = load_completed_event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
auto location = search_task->index_engine_->GetLocation();
for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
}
}
} }
break; break;
} }
......
...@@ -19,6 +19,9 @@ public: ...@@ -19,6 +19,9 @@ public:
static void static void
PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self); PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self);
static void
PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest);
}; };
......
...@@ -46,6 +46,10 @@ Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { ...@@ -46,6 +46,10 @@ Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) {
} }
} }
void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
dest->task_table().Put(task);
}
} }
} }
......
...@@ -21,7 +21,7 @@ public: ...@@ -21,7 +21,7 @@ public:
inline std::string inline std::string
Dump() const override { Dump() const override {
return "<CpuResource>"; return "<CpuResource, name=" + name_ + ">";
} }
friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource); friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource);
......
...@@ -20,7 +20,7 @@ public: ...@@ -20,7 +20,7 @@ public:
inline std::string inline std::string
Dump() const override { Dump() const override {
return "<DiskResource>"; return "<DiskResource, name=" + name_ + ">";
} }
friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource); friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource);
......
...@@ -20,7 +20,7 @@ public: ...@@ -20,7 +20,7 @@ public:
inline std::string inline std::string
Dump() const override { Dump() const override {
return "<GpuResource>"; return "<GpuResource, name=" + name_ + ">";
} }
friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource); friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource);
......
...@@ -93,6 +93,11 @@ public: ...@@ -93,6 +93,11 @@ public:
return type_; return type_;
} }
inline uint64_t
DeviceId() {
return device_id_;
}
// TODO: better name? // TODO: better name?
inline bool inline bool
HasLoader() { HasLoader() {
...@@ -172,9 +177,8 @@ private: ...@@ -172,9 +177,8 @@ private:
protected: protected:
uint64_t device_id_; uint64_t device_id_;
private:
std::string name_; std::string name_;
private:
ResourceType type_; ResourceType type_;
TaskTable task_table_; TaskTable task_table_;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
* Proprietary and confidential. * Proprietary and confidential.
******************************************************************************/ ******************************************************************************/
#include <src/cache/GpuCacheMgr.h>
#include "TestTask.h" #include "TestTask.h"
...@@ -11,7 +12,8 @@ namespace zilliz { ...@@ -11,7 +12,8 @@ namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
TestTask::TestTask() : Task(TaskType::TestTask) {}
TestTask::TestTask(TableFileSchemaPtr& file) : XSearchTask(file) {}
void void
TestTask::Load(LoadType type, uint8_t device_id) { TestTask::Load(LoadType type, uint8_t device_id) {
...@@ -27,7 +29,8 @@ TestTask::Execute() { ...@@ -27,7 +29,8 @@ TestTask::Execute() {
TaskPtr TaskPtr
TestTask::Clone() { TestTask::Clone() {
auto ret = std::make_shared<TestTask>(); TableFileSchemaPtr dummy = nullptr;
auto ret = std::make_shared<TestTask>(dummy);
ret->load_count_ = load_count_; ret->load_count_ = load_count_;
ret->exec_count_ = exec_count_; ret->exec_count_ = exec_count_;
return ret; return ret;
......
...@@ -5,16 +5,16 @@ ...@@ -5,16 +5,16 @@
******************************************************************************/ ******************************************************************************/
#pragma once #pragma once
#include "Task.h" #include "SearchTask.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
class TestTask : public Task { class TestTask : public XSearchTask {
public: public:
TestTask(); TestTask(TableFileSchemaPtr& file);
public: public:
void void
......
...@@ -10,8 +10,9 @@ class CostTest : public ::testing::Test { ...@@ -10,8 +10,9 @@ class CostTest : public ::testing::Test {
protected: protected:
void void
SetUp() override { SetUp() override {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) { for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<TestTask>(); auto task = std::make_shared<TestTask>(dummy);
table_.Put(task); table_.Put(task);
} }
table_.Get(0)->state = TaskTableItemState::INVALID; table_.Get(0)->state = TaskTableItemState::INVALID;
......
...@@ -14,7 +14,7 @@ TEST(normal_test, test1) { ...@@ -14,7 +14,7 @@ TEST(normal_test, test1) {
// auto res_mgr = std::make_shared<ResourceMgr>(); // auto res_mgr = std::make_shared<ResourceMgr>();
auto res_mgr = ResMgrInst::GetInstance(); auto res_mgr = ResMgrInst::GetInstance();
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false)); auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false));
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu")); auto cpu = res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0));
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false)); auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false));
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false)); auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false));
...@@ -32,9 +32,11 @@ TEST(normal_test, test1) { ...@@ -32,9 +32,11 @@ TEST(normal_test, test1) {
const uint64_t NUM_TASK = 1000; const uint64_t NUM_TASK = 1000;
std::vector<std::shared_ptr<TestTask>> tasks; std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM_TASK; ++i) { for (uint64_t i = 0; i < NUM_TASK; ++i) {
if (auto observe = disk.lock()) { if (auto observe = disk.lock()) {
auto task = std::make_shared<TestTask>(); auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task); tasks.push_back(task);
observe->task_table().Put(task); observe->task_table().Put(task);
} }
......
...@@ -5,9 +5,9 @@ ...@@ -5,9 +5,9 @@
using namespace zilliz::milvus::engine; using namespace zilliz::milvus::engine;
TEST(resource_factory_test, create) { TEST(resource_factory_test, create) {
auto disk = ResourceFactory::Create("disk"); auto disk = ResourceFactory::Create("ssd", "DISK", 0);
auto cpu = ResourceFactory::Create("cpu"); auto cpu = ResourceFactory::Create("cpu", "CPU", 0);
auto gpu = ResourceFactory::Create("gpu"); auto gpu = ResourceFactory::Create("gpu", "GPU", 0);
ASSERT_TRUE(std::dynamic_pointer_cast<DiskResource>(disk)); ASSERT_TRUE(std::dynamic_pointer_cast<DiskResource>(disk));
ASSERT_TRUE(std::dynamic_pointer_cast<CpuResource>(cpu)); ASSERT_TRUE(std::dynamic_pointer_cast<CpuResource>(cpu));
......
...@@ -22,9 +22,9 @@ class ResourceTest : public testing::Test { ...@@ -22,9 +22,9 @@ class ResourceTest : public testing::Test {
protected: protected:
void void
SetUp() override { SetUp() override {
disk_resource_ = ResourceFactory::Create("disk"); disk_resource_ = ResourceFactory::Create("ssd", "DISK", 0);
cpu_resource_ = ResourceFactory::Create("cpu"); cpu_resource_ = ResourceFactory::Create("cpu", "CPU", 0);
gpu_resource_ = ResourceFactory::Create("gpu"); gpu_resource_ = ResourceFactory::Create("gpu", "GPU", 0);
resources_.push_back(disk_resource_); resources_.push_back(disk_resource_);
resources_.push_back(cpu_resource_); resources_.push_back(cpu_resource_);
resources_.push_back(gpu_resource_); resources_.push_back(gpu_resource_);
...@@ -85,8 +85,9 @@ protected: ...@@ -85,8 +85,9 @@ protected:
TEST_F(ResourceTest, cpu_resource_test) { TEST_F(ResourceTest, cpu_resource_test) {
const uint64_t NUM = 100; const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks; std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) { for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(); auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task); tasks.push_back(task);
cpu_resource_->task_table().Put(task); cpu_resource_->task_table().Put(task);
} }
...@@ -113,8 +114,9 @@ TEST_F(ResourceTest, cpu_resource_test) { ...@@ -113,8 +114,9 @@ TEST_F(ResourceTest, cpu_resource_test) {
TEST_F(ResourceTest, gpu_resource_test) { TEST_F(ResourceTest, gpu_resource_test) {
const uint64_t NUM = 100; const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks; std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) { for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(); auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task); tasks.push_back(task);
gpu_resource_->task_table().Put(task); gpu_resource_->task_table().Put(task);
} }
......
...@@ -3,14 +3,159 @@ ...@@ -3,14 +3,159 @@
* Unauthorized copying of this file, via any medium is strictly prohibited. * Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential. * Proprietary and confidential.
******************************************************************************/ ******************************************************************************/
#include "scheduler/Scheduler.h"
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <src/scheduler/tasklabel/DefaultLabel.h>
#include "cache/DataObj.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/ResourceFactory.h"
#include "scheduler/resource/Resource.h"
#include "utils/Error.h"
#include "wrapper/knowhere/vec_index.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
namespace engine { namespace engine {
class MockVecIndex : public engine::VecIndex {
public:
virtual server::KnowhereError BuildAll(const long &nb,
const float *xb,
const long *ids,
const engine::Config &cfg,
const long &nt = 0,
const float *xt = nullptr) {
}
engine::VecIndexPtr Clone() override {
return zilliz::milvus::engine::VecIndexPtr();
}
int64_t GetDeviceId() override {
return 0;
}
engine::IndexType GetType() override {
return engine::IndexType::INVALID;
}
virtual server::KnowhereError Add(const long &nb,
const float *xb,
const long *ids,
const engine::Config &cfg = engine::Config()) {
}
virtual server::KnowhereError Search(const long &nq,
const float *xq,
float *dist,
long *ids,
const engine::Config &cfg = engine::Config()) {
}
engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
}
engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
}
virtual int64_t Dimension() {
return dimension_;
}
virtual int64_t Count() {
return ntotal_;
}
virtual zilliz::knowhere::BinarySet Serialize() {
zilliz::knowhere::BinarySet binset;
return binset;
}
virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) {
}
public:
int64_t dimension_ = 512;
int64_t ntotal_ = 0;
};
class SchedulerTest : public testing::Test {
protected:
void
SetUp() override {
ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
res_mgr_ = std::make_shared<ResourceMgr>();
cpu_resource_ = res_mgr_->Add(std::move(cpu));
gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
auto PCIE = Connection("IO", 11000.0);
res_mgr_->Connect("cpu", "gpu0", PCIE);
res_mgr_->Connect("cpu", "gpu1", PCIE);
scheduler_ = std::make_shared<Scheduler>(res_mgr_);
res_mgr_->Start();
scheduler_->Start();
}
void
TearDown() override {
scheduler_->Stop();
res_mgr_->Stop();
}
ResourceWPtr cpu_resource_;
ResourceWPtr gpu_resource_0_;
ResourceWPtr gpu_resource_1_;
ResourceMgrPtr res_mgr_;
std::shared_ptr<Scheduler> scheduler_;
uint64_t load_count_ = 0;
std::mutex load_mutex_;
std::condition_variable cv_;
};
void
insert_dummy_index_into_gpu_cache(uint64_t device_id) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000;
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj);
}
TEST_F(SchedulerTest, OnCopyCompleted) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
dummy->location_ = "location";
insert_dummy_index_into_gpu_cache(1);
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
cpu_resource_.lock()->task_table().Put(task);
}
sleep(3);
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
} }
} }
......
...@@ -43,9 +43,10 @@ class TaskTableBaseTest : public ::testing::Test { ...@@ -43,9 +43,10 @@ class TaskTableBaseTest : public ::testing::Test {
protected: protected:
void void
SetUp() override { SetUp() override {
TableFileSchemaPtr dummy = nullptr;
invalid_task_ = nullptr; invalid_task_ = nullptr;
task1_ = std::make_shared<TestTask>(); task1_ = std::make_shared<TestTask>(dummy);
task2_ = std::make_shared<TestTask>(); task2_ = std::make_shared<TestTask>(dummy);
} }
TaskPtr invalid_task_; TaskPtr invalid_task_;
...@@ -83,8 +84,9 @@ class TaskTableAdvanceTest : public ::testing::Test { ...@@ -83,8 +84,9 @@ class TaskTableAdvanceTest : public ::testing::Test {
protected: protected:
void void
SetUp() override { SetUp() override {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) { for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<TestTask>(); auto task = std::make_shared<TestTask>(dummy);
table1_.Put(task); table1_.Put(task);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册