diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index e0f3b1de253775628566084332afba804c1aa1ad..87dfa83427f77ab130711256b7cc90d5c734c72e 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -32,6 +32,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-378 - Debug and Update normal_test in scheduler unittest - MS-379 - Add Dump implementation in Resource - MS-380 - Update resource loader and executor, work util all finished +- MS-383 - Modify condition variable usage in scheduler ## New Feature - MS-343 - Implement ResourceMgr diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 916aaa238a0debf8636a637ac4aaf2c953f1aa81..1c8e7607c36b148bc5d7d52884250b5f640a5904 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -76,7 +76,7 @@ ResourceMgr::Stop() { void ResourceMgr::PostEvent(const EventPtr &event) { - std::unique_lock lock(event_mutex_); + std::lock_guard lock(event_mutex_); queue_.emplace(event); event_cv_.notify_one(); } @@ -100,13 +100,14 @@ ResourceMgr::event_process() { event_cv_.wait(lock, [this] { return !queue_.empty(); }); auto event = queue_.front(); + queue_.pop(); + lock.unlock(); if (event == nullptr) { break; } // ENGINE_LOG_DEBUG << "ResourceMgr process " << *event; - queue_.pop(); if (subscriber_) { subscriber_(event); } diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index 191d1957aa3a591cd21766a160bd3f60f1f53ea8..6eef6014fb4ca2f4a6b02a2abda75b2b0b3f82e2 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -41,10 +41,11 @@ Scheduler::Stop() { void Scheduler::PostEvent(const EventPtr &event) { - std::lock_guard lock(event_mutex_); - event_queue_.push(event); + { + std::lock_guard lock(event_mutex_); + event_queue_.push(event); + } event_cv_.notify_one(); -// SERVER_LOG_DEBUG << "Scheduler post " << *event; } std::string @@ -58,12 +59,11 @@ Scheduler::worker_function() { std::unique_lock lock(event_mutex_); event_cv_.wait(lock, [this] { return !event_queue_.empty(); }); auto event = event_queue_.front(); + event_queue_.pop(); if (event == nullptr) { break; } -// SERVER_LOG_DEBUG << "Scheduler process " << *event; - event_queue_.pop(); Process(event); } } @@ -105,16 +105,14 @@ Scheduler::OnStartUp(const EventPtr &event) { void Scheduler::OnFinishTask(const EventPtr &event) { if (auto resource = event->resource_.lock()) { - resource->WakeupExecutor(); } } void Scheduler::OnCopyCompleted(const EventPtr &event) { if (auto resource = event->resource_.lock()) { - resource->WakeupLoader(); resource->WakeupExecutor(); - if (resource->Type()== ResourceType::DISK) { + if (resource->Type() == ResourceType::DISK) { Action::PushTaskToNeighbour(event->resource_); } } diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index a9b43b3f057f88646f4e7f1ce300bc00acccddbe..3c01fc492840fd72ba34ce9959dadcda3985c7cd 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -4,6 +4,7 @@ * Proprietary and confidential. ******************************************************************************/ +#include #include "Action.h" @@ -16,7 +17,7 @@ push_task(const ResourcePtr &self, const ResourcePtr &other) { auto &self_task_table = self->task_table(); auto &other_task_table = other->task_table(); CacheMgr cache; - auto indexes = PickToMove(self_task_table, cache, 1); + auto indexes = PickToMove(self_task_table, cache, 10); for (auto index : indexes) { if (self_task_table.Move(index)) { auto task = self_task_table.Get(index)->task; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 2c46a703c6f92e06b6f949721500c456a3e06cf3..0bb886fdf04cf5cd2b12fcb81452c33d5fc04779 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -3,6 +3,7 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ +#include #include "Resource.h" @@ -61,19 +62,23 @@ TaskTable &Resource::task_table() { } void Resource::WakeupLoader() { - std::lock_guard lock(load_mutex_); - load_flag_ = true; + { + std::lock_guard lock(load_mutex_); + load_flag_ = true; + } load_cv_.notify_one(); } void Resource::WakeupExecutor() { - std::lock_guard lock(exec_mutex_); - exec_flag_ = true; + { + std::lock_guard lock(exec_mutex_); + exec_flag_ = true; + } exec_cv_.notify_one(); } TaskTableItemPtr Resource::pick_task_load() { - auto indexes = PickToLoad(task_table_, 3); + auto indexes = PickToLoad(task_table_, 10); for (auto index : indexes) { // try to set one task loading, then return if (task_table_.Load(index)) @@ -99,6 +104,7 @@ void Resource::loader_function() { std::unique_lock lock(load_mutex_); load_cv_.wait(lock, [&] { return load_flag_; }); load_flag_ = false; + lock.unlock(); while (true) { auto task_item = pick_task_load(); if (task_item == nullptr) { @@ -125,6 +131,7 @@ void Resource::executor_function() { std::unique_lock lock(exec_mutex_); exec_cv_.wait(lock, [&] { return exec_flag_; }); exec_flag_ = false; + lock.unlock(); while (true) { auto task_item = pick_task_execute(); if (task_item == nullptr) { diff --git a/cpp/unittest/scheduler/normal_test.cpp b/cpp/unittest/scheduler/normal_test.cpp index 4d1fa36de8474bb9ab1a669545cb9908530c2725..d9e4ad218c11c216eb18b1bef9af96d62c9f4f6a 100644 --- a/cpp/unittest/scheduler/normal_test.cpp +++ b/cpp/unittest/scheduler/normal_test.cpp @@ -27,59 +27,60 @@ TEST(normal_test, test1) { auto scheduler = new Scheduler(res_mgr); scheduler->Start(); - auto task1 = std::make_shared(); - auto task2 = std::make_shared(); - auto task3 = std::make_shared(); - auto task4 = std::make_shared(); - if (auto observe = disk.lock()) { - observe->task_table().Put(task1); - observe->task_table().Put(task2); - observe->task_table().Put(task3); - observe->task_table().Put(task4); + const uint64_t NUM_TASK = 10; + std::vector> tasks; + for (uint64_t i = 0; i < NUM_TASK; ++i) { + if (auto observe = disk.lock()) { + auto task = std::make_shared(); + tasks.push_back(task); + observe->task_table().Put(task); + } } -// if (auto disk_r = disk.lock()) { -// if (auto cpu_r = cpu.lock()) { -// if (auto gpu1_r = gpu1.lock()) { -// if (auto gpu2_r = gpu2.lock()) { -// std::cout << "<<<<<<<<<task_table().Dump() << std::endl; -// std::cout << "cpu:" << std::endl; -// std::cout << cpu_r->task_table().Dump() << std::endl; -// std::cout << "gpu1:" << std::endl; -// std::cout << gpu1_r->task_table().Dump() << std::endl; -// std::cout << "gpu2:" << std::endl; -// std::cout << gpu2_r->task_table().Dump() << std::endl; -// std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl; -// } -// } -// } -// } + if (auto disk_r = disk.lock()) { + if (auto cpu_r = cpu.lock()) { + if (auto gpu1_r = gpu1.lock()) { + if (auto gpu2_r = gpu2.lock()) { + std::cout << "<<<<<<<<<task_table().Dump() << std::endl; + std::cout << "cpu:" << std::endl; + std::cout << cpu_r->task_table().Dump() << std::endl; + std::cout << "gpu1:" << std::endl; + std::cout << gpu1_r->task_table().Dump() << std::endl; + std::cout << "gpu2:" << std::endl; + std::cout << gpu2_r->task_table().Dump() << std::endl; + std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl; + } + } + } + } - sleep(5); + sleep(1); -// if (auto disk_r = disk.lock()) { -// if (auto cpu_r = cpu.lock()) { -// if (auto gpu1_r = gpu1.lock()) { -// if (auto gpu2_r = gpu2.lock()) { -// std::cout << "<<<<<<<<<task_table().Dump() << std::endl; -// std::cout << "cpu:" << std::endl; -// std::cout << cpu_r->task_table().Dump() << std::endl; -// std::cout << "gpu1:" << std::endl; -// std::cout << gpu1_r->task_table().Dump() << std::endl; -// std::cout << "gpu2:" << std::endl; -// std::cout << gpu2_r->task_table().Dump() << std::endl; -// std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl; -// } -// } -// } -// } + if (auto disk_r = disk.lock()) { + if (auto cpu_r = cpu.lock()) { + if (auto gpu1_r = gpu1.lock()) { + if (auto gpu2_r = gpu2.lock()) { + std::cout << "<<<<<<<<<task_table().Dump() << std::endl; + std::cout << "cpu:" << std::endl; + std::cout << cpu_r->task_table().Dump() << std::endl; + std::cout << "gpu1:" << std::endl; + std::cout << gpu1_r->task_table().Dump() << std::endl; + std::cout << "gpu2:" << std::endl; + std::cout << gpu2_r->task_table().Dump() << std::endl; + std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl; + } + } + } + } scheduler->Stop(); res_mgr->Stop(); - ASSERT_EQ(task1->load_count_, 1); - ASSERT_EQ(task1->exec_count_, 1); + for (uint64_t i = 0 ; i < NUM_TASK; ++i) { + ASSERT_EQ(tasks[i]->load_count_, 1); + ASSERT_EQ(tasks[i]->exec_count_, 1); + } }