diff --git a/examples/echo_client/echo_client.cpp b/examples/echo_client/echo_client.cpp
index ea054147a3bab2e4f44e1240607e38a237db6411..3d66d80ba39d4310d38a2513db29379f5b11e241 100644
--- a/examples/echo_client/echo_client.cpp
+++ b/examples/echo_client/echo_client.cpp
@@ -113,11 +113,11 @@ public:
 protected:
     //msg handling
 #ifdef ASCS_DISPATCH_BATCH_MSG
-    virtual size_t on_msg_handle(out_queue_type& can)
+    virtual size_t on_msg_handle(out_queue_type& msg_can)
     {
-        //to consume part of messages in can, see echo_server.
+        //to consume a part of the messages in msg_can, see echo_server.
         out_container_type tmp_can;
-        can.swap(tmp_can);
+        msg_can.swap(tmp_can); //must be thread safe
 
         ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
         return tmp_can.size();
diff --git a/examples/echo_server/echo_server.cpp b/examples/echo_server/echo_server.cpp
index 4adad39a65087917f040fa7e8f1efba25bacc766..88b2b3e418ca99a1732198d12e6b9b998069c59b 100644
--- a/examples/echo_server/echo_server.cpp
+++ b/examples/echo_server/echo_server.cpp
@@ -102,7 +102,7 @@ protected:
     //msg handling: send the original msg back(echo server)
 #ifdef ASCS_DISPATCH_BATCH_MSG
-    virtual size_t on_msg_handle(out_queue_type& can)
+    virtual size_t on_msg_handle(out_queue_type& msg_can)
     {
         if (!is_send_buffer_available())
             return 0;
@@ -111,13 +111,13 @@ protected:
     //this manner requires the container used by the message queue can be spliced (such as std::list, but not std::vector,
     // ascs doesn't require this characteristic).
     //these code can be compiled because we used list as the container of the message queue, see macro ASCS_OUTPUT_CONTAINER for more details
-        //to consume all of messages in can, see echo_client.
-        can.lock();
-        auto begin_iter = std::begin(can);
+        //to consume all messages in msg_can, see echo_client.
+        msg_can.lock();
+        auto begin_iter = std::begin(msg_can);
         //don't be too greedy, here is in a service thread, we should not block this thread for a long time
-        auto end_iter = can.size() > 10 ? std::next(begin_iter, 10) : std::end(can);
-        tmp_can.splice(std::end(tmp_can), can, begin_iter, end_iter);
-        can.unlock();
+        auto end_iter = msg_can.size() > 10 ? std::next(begin_iter, 10) : std::end(msg_can);
+        tmp_can.splice(std::end(tmp_can), msg_can, begin_iter, end_iter);
+        msg_can.unlock();
 
         ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->send_msg(msg, true);});
         return tmp_can.size();
diff --git a/examples/pingpong_client/pingpong_client.cpp b/examples/pingpong_client/pingpong_client.cpp
index d614f1b52f522f143f69acf387d3457ff67df450..3f6ed13a52f92468f5ce281a4e942b1114603e07 100644
--- a/examples/pingpong_client/pingpong_client.cpp
+++ b/examples/pingpong_client/pingpong_client.cpp
@@ -5,6 +5,7 @@
 #define ASCS_SERVER_PORT 9527
 #define ASCS_REUSE_OBJECT //use objects pool
 #define ASCS_DELAY_CLOSE 5 //define this to avoid hooks for async call (and slightly improve efficiency)
+#define ASCS_SYNC_DISPATCH
 //#define ASCS_WANT_MSG_SEND_NOTIFY
 #define ASCS_MSG_BUFFER_SIZE 65536
 #define ASCS_INPUT_QUEUE non_lock_queue //we will never operate sending buffer concurrently, so need no locks
@@ -51,7 +52,16 @@ protected:
     virtual void on_connect() {asio::ip::tcp::no_delay option(true); lowest_layer().set_option(option); client_socket::on_connect();}
     //msg handling
-    virtual bool on_msg_handle(out_msg_type& msg) {handle_msg(msg); return true;}
+    virtual size_t on_msg(std::list<out_msg_type>& msg_can) //must define macro ASCS_SYNC_DISPATCH
+    {
+        //consume all messages; to consume a part of the messages, see on_msg_handle() in demo echo_server
+        ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
+        auto re = msg_can.size();
+        msg_can.clear();
+
+        return re;
+    }
+
     //msg handling end
 #ifdef ASCS_WANT_MSG_SEND_NOTIFY
     virtual void on_msg_send(in_msg_type& msg)
diff --git a/examples/pingpong_server/pingpong_server.cpp b/examples/pingpong_server/pingpong_server.cpp
index 57c0b2b1efa7655d9638cc72f6b16b025b49fae9..55b101bd2f3403f5f7954a11949a02d296ec2800 100644
--- a/examples/pingpong_server/pingpong_server.cpp
+++ b/examples/pingpong_server/pingpong_server.cpp
@@ -5,6 +5,7 @@
 #define ASCS_SERVER_PORT 9527
 #define ASCS_REUSE_OBJECT //use objects pool
 #define ASCS_DELAY_CLOSE 5 //define this to avoid hooks for async call (and slightly improve efficiency)
+#define ASCS_SYNC_DISPATCH
 #define ASCS_MSG_BUFFER_SIZE 65536
 #define ASCS_INPUT_QUEUE non_lock_queue
 #define ASCS_INPUT_CONTAINER list
@@ -37,7 +38,15 @@ public:
 protected:
     //msg handling: send the original msg back(echo server)
-    virtual bool on_msg_handle(out_msg_type& msg) {return direct_send_msg(std::move(msg));}
+    virtual size_t on_msg(std::list<out_msg_type>& msg_can) //must define macro ASCS_SYNC_DISPATCH
+    {
+        //consume all messages; to consume a part of the messages, see on_msg_handle() in demo echo_server
+        ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->direct_send_msg(std::move(msg));});
+        auto re = msg_can.size();
+        msg_can.clear();
+
+        return re;
+    }
     //msg handling end
 };
diff --git a/include/ascs/base.h b/include/ascs/base.h
index 5436dba7528e06de7a3af06f1447351c4874b525..8de619201711d959a3e562ab4ff1a14649ba0efa 100644
--- a/include/ascs/base.h
+++ b/include/ascs/base.h
@@ -342,7 +342,7 @@ struct statistic
     uint_fast64_t recv_byte_sum; //msgs (in bytes) returned by i_unpacker::parse_msg
     stat_duration dispatch_dealy_sum; //from parse_msg(exclude msg unpacking) to on_msg_handle
     stat_duration recv_idle_sum; //during this duration, socket suspended msg reception (receiving buffer overflow)
-    stat_duration handle_time_sum; //on_msg_handle consumed time, this indicate the efficiency of msg handling
+    stat_duration handle_time_sum; //on_msg_handle (and on_msg) consumed time, this indicates the efficiency of msg handling
     stat_duration unpack_time_sum; //udp::socket_base will not gather this item
 
     time_t last_send_time; //include heartbeat
diff --git a/include/ascs/config.h b/include/ascs/config.h
index 30ec1828ce6f705642347c36446fcbd18cf9963f..f15e3313385709720764ef229b0f79ad21b969ae 100644
--- a/include/ascs/config.h
+++ b/include/ascs/config.h
@@ -382,7 +382,7 @@
  * 2018.8.21 version 1.3.2
  *
  * SPECIAL ATTENTION (incompatible with old editions):
- * If macro ASCS_PASSIVE_RECV been defined, you may receive empty messages in on_msg_handle() and sync_recv_msg(), this makes you always having
+ * If macro ASCS_PASSIVE_RECV has been defined, you may receive empty messages in on_msg() or on_msg_handle() and sync_recv_msg(), so you always have
  * the chance to call recv_msg().
  * i_unpacker has been moved from namespace ascs::tcp and ascs::udp to namespace ascs, and the signature of ascs::udp::i_unpacker::parse_msg
  * has been changed to obey ascs::tcp::i_unpacker::parse_msg.
@@ -408,6 +408,27 @@
  *
  * REPLACEMENTS:
  *
+ * ===============================================================
+ * 2018.9.x version 1.3.3
+ *
+ * SPECIAL ATTENTION (incompatible with old editions):
+ *
+ * HIGHLIGHT:
+ * Support sync message dispatching, it's like the previous on_msg() callback but with a message container instead of a single message, and we also name it on_msg(),
+ * you need to define macro ASCS_SYNC_DISPATCH to enable this feature.
+ *
+ * FIX:
+ * Fix statistics for batch message dispatching.
+ *
+ * ENHANCEMENTS:
+ *
+ * DELETION:
+ *
+ * REFACTORING:
+ * Hide all member variables from developers.
+ *
+ * REPLACEMENTS:
+ *
 */
 
 #ifndef _ASCS_CONFIG_H_
@@ -417,8 +438,8 @@
 # pragma once
 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
 
-#define ASCS_VER 10302 //[x]xyyzz -> [x]x.[y]y.[z]z
-#define ASCS_VERSION "1.3.2"
+#define ASCS_VER 10303 //[x]xyyzz -> [x]x.[y]y.[z]z
+#define ASCS_VERSION "1.3.3"
 
 //asio and compiler check
 #ifdef _MSC_VER
@@ -554,7 +575,7 @@ static_assert(ASCS_MAX_OBJECT_NUM > 0, "object capacity must be bigger than zero
 #endif
 
 //IO thread number
-//listening, msg sending and receiving, msg handling (on_msg_handle()), all timers (include user timers) and other asynchronous calls (from executor)
+//listening, msg sending and receiving, msg handling (on_msg() and on_msg_handle()), all timers (including user timers) and other asynchronous calls (from executor)
 //keep big enough, no empirical value I can suggest, you must try to find it out in your own environment
 #ifndef ASCS_SERVICE_THREAD_NUM
 #define ASCS_SERVICE_THREAD_NUM 8
 #endif
@@ -690,7 +711,7 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
 //#define ASCS_DISPATCH_BATCH_MSG
 //all messages will be dispatched via on_handle_msg with a variable-length container, this will change the signature of function on_msg_handle,
 //it's very useful if you want to re-dispatch message in your own logic or with very simple message handling (such as echo server).
-//it's your responsibility to remove handled messages from the container (can be part of them).
+//it's your responsibility to remove handled messages from the container (can be a part of them).
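A minimal sketch of such a partial-consumption on_msg_handle() override, in the style of the echo_server example in this patch; the typedefs out_queue_type/out_container_type/out_msg_type, the send_msg() helper and the 10-message cap are taken from that example and are assumptions outside of it:

#ifdef ASCS_DISPATCH_BATCH_MSG
virtual size_t on_msg_handle(out_queue_type& msg_can)
{
    out_container_type tmp_can; //requires a spliceable container such as std::list (see macro ASCS_OUTPUT_CONTAINER)
    msg_can.lock();
    //take at most 10 messages per call so the service thread is not blocked for long
    auto end_iter = msg_can.size() > 10 ? std::next(std::begin(msg_can), 10) : std::end(msg_can);
    tmp_can.splice(std::end(tmp_can), msg_can, std::begin(msg_can), end_iter);
    msg_can.unlock();

    ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->send_msg(msg, true);});
    return tmp_can.size(); //messages still left in msg_can will be re-dispatched asynchronously
}
#endif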
 //#define ASCS_ALIGNED_TIMER
 //for example, start a timer at xx:xx:xx, interval is 10 seconds, the callback will be called at (xx:xx:xx + 10), and suppose that the callback
@@ -711,9 +732,20 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
 // we must avoid to do sync message sending and receiving in service threads.
 // if prior sync_recv_msg() not returned, the second sync_recv_msg() will return false immediately.
 // with macro ASCS_PASSIVE_RECV, in sync_recv_msg(), recv_msg() will be automatically called.
+// after sync_recv_msg() returns, ascs will no longer maintain those messages that have been output.
 //Sync message sending and receiving are not tracked by tracked_executor, please note.
 //No matter you're doing sync message sending or async message sending, you can do sync message receiving or async message receiving concurrently.
+
+//#define ASCS_SYNC_DISPATCH
+//with this macro, virtual size_t on_msg(std::list<OutMsgType>& msg_can) will be provided, you can override it and handle all or a part of the
+// messages like virtual function on_msg_handle (with macro ASCS_DISPATCH_BATCH_MSG), if your logic is simple enough (like echo or pingpong test),
+// this feature is recommended because it can slightly improve efficiency.
+//now we have three ways to handle messages (sync_recv_msg, on_msg and on_msg_handle), the response order is the same as listed, if messages have been successfully
+// dispatched to sync_recv_msg, then the other two will do nothing, otherwise messages will be dispatched to on_msg, if on_msg only handled a part of (including
+// zero) the messages, then on_msg_handle will continue to dispatch the rest of them.
+//as before, on_msg will block subsequent receiving, but only on the current socket.
+
 //configurations
 
 #endif /* _ASCS_CONFIG_H_ */
diff --git a/include/ascs/object_pool.h b/include/ascs/object_pool.h
index 0ed0fb2e08e35b1157894430e0b1ff88aba2bb4b..dfe8b02818f20a0385c746a6979660eefe06c2b4 100644
--- a/include/ascs/object_pool.h
+++ b/include/ascs/object_pool.h
@@ -300,7 +300,7 @@ public:
     template<typename _Predicate> void do_something_to_one(const _Predicate& __pred)
         {std::lock_guard<std::mutex> lock(object_can_mutex); for (auto iter = std::begin(object_can); iter != std::end(object_can); ++iter) if (__pred(iter->second)) break;}
 
-protected:
+private:
     std::atomic_uint_fast64_t cur_id;
 
     container_type object_can;
diff --git a/include/ascs/service_pump.h b/include/ascs/service_pump.h
index 475a8039c91fc9c7a037f10c314b19050324a5b8..68c47e96248347ec2cebf47f1f13bf917edcf42e 100644
--- a/include/ascs/service_pump.h
+++ b/include/ascs/service_pump.h
@@ -281,7 +281,7 @@ private:
         service_can.emplace_back(i_service_);
     }
 
-protected:
+private:
     bool started;
     container_type service_can;
     std::mutex service_can_mutex;
diff --git a/include/ascs/socket.h b/include/ascs/socket.h
index 61d8293e612a66a24331aed6df895ee744c8677b..6de073e47127384a7037d1d79bf6426bc80888a3 100644
--- a/include/ascs/socket.h
+++ b/include/ascs/socket.h
@@ -269,18 +269,32 @@ protected:
     virtual void on_close() {unified_out::info_out("on_close()");}
     virtual void after_close() {} //a good case for using this is to reconnect to the server, please refer to client_socket_base.
 
-    //return true (or > 0) means msg been handled, false (or 0) means msg cannot be handled right now, and socket will re-dispatch it asynchronously
+#ifdef ASCS_SYNC_DISPATCH
+    //return the number of handled msgs; if some msgs are left behind, socket will re-dispatch them asynchronously
     //notice: using inconstant is for the convenience of swapping
+    virtual size_t on_msg(std::list<OutMsgType>& msg_can)
+    {
+        //it's always thread safe in this virtual function, because it blocks message receiving
+        ascs::do_something_to_all(msg_can, [](OutMsgType& msg) {unified_out::debug_out("recv(" ASCS_SF "): %s", msg.size(), msg.data());});
+        auto re = msg_can.size();
+        msg_can.clear(); //all messages have been handled
+
+        return re;
+    }
+#endif
 #ifdef ASCS_DISPATCH_BATCH_MSG
-    virtual size_t on_msg_handle(out_queue_type& can)
+    //return the number of handled msgs; if some msgs are left behind, socket will re-dispatch them asynchronously
+    //notice: using inconstant is for the convenience of swapping
+    virtual size_t on_msg_handle(out_queue_type& msg_can)
     {
         out_container_type tmp_can;
-        can.swap(tmp_can);
+        msg_can.swap(tmp_can); //must be thread safe
 
         ascs::do_something_to_all(tmp_can, [](OutMsgType& msg) {unified_out::debug_out("recv(" ASCS_SF "): %s", msg.size(), msg.data());});
         return tmp_can.size();
     }
 #else
+    //return true means the msg has been handled, false means the msg cannot be handled right now, and socket will re-dispatch it asynchronously
     virtual bool on_msg_handle(OutMsgType& msg) {unified_out::debug_out("recv(" ASCS_SF "): %s", msg.size(), msg.data()); return true;}
 #endif
@@ -350,6 +364,17 @@ protected:
 #endif
         auto msg_num = temp_msg_can.size();
         stat.recv_msg_sum += msg_num;
+
+#ifdef ASCS_SYNC_DISPATCH
+#ifdef ASCS_PASSIVE_RECV
+        on_msg(temp_msg_can);
+        if (temp_msg_can.empty())
+            return handled_msg();
+#else
+        if (msg_num > 0)
+            on_msg(temp_msg_can);
+#endif
+#endif
 #ifdef ASCS_PASSIVE_RECV
         if (0 == msg_num)
         {
@@ -468,14 +493,22 @@ private:
         if ((dispatching = !recv_msg_buffer.empty()))
         {
             auto begin_time = statistic::now();
-            stat.dispatch_dealy_sum += begin_time - recv_msg_buffer.front().begin_time;
+#ifdef ASCS_FULL_STATISTIC
+            recv_msg_buffer.lock();
+            ascs::do_something_to_all(recv_msg_buffer, [&, this](out_msg& msg) {this->stat.dispatch_dealy_sum += begin_time - msg.begin_time;});
+            recv_msg_buffer.unlock();
+#endif
             auto re = on_msg_handle(recv_msg_buffer);
             auto end_time = statistic::now();
             stat.handle_time_sum += end_time - begin_time;
 
             if (0 == re) //dispatch failed, re-dispatch
             {
-                recv_msg_buffer.front().restart(end_time);
+#ifdef ASCS_FULL_STATISTIC
+                recv_msg_buffer.lock();
+                ascs::do_something_to_all(recv_msg_buffer, [&end_time](out_msg& msg) {msg.restart(end_time);});
+                recv_msg_buffer.unlock();
+#endif
                 set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_, [this](tid id)->bool {return this->timer_handler(TIMER_DISPATCH_MSG);}); //hold dispatching
             }
             else
diff --git a/include/ascs/tcp/client_socket.h b/include/ascs/tcp/client_socket.h
index 0a6f3b2ede8c11fc6d20c9eaf4da1efe14397b23..1eab4f38ce090c537fa9d83b29f4456b3b5849a0 100644
--- a/include/ascs/tcp/client_socket.h
+++ b/include/ascs/tcp/client_socket.h
@@ -60,7 +60,7 @@ public:
     void disconnect(bool reconnect = false) {force_shutdown(reconnect);}
     void force_shutdown(bool reconnect = false)
     {
-        if (super::link_status::FORCE_SHUTTING_DOWN != this->status)
+        if (super::link_status::FORCE_SHUTTING_DOWN != status)
             this->show_info("client link:", "been shut down.");
 
         need_reconnect = reconnect;
@@ -108,7 +108,7 @@ protected:
         this->show_info("client link:", "broken/been shut down", ec);
 
         force_shutdown(this->is_shutting_down() ? need_reconnect : prepare_reconnect(ec) >= 0);
-        this->status = super::link_status::BROKEN;
+        status = super::link_status::BROKEN;
     }
 
     virtual void on_async_shutdown_error() {force_shutdown(need_reconnect);}
@@ -145,7 +145,9 @@ protected:
         return false;
     }
 
-protected:
+private:
+    using super::status;
+
     asio::ip::tcp::endpoint server_addr;
     bool need_reconnect;
 };
diff --git a/include/ascs/tcp/server.h b/include/ascs/tcp/server.h
index c8d302866d9e03dea60977ee3f8fb37450dbb853..8a15eea24c0d0e01577afe0f80b29adaf0a724f7 100644
--- a/include/ascs/tcp/server.h
+++ b/include/ascs/tcp/server.h
@@ -180,7 +180,7 @@ protected:
         start_next_accept();
     }
 
-protected:
+private:
     asio::ip::tcp::endpoint server_addr;
     asio::ip::tcp::acceptor acceptor;
 };
diff --git a/include/ascs/tcp/server_socket.h b/include/ascs/tcp/server_socket.h
index 247598c09d26aee82d92027178ed5fb6e390e662..22b0621ab6bf28c082004ddb206cd06526e82f4e 100644
--- a/include/ascs/tcp/server_socket.h
+++ b/include/ascs/tcp/server_socket.h
@@ -40,7 +40,7 @@ public:
     void disconnect() {force_shutdown();}
     void force_shutdown()
     {
-        if (super::link_status::FORCE_SHUTTING_DOWN != this->status)
+        if (super::link_status::FORCE_SHUTTING_DOWN != status)
             this->show_info("server link:", "been shut down.");
 
         super::force_shutdown();
@@ -73,7 +73,7 @@ protected:
 #ifdef ASCS_CLEAR_OBJECT_INTERVAL
         force_shutdown();
 #else
-        this->status = super::link_status::BROKEN;
+        status = super::link_status::BROKEN;
         server.del_socket(this->shared_from_this());
 #endif
     }
@@ -82,6 +82,8 @@ protected:
     virtual bool on_heartbeat_error() {this->show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;}
 
 private:
+    using super::status;
+
     Server& server;
 };
diff --git a/include/ascs/tcp/socket.h b/include/ascs/tcp/socket.h
index d9197210b45d34f08c24d9e7bbe2a63bc8e20f29..d99799c16454d9ceb1af1207f48a8a269a170dfa 100644
--- a/include/ascs/tcp/socket.h
+++ b/include/ascs/tcp/socket.h
@@ -51,7 +51,7 @@ public:
     virtual void send_heartbeat()
     {
         auto_duration dur(this->stat.pack_time_sum);
-        auto msg = this->packer_->pack_heartbeat();
+        auto msg = packer_->pack_heartbeat();
         dur.end();
 
         this->do_direct_send_msg(std::move(msg));
     }
@@ -117,7 +117,7 @@ public:
 
 #ifdef ASCS_PASSIVE_RECV
     //changing unpacker must before calling ascs::socket::recv_msg, and define ASCS_PASSIVE_RECV macro.
     void unpacker(const std::shared_ptr<i_unpacker<out_msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
-    virtual void recv_msg() {if (!this->reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
+    virtual void recv_msg() {if (!reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
 #endif
     ///////////////////////////////////////////////////
@@ -173,9 +173,9 @@ protected:
     virtual bool do_start()
     {
         status = link_status::CONNECTED;
-        this->stat.establish_time = time(nullptr);
+        stat.establish_time = time(nullptr);
 
-        on_connect(); //in this virtual function, this->stat.last_recv_time has not been updated (super::do_start will update it), please note
+        on_connect(); //in this virtual function, stat.last_recv_time has not been updated (super::do_start will update it), please note
         return super::do_start();
     }
@@ -200,14 +200,14 @@ private:
     size_t completion_checker(const asio::error_code& ec, size_t bytes_transferred)
     {
-        auto_duration dur(this->stat.unpack_time_sum);
+        auto_duration dur(stat.unpack_time_sum);
         return this->unpacker_->completion_condition(ec, bytes_transferred);
     }
 
     void do_recv_msg()
     {
 #ifdef ASCS_PASSIVE_RECV
-        if (this->reading)
+        if (reading)
             return;
 #endif
         auto recv_buff = unpacker_->prepare_next_recv();
@@ -217,7 +217,7 @@ private:
         else
         {
 #ifdef ASCS_PASSIVE_RECV
-            this->reading = true;
+            reading = true;
 #endif
             asio::async_read(this->next_layer(), recv_buff,
                 [this](const asio::error_code& ec, size_t bytes_transferred)->size_t {return this->completion_checker(ec, bytes_transferred);}, make_strand_handler(strand,
@@ -229,17 +229,17 @@
     {
         if (!ec && bytes_transferred > 0)
         {
-            this->stat.last_recv_time = time(nullptr);
+            stat.last_recv_time = time(nullptr);
 
-            auto_duration dur(this->stat.unpack_time_sum);
-            auto unpack_ok = unpacker_->parse_msg(bytes_transferred, this->temp_msg_can);
+            auto_duration dur(stat.unpack_time_sum);
+            auto unpack_ok = unpacker_->parse_msg(bytes_transferred, temp_msg_can);
             dur.end();
 
             if (!unpack_ok)
                 on_unpack_error(); //the user will decide whether to reset the unpacker or not in this callback
 #ifdef ASCS_PASSIVE_RECV
-            this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
+            reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
 #endif
             if (this->handle_msg()) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
                 do_recv_msg(); //receive msg in sequence
@@ -247,7 +247,7 @@
         else
         {
 #ifdef ASCS_PASSIVE_RECV
-            this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
+            reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
 #endif
             if (ec)
                 this->on_recv_error(ec);
@@ -258,7 +258,7 @@
     bool do_send_msg(bool in_strand)
     {
-        if (!in_strand && this->sending)
+        if (!in_strand && sending)
             return true;
 
         std::list<asio::const_buffer> bufs;
@@ -272,10 +272,10 @@
             typename super::in_msg msg;
             auto end_time = statistic::now();
 
-            typename super::in_queue_type::lock_guard lock(this->send_msg_buffer);
-            while (this->send_msg_buffer.try_dequeue_(msg))
+            typename super::in_queue_type::lock_guard lock(send_msg_buffer);
+            while (send_msg_buffer.try_dequeue_(msg))
             {
-                this->stat.send_delay_sum += end_time - msg.begin_time;
+                stat.send_delay_sum += end_time - msg.begin_time;
                 size += msg.size();
                 last_send_msg.emplace_back(std::move(msg));
                 bufs.emplace_back(last_send_msg.back().data(), last_send_msg.back().size());
@@ -284,7 +284,7 @@
             }
         }
 
-        if ((this->sending = !bufs.empty()))
+        if ((sending = !bufs.empty()))
         {
             last_send_msg.front().restart();
             asio::async_write(this->next_layer(), bufs, make_strand_handler(strand,
@@ -299,11 +299,11 @@
         {
             if (!ec)
             {
-                this->stat.last_send_time = time(nullptr);
+                stat.last_send_time = time(nullptr);
 
-                this->stat.send_byte_sum += bytes_transferred;
-                this->stat.send_time_sum += statistic::now() - last_send_msg.front().begin_time;
-                this->stat.send_msg_sum += last_send_msg.size();
+                stat.send_byte_sum += bytes_transferred;
+                stat.send_time_sum += statistic::now() - last_send_msg.front().begin_time;
+                stat.send_msg_sum += last_send_msg.size();
 #ifdef ASCS_SYNC_SEND
                 ascs::do_something_to_all(last_send_msg, [](typename super::in_msg& item) {if (item.cv) item.cv->notify_one();});
 #endif
@@ -311,11 +311,11 @@
                 this->on_msg_send(last_send_msg.front());
 #endif
 #ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
-                if (this->send_msg_buffer.empty())
+                if (send_msg_buffer.empty())
                     this->on_all_msg_send(last_send_msg.back());
 #endif
                 last_send_msg.clear();
-                if (!do_send_msg(true) && !this->send_msg_buffer.empty()) //send msg in sequence
+                if (!do_send_msg(true) && !send_msg_buffer.empty()) //send msg in sequence
                     do_send_msg(true); //just make sure no pending msgs
             }
             else
@@ -323,7 +323,7 @@
                 this->on_send_error(ec);
                 last_send_msg.clear(); //clear sending messages after on_send_error, then user can decide how to deal with them in on_send_error
 
-                this->sending = false;
+                sending = false;
             }
         }
@@ -348,12 +348,22 @@
     }
 
 protected:
-    list<typename super::in_msg> last_send_msg;
-    std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
-    volatile link_status status;
 
 private:
+    using super::stat;
+    using super::packer_;
+    using super::temp_msg_can;
+
+    using super::send_msg_buffer;
+    using super::sending;
+
+#ifdef ASCS_PASSIVE_RECV
+    using super::reading;
+#endif
+
+    std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
+    list<typename super::in_msg> last_send_msg;
     asio::io_context::strand strand;
 };
diff --git a/include/ascs/tcp/ssl/ssl.h b/include/ascs/tcp/ssl/ssl.h
index 8082fed6745d13aacaac3a58793f6674ab4fcca7..c06db825ea46a3e7fa1eee0aade6d039bbf3b9ac 100644
--- a/include/ascs/tcp/ssl/ssl.h
+++ b/include/ascs/tcp/ssl/ssl.h
@@ -155,7 +155,7 @@ public:
     typename object_pool::object_type create_object() {return create_object(this->get_service_pump());}
     template<typename Arg> typename object_pool::object_type create_object(Arg& arg) {return super::create_object(arg, ctx);}
 
-protected:
+private:
     asio::ssl::context ctx;
 };
diff --git a/include/ascs/udp/socket.h b/include/ascs/udp/socket.h
index 1185fde243151d304d2e90d266b37687a76754ec..95a7696220149186a38ba804874270b4e86a35a3 100644
--- a/include/ascs/udp/socket.h
+++ b/include/ascs/udp/socket.h
@@ -38,7 +38,7 @@ public:
     virtual bool is_ready() {return this->lowest_layer().is_open();}
     virtual void send_heartbeat()
     {
-        in_msg_type msg(peer_addr, this->packer_->pack_heartbeat());
+        in_msg_type msg(peer_addr, packer_->pack_heartbeat());
         this->do_direct_send_msg(std::move(msg));
     }
@@ -98,7 +98,7 @@ public:
 
 #ifdef ASCS_PASSIVE_RECV
     //changing unpacker must before calling ascs::socket::recv_msg, and define ASCS_PASSIVE_RECV macro.
     void unpacker(const std::shared_ptr<Unpacker>& _unpacker_) {unpacker_ = _unpacker_;}
-    virtual void recv_msg() {if (!this->reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
+    virtual void recv_msg() {if (!reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
 #endif
     ///////////////////////////////////////////////////
@@ -130,7 +130,7 @@ protected:
     virtual bool on_heartbeat_error()
     {
-        this->stat.last_recv_time = time(nullptr); //avoid repetitive warnings
+        stat.last_recv_time = time(nullptr); //avoid repetitive warnings
         unified_out::warning_out("%s:%hu is not available", peer_addr.address().to_string().data(), peer_addr.port());
         return true;
     }
@@ -157,7 +157,7 @@ private:
     void do_recv_msg()
    {
 #ifdef ASCS_PASSIVE_RECV
-        if (this->reading)
+        if (reading)
             return;
 #endif
         auto recv_buff = unpacker_->prepare_next_recv();
@@ -167,7 +167,7 @@
         else
         {
 #ifdef ASCS_PASSIVE_RECV
-            this->reading = true;
+            reading = true;
 #endif
             this->next_layer().async_receive_from(recv_buff, temp_addr, make_strand_handler(strand,
                 this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
@@ -178,22 +178,22 @@
     {
         if (!ec && bytes_transferred > 0)
         {
-            this->stat.last_recv_time = time(nullptr);
+            stat.last_recv_time = time(nullptr);
 
             typename Unpacker::container_type msg_can;
             unpacker_->parse_msg(bytes_transferred, msg_can);
 #ifdef ASCS_PASSIVE_RECV
-            this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
+            reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
 #endif
-            ascs::do_something_to_all(msg_can, [this](typename Unpacker::msg_type& msg) {this->temp_msg_can.emplace_back(this->temp_addr, std::move(msg));});
+            ascs::do_something_to_all(msg_can, [this](typename Unpacker::msg_type& msg) {temp_msg_can.emplace_back(this->temp_addr, std::move(msg));});
 
             if (this->handle_msg()) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
                 do_recv_msg(); //receive msg in sequence
         }
         else
         {
 #ifdef ASCS_PASSIVE_RECV
-            this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
+            reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
 #endif
 #if defined(_MSC_VER) || defined(__CYGWIN__) || defined(__MINGW32__) || defined(__MINGW64__)
             if (ec && asio::error::connection_refused != ec && asio::error::connection_reset != ec)
@@ -208,12 +208,12 @@
     bool do_send_msg(bool in_strand)
     {
-        if (!in_strand && this->sending)
+        if (!in_strand && sending)
             return true;
 
-        if ((this->sending = this->send_msg_buffer.try_dequeue(last_send_msg)))
+        if ((sending = send_msg_buffer.try_dequeue(last_send_msg)))
         {
-            this->stat.send_delay_sum += statistic::now() - last_send_msg.begin_time;
+            stat.send_delay_sum += statistic::now() - last_send_msg.begin_time;
             last_send_msg.restart();
             this->next_layer().async_send_to(asio::buffer(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr, make_strand_handler(strand,
@@ -228,11 +228,11 @@
         {
             if (!ec)
             {
-                this->stat.last_send_time = time(nullptr);
+                stat.last_send_time = time(nullptr);
 
-                this->stat.send_byte_sum += bytes_transferred;
-                this->stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
-                ++this->stat.send_msg_sum;
+                stat.send_byte_sum += bytes_transferred;
+                stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
+                ++stat.send_msg_sum;
 #ifdef ASCS_SYNC_SEND
                 if (last_send_msg.cv)
                     last_send_msg.cv->notify_one();
@@ -241,7 +241,7 @@
                 this->on_msg_send(last_send_msg);
 #endif
 #ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
-                if (this->send_msg_buffer.empty())
+                if (send_msg_buffer.empty())
                     this->on_all_msg_send(last_send_msg);
 #endif
             }
@@ -255,7 +255,7 @@
             //send msg in sequence
             //on windows, sending a msg to addr_any may cause errors, please note
             //for UDP, sending error will not stop subsequent sendings.
-            if (!do_send_msg(true) && !this->send_msg_buffer.empty())
+            if (!do_send_msg(true) && !send_msg_buffer.empty())
                 do_send_msg(true); //just make sure no pending msgs
         }
@@ -280,14 +280,24 @@
         return true;
     }
 
-protected:
+private:
+    using super::stat;
+    using super::packer_;
+    using super::temp_msg_can;
+
+    using super::send_msg_buffer;
+    using super::sending;
+
+#ifdef ASCS_PASSIVE_RECV
+    using super::reading;
+#endif
+
     typename super::in_msg last_send_msg;
     std::shared_ptr<Unpacker> unpacker_;
     asio::ip::udp::endpoint local_addr;
     asio::ip::udp::endpoint temp_addr; //used when receiving messages
    asio::ip::udp::endpoint peer_addr;
 
-private:
     asio::io_context::strand strand;
 };
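A minimal usage sketch of the sync dispatching enabled by ASCS_SYNC_DISPATCH, modelled on the pingpong_client example in this patch; the socket base class, out_msg_type and handle_msg() are assumptions taken from that example, and anything left in msg_can is re-dispatched to on_msg_handle() asynchronously:

#define ASCS_SYNC_DISPATCH //configuration macros must be defined before including ascs headers

//inside a socket class such as the one in pingpong_client.cpp:
virtual size_t on_msg(std::list<out_msg_type>& msg_can)
{
    //on_msg() blocks further receiving, but only on this socket, so keep the work short
    ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
    auto re = msg_can.size();
    msg_can.clear(); //cleared means fully consumed; messages left here are handed to on_msg_handle()

    return re; //the number of messages handled synchronously
}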