未验证 提交 828976eb 编写于 作者: Z zhangchaoltt 提交者: GitHub

 
framework: decouple classic sched algorithm deps on group data-struct (#6549)
上级 06741f21
...@@ -41,6 +41,21 @@ cc_library( ...@@ -41,6 +41,21 @@ cc_library(
deps = [ deps = [
"//cyber/croutine", "//cyber/croutine",
"//cyber/scheduler:processor", "//cyber/scheduler:processor",
"//cyber/scheduler:mutex_wrapper",
],
)
# Header-only library: wrapper that lets a std::mutex be stored by value
# in standard containers (see common/mutex_wrapper.h).
cc_library(
name = "mutex_wrapper",
hdrs = [
"common/mutex_wrapper.h",
],
)
cc_library(
name = "cv_wrapper",
hdrs = [
"common/cv_wrapper.h",
], ],
) )
...@@ -115,6 +130,8 @@ cc_library( ...@@ -115,6 +130,8 @@ cc_library(
"//cyber/croutine", "//cyber/croutine",
"//cyber/proto:classic_conf_cc_proto", "//cyber/proto:classic_conf_cc_proto",
"//cyber/scheduler:processor", "//cyber/scheduler:processor",
"//cyber/scheduler:mutex_wrapper",
"//cyber/scheduler:cv_wrapper",
], ],
) )
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_SCHEDULER_CV_WRAPPER_H_
#define CYBER_SCHEDULER_CV_WRAPPER_H_

// Fix: this header uses std::condition_variable, so it must include
// <condition_variable> directly instead of relying on <mutex> pulling
// it in transitively.
#include <condition_variable>

namespace apollo {
namespace cyber {
namespace scheduler {

// Wraps a std::condition_variable so it can be stored by value inside
// standard containers (e.g. std::unordered_map<std::string, CvWrapper>),
// which require their mapped type to be default-constructible.
// Copying is disabled: a condition variable identity must stay unique.
class CvWrapper {
 public:
  CvWrapper() = default;
  CvWrapper(const CvWrapper& other) = delete;
  CvWrapper& operator=(const CvWrapper& other) = delete;

  // Const accessor: cv_ is declared mutable so waiters/notifiers can use
  // the condition variable even through a const wrapper reference.
  std::condition_variable& Cv() const { return cv_; }

 private:
  mutable std::condition_variable cv_;
};

}  // namespace scheduler
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_SCHEDULER_CV_WRAPPER_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_SCHEDULER_MUTEX_WRAPPER_H_
#define CYBER_SCHEDULER_MUTEX_WRAPPER_H_

#include <mutex>

namespace apollo {
namespace cyber {
namespace scheduler {

// Wraps a std::mutex so it can be stored by value inside standard
// containers (e.g. std::unordered_map<std::string, MutexWrapper>),
// which require their mapped type to be default-constructible.
// Copying is disabled: a mutex identity must stay unique.
class MutexWrapper {
 public:
  MutexWrapper() = default;
  MutexWrapper(const MutexWrapper& other) = delete;
  MutexWrapper& operator=(const MutexWrapper& other) = delete;

  // Fix: the accessor is now const — mutex_ was already declared mutable,
  // which is only meaningful if locking is allowed through a const
  // wrapper; previously the non-const accessor made `mutable` dead code.
  std::mutex& Mutex() const { return mutex_; }

