提交 f31ae642 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-394 Update scheduler unittest

See merge request megasearch/milvus!397

Former-commit-id: 2ad26a7acb68a6928150131526ed78ed2e0d373e
......@@ -34,6 +34,10 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-380 - Update resource loader and executor, work until all finished
- MS-383 - Modify condition variable usage in scheduler
- MS-384 - Add global instance of ResourceMgr and Scheduler
- MS-389 - Add clone interface in Task
- MS-390 - Update resource construct function
- MS-391 - Add PushTaskToNeighbourHasExecutor action
- MS-394 - Update scheduler unittest
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -12,13 +12,16 @@ namespace milvus {
namespace engine {
std::shared_ptr<Resource>
ResourceFactory::Create(const std::string &name, const std::string &alias) {
ResourceFactory::Create(const std::string &name,
const std::string &alias,
bool enable_loader,
bool enable_executor) {
if (name == "disk") {
return std::make_shared<DiskResource>(alias);
return std::make_shared<DiskResource>(alias, enable_loader, enable_executor);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias);
return std::make_shared<CpuResource>(alias, enable_loader, enable_executor);
} else if (name == "gpu") {
return std::make_shared<GpuResource>(alias);
return std::make_shared<GpuResource>(alias, enable_loader, enable_executor);
} else {
return nullptr;
}
......
......@@ -21,7 +21,10 @@ namespace engine {
class ResourceFactory {
public:
static std::shared_ptr<Resource>
Create(const std::string &name, const std::string &alias = "");
Create(const std::string &name,
const std::string &alias = "",
bool enable_loader = true,
bool enable_executor = true);
};
......
......@@ -114,13 +114,14 @@ Scheduler::OnCopyCompleted(const EventPtr &event) {
resource->WakeupExecutor();
if (resource->Type() == ResourceType::DISK) {
Action::PushTaskToNeighbour(event->resource_);
} else {
Action::PushTaskToNeighbourHasExecutor(event->resource_);
}
}
}
void
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
// Action::PushTaskToNeighbour(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
......
......@@ -20,6 +20,11 @@ public:
static void
PushTaskToNeighbour(const ResourceWPtr &self);
/*
* Push task to neighbour that has executor;
*/
static void
PushTaskToNeighbourHasExecutor(const ResourceWPtr &self);
/*
* Pull task From neighbour;
......
......@@ -4,7 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include <list>
#include "Action.h"
......@@ -13,29 +13,65 @@ namespace milvus {
namespace engine {
void
push_task(const ResourcePtr &self, const ResourcePtr &other) {
auto &self_task_table = self->task_table();
auto &other_task_table = other->task_table();
next(std::list<ResourcePtr> &neighbours, std::list<ResourcePtr>::iterator &it) {
it++;
if (neighbours.end() == it) {
it = neighbours.begin();
}
}
void
push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighbours) {
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 10);
auto it = neighbours.begin();
if (it == neighbours.end()) return;
auto indexes = PickToMove(self_task_table, cache, self_task_table.Size());
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
other_task_table.Put(task);
// TODO: mark moved future
task = task->Clone();
(*it)->task_table().Put(task);
next(neighbours, it);
}
}
}
void
Action::PushTaskToNeighbour(const ResourceWPtr &res) {
if (auto self = res.lock()) {
for (auto &neighbour : self->GetNeighbours()) {
if (auto n = neighbour.neighbour_node.lock()) {
push_task(self, std::static_pointer_cast<Resource>(n));
}
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
neighbours.emplace_back(resource);
}
push_task_round_robin(self->task_table(), neighbours);
}
void
Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) {
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
if (resource->HasExecutor()) {
neighbours.emplace_back(resource);
}
}
push_task_round_robin(self->task_table(), neighbours);
}
......
......@@ -16,8 +16,8 @@ std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
return out;
}
CpuResource::CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {}
CpuResource::CpuResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::CPU, enable_loader, enable_executor) {}
void CpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::DISK2CPU, 0);
......@@ -29,4 +29,4 @@ void CpuResource::Process(TaskPtr task) {
}
}
}
\ No newline at end of file
}
......@@ -17,7 +17,7 @@ namespace engine {
class CpuResource : public Resource {
public:
explicit
CpuResource(std::string name);
CpuResource(std::string name, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -15,8 +15,8 @@ std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
return out;
}
DiskResource::DiskResource(std::string name)
: Resource(std::move(name), ResourceType::DISK, true, false) {
DiskResource::DiskResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::DISK, enable_loader, enable_executor) {
}
void DiskResource::LoadFile(TaskPtr task) {
......
......@@ -16,7 +16,7 @@ namespace engine {
class DiskResource : public Resource {
public:
explicit
DiskResource(std::string name);
DiskResource(std::string name, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -16,8 +16,8 @@ std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
return out;
}
GpuResource::GpuResource(std::string name)
: Resource(std::move(name), ResourceType::GPU) {}
GpuResource::GpuResource(std::string name, bool enable_loader, bool enable_executor)
: Resource(std::move(name), ResourceType::GPU, enable_loader, enable_executor) {}
void GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, 0);
......
......@@ -16,7 +16,7 @@ namespace engine {
class GpuResource : public Resource {
public:
explicit
GpuResource(std::string name);
GpuResource(std::string name, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
......
......@@ -45,8 +45,30 @@ enum class RegisterType {
class Resource : public Node, public std::enable_shared_from_this<Resource> {
public:
/*
* Event function MUST be a short function, never blocking;
* Start loader and executor if enable;
*/
void
Start();
/*
* Stop loader and executor and join them, blocking until the threads have exited;
*/
void
Stop();
/*
* wake up loader;
*/
void
WakeupLoader();
/*
* wake up executor;
*/
void
WakeupExecutor();
public:
template<typename T>
void Register_T(const RegisterType &type) {
register_table_.emplace(type, [] { return std::make_shared<T>(); });
......@@ -65,11 +87,17 @@ public:
return type_;
}
void
Start();
// TODO: better name?
inline bool
HasLoader() {
return enable_loader_;
}
void
Stop();
// TODO: better name?
inline bool
HasExecutor() {
return enable_executor_;
}
TaskTable &
task_table();
......@@ -81,24 +109,11 @@ public:
friend std::ostream &operator<<(std::ostream &out, const Resource &resource);
public:
/*
* wake up loader;
*/
void
WakeupLoader();
/*
* wake up executor;
*/
void
WakeupExecutor();
protected:
Resource(std::string name,
ResourceType type,
bool enable_loader = true,
bool enable_executor = true);
bool enable_loader,
bool enable_executor);
// TODO: SearchContextPtr to TaskPtr
/*
......
......@@ -20,6 +20,11 @@ XDeleteTask::Execute() {
}
/*
 * Clone is not implemented for delete tasks: this always returns a null TaskPtr.
 * NOTE(review): the round-robin push helper in Action.cpp puts the result of
 * task->Clone() into a neighbour's task table; confirm delete tasks never
 * reach that path, or that a null task is tolerated there.
 */
TaskPtr
XDeleteTask::Clone() {
return nullptr;
}
}
}
}
......@@ -19,6 +19,9 @@ public:
void
Execute() override;
TaskPtr
Clone() override;
};
}
......
......@@ -99,16 +99,27 @@ CollectDurationMetrics(int index_type, double total_time) {
}
}
/*
 * Construct a search task for the given table file.
 *
 * Stores the file schema and builds index_engine_ from the file's dimension,
 * location and engine type via EngineFactory::Build().
 */
