提交 7f777119 编写于 作者: J jinhai

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-383 Modify condition variable usage in scheduler

See merge request megasearch/milvus!390

Former-commit-id: 262de61e8cbc2430d7dfea90e5db0b8c8cbccd2b
...@@ -32,6 +32,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -32,6 +32,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-378 - Debug and Update normal_test in scheduler unittest - MS-378 - Debug and Update normal_test in scheduler unittest
- MS-379 - Add Dump implementation in Resource - MS-379 - Add Dump implementation in Resource
- MS-380 - Update resource loader and executor, work util all finished - MS-380 - Update resource loader and executor, work util all finished
- MS-383 - Modify condition variable usage in scheduler
## New Feature ## New Feature
- MS-343 - Implement ResourceMgr - MS-343 - Implement ResourceMgr
......
...@@ -76,7 +76,7 @@ ResourceMgr::Stop() { ...@@ -76,7 +76,7 @@ ResourceMgr::Stop() {
void void
ResourceMgr::PostEvent(const EventPtr &event) { ResourceMgr::PostEvent(const EventPtr &event) {
std::unique_lock<std::mutex> lock(event_mutex_); std::lock_guard<std::mutex> lock(event_mutex_);
queue_.emplace(event); queue_.emplace(event);
event_cv_.notify_one(); event_cv_.notify_one();
} }
...@@ -100,13 +100,14 @@ ResourceMgr::event_process() { ...@@ -100,13 +100,14 @@ ResourceMgr::event_process() {
event_cv_.wait(lock, [this] { return !queue_.empty(); }); event_cv_.wait(lock, [this] { return !queue_.empty(); });
auto event = queue_.front(); auto event = queue_.front();
queue_.pop();
lock.unlock();
if (event == nullptr) { if (event == nullptr) {
break; break;
} }
// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event; // ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;
queue_.pop();
if (subscriber_) { if (subscriber_) {
subscriber_(event); subscriber_(event);
} }
......
...@@ -41,10 +41,11 @@ Scheduler::Stop() { ...@@ -41,10 +41,11 @@ Scheduler::Stop() {
void void
Scheduler::PostEvent(const EventPtr &event) { Scheduler::PostEvent(const EventPtr &event) {
std::lock_guard<std::mutex> lock(event_mutex_); {
event_queue_.push(event); std::lock_guard<std::mutex> lock(event_mutex_);
event_queue_.push(event);
}
event_cv_.notify_one(); event_cv_.notify_one();
// SERVER_LOG_DEBUG << "Scheduler post " << *event;
} }
std::string std::string
...@@ -58,12 +59,11 @@ Scheduler::worker_function() { ...@@ -58,12 +59,11 @@ Scheduler::worker_function() {
std::unique_lock<std::mutex> lock(event_mutex_); std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !event_queue_.empty(); }); event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
auto event = event_queue_.front(); auto event = event_queue_.front();
event_queue_.pop();
if (event == nullptr) { if (event == nullptr) {
break; break;
} }
// SERVER_LOG_DEBUG << "Scheduler process " << *event;
event_queue_.pop();
Process(event); Process(event);
} }
} }
...@@ -105,16 +105,14 @@ Scheduler::OnStartUp(const EventPtr &event) { ...@@ -105,16 +105,14 @@ Scheduler::OnStartUp(const EventPtr &event) {
void void
Scheduler::OnFinishTask(const EventPtr &event) { Scheduler::OnFinishTask(const EventPtr &event) {
if (auto resource = event->resource_.lock()) { if (auto resource = event->resource_.lock()) {
resource->WakeupExecutor();
} }
} }
void void
Scheduler::OnCopyCompleted(const EventPtr &event) { Scheduler::OnCopyCompleted(const EventPtr &event) {
if (auto resource = event->resource_.lock()) { if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
resource->WakeupExecutor(); resource->WakeupExecutor();
if (resource->Type()== ResourceType::DISK) { if (resource->Type() == ResourceType::DISK) {
Action::PushTaskToNeighbour(event->resource_); Action::PushTaskToNeighbour(event->resource_);
} }
} }
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
* Proprietary and confidential. * Proprietary and confidential.
******************************************************************************/ ******************************************************************************/
#include <iostream>
#include "Action.h" #include "Action.h"
...@@ -16,7 +17,7 @@ push_task(const ResourcePtr &self, const ResourcePtr &other) { ...@@ -16,7 +17,7 @@ push_task(const ResourcePtr &self, const ResourcePtr &other) {
auto &self_task_table = self->task_table(); auto &self_task_table = self->task_table();
auto &other_task_table = other->task_table(); auto &other_task_table = other->task_table();
CacheMgr cache; CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 1); auto indexes = PickToMove(self_task_table, cache, 10);
for (auto index : indexes) { for (auto index : indexes) {
if (self_task_table.Move(index)) { if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task; auto task = self_task_table.Get(index)->task;
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
* Unauthorized copying of this file, via any medium is strictly prohibited. * Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential. * Proprietary and confidential.
******************************************************************************/ ******************************************************************************/
#include <iostream>
#include "Resource.h" #include "Resource.h"
...@@ -61,19 +62,23 @@ TaskTable &Resource::task_table() { ...@@ -61,19 +62,23 @@ TaskTable &Resource::task_table() {
} }
void Resource::WakeupLoader() { void Resource::WakeupLoader() {
std::lock_guard<std::mutex> lock(load_mutex_); {
load_flag_ = true; std::lock_guard<std::mutex> lock(load_mutex_);
load_flag_ = true;
}
load_cv_.notify_one(); load_cv_.notify_one();
} }
void Resource::WakeupExecutor() { void Resource::WakeupExecutor() {
std::lock_guard<std::mutex> lock(exec_mutex_); {
exec_flag_ = true; std::lock_guard<std::mutex> lock(exec_mutex_);
exec_flag_ = true;
}
exec_cv_.notify_one(); exec_cv_.notify_one();
} }
TaskTableItemPtr Resource::pick_task_load() { TaskTableItemPtr Resource::pick_task_load() {
auto indexes = PickToLoad(task_table_, 3); auto indexes = PickToLoad(task_table_, 10);
for (auto index : indexes) { for (auto index : indexes) {
// try to set one task loading, then return // try to set one task loading, then return
if (task_table_.Load(index)) if (task_table_.Load(index))
...@@ -99,6 +104,7 @@ void Resource::loader_function() { ...@@ -99,6 +104,7 @@ void Resource::loader_function() {
std::unique_lock<std::mutex> lock(load_mutex_); std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; }); load_cv_.wait(lock, [&] { return load_flag_; });
load_flag_ = false; load_flag_ = false;
lock.unlock();
while (true) { while (true) {
auto task_item = pick_task_load(); auto task_item = pick_task_load();
if (task_item == nullptr) { if (task_item == nullptr) {
...@@ -125,6 +131,7 @@ void Resource::executor_function() { ...@@ -125,6 +131,7 @@ void Resource::executor_function() {
std::unique_lock<std::mutex> lock(exec_mutex_); std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; }); exec_cv_.wait(lock, [&] { return exec_flag_; });
exec_flag_ = false; exec_flag_ = false;
lock.unlock();
while (true) { while (true) {
auto task_item = pick_task_execute(); auto task_item = pick_task_execute();
if (task_item == nullptr) { if (task_item == nullptr) {
......
...@@ -27,59 +27,60 @@ TEST(normal_test, test1) { ...@@ -27,59 +27,60 @@ TEST(normal_test, test1) {
auto scheduler = new Scheduler(res_mgr); auto scheduler = new Scheduler(res_mgr);
scheduler->Start(); scheduler->Start();
auto task1 = std::make_shared<TestTask>(); const uint64_t NUM_TASK = 10;
auto task2 = std::make_shared<TestTask>(); std::vector<std::shared_ptr<TestTask>> tasks;
auto task3 = std::make_shared<TestTask>(); for (uint64_t i = 0; i < NUM_TASK; ++i) {
auto task4 = std::make_shared<TestTask>(); if (auto observe = disk.lock()) {
if (auto observe = disk.lock()) { auto task = std::make_shared<TestTask>();
observe->task_table().Put(task1); tasks.push_back(task);
observe->task_table().Put(task2); observe->task_table().Put(task);
observe->task_table().Put(task3); }
observe->task_table().Put(task4);
} }
// if (auto disk_r = disk.lock()) { if (auto disk_r = disk.lock()) {
// if (auto cpu_r = cpu.lock()) { if (auto cpu_r = cpu.lock()) {
// if (auto gpu1_r = gpu1.lock()) { if (auto gpu1_r = gpu1.lock()) {
// if (auto gpu2_r = gpu2.lock()) { if (auto gpu2_r = gpu2.lock()) {
// std::cout << "<<<<<<<<<<before<<<<<<<<<<" << std::endl; std::cout << "<<<<<<<<<<before<<<<<<<<<<" << std::endl;
// std::cout << "disk:" << std::endl; std::cout << "disk:" << std::endl;
// std::cout << disk_r->task_table().Dump() << std::endl; std::cout << disk_r->task_table().Dump() << std::endl;
// std::cout << "cpu:" << std::endl; std::cout << "cpu:" << std::endl;
// std::cout << cpu_r->task_table().Dump() << std::endl; std::cout << cpu_r->task_table().Dump() << std::endl;
// std::cout << "gpu1:" << std::endl; std::cout << "gpu1:" << std::endl;
// std::cout << gpu1_r->task_table().Dump() << std::endl; std::cout << gpu1_r->task_table().Dump() << std::endl;
// std::cout << "gpu2:" << std::endl; std::cout << "gpu2:" << std::endl;
// std::cout << gpu2_r->task_table().Dump() << std::endl; std::cout << gpu2_r->task_table().Dump() << std::endl;
// std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl; std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl;
// } }
// } }
// } }
// } }
sleep(5); sleep(1);
// if (auto disk_r = disk.lock()) { if (auto disk_r = disk.lock()) {
// if (auto cpu_r = cpu.lock()) { if (auto cpu_r = cpu.lock()) {
// if (auto gpu1_r = gpu1.lock()) { if (auto gpu1_r = gpu1.lock()) {
// if (auto gpu2_r = gpu2.lock()) { if (auto gpu2_r = gpu2.lock()) {
// std::cout << "<<<<<<<<<<after<<<<<<<<<<" << std::endl; std::cout << "<<<<<<<<<<after<<<<<<<<<<" << std::endl;
// std::cout << "disk:" << std::endl; std::cout << "disk:" << std::endl;
// std::cout << disk_r->task_table().Dump() << std::endl; std::cout << disk_r->task_table().Dump() << std::endl;
// std::cout << "cpu:" << std::endl; std::cout << "cpu:" << std::endl;
// std::cout << cpu_r->task_table().Dump() << std::endl; std::cout << cpu_r->task_table().Dump() << std::endl;
// std::cout << "gpu1:" << std::endl; std::cout << "gpu1:" << std::endl;
// std::cout << gpu1_r->task_table().Dump() << std::endl; std::cout << gpu1_r->task_table().Dump() << std::endl;
// std::cout << "gpu2:" << std::endl; std::cout << "gpu2:" << std::endl;
// std::cout << gpu2_r->task_table().Dump() << std::endl; std::cout << gpu2_r->task_table().Dump() << std::endl;
// std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl; std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl;
// } }
// } }
// } }
// } }
scheduler->Stop(); scheduler->Stop();
res_mgr->Stop(); res_mgr->Stop();
ASSERT_EQ(task1->load_count_, 1); for (uint64_t i = 0 ; i < NUM_TASK; ++i) {
ASSERT_EQ(task1->exec_count_, 1); ASSERT_EQ(tasks[i]->load_count_, 1);
ASSERT_EQ(tasks[i]->exec_count_, 1);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册