From 828976ebab887712ca5a35f9154f077bb9c77c16 Mon Sep 17 00:00:00 2001
From: zhangchaoltt
Date: Fri, 18 Jan 2019 12:33:30 +0800
Subject: [PATCH] framework: decouple classic sched algorithm deps on group
 data-struct (#6549)

---
 cyber/scheduler/BUILD                       | 17 +++++++++
 cyber/scheduler/common/cv_wrapper.h         | 38 +++++++++++++++++++
 cyber/scheduler/common/mutex_wrapper.h      | 38 +++++++++++++++++++
 cyber/scheduler/policy/classic_context.cc   | 31 +++++++++++----
 cyber/scheduler/policy/classic_context.h    | 19 +++++++---
 .../policy/scheduler_choreography.cc        | 27 +++++++------
 cyber/scheduler/policy/scheduler_classic.cc | 28 +++++++-------
 cyber/scheduler/processor.cc                |  2 +-
 cyber/scheduler/scheduler.h                 |  5 ++-
 cyber/scheduler/scheduler_policy_test.cc    |  1 -
 10 files changed, 161 insertions(+), 45 deletions(-)
 create mode 100644 cyber/scheduler/common/cv_wrapper.h
 create mode 100644 cyber/scheduler/common/mutex_wrapper.h

diff --git a/cyber/scheduler/BUILD b/cyber/scheduler/BUILD
index da5c4edb59..f94cf93021 100644
--- a/cyber/scheduler/BUILD
+++ b/cyber/scheduler/BUILD
@@ -41,6 +41,21 @@ cc_library(
     deps = [
         "//cyber/croutine",
         "//cyber/scheduler:processor",
+        "//cyber/scheduler:mutex_wrapper",
+    ],
+)
+
+cc_library(
+    name = "mutex_wrapper",
+    hdrs = [
+        "common/mutex_wrapper.h",
+    ],
+)
+
+cc_library(
+    name = "cv_wrapper",
+    hdrs = [
+        "common/cv_wrapper.h",
     ],
 )
 
@@ -115,6 +130,8 @@ cc_library(
         "//cyber/croutine",
         "//cyber/proto:classic_conf_cc_proto",
         "//cyber/scheduler:processor",
+        "//cyber/scheduler:mutex_wrapper",
+        "//cyber/scheduler:cv_wrapper",
     ],
 )
 
diff --git a/cyber/scheduler/common/cv_wrapper.h b/cyber/scheduler/common/cv_wrapper.h
new file mode 100644
index 0000000000..9620b3faec
--- /dev/null
+++ b/cyber/scheduler/common/cv_wrapper.h
@@ -0,0 +1,38 @@
+/******************************************************************************
+ * Copyright 2018 The Apollo Authors. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *****************************************************************************/
+
+#ifndef CYBER_SCHEDULER_CV_WRAPPER_H_
+#define CYBER_SCHEDULER_CV_WRAPPER_H_
+
+#include <condition_variable>
+
+namespace apollo {
+namespace cyber {
+namespace scheduler {
+
+class CvWrapper {
+ public:
+  CvWrapper& operator=(const CvWrapper& other) = delete;
+  std::condition_variable& Cv() { return cv_; }
+
+ private:
+  mutable std::condition_variable cv_;
+};
+
+}  // namespace scheduler
+}  // namespace cyber
+}  // namespace apollo
+#endif  // CYBER_SCHEDULER_CV_WRAPPER_H_
diff --git a/cyber/scheduler/common/mutex_wrapper.h b/cyber/scheduler/common/mutex_wrapper.h
new file mode 100644
index 0000000000..a026a19fa1
--- /dev/null
+++ b/cyber/scheduler/common/mutex_wrapper.h
@@ -0,0 +1,38 @@
+/******************************************************************************
+ * Copyright 2018 The Apollo Authors. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *****************************************************************************/
+
+#ifndef CYBER_SCHEDULER_MUTEX_WRAPPER_H_
+#define CYBER_SCHEDULER_MUTEX_WRAPPER_H_
+
+#include <mutex>
+
+namespace apollo {
+namespace cyber {
+namespace scheduler {
+
+class MutexWrapper {
+ public:
+  MutexWrapper& operator=(const MutexWrapper& other) = delete;
+  std::mutex& Mutex() { return mutex_; }
+
+ private:
+  mutable std::mutex mutex_;
+};
+
+}  // namespace scheduler
+}  // namespace cyber
+}  // namespace apollo
+#endif  // CYBER_SCHEDULER_MUTEX_WRAPPER_H_
diff --git a/cyber/scheduler/policy/classic_context.cc b/cyber/scheduler/policy/classic_context.cc
index 437e704798..3e6b5ad003 100644
--- a/cyber/scheduler/policy/classic_context.cc
+++ b/cyber/scheduler/policy/classic_context.cc
@@ -38,14 +38,29 @@ GRP_WQ_CV ClassicContext::cv_wq_;
 RQ_LOCK_GROUP ClassicContext::rq_locks_;
 CR_GROUP ClassicContext::cr_group_;
 
+ClassicContext::ClassicContext() {
+  InitGroup(DEFAULT_GROUP_NAME);
+}
+
+ClassicContext::ClassicContext(const std::string& group_name) {
+  InitGroup(group_name);
+}
+
+void ClassicContext::InitGroup(const std::string& group_name) {
+  multi_pri_rq_ = &cr_group_[group_name];
+  lq_ = &rq_locks_[group_name];
+  mtx_wrapper_ = &mtx_wq_[group_name];
+  cw_ = &cv_wq_[group_name];
+}
+
 std::shared_ptr<CRoutine> ClassicContext::NextRoutine() {
   if (unlikely(stop_)) {
     return nullptr;
   }
 
   for (int i = MAX_PRIO - 1; i >= 0; --i) {
-    ReadLockGuard<AtomicRWLock> lk(rq_locks_[group_name_].at(i));
-    for (auto& cr : cr_group_[group_name_].at(i)) {
+    ReadLockGuard<AtomicRWLock> lk(lq_->at(i));
+    for (auto& cr : multi_pri_rq_->at(i)) {
       if (!cr->Acquire()) {
         continue;
       }
@@ -71,32 +86,32 @@ std::shared_ptr<CRoutine> ClassicContext::NextRoutine() {
 }
 
 void ClassicContext::Wait() {
-  std::unique_lock<std::mutex> lk(mtx_wq_[group_name_]);
+  std::unique_lock<std::mutex> lk(mtx_wrapper_->Mutex());
   if (stop_) {
     return;
   }
   if (unlikely(need_sleep_)) {
     auto duration = wake_time_ - std::chrono::steady_clock::now();
-    cv_wq_[group_name_].wait_for(lk, duration);
+    cw_->Cv().wait_for(lk, duration);
     need_sleep_ = false;
   } else {
-    cv_wq_[group_name_].wait(lk);
+    cw_->Cv().wait(lk);
   }
 }
 
 void ClassicContext::Shutdown() {
   {
-    std::lock_guard<std::mutex> lg(mtx_wq_[group_name_]);
+    std::lock_guard<std::mutex> lg(mtx_wrapper_->Mutex());
     if (!stop_) {
       stop_ = true;
     }
   }
-  cv_wq_[group_name_].notify_all();
+  cw_->Cv().notify_all();
 }
 
 void ClassicContext::Notify(const std::string& group_name) {
-  cv_wq_[group_name].notify_one();
+  cv_wq_[group_name].Cv().notify_one();
 }
 
 }  // namespace scheduler
diff --git a/cyber/scheduler/policy/classic_context.h b/cyber/scheduler/policy/classic_context.h
index 70275dba27..431570e5b2 100644
--- a/cyber/scheduler/policy/classic_context.h
+++ b/cyber/scheduler/policy/classic_context.h
@@ -28,6 +28,8 @@
 #include "cyber/base/atomic_rw_lock.h"
 #include "cyber/croutine/croutine.h"
 #include "cyber/scheduler/processor_context.h"
+#include "cyber/scheduler/common/mutex_wrapper.h"
+#include "cyber/scheduler/common/cv_wrapper.h"
 
 namespace apollo {
 namespace cyber {
@@ -43,20 +45,20 @@ namespace scheduler {
 using CR_GROUP = std::unordered_map<std::string, MULTI_PRIO_QUEUE>;
 using LOCK_QUEUE = std::array<base::AtomicRWLock, MAX_PRIO>;
 using RQ_LOCK_GROUP = std::unordered_map<std::string, LOCK_QUEUE>;
-using GRP_WQ_MUTEX = std::unordered_map<std::string, std::mutex>;
-using GRP_WQ_CV = std::unordered_map<std::string, std::condition_variable>;
+using GRP_WQ_MUTEX = std::unordered_map<std::string, MutexWrapper>;
+using GRP_WQ_CV = std::unordered_map<std::string, CvWrapper>;
 
 class ClassicContext : public ProcessorContext {
  public:
+  ClassicContext();
+  explicit ClassicContext(const std::string& group_name);
+
   std::shared_ptr<CRoutine> NextRoutine() override;
   void Wait() override;
   void Shutdown() override;
 
   static void Notify(const std::string& group_name);
 
-  void SetGroupName(const std::string& group_name) { group_name_ = group_name; }
-  std::string group_name_;
-
   alignas(CACHELINE_SIZE) static RQ_LOCK_GROUP rq_locks_;
   alignas(CACHELINE_SIZE) static CR_GROUP cr_group_;
 
@@ -64,8 +66,15 @@ class ClassicContext : public ProcessorContext {
   alignas(CACHELINE_SIZE) static GRP_WQ_CV cv_wq_;
 
  private:
+  void InitGroup(const std::string& group_name);
+
   std::chrono::steady_clock::time_point wake_time_;
   bool need_sleep_ = false;
+
+  MULTI_PRIO_QUEUE *multi_pri_rq_ = nullptr;
+  LOCK_QUEUE *lq_ = nullptr;
+  MutexWrapper *mtx_wrapper_ = nullptr;
+  CvWrapper *cw_ = nullptr;
 };
 
 }  // namespace scheduler
diff --git a/cyber/scheduler/policy/scheduler_choreography.cc b/cyber/scheduler/policy/scheduler_choreography.cc
index a8af078599..5785bab5e7 100644
--- a/cyber/scheduler/policy/scheduler_choreography.cc
+++ b/cyber/scheduler/policy/scheduler_choreography.cc
@@ -112,14 +112,9 @@ void SchedulerChoreography::CreateProcessor() {
 
   // Put tasks w/o processor assigned into a classic pool.
   for (uint32_t i = 0; i < task_pool_size_; i++) {
-    auto proc = std::make_shared<Processor>();
     auto ctx = std::make_shared<ClassicContext>();
-    ctx->SetGroupName(DEFAULT_GROUP_NAME);
 
-    ClassicContext::cr_group_[DEFAULT_GROUP_NAME];
-    ClassicContext::rq_locks_[DEFAULT_GROUP_NAME];
-    ClassicContext::mtx_wq_[DEFAULT_GROUP_NAME];
-    ClassicContext::cv_wq_[DEFAULT_GROUP_NAME];
-
+    auto proc = std::make_shared<Processor>();
     proc->BindContext(ctx);
     proc->SetAffinity(pool_cpuset_, pool_affinity_, i);
     proc->SetSchedPolicy(pool_processor_policy_, pool_processor_prio_);
@@ -131,15 +126,17 @@
 bool SchedulerChoreography::DispatchTask(const std::shared_ptr<CRoutine>& cr) {
   // we use multi-key mutex to prevent race condition
   // when del && add cr with same crid
-  if (likely(id_cr_wl_.find(cr->id()) == id_cr_wl_.end())) {
+  MutexWrapper *wrapper = nullptr;
+  if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
     {
       std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
-      if (id_cr_wl_.find(cr->id()) == id_cr_wl_.end()) {
-        id_cr_wl_[cr->id()];
+      if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
+        wrapper = new MutexWrapper();
+        id_map_mutex_.Set(cr->id(), wrapper);
       }
     }
   }
-  std::lock_guard<std::mutex> lg(id_cr_wl_[cr->id()]);
+  std::lock_guard<std::mutex> lg(wrapper->Mutex());
 
   // Assign sched cfg to tasks according to configuration.
   if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
@@ -198,15 +195,17 @@ bool SchedulerChoreography::RemoveTask(const std::string& name) {
 bool SchedulerChoreography::RemoveCRoutine(uint64_t crid) {
   // we use multi-key mutex to prevent race condition
   // when del && add cr with same crid
-  if (unlikely(id_cr_wl_.find(crid) == id_cr_wl_.end())) {
+  MutexWrapper *wrapper = nullptr;
+  if (!id_map_mutex_.Get(crid, &wrapper)) {
     {
       std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
-      if (id_cr_wl_.find(crid) == id_cr_wl_.end()) {
-        id_cr_wl_[crid];
+      if (!id_map_mutex_.Get(crid, &wrapper)) {
+        wrapper = new MutexWrapper();
+        id_map_mutex_.Set(crid, wrapper);
       }
     }
   }
-  std::lock_guard<std::mutex> lg(id_cr_wl_[crid]);
+  std::lock_guard<std::mutex> lg(wrapper->Mutex());
 
   // Find cr from id_cr &&
   // get cr prio if cr found
diff --git a/cyber/scheduler/policy/scheduler_classic.cc b/cyber/scheduler/policy/scheduler_classic.cc
index 2c6c9d8ce3..b540f5b619 100644
--- a/cyber/scheduler/policy/scheduler_classic.cc
+++ b/cyber/scheduler/policy/scheduler_classic.cc
@@ -90,14 +90,8 @@ void SchedulerClassic::CreateProcessor() {
     std::vector<int> cpuset;
     ParseCpuset(group.cpuset(), &cpuset);
 
-    ClassicContext::cr_group_[group_name];
-    ClassicContext::rq_locks_[group_name];
-    ClassicContext::mtx_wq_[group_name];
-    ClassicContext::cv_wq_[group_name];
-
     for (uint32_t i = 0; i < proc_num; i++) {
-      auto ctx = std::make_shared<ClassicContext>();
-      ctx->SetGroupName(group_name);
+      auto ctx = std::make_shared<ClassicContext>(group_name);
       pctxs_.emplace_back(ctx);
 
       auto proc = std::make_shared<Processor>();
@@ -112,15 +106,17 @@
 bool SchedulerClassic::DispatchTask(const std::shared_ptr<CRoutine>& cr) {
   // we use multi-key mutex to prevent race condition
   // when del && add cr with same crid
-  if (likely(id_cr_wl_.find(cr->id()) == id_cr_wl_.end())) {
+  MutexWrapper *wrapper = nullptr;
+  if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
     {
      std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
-      if (id_cr_wl_.find(cr->id()) == id_cr_wl_.end()) {
-        id_cr_wl_[cr->id()];
+      if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
+        wrapper = new MutexWrapper();
+        id_map_mutex_.Set(cr->id(), wrapper);
       }
     }
   }
-  std::lock_guard<std::mutex> lg(id_cr_wl_[cr->id()]);
+  std::lock_guard<std::mutex> lg(wrapper->Mutex());
 
   {
     WriteLockGuard<AtomicRWLock> lk(id_cr_lock_);
@@ -192,15 +188,17 @@ bool SchedulerClassic::RemoveTask(const std::string& name) {
 bool SchedulerClassic::RemoveCRoutine(uint64_t crid) {
   // we use multi-key mutex to prevent race condition
   // when del && add cr with same crid
-  if (unlikely(id_cr_wl_.find(crid) == id_cr_wl_.end())) {
+  MutexWrapper *wrapper = nullptr;
+  if (!id_map_mutex_.Get(crid, &wrapper)) {
     {
       std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
-      if (id_cr_wl_.find(crid) == id_cr_wl_.end()) {
-        id_cr_wl_[crid];
+      if (!id_map_mutex_.Get(crid, &wrapper)) {
+        wrapper = new MutexWrapper();
+        id_map_mutex_.Set(crid, wrapper);
       }
     }
   }
-  std::lock_guard<std::mutex> lg(id_cr_wl_[crid]);
+  std::lock_guard<std::mutex> lg(wrapper->Mutex());
 
   std::shared_ptr<CRoutine> cr = nullptr;
   int prio;
diff --git a/cyber/scheduler/processor.cc b/cyber/scheduler/processor.cc
index 499504b9a8..00642091b1 100644
--- a/cyber/scheduler/processor.cc
+++ b/cyber/scheduler/processor.cc
@@ -79,7 +79,7 @@ void Processor::SetSchedPolicy(std::string spolicy, int sched_priority) {
 void Processor::Run() {
   tid_.store(static_cast<int>(syscall(SYS_gettid)));
 
-  while (likely(running_)) {
+  while (likely(running_.load())) {
     if (likely(context_ != nullptr)) {
       auto croutine = context_->NextRoutine();
       if (croutine) {
diff --git a/cyber/scheduler/scheduler.h b/cyber/scheduler/scheduler.h
index f6a9b707cc..cd3219dc40 100644
--- a/cyber/scheduler/scheduler.h
+++ b/cyber/scheduler/scheduler.h
@@ -28,11 +28,13 @@
 #include <vector>
 
 #include "cyber/base/atomic_rw_lock.h"
+#include "cyber/base/atomic_hash_map.h"
 #include "cyber/common/log.h"
 #include "cyber/common/macros.h"
 #include "cyber/common/types.h"
 #include "cyber/croutine/croutine.h"
 #include "cyber/croutine/routine_factory.h"
+#include "cyber/scheduler/common/mutex_wrapper.h"
 
 namespace apollo {
 namespace cyber {
@@ -40,6 +42,7 @@ namespace scheduler {
 
 using apollo::cyber::base::AtomicRWLock;
 using apollo::cyber::base::ReadLockGuard;
+using apollo::cyber::base::AtomicHashMap;
 using apollo::cyber::croutine::CRoutine;
 using apollo::cyber::croutine::RoutineFactory;
 using apollo::cyber::data::DataVisitorBase;
@@ -73,7 +76,7 @@ class Scheduler {
   void ParseCpuset(const std::string&, std::vector<int>*);
 
   AtomicRWLock id_cr_lock_;
-  std::unordered_map<uint64_t, std::mutex> id_cr_wl_;
+  AtomicHashMap<uint64_t, MutexWrapper *> id_map_mutex_;
   std::mutex cr_wl_mtx_;
 
   std::unordered_map<uint64_t, std::shared_ptr<CRoutine>> id_cr_;
diff --git a/cyber/scheduler/scheduler_policy_test.cc b/cyber/scheduler/scheduler_policy_test.cc
index 5d12ef9c83..7df8e0f1e5 100644
--- a/cyber/scheduler/scheduler_policy_test.cc
+++ b/cyber/scheduler/scheduler_policy_test.cc
@@ -49,7 +49,6 @@ TEST(SchedulerPolicyTest, classic) {
   auto processor = std::make_shared<Processor>();
   auto ctx = std::make_shared<ClassicContext>();
   processor->BindContext(ctx);
-  ctx->SetGroupName(DEFAULT_GROUP_NAME);
   std::vector<std::shared_ptr<CRoutine>> res;
 
   // test single routine
--
GitLab
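
Note for reviewers (not part of the patch): the sketch below reduces the "multi-key mutex" pattern that DispatchTask()/RemoveCRoutine() rely on to standard C++, so the intent is visible outside the Apollo tree. The names KeyedMutex and GetOrCreate are invented for illustration. The patch itself stores raw MutexWrapper pointers in cyber::base::AtomicHashMap and never frees them, so each per-id mutex lives for the process lifetime; and because AtomicHashMap offers a lock-free Get(), the patch re-checks under cr_wl_mtx_ before inserting. With a plain std::unordered_map the lookup itself must hold the guard mutex, so a single locked section suffices here.

#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_map>

// Illustrative stand-in for the patch's per-croutine-id mutex table.
class KeyedMutex {
 public:
  // Returns the mutex owned by `id`, creating it on first use. The
  // unique_ptr plays the role of the patch's heap-allocated MutexWrapper*:
  // the mutex keeps a stable address regardless of later map operations.
  std::mutex& GetOrCreate(uint64_t id) {
    std::lock_guard<std::mutex> lg(map_mtx_);  // counterpart of cr_wl_mtx_
    auto it = mutexes_.find(id);
    if (it == mutexes_.end()) {
      it = mutexes_.emplace(id, std::make_unique<std::mutex>()).first;
    }
    return *it->second;
  }

 private:
  std::mutex map_mtx_;  // guards the map itself, not the per-id sections
  std::unordered_map<uint64_t, std::unique_ptr<std::mutex>> mutexes_;
};

int main() {
  KeyedMutex km;
  const uint64_t crid = 42;  // stand-in for CRoutine::id()
  {
    // Critical section keyed by crid: concurrent dispatch/remove of the
    // same id serialize here, while unrelated ids proceed in parallel.
    std::lock_guard<std::mutex> lg(km.GetOrCreate(crid));
    std::cout << "holding per-id mutex for crid " << crid << std::endl;
  }
  return 0;
}

Either form gives the guarantee the schedulers need: deleting and re-adding a croutine with the same crid serializes on one mutex, while operations on different crids never contend beyond the brief table lookup.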