提交 cd765154 编写于 作者: Y Yu Kun

MS-428 Add PushTaskByDataLocality in scheduler


Former-commit-id: 3f4728181c2449d8c0705ca720a65d81d610f0d2
上级 9f0f7dcf
......@@ -73,6 +73,8 @@ public:
virtual EngineType IndexEngineType() const = 0;
virtual MetricType IndexMetricType() const = 0;
virtual std::string GetLocation() const = 0;
};
using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>;
......
......@@ -73,6 +73,8 @@ public:
MetricType IndexMetricType() const override { return metric_type_; }
std::string GetLocation() const override { return location_; }
private:
VecIndexPtr CreatetVecIndex(EngineType type);
......
......@@ -28,6 +28,27 @@ ResourceMgr::GetNumOfComputeResource() {
return count;
}
uint64_t
ResourceMgr::GetNumGpuResource() const {
uint64_t num = 0;
for (auto &res : resources_) {
if (res->Type() == ResourceType::GPU) {
num++;
}
}
return num;
}
ResourcePtr
ResourceMgr::GetResource(ResourceType type, uint64_t device_id) {
for (auto &resource : resources_) {
if (resource->Type() == type && resource->DeviceId() == device_id) {
return resource;
}
}
return nullptr;
}
ResourceWPtr
ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
......
......@@ -35,6 +35,12 @@ public:
return disk_resources_;
}
uint64_t
GetNumGpuResource() const;
ResourcePtr
GetResource(ResourceType type, uint64_t device_id);
/*
* Return account of resource which enable executor;
*/
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <src/cache/GpuCacheMgr.h>
#include "Scheduler.h"
#include "Cost.h"
#include "action/Action.h"
......@@ -116,7 +117,18 @@ Scheduler::OnCopyCompleted(const EventPtr &event) {
switch (task_table_type) {
case TaskLabelType::DEFAULT: {
if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource);
auto task = load_completed_event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
auto location = search_task->index_engine_->GetLocation();
for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
}
}
}
break;
}
......
......@@ -19,6 +19,9 @@ public:
static void
PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self);
static void
PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest);
};
......
......@@ -46,6 +46,10 @@ Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) {
}
}
void
Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
dest->task_table().Put(task);
}
}
}
......
......@@ -21,7 +21,7 @@ public:
inline std::string
Dump() const override {
return "<CpuResource>";
return "<CpuResource, name=" + name_ + ">";
}
friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource);
......
......@@ -20,7 +20,7 @@ public:
inline std::string
Dump() const override {
return "<DiskResource>";
return "<DiskResource, name=" + name_ + ">";
}
friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource);
......
......@@ -20,7 +20,7 @@ public:
inline std::string
Dump() const override {
return "<GpuResource>";
return "<GpuResource, name=" + name_ + ">";
}
friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource);
......
......@@ -93,6 +93,11 @@ public:
return type_;
}
inline uint64_t
DeviceId() {
return device_id_;
}
// TODO: better name?
inline bool
HasLoader() {
......@@ -172,9 +177,8 @@ private:
protected:
uint64_t device_id_;
private:
std::string name_;
private:
ResourceType type_;
TaskTable task_table_;
......
......@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <src/cache/GpuCacheMgr.h>
#include "TestTask.h"
......@@ -11,7 +12,8 @@ namespace zilliz {
namespace milvus {
namespace engine {
TestTask::TestTask() : Task(TaskType::TestTask) {}
TestTask::TestTask(TableFileSchemaPtr& file) : XSearchTask(file) {}
void
TestTask::Load(LoadType type, uint8_t device_id) {
......@@ -27,7 +29,8 @@ TestTask::Execute() {
TaskPtr
TestTask::Clone() {
auto ret = std::make_shared<TestTask>();
TableFileSchemaPtr dummy = nullptr;
auto ret = std::make_shared<TestTask>(dummy);
ret->load_count_ = load_count_;
ret->exec_count_ = exec_count_;
return ret;
......
......@@ -5,16 +5,16 @@
******************************************************************************/
#pragma once
#include "Task.h"
#include "SearchTask.h"
namespace zilliz {
namespace milvus {
namespace engine {
class TestTask : public Task {
class TestTask : public XSearchTask {
public:
TestTask();
TestTask(TableFileSchemaPtr& file);
public:
void
......
......@@ -10,8 +10,9 @@ class CostTest : public ::testing::Test {
protected:
void
SetUp() override {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<TestTask>();
auto task = std::make_shared<TestTask>(dummy);
table_.Put(task);
}
table_.Get(0)->state = TaskTableItemState::INVALID;
......
......@@ -14,7 +14,7 @@ TEST(normal_test, test1) {
// auto res_mgr = std::make_shared<ResourceMgr>();
auto res_mgr = ResMgrInst::GetInstance();
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false));
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu"));
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0));
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false));
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false));
......@@ -32,9 +32,11 @@ TEST(normal_test, test1) {
const uint64_t NUM_TASK = 1000;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM_TASK; ++i) {
if (auto observe = disk.lock()) {
auto task = std::make_shared<TestTask>();
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
observe->task_table().Put(task);
}
......
......@@ -5,9 +5,9 @@
using namespace zilliz::milvus::engine;
TEST(resource_factory_test, create) {
auto disk = ResourceFactory::Create("disk");
auto cpu = ResourceFactory::Create("cpu");
auto gpu = ResourceFactory::Create("gpu");
auto disk = ResourceFactory::Create("ssd", "DISK", 0);
auto cpu = ResourceFactory::Create("cpu", "CPU", 0);
auto gpu = ResourceFactory::Create("gpu", "GPU", 0);
ASSERT_TRUE(std::dynamic_pointer_cast<DiskResource>(disk));
ASSERT_TRUE(std::dynamic_pointer_cast<CpuResource>(cpu));
......
......@@ -22,9 +22,9 @@ class ResourceTest : public testing::Test {
protected:
void
SetUp() override {
disk_resource_ = ResourceFactory::Create("disk");
cpu_resource_ = ResourceFactory::Create("cpu");
gpu_resource_ = ResourceFactory::Create("gpu");
disk_resource_ = ResourceFactory::Create("ssd", "DISK", 0);
cpu_resource_ = ResourceFactory::Create("cpu", "CPU", 0);
gpu_resource_ = ResourceFactory::Create("gpu", "GPU", 0);
resources_.push_back(disk_resource_);
resources_.push_back(cpu_resource_);
resources_.push_back(gpu_resource_);
......@@ -85,8 +85,9 @@ protected:
TEST_F(ResourceTest, cpu_resource_test) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>();
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
cpu_resource_->task_table().Put(task);
}
......@@ -113,8 +114,9 @@ TEST_F(ResourceTest, cpu_resource_test) {
TEST_F(ResourceTest, gpu_resource_test) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>();
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
gpu_resource_->task_table().Put(task);
}
......
......@@ -3,14 +3,159 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "scheduler/Scheduler.h"
#include <gtest/gtest.h>
#include <src/scheduler/tasklabel/DefaultLabel.h>
#include "cache/DataObj.h"
#include "cache/GpuCacheMgr.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/ResourceFactory.h"
#include "scheduler/resource/Resource.h"
#include "utils/Error.h"
#include "wrapper/knowhere/vec_index.h"
namespace zilliz {
namespace milvus {
namespace engine {
class MockVecIndex : public engine::VecIndex {
public:
virtual server::KnowhereError BuildAll(const long &nb,
const float *xb,
const long *ids,
const engine::Config &cfg,
const long &nt = 0,
const float *xt = nullptr) {
}
engine::VecIndexPtr Clone() override {
return zilliz::milvus::engine::VecIndexPtr();
}
int64_t GetDeviceId() override {
return 0;
}
engine::IndexType GetType() override {
return engine::IndexType::INVALID;
}
virtual server::KnowhereError Add(const long &nb,
const float *xb,
const long *ids,
const engine::Config &cfg = engine::Config()) {
}
virtual server::KnowhereError Search(const long &nq,
const float *xq,
float *dist,
long *ids,
const engine::Config &cfg = engine::Config()) {
}
engine::VecIndexPtr CopyToGpu(const int64_t &device_id, const engine::Config &cfg) override {
}
engine::VecIndexPtr CopyToCpu(const engine::Config &cfg) override {
}
virtual int64_t Dimension() {
return dimension_;
}
virtual int64_t Count() {
return ntotal_;
}
virtual zilliz::knowhere::BinarySet Serialize() {
zilliz::knowhere::BinarySet binset;
return binset;
}
virtual server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) {
}
public:
int64_t dimension_ = 512;
int64_t ntotal_ = 0;
};
class SchedulerTest : public testing::Test {
protected:
void
SetUp() override {
ResourcePtr cpu = ResourceFactory::Create("cpu", "CPU", 0, true, false);
ResourcePtr gpu_0 = ResourceFactory::Create("gpu0", "GPU", 0);
ResourcePtr gpu_1 = ResourceFactory::Create("gpu1", "GPU", 1);
res_mgr_ = std::make_shared<ResourceMgr>();
cpu_resource_ = res_mgr_->Add(std::move(cpu));
gpu_resource_0_ = res_mgr_->Add(std::move(gpu_0));
gpu_resource_1_ = res_mgr_->Add(std::move(gpu_1));
auto PCIE = Connection("IO", 11000.0);
res_mgr_->Connect("cpu", "gpu0", PCIE);
res_mgr_->Connect("cpu", "gpu1", PCIE);
scheduler_ = std::make_shared<Scheduler>(res_mgr_);
res_mgr_->Start();
scheduler_->Start();
}
void
TearDown() override {
scheduler_->Stop();
res_mgr_->Stop();
}
ResourceWPtr cpu_resource_;
ResourceWPtr gpu_resource_0_;
ResourceWPtr gpu_resource_1_;
ResourceMgrPtr res_mgr_;
std::shared_ptr<Scheduler> scheduler_;
uint64_t load_count_ = 0;
std::mutex load_mutex_;
std::condition_variable cv_;
};
void
insert_dummy_index_into_gpu_cache(uint64_t device_id) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000;
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location",obj);
}
TEST_F(SchedulerTest, OnCopyCompleted) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
dummy->location_ = "location";
insert_dummy_index_into_gpu_cache(1);
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
task->label() = std::make_shared<DefaultLabel>();
tasks.push_back(task);
cpu_resource_.lock()->task_table().Put(task);
}
sleep(3);
ASSERT_EQ(res_mgr_->GetResource(ResourceType::GPU, 1)->task_table().Size(), NUM);
}
}
}
......
......@@ -43,9 +43,10 @@ class TaskTableBaseTest : public ::testing::Test {
protected:
void
SetUp() override {
TableFileSchemaPtr dummy = nullptr;
invalid_task_ = nullptr;
task1_ = std::make_shared<TestTask>();
task2_ = std::make_shared<TestTask>();
task1_ = std::make_shared<TestTask>(dummy);
task2_ = std::make_shared<TestTask>(dummy);
}
TaskPtr invalid_task_;
......@@ -83,8 +84,9 @@ class TaskTableAdvanceTest : public ::testing::Test {
protected:
void
SetUp() override {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<TestTask>();
auto task = std::make_shared<TestTask>(dummy);
table1_.Put(task);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册