提交 fe2d6b45 编写于 作者: G GoLancer 提交者: Jiangtao Hu

framework: decouple processor & processor_context

上级 7f43593a
......@@ -6,10 +6,22 @@ cc_library(
name = "processor",
srcs = [
"processor.cc",
"processor_context.cc",
],
hdrs = [
"processor.h",
],
deps = [
"//cyber/scheduler:processor_context",
"//cyber/data:data",
],
)
cc_library(
name = "processor_context",
srcs = [
"processor_context.cc",
],
hdrs = [
"processor_context.h",
],
deps = [
......
......@@ -22,7 +22,6 @@
#include "cyber/common/types.h"
#include "cyber/event/perf_event_cache.h"
#include "cyber/scheduler/processor.h"
namespace apollo {
namespace cyber {
......
......@@ -94,7 +94,6 @@ void ClassicContext::Shutdown() {
}
}
cv_wq_.notify_all();
processor_->Stop();
}
void ClassicContext::Notify() { cv_wq_.notify_one(); }
......
......@@ -105,7 +105,6 @@ void SchedulerChoreography::CreateProcessor() {
proc->SetAffinity(choreography_cpuset_, choreography_affinity_, i);
proc->SetSchedPolicy(choreography_processor_policy_,
choreography_processor_prio_);
ctx->BindProc(proc);
pctxs_.emplace_back(ctx);
}
......@@ -117,7 +116,6 @@ void SchedulerChoreography::CreateProcessor() {
proc->BindContext(ctx);
proc->SetAffinity(pool_cpuset_, pool_affinity_, i);
proc->SetSchedPolicy(pool_processor_policy_, pool_processor_prio_);
ctx->BindProc(proc);
pctxs_.emplace_back(ctx);
}
}
......
......@@ -16,7 +16,9 @@
#include "cyber/scheduler/policy/scheduler_classic.h"
#include <algorithm>
#include <memory>
#include <utility>
#include "cyber/common/environment.h"
#include "cyber/common/file.h"
......@@ -77,14 +79,14 @@ SchedulerClassic::SchedulerClassic() {
void SchedulerClassic::CreateProcessor() {
for (uint32_t i = 0; i < proc_num_; i++) {
auto proc = std::make_shared<Processor>();
auto ctx = std::make_shared<ClassicContext>();
pctxs_.emplace_back(ctx);
auto proc = std::make_shared<Processor>();
proc->BindContext(ctx);
proc->SetAffinity(cpuset_, affinity_, i);
proc->SetSchedPolicy(processor_policy_, processor_prio_);
ctx->BindProc(proc);
pctxs_.emplace_back(ctx);
processors_.push_back(std::move(proc));
}
}
......
......@@ -100,6 +100,27 @@ void Processor::Run() {
}
}
void Processor::Stop() {
if (!running_.exchange(false)) {
return;
}
if (context_) {
context_->Shutdown();
}
cv_ctx_.notify_one();
if (thread_.joinable()) {
thread_.join();
}
}
void Processor::BindContext(const std::shared_ptr<ProcessorContext> &context) {
context_ = context;
std::call_once(thread_flag_,
[this]() { thread_ = std::thread(&Processor::Run, this); });
}
} // namespace scheduler
} // namespace cyber
} // namespace apollo
......@@ -41,38 +41,25 @@ using croutine::RoutineContext;
class Processor {
public:
Processor();
~Processor();
virtual ~Processor();
void Run();
void Stop();
void BindContext(const std::shared_ptr<ProcessorContext>& context);
void SetAffinity(const std::vector<int>&, const std::string&, int);
void SetSchedPolicy(std::string spolicy, int sched_priority);
void Stop() {
if (!running_.exchange(false)) {
return;
}
cv_ctx_.notify_one();
if (thread_.joinable()) {
thread_.join();
}
}
void BindContext(const std::shared_ptr<ProcessorContext>& context) {
context_ = context;
std::call_once(thread_flag_,
[this]() { thread_ = std::thread(&Processor::Run, this); });
}
private:
std::shared_ptr<ProcessorContext> context_;
std::shared_ptr<RoutineContext> routine_context_;
std::once_flag thread_flag_;
std::condition_variable cv_ctx_;
std::once_flag thread_flag_;
std::mutex mtx_ctx_;
std::thread thread_;
std::atomic<bool> running_{false};
std::atomic<pid_t> tid_{-1};
std::atomic<bool> running_{false};
};
} // namespace scheduler
......
......@@ -24,9 +24,8 @@ void ProcessorContext::Shutdown() {
if (!stop_) {
stop_ = true;
}
processor_->Stop();
}
} // namespace scheduler
} // namespace cyber
} // namespace apollo
......@@ -23,7 +23,6 @@
#include "cyber/base/macros.h"
#include "cyber/croutine/croutine.h"
#include "cyber/scheduler/processor.h"
namespace apollo {
namespace cyber {
......@@ -36,18 +35,11 @@ class Processor;
class ProcessorContext {
public:
virtual void Shutdown();
void BindProc(const std::shared_ptr<Processor>& processor) {
processor_ = processor;
}
virtual std::shared_ptr<CRoutine> NextRoutine() = 0;
virtual void Wait() = 0;
protected:
bool stop_ = false;
alignas(CACHELINE_SIZE) std::shared_ptr<Processor> processor_ = nullptr;
alignas(CACHELINE_SIZE) std::atomic_flag notified_ = ATOMIC_FLAG_INIT;
};
......
......@@ -118,10 +118,8 @@ void Scheduler::Shutdown() {
std::vector<uint64_t> cr_list;
{
ReadLockGuard<AtomicRWLock> lk(id_cr_lock_);
for (auto it = id_cr_.begin(); it != id_cr_.end(); ++it) {
auto cr = it->second;
auto id = cr->id();
cr_list.emplace_back(id);
for (auto& cr : id_cr_) {
cr_list.emplace_back(cr.second->id());
}
}
......@@ -129,6 +127,11 @@ void Scheduler::Shutdown() {
RemoveCRoutine(id);
}
for (auto& processor : processors_) {
processor->Stop();
}
processors_.clear();
pctxs_.clear();
}
} // namespace scheduler
......
......@@ -44,6 +44,7 @@ using apollo::cyber::croutine::CRoutine;
using apollo::cyber::croutine::RoutineFactory;
using apollo::cyber::data::DataVisitorBase;
class Processor;
class ProcessorContext;
class Scheduler {
......@@ -76,6 +77,7 @@ class Scheduler {
std::unordered_map<uint64_t, std::shared_ptr<CRoutine>> id_cr_;
std::vector<std::shared_ptr<ProcessorContext>> pctxs_;
std::vector<std::shared_ptr<Processor>> processors_;
uint32_t proc_num_ = 0;
uint32_t task_pool_size_ = 0;
......
......@@ -37,7 +37,6 @@ TEST(SchedulerPolicyTest, choreo) {
auto processor = std::make_shared<Processor>();
auto ctx = std::make_shared<ChoreographyContext>();
processor->BindContext(ctx);
ctx->BindProc(processor);
std::shared_ptr<CRoutine> cr = std::make_shared<CRoutine>(func);
auto task_id = GlobalData::RegisterTaskName("choreo");
......@@ -50,7 +49,6 @@ TEST(SchedulerPolicyTest, classic) {
auto processor = std::make_shared<Processor>();
auto ctx = std::make_shared<ClassicContext>();
processor->BindContext(ctx);
ctx->BindProc(processor);
std::vector<std::future<void>> res;
// test single routine
......@@ -71,28 +69,19 @@ TEST(SchedulerPolicyTest, classic) {
future.wait_for(std::chrono::milliseconds(1000));
}
res.clear();
ctx->Shutdown();
}
TEST(SchedulerPolicyTest, sched_classic) {
GlobalData::Instance()->SetProcessGroup("example_classic_sched");
auto sched1 = std::make_shared<SchedulerClassic>();
auto sched = std::make_shared<SchedulerClassic>();
std::shared_ptr<CRoutine> cr = std::make_shared<CRoutine>(func);
auto task_id = GlobalData::RegisterTaskName("ABC");
cr->set_id(task_id);
EXPECT_TRUE(sched1->DispatchTask(cr));
EXPECT_TRUE(sched->DispatchTask(cr));
// dispatch the same task
EXPECT_FALSE(sched1->DispatchTask(cr));
EXPECT_TRUE(sched1->RemoveTask("ABC"));
sched1->Shutdown();
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
apollo::cyber::Init(argv[0]);
auto res = RUN_ALL_TESTS();
return res;
EXPECT_FALSE(sched->DispatchTask(cr));
EXPECT_TRUE(sched->RemoveTask("ABC"));
}
} // namespace scheduler
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册