diff --git a/cyber/BUILD b/cyber/BUILD index 5f3b237c31b353819fcdc3cf27952c30a7d0c89d..1333c47e510ce832297797c13ad7f54ed9395258 100644 --- a/cyber/BUILD +++ b/cyber/BUILD @@ -61,6 +61,7 @@ cc_library( "//cyber:state", "//cyber/logger:async_logger", "//cyber/node", + "//cyber/timer:timing_wheel", ], ) diff --git a/cyber/init.cc b/cyber/init.cc index ff6d94f79144dba7e6a49d309bfb72657153273b..d4e4e09d550c8d0b9fca03f0c4cbd4dc23bc9035 100644 --- a/cyber/init.cc +++ b/cyber/init.cc @@ -31,7 +31,7 @@ #include "cyber/scheduler/scheduler.h" #include "cyber/service_discovery/topology_manager.h" #include "cyber/task/task.h" -#include "cyber/timer/timer_manager.h" +#include "cyber/timer/timing_wheel.h" #include "cyber/transport/transport.h" namespace apollo { @@ -114,7 +114,7 @@ void Clear() { return; } TaskManager::CleanUp(); - TimerManager::CleanUp(); + TimingWheel::CleanUp(); scheduler::CleanUp(); service_discovery::TopologyManager::CleanUp(); transport::Transport::CleanUp(); diff --git a/cyber/timer/BUILD b/cyber/timer/BUILD index 2975f44eeba4b8fdac16780c9dcfadd2ef44a5fd..d3f8ea9ed1017456e61cb12ed104b92776b05540 100644 --- a/cyber/timer/BUILD +++ b/cyber/timer/BUILD @@ -6,53 +6,22 @@ cc_library( name = "timer", srcs = ["timer.cc"], hdrs = ["timer.h"], - deps = [ - "timer_manager", - "//cyber/common:global_data", - ], -) - -cc_library( - name = "timer_manager", - srcs = ["timer_manager.cc"], - hdrs = ["timer_manager.h"], deps = [ "timing_wheel", - "//cyber/common:macros", - "//cyber/scheduler", - "//cyber/task", - "//cyber/time", - "//cyber/time:rate", - ], -) - -cc_test( - name = "timer_manager_test", - size = "small", - srcs = ["timer_manager_test.cc"], - deps = [ - "//cyber:cyber_core", - "@gtest//:main", + "//cyber/common:global_data", ], ) cc_library( name = "timer_task", - srcs = ["timer_task.cc"], hdrs = ["timer_task.h"], - deps = [ - "//cyber/base:bounded_queue", - "//cyber/task", - ], ) cc_library( - name = "timing_slot", - srcs = ["timing_slot.cc"], - hdrs = ["timing_slot.h"], + name = "timer_bucket", + hdrs = ["timer_bucket.h"], deps = [ "timer_task", - "//cyber/base:bounded_queue", ], ) @@ -61,19 +30,18 @@ cc_library( srcs = ["timing_wheel.cc"], hdrs = ["timing_wheel.h"], deps = [ - "timer_task", - "timing_slot", - "//cyber/base:bounded_queue", + "timer_bucket", "//cyber/task", "//cyber/time", "//cyber/time:duration", + "//cyber/time:rate", ], ) cc_test( - name = "timing_wheel_test", + name = "timer_test", size = "small", - srcs = ["timing_wheel_test.cc"], + srcs = ["timer_test.cc"], deps = [ "//cyber:cyber_core", "//cyber:init", diff --git a/cyber/timer/timer.cc b/cyber/timer/timer.cc index aba520747f82142d6e7964907428f4ea28680656..1e6b688967e89e7c3894e1d6c9ce256ee23c05e8 100644 --- a/cyber/timer/timer.cc +++ b/cyber/timer/timer.cc @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. + * Copyright 2019 The Apollo Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,14 +21,24 @@ namespace apollo { namespace cyber { -Timer::Timer() { tm_ = TimerManager::Instance(); } +namespace { +static std::atomic global_timer_id = {0}; +static uint64_t GenerateTimerId() { return global_timer_id.fetch_add(1); } +} + +Timer::Timer() { + timing_wheel_ = TimingWheel::Instance(); + timer_id_ = GenerateTimerId(); +} Timer::Timer(TimerOption opt) : timer_opt_(opt) { - tm_ = TimerManager::Instance(); + timing_wheel_ = TimingWheel::Instance(); + timer_id_ = GenerateTimerId(); } Timer::Timer(uint32_t period, std::function callback, bool oneshot) { - tm_ = TimerManager::Instance(); + timing_wheel_ = TimingWheel::Instance(); + timer_id_ = GenerateTimerId(); timer_opt_.period = period; timer_opt_.callback = callback; timer_opt_.oneshot = oneshot; @@ -36,28 +46,68 @@ Timer::Timer(uint32_t period, std::function callback, bool oneshot) { void Timer::SetTimerOption(TimerOption opt) { timer_opt_ = opt; } +bool Timer::InitTimerTask() { + if (timer_opt_.period == 0) { + AERROR << "Max interval must great than 0"; + return false; + } + + if (timer_opt_.period >= TIMER_MAX_INTERVAL_MS) { + AERROR << "Max interval must less than " << TIMER_MAX_INTERVAL_MS; + return false; + } + + task_.reset(new TimerTask(timer_id_)); + task_->interval_ms = timer_opt_.period; + task_->next_fire_duration_ms = task_->interval_ms; + + if (timer_opt_.oneshot) { + task_->callback = timer_opt_.callback; + } else { + std::weak_ptr task_wptr = this->task_; + task_->callback = [this, task_wptr]() { + auto start = std::chrono::steady_clock::now(); + this->timer_opt_.callback(); + auto end = std::chrono::steady_clock::now(); + uint64_t execute_time_ms = + std::chrono::duration_cast(end - start) + .count(); + auto task = task_wptr.lock(); + if (task) { + if (execute_time_ms >= task->interval_ms) { + task->next_fire_duration_ms = 1; + } else { + task->next_fire_duration_ms = task->interval_ms - execute_time_ms; + } + this->timing_wheel_->AddTask(task); + } + }; + } + return true; +} + void Timer::Start() { if (!common::GlobalData::Instance()->IsRealityMode()) { return; } if (!started_.exchange(true)) { - timer_id_ = - tm_->Add(timer_opt_.period, timer_opt_.callback, timer_opt_.oneshot); + if (InitTimerTask()) { + timing_wheel_->AddTask(task_); + } } } void Timer::Stop() { if (started_.exchange(false)) { - ADEBUG << "stop timer " << timer_id_; - tm_->Remove(timer_id_); - timer_id_ = 0; + ADEBUG << "stop timer "; + task_.reset(); } } Timer::~Timer() { - if (timer_id_ != 0) { - tm_->Remove(timer_id_); + if (task_) { + Stop(); } } diff --git a/cyber/timer/timer.h b/cyber/timer/timer.h index d50193d849ea69e12f159162389305a28f8529a3..7ade4fed84fa9cf89edeea73520b60d3b4dba351 100644 --- a/cyber/timer/timer.h +++ b/cyber/timer/timer.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. + * Copyright 2019 The Apollo Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,16 @@ #include #include -#include "cyber/timer/timer_manager.h" +#include "cyber/timer/timing_wheel.h" namespace apollo { namespace cyber { struct TimerOption { - uint32_t period; // The period of the timer, unit is ms + TimerOption(uint32_t period, std::function callback, bool oneshot) + : period(period), callback(callback), oneshot(oneshot) {} + TimerOption() : period(0), callback(), oneshot() {} + uint32_t period = 0; // The period of the timer, unit is ms std::function callback; // The tasks that the timer needs to perform bool oneshot; // True: perform the callback only after the first timing cycle // False: perform the callback every timed period @@ -74,9 +77,11 @@ class Timer { void Stop(); private: + bool InitTimerTask(); + uint64_t timer_id_; TimerOption timer_opt_; - TimerManager* tm_ = nullptr; - uint64_t timer_id_ = 0; + TimingWheel* timing_wheel_ = nullptr; + std::shared_ptr task_; std::atomic started_ = {false}; }; diff --git a/cyber/timer/timer_task.cc b/cyber/timer/timer_bucket.h similarity index 58% rename from cyber/timer/timer_task.cc rename to cyber/timer/timer_bucket.h index 5918e5f7015adbafff6ea629fed1b2159f680357..cca2c450e64c9d5cb5ffb21d5059a161a89b9258 100644 --- a/cyber/timer/timer_task.cc +++ b/cyber/timer/timer_bucket.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. + * Copyright 2019 The Apollo Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,32 +14,34 @@ * limitations under the License. *****************************************************************************/ -#include "cyber/timer/timer_task.h" +#ifndef CYBER_TIMER_TIMER_BUCKET_H_ +#define CYBER_TIMER_TIMER_BUCKET_H_ + +#include +#include +#include -#include "cyber/task/task.h" +#include "cyber/timer/timer_task.h" namespace apollo { namespace cyber { -void TimerTask::Fire(bool async) { - if (status_ != INIT) { - return; - } - if (oneshot_) // not repeat. so always on ready - status_ = EXPIRED; - if (async) { - cyber::Async(handler_); - } else { - handler_(); +class TimerBucket { + public: + void AddTask(const std::shared_ptr& task) { + std::lock_guard lock(mutex_); + task_list_.push_back(task); } -} -bool TimerTask::Cancel() { - if (State() != INIT) { - return false; - } - status_ = CANCELED; - return true; -} + std::mutex& mutex() { return mutex_; } + std::list>& task_list() { return task_list_; } + + private: + std::mutex mutex_; + std::list> task_list_; +}; + } // namespace cyber } // namespace apollo + +#endif // CYBER_TIMER_TIMER_BUCKET_H_ diff --git a/cyber/timer/timer_manager.cc b/cyber/timer/timer_manager.cc deleted file mode 100644 index 0c561a298adc0d0ef0f9f5e56520424e646ac5e1..0000000000000000000000000000000000000000 --- a/cyber/timer/timer_manager.cc +++ /dev/null @@ -1,82 +0,0 @@ -/****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *****************************************************************************/ - -#include "cyber/timer/timer_manager.h" - -#include "cyber/common/log.h" -#include "cyber/scheduler/scheduler_factory.h" -#include "cyber/time/duration.h" -#include "cyber/time/rate.h" - -namespace apollo { -namespace cyber { - -TimerManager::TimerManager() - : timing_wheel_(Duration(0.01)), - time_gran_(Duration(0.01)), - running_(false) {} // default time gran = 1ms - -TimerManager::~TimerManager() { - if (running_) { - Shutdown(); - } -} - -void TimerManager::Start() { - std::lock_guard lock(running_mutex_); - if (!running_) { - ADEBUG << "TimerManager->Start() ok"; - running_ = true; - scheduler_thread_ = std::thread([this]() { this->ThreadFuncImpl(); }); - scheduler::Instance()->SetInnerThreadAttr("timer", &scheduler_thread_); - } -} - -void TimerManager::Shutdown() { - std::lock_guard lock(running_mutex_); - if (running_) { - running_ = false; - if (scheduler_thread_.joinable()) { - scheduler_thread_.join(); - } - } -} - -uint64_t TimerManager::Add(uint64_t interval, std::function handler, - bool oneshot) { - if (!running_) { - Start(); - } - uint64_t timer_id = timing_wheel_.StartTimer(interval, handler, oneshot); - return timer_id; -} - -void TimerManager::Remove(uint64_t timer_id) { - timing_wheel_.StopTimer(timer_id); -} - -bool TimerManager::IsRunning() { return running_; } - -void TimerManager::ThreadFuncImpl() { - Rate rate(time_gran_); - while (running_) { - timing_wheel_.Step(); - rate.Sleep(); - } -} - -} // namespace cyber -} // namespace apollo diff --git a/cyber/timer/timer_manager.h b/cyber/timer/timer_manager.h deleted file mode 100644 index 98ceed74b15d6b06ec07acc5d854c0c08d12814a..0000000000000000000000000000000000000000 --- a/cyber/timer/timer_manager.h +++ /dev/null @@ -1,58 +0,0 @@ -/****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *****************************************************************************/ - -#ifndef CYBER_TIMER_TIMER_MANAGER_H_ -#define CYBER_TIMER_TIMER_MANAGER_H_ - -#include -#include -#include -#include -#include -#include -#include - -#include "cyber/common/macros.h" -#include "cyber/time/duration.h" -#include "cyber/timer/timing_wheel.h" - -namespace apollo { -namespace cyber { - -class TimerManager { - public: - virtual ~TimerManager(); - void Start(); - void Shutdown(); - bool IsRunning(); - uint64_t Add(uint64_t interval, std::function handler, bool oneshot); - void Remove(uint64_t timer_id); - - private: - TimingWheel timing_wheel_; - Duration time_gran_; - bool running_ = false; - mutable std::mutex running_mutex_; - std::thread scheduler_thread_; - void ThreadFuncImpl(); - - DECLARE_SINGLETON(TimerManager) -}; - -} // namespace cyber -} // namespace apollo - -#endif // CYBER_TIMER_TIMER_MANAGER_H_ diff --git a/cyber/timer/timer_manager_test.cc b/cyber/timer/timer_manager_test.cc deleted file mode 100644 index efcd77994234f1d947f61556ddba4f6735680370..0000000000000000000000000000000000000000 --- a/cyber/timer/timer_manager_test.cc +++ /dev/null @@ -1,37 +0,0 @@ -/****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *****************************************************************************/ - -#include "cyber/timer/timer_manager.h" - -#include -#include - -#include "cyber/common/log.h" - -namespace apollo { -namespace cyber { - -TEST(TimerManagerTest, Basic) { - auto tm = TimerManager::Instance(); - ASSERT_EQ(tm->IsRunning(), false); - tm->Start(); - ASSERT_EQ(tm->IsRunning(), true); - tm->Shutdown(); - ASSERT_EQ(tm->IsRunning(), false); -} - -} // namespace cyber -} // namespace apollo diff --git a/cyber/timer/timer_task.h b/cyber/timer/timer_task.h index f12d648a75a1920eeaadc267195a08245c977647..596a25a487a162d70b7f020c86883b864a2aafd0 100644 --- a/cyber/timer/timer_task.h +++ b/cyber/timer/timer_task.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. + * Copyright 2019 The Apollo Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,55 +17,20 @@ #ifndef CYBER_TIMER_TIMER_TASK_H_ #define CYBER_TIMER_TIMER_TASK_H_ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "cyber/base/bounded_queue.h" +#include namespace apollo { namespace cyber { -using CallHandler = std::function; - -class TimerTask { - public: - TimerTask(uint64_t id, uint64_t it, uint64_t ivl, CallHandler h, bool ons) - : tid_(id), init_time_(it), interval_(ivl), handler_(h), oneshot_(ons) {} - TimerTask() = default; - - private: - enum STATS { INIT = 0, CANCELED, EXPIRED }; - STATS State() { return status_; } - volatile STATS status_ = INIT; - uint64_t tid_ = 0; - - public: - uint64_t init_time_ = 0; - uint64_t deadline_ = 0; - uint64_t interval_ = 0; - CallHandler handler_; - uint64_t rest_rounds_ = 0; - bool oneshot_ = true; - uint64_t fire_count_ = 0; - - public: - uint64_t Id() { return tid_; } - - void Fire(bool async); - - bool Cancel(); - - bool IsCanceled() { return State() == CANCELED; } +class TimerBucket; - bool IsExpired() { return State() == EXPIRED; } +struct TimerTask { + explicit TimerTask(uint64_t timer_id) : timer_id_(timer_id) {} + uint64_t timer_id_ = 0; + std::function callback; + uint64_t interval_ms = 0; + uint64_t remainder_interval_ms = 0; + uint64_t next_fire_duration_ms = 0; }; } // namespace cyber diff --git a/cyber/timer/timer_test.cc b/cyber/timer/timer_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..e370b8a20bc9fa7ab668049cae1f34f50ce2d305 --- /dev/null +++ b/cyber/timer/timer_test.cc @@ -0,0 +1,103 @@ +/****************************************************************************** + * Copyright 2018 The Apollo Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *****************************************************************************/ + +/* note that the frame code of the following is Generated by script */ + +#include +#include +#include +#include +#include + +#include "cyber/common/util.h" +#include "cyber/cyber.h" +#include "cyber/init.h" + +#include "cyber/timer/timer.h" + +namespace apollo { +namespace cyber { +namespace timer { + +using cyber::Timer; +using cyber::TimerOption; + +TEST(TimerTest, one_shot) { + int count = 0; + Timer timer(100, [&count] { count = 100; }, true); + timer.Start(); + std::this_thread::sleep_for(std::chrono::milliseconds(90)); + EXPECT_EQ(0, count); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_EQ(100, count); + timer.Stop(); +} + +TEST(TimerTest, cycle) { + int count = 0; + Timer timers[1000]; + TimerOption opt; + opt.oneshot = false; + opt.callback = [=] { AINFO << count; }; + for (int i = 0; i < 1000; i++) { + opt.period = i + 1; + timers[i].SetTimerOption(opt); + timers[i].Start(); + } + + std::this_thread::sleep_for(std::chrono::seconds(3)); + for (int i = 0; i < 1000; i++) { + timers[i].Stop(); + } +} + +TEST(TimerTest, start_stop) { + int count = 0; + Timer timer(2, [count] { AINFO << count; }, false); + for (int i = 0; i < 100; i++) { + timer.Start(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + timer.Stop(); + } +} + +TEST(TimerTest, test1) { + auto count = 0; + + auto func = [count]() { AINFO << count; }; + + TimerOption to{1000, func, false}; + + { + Timer t; + t.SetTimerOption(to); + common::GlobalData::Instance()->EnableSimulationMode(); + t.Start(); + common::GlobalData::Instance()->DisableSimulationMode(); + t.Start(); + t.Start(); + } +} + +} // namespace timer +} // namespace cyber +} // namespace apollo + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + apollo::cyber::Init(argv[0]); + return RUN_ALL_TESTS(); +} diff --git a/cyber/timer/timing_slot.cc b/cyber/timer/timing_slot.cc deleted file mode 100644 index e57aa0e286fb2fcbd65385e55e4350441a7d0249..0000000000000000000000000000000000000000 --- a/cyber/timer/timing_slot.cc +++ /dev/null @@ -1,65 +0,0 @@ -/****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *****************************************************************************/ - -#include "cyber/timer/timing_slot.h" - -#include "cyber/common/log.h" -#include "cyber/timer/timer_task.h" - -namespace apollo { -namespace cyber { - -void TimingSlot::AddTask(const std::shared_ptr& task) { - tasks_.emplace(task->Id(), task); -} - -void TimingSlot::EnumTaskList( - uint64_t deadline, bool async, BoundedQueue* hander_queue, - BoundedQueue>* rep_queue) { - for (auto it = tasks_.begin(); it != tasks_.end();) { - auto task = it->second; // *it; - auto del_it = it; - it++; - // std::cout << "judge: task->" << task->deadline << " : " << deadline; - if (task->deadline_ <= deadline) { - if (task->rest_rounds_ == 0) { - if (async) { - HandlePackage hp; - hp.handle = task->handler_; - hp.id = task->Id(); - if (!hander_queue->Enqueue(hp)) { - AERROR << "hander queue is full"; - } - } else { - task->Fire(false); - } - - if (!task->oneshot_) { // repeat timer,push back - task->fire_count_++; - rep_queue->Enqueue(task); - } - tasks_.erase(del_it); - - } else { - AERROR << "task deadline overflow..."; - } - } else { // no expired, -- rounds - task->rest_rounds_--; - } - } -} -} // namespace cyber -} // namespace apollo diff --git a/cyber/timer/timing_slot.h b/cyber/timer/timing_slot.h deleted file mode 100644 index d3c99fdc5ba88ed35eb93a57796ce5558f490ed0..0000000000000000000000000000000000000000 --- a/cyber/timer/timing_slot.h +++ /dev/null @@ -1,64 +0,0 @@ -/****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *****************************************************************************/ - -#ifndef CYBER_TIMER_TIMING_SLOT_H_ -#define CYBER_TIMER_TIMING_SLOT_H_ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "cyber/base/bounded_queue.h" - -namespace apollo { -namespace cyber { - -using apollo::cyber::base::BoundedQueue; -using CallHandler = std::function; - -class TimerTask; - -struct HandlePackage { - uint64_t id; - CallHandler handle; -}; - -class TimingSlot { - private: - // no needs for multi-thread - std::unordered_map> tasks_; - - public: - TimingSlot() = default; - void AddTask(const std::shared_ptr& task); - void RemoveTask(uint64_t id) { tasks_.erase(id); } - - void EnumTaskList(uint64_t deadline, bool async, - BoundedQueue* hander_queue, - BoundedQueue>* rep_list); -}; // TimeSlot end - -} // namespace cyber -} // namespace apollo - -#endif // CYBER_TIMER_TIMING_SLOT_H_ diff --git a/cyber/timer/timing_wheel.cc b/cyber/timer/timing_wheel.cc index aff2372732f88c1691893e6da66cae96e854d0a9..22d1039c8bce43af04ab1ab6fef8d93de40462cd 100644 --- a/cyber/timer/timing_wheel.cc +++ b/cyber/timer/timing_wheel.cc @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. + * Copyright 2019 The Apollo Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,164 +15,109 @@ *****************************************************************************/ #include "cyber/timer/timing_wheel.h" - -#include - -#include "cyber/base/for_each.h" -#include "cyber/common/log.h" #include "cyber/task/task.h" -#include "cyber/time/time.h" -#include "cyber/timer/timer_task.h" namespace apollo { namespace cyber { -TimingWheel::TimingWheel() { - if (!add_queue_.Init(BOUNDED_QUEUE_SIZE)) { - AERROR << "Add queue init failed."; - throw std::runtime_error("Add queue init failed."); - } - if (!repeat_queue_.Init(BOUNDED_QUEUE_SIZE)) { - AERROR << "Repeated task queue init failed."; - throw std::runtime_error("Repeated queue init failed."); - } - if (!handler_queue_.Init(BOUNDED_QUEUE_SIZE)) { - AERROR << "Handler queue init failed."; - throw std::runtime_error("Handler queue init failed."); - } -} - -TimingWheel::TimingWheel(const Duration& tick_duration) { - tick_duration_ = tick_duration.ToNanosecond(); - resolution_ = tick_duration_ / 1000000UL; - if (!add_queue_.Init(BOUNDED_QUEUE_SIZE)) { - AERROR << "Add queue init failed."; - throw std::runtime_error("Add queue init failed."); - } - if (!repeat_queue_.Init(BOUNDED_QUEUE_SIZE)) { - AERROR << "Repeated task queue init failed."; - throw std::runtime_error("Repeated queue init failed."); - } - if (!handler_queue_.Init(BOUNDED_QUEUE_SIZE)) { - AERROR << "Handler queue init failed."; - throw std::runtime_error("Handler queue init failed."); - } -} - -uint64_t TimingWheel::StartTimer(uint64_t interval, CallHandler handler, - bool oneshot) { - if (id_counter_ > UINT64_MAX) { - AERROR << "Timer ID pool is full."; - return -1; - } else if (interval > TIMER_TASK_MAX_INTERVAL) { - AERROR << "The interval of timer task MUST less than or equal " - << TIMER_TASK_MAX_INTERVAL << "ms."; - return -1; - } else if (interval < resolution_) { - AERROR << "The interval of timer task MUST larger than or equal " - << resolution_ << "ms."; - return -1; - } - auto now = Time::Now().ToNanosecond(); - auto task = std::make_shared( - ++id_counter_, now, interval, - [handler](void) { - static std::mutex task_mutex; - std::lock_guard lock(task_mutex); - handler(); - }, - oneshot); - if (add_queue_.Enqueue(task)) { - ADEBUG << "start timer id: " << id_counter_; - return id_counter_; - } else { - --id_counter_; - AERROR << "add queue is full, Enqueue failed!"; - return -1; +void TimingWheel::Start() { + std::lock_guard lock(running_mutex_); + if (!running_) { + ADEBUG << "TimeWheel start ok"; + running_ = true; + tick_thread_ = std::thread([this]() { this->TickFunc(); }); + scheduler::Instance()->SetInnerThreadAttr("timer", &tick_thread_); } } -void TimingWheel::Step() { - if (start_time_ == 0) { - start_time_ = Time::Now().ToNanosecond(); - } - uint64_t deadline = tick_duration_ * (tick_ + 1); - uint64_t idx = tick_ & mask_; - RemoveCancelledTasks(idx); - FillAddSlot(); - time_slots_[idx].EnumTaskList(deadline, true, &handler_queue_, - &repeat_queue_); - - // timing wheel tick one time - tick_++; - - FillRepeatSlot(); - - while (!handler_queue_.Empty()) { - HandlePackage hp; - if (handler_queue_.Dequeue(&hp)) { - cyber::Async(hp.handle); +void TimingWheel::Shutdown() { + std::lock_guard lock(running_mutex_); + if (running_) { + running_ = false; + if (tick_thread_.joinable()) { + tick_thread_.join(); } } } -void TimingWheel::StopTimer(uint64_t timer_id) { +void TimingWheel::Tick() { + auto& bucket = work_wheel_[current_work_wheel_index_]; { - std::lock_guard lg(cancelled_mutex_); - cancelled_list_.push_back(timer_id); + std::lock_guard lock(bucket.mutex()); + auto ite = bucket.task_list().begin(); + while (ite != bucket.task_list().end()) { + auto task = ite->lock(); + if (task) { + ADEBUG << "index: " << current_work_wheel_index_ + << " timer id: " << task->timer_id_; + auto callback = task->callback; + cyber::Async([this, callback] { + if (this->running_) { + callback(); + } + }); + } + ite = bucket.task_list().erase(ite); + } + } + current_work_wheel_index_ = GetWorkWheelIndex(current_work_wheel_index_ + 1); + if (current_work_wheel_index_ == 0) { + current_assistant_wheel_index_ = + GetAssistantWheelIndex(current_assistant_wheel_index_ + 1); + Cascade(current_assistant_wheel_index_); } } -void TimingWheel::RemoveCancelledTasks(uint64_t slot_index) { - if (slot_index >= TIMING_WHEEL_SIZE) { - return; - } - { - std::lock_guard lg(cancelled_mutex_); - for (auto id : cancelled_list_) { - FOR_EACH(i, 0, TIMING_WHEEL_SIZE) { time_slots_[i].RemoveTask(id); } +void TimingWheel::AddTask(const std::shared_ptr& task) { + if (!running_) { + Start(); + } + + auto work_wheel_index = + current_work_wheel_index_ + task->next_fire_duration_ms; + if (work_wheel_index >= WORK_WHEEL_SIZE) { + auto real_work_wheel_index = GetWorkWheelIndex(work_wheel_index); + task->remainder_interval_ms = real_work_wheel_index; + auto assistant_ticks = work_wheel_index / WORK_WHEEL_SIZE; + if (assistant_ticks == 1 && + real_work_wheel_index != current_work_wheel_index_) { + work_wheel_[real_work_wheel_index].AddTask(task); + ADEBUG << "add task to work wheel. index :" << real_work_wheel_index; + } else { + auto assistant_wheel_index = GetAssistantWheelIndex( + current_assistant_wheel_index_ + assistant_ticks); + assistant_wheel_[assistant_wheel_index].AddTask(task); + ADEBUG << "add task to assistant wheel. index : " + << assistant_wheel_index; } - cancelled_list_.clear(); + } else { + work_wheel_[work_wheel_index].AddTask(task); + ADEBUG << "add task to work wheel. index :" << work_wheel_index; } } -void TimingWheel::FillAddSlot() { - std::shared_ptr task; - while (!add_queue_.Empty()) { - if (!add_queue_.Dequeue(&task)) { - return; +void TimingWheel::Cascade(const uint64_t assistant_wheel_index) { + auto& bucket = assistant_wheel_[assistant_wheel_index]; + std::lock_guard lock(bucket.mutex()); + auto ite = bucket.task_list().begin(); + while (ite != bucket.task_list().end()) { + auto task = ite->lock(); + if (task) { + work_wheel_[task->remainder_interval_ms].AddTask(task); } - FillSlot(task); + ite = bucket.task_list().erase(ite); } } -void TimingWheel::FillRepeatSlot() { - std::shared_ptr task; - while (!repeat_queue_.Empty()) { - if (!repeat_queue_.Dequeue(&task)) { - return; - } - FillSlot(task); +void TimingWheel::TickFunc() { + Rate rate(TIMER_RESOLUTION_MS * 1000000); // ms to ns + while (running_) { + Tick(); + rate.Sleep(); } } -void TimingWheel::FillSlot(const std::shared_ptr& task) { - task->deadline_ = task->init_time_ + - (task->fire_count_ + 1) * (task->interval_) * 1000 * 1000 - - start_time_; +TimingWheel::TimingWheel() {} - // Calculate how many tickes have been run since the time wheel start - uint64_t t = task->deadline_ / tick_duration_; // perTick = 1ms - if (t < tick_) { - task->rest_rounds_ = 0; - } else { - task->rest_rounds_ = (t - tick_) / TIMING_WHEEL_SIZE; - } - uint64_t ticks = std::max(t, tick_); // right now - uint64_t idx = ticks & mask_; - time_slots_[idx].AddTask(task); - - ADEBUG << "task id " << task->Id() << " insert to index " << idx; -} } // namespace cyber } // namespace apollo diff --git a/cyber/timer/timing_wheel.h b/cyber/timer/timing_wheel.h index d7b172b489f2215fa03c42395d38a5e6d8c84836..c6fc694ce96b19d45149a37b868e1633ce3c1c5e 100644 --- a/cyber/timer/timing_wheel.h +++ b/cyber/timer/timing_wheel.h @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. + * Copyright 2019 The Apollo Authors. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,74 +17,65 @@ #ifndef CYBER_TIMER_TIMING_WHEEL_H_ #define CYBER_TIMER_TIMING_WHEEL_H_ -#include -#include -#include +#include #include -#include #include -#include #include -#include -#include -#include -#include "cyber/base/bounded_queue.h" -#include "cyber/time/duration.h" -#include "cyber/timer/timing_slot.h" +#include "cyber/common/log.h" +#include "cyber/common/macros.h" +#include "cyber/scheduler/scheduler_factory.h" +#include "cyber/time/rate.h" +#include "cyber/timer/timer_bucket.h" namespace apollo { namespace cyber { -using apollo::cyber::base::BoundedQueue; -using CallHandler = std::function; +struct TimerTask; -static const int TIMING_WHEEL_SIZE = 128; -static const int THREAD_POOL_SIZE = 4; -static const uint64_t BOUNDED_QUEUE_SIZE = 200; -static const int TIMER_TASK_MAX_INTERVAL = 1000; - -class TimerTask; +static const uint64_t WORK_WHEEL_SIZE = 512; +static const uint64_t ASSISTANT_WHEEL_SIZE = 64; +static const uint64_t TIMER_RESOLUTION_MS = 1; +static const uint64_t TIMER_MAX_INTERVAL_MS = + WORK_WHEEL_SIZE * ASSISTANT_WHEEL_SIZE; class TimingWheel { public: - TimingWheel(); - explicit TimingWheel(const Duration& tick_duration); - ~TimingWheel() = default; - - uint64_t StartTimer(uint64_t interval, CallHandler handler, bool oneshot); - - void StopTimer(uint64_t timer_id); - - void Step(); + ~TimingWheel() { + if (running_) { + Shutdown(); + } + } - private: - void FillAddSlot(); - void FillRepeatSlot(); - void FillSlot(const std::shared_ptr& task); - - void RemoveCancelledTasks(uint64_t slot_index); + void Start(); - uint64_t id_counter_ = 0; + void Shutdown(); - uint64_t tick_ = 0; + void Tick(); - uint64_t start_time_ = 0; + void AddTask(const std::shared_ptr& task); - TimingSlot time_slots_[TIMING_WHEEL_SIZE]; + void Cascade(const uint64_t assistant_wheel_index); - uint64_t mask_ = TIMING_WHEEL_SIZE - 1; + void TickFunc(); - uint64_t tick_duration_ = 10 * 1000 * 1000; // 10ms - uint64_t resolution_ = 10; // 10ms - - // we need implement a lock-free high performance concurrent queue. - // Now, just a blocking-queue just for works. - std::list cancelled_list_; - std::mutex cancelled_mutex_; - BoundedQueue> add_queue_; - BoundedQueue> repeat_queue_; - BoundedQueue handler_queue_; + private: + uint64_t GetWorkWheelIndex(const uint64_t index) { + return index & (WORK_WHEEL_SIZE - 1); + } + uint64_t GetAssistantWheelIndex(const uint64_t index) { + return index & (ASSISTANT_WHEEL_SIZE - 1); + } + + bool running_ = false; + std::mutex running_mutex_; + TimerBucket work_wheel_[WORK_WHEEL_SIZE]; + TimerBucket assistant_wheel_[ASSISTANT_WHEEL_SIZE]; + uint64_t current_work_wheel_index_ = 0; + uint64_t current_assistant_wheel_index_ = 0; + std::thread tick_thread_; + + DECLARE_SINGLETON(TimingWheel) }; } // namespace cyber diff --git a/cyber/timer/timing_wheel_test.cc b/cyber/timer/timing_wheel_test.cc deleted file mode 100644 index b6a09a707816484a40de9ead9372792a0c0c928f..0000000000000000000000000000000000000000 --- a/cyber/timer/timing_wheel_test.cc +++ /dev/null @@ -1,82 +0,0 @@ -/****************************************************************************** - * Copyright 2018 The Apollo Authors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *****************************************************************************/ - -#include "cyber/timer/timing_wheel.h" - -#include -#include -#include -#include - -#include "cyber/common/log.h" -#include "cyber/cyber.h" -#include "cyber/init.h" - -namespace apollo { -namespace cyber { - -class TestHandler { - public: - TestHandler() = default; - - void increment() { count_++; } - - uint64_t count() { return count_; } - - private: - std::atomic count_ = {0}; -}; - -TEST(TimingWheelTest, Oneshot) { - TimingWheel tw; - std::shared_ptr th(new TestHandler()); - ASSERT_EQ(0, th->count()); - std::function f = std::bind(&TestHandler::increment, th.get()); - tw.Step(); - tw.StartTimer(10, f, true); - for (int i = 0; i < 10; i++) { - tw.Step(); - usleep(1000); - } - ASSERT_EQ(1, th->count()); -} - -TEST(TimingWheelTest, Period) { - TimingWheel tw; - std::shared_ptr th(new TestHandler()); - ASSERT_EQ(0, th->count()); - std::function f = std::bind(&TestHandler::increment, th.get()); - tw.Step(); - tw.StartTimer(10, f, false); - - for (uint64_t i = 0; i < 100; i++) { - tw.Step(); - if ((i + 1) % 10 == 0) { - usleep(10 * 1000); - ASSERT_TRUE(i <= th->count() && i + 1 >= th->count()); - } - } -} - -} // namespace cyber -} // namespace apollo - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - apollo::cyber::Init(argv[0]); - auto res = RUN_ALL_TESTS(); - return res; -} diff --git a/cyber/transport/BUILD b/cyber/transport/BUILD index 187e17b12213789934cee747e5398d6e1c615945..9da466a4ad3d45524ad34507d3099cde7a601b9c 100644 --- a/cyber/transport/BUILD +++ b/cyber/transport/BUILD @@ -30,7 +30,7 @@ cc_library( "underlay_message", "underlay_message_type", "//cyber/service_discovery:role", - "//cyber/timer:timer_manager", + "//cyber/task:task", "@fastrtps", ], )