提交 6c769d77 编写于 作者: Y youngwolf 提交者: youngowlf

Fix bug: the last msg will not be re-dispatched if on_msg_handle returned...

Fix bug: the last msg will not be re-dispatched if on_msg_handle returned false until new msgs come in.
Drop timer TIMER_SUSPEND_DISPATCH_MSG, it's useless.
上级 f645a52a
......@@ -151,9 +151,8 @@ protected:
static const unsigned char TIMER_BEGIN = st_timer::TIMER_END;
static const unsigned char TIMER_DISPATCH_MSG = TIMER_BEGIN;
static const unsigned char TIMER_SUSPEND_DISPATCH_MSG = TIMER_BEGIN + 1;
static const unsigned char TIMER_HANDLE_POST_BUFFER = TIMER_BEGIN + 2;
static const unsigned char TIMER_RE_DISPATCH_MSG = TIMER_BEGIN + 3;
static const unsigned char TIMER_HANDLE_POST_BUFFER = TIMER_BEGIN + 1;
static const unsigned char TIMER_RE_DISPATCH_MSG = TIMER_BEGIN + 2;
static const unsigned char TIMER_END = TIMER_BEGIN + 10;
st_socket(boost::asio::io_service& io_service_) : st_timer(io_service_), _id(-1), next_layer_(io_service_), packer_(boost::make_shared<Packer>()), started_(false) {reset_state();}
......@@ -241,7 +240,6 @@ public:
void suspend_dispatch_msg(bool suspend)
{
suspend_dispatch_msg_ = suspend;
stop_timer(TIMER_SUSPEND_DISPATCH_MSG);
do_dispatch_msg(true);
}
bool suspend_dispatch_msg() const {return suspend_dispatch_msg_;}
......@@ -348,24 +346,27 @@ protected:
if (!temp_msg_buffer.empty())
{
#ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
out_container_type temp_2_msg_buffer;
auto begin_time = statistic::local_time();
for (auto iter = std::begin(temp_msg_buffer); !suspend_dispatch_msg_ && !posting && iter != std::end(temp_msg_buffer);)
if (on_msg(*iter))
temp_msg_buffer.erase(iter++);
else
temp_2_msg_buffer.splice(std::end(temp_2_msg_buffer), temp_msg_buffer, iter++);
auto time_duration = statistic::local_time() - begin_time;
stat.handle_time_1_sum += time_duration;
stat.recv_idle_sum += time_duration;
if (!temp_2_msg_buffer.empty())
if (!suspend_dispatch_msg_ && !posting)
{
boost::unique_lock<boost::shared_mutex> lock(recv_msg_buffer_mutex);
if (splice_helper(recv_msg_buffer, temp_2_msg_buffer))
do_dispatch_msg(false);
out_container_type temp_2_msg_buffer;
auto begin_time = statistic::local_time();
for (auto iter = std::begin(temp_msg_buffer); iter != std::end(temp_msg_buffer);)
if (on_msg(*iter))
temp_msg_buffer.erase(iter++);
else
temp_2_msg_buffer.splice(std::end(temp_2_msg_buffer), temp_msg_buffer, iter++);
auto time_duration = statistic::local_time() - begin_time;
stat.handle_time_1_sum += time_duration;
stat.recv_idle_sum += time_duration;
if (!temp_2_msg_buffer.empty())
{
boost::unique_lock<boost::shared_mutex> lock(recv_msg_buffer_mutex);
if (splice_helper(recv_msg_buffer, temp_2_msg_buffer))
do_dispatch_msg(false);
}
temp_msg_buffer.splice(std::begin(temp_msg_buffer), temp_2_msg_buffer);
}
temp_msg_buffer.splice(std::begin(temp_msg_buffer), temp_2_msg_buffer);
#else
boost::unique_lock<boost::shared_mutex> lock(recv_msg_buffer_mutex);
if (splice_helper(recv_msg_buffer, temp_msg_buffer))
......@@ -384,44 +385,44 @@ protected:
void do_dispatch_msg(bool need_lock)
{
if (suspend_dispatch_msg_ || posting)
return;
boost::unique_lock<boost::shared_mutex> lock(recv_msg_buffer_mutex, boost::defer_lock);
if (need_lock) lock.lock();
if (suspend_dispatch_msg_)
{
if (!dispatching && !recv_msg_buffer.empty())
set_timer(TIMER_SUSPEND_DISPATCH_MSG, 24 * 60 * 60 * 1000, [this](unsigned char id)->bool {return ST_THIS timer_handler(id);}); //one day
}
else if (!posting)
auto dispatch_all = false;
if (stopped())
dispatch_all = !(dispatching = false);
else if (!dispatching)
{
auto dispatch_all = false;
if (stopped())
dispatch_all = !(dispatching = false);
else if (!dispatching)
if (!started())
dispatch_all = true;
else if (!last_dispatch_msg.empty())
{
if (!started())
dispatch_all = true;
else if (!recv_msg_buffer.empty())
{
dispatching = true;
if (last_dispatch_msg.empty())
{
last_dispatch_msg.restart(recv_msg_buffer.front().begin_time);
last_dispatch_msg.swap(recv_msg_buffer.front());
recv_msg_buffer.pop_front();
}
post([this]() {ST_THIS msg_handler();});
}
dispatching = true;
post([this]() {ST_THIS msg_handler();});
}
if (dispatch_all)
else if (!recv_msg_buffer.empty())
{
last_dispatch_msg.restart(recv_msg_buffer.front().begin_time);
last_dispatch_msg.swap(recv_msg_buffer.front());
recv_msg_buffer.pop_front();
dispatching = true;
post([this]() {ST_THIS msg_handler();});
}
}
if (dispatch_all)
{
#ifndef ST_ASIO_DISCARD_MSG_WHEN_LINK_DOWN
st_asio_wrapper::do_something_to_all(recv_msg_buffer, [this](out_msg& msg) {ST_THIS on_msg_handle(msg, true);});
if (!last_dispatch_msg.empty())
on_msg_handle(last_dispatch_msg, true);
st_asio_wrapper::do_something_to_all(recv_msg_buffer, [this](out_msg& msg) {ST_THIS on_msg_handle(msg, true);});
#endif
recv_msg_buffer.clear();
}
last_dispatch_msg.clear();
recv_msg_buffer.clear();
}
}
......@@ -464,9 +465,6 @@ private:
stat.recv_idle_sum += statistic::local_time() - recv_idle_begin_time;
dispatch_msg();
break;
case TIMER_SUSPEND_DISPATCH_MSG: //suspend dispatching msgs
do_dispatch_msg(true);
break;
case TIMER_HANDLE_POST_BUFFER:
{
boost::unique_lock<boost::shared_mutex> lock(post_msg_buffer_mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册