提交 5b955069 编写于 作者: W wxyu

MS-373 Add resource test


Former-commit-id: 04d76175132c5aeb35ff2043c2a49f254681e2bb
上级 ad4fa2b5
...@@ -24,6 +24,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -24,6 +24,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-366 - Implement TaskTable - MS-366 - Implement TaskTable
- MS-368 - Implement cost.cpp - MS-368 - Implement cost.cpp
- MS-371 - Add TaskTableUpdatedEvent - MS-371 - Add TaskTableUpdatedEvent
- MS-373 - Add resource test
## New Feature ## New Feature
- MS-343 - Implement ResourceMgr - MS-343 - Implement ResourceMgr
......
...@@ -18,7 +18,7 @@ void ...@@ -18,7 +18,7 @@ void
TaskTable::Put(TaskPtr task) { TaskTable::Put(TaskPtr task) {
auto item = std::make_shared<TaskTableItem>(); auto item = std::make_shared<TaskTableItem>();
item->task = std::move(task); item->task = std::move(task);
item->state = TaskTableItemState::LOADED; item->state = TaskTableItemState::START;
table_.push_back(item); table_.push_back(item);
if (subscriber_) { if (subscriber_) {
subscriber_(); subscriber_();
...@@ -30,7 +30,7 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) { ...@@ -30,7 +30,7 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) {
for (auto &task : tasks) { for (auto &task : tasks) {
auto item = std::make_shared<TaskTableItem>(); auto item = std::make_shared<TaskTableItem>();
item->task = std::move(task); item->task = std::move(task);
item->state = TaskTableItemState::LOADED; item->state = TaskTableItemState::START;
table_.push_back(item); table_.push_back(item);
} }
if (subscriber_) { if (subscriber_) {
...@@ -59,8 +59,8 @@ TaskTable::Move(uint64_t index) { ...@@ -59,8 +59,8 @@ TaskTable::Move(uint64_t index) {
auto &task = table_[index]; auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex); std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::START) { if (task->state == TaskTableItemState::LOADED) {
task->state = TaskTableItemState::LOADING; task->state = TaskTableItemState::MOVING;
return true; return true;
} }
return false; return false;
......
...@@ -16,6 +16,7 @@ CpuResource::CpuResource(std::string name) ...@@ -16,6 +16,7 @@ CpuResource::CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {} : Resource(std::move(name), ResourceType::CPU) {}
void CpuResource::LoadFile(TaskPtr task) { void CpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::DISK2CPU, 0);
//if (src.type == DISK) { //if (src.type == DISK) {
// fd = open(filename); // fd = open(filename);
// content = fd.read(); // content = fd.read();
...@@ -30,7 +31,7 @@ void CpuResource::LoadFile(TaskPtr task) { ...@@ -30,7 +31,7 @@ void CpuResource::LoadFile(TaskPtr task) {
} }
void CpuResource::Process(TaskPtr task) { void CpuResource::Process(TaskPtr task) {
task->Execute();
} }
} }
......
...@@ -16,11 +16,11 @@ GpuResource::GpuResource(std::string name) ...@@ -16,11 +16,11 @@ GpuResource::GpuResource(std::string name)
: Resource(std::move(name), ResourceType::GPU) {} : Resource(std::move(name), ResourceType::GPU) {}
void GpuResource::LoadFile(TaskPtr task) { void GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, 0);
} }
void GpuResource::Process(TaskPtr task) { void GpuResource::Process(TaskPtr task) {
task->Execute();
} }
} }
......
...@@ -25,6 +25,7 @@ Resource::Resource(std::string name, ResourceType type) ...@@ -25,6 +25,7 @@ Resource::Resource(std::string name, ResourceType type)
} }
void Resource::Start() { void Resource::Start() {
running_ = true;
loader_thread_ = std::thread(&Resource::loader_function, this); loader_thread_ = std::thread(&Resource::loader_function, this);
executor_thread_ = std::thread(&Resource::executor_function, this); executor_thread_ = std::thread(&Resource::executor_function, this);
} }
...@@ -33,20 +34,26 @@ void Resource::Stop() { ...@@ -33,20 +34,26 @@ void Resource::Stop() {
running_ = false; running_ = false;
WakeupLoader(); WakeupLoader();
WakeupExecutor(); WakeupExecutor();
loader_thread_.join();
executor_thread_.join();
} }
TaskTable &Resource::task_table() { TaskTable &Resource::task_table() {
return task_table_; return task_table_;
} }
void Resource::WakeupExecutor() {
exec_cv_.notify_one();
}
void Resource::WakeupLoader() { void Resource::WakeupLoader() {
std::lock_guard<std::mutex> lock(load_mutex_);
load_flag_ = true;
load_cv_.notify_one(); load_cv_.notify_one();
} }
void Resource::WakeupExecutor() {
std::lock_guard<std::mutex> lock(exec_mutex_);
exec_flag_ = true;
exec_cv_.notify_one();
}
TaskTableItemPtr Resource::pick_task_load() { TaskTableItemPtr Resource::pick_task_load() {
auto indexes = PickToLoad(task_table_, 3); auto indexes = PickToLoad(task_table_, 3);
for (auto index : indexes) { for (auto index : indexes) {
...@@ -73,9 +80,12 @@ void Resource::loader_function() { ...@@ -73,9 +80,12 @@ void Resource::loader_function() {
while (running_) { while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_); std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; }); load_cv_.wait(lock, [&] { return load_flag_; });
load_flag_ = false;
auto task_item = pick_task_load(); auto task_item = pick_task_load();
if (task_item) { if (task_item) {
LoadFile(task_item->task); LoadFile(task_item->task);
// TODO: wrapper loaded
task_item->state = TaskTableItemState::LOADED;
if (subscriber_) { if (subscriber_) {
auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item); auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event)); subscriber_(std::static_pointer_cast<Event>(event));
...@@ -85,7 +95,6 @@ void Resource::loader_function() { ...@@ -85,7 +95,6 @@ void Resource::loader_function() {
} }
void Resource::executor_function() { void Resource::executor_function() {
GetRegisterFunc(RegisterType::START_UP)->Exec();
if (subscriber_) { if (subscriber_) {
auto event = std::make_shared<StartUpEvent>(shared_from_this()); auto event = std::make_shared<StartUpEvent>(shared_from_this());
subscriber_(std::static_pointer_cast<Event>(event)); subscriber_(std::static_pointer_cast<Event>(event));
...@@ -93,6 +102,7 @@ void Resource::executor_function() { ...@@ -93,6 +102,7 @@ void Resource::executor_function() {
while (running_) { while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_); std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; }); exec_cv_.wait(lock, [&] { return exec_flag_; });
exec_flag_ = false;
auto task_item = pick_task_execute(); auto task_item = pick_task_execute();
if (task_item) { if (task_item) {
Process(task_item->task); Process(task_item->task);
......
...@@ -76,16 +76,16 @@ public: ...@@ -76,16 +76,16 @@ public:
public: public:
/* /*
* wake up executor; * wake up loader;
*/ */
void void
WakeupExecutor(); WakeupLoader();
/* /*
* wake up loader; * wake up executor;
*/ */
void void
WakeupLoader(); WakeupExecutor();
protected: protected:
Resource(std::string name, ResourceType type); Resource(std::string name, ResourceType type);
...@@ -138,7 +138,6 @@ private: ...@@ -138,7 +138,6 @@ private:
void void
executor_function(); executor_function();
private: private:
std::string name_; std::string name_;
ResourceType type_; ResourceType type_;
......
...@@ -9,7 +9,7 @@ class CostTest : public ::testing::Test { ...@@ -9,7 +9,7 @@ class CostTest : public ::testing::Test {
protected: protected:
void void
SetUp() override { SetUp() override {
for (uint64_t i = 0; i < 7; ++i) { for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<XSearchTask>(); auto task = std::make_shared<XSearchTask>();
table_.Put(task); table_.Put(task);
} }
......
...@@ -10,6 +10,8 @@ protected: ...@@ -10,6 +10,8 @@ protected:
SetUp() override { SetUp() override {
node1_ = std::make_shared<Node>(); node1_ = std::make_shared<Node>();
node2_ = std::make_shared<Node>(); node2_ = std::make_shared<Node>();
node3_ = std::make_shared<Node>();
node4_ = std::make_shared<Node>();
auto pcie = Connection("PCIe", 11.0); auto pcie = Connection("PCIe", 11.0);
......
...@@ -27,15 +27,28 @@ protected: ...@@ -27,15 +27,28 @@ protected:
gpu_resource_ = ResourceFactory::Create("gpu"); gpu_resource_ = ResourceFactory::Create("gpu");
flag_ = false; flag_ = false;
auto subscriber = [&](EventPtr) { auto subscriber = [&](EventPtr event) {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
if (event->Type() == EventType::COPY_COMPLETED || event->Type() == EventType::FINISH_TASK) {
flag_ = true; flag_ = true;
cv_.notify_one(); cv_.notify_one();
}
}; };
disk_resource_->RegisterSubscriber(subscriber); disk_resource_->RegisterSubscriber(subscriber);
cpu_resource_->RegisterSubscriber(subscriber); cpu_resource_->RegisterSubscriber(subscriber);
gpu_resource_->RegisterSubscriber(subscriber); gpu_resource_->RegisterSubscriber(subscriber);
disk_resource_->Start();
cpu_resource_->Start();
gpu_resource_->Start();
}
void
TearDown() override {
disk_resource_->Stop();
cpu_resource_->Stop();
gpu_resource_->Stop();
} }
void void
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册