提交 0c13e204 编写于 作者: fengqikai1414's avatar fengqikai1414 提交者: fengqikai1414

Cyber: update timer

上级 2b4ce050
......@@ -15,9 +15,6 @@
*****************************************************************************/
#include "cyber/examples/common_component_example/common_component_example.h"
#include "cyber/class_loader/class_loader.h"
#include "cyber/component/component.h"
bool CommonComponentSample::Init() {
AINFO << "Commontest component init";
return true;
......
......@@ -15,7 +15,6 @@
*****************************************************************************/
#include <memory>
#include "cyber/class_loader/class_loader.h"
#include "cyber/component/component.h"
#include "cyber/examples/proto/examples.pb.h"
......
......@@ -65,7 +65,7 @@ bool Timer::InitTimerTask() {
task_->callback = [callback = this->timer_opt_.callback, task_weak_ptr]() {
auto task = task_weak_ptr.lock();
if (task) {
std::lock_guard<std::mutex> lg(task->mtx_);
std::lock_guard<std::mutex> lg(task->mutex);
callback();
}
};
......@@ -76,47 +76,47 @@ bool Timer::InitTimerTask() {
if (!task) {
return;
}
std::lock_guard<std::mutex> lg(task->mtx_);
std::lock_guard<std::mutex> lg(task->mutex);
auto start = Time::MonoTime().ToNanosecond();
callback();
auto end = Time::MonoTime().ToNanosecond();
uint64_t execute_time_ns = end - start;
uint64_t execute_time_ms =
#if defined(__aarch64__)
::llround(static_cast<double>(execute_time_ns) / 1000000);
::llround(static_cast<double>(execute_time_ns) / 1e6);
#else
std::llround(static_cast<double>(execute_time_ns) / 1000000);
std::llround(static_cast<double>(execute_time_ns) / 1e6);
#endif
if (task->last_execute_time_ns_ == 0) {
task->last_execute_time_ns_ = start;
if (task->last_execute_time_ns == 0) {
task->last_execute_time_ns = start;
} else {
task->accumulated_error_ns_ +=
start - task->last_execute_time_ns_ - task->interval_ms * 1000000;
task->accumulated_error_ns +=
start - task->last_execute_time_ns - task->interval_ms * 1000000;
}
ADEBUG << "start: " << start << "\t last: " << task->last_execute_time_ns_
ADEBUG << "start: " << start << "\t last: " << task->last_execute_time_ns
<< "\t execut time:" << execute_time_ms
<< "\t accumulated_error_ns: " << task->accumulated_error_ns_;
task->last_execute_time_ns_ = start;
<< "\t accumulated_error_ns: " << task->accumulated_error_ns;
task->last_execute_time_ns = start;
if (execute_time_ms >= task->interval_ms) {
task->next_fire_duration_ms = 1;
task->next_fire_duration_ms = TIMER_RESOLUTION_MS;
} else {
#if defined(__aarch64__)
int64_t accumulated_error_ms = ::llround(
#else
int64_t accumulated_error_ms = std::llround(
#endif
static_cast<double>(task->accumulated_error_ns_) / 1000000);
if (static_cast<int64_t>(task->interval_ms - execute_time_ms - 1) >=
accumulated_error_ms) {
static_cast<double>(task->accumulated_error_ns) / 1e6);
if (static_cast<int64_t>(task->interval_ms - execute_time_ms -
TIMER_RESOLUTION_MS) >= accumulated_error_ms) {
task->next_fire_duration_ms =
task->interval_ms - execute_time_ms - accumulated_error_ms;
} else {
task->next_fire_duration_ms = 1;
task->next_fire_duration_ms = TIMER_RESOLUTION_MS;
}
ADEBUG << "error ms: " << accumulated_error_ms
<< " execute time: " << execute_time_ms
<< " next fire: " << task->next_fire_duration_ms
<< " error ns: " << task->accumulated_error_ns_;
<< " error ns: " << task->accumulated_error_ns;
}
TimingWheel::Instance()->AddTask(task);
};
......@@ -140,8 +140,12 @@ void Timer::Start() {
void Timer::Stop() {
if (started_.exchange(false) && task_) {
AINFO << "stop timer, the timer_id: " << timer_id_;
std::lock_guard<std::mutex> lg(task_->mtx_);
task_.reset();
// using a shared pointer to hold task_->mutex before task_ reset
auto tmp_task = task_;
{
std::lock_guard<std::mutex> lg(tmp_task->mutex);
task_.reset();
}
}
}
......
......@@ -32,9 +32,9 @@ struct TimerTask {
uint64_t interval_ms = 0;
uint64_t remainder_interval_ms = 0;
uint64_t next_fire_duration_ms = 0;
int64_t accumulated_error_ns_ = 0;
uint64_t last_execute_time_ns_ = 0;
std::mutex mtx_;
int64_t accumulated_error_ns = 0;
uint64_t last_execute_time_ns = 0;
std::mutex mutex;
};
} // namespace cyber
......
......@@ -72,8 +72,8 @@ void TimingWheel::AddTask(const std::shared_ptr<TimerTask>& task,
Start();
}
auto work_wheel_index =
current_work_wheel_index + task->next_fire_duration_ms;
auto work_wheel_index = current_work_wheel_index +
task->next_fire_duration_ms / TIMER_RESOLUTION_MS;
if (work_wheel_index >= WORK_WHEEL_SIZE) {
auto real_work_wheel_index = GetWorkWheelIndex(work_wheel_index);
task->remainder_interval_ms = real_work_wheel_index;
......@@ -83,9 +83,13 @@ void TimingWheel::AddTask(const std::shared_ptr<TimerTask>& task,
work_wheel_[real_work_wheel_index].AddTask(task);
ADEBUG << "add task to work wheel. index :" << real_work_wheel_index;
} else {
auto assistant_wheel_index = GetAssistantWheelIndex(
current_assistant_wheel_index_ + assistant_ticks);
assistant_wheel_[assistant_wheel_index].AddTask(task);
auto assistant_wheel_index = 0;
{
std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
assistant_wheel_index = GetAssistantWheelIndex(
current_assistant_wheel_index_ + assistant_ticks);
assistant_wheel_[assistant_wheel_index].AddTask(task);
}
ADEBUG << "add task to assistant wheel. index : "
<< assistant_wheel_index;
}
......@@ -116,11 +120,17 @@ void TimingWheel::TickFunc() {
// AINFO_EVERY(1000) << "Tick " << TickCount();
tick_count_++;
rate.Sleep();
current_work_wheel_index_ =
GetWorkWheelIndex(current_work_wheel_index_ + 1);
{
std::lock_guard<std::mutex> lock(current_work_wheel_index_mutex_);
current_work_wheel_index_ =
GetWorkWheelIndex(current_work_wheel_index_ + 1);
}
if (current_work_wheel_index_ == 0) {
current_assistant_wheel_index_ =
GetAssistantWheelIndex(current_assistant_wheel_index_ + 1);
{
std::lock_guard<std::mutex> lock(current_assistant_wheel_index_mutex_);
current_assistant_wheel_index_ =
GetAssistantWheelIndex(current_assistant_wheel_index_ + 1);
}
Cascade(current_assistant_wheel_index_);
}
}
......
......@@ -35,9 +35,9 @@ struct TimerTask;
static const uint64_t WORK_WHEEL_SIZE = 512;
static const uint64_t ASSISTANT_WHEEL_SIZE = 64;
static const uint64_t TIMER_RESOLUTION_MS = 1;
static const uint64_t TIMER_RESOLUTION_MS = 2;
static const uint64_t TIMER_MAX_INTERVAL_MS =
WORK_WHEEL_SIZE * ASSISTANT_WHEEL_SIZE;
WORK_WHEEL_SIZE * ASSISTANT_WHEEL_SIZE * TIMER_RESOLUTION_MS;
class TimingWheel {
public:
......@@ -78,7 +78,9 @@ class TimingWheel {
TimerBucket work_wheel_[WORK_WHEEL_SIZE];
TimerBucket assistant_wheel_[ASSISTANT_WHEEL_SIZE];
uint64_t current_work_wheel_index_ = 0;
std::mutex current_work_wheel_index_mutex_;
uint64_t current_assistant_wheel_index_ = 0;
std::mutex current_assistant_wheel_index_mutex_;
std::thread tick_thread_;
DECLARE_SINGLETON(TimingWheel)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册