提交 4f8ccf9d 编写于 作者: J jinhai

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-373 Add resource test

See merge request megasearch/milvus!383

Former-commit-id: 579f97477d802bd344d4a393cb51ea910f98b534
......@@ -23,6 +23,13 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-365 - Use tasktableitemptr instead in event
- MS-366 - Implement TaskTable
- MS-368 - Implement cost.cpp
- MS-371 - Add TaskTableUpdatedEvent
- MS-373 - Add resource test
- MS-374 - Add action definition
- MS-375 - Add Dump implementation for Event
- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task
- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable
- MS-378 - Debug and Update normal_test in scheduler unittest
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -33,11 +33,7 @@ ResourceMgr::Add(ResourcePtr &&resource) {
resources_.emplace_back(resource);
size_t index = resources_.size() - 1;
resource->RegisterSubscriber([&](EventPtr event) {
queue_.emplace(event);
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.notify_one();
});
resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1));
return ret;
}
......@@ -46,27 +42,11 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect
if (auto observe_a = res1.lock()) {
if (auto observe_b = res2.lock()) {
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
observe_b->AddNeighbour(std::static_pointer_cast<Node>(observe_a), connection);
}
}
}
void
ResourceMgr::EventProcess() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !queue_.empty(); });
if (!running_) {
break;
}
auto event = queue_.front();
queue_.pop();
if (subscriber_) {
subscriber_(event);
}
}
}
void
ResourceMgr::Start() {
......@@ -74,23 +54,33 @@ ResourceMgr::Start() {
for (auto &resource : resources_) {
resource->Start();
}
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
running_ = true;
worker_thread_ = std::thread(&ResourceMgr::event_process, this);
}
void
ResourceMgr::Stop() {
std::lock_guard<std::mutex> lck(resources_mutex_);
running_ = false;
{
std::lock_guard<std::mutex> lock(event_mutex_);
running_ = false;
queue_.push(nullptr);
event_cv_.notify_one();
}
worker_thread_.join();
std::lock_guard<std::mutex> lck(resources_mutex_);
for (auto &resource : resources_) {
resource->Stop();
}
}
void
ResourceMgr::PostEvent(const EventPtr &event) {
std::unique_lock<std::mutex> lock(event_mutex_);
queue_.emplace(event);
event_cv_.notify_one();
}
std::string
ResourceMgr::Dump() {
std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";
......@@ -103,6 +93,26 @@ ResourceMgr::Dump() {
return str;
}
void
ResourceMgr::event_process() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !queue_.empty(); });
auto event = queue_.front();
if (event == nullptr) {
break;
}
// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;
queue_.pop();
if (subscriber_) {
subscriber_(event);
}
}
}
}
}
}
......@@ -14,6 +14,7 @@
#include <condition_variable>
#include "resource/Resource.h"
#include "utils/Log.h"
namespace zilliz {
......@@ -59,6 +60,8 @@ public:
void
Stop();
void
PostEvent(const EventPtr& event);
// TODO: add stats interface(low)
......@@ -70,7 +73,7 @@ public:
private:
void
EventProcess();
event_process();
private:
std::queue<EventPtr> queue_;
......
......@@ -4,69 +4,129 @@
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include "Scheduler.h"
#include "Cost.h"
#include "action/Action.h"
namespace zilliz {
namespace milvus {
namespace engine {
Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
: running_(false),
res_mgr_(std::move(res_mgr)) {
if (auto mgr = res_mgr_.lock()) {
mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
}
}
void
push_task(ResourcePtr &self, ResourcePtr &other) {
auto self_task_table = self->task_table();
auto other_task_table = other->task_table();
if (!other_task_table.Empty()) {
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 1);
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
other_task_table.Put(task);
// TODO: mark moved future
other->WakeupLoader();
other->WakeupExecutor();
}
}
Scheduler::Start() {
running_ = true;
worker_thread_ = std::thread(&Scheduler::worker_function, this);
}
void
Scheduler::Stop() {
{
std::lock_guard<std::mutex> lock(event_mutex_);
running_ = false;
event_queue_.push(nullptr);
event_cv_.notify_one();
}
worker_thread_.join();
}
void
Scheduler::PostEvent(const EventPtr &event) {
std::lock_guard<std::mutex> lock(event_mutex_);
event_queue_.push(event);
event_cv_.notify_one();
// SERVER_LOG_DEBUG << "Scheduler post " << *event;
}
std::string
Scheduler::Dump() {
return std::string();
}
void
schedule(const ResourceWPtr &res) {
if (auto self = res.lock()) {
for (auto &nei : self->GetNeighbours()) {
if (auto n = nei.neighbour_node.lock()) {
auto neighbour = std::static_pointer_cast<Resource>(n);
push_task(self, neighbour);
}
Scheduler::worker_function() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
auto event = event_queue_.front();
if (event == nullptr) {
break;
}
// SERVER_LOG_DEBUG << "Scheduler process " << *event;
event_queue_.pop();
Process(event);
}
}
void
Scheduler::Process(const EventPtr &event) {
switch (event->Type()) {
case EventType::START_UP: {
OnStartUp(event);
break;
}
case EventType::COPY_COMPLETED: {
OnCopyCompleted(event);
break;
}
case EventType::FINISH_TASK: {
OnFinishTask(event);
break;
}
case EventType::TASK_TABLE_UPDATED: {
OnTaskTableUpdated(event);
break;
}
default: {
// TODO: logging
break;
}
}
}
void
Scheduler::OnStartUp(const EventPtr &event) {
schedule(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
}
void
Scheduler::OnFinishTask(const EventPtr &event) {
schedule(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupExecutor();
}
}
void
Scheduler::OnCopyCompleted(const EventPtr &event) {
schedule(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
resource->WakeupExecutor();
if (resource->Type()== ResourceType::DISK) {
Action::PushTaskToNeighbour(event->resource_);
}
}
}
void
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
schedule(event->resource_);
}
std::string
Scheduler::Dump() {
return std::string();
// Action::PushTaskToNeighbour(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
}
}
......
......@@ -13,6 +13,7 @@
#include "resource/Resource.h"
#include "ResourceMgr.h"
#include "utils/Log.h"
namespace zilliz {
......@@ -23,20 +24,32 @@ namespace engine {
class Scheduler {
public:
explicit
Scheduler(ResourceMgrWPtr res_mgr)
: running_(false),
res_mgr_(std::move(res_mgr)) {
// res_mgr.Register();
// res_mgr.Register();
// res_mgr.Register();
// res_mgr.Register();
}
Scheduler(ResourceMgrWPtr res_mgr);
Scheduler(const Scheduler &) = delete;
Scheduler(Scheduler &&) = delete;
/*
* Start worker thread;
*/
void
Start();
/*
* Stop worker thread, join it;
*/
void
Start() {
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
}
Stop();
/*
* Post event to scheduler event queue;
*/
void
PostEvent(const EventPtr &event);
/*
* Dump as string;
*/
std::string
Dump();
......@@ -45,24 +58,37 @@ private:
/*
* Process start up events;
*
* Actions:
* Pull task from neighbours;
*/
void
OnStartUp(const EventPtr &event);
/*
* Process finish task events;
*
* Actions:
* Pull task from neighbours;
*/
void
OnFinishTask(const EventPtr &event);
/*
* Process copy completed events;
*
* Actions:
* Mark task source MOVED;
* Pull task from neighbours;
*/
void
OnCopyCompleted(const EventPtr &event);
/*
* Process task table updated events;
* Process task table updated events, which happened on task_table->put;
*
* Actions:
* Push task to neighbours;
*/
void
OnTaskTableUpdated(const EventPtr &event);
......@@ -72,40 +98,13 @@ private:
* Dispatch event to event handler;
*/
void
Process(const EventPtr &event) {
switch (event->Type()) {
case EventType::START_UP: {
OnStartUp(event);
break;
}
case EventType::COPY_COMPLETED: {
OnCopyCompleted(event);
break;
}
case EventType::FINISH_TASK: {
OnFinishTask(event);
break;
}
case EventType::TASK_TABLE_UPDATED: {
OnTaskTableUpdated(event);
break;
}
default: {
break;
}
}
}
Process(const EventPtr &event);
/*
* Called by worker_thread_;
*/
void
worker_function() {
while (running_) {
auto event = event_queue_.front();
Process(event);
}
}
worker_function();
private:
bool running_;
......@@ -113,6 +112,8 @@ private:
ResourceMgrWPtr res_mgr_;
std::queue<EventPtr> event_queue_;
std::thread worker_thread_;
std::mutex event_mutex_;
std::condition_variable event_cv_;
};
using SchedulerPtr = std::shared_ptr<Scheduler>;
......
......@@ -7,6 +7,7 @@
#include "TaskTable.h"
#include "event/TaskTableUpdatedEvent.h"
#include <vector>
#include <sstream>
namespace zilliz {
......@@ -16,9 +17,11 @@ namespace engine {
void
TaskTable::Put(TaskPtr task) {
std::lock_guard<std::mutex> lock(id_mutex_);
auto item = std::make_shared<TaskTableItem>();
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::LOADED;
item->state = TaskTableItemState::START;
table_.push_back(item);
if (subscriber_) {
subscriber_();
......@@ -27,10 +30,12 @@ TaskTable::Put(TaskPtr task) {
void
TaskTable::Put(std::vector<TaskPtr> &tasks) {
std::lock_guard<std::mutex> lock(id_mutex_);
for (auto &task : tasks) {
auto item = std::make_shared<TaskTableItem>();
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::LOADED;
item->state = TaskTableItemState::START;
table_.push_back(item);
}
if (subscriber_) {
......@@ -59,8 +64,8 @@ TaskTable::Move(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::START) {
task->state = TaskTableItemState::LOADING;
if (task->state == TaskTableItemState::LOADED) {
task->state = TaskTableItemState::MOVING;
return true;
}
return false;
......@@ -126,9 +131,30 @@ TaskTable::Executed(uint64_t index) {
return false;
}
std::string
ToString(TaskTableItemState state) {
switch (state) {
case TaskTableItemState::INVALID: return "INVALID";
case TaskTableItemState::START: return "START";
case TaskTableItemState::LOADING: return "LOADING";
case TaskTableItemState::LOADED: return "LOADED";
case TaskTableItemState::EXECUTING: return "EXECUTING";
case TaskTableItemState::EXECUTED: return "EXECUTED";
case TaskTableItemState::MOVING: return "MOVING";
case TaskTableItemState::MOVED: return "MOVED";
default: return "";
}
}
std::string
TaskTable::Dump() {
return std::string();
std::stringstream ss;
for (auto &item : table_) {
ss << "<" << item->id;
ss << ", " << ToString(item->state);
ss << ">" << std::endl;
}
return ss.str();
}
}
......
......@@ -49,6 +49,9 @@ class TaskTable {
public:
TaskTable() = default;
TaskTable(const TaskTable &) = delete;
TaskTable(TaskTable &&) = delete;
inline void
RegisterSubscriber(std::function<void(void)> subscriber) {
subscriber_ = std::move(subscriber);
......@@ -167,6 +170,8 @@ public:
private:
// TODO: map better ?
std::uint64_t id_ = 0;
mutable std::mutex id_mutex_;
std::deque<TaskTableItemPtr> table_;
std::function<void(void)> subscriber_ = nullptr;
};
......
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "../resource/Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
class Action {
public:
/*
* Push task to neighbour;
*/
static void
PushTaskToNeighbour(const ResourceWPtr &self);
/*
* Pull task From neighbour;
*/
static void
PullTaskFromNeighbour(const ResourceWPtr &self);
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Action.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
Action::PullTaskFromNeighbour(const ResourceWPtr &self) {
// TODO: implement
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Action.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
push_task(ResourcePtr &self, ResourcePtr &other) {
auto &self_task_table = self->task_table();
auto &other_task_table = other->task_table();
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 1);
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
other_task_table.Put(task);
// TODO: mark moved future
}
}
}
void
Action::PushTaskToNeighbour(const ResourceWPtr &res) {
if (auto self = res.lock()) {
for (auto &neighbour : self->GetNeighbours()) {
if (auto n = neighbour.neighbour_node.lock()) {
auto neighbour = std::static_pointer_cast<Resource>(n);
push_task(self, neighbour);
}
}
}
}
}
}
}
......@@ -18,6 +18,14 @@ public:
CopyCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::COPY_COMPLETED, std::move(resource)),
task_table_item_(std::move(task_table_item)) {}
inline std::string
Dump() const override {
return "<CopyCompletedEvent>";
}
friend std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event);
public:
TaskTableItemPtr task_table_item_;
};
......
......@@ -5,6 +5,8 @@
******************************************************************************/
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
......@@ -30,6 +32,13 @@ public:
return type_;
}
inline virtual std::string
Dump() const {
return "<Event>";
}
friend std::ostream &operator<<(std::ostream &out, const Event &event);
public:
EventType type_;
std::weak_ptr<Resource> resource_;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Event.h"
#include "StartUpEvent.h"
#include "CopyCompletedEvent.h"
#include "FinishTaskEvent.h"
#include "TaskTableUpdatedEvent.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::ostream &operator<<(std::ostream &out, const Event &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const StartUpEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) {
out << event.Dump();
return out;
}
}
}
}
......@@ -18,6 +18,13 @@ public:
: Event(EventType::FINISH_TASK, std::move(resource)),
task_table_item_(std::move(task_table_item)) {}
inline std::string
Dump() const override {
return "<FinishTaskEvent>";
}
friend std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event);
public:
TaskTableItemPtr task_table_item_;
};
......
......@@ -17,6 +17,13 @@ public:
explicit
StartUpEvent(std::weak_ptr<Resource> resource)
: Event(EventType::START_UP, std::move(resource)) {}
inline std::string
Dump() const override {
return "<StartUpEvent>";
}
friend std::ostream &operator<<(std::ostream &out, const StartUpEvent &event);
};
}
......
......@@ -17,6 +17,13 @@ public:
explicit
TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {}
inline std::string
Dump() const override {
return "<TaskTableUpdatedEvent>";
}
friend std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event);
};
......
......@@ -16,21 +16,11 @@ CpuResource::CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {}
void CpuResource::LoadFile(TaskPtr task) {
//if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
//} else if (src.type == CPU) {
// memcpy(src, dest, len);
//} else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
//} else {
// // unknown type, exception
//}
task->Load(LoadType::DISK2CPU, 0);
}
void CpuResource::Process(TaskPtr task) {
task->Execute();
}
}
......
......@@ -12,7 +12,8 @@ namespace engine {
DiskResource::DiskResource(std::string name)
: Resource(std::move(name), ResourceType::DISK) {}
: Resource(std::move(name), ResourceType::DISK, true, false) {
}
void DiskResource::LoadFile(TaskPtr task) {
......
......@@ -16,11 +16,11 @@ GpuResource::GpuResource(std::string name)
: Resource(std::move(name), ResourceType::GPU) {}
void GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, 0);
}
void GpuResource::Process(TaskPtr task) {
task->Execute();
}
}
......
......@@ -10,10 +10,15 @@ namespace zilliz {
namespace milvus {
namespace engine {
Resource::Resource(std::string name, ResourceType type)
Resource::Resource(std::string name,
ResourceType type,
bool enable_loader,
bool enable_executor)
: name_(std::move(name)),
type_(type),
running_(false),
enable_loader_(enable_loader),
enable_executor_(enable_executor),
load_flag_(false),
exec_flag_(false) {
task_table_.RegisterSubscriber([&] {
......@@ -25,28 +30,43 @@ Resource::Resource(std::string name, ResourceType type)
}
void Resource::Start() {
loader_thread_ = std::thread(&Resource::loader_function, this);
executor_thread_ = std::thread(&Resource::executor_function, this);
running_ = true;
if (enable_loader_) {
loader_thread_ = std::thread(&Resource::loader_function, this);
}
if (enable_executor_) {
executor_thread_ = std::thread(&Resource::executor_function, this);
}
}
void Resource::Stop() {
running_ = false;
WakeupLoader();
WakeupExecutor();
if (enable_loader_) {
WakeupLoader();
loader_thread_.join();
}
if (enable_executor_) {
WakeupExecutor();
executor_thread_.join();
}
}
TaskTable &Resource::task_table() {
return task_table_;
}
void Resource::WakeupExecutor() {
exec_cv_.notify_one();
}
void Resource::WakeupLoader() {
std::lock_guard<std::mutex> lock(load_mutex_);
load_flag_ = true;
load_cv_.notify_one();
}
void Resource::WakeupExecutor() {
std::lock_guard<std::mutex> lock(exec_mutex_);
exec_flag_ = true;
exec_cv_.notify_one();
}
TaskTableItemPtr Resource::pick_task_load() {
auto indexes = PickToLoad(task_table_, 3);
for (auto index : indexes) {
......@@ -73,9 +93,12 @@ void Resource::loader_function() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
load_flag_ = false;
auto task_item = pick_task_load();
if (task_item) {
LoadFile(task_item->task);
// TODO: wrapper loaded
task_item->state = TaskTableItemState::LOADED;
if (subscriber_) {
auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
......@@ -85,7 +108,6 @@ void Resource::loader_function() {
}
void Resource::executor_function() {
GetRegisterFunc(RegisterType::START_UP)->Exec();
if (subscriber_) {
auto event = std::make_shared<StartUpEvent>(shared_from_this());
subscriber_(std::static_pointer_cast<Event>(event));
......@@ -93,9 +115,11 @@ void Resource::executor_function() {
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
exec_flag_ = false;
auto task_item = pick_task_execute();
if (task_item) {
Process(task_item->task);
task_item->state = TaskTableItemState::EXECUTED;
if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
......
......@@ -76,19 +76,22 @@ public:
public:
/*
* wake up executor;
* wake up loader;
*/
void
WakeupExecutor();
WakeupLoader();
/*
* wake up loader;
/*
* wake up executor;
*/
void
WakeupLoader();
WakeupExecutor();
protected:
Resource(std::string name, ResourceType type);
Resource(std::string name,
ResourceType type,
bool enable_loader = true,
bool enable_executor = true);
// TODO: SearchContextPtr to TaskPtr
/*
......@@ -138,7 +141,6 @@ private:
void
executor_function();
private:
std::string name_;
ResourceType type_;
......@@ -149,8 +151,8 @@ private:
std::function<void(EventPtr)> subscriber_ = nullptr;
bool running_;
bool loader_running_ = false;
bool executor_running_ = false;
bool enable_loader_ = true;
bool enable_executor_ = true;
std::thread loader_thread_;
std::thread executor_thread_;
......
......@@ -14,6 +14,8 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
......@@ -36,6 +38,8 @@ include_directories(/usr/include/mysql)
set(scheduler_test_src
${unittest_srcs}
${test_srcs}
${scheduler_action_srcs}
${scheduler_event_srcs}
${scheduler_resource_srcs}
${scheduler_task_srcs}
${scheduler_srcs}
......
......@@ -9,7 +9,7 @@ class CostTest : public ::testing::Test {
protected:
void
SetUp() override {
for (uint64_t i = 0; i < 7; ++i) {
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<XSearchTask>();
table_.Put(task);
}
......
......@@ -10,6 +10,8 @@ protected:
SetUp() override {
node1_ = std::make_shared<Node>();
node2_ = std::make_shared<Node>();
node3_ = std::make_shared<Node>();
node4_ = std::make_shared<Node>();
auto pcie = Connection("PCIe", 11.0);
......
#include "scheduler/ResourceFactory.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/Scheduler.h"
#include "scheduler/task/TestTask.h"
#include "utils/Log.h"
#include <gtest/gtest.h>
using namespace zilliz::milvus::engine;
TEST(normal_test, DISABLED_test1) {
TEST(normal_test, test1) {
// ResourceMgr only compose resources, provide unified event
auto res_mgr = std::make_shared<ResourceMgr>();
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd"));
......@@ -23,17 +24,35 @@ TEST(normal_test, DISABLED_test1) {
res_mgr->Start();
auto task1 = std::make_shared<XSearchTask>();
auto task2 = std::make_shared<XSearchTask>();
auto scheduler = new Scheduler(res_mgr);
scheduler->Start();
auto task1 = std::make_shared<TestTask>();
auto task2 = std::make_shared<TestTask>();
auto task3 = std::make_shared<TestTask>();
auto task4 = std::make_shared<TestTask>();
if (auto observe = disk.lock()) {
observe->task_table().Put(task1);
observe->task_table().Put(task2);
observe->task_table().Put(task1);
observe->task_table().Put(task1);
observe->task_table().Put(task3);
observe->task_table().Put(task4);
std::cout << "disk:" << std::endl;
std::cout << observe->task_table().Dump() << std::endl;
}
auto scheduler = new Scheduler(res_mgr);
scheduler->Start();
sleep(5);
if (auto observe = disk.lock()) {
std::cout << "disk:" << std::endl;
std::cout << observe->task_table().Dump() << std::endl;
}
if (auto observe = cpu.lock()) {
std::cout << "cpu:" << std::endl;
std::cout << observe->task_table().Dump() << std::endl;
}
scheduler->Stop();
res_mgr->Stop();
while (true) sleep(1);
ASSERT_EQ(task1->load_count_, 1);
ASSERT_EQ(task1->exec_count_, 1);
}
......@@ -27,15 +27,28 @@ protected:
gpu_resource_ = ResourceFactory::Create("gpu");
flag_ = false;
auto subscriber = [&](EventPtr) {
auto subscriber = [&](EventPtr event) {
std::unique_lock<std::mutex> lock(mutex_);
flag_ = true;
cv_.notify_one();
if (event->Type() == EventType::COPY_COMPLETED || event->Type() == EventType::FINISH_TASK) {
flag_ = true;
cv_.notify_one();
}
};
disk_resource_->RegisterSubscriber(subscriber);
cpu_resource_->RegisterSubscriber(subscriber);
gpu_resource_->RegisterSubscriber(subscriber);
disk_resource_->Start();
cpu_resource_->Start();
gpu_resource_->Start();
}
void
TearDown() override {
disk_resource_->Stop();
cpu_resource_->Stop();
gpu_resource_->Stop();
}
void
......
......@@ -45,8 +45,6 @@ protected:
invalid_task_ = nullptr;
task1_ = std::make_shared<XSearchTask>();
task2_ = std::make_shared<XSearchTask>();
empty_table_ = TaskTable();
}
TaskPtr invalid_task_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册