提交 3714ce6b 编写于 作者: Y youngwolf

Support sync message dispatching.

Fix statistics for batch message dispatching.
Hide all member variables for developers.
上级 631d677c
......@@ -113,11 +113,11 @@ public:
protected:
//msg handling
#ifdef ASCS_DISPATCH_BATCH_MSG
virtual size_t on_msg_handle(out_queue_type& can)
virtual size_t on_msg_handle(out_queue_type& msg_can)
{
//to consume part of messages in can, see echo_server.
//to consume a part of the messages in msg_can, see echo_server.
out_container_type tmp_can;
can.swap(tmp_can);
msg_can.swap(tmp_can); //must be thread safe
ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
return tmp_can.size();
......
......@@ -102,7 +102,7 @@ protected:
//msg handling: send the original msg back(echo server)
#ifdef ASCS_DISPATCH_BATCH_MSG
virtual size_t on_msg_handle(out_queue_type& can)
virtual size_t on_msg_handle(out_queue_type& msg_can)
{
if (!is_send_buffer_available())
return 0;
......@@ -111,13 +111,13 @@ protected:
//this manner requires the container used by the message queue can be spliced (such as std::list, but not std::vector,
// ascs doesn't require this characteristic).
//these code can be compiled because we used list as the container of the message queue, see macro ASCS_OUTPUT_CONTAINER for more details
//to consume all of messages in can, see echo_client.
can.lock();
auto begin_iter = std::begin(can);
//to consume all messages in msg_can, see echo_client
msg_can.lock();
auto begin_iter = std::begin(msg_can);
//don't be too greedy, here is in a service thread, we should not block this thread for a long time
auto end_iter = can.size() > 10 ? std::next(begin_iter, 10) : std::end(can);
tmp_can.splice(std::end(tmp_can), can, begin_iter, end_iter);
can.unlock();
auto end_iter = msg_can.size() > 10 ? std::next(begin_iter, 10) : std::end(msg_can);
tmp_can.splice(std::end(tmp_can), msg_can, begin_iter, end_iter);
msg_can.unlock();
ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->send_msg(msg, true);});
return tmp_can.size();
......
......@@ -5,6 +5,7 @@
#define ASCS_SERVER_PORT 9527
#define ASCS_REUSE_OBJECT //use objects pool
#define ASCS_DELAY_CLOSE 5 //define this to avoid hooks for async call (and slightly improve efficiency)
#define ASCS_SYNC_DISPATCH
//#define ASCS_WANT_MSG_SEND_NOTIFY
#define ASCS_MSG_BUFFER_SIZE 65536
#define ASCS_INPUT_QUEUE non_lock_queue //we will never operate sending buffer concurrently, so need no locks
......@@ -51,7 +52,16 @@ protected:
virtual void on_connect() {asio::ip::tcp::no_delay option(true); lowest_layer().set_option(option); client_socket::on_connect();}
//msg handling
virtual bool on_msg_handle(out_msg_type& msg) {handle_msg(msg); return true;}
virtual size_t on_msg(std::list<out_msg_type>& msg_can) //must define macro ASCS_SYNC_DISPATCH
{
//consume all messages, to consume a part of the messages, see on_msg_handle() in demo echo_server
ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
auto re = msg_can.size();
msg_can.clear();
return re;
}
//msg handling end
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
virtual void on_msg_send(in_msg_type& msg)
......
......@@ -5,6 +5,7 @@
#define ASCS_SERVER_PORT 9527
#define ASCS_REUSE_OBJECT //use objects pool
#define ASCS_DELAY_CLOSE 5 //define this to avoid hooks for async call (and slightly improve efficiency)
#define ASCS_SYNC_DISPATCH
#define ASCS_MSG_BUFFER_SIZE 65536
#define ASCS_INPUT_QUEUE non_lock_queue
#define ASCS_INPUT_CONTAINER list
......@@ -37,7 +38,15 @@ public:
protected:
//msg handling: send the original msg back(echo server)
virtual bool on_msg_handle(out_msg_type& msg) {return direct_send_msg(std::move(msg));}
virtual size_t on_msg(std::list<out_msg_type>& msg_can) //must define macro ASCS_SYNC_DISPATCH
{
//consume all messages, to consume a part of the messages, see on_msg_handle() in demo echo_server
ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->direct_send_msg(std::move(msg));});
auto re = msg_can.size();
msg_can.clear();
return re;
}
//msg handling end
};
......
......@@ -342,7 +342,7 @@ struct statistic
uint_fast64_t recv_byte_sum; //msgs (in bytes) returned by i_unpacker::parse_msg
stat_duration dispatch_dealy_sum; //from parse_msg(exclude msg unpacking) to on_msg_handle
stat_duration recv_idle_sum; //during this duration, socket suspended msg reception (receiving buffer overflow)
stat_duration handle_time_sum; //on_msg_handle consumed time, this indicate the efficiency of msg handling
stat_duration handle_time_sum; //on_msg_handle (and on_msg) consumed time, this indicate the efficiency of msg handling
stat_duration unpack_time_sum; //udp::socket_base will not gather this item
time_t last_send_time; //include heartbeat
......
......@@ -382,7 +382,7 @@
* 2018.8.21 version 1.3.2
*
* SPECIAL ATTENTION (incompatible with old editions):
* If macro ASCS_PASSIVE_RECV been defined, you may receive empty messages in on_msg_handle() and sync_recv_msg(), this makes you always having
* If macro ASCS_PASSIVE_RECV been defined, you may receive empty messages in on_msg() or on_msg_handle() and sync_recv_msg(), this makes you always having
* the chance to call recv_msg().
* i_unpacker has been moved from namespace ascs::tcp and ascs::udp to namespace ascs, and the signature of ascs::udp::i_unpacker::parse_msg
* has been changed to obey ascs::tcp::i_unpacker::parse_msg.
......@@ -408,6 +408,27 @@
*
* REPLACEMENTS:
*
* ===============================================================
* 2018.9.x version 1.3.3
*
* SPECIAL ATTENTION (incompatible with old editions):
*
* HIGHLIGHT:
* Support sync message dispatching, it's like previous on_msg() callback but with a message container instead of a message, and we also name it on_msg(),
* you need to defne macro ASCS_SYNC_DISPATCH to open this feature.
*
* FIX:
* Fix statistics for batch message dispatching.
*
* ENHANCEMENTS:
*
* DELETION:
*
* REFACTORING:
* Hide all member variables for developers.
*
* REPLACEMENTS:
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -417,8 +438,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10302 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.3.2"
#define ASCS_VER 10303 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.3.3"
//asio and compiler check
#ifdef _MSC_VER
......@@ -554,7 +575,7 @@ static_assert(ASCS_MAX_OBJECT_NUM > 0, "object capacity must be bigger than zero
#endif
//IO thread number
//listening, msg sending and receiving, msg handling (on_msg_handle()), all timers (include user timers) and other asynchronous calls (from executor)
//listening, msg sending and receiving, msg handling (on_msg() and on_msg_handle()), all timers (include user timers) and other asynchronous calls (from executor)
//keep big enough, no empirical value I can suggest, you must try to find it out in your own environment
#ifndef ASCS_SERVICE_THREAD_NUM
#define ASCS_SERVICE_THREAD_NUM 8
......@@ -690,7 +711,7 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
//#define ASCS_DISPATCH_BATCH_MSG
//all messages will be dispatched via on_handle_msg with a variable-length container, this will change the signature of function on_msg_handle,
//it's very useful if you want to re-dispatch message in your own logic or with very simple message handling (such as echo server).
//it's your responsibility to remove handled messages from the container (can be part of them).
//it's your responsibility to remove handled messages from the container (can be a part of them).
//#define ASCS_ALIGNED_TIMER
//for example, start a timer at xx:xx:xx, interval is 10 seconds, the callback will be called at (xx:xx:xx + 10), and suppose that the callback
......@@ -711,9 +732,20 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
// we must avoid to do sync message sending and receiving in service threads.
// if prior sync_recv_msg() not returned, the second sync_recv_msg() will return false immediately.
// with macro ASCS_PASSIVE_RECV, in sync_recv_msg(), recv_msg() will be automatically called.
// after returned from sync_recv_msg(), ascs will not maintain those messages that have been output.
//Sync message sending and receiving are not tracked by tracked_executor, please note.
//No matter you're doing sync message sending or async message sending, you can do sync message receiving or async message receiving concurrently.
//#define ASCS_SYNC_DISPATCH
//with this macro, virtual size_t on_msg(std::list<OutMsgType>& msg_can) will be provided, you can rewrite it and handle all or a part of the
// messages like virtual function on_msg_handle (with macro ASCS_DISPATCH_BATCH_MSG), if your logic is simple enough (like echo or pingpong test),
// this feature is recommended because it can slightly improve efficiency.
//now we have three ways to handle messages (sync_recv_msg, on_msg and on_msg_handle), the reponse order is the same as listed, if messages been successfully
// dispatched to sync_recv_msg, then the second two will do nothing, otherwise messages will be dispatched to on_msg, if on_msg only handled a part of (include
// zero) the messages, then on_msg_handle will continue to dispatch the rest of them.
//as before, on_msg will block the next receiving but only on current socket.
//configurations
#endif /* _ASCS_CONFIG_H_ */
......@@ -300,7 +300,7 @@ public:
template<typename _Predicate> void do_something_to_one(const _Predicate& __pred)
{std::lock_guard<std::mutex> lock(object_can_mutex); for (auto iter = std::begin(object_can); iter != std::end(object_can); ++iter) if (__pred(iter->second)) break;}
protected:
private:
std::atomic_uint_fast64_t cur_id;
container_type object_can;
......
......@@ -281,7 +281,7 @@ private:
service_can.emplace_back(i_service_);
}
protected:
private:
bool started;
container_type service_can;
std::mutex service_can_mutex;
......
......@@ -269,18 +269,32 @@ protected:
virtual void on_close() {unified_out::info_out("on_close()");}
virtual void after_close() {} //a good case for using this is to reconnect to the server, please refer to client_socket_base.
//return true (or > 0) means msg been handled, false (or 0) means msg cannot be handled right now, and socket will re-dispatch it asynchronously
#ifdef ASCS_SYNC_DISPATCH
//return the number of handled msg, if some msg left behind, socket will re-dispatch them asynchronously
//notice: using inconstant is for the convenience of swapping
virtual size_t on_msg(std::list<OutMsgType>& msg_can)
{
//it's always thread safe in this virtual function, because it blocks message receiving
ascs::do_something_to_all(msg_can, [](OutMsgType& msg) {unified_out::debug_out("recv(" ASCS_SF "): %s", msg.size(), msg.data());});
auto re = msg_can.size();
msg_can.clear(); //have handled all messages
return re;
}
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
virtual size_t on_msg_handle(out_queue_type& can)
//return the number of handled msg, if some msg left behind, socket will re-dispatch them asynchronously
//notice: using inconstant is for the convenience of swapping
virtual size_t on_msg_handle(out_queue_type& msg_can)
{
out_container_type tmp_can;
can.swap(tmp_can);
msg_can.swap(tmp_can); //must be thread safe
ascs::do_something_to_all(tmp_can, [](OutMsgType& msg) {unified_out::debug_out("recv(" ASCS_SF "): %s", msg.size(), msg.data());});
return tmp_can.size();
}
#else
//return true means msg been handled, false means msg cannot be handled right now, and socket will re-dispatch it asynchronously
virtual bool on_msg_handle(OutMsgType& msg) {unified_out::debug_out("recv(" ASCS_SF "): %s", msg.size(), msg.data()); return true;}
#endif
......@@ -350,6 +364,17 @@ protected:
#endif
auto msg_num = temp_msg_can.size();
stat.recv_msg_sum += msg_num;
#ifdef ASCS_SYNC_DISPATCH
#ifdef ASCS_PASSIVE_RECV
on_msg(temp_msg_can);
if (temp_msg_can.empty())
return handled_msg();
#else
if (msg_num > 0)
on_msg(temp_msg_can);
#endif
#endif
#ifdef ASCS_PASSIVE_RECV
if (0 == msg_num)
{
......@@ -468,14 +493,22 @@ private:
if ((dispatching = !recv_msg_buffer.empty()))
{
auto begin_time = statistic::now();
stat.dispatch_dealy_sum += begin_time - recv_msg_buffer.front().begin_time;
#ifdef ASCS_FULL_STATISTIC
recv_msg_buffer.lock();
ascs::do_something_to_all(recv_msg_buffer, [&, this](out_msg& msg) {this->stat.dispatch_dealy_sum += begin_time - msg.begin_time;});
recv_msg_buffer.unlock();
#endif
auto re = on_msg_handle(recv_msg_buffer);
auto end_time = statistic::now();
stat.handle_time_sum += end_time - begin_time;
if (0 == re) //dispatch failed, re-dispatch
{
recv_msg_buffer.front().restart(end_time);
#ifdef ASCS_FULL_STATISTIC
recv_msg_buffer.lock();
ascs::do_something_to_all(recv_msg_buffer, [&end_time](out_msg& msg) {msg.restart(end_time);});
recv_msg_buffer.unlock();
#endif
set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_, [this](tid id)->bool {return this->timer_handler(TIMER_DISPATCH_MSG);}); //hold dispatching
}
else
......
......@@ -60,7 +60,7 @@ public:
void disconnect(bool reconnect = false) {force_shutdown(reconnect);}
void force_shutdown(bool reconnect = false)
{
if (super::link_status::FORCE_SHUTTING_DOWN != this->status)
if (super::link_status::FORCE_SHUTTING_DOWN != status)
this->show_info("client link:", "been shut down.");
need_reconnect = reconnect;
......@@ -108,7 +108,7 @@ protected:
this->show_info("client link:", "broken/been shut down", ec);
force_shutdown(this->is_shutting_down() ? need_reconnect : prepare_reconnect(ec) >= 0);
this->status = super::link_status::BROKEN;
status = super::link_status::BROKEN;
}
virtual void on_async_shutdown_error() {force_shutdown(need_reconnect);}
......@@ -145,7 +145,9 @@ protected:
return false;
}
protected:
private:
using super::status;
asio::ip::tcp::endpoint server_addr;
bool need_reconnect;
};
......
......@@ -180,7 +180,7 @@ protected:
start_next_accept();
}
protected:
private:
asio::ip::tcp::endpoint server_addr;
asio::ip::tcp::acceptor acceptor;
};
......
......@@ -40,7 +40,7 @@ public:
void disconnect() {force_shutdown();}
void force_shutdown()
{
if (super::link_status::FORCE_SHUTTING_DOWN != this->status)
if (super::link_status::FORCE_SHUTTING_DOWN != status)
this->show_info("server link:", "been shut down.");
super::force_shutdown();
......@@ -73,7 +73,7 @@ protected:
#ifdef ASCS_CLEAR_OBJECT_INTERVAL
force_shutdown();
#else
this->status = super::link_status::BROKEN;
status = super::link_status::BROKEN;
server.del_socket(this->shared_from_this());
#endif
}
......@@ -82,6 +82,8 @@ protected:
virtual bool on_heartbeat_error() {this->show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;}
private:
using super::status;
Server& server;
};
......
......@@ -51,7 +51,7 @@ public:
virtual void send_heartbeat()
{
auto_duration dur(this->stat.pack_time_sum);
auto msg = this->packer_->pack_heartbeat();
auto msg = packer_->pack_heartbeat();
dur.end();
this->do_direct_send_msg(std::move(msg));
}
......@@ -117,7 +117,7 @@ public:
#ifdef ASCS_PASSIVE_RECV
//changing unpacker must before calling ascs::socket::recv_msg, and define ASCS_PASSIVE_RECV macro.
void unpacker(const std::shared_ptr<i_unpacker<out_msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
virtual void recv_msg() {if (!this->reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
virtual void recv_msg() {if (!reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
///////////////////////////////////////////////////
......@@ -173,9 +173,9 @@ protected:
virtual bool do_start()
{
status = link_status::CONNECTED;
this->stat.establish_time = time(nullptr);
stat.establish_time = time(nullptr);
on_connect(); //in this virtual function, this->stat.last_recv_time has not been updated (super::do_start will update it), please note
on_connect(); //in this virtual function, stat.last_recv_time has not been updated (super::do_start will update it), please note
return super::do_start();
}
......@@ -200,14 +200,14 @@ private:
size_t completion_checker(const asio::error_code& ec, size_t bytes_transferred)
{
auto_duration dur(this->stat.unpack_time_sum);
auto_duration dur(stat.unpack_time_sum);
return this->unpacker_->completion_condition(ec, bytes_transferred);
}
void do_recv_msg()
{
#ifdef ASCS_PASSIVE_RECV
if (this->reading)
if (reading)
return;
#endif
auto recv_buff = unpacker_->prepare_next_recv();
......@@ -217,7 +217,7 @@ private:
else
{
#ifdef ASCS_PASSIVE_RECV
this->reading = true;
reading = true;
#endif
asio::async_read(this->next_layer(), recv_buff,
[this](const asio::error_code& ec, size_t bytes_transferred)->size_t {return this->completion_checker(ec, bytes_transferred);}, make_strand_handler(strand,
......@@ -229,17 +229,17 @@ private:
{
if (!ec && bytes_transferred > 0)
{
this->stat.last_recv_time = time(nullptr);
stat.last_recv_time = time(nullptr);
auto_duration dur(this->stat.unpack_time_sum);
auto unpack_ok = unpacker_->parse_msg(bytes_transferred, this->temp_msg_can);
auto_duration dur(stat.unpack_time_sum);
auto unpack_ok = unpacker_->parse_msg(bytes_transferred, temp_msg_can);
dur.end();
if (!unpack_ok)
on_unpack_error(); //the user will decide whether to reset the unpacker or not in this callback
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
if (this->handle_msg()) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
do_recv_msg(); //receive msg in sequence
......@@ -247,7 +247,7 @@ private:
else
{
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
if (ec)
this->on_recv_error(ec);
......@@ -258,7 +258,7 @@ private:
bool do_send_msg(bool in_strand)
{
if (!in_strand && this->sending)
if (!in_strand && sending)
return true;
std::list<asio::const_buffer> bufs;
......@@ -272,10 +272,10 @@ private:
typename super::in_msg msg;
auto end_time = statistic::now();
typename super::in_queue_type::lock_guard lock(this->send_msg_buffer);
while (this->send_msg_buffer.try_dequeue_(msg))
typename super::in_queue_type::lock_guard lock(send_msg_buffer);
while (send_msg_buffer.try_dequeue_(msg))
{
this->stat.send_delay_sum += end_time - msg.begin_time;
stat.send_delay_sum += end_time - msg.begin_time;
size += msg.size();
last_send_msg.emplace_back(std::move(msg));
bufs.emplace_back(last_send_msg.back().data(), last_send_msg.back().size());
......@@ -284,7 +284,7 @@ private:
}
}
if ((this->sending = !bufs.empty()))
if ((sending = !bufs.empty()))
{
last_send_msg.front().restart();
asio::async_write(this->next_layer(), bufs, make_strand_handler(strand,
......@@ -299,11 +299,11 @@ private:
{
if (!ec)
{
this->stat.last_send_time = time(nullptr);
stat.last_send_time = time(nullptr);
this->stat.send_byte_sum += bytes_transferred;
this->stat.send_time_sum += statistic::now() - last_send_msg.front().begin_time;
this->stat.send_msg_sum += last_send_msg.size();
stat.send_byte_sum += bytes_transferred;
stat.send_time_sum += statistic::now() - last_send_msg.front().begin_time;
stat.send_msg_sum += last_send_msg.size();
#ifdef ASCS_SYNC_SEND
ascs::do_something_to_all(last_send_msg, [](typename super::in_msg& item) {if (item.cv) item.cv->notify_one();});
#endif
......@@ -311,11 +311,11 @@ private:
this->on_msg_send(last_send_msg.front());
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
if (this->send_msg_buffer.empty())
if (send_msg_buffer.empty())
this->on_all_msg_send(last_send_msg.back());
#endif
last_send_msg.clear();
if (!do_send_msg(true) && !this->send_msg_buffer.empty()) //send msg in sequence
if (!do_send_msg(true) && !send_msg_buffer.empty()) //send msg in sequence
do_send_msg(true); //just make sure no pending msgs
}
else
......@@ -323,7 +323,7 @@ private:
this->on_send_error(ec);
last_send_msg.clear(); //clear sending messages after on_send_error, then user can decide how to deal with them in on_send_error
this->sending = false;
sending = false;
}
}
......@@ -348,12 +348,22 @@ private:
}
protected:
list<typename super::in_msg> last_send_msg;
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
volatile link_status status;
private:
using super::stat;
using super::packer_;
using super::temp_msg_can;
using super::send_msg_buffer;
using super::sending;
#ifdef ASCS_PASSIVE_RECV
using super::reading;
#endif
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
list<typename super::in_msg> last_send_msg;
asio::io_context::strand strand;
};
......
......@@ -155,7 +155,7 @@ public:
typename object_pool::object_type create_object() {return create_object(this->get_service_pump());}
template<typename Arg> typename object_pool::object_type create_object(Arg& arg) {return super::create_object(arg, ctx);}
protected:
private:
asio::ssl::context ctx;
};
......
......@@ -38,7 +38,7 @@ public:
virtual bool is_ready() {return this->lowest_layer().is_open();}
virtual void send_heartbeat()
{
in_msg_type msg(peer_addr, this->packer_->pack_heartbeat());
in_msg_type msg(peer_addr, packer_->pack_heartbeat());
this->do_direct_send_msg(std::move(msg));
}
......@@ -98,7 +98,7 @@ public:
#ifdef ASCS_PASSIVE_RECV
//changing unpacker must before calling ascs::socket::recv_msg, and define ASCS_PASSIVE_RECV macro.
void unpacker(const std::shared_ptr<i_unpacker<typename Unpacker::msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
virtual void recv_msg() {if (!this->reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
virtual void recv_msg() {if (!reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
///////////////////////////////////////////////////
......@@ -130,7 +130,7 @@ protected:
virtual bool on_heartbeat_error()
{
this->stat.last_recv_time = time(nullptr); //avoid repetitive warnings
stat.last_recv_time = time(nullptr); //avoid repetitive warnings
unified_out::warning_out("%s:%hu is not available", peer_addr.address().to_string().data(), peer_addr.port());
return true;
}
......@@ -157,7 +157,7 @@ private:
void do_recv_msg()
{
#ifdef ASCS_PASSIVE_RECV
if (this->reading)
if (reading)
return;
#endif
auto recv_buff = unpacker_->prepare_next_recv();
......@@ -167,7 +167,7 @@ private:
else
{
#ifdef ASCS_PASSIVE_RECV
this->reading = true;
reading = true;
#endif
this->next_layer().async_receive_from(recv_buff, temp_addr, make_strand_handler(strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
......@@ -178,22 +178,22 @@ private:
{
if (!ec && bytes_transferred > 0)
{
this->stat.last_recv_time = time(nullptr);
stat.last_recv_time = time(nullptr);
typename Unpacker::container_type msg_can;
unpacker_->parse_msg(bytes_transferred, msg_can);
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
ascs::do_something_to_all(msg_can, [this](typename Unpacker::msg_type& msg) {this->temp_msg_can.emplace_back(this->temp_addr, std::move(msg));});
ascs::do_something_to_all(msg_can, [this](typename Unpacker::msg_type& msg) {temp_msg_can.emplace_back(this->temp_addr, std::move(msg));});
if (this->handle_msg()) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
do_recv_msg(); //receive msg in sequence
}
else
{
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
#if defined(_MSC_VER) || defined(__CYGWIN__) || defined(__MINGW32__) || defined(__MINGW64__)
if (ec && asio::error::connection_refused != ec && asio::error::connection_reset != ec)
......@@ -208,12 +208,12 @@ private:
bool do_send_msg(bool in_strand)
{
if (!in_strand && this->sending)
if (!in_strand && sending)
return true;
if ((this->sending = this->send_msg_buffer.try_dequeue(last_send_msg)))
if ((sending = send_msg_buffer.try_dequeue(last_send_msg)))
{
this->stat.send_delay_sum += statistic::now() - last_send_msg.begin_time;
stat.send_delay_sum += statistic::now() - last_send_msg.begin_time;
last_send_msg.restart();
this->next_layer().async_send_to(asio::buffer(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr, make_strand_handler(strand,
......@@ -228,11 +228,11 @@ private:
{
if (!ec)
{
this->stat.last_send_time = time(nullptr);
stat.last_send_time = time(nullptr);
this->stat.send_byte_sum += bytes_transferred;
this->stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
++this->stat.send_msg_sum;
stat.send_byte_sum += bytes_transferred;
stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
++stat.send_msg_sum;
#ifdef ASCS_SYNC_SEND
if (last_send_msg.cv)
last_send_msg.cv->notify_one();
......@@ -241,7 +241,7 @@ private:
this->on_msg_send(last_send_msg);
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
if (this->send_msg_buffer.empty())
if (send_msg_buffer.empty())
this->on_all_msg_send(last_send_msg);
#endif
}
......@@ -255,7 +255,7 @@ private:
//send msg in sequence
//on windows, sending a msg to addr_any may cause errors, please note
//for UDP, sending error will not stop subsequent sendings.
if (!do_send_msg(true) && !this->send_msg_buffer.empty())
if (!do_send_msg(true) && !send_msg_buffer.empty())
do_send_msg(true); //just make sure no pending msgs
}
......@@ -280,14 +280,24 @@ private:
return true;
}
protected:
private:
using super::stat;
using super::packer_;
using super::temp_msg_can;
using super::send_msg_buffer;
using super::sending;
#ifdef ASCS_PASSIVE_RECV
using super::reading;
#endif
typename super::in_msg last_send_msg;
std::shared_ptr<i_unpacker<typename Unpacker::msg_type>> unpacker_;
asio::ip::udp::endpoint local_addr;
asio::ip::udp::endpoint temp_addr; //used when receiving messages
asio::ip::udp::endpoint peer_addr;
private:
asio::io_context::strand strand;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册