XSearchTask::XSearchTask(TableFileSchemaPtr file)
    : file_(file) {
    auto engine_kind = (EngineType) file_->engine_type_;
    index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_kind);
}
void
XSearchTask::Load(LoadType type, uint8_t device_id) {
server::TimeRecorder rc("");
//step 1: load index
ExecutionEnginePtr index_ptr = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType) file_->engine_type_);
try {
index_ptr->Load();
if (type == LoadType::DISK2CPU) {
index_engine_->Load();
} else if (type == LoadType::CPU2GPU) {
index_engine_->Load();
index_engine_->CopyToGpu(device_id);
} else if (type == LoadType::GPU2CPU) {
index_engine_->CopyToCpu();
} else {
// TODO: exception
}
} catch (std::exception &ex) {
//typical error: out of disk space or permission denied
std::string msg = "Failed to load index file: " + std::string(ex.what());
......@@ -121,7 +132,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
return;
}
size_t file_size = index_ptr->PhysicalSize();
size_t file_size = index_engine_->PhysicalSize();
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + std::to_string(file_->file_type_)
+ " size:" + std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally cost";
......@@ -135,7 +146,6 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
//step 2: return search task for later execution
index_id_ = file_->id_;
index_type_ = file_->file_type_;
index_engine_ = index_ptr;
search_contexts_.swap(search_contexts_);
}
......@@ -157,12 +167,13 @@ XSearchTask::Execute() {
for (auto &context : search_contexts_) {
//step 1: allocate memory
auto inner_k = context->topk();
auto nprobe = context->nprobe();
output_ids.resize(inner_k * context->nq());
output_distence.resize(inner_k * context->nq());
try {
//step 2: search
index_engine_->Search(context->nq(), context->vectors(), inner_k, output_distence.data(),
index_engine_->Search(context->nq(), context->vectors(), inner_k, nprobe, output_distence.data(),
output_ids.data());
double span = rc.RecordSection("do search for context:" + context->Identity());
......@@ -199,6 +210,16 @@ XSearchTask::Execute() {
rc.ElapseFromBegin("totally cost");
}
/*
 * Create a new XSearchTask for the same file.
 *
 * The copy receives this task's index id, search contexts and metric flag,
 * and its engine is replaced by index_engine_->Clone().
 * NOTE(review): index_type_ and task_ are not copied here; confirm the clone
 * is always Load()ed again before it is executed.
 */
TaskPtr
XSearchTask::Clone() {
    auto copy = std::make_shared<XSearchTask>(file_);
    copy->metric_l2 = metric_l2;
    copy->search_contexts_ = search_contexts_;
    copy->index_id_ = index_id_;
    copy->index_engine_ = index_engine_->Clone();
    return copy;
}
Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distence,
uint64_t nq,
......@@ -343,6 +364,7 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
return Status::OK();
}
}
}
}
......@@ -14,12 +14,18 @@ namespace engine {
class XSearchTask : public Task {
public:
explicit
XSearchTask(TableFileSchemaPtr file);
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
TaskPtr
Clone() override;
public:
static Status ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distence,
......
......@@ -35,6 +35,9 @@ public:
virtual void
Execute() = 0;
virtual TaskPtr
Clone() = 0;
public:
std::vector<SearchContextPtr> search_contexts_;
ScheduleTaskPtr task_;
......
......@@ -16,8 +16,7 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) {
switch (schedule_task->type()) {
case ScheduleTaskType::kIndexLoad: {
auto load_task = std::static_pointer_cast<IndexLoadTask>(schedule_task);
auto task = std::make_shared<XSearchTask>();
task->file_ = load_task->file_;
auto task = std::make_shared<XSearchTask>(load_task->file_);
task->search_contexts_ = load_task->search_contexts_;
task->task_ = schedule_task;
return task;
......
......@@ -6,6 +6,7 @@
#include "TestTask.h"
namespace zilliz {
namespace milvus {
namespace engine {
......@@ -17,7 +18,23 @@ TestTask::Load(LoadType type, uint8_t device_id) {
void
TestTask::Execute() {
std::lock_guard<std::mutex> lock(mutex_);
exec_count_++;
done_ = true;
}
/*
 * Create a new TestTask that carries over this task's load and execute counters.
 * done_ is not copied; the clone keeps the value it was constructed with.
 */
TaskPtr
TestTask::Clone() {
    auto copy = std::make_shared<TestTask>();
    copy->exec_count_ = exec_count_;
    copy->load_count_ = load_count_;
    return copy;
}
/*
 * Block the caller until done_ is true.
 * done_ is checked under mutex_; the loop re-checks it after every wakeup.
 */
void
TestTask::Wait() {
    std::unique_lock<std::mutex> guard(mutex_);
    while (!done_) {
        cv_.wait(guard);
    }
}
}
......
......@@ -23,9 +23,19 @@ public:
void
Execute() override;
TaskPtr
Clone() override;
void
Wait();
public:
uint64_t load_count_;
uint64_t exec_count_;
uint64_t load_count_ = 0;
uint64_t exec_count_ = 0;
bool done_ = false;
std::mutex mutex_;
std::condition_variable cv_;
};
......
......@@ -144,7 +144,9 @@ VecIndexPtr VecIndexImpl::CopyToGpu(const int64_t &device_id, const Config &cfg)
// TODO(linxj): update type
auto gpu_index = zilliz::knowhere::CopyCpuToGpu(index_, device_id, cfg);
return std::make_shared<VecIndexImpl>(gpu_index, type);
auto new_index = std::make_shared<VecIndexImpl>(gpu_index, type);
new_index->dim = dim;
return new_index;
}
// TODO(linxj): rename copytocpu => copygputocpu
......
#include "scheduler/TaskTable.h"
#include "scheduler/Cost.h"
#include <gtest/gtest.h>
#include "scheduler/task/TestTask.h"
using namespace zilliz::milvus::engine;
......@@ -10,7 +11,7 @@ protected:
void
SetUp() override {
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<XSearchTask>();
auto task = std::make_shared<TestTask>();
table_.Put(task);
}
table_.Get(0)->state = TaskTableItemState::INVALID;
......
......@@ -13,10 +13,10 @@ TEST(normal_test, test1) {
// ResourceMgr only compose resources, provide unified event
// auto res_mgr = std::make_shared<ResourceMgr>();
auto res_mgr = ResMgrInst::GetInstance();
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd"));
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false));
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu"));
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu"));
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu"));
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false));
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false));
auto IO = Connection("IO", 500.0);
auto PCIE = Connection("IO", 11000.0);
......@@ -30,7 +30,7 @@ TEST(normal_test, test1) {
auto scheduler = SchedInst::GetInstance();
scheduler->Start();
const uint64_t NUM_TASK = 100;
const uint64_t NUM_TASK = 1000;
std::vector<std::shared_ptr<TestTask>> tasks;
for (uint64_t i = 0; i < NUM_TASK; ++i) {
if (auto observe = disk.lock()) {
......@@ -45,8 +45,10 @@ TEST(normal_test, test1) {
scheduler->Stop();
res_mgr->Stop();
for (uint64_t i = 0 ; i < NUM_TASK; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
ASSERT_EQ(tasks[i]->exec_count_, 1);
auto pcpu = cpu.lock();
for (uint64_t i = 0; i < NUM_TASK; ++i) {
auto task = std::static_pointer_cast<TestTask>(pcpu->task_table()[i]->task);
ASSERT_EQ(task->load_count_, 1);
ASSERT_EQ(task->exec_count_, 1);
}
}
#include "scheduler/TaskTable.h"
#include "scheduler/task/TestTask.h"
#include <gtest/gtest.h>
......@@ -43,8 +44,8 @@ protected:
void
SetUp() override {
invalid_task_ = nullptr;
task1_ = std::make_shared<XSearchTask>();
task2_ = std::make_shared<XSearchTask>();
task1_ = std::make_shared<TestTask>();
task2_ = std::make_shared<TestTask>();
}
TaskPtr invalid_task_;
......@@ -83,7 +84,7 @@ protected:
void
SetUp() override {
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<XSearchTask>();
auto task = std::make_shared<TestTask>();
table1_.Put(task);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册