提交 e848b7e3 编写于 作者: fengqikai1414's avatar fengqikai1414 提交者: Xiangquan Xiao

cyber: refactor timing wheel

上级 a08711a6
......@@ -61,6 +61,7 @@ cc_library(
"//cyber:state",
"//cyber/logger:async_logger",
"//cyber/node",
"//cyber/timer:timing_wheel",
],
)
......
......@@ -31,7 +31,7 @@
#include "cyber/scheduler/scheduler.h"
#include "cyber/service_discovery/topology_manager.h"
#include "cyber/task/task.h"
#include "cyber/timer/timer_manager.h"
#include "cyber/timer/timing_wheel.h"
#include "cyber/transport/transport.h"
namespace apollo {
......@@ -114,7 +114,7 @@ void Clear() {
return;
}
TaskManager::CleanUp();
TimerManager::CleanUp();
TimingWheel::CleanUp();
scheduler::CleanUp();
service_discovery::TopologyManager::CleanUp();
transport::Transport::CleanUp();
......
......@@ -6,53 +6,22 @@ cc_library(
name = "timer",
srcs = ["timer.cc"],
hdrs = ["timer.h"],
deps = [
"timer_manager",
"//cyber/common:global_data",
],
)
cc_library(
name = "timer_manager",
srcs = ["timer_manager.cc"],
hdrs = ["timer_manager.h"],
deps = [
"timing_wheel",
"//cyber/common:macros",
"//cyber/scheduler",
"//cyber/task",
"//cyber/time",
"//cyber/time:rate",
],
)
cc_test(
name = "timer_manager_test",
size = "small",
srcs = ["timer_manager_test.cc"],
deps = [
"//cyber:cyber_core",
"@gtest//:main",
"//cyber/common:global_data",
],
)
cc_library(
name = "timer_task",
srcs = ["timer_task.cc"],
hdrs = ["timer_task.h"],
deps = [
"//cyber/base:bounded_queue",
"//cyber/task",
],
)
cc_library(
name = "timing_slot",
srcs = ["timing_slot.cc"],
hdrs = ["timing_slot.h"],
name = "timer_bucket",
hdrs = ["timer_bucket.h"],
deps = [
"timer_task",
"//cyber/base:bounded_queue",
],
)
......@@ -61,19 +30,18 @@ cc_library(
srcs = ["timing_wheel.cc"],
hdrs = ["timing_wheel.h"],
deps = [
"timer_task",
"timing_slot",
"//cyber/base:bounded_queue",
"timer_bucket",
"//cyber/task",
"//cyber/time",
"//cyber/time:duration",
"//cyber/time:rate",
],
)
cc_test(
name = "timing_wheel_test",
name = "timer_test",
size = "small",
srcs = ["timing_wheel_test.cc"],
srcs = ["timer_test.cc"],
deps = [
"//cyber:cyber_core",
"//cyber:init",
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
* Copyright 2019 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -21,14 +21,24 @@
namespace apollo {
namespace cyber {
Timer::Timer() { tm_ = TimerManager::Instance(); }
namespace {
static std::atomic<uint64_t> global_timer_id = {0};
static uint64_t GenerateTimerId() { return global_timer_id.fetch_add(1); }
}
Timer::Timer() {
timing_wheel_ = TimingWheel::Instance();
timer_id_ = GenerateTimerId();
}
Timer::Timer(TimerOption opt) : timer_opt_(opt) {
tm_ = TimerManager::Instance();
timing_wheel_ = TimingWheel::Instance();
timer_id_ = GenerateTimerId();
}
Timer::Timer(uint32_t period, std::function<void()> callback, bool oneshot) {
tm_ = TimerManager::Instance();
timing_wheel_ = TimingWheel::Instance();
timer_id_ = GenerateTimerId();
timer_opt_.period = period;
timer_opt_.callback = callback;
timer_opt_.oneshot = oneshot;
......@@ -36,28 +46,68 @@ Timer::Timer(uint32_t period, std::function<void()> callback, bool oneshot) {
void Timer::SetTimerOption(TimerOption opt) { timer_opt_ = opt; }
bool Timer::InitTimerTask() {
if (timer_opt_.period == 0) {
AERROR << "Max interval must great than 0";
return false;
}
if (timer_opt_.period >= TIMER_MAX_INTERVAL_MS) {
AERROR << "Max interval must less than " << TIMER_MAX_INTERVAL_MS;
return false;
}
task_.reset(new TimerTask(timer_id_));
task_->interval_ms = timer_opt_.period;
task_->next_fire_duration_ms = task_->interval_ms;
if (timer_opt_.oneshot) {
task_->callback = timer_opt_.callback;
} else {
std::weak_ptr<TimerTask> task_wptr = this->task_;
task_->callback = [this, task_wptr]() {
auto start = std::chrono::steady_clock::now();
this->timer_opt_.callback();
auto end = std::chrono::steady_clock::now();
uint64_t execute_time_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
auto task = task_wptr.lock();
if (task) {
if (execute_time_ms >= task->interval_ms) {
task->next_fire_duration_ms = 1;
} else {
task->next_fire_duration_ms = task->interval_ms - execute_time_ms;
}
this->timing_wheel_->AddTask(task);
}
};
}
return true;
}
void Timer::Start() {
if (!common::GlobalData::Instance()->IsRealityMode()) {
return;
}
if (!started_.exchange(true)) {
timer_id_ =
tm_->Add(timer_opt_.period, timer_opt_.callback, timer_opt_.oneshot);
if (InitTimerTask()) {
timing_wheel_->AddTask(task_);
}
}
}
void Timer::Stop() {
if (started_.exchange(false)) {
ADEBUG << "stop timer " << timer_id_;
tm_->Remove(timer_id_);
timer_id_ = 0;
ADEBUG << "stop timer ";
task_.reset();
}
}
Timer::~Timer() {
if (timer_id_ != 0) {
tm_->Remove(timer_id_);
if (task_) {
Stop();
}
}
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
* Copyright 2019 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -20,13 +20,16 @@
#include <atomic>
#include <memory>
#include "cyber/timer/timer_manager.h"
#include "cyber/timer/timing_wheel.h"
namespace apollo {
namespace cyber {
struct TimerOption {
uint32_t period; // The period of the timer, unit is ms
TimerOption(uint32_t period, std::function<void()> callback, bool oneshot)
: period(period), callback(callback), oneshot(oneshot) {}
TimerOption() : period(0), callback(), oneshot() {}
uint32_t period = 0; // The period of the timer, unit is ms
std::function<void()> callback; // The tasks that the timer needs to perform
bool oneshot; // True: perform the callback only after the first timing cycle
// False: perform the callback every timed period
......@@ -74,9 +77,11 @@ class Timer {
void Stop();
private:
bool InitTimerTask();
uint64_t timer_id_;
TimerOption timer_opt_;
TimerManager* tm_ = nullptr;
uint64_t timer_id_ = 0;
TimingWheel* timing_wheel_ = nullptr;
std::shared_ptr<TimerTask> task_;
std::atomic<bool> started_ = {false};
};
......
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
* Copyright 2019 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -14,32 +14,34 @@
* limitations under the License.
*****************************************************************************/
#include "cyber/timer/timer_task.h"
#ifndef CYBER_TIMER_TIMER_BUCKET_H_
#define CYBER_TIMER_TIMER_BUCKET_H_
#include <list>
#include <memory>
#include <mutex>
#include "cyber/task/task.h"
#include "cyber/timer/timer_task.h"
namespace apollo {
namespace cyber {
void TimerTask::Fire(bool async) {
if (status_ != INIT) {
return;
}
if (oneshot_) // not repeat. so always on ready
status_ = EXPIRED;
if (async) {
cyber::Async(handler_);
} else {
handler_();
class TimerBucket {
public:
void AddTask(const std::shared_ptr<TimerTask>& task) {
std::lock_guard<std::mutex> lock(mutex_);
task_list_.push_back(task);
}
}
bool TimerTask::Cancel() {
if (State() != INIT) {
return false;
}
status_ = CANCELED;
return true;
}
std::mutex& mutex() { return mutex_; }
std::list<std::weak_ptr<TimerTask>>& task_list() { return task_list_; }
private:
std::mutex mutex_;
std::list<std::weak_ptr<TimerTask>> task_list_;
};
} // namespace cyber
} // namespace apollo
#endif // CYBER_TIMER_TIMER_BUCKET_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "cyber/timer/timer_manager.h"
#include "cyber/common/log.h"
#include "cyber/scheduler/scheduler_factory.h"
#include "cyber/time/duration.h"
#include "cyber/time/rate.h"
namespace apollo {
namespace cyber {
TimerManager::TimerManager()
: timing_wheel_(Duration(0.01)),
time_gran_(Duration(0.01)),
running_(false) {} // default time gran = 1ms
TimerManager::~TimerManager() {
if (running_) {
Shutdown();
}
}
void TimerManager::Start() {
std::lock_guard<std::mutex> lock(running_mutex_);
if (!running_) {
ADEBUG << "TimerManager->Start() ok";
running_ = true;
scheduler_thread_ = std::thread([this]() { this->ThreadFuncImpl(); });
scheduler::Instance()->SetInnerThreadAttr("timer", &scheduler_thread_);
}
}
void TimerManager::Shutdown() {
std::lock_guard<std::mutex> lock(running_mutex_);
if (running_) {
running_ = false;
if (scheduler_thread_.joinable()) {
scheduler_thread_.join();
}
}
}
uint64_t TimerManager::Add(uint64_t interval, std::function<void()> handler,
bool oneshot) {
if (!running_) {
Start();
}
uint64_t timer_id = timing_wheel_.StartTimer(interval, handler, oneshot);
return timer_id;
}
void TimerManager::Remove(uint64_t timer_id) {
timing_wheel_.StopTimer(timer_id);
}
bool TimerManager::IsRunning() { return running_; }
void TimerManager::ThreadFuncImpl() {
Rate rate(time_gran_);
while (running_) {
timing_wheel_.Step();
rate.Sleep();
}
}
} // namespace cyber
} // namespace apollo
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_TIMER_TIMER_MANAGER_H_
#define CYBER_TIMER_TIMER_MANAGER_H_
#include <fstream>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include "cyber/common/macros.h"
#include "cyber/time/duration.h"
#include "cyber/timer/timing_wheel.h"
namespace apollo {
namespace cyber {
class TimerManager {
public:
virtual ~TimerManager();
void Start();
void Shutdown();
bool IsRunning();
uint64_t Add(uint64_t interval, std::function<void()> handler, bool oneshot);
void Remove(uint64_t timer_id);
private:
TimingWheel timing_wheel_;
Duration time_gran_;
bool running_ = false;
mutable std::mutex running_mutex_;
std::thread scheduler_thread_;
void ThreadFuncImpl();
DECLARE_SINGLETON(TimerManager)
};
} // namespace cyber
} // namespace apollo
#endif // CYBER_TIMER_TIMER_MANAGER_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "cyber/timer/timer_manager.h"
#include <gtest/gtest.h>
#include <memory>
#include "cyber/common/log.h"
namespace apollo {
namespace cyber {
TEST(TimerManagerTest, Basic) {
auto tm = TimerManager::Instance();
ASSERT_EQ(tm->IsRunning(), false);
tm->Start();
ASSERT_EQ(tm->IsRunning(), true);
tm->Shutdown();
ASSERT_EQ(tm->IsRunning(), false);
}
} // namespace cyber
} // namespace apollo
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
* Copyright 2019 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -17,55 +17,20 @@
#ifndef CYBER_TIMER_TIMER_TASK_H_
#define CYBER_TIMER_TIMER_TASK_H_
#include <algorithm>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "cyber/base/bounded_queue.h"
#include <functional>
namespace apollo {
namespace cyber {
using CallHandler = std::function<void()>;
class TimerTask {
public:
TimerTask(uint64_t id, uint64_t it, uint64_t ivl, CallHandler h, bool ons)
: tid_(id), init_time_(it), interval_(ivl), handler_(h), oneshot_(ons) {}
TimerTask() = default;
private:
enum STATS { INIT = 0, CANCELED, EXPIRED };
STATS State() { return status_; }
volatile STATS status_ = INIT;
uint64_t tid_ = 0;
public:
uint64_t init_time_ = 0;
uint64_t deadline_ = 0;
uint64_t interval_ = 0;
CallHandler handler_;
uint64_t rest_rounds_ = 0;
bool oneshot_ = true;
uint64_t fire_count_ = 0;
public:
uint64_t Id() { return tid_; }
void Fire(bool async);
bool Cancel();
bool IsCanceled() { return State() == CANCELED; }
class TimerBucket;
bool IsExpired() { return State() == EXPIRED; }
struct TimerTask {
explicit TimerTask(uint64_t timer_id) : timer_id_(timer_id) {}
uint64_t timer_id_ = 0;
std::function<void()> callback;
uint64_t interval_ms = 0;
uint64_t remainder_interval_ms = 0;
uint64_t next_fire_duration_ms = 0;
};
} // namespace cyber
......
......@@ -14,69 +14,90 @@
* limitations under the License.
*****************************************************************************/
#include "cyber/timer/timing_wheel.h"
/* note that the frame code of the following is Generated by script */
#include <gtest/gtest.h>
#include <unistd.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include "cyber/common/log.h"
#include "cyber/common/util.h"
#include "cyber/cyber.h"
#include "cyber/init.h"
#include "cyber/timer/timer.h"
namespace apollo {
namespace cyber {
namespace timer {
class TestHandler {
public:
TestHandler() = default;
using cyber::Timer;
using cyber::TimerOption;
void increment() { count_++; }
TEST(TimerTest, one_shot) {
int count = 0;
Timer timer(100, [&count] { count = 100; }, true);
timer.Start();
std::this_thread::sleep_for(std::chrono::milliseconds(90));
EXPECT_EQ(0, count);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(100, count);
timer.Stop();
}
uint64_t count() { return count_; }
TEST(TimerTest, cycle) {
int count = 0;
Timer timers[1000];
TimerOption opt;
opt.oneshot = false;
opt.callback = [=] { AINFO << count; };
for (int i = 0; i < 1000; i++) {
opt.period = i + 1;
timers[i].SetTimerOption(opt);
timers[i].Start();
}
private:
std::atomic<uint64_t> count_ = {0};
};
std::this_thread::sleep_for(std::chrono::seconds(3));
for (int i = 0; i < 1000; i++) {
timers[i].Stop();
}
}
TEST(TimingWheelTest, Oneshot) {
TimingWheel tw;
std::shared_ptr<TestHandler> th(new TestHandler());
ASSERT_EQ(0, th->count());
std::function<void(void)> f = std::bind(&TestHandler::increment, th.get());
tw.Step();
tw.StartTimer(10, f, true);
for (int i = 0; i < 10; i++) {
tw.Step();
usleep(1000);
TEST(TimerTest, start_stop) {
int count = 0;
Timer timer(2, [count] { AINFO << count; }, false);
for (int i = 0; i < 100; i++) {
timer.Start();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
timer.Stop();
}
ASSERT_EQ(1, th->count());
}
TEST(TimingWheelTest, Period) {
TimingWheel tw;
std::shared_ptr<TestHandler> th(new TestHandler());
ASSERT_EQ(0, th->count());
std::function<void(void)> f = std::bind(&TestHandler::increment, th.get());
tw.Step();
tw.StartTimer(10, f, false);
for (uint64_t i = 0; i < 100; i++) {
tw.Step();
if ((i + 1) % 10 == 0) {
usleep(10 * 1000);
ASSERT_TRUE(i <= th->count() && i + 1 >= th->count());
}
TEST(TimerTest, test1) {
auto count = 0;
auto func = [count]() { AINFO << count; };
TimerOption to{1000, func, false};
{
Timer t;
t.SetTimerOption(to);
common::GlobalData::Instance()->EnableSimulationMode();
t.Start();
common::GlobalData::Instance()->DisableSimulationMode();
t.Start();
t.Start();
}
}
} // namespace timer
} // namespace cyber
} // namespace apollo
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
apollo::cyber::Init(argv[0]);
auto res = RUN_ALL_TESTS();
return res;
return RUN_ALL_TESTS();
}
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#include "cyber/timer/timing_slot.h"
#include "cyber/common/log.h"
#include "cyber/timer/timer_task.h"
namespace apollo {
namespace cyber {
void TimingSlot::AddTask(const std::shared_ptr<TimerTask>& task) {
tasks_.emplace(task->Id(), task);
}
void TimingSlot::EnumTaskList(
uint64_t deadline, bool async, BoundedQueue<HandlePackage>* hander_queue,
BoundedQueue<std::shared_ptr<TimerTask>>* rep_queue) {
for (auto it = tasks_.begin(); it != tasks_.end();) {
auto task = it->second; // *it;
auto del_it = it;
it++;
// std::cout << "judge: task->" << task->deadline << " : " << deadline;
if (task->deadline_ <= deadline) {
if (task->rest_rounds_ == 0) {
if (async) {
HandlePackage hp;
hp.handle = task->handler_;
hp.id = task->Id();
if (!hander_queue->Enqueue(hp)) {
AERROR << "hander queue is full";
}
} else {
task->Fire(false);
}
if (!task->oneshot_) { // repeat timer,push back
task->fire_count_++;
rep_queue->Enqueue(task);
}
tasks_.erase(del_it);
} else {
AERROR << "task deadline overflow...";
}
} else { // no expired, -- rounds
task->rest_rounds_--;
}
}
}
} // namespace cyber
} // namespace apollo
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*****************************************************************************/
#ifndef CYBER_TIMER_TIMING_SLOT_H_
#define CYBER_TIMER_TIMING_SLOT_H_
#include <algorithm>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "cyber/base/bounded_queue.h"
namespace apollo {
namespace cyber {
using apollo::cyber::base::BoundedQueue;
using CallHandler = std::function<void()>;
class TimerTask;
struct HandlePackage {
uint64_t id;
CallHandler handle;
};
class TimingSlot {
private:
// no needs for multi-thread
std::unordered_map<uint64_t, std::shared_ptr<TimerTask>> tasks_;
public:
TimingSlot() = default;
void AddTask(const std::shared_ptr<TimerTask>& task);
void RemoveTask(uint64_t id) { tasks_.erase(id); }
void EnumTaskList(uint64_t deadline, bool async,
BoundedQueue<HandlePackage>* hander_queue,
BoundedQueue<std::shared_ptr<TimerTask>>* rep_list);
}; // TimeSlot end
} // namespace cyber
} // namespace apollo
#endif // CYBER_TIMER_TIMING_SLOT_H_
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
* Copyright 2019 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -15,164 +15,109 @@
*****************************************************************************/
#include "cyber/timer/timing_wheel.h"
#include <algorithm>
#include "cyber/base/for_each.h"
#include "cyber/common/log.h"
#include "cyber/task/task.h"
#include "cyber/time/time.h"
#include "cyber/timer/timer_task.h"
namespace apollo {
namespace cyber {
TimingWheel::TimingWheel() {
if (!add_queue_.Init(BOUNDED_QUEUE_SIZE)) {
AERROR << "Add queue init failed.";
throw std::runtime_error("Add queue init failed.");
}
if (!repeat_queue_.Init(BOUNDED_QUEUE_SIZE)) {
AERROR << "Repeated task queue init failed.";
throw std::runtime_error("Repeated queue init failed.");
}
if (!handler_queue_.Init(BOUNDED_QUEUE_SIZE)) {
AERROR << "Handler queue init failed.";
throw std::runtime_error("Handler queue init failed.");
}
}
TimingWheel::TimingWheel(const Duration& tick_duration) {
tick_duration_ = tick_duration.ToNanosecond();
resolution_ = tick_duration_ / 1000000UL;
if (!add_queue_.Init(BOUNDED_QUEUE_SIZE)) {
AERROR << "Add queue init failed.";
throw std::runtime_error("Add queue init failed.");
}
if (!repeat_queue_.Init(BOUNDED_QUEUE_SIZE)) {
AERROR << "Repeated task queue init failed.";
throw std::runtime_error("Repeated queue init failed.");
}
if (!handler_queue_.Init(BOUNDED_QUEUE_SIZE)) {
AERROR << "Handler queue init failed.";
throw std::runtime_error("Handler queue init failed.");
}
}
uint64_t TimingWheel::StartTimer(uint64_t interval, CallHandler handler,
bool oneshot) {
if (id_counter_ > UINT64_MAX) {
AERROR << "Timer ID pool is full.";
return -1;
} else if (interval > TIMER_TASK_MAX_INTERVAL) {
AERROR << "The interval of timer task MUST less than or equal "
<< TIMER_TASK_MAX_INTERVAL << "ms.";
return -1;
} else if (interval < resolution_) {
AERROR << "The interval of timer task MUST larger than or equal "
<< resolution_ << "ms.";
return -1;
}
auto now = Time::Now().ToNanosecond();
auto task = std::make_shared<TimerTask>(
++id_counter_, now, interval,
[handler](void) {
static std::mutex task_mutex;
std::lock_guard<std::mutex> lock(task_mutex);
handler();
},
oneshot);
if (add_queue_.Enqueue(task)) {
ADEBUG << "start timer id: " << id_counter_;
return id_counter_;
} else {
--id_counter_;
AERROR << "add queue is full, Enqueue failed!";
return -1;
void TimingWheel::Start() {
std::lock_guard<std::mutex> lock(running_mutex_);
if (!running_) {
ADEBUG << "TimeWheel start ok";
running_ = true;
tick_thread_ = std::thread([this]() { this->TickFunc(); });
scheduler::Instance()->SetInnerThreadAttr("timer", &tick_thread_);
}
}
void TimingWheel::Step() {
if (start_time_ == 0) {
start_time_ = Time::Now().ToNanosecond();
}
uint64_t deadline = tick_duration_ * (tick_ + 1);
uint64_t idx = tick_ & mask_;
RemoveCancelledTasks(idx);
FillAddSlot();
time_slots_[idx].EnumTaskList(deadline, true, &handler_queue_,
&repeat_queue_);
// timing wheel tick one time
tick_++;
FillRepeatSlot();
while (!handler_queue_.Empty()) {
HandlePackage hp;
if (handler_queue_.Dequeue(&hp)) {
cyber::Async(hp.handle);
void TimingWheel::Shutdown() {
std::lock_guard<std::mutex> lock(running_mutex_);
if (running_) {
running_ = false;
if (tick_thread_.joinable()) {
tick_thread_.join();
}
}
}
void TimingWheel::StopTimer(uint64_t timer_id) {
void TimingWheel::Tick() {
auto& bucket = work_wheel_[current_work_wheel_index_];
{
std::lock_guard<std::mutex> lg(cancelled_mutex_);
cancelled_list_.push_back(timer_id);
std::lock_guard<std::mutex> lock(bucket.mutex());
auto ite = bucket.task_list().begin();
while (ite != bucket.task_list().end()) {
auto task = ite->lock();
if (task) {
ADEBUG << "index: " << current_work_wheel_index_
<< " timer id: " << task->timer_id_;
auto callback = task->callback;
cyber::Async([this, callback] {
if (this->running_) {
callback();
}
});
}
ite = bucket.task_list().erase(ite);
}
}
current_work_wheel_index_ = GetWorkWheelIndex(current_work_wheel_index_ + 1);
if (current_work_wheel_index_ == 0) {
current_assistant_wheel_index_ =
GetAssistantWheelIndex(current_assistant_wheel_index_ + 1);
Cascade(current_assistant_wheel_index_);
}
}
void TimingWheel::RemoveCancelledTasks(uint64_t slot_index) {
if (slot_index >= TIMING_WHEEL_SIZE) {
return;
}
{
std::lock_guard<std::mutex> lg(cancelled_mutex_);
for (auto id : cancelled_list_) {
FOR_EACH(i, 0, TIMING_WHEEL_SIZE) { time_slots_[i].RemoveTask(id); }
void TimingWheel::AddTask(const std::shared_ptr<TimerTask>& task) {
if (!running_) {
Start();
}
auto work_wheel_index =
current_work_wheel_index_ + task->next_fire_duration_ms;
if (work_wheel_index >= WORK_WHEEL_SIZE) {
auto real_work_wheel_index = GetWorkWheelIndex(work_wheel_index);
task->remainder_interval_ms = real_work_wheel_index;
auto assistant_ticks = work_wheel_index / WORK_WHEEL_SIZE;
if (assistant_ticks == 1 &&
real_work_wheel_index != current_work_wheel_index_) {
work_wheel_[real_work_wheel_index].AddTask(task);
ADEBUG << "add task to work wheel. index :" << real_work_wheel_index;
} else {
auto assistant_wheel_index = GetAssistantWheelIndex(
current_assistant_wheel_index_ + assistant_ticks);
assistant_wheel_[assistant_wheel_index].AddTask(task);
ADEBUG << "add task to assistant wheel. index : "
<< assistant_wheel_index;
}
cancelled_list_.clear();
} else {
work_wheel_[work_wheel_index].AddTask(task);
ADEBUG << "add task to work wheel. index :" << work_wheel_index;
}
}
void TimingWheel::FillAddSlot() {
std::shared_ptr<TimerTask> task;
while (!add_queue_.Empty()) {
if (!add_queue_.Dequeue(&task)) {
return;
void TimingWheel::Cascade(const uint64_t assistant_wheel_index) {
auto& bucket = assistant_wheel_[assistant_wheel_index];
std::lock_guard<std::mutex> lock(bucket.mutex());
auto ite = bucket.task_list().begin();
while (ite != bucket.task_list().end()) {
auto task = ite->lock();
if (task) {
work_wheel_[task->remainder_interval_ms].AddTask(task);
}
FillSlot(task);
ite = bucket.task_list().erase(ite);
}
}
void TimingWheel::FillRepeatSlot() {
std::shared_ptr<TimerTask> task;
while (!repeat_queue_.Empty()) {
if (!repeat_queue_.Dequeue(&task)) {
return;
}
FillSlot(task);
void TimingWheel::TickFunc() {
Rate rate(TIMER_RESOLUTION_MS * 1000000); // ms to ns
while (running_) {
Tick();
rate.Sleep();
}
}
void TimingWheel::FillSlot(const std::shared_ptr<TimerTask>& task) {
task->deadline_ = task->init_time_ +
(task->fire_count_ + 1) * (task->interval_) * 1000 * 1000 -
start_time_;
TimingWheel::TimingWheel() {}
// Calculate how many tickes have been run since the time wheel start
uint64_t t = task->deadline_ / tick_duration_; // perTick = 1ms
if (t < tick_) {
task->rest_rounds_ = 0;
} else {
task->rest_rounds_ = (t - tick_) / TIMING_WHEEL_SIZE;
}
uint64_t ticks = std::max(t, tick_); // right now
uint64_t idx = ticks & mask_;
time_slots_[idx].AddTask(task);
ADEBUG << "task id " << task->Id() << " insert to index " << idx;
}
} // namespace cyber
} // namespace apollo
/******************************************************************************
* Copyright 2018 The Apollo Authors. All Rights Reserved.
* Copyright 2019 The Apollo Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -17,74 +17,65 @@
#ifndef CYBER_TIMER_TIMING_WHEEL_H_
#define CYBER_TIMER_TIMING_WHEEL_H_
#include <algorithm>
#include <functional>
#include <iostream>
#include <future>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
#include "cyber/base/bounded_queue.h"
#include "cyber/time/duration.h"
#include "cyber/timer/timing_slot.h"
#include "cyber/common/log.h"
#include "cyber/common/macros.h"
#include "cyber/scheduler/scheduler_factory.h"
#include "cyber/time/rate.h"
#include "cyber/timer/timer_bucket.h"
namespace apollo {
namespace cyber {
using apollo::cyber::base::BoundedQueue;
using CallHandler = std::function<void()>;
struct TimerTask;
static const int TIMING_WHEEL_SIZE = 128;
static const int THREAD_POOL_SIZE = 4;
static const uint64_t BOUNDED_QUEUE_SIZE = 200;
static const int TIMER_TASK_MAX_INTERVAL = 1000;
class TimerTask;
static const uint64_t WORK_WHEEL_SIZE = 512;
static const uint64_t ASSISTANT_WHEEL_SIZE = 64;
static const uint64_t TIMER_RESOLUTION_MS = 1;
static const uint64_t TIMER_MAX_INTERVAL_MS =
WORK_WHEEL_SIZE * ASSISTANT_WHEEL_SIZE;
class TimingWheel {
public:
TimingWheel();
explicit TimingWheel(const Duration& tick_duration);
~TimingWheel() = default;
uint64_t StartTimer(uint64_t interval, CallHandler handler, bool oneshot);
void StopTimer(uint64_t timer_id);
void Step();
~TimingWheel() {
if (running_) {
Shutdown();
}
}
private:
void FillAddSlot();
void FillRepeatSlot();
void FillSlot(const std::shared_ptr<TimerTask>& task);
void RemoveCancelledTasks(uint64_t slot_index);
void Start();
uint64_t id_counter_ = 0;
void Shutdown();
uint64_t tick_ = 0;
void Tick();
uint64_t start_time_ = 0;
void AddTask(const std::shared_ptr<TimerTask>& task);
TimingSlot time_slots_[TIMING_WHEEL_SIZE];
void Cascade(const uint64_t assistant_wheel_index);
uint64_t mask_ = TIMING_WHEEL_SIZE - 1;
void TickFunc();
uint64_t tick_duration_ = 10 * 1000 * 1000; // 10ms
uint64_t resolution_ = 10; // 10ms
// we need implement a lock-free high performance concurrent queue.
// Now, just a blocking-queue just for works.
std::list<uint64_t> cancelled_list_;
std::mutex cancelled_mutex_;
BoundedQueue<std::shared_ptr<TimerTask>> add_queue_;
BoundedQueue<std::shared_ptr<TimerTask>> repeat_queue_;
BoundedQueue<HandlePackage> handler_queue_;
private:
uint64_t GetWorkWheelIndex(const uint64_t index) {
return index & (WORK_WHEEL_SIZE - 1);
}
uint64_t GetAssistantWheelIndex(const uint64_t index) {
return index & (ASSISTANT_WHEEL_SIZE - 1);
}
bool running_ = false;
std::mutex running_mutex_;
TimerBucket work_wheel_[WORK_WHEEL_SIZE];
TimerBucket assistant_wheel_[ASSISTANT_WHEEL_SIZE];
uint64_t current_work_wheel_index_ = 0;
uint64_t current_assistant_wheel_index_ = 0;
std::thread tick_thread_;
DECLARE_SINGLETON(TimingWheel)
};
} // namespace cyber
......
......@@ -30,7 +30,7 @@ cc_library(
"underlay_message",
"underlay_message_type",
"//cyber/service_discovery:role",
"//cyber/timer:timer_manager",
"//cyber/task:task",
"@fastrtps",
],
)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册