提交 f0cdf6a8 编写于 作者: Z zaxon 提交者: Jiangtao Hu

framework: code clean in sched (#1943)

上级 ff6b235f
......@@ -21,7 +21,6 @@
#include <vector>
#include "cyber/common/types.h"
#include "cyber/croutine/croutine.h"
#include "cyber/event/perf_event_cache.h"
#include "cyber/scheduler/processor.h"
#include "cyber/scheduler/scheduler.h"
......@@ -30,9 +29,9 @@ namespace apollo {
namespace cyber {
namespace scheduler {
using apollo::cyber::croutine::RoutineState;
using apollo::cyber::event::PerfEventCache;
using apollo::cyber::event::SchedPerf;
using croutine::RoutineState;
std::shared_ptr<CRoutine> ChoreographyContext::NextRoutine() {
if (unlikely(stop_)) {
......@@ -51,11 +50,14 @@ std::shared_ptr<CRoutine> ChoreographyContext::NextRoutine() {
it = cr_queue_.erase(it);
continue;
}
if (cr->UpdateState() == RoutineState::READY) {
PerfEventCache::Instance()->AddSchedEvent(SchedPerf::NEXT_RT, cr->id(),
PerfEventCache::Instance()->AddSchedEvent(SchedPerf::NEXT_RT,
cr->id(),
cr->processor_id());
return cr;
}
cr->Release();
++it;
}
......@@ -67,7 +69,6 @@ std::shared_ptr<CRoutine> ChoreographyContext::NextRoutine() {
bool ChoreographyContext::Enqueue(const std::shared_ptr<CRoutine> cr) {
PerfEventCache::Instance()->AddSchedEvent(SchedPerf::RT_CREATE, cr->id(),
cr->processor_id());
std::lock_guard<std::mutex> lk(mtx_cr_queue_);
cr_queue_.insert(
std::pair<uint32_t, std::shared_ptr<CRoutine>>(cr->priority(), cr));
......
......@@ -14,8 +14,8 @@
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_SCHEDULER_POLICY_CHOREOGRAPHY_H_
#define CYBER_SCHEDULER_POLICY_CHOREOGRAPHY_H_
#ifndef CYBER_SCHEDULER_POLICY_CHOREOGRAPHY_CONTEXT_H_
#define CYBER_SCHEDULER_POLICY_CHOREOGRAPHY_CONTEXT_H_
#include <functional>
#include <map>
......@@ -24,6 +24,8 @@
#include <string>
#include <unordered_map>
#include "cyber/base/atomic_rw_lock.h"
#include "cyber/croutine/croutine.h"
#include "cyber/scheduler/processor_context.h"
namespace apollo {
......@@ -39,6 +41,7 @@ class ChoreographyContext : public ProcessorContext {
public:
std::shared_ptr<CRoutine> NextRoutine() override;
void Wait() override;
bool Enqueue(const std::shared_ptr<CRoutine>);
void Notify();
......@@ -55,4 +58,4 @@ class ChoreographyContext : public ProcessorContext {
} // namespace cyber
} // namespace apollo
#endif // CYBER_SCHEDULER_POLICY_CHOREOGRAPHY_H_
#endif // CYBER_SCHEDULER_POLICY_CHOREOGRAPHY_CONTEXT_H_
......@@ -17,13 +17,12 @@
#include "cyber/scheduler/policy/classic.h"
#include "cyber/event/perf_event_cache.h"
#include "cyber/scheduler/policy/scheduler_classic.h"
#include "cyber/scheduler/processor.h"
namespace apollo {
namespace cyber {
namespace scheduler {
using apollo::cyber::croutine::RoutineState;
using apollo::cyber::base::ReadLockGuard;
using apollo::cyber::event::PerfEventCache;
using apollo::cyber::event::SchedPerf;
......@@ -43,11 +42,14 @@ std::shared_ptr<CRoutine> ClassicContext::NextRoutine() {
ReadLockGuard<AtomicRWLock> lk(rq_locks_[i]);
for (auto it = rq_[i].begin(); it != rq_[i].end(); ++it) {
auto cr = (*it);
if (!cr->Acquire()) {
continue;
}
if (cr->UpdateState() == RoutineState::READY) {
PerfEventCache::Instance()->AddSchedEvent(SchedPerf::NEXT_RT, cr->id(),
PerfEventCache::Instance()->AddSchedEvent(SchedPerf::NEXT_RT,
cr->id(),
cr->processor_id());
return cr;
}
......
......@@ -14,20 +14,24 @@
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_SCHEDULER_POLICY_CLASSIC_H_
#define CYBER_SCHEDULER_POLICY_CLASSIC_H_
#ifndef CYBER_SCHEDULER_POLICY_CLASSIC_CONTEXT_H_
#define CYBER_SCHEDULER_POLICY_CLASSIC_CONTEXT_H_
#include <array>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
#include "cyber/base/atomic_rw_lock.h"
#include "cyber/croutine/croutine.h"
#include "cyber/scheduler/processor_context.h"
namespace apollo {
namespace cyber {
namespace scheduler {
using base::AtomicRWLock;
using croutine::CRoutine;
#define MAX_PRIO 20
......@@ -36,6 +40,7 @@ class ClassicContext : public ProcessorContext {
public:
std::shared_ptr<CRoutine> NextRoutine() override;
void Wait() override;
static void Notify();
alignas(CACHELINE_SIZE) static std::mutex mtx_wq_;
......@@ -50,4 +55,4 @@ class ClassicContext : public ProcessorContext {
} // namespace cyber
} // namespace apollo
#endif // CYBER_SCHEDULER_POLICY_CLASSIC_H_
#endif // CYBER_SCHEDULER_POLICY_CLASSIC_CONTEXT_H_
......@@ -30,13 +30,14 @@ namespace apollo {
namespace cyber {
namespace scheduler {
using apollo::cyber::croutine::RoutineState;
using apollo::cyber::common::GlobalData;
using apollo::cyber::event::PerfEventCache;
using apollo::cyber::event::SchedPerf;
using apollo::cyber::common::GetAbsolutePath;
using apollo::cyber::common::PathExists;
using apollo::cyber::common::GetProtoFromFile;
using apollo::cyber::common::WorkRoot;
using apollo::cyber::event::PerfEventCache;
using apollo::cyber::event::SchedPerf;
SchedulerChoreography::SchedulerChoreography() {
// get sched config
......
......@@ -22,22 +22,27 @@
#include <unordered_map>
#include <vector>
#include "cyber/croutine/croutine.h"
#include "cyber/proto/choreography_conf.pb.h"
#include "cyber/scheduler/scheduler.h"
namespace apollo {
namespace cyber {
namespace scheduler {
using apollo::cyber::croutine::CRoutine;
using apollo::cyber::proto::ChoreographyTask;
using apollo::cyber::proto::InnerThread;
class SchedulerChoreography : public Scheduler {
public:
SchedulerChoreography();
void SetInnerThreadAttr(const std::thread* thr,
const std::string& name) override;
bool RemoveTask(const std::string& name) override;
private:
void CreateProcessor();
bool DispatchTask(const std::shared_ptr<CRoutine>) override;
......@@ -46,14 +51,17 @@ class SchedulerChoreography : public Scheduler {
std::unordered_map<std::string, ChoreographyTask> cr_confs_;
std::unordered_map<std::string, InnerThread> inner_thr_confs_;
int32_t choreography_processor_prio_;
int32_t pool_processor_prio_;
std::string choreography_affinity_;
std::string pool_affinity_;
std::string choreography_processor_policy_;
int32_t choreography_processor_prio_;
std::string pool_processor_policy_;
std::vector<int> choreography_cpuset_;
std::string pool_affinity_;
std::vector<int> pool_cpuset_;
std::string pool_processor_policy_;
int32_t pool_processor_prio_;
};
} // namespace scheduler
......
......@@ -18,25 +18,26 @@
#include <memory>
#include "cyber/event/perf_event_cache.h"
#include "cyber/common/environment.h"
#include "cyber/common/file.h"
#include "cyber/scheduler/processor.h"
#include "cyber/event/perf_event_cache.h"
#include "cyber/scheduler/policy/classic.h"
#include "cyber/scheduler/processor.h"
namespace apollo {
namespace cyber {
namespace scheduler {
using apollo::cyber::croutine::RoutineState;
using apollo::cyber::base::ReadLockGuard;
using apollo::cyber::base::WriteLockGuard;
using apollo::cyber::common::GlobalData;
using apollo::cyber::event::PerfEventCache;
using apollo::cyber::event::SchedPerf;
using apollo::cyber::common::GetAbsolutePath;
using apollo::cyber::common::PathExists;
using apollo::cyber::common::GetProtoFromFile;
using apollo::cyber::common::WorkRoot;
using apollo::cyber::event::PerfEventCache;
using apollo::cyber::event::SchedPerf;
SchedulerClassic::SchedulerClassic() {
// get sched config
......
......@@ -22,17 +22,21 @@
#include <vector>
#include <map>
#include "cyber/croutine/croutine.h"
#include "cyber/proto/classic_conf.pb.h"
#include "cyber/scheduler/scheduler.h"
namespace apollo {
namespace cyber {
namespace scheduler {
using apollo::cyber::croutine::CRoutine;
using apollo::cyber::proto::ClassicTask;
class SchedulerClassic : public Scheduler {
public:
SchedulerClassic();
bool RemoveTask(const std::string& name) override;
void SetInnerThreadAttr(const std::thread* thr,
......@@ -42,6 +46,7 @@ class SchedulerClassic : public Scheduler {
void CreateProcessor();
bool DispatchTask(const std::shared_ptr<CRoutine>) override;
bool NotifyProcessor(uint64_t crid) override;
std::map<std::string, ClassicTask> cr_tasks;
std::string affinity_;
......
......@@ -47,17 +47,20 @@ class Processor {
void SetAffinity(const std::vector<int>&, const std::string&, int);
void SetSchedPolicy(std::string spolicy, int sched_priority);
void Stop() { running_.exchange(false); }
void BindContext(const std::shared_ptr<ProcessorContext>& context) {
context_ = context;
}
private:
std::thread thread_;
std::shared_ptr<ProcessorContext> context_;
std::shared_ptr<RoutineContext> routine_context_ = nullptr;
std::atomic<bool> running_{false};
std::mutex mtx_ctx_;
std::condition_variable cv_ctx_;
std::mutex mtx_ctx_;
std::thread thread_;
std::atomic<bool> running_{false};
std::atomic<pid_t> tid_{-1};
};
......
......@@ -16,11 +16,6 @@
#include "cyber/scheduler/processor_context.h"
#include "cyber/common/log.h"
#include "cyber/common/types.h"
#include "cyber/croutine/croutine.h"
#include "cyber/scheduler/processor.h"
namespace apollo {
namespace cyber {
namespace scheduler {
......@@ -29,6 +24,7 @@ void ProcessorContext::ShutDown() {
if (!stop_) {
stop_ = true;
}
processor_->Stop();
}
......
......@@ -17,47 +17,42 @@
#ifndef CYBER_SCHEDULER_POLICY_PROCESSOR_CONTEXT_H_
#define CYBER_SCHEDULER_POLICY_PROCESSOR_CONTEXT_H_
#include <future>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>
#include "cyber/base/atomic_hash_map.h"
#include "cyber/base/atomic_rw_lock.h"
#include "cyber/base/macros.h"
#include "cyber/croutine/croutine.h"
#include "cyber/scheduler/processor.h"
namespace apollo {
namespace cyber {
namespace scheduler {
using apollo::cyber::base::AtomicRWLock;
using croutine::CRoutine;
using croutine::RoutineState;
class Processor;
class ProcessorContext {
public:
virtual std::shared_ptr<CRoutine> NextRoutine() = 0;
virtual void Wait() = 0;
void ShutDown();
void BindProc(const std::shared_ptr<Processor>& processor) {
processor_ = processor;
}
virtual std::shared_ptr<CRoutine> NextRoutine() = 0;
virtual void Wait() = 0;
protected:
bool stop_ = false;
alignas(CACHELINE_SIZE) std::shared_ptr<Processor> processor_ = nullptr;
alignas(CACHELINE_SIZE) std::atomic_flag notified_ = ATOMIC_FLAG_INIT;
bool stop_ = false;
};
} // namespace scheduler
} // namespace cyber
} // namespace apollo
#endif // CYBER_SCHEDULER_POLICY_PROCESSOR_CONTEXT_H_
#endif // CYBER_SCHEDULER_PROCESSOR_CONTEXT_H_
......@@ -18,9 +18,11 @@
#include <utility>
#include "cyber/data/data_visitor.h"
#include "cyber/common/environment.h"
#include "cyber/common/file.h"
#include "cyber/common/global_data.h"
#include "cyber/common/util.h"
#include "cyber/data/data_visitor.h"
#include "cyber/event/perf_event_cache.h"
#include "cyber/scheduler/policy/choreography.h"
#include "cyber/scheduler/policy/classic.h"
......@@ -28,20 +30,19 @@
#include "cyber/scheduler/policy/scheduler_classic.h"
#include "cyber/scheduler/processor.h"
#include "cyber/scheduler/processor_context.h"
#include "cyber/common/environment.h"
#include "cyber/common/file.h"
namespace apollo {
namespace cyber {
namespace scheduler {
using apollo::cyber::common::GlobalData;
using apollo::cyber::event::PerfEventCache;
using apollo::cyber::event::SchedPerf;
using apollo::cyber::common::GetAbsolutePath;
using apollo::cyber::common::PathExists;
using apollo::cyber::common::GetProtoFromFile;
using apollo::cyber::common::GlobalData;
using apollo::cyber::common::PathExists;
using apollo::cyber::common::WorkRoot;
using apollo::cyber::event::PerfEventCache;
using apollo::cyber::event::SchedPerf;
Scheduler* Scheduler::Instance() {
static Scheduler* instance = nullptr;
......@@ -58,7 +59,7 @@ Scheduler* Scheduler::Instance() {
if (PathExists(cfg_file) && GetProtoFromFile(cfg_file, &cfg)) {
policy = cfg.scheduler_conf().policy();
} else {
AERROR << "Pls make sure schedconf exist and which format is correct.\n";
ADEBUG << "Pls make sure schedconf exist and which format is correct.\n";
}
if (!policy.compare(std::string("classic"))) {
......@@ -86,14 +87,15 @@ void Scheduler::ShutDown() {
bool Scheduler::CreateTask(const RoutineFactory& factory,
const std::string& name) {
return CreateTask(factory.create_routine(), name, factory.GetDataVisitor());
return CreateTask(factory.create_routine(),
name, factory.GetDataVisitor());
}
bool Scheduler::CreateTask(std::function<void()>&& func,
const std::string& name,
std::shared_ptr<DataVisitorBase> visitor) {
if (stop_) {
AERROR << "scheduler is stoped, cannot create task!";
ADEBUG << "scheduler is stoped, cannot create task!";
return false;
}
......@@ -125,7 +127,8 @@ bool Scheduler::NotifyTask(uint64_t crid) {
return NotifyProcessor(crid);
}
void Scheduler::ParseCpuset(const std::string &str, std::vector<int> *cpuset) {
void Scheduler::ParseCpuset(const std::string &str,
std::vector<int> *cpuset) {
std::vector<std::string> lines;
std::stringstream ss(str);
std::string l;
......@@ -135,7 +138,7 @@ void Scheduler::ParseCpuset(const std::string &str, std::vector<int> *cpuset) {
}
for (std::vector<std::string>::const_iterator it = lines.begin(),
e = lines.end(); it != e; it++) {
e = lines.end(); it != e; it++) {
std::stringstream ss(*it);
std::vector<std::string> range;
......@@ -146,11 +149,12 @@ void Scheduler::ParseCpuset(const std::string &str, std::vector<int> *cpuset) {
if (range.size() == 1) {
cpuset->push_back(std::stoi(range[0]));
} else if (range.size() == 2) {
for (int i = std::stoi(range[0]), e = std::stoi(range[1]); i <= e; i++) {
for (int i = std::stoi(range[0]),
e = std::stoi(range[1]); i <= e; i++) {
cpuset->push_back(i);
}
} else {
AERROR << "Parsing cpuset format error.";
ADEBUG << "Parsing cpuset format error.";
exit(0);
}
}
......
......@@ -33,8 +33,6 @@
#include "cyber/common/types.h"
#include "cyber/croutine/croutine.h"
#include "cyber/croutine/routine_factory.h"
#include "cyber/proto/choreography_conf.pb.h"
#include "cyber/proto/scheduler_conf.pb.h"
namespace apollo {
namespace cyber {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册