 private:
  mutable std::mutex mutex_;
};

}  // namespace scheduler
}  // namespace cyber
}  // namespace apollo

#endif  // CYBER_SCHEDULER_MUTEX_WRAPPER_H_
...@@ -38,14 +38,29 @@ GRP_WQ_CV ClassicContext::cv_wq_; ...@@ -38,14 +38,29 @@ GRP_WQ_CV ClassicContext::cv_wq_;
RQ_LOCK_GROUP ClassicContext::rq_locks_; RQ_LOCK_GROUP ClassicContext::rq_locks_;
CR_GROUP ClassicContext::cr_group_; CR_GROUP ClassicContext::cr_group_;
// Default context: binds to the scheduler's default group. Delegates to
// the group-name constructor so group setup lives in a single code path.
ClassicContext::ClassicContext() : ClassicContext(DEFAULT_GROUP_NAME) {}
// Binds this context to the scheduling group named |name|, caching
// pointers to that group's shared run-queue/lock/wait structures.
ClassicContext::ClassicContext(const std::string& name) { InitGroup(name); }
// Caches raw pointers to this group's entries in the static per-group
// tables (creating the entries on first use via operator[]), so later
// calls on this context need no map lookup. The four assignments are
// independent of one another.
void ClassicContext::InitGroup(const std::string& grp) {
  cw_ = &cv_wq_[grp];
  mtx_wrapper_ = &mtx_wq_[grp];
  lq_ = &rq_locks_[grp];
  multi_pri_rq_ = &cr_group_[grp];
}
std::shared_ptr<CRoutine> ClassicContext::NextRoutine() { std::shared_ptr<CRoutine> ClassicContext::NextRoutine() {
if (unlikely(stop_)) { if (unlikely(stop_)) {
return nullptr; return nullptr;
} }
for (int i = MAX_PRIO - 1; i >= 0; --i) { for (int i = MAX_PRIO - 1; i >= 0; --i) {
ReadLockGuard<AtomicRWLock> lk(rq_locks_[group_name_].at(i)); ReadLockGuard<AtomicRWLock> lk(lq_->at(i));
for (auto& cr : cr_group_[group_name_].at(i)) { for (auto& cr : multi_pri_rq_->at(i)) {
if (!cr->Acquire()) { if (!cr->Acquire()) {
continue; continue;
} }
...@@ -71,32 +86,32 @@ std::shared_ptr<CRoutine> ClassicContext::NextRoutine() { ...@@ -71,32 +86,32 @@ std::shared_ptr<CRoutine> ClassicContext::NextRoutine() {
} }
void ClassicContext::Wait() { void ClassicContext::Wait() {
std::unique_lock<std::mutex> lk(mtx_wq_[group_name_]); std::unique_lock<std::mutex> lk(mtx_wrapper_->Mutex());
if (stop_) { if (stop_) {
return; return;
} }
if (unlikely(need_sleep_)) { if (unlikely(need_sleep_)) {
auto duration = wake_time_ - std::chrono::steady_clock::now(); auto duration = wake_time_ - std::chrono::steady_clock::now();
cv_wq_[group_name_].wait_for(lk, duration); cw_->Cv().wait_for(lk, duration);
need_sleep_ = false; need_sleep_ = false;
} else { } else {
cv_wq_[group_name_].wait(lk); cw_->Cv().wait(lk);
} }
} }
void ClassicContext::Shutdown() { void ClassicContext::Shutdown() {
{ {
std::lock_guard<std::mutex> lg(mtx_wq_[group_name_]); std::lock_guard<std::mutex> lg(mtx_wrapper_->Mutex());
if (!stop_) { if (!stop_) {
stop_ = true; stop_ = true;
} }
} }
cv_wq_[group_name_].notify_all(); cw_->Cv().notify_all();
} }
void ClassicContext::Notify(const std::string& group_name) { void ClassicContext::Notify(const std::string& group_name) {
cv_wq_[group_name].notify_one(); cv_wq_[group_name].Cv().notify_one();
} }
} // namespace scheduler } // namespace scheduler
......
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
#include "cyber/base/atomic_rw_lock.h" #include "cyber/base/atomic_rw_lock.h"
#include "cyber/croutine/croutine.h" #include "cyber/croutine/croutine.h"
#include "cyber/scheduler/processor_context.h" #include "cyber/scheduler/processor_context.h"
#include "cyber/scheduler/common/mutex_wrapper.h"
#include "cyber/scheduler/common/cv_wrapper.h"
namespace apollo { namespace apollo {
namespace cyber { namespace cyber {
...@@ -43,20 +45,20 @@ using CR_GROUP = std::unordered_map<std::string, MULTI_PRIO_QUEUE>; ...@@ -43,20 +45,20 @@ using CR_GROUP = std::unordered_map<std::string, MULTI_PRIO_QUEUE>;
using LOCK_QUEUE = std::array<base::AtomicRWLock, MAX_PRIO>; using LOCK_QUEUE = std::array<base::AtomicRWLock, MAX_PRIO>;
using RQ_LOCK_GROUP = std::unordered_map<std::string, LOCK_QUEUE>; using RQ_LOCK_GROUP = std::unordered_map<std::string, LOCK_QUEUE>;
using GRP_WQ_MUTEX = std::unordered_map<std::string, std::mutex>; using GRP_WQ_MUTEX = std::unordered_map<std::string, MutexWrapper>;
using GRP_WQ_CV = std::unordered_map<std::string, std::condition_variable>; using GRP_WQ_CV = std::unordered_map<std::string, CvWrapper>;
class ClassicContext : public ProcessorContext { class ClassicContext : public ProcessorContext {
public: public:
ClassicContext();
explicit ClassicContext(const std::string& group_name);
std::shared_ptr<CRoutine> NextRoutine() override; std::shared_ptr<CRoutine> NextRoutine() override;
void Wait() override; void Wait() override;
void Shutdown() override; void Shutdown() override;
static void Notify(const std::string& group_name); static void Notify(const std::string& group_name);
void SetGroupName(const std::string& group_name) { group_name_ = group_name; }
std::string group_name_;
alignas(CACHELINE_SIZE) static RQ_LOCK_GROUP rq_locks_; alignas(CACHELINE_SIZE) static RQ_LOCK_GROUP rq_locks_;
alignas(CACHELINE_SIZE) static CR_GROUP cr_group_; alignas(CACHELINE_SIZE) static CR_GROUP cr_group_;
...@@ -64,8 +66,15 @@ class ClassicContext : public ProcessorContext { ...@@ -64,8 +66,15 @@ class ClassicContext : public ProcessorContext {
alignas(CACHELINE_SIZE) static GRP_WQ_CV cv_wq_; alignas(CACHELINE_SIZE) static GRP_WQ_CV cv_wq_;
private: private:
void InitGroup(const std::string& group_name);
std::chrono::steady_clock::time_point wake_time_; std::chrono::steady_clock::time_point wake_time_;
bool need_sleep_ = false; bool need_sleep_ = false;
MULTI_PRIO_QUEUE *multi_pri_rq_ = nullptr;
LOCK_QUEUE *lq_ = nullptr;
MutexWrapper *mtx_wrapper_ = nullptr;
CvWrapper *cw_ = nullptr;
}; };
} // namespace scheduler } // namespace scheduler
......
...@@ -112,14 +112,9 @@ void SchedulerChoreography::CreateProcessor() { ...@@ -112,14 +112,9 @@ void SchedulerChoreography::CreateProcessor() {
// Put tasks w/o processor assigned into a classic pool. // Put tasks w/o processor assigned into a classic pool.
for (uint32_t i = 0; i < task_pool_size_; i++) { for (uint32_t i = 0; i < task_pool_size_; i++) {
auto proc = std::make_shared<Processor>();
auto ctx = std::make_shared<ClassicContext>(); auto ctx = std::make_shared<ClassicContext>();
ctx->SetGroupName(DEFAULT_GROUP_NAME);
ClassicContext::cr_group_[DEFAULT_GROUP_NAME];
ClassicContext::rq_locks_[DEFAULT_GROUP_NAME];
ClassicContext::mtx_wq_[DEFAULT_GROUP_NAME];
ClassicContext::cv_wq_[DEFAULT_GROUP_NAME];
auto proc = std::make_shared<Processor>();
proc->BindContext(ctx); proc->BindContext(ctx);
proc->SetAffinity(pool_cpuset_, pool_affinity_, i); proc->SetAffinity(pool_cpuset_, pool_affinity_, i);
proc->SetSchedPolicy(pool_processor_policy_, pool_processor_prio_); proc->SetSchedPolicy(pool_processor_policy_, pool_processor_prio_);
...@@ -131,15 +126,17 @@ void SchedulerChoreography::CreateProcessor() { ...@@ -131,15 +126,17 @@ void SchedulerChoreography::CreateProcessor() {
bool SchedulerChoreography::DispatchTask(const std::shared_ptr<CRoutine>& cr) { bool SchedulerChoreography::DispatchTask(const std::shared_ptr<CRoutine>& cr) {
// we use multi-key mutex to prevent race condition // we use multi-key mutex to prevent race condition
// when del && add cr with same crid // when del && add cr with same crid
if (likely(id_cr_wl_.find(cr->id()) == id_cr_wl_.end())) { MutexWrapper *wrapper = nullptr;
if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
{ {
std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_); std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
if (id_cr_wl_.find(cr->id()) == id_cr_wl_.end()) { if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
id_cr_wl_[cr->id()]; wrapper = new MutexWrapper();
id_map_mutex_.Set(cr->id(), wrapper);
} }
} }
} }
std::lock_guard<std::mutex> lg(id_cr_wl_[cr->id()]); std::lock_guard<std::mutex> lg(wrapper->Mutex());
// Assign sched cfg to tasks according to configuration. // Assign sched cfg to tasks according to configuration.
if (cr_confs_.find(cr->name()) != cr_confs_.end()) { if (cr_confs_.find(cr->name()) != cr_confs_.end()) {
...@@ -198,15 +195,17 @@ bool SchedulerChoreography::RemoveTask(const std::string& name) { ...@@ -198,15 +195,17 @@ bool SchedulerChoreography::RemoveTask(const std::string& name) {
bool SchedulerChoreography::RemoveCRoutine(uint64_t crid) { bool SchedulerChoreography::RemoveCRoutine(uint64_t crid) {
// we use multi-key mutex to prevent race condition // we use multi-key mutex to prevent race condition
// when del && add cr with same crid // when del && add cr with same crid
if (unlikely(id_cr_wl_.find(crid) == id_cr_wl_.end())) { MutexWrapper *wrapper = nullptr;
if (!id_map_mutex_.Get(crid, &wrapper)) {
{ {
std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_); std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
if (id_cr_wl_.find(crid) == id_cr_wl_.end()) { if (!id_map_mutex_.Get(crid, &wrapper)) {
id_cr_wl_[crid]; wrapper = new MutexWrapper();
id_map_mutex_.Set(crid, wrapper);
} }
} }
} }
std::lock_guard<std::mutex> lg(id_cr_wl_[crid]); std::lock_guard<std::mutex> lg(wrapper->Mutex());
// Find cr from id_cr && // Find cr from id_cr &&
// get cr prio if cr found // get cr prio if cr found
......
...@@ -90,14 +90,8 @@ void SchedulerClassic::CreateProcessor() { ...@@ -90,14 +90,8 @@ void SchedulerClassic::CreateProcessor() {
std::vector<int> cpuset; std::vector<int> cpuset;
ParseCpuset(group.cpuset(), &cpuset); ParseCpuset(group.cpuset(), &cpuset);
ClassicContext::cr_group_[group_name];
ClassicContext::rq_locks_[group_name];
ClassicContext::mtx_wq_[group_name];
ClassicContext::cv_wq_[group_name];
for (uint32_t i = 0; i < proc_num; i++) { for (uint32_t i = 0; i < proc_num; i++) {
auto ctx = std::make_shared<ClassicContext>(); auto ctx = std::make_shared<ClassicContext>(group_name);
ctx->SetGroupName(group_name);
pctxs_.emplace_back(ctx); pctxs_.emplace_back(ctx);
auto proc = std::make_shared<Processor>(); auto proc = std::make_shared<Processor>();
...@@ -112,15 +106,17 @@ void SchedulerClassic::CreateProcessor() { ...@@ -112,15 +106,17 @@ void SchedulerClassic::CreateProcessor() {
bool SchedulerClassic::DispatchTask(const std::shared_ptr<CRoutine>& cr) { bool SchedulerClassic::DispatchTask(const std::shared_ptr<CRoutine>& cr) {
// we use multi-key mutex to prevent race condition // we use multi-key mutex to prevent race condition
// when del && add cr with same crid // when del && add cr with same crid
if (likely(id_cr_wl_.find(cr->id()) == id_cr_wl_.end())) { MutexWrapper *wrapper = nullptr;
if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
{ {
std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_); std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
if (id_cr_wl_.find(cr->id()) == id_cr_wl_.end()) { if (!id_map_mutex_.Get(cr->id(), &wrapper)) {
id_cr_wl_[cr->id()]; wrapper = new MutexWrapper();
id_map_mutex_.Set(cr->id(), wrapper);
} }
} }
} }
std::lock_guard<std::mutex> lg(id_cr_wl_[cr->id()]); std::lock_guard<std::mutex> lg(wrapper->Mutex());
{ {
WriteLockGuard<AtomicRWLock> lk(id_cr_lock_); WriteLockGuard<AtomicRWLock> lk(id_cr_lock_);
...@@ -192,15 +188,17 @@ bool SchedulerClassic::RemoveTask(const std::string& name) { ...@@ -192,15 +188,17 @@ bool SchedulerClassic::RemoveTask(const std::string& name) {
bool SchedulerClassic::RemoveCRoutine(uint64_t crid) { bool SchedulerClassic::RemoveCRoutine(uint64_t crid) {
// we use multi-key mutex to prevent race condition // we use multi-key mutex to prevent race condition
// when del && add cr with same crid // when del && add cr with same crid
if (unlikely(id_cr_wl_.find(crid) == id_cr_wl_.end())) { MutexWrapper *wrapper = nullptr;
if (!id_map_mutex_.Get(crid, &wrapper)) {
{ {
std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_); std::lock_guard<std::mutex> wl_lg(cr_wl_mtx_);
if (id_cr_wl_.find(crid) == id_cr_wl_.end()) { if (!id_map_mutex_.Get(crid, &wrapper)) {
id_cr_wl_[crid]; wrapper = new MutexWrapper();
id_map_mutex_.Set(crid, wrapper);
} }
} }
} }
std::lock_guard<std::mutex> lg(id_cr_wl_[crid]); std::lock_guard<std::mutex> lg(wrapper->Mutex());
std::shared_ptr<CRoutine> cr = nullptr; std::shared_ptr<CRoutine> cr = nullptr;
int prio; int prio;
......
...@@ -79,7 +79,7 @@ void Processor::SetSchedPolicy(std::string spolicy, int sched_priority) { ...@@ -79,7 +79,7 @@ void Processor::SetSchedPolicy(std::string spolicy, int sched_priority) {
void Processor::Run() { void Processor::Run() {
tid_.store(static_cast<int>(syscall(SYS_gettid))); tid_.store(static_cast<int>(syscall(SYS_gettid)));
while (likely(running_)) { while (likely(running_.load())) {
if (likely(context_ != nullptr)) { if (likely(context_ != nullptr)) {
auto croutine = context_->NextRoutine(); auto croutine = context_->NextRoutine();
if (croutine) { if (croutine) {
......
...@@ -28,11 +28,13 @@ ...@@ -28,11 +28,13 @@
#include <vector> #include <vector>
#include "cyber/base/atomic_rw_lock.h" #include "cyber/base/atomic_rw_lock.h"
#include "cyber/base/atomic_hash_map.h"
#include "cyber/common/log.h" #include "cyber/common/log.h"
#include "cyber/common/macros.h" #include "cyber/common/macros.h"
#include "cyber/common/types.h" #include "cyber/common/types.h"
#include "cyber/croutine/croutine.h" #include "cyber/croutine/croutine.h"
#include "cyber/croutine/routine_factory.h" #include "cyber/croutine/routine_factory.h"
#include "cyber/scheduler/common/mutex_wrapper.h"
namespace apollo { namespace apollo {
namespace cyber { namespace cyber {
...@@ -40,6 +42,7 @@ namespace scheduler { ...@@ -40,6 +42,7 @@ namespace scheduler {
using apollo::cyber::base::AtomicRWLock; using apollo::cyber::base::AtomicRWLock;
using apollo::cyber::base::ReadLockGuard; using apollo::cyber::base::ReadLockGuard;
using apollo::cyber::base::AtomicHashMap;
using apollo::cyber::croutine::CRoutine; using apollo::cyber::croutine::CRoutine;
using apollo::cyber::croutine::RoutineFactory; using apollo::cyber::croutine::RoutineFactory;
using apollo::cyber::data::DataVisitorBase; using apollo::cyber::data::DataVisitorBase;
...@@ -73,7 +76,7 @@ class Scheduler { ...@@ -73,7 +76,7 @@ class Scheduler {
void ParseCpuset(const std::string&, std::vector<int>*); void ParseCpuset(const std::string&, std::vector<int>*);
AtomicRWLock id_cr_lock_; AtomicRWLock id_cr_lock_;
std::unordered_map<uint64_t, std::mutex> id_cr_wl_; AtomicHashMap<uint64_t, MutexWrapper*> id_map_mutex_;
std::mutex cr_wl_mtx_; std::mutex cr_wl_mtx_;
std::unordered_map<uint64_t, std::shared_ptr<CRoutine>> id_cr_; std::unordered_map<uint64_t, std::shared_ptr<CRoutine>> id_cr_;
......
...@@ -49,7 +49,6 @@ TEST(SchedulerPolicyTest, classic) { ...@@ -49,7 +49,6 @@ TEST(SchedulerPolicyTest, classic) {
auto processor = std::make_shared<Processor>(); auto processor = std::make_shared<Processor>();
auto ctx = std::make_shared<ClassicContext>(); auto ctx = std::make_shared<ClassicContext>();
processor->BindContext(ctx); processor->BindContext(ctx);
ctx->SetGroupName(DEFAULT_GROUP_NAME);
std::vector<std::future<void>> res; std::vector<std::future<void>> res;
// test single routine // test single routine
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册