提交 58fa0485 编写于 作者: Y youngowlf

Continuation of the previous commit.

上级 6c769d77
......@@ -119,7 +119,7 @@ public:
boost::uint_fast64_t recv_byte_sum; //include msgs in receiving buffer
stat_duration dispatch_dealy_sum; //from parse_msg(exclude msg unpacking) to on_msg_handle
stat_duration recv_idle_sum;
//during this duration, st_socket suspended msg reception because of full receiving buffer, posting msgs or invoke on_msg
//during this duration, st_socket suspended msg reception because of full receiving buffer or posting msgs
#ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
stat_duration handle_time_1_sum; //on_msg consumed time, this indicate the efficiency of msg handling
#endif
......@@ -151,9 +151,8 @@ protected:
// Timer ID allocation for st_socket; IDs continue from the st_timer base class range.
static const unsigned char TIMER_BEGIN = st_timer::TIMER_END;
static const unsigned char TIMER_DISPATCH_MSG = TIMER_BEGIN;
static const unsigned char TIMER_SUSPEND_DISPATCH_MSG = TIMER_BEGIN + 1;
// NOTE(review): the four lines below define TIMER_HANDLE_POST_BUFFER and
// TIMER_RE_DISPATCH_MSG twice with different offsets. This text is a diff
// rendering with the +/- markers stripped: the first pair (+2/+3) appears to be
// the pre-commit version and the second pair (+1/+2) the post-commit version
// (TIMER_SUSPEND_DISPATCH_MSG is being removed by this commit). Keep exactly
// one pair when reconstructing the real header — TODO confirm against the diff.
static const unsigned char TIMER_HANDLE_POST_BUFFER = TIMER_BEGIN + 2;
static const unsigned char TIMER_RE_DISPATCH_MSG = TIMER_BEGIN + 3;
static const unsigned char TIMER_HANDLE_POST_BUFFER = TIMER_BEGIN + 1;
static const unsigned char TIMER_RE_DISPATCH_MSG = TIMER_BEGIN + 2;
// Reserve a fixed-width ID range for this layer regardless of how many IDs are used.
static const unsigned char TIMER_END = TIMER_BEGIN + 10;
// Construct on the given io_service: id defaults to -1 (unset), the next layer
// (underlying socket) shares the same io_service, a default-constructed Packer
// is installed, and the socket starts in the not-started state; reset_state()
// initializes the remaining flags.
st_socket(boost::asio::io_service& io_service_) : st_timer(io_service_), _id(-1), next_layer_(io_service_), packer_(boost::make_shared<Packer>()), started_(false) {reset_state();}
......@@ -241,7 +240,6 @@ public:
// Toggle suspension of received-msg dispatching, then immediately re-run the
// dispatcher so the new state takes effect without waiting for the next recv.
void suspend_dispatch_msg(bool suspend)
{
suspend_dispatch_msg_ = suspend;
// NOTE(review): diff rendering — this stop_timer call references
// TIMER_SUSPEND_DISPATCH_MSG, which this commit appears to remove (see the
// deleted timer-ID constant and timer_handler case elsewhere in the diff);
// this line likely belongs to the pre-commit version. Confirm before merging.
stop_timer(TIMER_SUSPEND_DISPATCH_MSG);
do_dispatch_msg(true);
}
// Query the current suspend state for msg dispatching.
bool suspend_dispatch_msg() const {return suspend_dispatch_msg_;}
......@@ -350,24 +348,27 @@ protected:
if (!temp_msg_buffer.empty())
{
#ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
out_container_type temp_2_msg_buffer;
BOOST_AUTO(begin_time, statistic::local_time());
for (BOOST_AUTO(iter, temp_msg_buffer.begin()); !suspend_dispatch_msg_ && !posting && iter != temp_msg_buffer.end();)
if (on_msg(*iter))
temp_msg_buffer.erase(iter++);
else
temp_2_msg_buffer.splice(temp_2_msg_buffer.end(), temp_msg_buffer, iter++);
BOOST_AUTO(time_duration, statistic::local_time() - begin_time);
stat.handle_time_1_sum += time_duration;
stat.recv_idle_sum += time_duration;
if (!temp_2_msg_buffer.empty())
if (!suspend_dispatch_msg_ && !posting)
{
boost::unique_lock<boost::shared_mutex> lock(recv_msg_buffer_mutex);
if (splice_helper(recv_msg_buffer, temp_2_msg_buffer))
do_dispatch_msg(false);
out_container_type temp_2_msg_buffer;
BOOST_AUTO(begin_time, statistic::local_time());
for (BOOST_AUTO(iter, temp_msg_buffer.begin()); iter != temp_msg_buffer.end();)
if (on_msg(*iter))
temp_msg_buffer.erase(iter++);
else
temp_2_msg_buffer.splice(temp_2_msg_buffer.end(), temp_msg_buffer, iter++);
BOOST_AUTO(time_duration, statistic::local_time() - begin_time);
stat.handle_time_1_sum += time_duration;
stat.recv_idle_sum += time_duration;
if (!temp_2_msg_buffer.empty())
{
boost::unique_lock<boost::shared_mutex> lock(recv_msg_buffer_mutex);
if (splice_helper(recv_msg_buffer, temp_2_msg_buffer))
do_dispatch_msg(false);
}
temp_msg_buffer.splice(temp_msg_buffer.begin(), temp_2_msg_buffer);
}
temp_msg_buffer.splice(temp_msg_buffer.begin(), temp_2_msg_buffer);
#else
boost::unique_lock<boost::shared_mutex> lock(recv_msg_buffer_mutex);
if (splice_helper(recv_msg_buffer, temp_msg_buffer))
......@@ -386,44 +387,44 @@ protected:
// Try to start dispatching one received msg by posting msg_handler to the io_service.
// NOTE(review): this body is a unified-diff rendering with the +/- markers
// stripped — pre- and post-commit lines are interleaved (e.g. `dispatch_all` is
// declared twice, `if (dispatch_all)` appears twice, and the `else if (!posting)`
// near the top is followed by a bare declaration, which is ill-formed C++).
// It is NOT valid code as it stands and must be reconstructed from the original
// commit before any behavioral review.
void do_dispatch_msg(bool need_lock)
{
// Early-out when dispatching is suspended or a msg_handler post is already in flight.
if (suspend_dispatch_msg_ || posting)
return;
// Take the receive-buffer lock only if the caller does not already hold it.
boost::unique_lock<boost::shared_mutex> lock(recv_msg_buffer_mutex, boost::defer_lock);
if (need_lock) lock.lock();
if (suspend_dispatch_msg_)
{
if (!dispatching && !recv_msg_buffer.empty())
set_timer(TIMER_SUSPEND_DISPATCH_MSG, 24 * 60 * 60 * 1000, boost::bind(&st_socket::timer_handler, this, _1)); //one day
}
else if (!posting)
bool dispatch_all = false;
if (stopped())
dispatch_all = !(dispatching = false);
else if (!dispatching)
{
bool dispatch_all = false;
if (stopped())
dispatch_all = !(dispatching = false);
else if (!dispatching)
if (!started())
dispatch_all = true;
else if (!last_dispatch_msg.empty())
{
if (!started())
dispatch_all = true;
else if (!recv_msg_buffer.empty())
{
dispatching = true;
if (last_dispatch_msg.empty())
{
last_dispatch_msg.restart(recv_msg_buffer.front().begin_time);
last_dispatch_msg.swap(recv_msg_buffer.front());
recv_msg_buffer.pop_front();
}
post(boost::bind(&st_socket::msg_handler, this));
}
dispatching = true;
post(boost::bind(&st_socket::msg_handler, this));
}
if (dispatch_all)
else if (!recv_msg_buffer.empty())
{
last_dispatch_msg.restart(recv_msg_buffer.front().begin_time);
last_dispatch_msg.swap(recv_msg_buffer.front());
recv_msg_buffer.pop_front();
dispatching = true;
post(boost::bind(&st_socket::msg_handler, this));
}
}
// dispatch_all is set when stopped(): hand every undispatched msg to
// on_msg_handle (skipped entirely if ST_ASIO_DISCARD_MSG_WHEN_LINK_DOWN is
// defined) and then clear the buffered state.
if (dispatch_all)
{
#ifndef ST_ASIO_DISCARD_MSG_WHEN_LINK_DOWN
st_asio_wrapper::do_something_to_all(recv_msg_buffer, boost::bind(&st_socket::on_msg_handle, this, _1, true));
if (!last_dispatch_msg.empty())
on_msg_handle(last_dispatch_msg, true);
st_asio_wrapper::do_something_to_all(recv_msg_buffer, boost::bind(&st_socket::on_msg_handle, this, _1, true));
#endif
recv_msg_buffer.clear();
}
last_dispatch_msg.clear();
recv_msg_buffer.clear();
}
}
......@@ -466,9 +467,6 @@ private:
stat.recv_idle_sum += statistic::local_time() - recv_idle_begin_time;
dispatch_msg();
break;
case TIMER_SUSPEND_DISPATCH_MSG: //suspend dispatching msgs
do_dispatch_msg(true);
break;
case TIMER_HANDLE_POST_BUFFER:
{
boost::unique_lock<boost::shared_mutex> lock(post_msg_buffer_mutex);
......
......@@ -119,7 +119,7 @@ public:
uint_fast64_t recv_byte_sum; //include msgs in receiving buffer
stat_duration dispatch_dealy_sum; //from parse_msg(exclude msg unpacking) to on_msg_handle
stat_duration recv_idle_sum;
//during this duration, st_socket suspended msg reception because of full receiving buffer, posting msgs or invoke on_msg
//during this duration, st_socket suspended msg reception because of full receiving buffer or posting msgs
#ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
stat_duration handle_time_1_sum; //on_msg consumed time, this indicate the efficiency of msg handling
#endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册