Commit e757727e authored by wxyu

MS-508 Update normal_test in scheduler


Former-commit-id: 343dae4a96cac46259094de9c1d025de3b3db53e
Parent 1efe696f
@@ -89,6 +89,7 @@ Please mark all change in change log and use the ticket from JIRA.
 - MS-488 - Improve code format in scheduler
 - MS-502 - Update tasktable_test in scheduler
 - MS-504 - Update node_test in scheduler
+- MS-508 - Update normal_test in scheduler
 
 ## New Feature
 - MS-343 - Implement ResourceMgr
......
@@ -108,6 +108,7 @@ void
 Scheduler::OnFinishTask(const EventPtr &event) {
 }
 
+// TODO: refactor the function
 void
 Scheduler::OnLoadCompleted(const EventPtr &event) {
     auto load_completed_event = std::static_pointer_cast<LoadCompletedEvent>(event);
@@ -120,18 +121,23 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
             if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
                 auto task = load_completed_event->task_table_item_->task;
                 auto search_task = std::static_pointer_cast<XSearchTask>(task);
-                auto location = search_task->index_engine_->GetLocation();
                 bool moved = false;
 
-                for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
-                    auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
-                    if (index != nullptr) {
-                        moved = true;
-                        auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
-                        Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
-                        break;
+                // to support test task, REFACTOR
+                if (auto index_engine = search_task->index_engine_) {
+                    auto location = index_engine->GetLocation();
+
+                    for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
+                        auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
+                        if (index != nullptr) {
+                            moved = true;
+                            auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
+                            Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
+                            break;
+                        }
                     }
                 }
+
                 if (not moved) {
                     Action::PushTaskToNeighbourRandomly(task, resource);
                 }
@@ -147,7 +153,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
             // step 1: calculate shortest path per resource, from disk to compute resource
             auto compute_resources = res_mgr_.lock()->GetComputeResource();
             std::vector<std::vector<std::string>> paths;
-            std::vector<uint64_t > transport_costs;
+            std::vector<uint64_t> transport_costs;
             for (auto &res : compute_resources) {
                 std::vector<std::string> path;
                 uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
@@ -176,7 +182,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
                 task->path() = task_path;
             }
 
-            if(self->name() == task->path().Last()) {
+            if (self->name() == task->path().Last()) {
                 self->WakeupLoader();
             } else {
                 auto next_res_name = task->path().Next();
......
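Aside (not part of the commit): the refactored branch above leans on the if-with-initializer idiom, where declaring the shared_ptr copy inside the condition doubles as the null check, so a task without an index engine (such as a TestTask built from a null schema) skips the GPU-cache scan and falls through to the random-neighbour push. A minimal, self-contained C++ sketch of the same pattern, with hypothetical names:

```cpp
#include <iostream>
#include <memory>

int main() {
    std::shared_ptr<int> maybe_engine;  // null, as index_engine_ is for a TestTask built from a null schema
    if (auto engine = maybe_engine) {   // the declaration itself is the condition: non-null means "taken"
        std::cout << "engine present: " << *engine << "\n";
    } else {
        std::cout << "no engine: skip the GPU-cache scan, push to a random neighbour\n";
    }
    return 0;
}
```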
@@ -21,6 +21,7 @@ namespace milvus {
 namespace engine {
 
+// TODO: refactor, not friendly to unittest, logical in framework code
 class Scheduler {
 public:
     explicit
......
@@ -83,11 +83,14 @@ CollectFileMetrics(int file_type, size_t file_size) {
 
 XSearchTask::XSearchTask(TableFileSchemaPtr file)
     : Task(TaskType::SearchTask), file_(file) {
-    index_engine_ = EngineFactory::Build(file_->dimension_,
-                                         file_->location_,
-                                         (EngineType) file_->engine_type_,
-                                         (MetricType) file_->metric_type_,
-                                         file_->nlist_);
+    if (file_) {
+        index_engine_ = EngineFactory::Build(file_->dimension_,
+                                             file_->location_,
+                                             (EngineType) file_->engine_type_,
+                                             (MetricType) file_->metric_type_,
+                                             file_->nlist_);
+    }
 }
 
 void
......
@@ -12,6 +12,7 @@ namespace zilliz {
 namespace milvus {
 namespace engine {
 
+// TODO: rewrite
 class XSearchTask : public Task {
 public:
     explicit
......
@@ -34,6 +34,7 @@ class Task;
 using TaskPtr = std::shared_ptr<Task>;
 
+// TODO: re-design
 class Task {
 public:
     explicit
......
@@ -13,7 +13,7 @@ namespace milvus {
 namespace engine {
 
-TestTask::TestTask(TableFileSchemaPtr& file) : XSearchTask(file) {}
+TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {}
 
 void
 TestTask::Load(LoadType type, uint8_t device_id) {
@@ -22,9 +22,12 @@ TestTask::Load(LoadType type, uint8_t device_id) {
 
 void
 TestTask::Execute() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    exec_count_++;
-    done_ = true;
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        exec_count_++;
+        done_ = true;
+    }
+    cv_.notify_one();
 }
 
 void
......
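Aside (not part of the commit): Execute() now sets done_ under the mutex inside a scoped block and calls cv_.notify_one() only after the lock_guard has been released, which avoids waking a waiter that would immediately block again on the still-held mutex. The matching TestTask::Wait() used by the test below lies outside this diff; a sketch of what it presumably looks like, assuming the mutex_, cv_ and done_ members seen in the hunk (the wrapper struct name here is illustrative only):

```cpp
#include <condition_variable>
#include <mutex>

struct TestTaskSyncSketch {
    std::mutex mutex_;
    std::condition_variable cv_;
    bool done_ = false;

    // Blocks until Execute() has set done_ and notified; the predicate guards against spurious wakeups.
    void Wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return done_; });
    }
};
```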
@@ -2,6 +2,7 @@
 #include "scheduler/ResourceMgr.h"
 #include "scheduler/Scheduler.h"
 #include "scheduler/task/TestTask.h"
+#include "scheduler/tasklabel/DefaultLabel.h"
 #include "scheduler/SchedInst.h"
 #include "utils/Log.h"
 #include <gtest/gtest.h>
@@ -9,48 +10,44 @@
 using namespace zilliz::milvus::engine;
 
-TEST(normal_test, test1) {
+TEST(normal_test, inst_test) {
     // ResourceMgr only compose resources, provide unified event
-    // auto res_mgr = std::make_shared<ResourceMgr>();
     auto res_mgr = ResMgrInst::GetInstance();
-    auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd", true, false));
-    auto cpu = res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0));
-    auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu0", false, false));
-    auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu", "gpu2", false, false));
 
-    auto IO = Connection("IO", 500.0);
-    auto PCIE = Connection("IO", 11000.0);
-    res_mgr->Connect(disk, cpu, IO);
-    res_mgr->Connect(cpu, gpu1, PCIE);
-    res_mgr->Connect(cpu, gpu2, PCIE);
+    res_mgr->Add(ResourceFactory::Create("disk", "DISK", 0, true, false));
+    res_mgr->Add(ResourceFactory::Create("cpu", "CPU", 0, true, true));
 
-    res_mgr->Start();
+    auto IO = Connection("IO", 500.0);
+    res_mgr->Connect("disk", "cpu", IO);
 
-    // auto scheduler = new Scheduler(res_mgr);
     auto scheduler = SchedInst::GetInstance();
+
+    res_mgr->Start();
     scheduler->Start();
 
     const uint64_t NUM_TASK = 1000;
     std::vector<std::shared_ptr<TestTask>> tasks;
     TableFileSchemaPtr dummy = nullptr;
 
-    for (uint64_t i = 0; i < NUM_TASK; ++i) {
-        if (auto observe = disk.lock()) {
+    auto disks = res_mgr->GetDiskResources();
+    ASSERT_FALSE(disks.empty());
+    if (auto observe = disks[0].lock()) {
+        for (uint64_t i = 0; i < NUM_TASK; ++i) {
             auto task = std::make_shared<TestTask>(dummy);
+            task->label() = std::make_shared<DefaultLabel>();
             tasks.push_back(task);
             observe->task_table().Put(task);
         }
     }
 
-    sleep(1);
+    for (auto &task : tasks) {
+        task->Wait();
+        ASSERT_EQ(task->load_count_, 1);
+        ASSERT_EQ(task->exec_count_, 1);
+    }
+
     scheduler->Stop();
     res_mgr->Stop();
-
-    auto pcpu = cpu.lock();
-    for (uint64_t i = 0; i < NUM_TASK; ++i) {
-        auto task = std::static_pointer_cast<TestTask>(pcpu->task_table()[i]->task);
-        ASSERT_EQ(task->load_count_, 1);
-        ASSERT_EQ(task->exec_count_, 1);
-    }
 }
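With each task waited on individually, the test no longer depends on a fixed sleep(1) finishing 1000 tasks in time; it blocks until every TestTask has actually been loaded and executed. Assuming the scheduler unit tests build into an ordinary gtest binary (this commit does not name it), the new case can be run on its own with gtest's standard `--gtest_filter=normal_test.inst_test` flag.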