提交 32aba299 编写于 作者: Y youngwolf

1.3.2 release.

Fully support sync message sending and receiving.
Unify ascs::tcp::i_unpacker and ascs::udp::i_unpacker (the latter obey the former).
Simplify statistic::to_string().
Fix race condition when aligning timers.
Fix error check for UDP on cygwin and mingw.
Fix bug: demo file_client may not be able to receive all content of the file it required if you get more than one file in a single request.
Fix compilation warning in class udp::socket_base with macro ASCS_PASSIVE_RECV.
上级 f3230c77
......@@ -6,6 +6,10 @@
#define ASCS_DELAY_CLOSE 1 //this demo not used object pool and doesn't need life cycle management,
//so, define this to avoid hooks for async call (and slightly improve efficiency),
//any value which is bigger than zero is okay.
#define ASCS_SYNC_RECV
//#define ASCS_PASSIVE_RECV //because we not defined this macro, this demo will use mix model to receive messages, which means
//some messages will be dispatched via on_msg_handle(), some messages will be returned via sync_recv_msg(),
//if the server send messages quickly enough, you will see them cross together.
#define ASCS_DISPATCH_BATCH_MSG
#define ASCS_ALIGNED_TIMER
#define ASCS_CUSTOM_LOG
......@@ -46,12 +50,31 @@ public:
};
#include <ascs/ext/tcp.h>
using namespace ascs;
using namespace ascs::ext;
using namespace ascs::ext::tcp;
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define RECONNECT "reconnect"
std::thread create_sync_recv_thread(single_client& client)
{
return std::thread([&client]() {
typename ASCS_DEFAULT_UNPACKER::container_type msg_can;
while (client.sync_recv_msg(msg_can))
{
#ifdef ASCS_PASSIVE_RECV
do_something_to_all(msg_can, [](single_service::out_msg_type& msg) {if (!msg.empty()) printf("sync recv(" ASCS_SF ") : %s\n", msg.size(), msg.data());});
#else
do_something_to_all(msg_can, [](single_client::out_msg_type& msg) {printf("sync recv(" ASCS_SF ") : %s\n", msg.size(), msg.data());});
#endif
msg_can.clear();
}
puts("sync recv end.");
});
}
int main(int argc, const char* argv[])
{
printf("usage: %s [<port=%d> [ip=%s]]\n", argv[0], ASCS_SERVER_PORT + 100, ASCS_SERVER_IP);
......@@ -73,15 +96,23 @@ int main(int argc, const char* argv[])
client.set_server_addr(ASCS_SERVER_PORT + 100, ASCS_SERVER_IP);
sp.start_service();
std::this_thread::sleep_for(std::chrono::milliseconds(500)); //to be more efficiently, start the worker thread in tcp::socket_base::on_connect().
auto t = create_sync_recv_thread(client);
while(sp.is_running())
{
std::string str;
std::cin >> str;
if (QUIT_COMMAND == str)
{
sp.stop_service();
t.join();
}
else if (RESTART_COMMAND == str)
{
sp.stop_service();
t.join();
t = create_sync_recv_thread(client);
sp.start_service();
}
else if (RECONNECT == str)
......
......@@ -126,7 +126,9 @@ int main(int argc, const char* argv[])
{
std::string str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
if (str.empty())
;
else if (QUIT_COMMAND == str)
sp.stop_service();
else if (STATISTIC == str)
{
......
......@@ -69,7 +69,9 @@ int main(int argc, const char* argv[])
{
std::string str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
if (str.empty())
;
else if (QUIT_COMMAND == str)
sp.stop_service();
else if (STATISTIC == str)
{
......
......@@ -8,7 +8,7 @@
#define ASCS_DISPATCH_BATCH_MSG
#define ASCS_ENHANCED_STABILITY
//#define ASCS_FULL_STATISTIC //full statistic will slightly impact efficiency
//#define ASCS_USE_STEADY_TIMER
#define ASCS_USE_STEADY_TIMER
#define ASCS_ALIGNED_TIMER
#define ASCS_AVOID_AUTO_STOP_SERVICE
#define ASCS_DECREASE_THREAD_AT_RUNTIME
......@@ -196,7 +196,9 @@ int main(int argc, const char* argv[])
{
std::string str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
if (str.empty())
;
else if (QUIT_COMMAND == str)
sp.stop_service();
else if (RESTART_COMMAND == str)
{
......
......@@ -57,7 +57,9 @@ int main(int argc, const char* argv[])
{
std::string str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
if (str.empty())
;
else if (QUIT_COMMAND == str)
sp.stop_service();
else if (RESTART_COMMAND == str)
{
......
......@@ -23,7 +23,9 @@ public:
//reset all, be ensure that there's no any operations performed on this file_socket when invoke it
virtual void reset() {clear(); client_socket::reset();}
bool is_idle() const {return TRANS_IDLE == state;}
void set_index(int index_) {index = index_;}
bool get_file(const std::string& file_name)
{
assert(!file_name.empty());
......@@ -83,7 +85,6 @@ protected:
private:
void clear()
{
state = TRANS_IDLE;
if (nullptr != file)
{
fclose(file);
......@@ -91,6 +92,7 @@ private:
}
unpacker(std::make_shared<ASCS_DEFAULT_UNPACKER>());
state = TRANS_IDLE;
}
void trans_end() {clear();}
......@@ -101,7 +103,7 @@ private:
assert(msg.empty());
auto unp = std::dynamic_pointer_cast<data_unpacker>(unpacker());
if (nullptr == unp || unp->is_finished())
if (!unp || unp->is_finished())
trans_end();
return;
......@@ -239,8 +241,11 @@ private:
if (received_size < file_size)
return true;
printf("\r100%%\nend, speed: %f MBps.\n", file_size / begin_time.elapsed() / 1024 / 1024);
printf("\r100%%\nend, speed: %f MBps.\n\n", file_size / begin_time.elapsed() / 1024 / 1024);
change_timer_status(id, timer_info::TIMER_CANCELED);
//wait all file_socket to clean up themselves
do_something_to_all([](object_ctype& item) {while (!item->is_idle()) std::this_thread::sleep_for(std::chrono::milliseconds(10));});
get_file();
return false;
......
......@@ -51,7 +51,9 @@ int main(int argc, const char* argv[])
{
std::string str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
if (str.empty())
;
else if (QUIT_COMMAND == str)
sp.stop_service();
else if (RESTART_COMMAND == str)
{
......
......@@ -138,7 +138,9 @@ int main(int argc, const char* argv[])
{
std::string str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
if (str.empty())
;
else if (QUIT_COMMAND == str)
sp.stop_service();
else if (STATISTIC == str)
{
......@@ -154,7 +156,7 @@ int main(int argc, const char* argv[])
sp.add_service_thread(1);
else if (DECREASE_THREAD == str)
sp.del_service_thread(1);
else if (!str.empty())
else
{
size_t msg_num = 1024;
size_t msg_len = 1024; //must greater than or equal to sizeof(size_t)
......
......@@ -75,7 +75,9 @@ int main(int argc, const char* argv[])
{
std::string str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
if (str.empty())
;
else if (QUIT_COMMAND == str)
sp.stop_service();
else if (STATISTIC == str)
{
......
......@@ -5,14 +5,15 @@
#define ASCS_DELAY_CLOSE 1 //this demo not used object pool and doesn't need life cycle management,
//so, define this to avoid hooks for async call (and slightly improve efficiency),
//any value which is bigger than zero is okay.
//#if defined(_MSC_VER) && _MSC_VER <= 1800
//#define ASCS_DEFAULT_PACKER replaceable_packer<shared_buffer<i_buffer>>
//#else
//#define ASCS_DEFAULT_PACKER replaceable_packer<>
//#endif
#define ASCS_SYNC_RECV
#define ASCS_PASSIVE_RECV //if you annotate this definition, this demo will use mix model to receive messages, which means
//some messages will be dispatched via on_msg_handle(), some messages will be returned via sync_recv_msg(),
//type more than one messages (separate them by space) in one line with ENTER key to send them,
//you will see them cross together on the receiver's screen.
//#define ASCS_DEFAULT_UDP_UNPACKER replaceable_udp_unpacker<>
#define ASCS_HEARTBEAT_INTERVAL 5 //neither udp_unpacker nor replaceable_udp_unpacker support heartbeat message,
//so heartbeat will be treated as normal message.
#define ASCS_SYNC_SEND
//configuration
#include <ascs/ext/udp.h>
......@@ -22,6 +23,23 @@ using namespace ascs::ext::udp;
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
std::thread create_sync_recv_thread(single_service& service)
{
return std::thread([&service]() {
std::list<single_service::out_msg_type> msg_can;
while (service.sync_recv_msg(msg_can))
{
#ifdef ASCS_PASSIVE_RECV
do_something_to_all(msg_can, [](single_service::out_msg_type& msg) {if (!msg.empty()) printf("sync recv(" ASCS_SF ") : %s\n", msg.size(), msg.data());});
#else
do_something_to_all(msg_can, [](single_service::out_msg_type& msg) {printf("sync recv(" ASCS_SF ") : %s\n", msg.size(), msg.data());});
#endif
msg_can.clear();
}
puts("sync recv end.");
});
}
int main(int argc, const char* argv[])
{
printf("usage: %s <my port> <peer port> [peer ip=127.0.0.1]\n", argv[0]);
......@@ -45,19 +63,27 @@ int main(int argc, const char* argv[])
// service.lowest_layer().open(ASCS_UDP_DEFAULT_IP_VERSION);
// service.lowest_layer().set_option(asio::ip::multicast::join_group(asio::ip::address::from_string("x.x.x.x")));
// sp.start_service();
auto t = create_sync_recv_thread(service);
while(sp.is_running())
{
std::string str;
std::cin >> str;
if (QUIT_COMMAND == str)
{
sp.stop_service();
t.join();
}
else if (RESTART_COMMAND == str)
{
sp.stop_service();
t.join();
sp.start_service();
t = create_sync_recv_thread(service);
}
else
service.safe_send_native_msg(str, false); //to send to different endpoints, use overloads that take a const asio::ip::udp::endpoint& parameter
service.sync_safe_send_native_msg(str, false); //to send to different endpoints, use overloads that take a const asio::ip::udp::endpoint& parameter
}
return 0;
......
......@@ -30,6 +30,9 @@
#include <atomic>
#include <sstream>
#include <iomanip>
#ifdef ASCS_SYNC_SEND
#include <condition_variable>
#endif
#include <asio.hpp>
......@@ -188,35 +191,32 @@ public:
};
//unpacker concept
namespace tcp
template<typename MsgType>
class i_unpacker
{
template<typename MsgType>
class i_unpacker
{
public:
typedef MsgType msg_type;
typedef const msg_type msg_ctype;
typedef std::list<msg_type> container_type;
typedef ASCS_RECV_BUFFER_TYPE buffer_type;
public:
typedef MsgType msg_type;
typedef const msg_type msg_ctype;
typedef std::list<msg_type> container_type;
typedef ASCS_RECV_BUFFER_TYPE buffer_type;
bool stripped() const {return _stripped;}
void stripped(bool stripped_) {_stripped = stripped_;}
bool stripped() const {return _stripped;}
void stripped(bool stripped_) {_stripped = stripped_;}
protected:
i_unpacker() : _stripped(true) {}
virtual ~i_unpacker() {}
protected:
i_unpacker() : _stripped(true) {}
virtual ~i_unpacker() {}
public:
virtual void reset() = 0;
//heartbeat must not be included in msg_can, otherwise you must handle heartbeat at where you handle normal messges.
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) = 0;
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) = 0;
virtual buffer_type prepare_next_recv() = 0;
private:
bool _stripped;
};
} //namespace
public:
virtual void reset() {};
//heartbeat must not be included in msg_can, otherwise you must handle heartbeat at where you handle normal messges.
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) = 0;
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) {return 0;}
virtual buffer_type prepare_next_recv() = 0;
private:
bool _stripped;
};
namespace udp
{
......@@ -240,24 +240,6 @@ namespace udp
udp_msg& operator=(udp_msg&& other) {MsgType::clear(); swap(other); return *this;}
#endif
};
template<typename MsgType>
class i_unpacker
{
public:
typedef MsgType msg_type;
typedef const msg_type msg_ctype;
typedef ASCS_RECV_BUFFER_TYPE buffer_type;
protected:
virtual ~i_unpacker() {}
public:
virtual void reset() {}
//heartbeat must not be returned (use empty message instead), otherwise you must handle heartbeat at where you handle normal messges.
virtual msg_type parse_msg(size_t bytes_transferred) = 0;
virtual buffer_type prepare_next_recv() = 0;
};
} //namespace
//unpacker concept
......@@ -327,29 +309,24 @@ struct statistic
std::string to_string() const
{
std::ostringstream s;
#ifdef ASCS_FULL_STATISTIC
s << "send corresponding statistic:\n"
<< "message sum: " << send_msg_sum << std::endl
<< "size in bytes: " << send_byte_sum << std::endl
#ifdef ASCS_FULL_STATISTIC
<< "send delay: " << std::chrono::duration_cast<std::chrono::duration<float>>(send_delay_sum).count() << std::endl
<< "send duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(send_time_sum).count() << std::endl
<< "pack duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(pack_time_sum).count() << std::endl
#endif
<< "\nrecv corresponding statistic:\n"
<< "message sum: " << recv_msg_sum << std::endl
<< "size in bytes: " << recv_byte_sum << std::endl
<< "dispatch delay: " << std::chrono::duration_cast<std::chrono::duration<float>>(dispatch_dealy_sum).count() << std::endl
<< "size in bytes: " << recv_byte_sum
#ifdef ASCS_FULL_STATISTIC
<< "\ndispatch delay: " << std::chrono::duration_cast<std::chrono::duration<float>>(dispatch_dealy_sum).count() << std::endl
<< "recv idle duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(recv_idle_sum).count() << std::endl
<< "on_msg_handle duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(handle_time_sum).count() << std::endl
<< "unpack duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(unpack_time_sum).count();
#else
s << std::setfill('0') << "send corresponding statistic:\n"
<< "message sum: " << send_msg_sum << std::endl
<< "size in bytes: " << send_byte_sum << std::endl
<< "\nrecv corresponding statistic:\n"
<< "message sum: " << recv_msg_sum << std::endl
<< "size in bytes: " << recv_byte_sum;
<< "unpack duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(unpack_time_sum).count()
#endif
return s.str();
;return s.str();
}
//send corresponding statistic
......@@ -389,13 +366,35 @@ private:
statistic::stat_duration& duration;
};
#ifdef ASCS_SYNC_SEND
template<typename T>
struct obj_with_begin_time : public T
{
obj_with_begin_time(bool need_cv = false) {check_and_create_cv(need_cv);}
obj_with_begin_time(T&& obj, bool need_cv = false) : T(std::move(obj)) {check_and_create_cv(need_cv);}
obj_with_begin_time& operator=(T&& obj) {T::operator=(std::move(obj)); restart(); return *this;}
obj_with_begin_time(obj_with_begin_time&& other) : T(std::move(other)), begin_time(std::move(other.begin_time)), cv(std::move(other.cv)) {}
obj_with_begin_time& operator=(obj_with_begin_time&& other) {T::operator=(std::move(other)); begin_time = std::move(other.begin_time); cv = std::move(other.cv); return *this;}
void restart() {restart(statistic::now());}
void restart(const typename statistic::stat_time& begin_time_) {begin_time = begin_time_;}
void check_and_create_cv(bool need_cv) {if (!need_cv) cv.reset(); else if (!cv) cv = std::make_shared<std::condition_variable>();}
void swap(T& obj, bool need_cv = false) {T::swap(obj); restart(); check_and_create_cv(need_cv);}
void swap(obj_with_begin_time& other) {T::swap(other); std::swap(begin_time, other.begin_time); cv.swap(other.cv);}
void clear() {cv.reset(); T::clear();}
typename statistic::stat_time begin_time;
std::shared_ptr<std::condition_variable> cv;
};
#else
template<typename T>
struct obj_with_begin_time : public T
{
obj_with_begin_time() {}
obj_with_begin_time(T&& obj) : T(std::move(obj)) {restart();}
obj_with_begin_time& operator=(T&& obj) {T::operator=(std::move(obj)); restart(); return *this;}
//following two functions are used by concurrent queue only, ascs just use swap
obj_with_begin_time(obj_with_begin_time&& other) : T(std::move(other)), begin_time(std::move(other.begin_time)) {}
obj_with_begin_time& operator=(obj_with_begin_time&& other) {T::operator=(std::move(other)); begin_time = std::move(other.begin_time); return *this;}
......@@ -406,6 +405,7 @@ struct obj_with_begin_time : public T
typename statistic::stat_time begin_time;
};
#endif
//free functions, used to do something to any container(except map and multimap) optionally with any mutex
template<typename _Can, typename _Mutex, typename _Predicate>
......@@ -424,31 +424,6 @@ void do_something_to_one(_Can& __can, _Mutex& __mutex, const _Predicate& __pred)
template<typename _Can, typename _Predicate>
void do_something_to_one(_Can& __can, const _Predicate& __pred) {for (auto iter = std::begin(__can); iter != std::end(__can); ++iter) if (__pred(*iter)) break;}
template<typename _Can>
bool splice_helper(_Can& dest_can, _Can& src_can, size_t max_size = ASCS_MAX_MSG_NUM)
{
if (src_can.empty())
return false;
auto size = dest_can.size();
if (size >= max_size) //dest_can cannot hold more items.
return false;
size = max_size - size; //maximum items can be handled this time
auto left_size = src_can.size();
if (left_size > size) //some items left behind
{
left_size -= size;
auto begin_iter = std::begin(src_can);
auto end_iter = left_size > size ? std::next(begin_iter, size) : std::prev(std::end(src_can), left_size); //minimize iterator movement
dest_can.splice(std::end(dest_can), src_can, begin_iter, end_iter);
}
else
dest_can.splice(std::end(dest_can), src_can);
return true;
}
//member functions, used to do something to any member container(except map and multimap) optionally with any member mutex
#define DO_SOMETHING_TO_ALL_MUTEX(CAN, MUTEX) DO_SOMETHING_TO_ALL_MUTEX_NAME(do_something_to_all, CAN, MUTEX)
#define DO_SOMETHING_TO_ALL(CAN) DO_SOMETHING_TO_ALL_NAME(do_something_to_all, CAN)
......@@ -488,7 +463,7 @@ template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (au
TYPE FUNNAME(const char* pstr, size_t len, bool can_overflow) {return FUNNAME(&pstr, &len, 1, can_overflow);} \
template<typename Buffer> TYPE FUNNAME(const Buffer& buffer, bool can_overflow) {return FUNNAME(buffer.data(), buffer.size(), can_overflow);}
#define TCP_SEND_MSG(FUNNAME, NATIVE) \
#define TCP_SEND_MSG(FUNNAME, NATIVE, SEND_FUNNAME) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow) \
{ \
if (!can_overflow && !this->is_send_buffer_available()) \
......@@ -496,7 +471,7 @@ bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_
auto_duration dur(this->stat.pack_time_sum); \
auto msg = this->packer_->pack_msg(pstr, len, num, NATIVE); \
dur.end(); \
return this->do_direct_send_msg(std::move(msg)); \
return this->SEND_FUNNAME(std::move(msg)); \
} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
......@@ -522,14 +497,14 @@ template<typename Buffer> TYPE FUNNAME(const Buffer& buffer, bool can_overflow)
template<typename Buffer> TYPE FUNNAME(const asio::ip::udp::endpoint& peer_addr, const Buffer& buffer, bool can_overflow) \
{return FUNNAME(peer_addr, buffer.data(), buffer.size(), can_overflow);}
#define UDP_SEND_MSG(FUNNAME, NATIVE) \
#define UDP_SEND_MSG(FUNNAME, NATIVE, SEND_FUNNAME) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow) {return FUNNAME(peer_addr, pstr, len, num, can_overflow);} \
bool FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow) \
{ \
if (!can_overflow && !this->is_send_buffer_available()) \
return false; \
in_msg_type msg(peer_addr, this->packer_->pack_msg(pstr, len, num, NATIVE)); \
return this->do_direct_send_msg(std::move(msg)); \
return this->SEND_FUNNAME(std::move(msg)); \
} \
UDP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
......
......@@ -348,8 +348,7 @@
* Not support sync sending mode anymore, so we reduced an atomic object in ascs::socket.
*
* REFACTORING:
* If you want to change unpacker at runtime, first, you must define macro ASCS_PASSIVE_RECV, second, you must call ascs::socket::recv_msg and
* guarantee only zero or one recv_msg invocation (include initiating and asynchronous operation, this may need mutex, please carefully design your logic),
* If you want to change unpacker at runtime, first, you must define macro ASCS_PASSIVE_RECV, second, you must call ascs::socket::recv_msg,
* see file_client for more details.
* Class object has been split into executor and tracked_executor, object_pool use the former, and ascs::socket use the latter.
*
......@@ -379,6 +378,36 @@
*
* REPLACEMENTS:
*
* ===============================================================
* 2018.8.21 version 1.3.2
*
* SPECIAL ATTENTION (incompatible with old editions):
* If macro ASCS_PASSIVE_RECV been defined, you may receive empty messages in on_msg_handle() and sync_recv_msg(), this makes you always having
* the chance to call recv_msg().
* i_unpacker has been moved from namespace ascs::tcp and ascs::udp to namespace ascs, and the signature of ascs::udp::i_unpacker::parse_msg
* has been changed to obey ascs::tcp::i_unpacker::parse_msg.
*
* HIGHLIGHT:
* Fully support sync message sending and receiving (even be able to mix with async message sending and receiving without any limitations), but please note
* that this feature will slightly impact efficiency even if you always use async message sending and receiving, so only open this feature when realy needed.
*
* FIX:
* Fix race condition when aligning timers, see macro ASCS_ALIGNED_TIMER for more details.
* Fix error check for UDP on cygwin and mingw.
* Fix bug: demo file_client may not be able to receive all content of the file it required if you get more than one file in a single request.
*
* ENHANCEMENTS:
* Add new macro ASCS_SYNC_SEND and ASCS_SYNC_RECV to support sync message sending and receiving.
*
* DELETION:
*
* REFACTORING:
* i_unpacker has been moved from namespace ascs::tcp and ascs::udp to namespace ascs, and the signature of ascs::udp::i_unpacker::parse_msg
* has been changed to obey ascs::tcp::i_unpacker::parse_msg, the purpose of this change is to make socket::sync_recv_msg() can be easily
* implemented, otherwise, sync_recv_msg() must be implemented by tcp::socket_base and udp::socket_base respectively.
*
* REPLACEMENTS:
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -388,8 +417,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10301 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.3.1"
#define ASCS_VER 10302 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.3.2"
//asio and compiler check
#ifdef _MSC_VER
......@@ -525,7 +554,7 @@ static_assert(ASCS_MAX_OBJECT_NUM > 0, "object capacity must be bigger than zero
#endif
//IO thread number
//listening, msg sending and receiving, msg handling (on_msg_handle() and on_msg()), all timers(include user timers) and other asynchronous calls (from executor)
//listening, msg sending and receiving, msg handling (on_msg_handle()), all timers (include user timers) and other asynchronous calls (from executor)
//keep big enough, no empirical value I can suggest, you must try to find it out in your own environment
#ifndef ASCS_SERVICE_THREAD_NUM
#define ASCS_SERVICE_THREAD_NUM 8
......@@ -628,7 +657,7 @@ static_assert(ASCS_HEARTBEAT_MAX_ABSENCE > 0, "heartbeat absence must be bigger
// this seems not a normal procedure, but it works, I believe that asio's defect caused this problem.
//#define ASCS_AVOID_AUTO_STOP_SERVICE
//wrap service_pump with asio::io_service::work (asio::executor_work_guard), then it will never run out
//wrap service_pump with asio::io_service::work (asio::executor_work_guard), then it will never run out until you explicitly call stop_service().
//#define ASCS_DECREASE_THREAD_AT_RUNTIME
//enable decreasing service thread at runtime.
......@@ -652,8 +681,11 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
//#define ASCS_PASSIVE_RECV
//to gain the ability of changing the unpacker at runtime, with this mcro, ascs will not do message receiving automatically (except the firt one),
//user need to call ascs::socket::recv_msg(), if you need to change the unpacker, do it before recv_msg() invocation, please note.
//because user can call recv_msg() at any time, it's your responsibility to keep the recv buffer not overflowed, please pay special attention.
// so you need to manually call recv_msg(), if you need to change the unpacker, do it before recv_msg() invocation, please note.
//during async message receiving, calling recv_msg() will fail, this is by design to avoid asio::io_context using up all virtual memory.
//because user can greedily call recv_msg(), it's your responsibility to keep the recv buffer from overflowed, please pay special attention.
//this macro also makes you to be able to pause message receiving, then, if there's no other tasks (like timers), service_pump will stop itself,
// to avoid this, please define macro ASCS_AVOID_AUTO_STOP_SERVICE.
//#define ASCS_DISPATCH_BATCH_MSG
//all messages will be dispatched via on_handle_msg with a variable-length container, this will change the signature of function on_msg_handle,
......@@ -665,6 +697,23 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
//returned at (xx:xx:xx + 11), then the interval will be temporarily changed to 9 seconds to make the next callback to be called at (xx:xx:xx + 20),
//if you don't define this macro, the next callback will be called at (xx:xx:xx + 21), plase note.
//#define ASCS_SYNC_SEND
//#ifndef ASCS_SYNC_RECV
//define these macro to gain additional series of sync message sending and receiving, they are:
// sync_send_msg
// sync_send_native_msg
// sync_safe_send_msg
// sync_safe_send_native_msg
// sync_recv_msg
//please note that:
// this feature will slightly impact efficiency even if you always use async message sending and receiving, so only open this feature
// when realy needed, and DO NOT call pop_first_pending_send_msg and pop_all_pending_send_msg during sync message sending.
// we must avoid to do sync message sending and receiving in service threads.
// if prior sync_recv_msg() not returned, the second sync_recv_msg() will return false immediately.
// with macro ASCS_PASSIVE_RECV, in sync_recv_msg(), recv_msg() will be automatically called.
//Sync message sending and receiving are not tracked by tracked_executor, please note.
//No matter you're doing sync message sending or async message sending, you can do sync message receiving or async message receiving concurrently.
//configurations
#endif /* _ASCS_CONFIG_H_ */
......@@ -20,7 +20,7 @@
namespace ascs { namespace ext {
//protocol: length + body
class unpacker : public tcp::i_unpacker<std::string>
class unpacker : public i_unpacker<std::string>
{
public:
unpacker() {reset();}
......@@ -127,10 +127,11 @@ protected:
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
class udp_unpacker : public udp::i_unpacker<std::string>
class udp_unpacker : public i_unpacker<std::string>
{
public:
virtual msg_type parse_msg(size_t bytes_transferred) {assert(bytes_transferred <= ASCS_MSG_BUFFER_SIZE); return msg_type(raw_buff.data(), bytes_transferred);}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{assert(bytes_transferred <= ASCS_MSG_BUFFER_SIZE); msg_can.emplace_back(raw_buff.data(), bytes_transferred); return true;}
#ifdef ASCS_SCATTERED_RECV_BUFFER
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
......@@ -147,10 +148,10 @@ protected:
//protocol: length + body
//T can be replaceable_buffer (an alias of auto_buffer) or shared_buffer, the latter makes output messages seemingly copyable,
template<typename T = replaceable_buffer>
class replaceable_unpacker : public ascs::tcp::i_unpacker<T>
class replaceable_unpacker : public ascs::i_unpacker<T>
{
private:
typedef ascs::tcp::i_unpacker<T> super;
typedef ascs::i_unpacker<T> super;
public:
virtual void reset() {unpacker_.reset();}
......@@ -178,19 +179,20 @@ protected:
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
//T can be replaceable_buffer (an alias of auto_buffer) or shared_buffer, the latter makes output messages seemingly copyable.
template<typename T = replaceable_buffer>
class replaceable_udp_unpacker : public ascs::udp::i_unpacker<T>
class replaceable_udp_unpacker : public ascs::i_unpacker<T>
{
private:
typedef ascs::udp::i_unpacker<T> super;
typedef ascs::i_unpacker<T> super;
public:
virtual typename super::msg_type parse_msg(size_t bytes_transferred)
virtual typename super::msg_type parse_msg(size_t bytes_transferred, typename super::container_type& msg_can)
{
assert(bytes_transferred <= ASCS_MSG_BUFFER_SIZE);
auto raw_msg = new string_buffer();
raw_msg->assign(raw_buff.data(), bytes_transferred);
return typename super::msg_type(raw_msg);
msg_can.emplace_back(raw_msg);
return true;
}
#ifdef ASCS_SCATTERED_RECV_BUFFER
......@@ -208,7 +210,7 @@ protected:
//protocol: length + body
//this unpacker demonstrate how to forbid memory replication while parsing msgs (let asio write msg directly).
//not support unstripped messages, please note (you can fix this defect if you like).
class non_copy_unpacker : public tcp::i_unpacker<basic_buffer>
class non_copy_unpacker : public i_unpacker<basic_buffer>
{
public:
non_copy_unpacker() {reset();}
......@@ -286,7 +288,7 @@ private:
//protocol: fixed length
//non-copy
class fixed_length_unpacker : public tcp::i_unpacker<basic_buffer>
class fixed_length_unpacker : public i_unpacker<basic_buffer>
{
public:
fixed_length_unpacker() : _fixed_length(1024) {}
......@@ -324,7 +326,7 @@ private:
};
//protocol: [prefix] + body + suffix
class prefix_suffix_unpacker : public tcp::i_unpacker<std::string>
class prefix_suffix_unpacker : public i_unpacker<std::string>
{
public:
prefix_suffix_unpacker() {reset();}
......@@ -440,7 +442,7 @@ private:
};
//protocol: stream (non-protocol)
class stream_unpacker : public tcp::i_unpacker<std::string>
class stream_unpacker : public i_unpacker<std::string>
{
public:
virtual void reset() {}
......
......@@ -19,7 +19,7 @@
namespace ascs
{
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType, typename OutMsgType,
template<typename Socket, typename Packer, typename InMsgType, typename OutMsgType,
template<typename, typename> class InQueue, template<typename> class InContainer,
template<typename, typename> class OutQueue, template<typename> class OutContainer>
class socket : public timer<tracked_executor>
......@@ -47,9 +47,15 @@ protected:
sending = false;
#ifdef ASCS_PASSIVE_RECV
reading = false;
#endif
#ifdef ASCS_SYNC_RECV
status = sync_recv_status::NOT_REQUESTED;
#endif
started_ = false;
dispatching = false;
#ifndef ASCS_DISPATCH_BATCH_MSG
dispatched = true;
#endif
recv_idle_began = false;
msg_resuming_interval_ = ASCS_MSG_RESUMING_INTERVAL;
msg_handling_interval_ = ASCS_MSG_HANDLING_INTERVAL;
......@@ -71,8 +77,14 @@ protected:
sending = false;
#ifdef ASCS_PASSIVE_RECV
reading = false;
#endif
#ifdef ASCS_SYNC_RECV
status = sync_recv_status::NOT_REQUESTED;
#endif
dispatching = false;
#ifndef ASCS_DISPATCH_BATCH_MSG
dispatched = true;
#endif
recv_idle_began = false;
clear_buffer();
}
......@@ -179,13 +191,50 @@ public:
bool is_send_buffer_available() const {return send_msg_buffer.size() < ASCS_MAX_MSG_NUM;}
//if you define macro ASCS_PASSIVE_RECV and call recv_msg greedily, the receiving buffer may overflow, this can exhaust all virtual memory,
//to avoid this problem, call recv_msg only if is_recv_buffer_available returns true.
//to avoid this problem, call recv_msg only if is_recv_buffer_available() returns true.
bool is_recv_buffer_available() const {return recv_msg_buffer.size() < ASCS_MAX_MSG_NUM;}
//don't use the packer but insert into send buffer directly
bool direct_send_msg(const InMsgType& msg, bool can_overflow = false) {return direct_send_msg(InMsgType(msg), can_overflow);}
bool direct_send_msg(const InMsgType& msg, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_direct_send_msg(InMsgType(msg)) : false;}
bool direct_send_msg(InMsgType&& msg, bool can_overflow = false) {return can_overflow || is_send_buffer_available() ? do_direct_send_msg(std::move(msg)) : false;}
#ifdef ASCS_SYNC_SEND
//don't use the packer but insert into send buffer directly, then wait for the sending to finish.
bool direct_sync_send_msg(const InMsgType& msg, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_direct_sync_send_msg(InMsgType(msg)) : false;}
bool direct_sync_send_msg(InMsgType&& msg, bool can_overflow = false) {return can_overflow || is_send_buffer_available() ? do_direct_sync_send_msg(std::move(msg)) : false;}
#endif
#ifdef ASCS_SYNC_RECV
bool sync_recv_msg(std::list<OutMsgType>& msg_can)
{
if (stopped())
return false;
std::unique_lock<std::mutex> lock(sync_recv_mutex);
if (sync_recv_status::NOT_REQUESTED != status)
return false;
#ifdef ASCS_PASSIVE_RECV
recv_msg();
#endif
status = sync_recv_status::REQUESTED;
sync_recv_cv.wait(lock);
auto re = sync_recv_status::RESPONDED == status;
status = sync_recv_status::NOT_REQUESTED;
if (re)
{
msg_can.clear();
msg_can.swap(temp_msg_can);
}
sync_recv_cv.notify_one();
return re;
}
#endif
//how many msgs waiting for sending or dispatching
GET_PENDING_MSG_NUM(get_pending_send_msg_num, send_msg_buffer)
GET_PENDING_MSG_NUM(get_pending_recv_msg_num, recv_msg_buffer)
......@@ -272,6 +321,9 @@ protected:
if (stopped())
{
#ifdef ASCS_SYNC_RECV
sync_recv_cv.notify_one();
#endif
on_close();
after_close();
}
......@@ -284,32 +336,42 @@ protected:
return true;
}
bool handle_msg(OutMsgType&& temp_msg)
bool handle_msg()
{
if (!temp_msg.empty())
{
++stat.recv_msg_sum;
stat.recv_byte_sum += temp_msg.size();
recv_msg_buffer.enqueue(out_msg(std::move(temp_msg)));
dispatch_msg();
}
#ifdef ASCS_PASSIVE_RECV
if (temp_msg_can.empty())
temp_msg_can.emplace_back(); //empty message, makes users always having the chance to call recv_msg().
#endif
return handled_msg();
}
#ifdef ASCS_SYNC_RECV
std::unique_lock<std::mutex> lock(sync_recv_mutex);
if (sync_recv_status::REQUESTED == status)
{
status = sync_recv_status::RESPONDED;
sync_recv_cv.notify_one();
template<typename T> bool handle_msg(T& temp_msg_can)
{
sync_recv_cv.wait(lock);
}
lock.unlock();
#endif
auto msg_num = temp_msg_can.size();
if (msg_num > 0)
{
#ifndef ASCS_PASSIVE_RECV
stat.recv_msg_sum += msg_num;
#endif
std::list<out_msg> temp_buffer(msg_num);
auto op_iter = temp_buffer.begin();
for (auto iter = temp_msg_can.begin(); iter != temp_msg_can.end(); ++op_iter, ++iter)
{
#ifdef ASCS_PASSIVE_RECV
if (!iter->empty())
++stat.recv_msg_sum;
#endif
stat.recv_byte_sum += iter->size();
op_iter->swap(*iter);
}
temp_msg_can.clear();
recv_msg_buffer.move_items_in(temp_buffer);
dispatch_msg();
......@@ -335,6 +397,30 @@ protected:
return true;
}
#ifdef ASCS_SYNC_SEND
bool do_direct_sync_send_msg(InMsgType&& msg)
{
if (stopped())
return false;
else if (msg.empty())
{
unified_out::error_out("found an empty message, please check your packer.");
return false;
}
auto unused = in_msg(std::move(msg), true);
auto cv = unused.cv;
send_msg_buffer.enqueue(std::move(unused));
if (!sending && is_ready())
send_msg();
std::unique_lock<std::mutex> lock(sync_send_mutex);
cv->wait(lock);
return true;
}
#endif
private:
virtual void recv_msg() = 0;
virtual void send_msg() = 0;
......@@ -400,7 +486,7 @@ private:
else
{
#else
if ((dispatching = !last_dispatch_msg.empty() || recv_msg_buffer.try_dequeue(last_dispatch_msg)))
if ((dispatching = !dispatched || recv_msg_buffer.try_dequeue(last_dispatch_msg)))
{
auto begin_time = statistic::now();
stat.dispatch_dealy_sum += begin_time - last_dispatch_msg.begin_time;
......@@ -415,6 +501,7 @@ private:
}
else
{
dispatched = true;
last_dispatch_msg.clear();
#endif
dispatching = false;
......@@ -448,6 +535,9 @@ private:
lowest_layer().close(ec);
}
change_timer_status(TIMER_DELAY_CLOSE, timer_info::TIMER_CANCELED);
#ifdef ASCS_SYNC_RECV
sync_recv_cv.notify_one();
#endif
on_close();
after_close();
set_async_calling(false);
......@@ -463,6 +553,7 @@ private:
protected:
struct statistic stat;
std::shared_ptr<i_packer<typename Packer::msg_type>> packer_;
std::list<OutMsgType> temp_msg_can;
in_queue_type send_msg_buffer;
volatile bool sending;
......@@ -473,8 +564,12 @@ protected:
private:
bool recv_idle_began;
volatile bool dispatching;
volatile bool started_; //has started or not
volatile bool dispatching;
#ifndef ASCS_DISPATCH_BATCH_MSG
bool dispatched;
out_msg last_dispatch_msg;
#endif
typename statistic::stat_time recv_idle_begin_time;
out_queue_type recv_msg_buffer;
......@@ -482,13 +577,21 @@ private:
uint_fast64_t _id;
Socket next_layer_;
#ifndef ASCS_DISPATCH_BATCH_MSG
out_msg last_dispatch_msg;
#endif
std::atomic_flag start_atomic;
asio::io_context::strand strand;
#ifdef ASCS_SYNC_SEND
std::mutex sync_send_mutex;
#endif
#ifdef ASCS_SYNC_RECV
enum sync_recv_status {NOT_REQUESTED, REQUESTED, RESPONDED};
volatile sync_recv_status status;
std::mutex sync_recv_mutex;
std::condition_variable sync_recv_cv;
#endif
unsigned msg_resuming_interval_, msg_handling_interval_;
};
......
......@@ -21,7 +21,7 @@ namespace ascs { namespace tcp {
template <typename Socket, typename Packer, typename Unpacker,
template<typename, typename> class InQueue, template<typename> class InContainer,
template<typename, typename> class OutQueue, template<typename> class OutContainer>
class socket_base : public socket<Socket, Packer, Unpacker, typename Packer::msg_type, typename Unpacker::msg_type, InQueue, InContainer, OutQueue, OutContainer>
class socket_base : public socket<Socket, Packer, typename Packer::msg_type, typename Unpacker::msg_type, InQueue, InContainer, OutQueue, OutContainer>
{
public:
typedef typename Packer::msg_type in_msg_type;
......@@ -30,7 +30,7 @@ public:
typedef typename Unpacker::msg_ctype out_msg_ctype;
private:
typedef socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
typedef socket<Socket, Packer, in_msg_type, out_msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
protected:
enum link_status {CONNECTED, FORCE_SHUTTING_DOWN, GRACEFUL_SHUTTING_DOWN, BROKEN};
......@@ -122,12 +122,21 @@ public:
///////////////////////////////////////////////////
//msg sending interface
TCP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
TCP_SEND_MSG(send_native_msg, true) //use the packer with native = true to pack the msgs
TCP_SEND_MSG(send_msg, false, do_direct_send_msg) //use the packer with native = false to pack the msgs
TCP_SEND_MSG(send_native_msg, true, do_direct_send_msg) //use the packer with native = true to pack the msgs
//guarantee send msg successfully even if can_overflow equal to false
//success at here just means put the msg into tcp::socket_base's send buffer
TCP_SAFE_SEND_MSG(safe_send_msg, send_msg)
TCP_SAFE_SEND_MSG(safe_send_native_msg, send_native_msg)
#ifdef ASCS_SYNC_SEND
TCP_SEND_MSG(sync_send_msg, false, do_direct_sync_send_msg) //use the packer with native = false to pack the msgs
TCP_SEND_MSG(sync_send_native_msg, true, do_direct_sync_send_msg) //use the packer with native = true to pack the msgs
//guarantee send msg successfully even if can_overflow equal to false
//success at here just means put the msg into tcp::socket_base's send buffer
TCP_SAFE_SEND_MSG(sync_safe_send_msg, sync_send_msg)
TCP_SAFE_SEND_MSG(sync_safe_send_native_msg, sync_send_native_msg)
#endif
//msg sending interface
///////////////////////////////////////////////////
......@@ -218,30 +227,33 @@ private:
void recv_handler(const asio::error_code& ec, size_t bytes_transferred)
{
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
if (ec)
this->on_recv_error(ec);
else if (bytes_transferred > 0)
if (!ec && bytes_transferred > 0)
{
this->stat.last_recv_time = time(nullptr);
typename Unpacker::container_type temp_msg_can;
auto_duration dur(this->stat.unpack_time_sum);
auto unpack_ok = unpacker_->parse_msg(bytes_transferred, temp_msg_can);
auto unpack_ok = unpacker_->parse_msg(bytes_transferred, this->temp_msg_can);
dur.end();
if (!unpack_ok)
on_unpack_error(); //the user will decide whether to reset the unpacker or not in this callback
if (this->handle_msg(temp_msg_can)) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
if (this->handle_msg()) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
do_recv_msg(); //receive msg in sequence
}
#ifndef ASCS_PASSIVE_RECV
else
do_recv_msg(); //receive msg in sequence
{
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
if (ec)
this->on_recv_error(ec);
else if (this->handle_msg()) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
do_recv_msg(); //receive msg in sequence
}
}
bool do_send_msg(bool in_strand)
......@@ -292,6 +304,9 @@ private:
this->stat.send_byte_sum += bytes_transferred;
this->stat.send_time_sum += statistic::now() - last_send_msg.front().begin_time;
this->stat.send_msg_sum += last_send_msg.size();
#ifdef ASCS_SYNC_SEND
ascs::do_something_to_all(last_send_msg, [](typename super::in_msg& item) {if (item.cv) item.cv->notify_one();});
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
this->on_msg_send(last_send_msg.front());
#endif
......
......@@ -123,16 +123,16 @@ public:
DO_SOMETHING_TO_ONE_MUTEX(timer_can, timer_can_mutex)
protected:
bool start_timer(timer_info& ti)
bool start_timer(timer_info& ti, unsigned interval_ms)
{
if (!ti.call_back)
return false;
ti.status = timer_info::TIMER_STARTED;
#if ASIO_VERSION >= 101100
ti.timer.expires_after(milliseconds(ti.interval_ms));
ti.timer.expires_after(milliseconds(interval_ms));
#else
ti.timer.expires_from_now(milliseconds(ti.interval_ms));
ti.timer.expires_from_now(milliseconds(interval_ms));
#endif
//if timer already started, this will cancel it first
......@@ -150,9 +150,7 @@ protected:
if (elapsed_ms > ti.interval_ms)
elapsed_ms %= ti.interval_ms;
ti.interval_ms -= elapsed_ms;
this->start_timer(ti);
ti.interval_ms += elapsed_ms;
this->start_timer(ti, ti.interval_ms - elapsed_ms);
}
#else
if (!ec && ti.call_back(ti.id) && timer_info::TIMER_STARTED == ti.status)
......@@ -164,6 +162,7 @@ protected:
return true;
}
bool start_timer(timer_info& ti) {return start_timer(ti, ti.interval_ms);}
void stop_timer(timer_info& ti)
{
......
......@@ -21,7 +21,7 @@ namespace ascs { namespace udp {
template <typename Packer, typename Unpacker, typename Socket = asio::ip::udp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class socket_base : public socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer>
class socket_base : public socket<Socket, Packer, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer>
{
public:
typedef udp_msg<typename Packer::msg_type> in_msg_type;
......@@ -30,7 +30,7 @@ public:
typedef const out_msg_type out_msg_ctype;
private:
typedef socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
typedef socket<Socket, Packer, in_msg_type, out_msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
public:
socket_base(asio::io_context& io_context_) : super(io_context_), unpacker_(std::make_shared<Unpacker>()), strand(io_context_) {}
......@@ -103,12 +103,21 @@ public:
///////////////////////////////////////////////////
//msg sending interface
UDP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
UDP_SEND_MSG(send_native_msg, true) //use the packer with native = true to pack the msgs
UDP_SEND_MSG(send_msg, false, do_direct_send_msg) //use the packer with native = false to pack the msgs
UDP_SEND_MSG(send_native_msg, true, do_direct_send_msg) //use the packer with native = true to pack the msgs
//guarantee send msg successfully even if can_overflow equal to false
//success at here just means put the msg into udp::socket_base's send buffer
UDP_SAFE_SEND_MSG(safe_send_msg, send_msg)
UDP_SAFE_SEND_MSG(safe_send_native_msg, send_native_msg)
#ifdef ASCS_SYNC_SEND
UDP_SEND_MSG(sync_send_msg, false, do_direct_sync_send_msg) //use the packer with native = false to pack the msgs
UDP_SEND_MSG(sync_send_native_msg, true, do_direct_sync_send_msg) //use the packer with native = true to pack the msgs
//guarantee send msg successfully even if can_overflow equal to false
//success at here just means put the msg into tcp::socket_base's send buffer
UDP_SAFE_SEND_MSG(sync_safe_send_msg, sync_send_msg)
UDP_SAFE_SEND_MSG(sync_safe_send_native_msg, sync_send_native_msg)
#endif
//msg sending interface
///////////////////////////////////////////////////
......@@ -167,29 +176,34 @@ private:
void recv_handler(const asio::error_code& ec, size_t bytes_transferred)
{
if (!ec && bytes_transferred > 0)
{
this->stat.last_recv_time = time(nullptr);
typename Unpacker::container_type msg_can;
unpacker_->parse_msg(bytes_transferred, msg_can);
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
auto keep_reading = !ec;
if (ec)
ascs::do_something_to_all(msg_can, [this](typename Unpacker::msg_type& msg) {this->temp_msg_can.emplace_back(this->temp_addr, std::move(msg));});
if (this->handle_msg()) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
do_recv_msg(); //receive msg in sequence
}
else
{
#ifdef _MSC_VER
if (asio::error::connection_refused == ec || asio::error::connection_reset == ec)
keep_reading = true;
else
#ifdef ASCS_PASSIVE_RECV
this->reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
#if defined(_MSC_VER) || defined(__CYGWIN__) || defined(__MINGW32__) || defined(__MINGW64__)
if (ec && asio::error::connection_refused != ec && asio::error::connection_reset != ec)
#else
if (ec)
#endif
on_recv_error(ec);
else if (this->handle_msg()) //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
do_recv_msg(); //receive msg in sequence
}
else if (bytes_transferred > 0)
{
this->stat.last_recv_time = time(nullptr);
keep_reading = this->handle_msg(in_msg_type(temp_addr, unpacker_->parse_msg(bytes_transferred))); //if macro ASCS_PASSIVE_RECV been defined, handle_msg will always return false
}
#ifndef ASCS_PASSIVE_RECV
if (keep_reading)
do_recv_msg(); //receive msg in sequence
#endif
}
bool do_send_msg(bool in_strand)
......@@ -217,20 +231,18 @@ private:
this->stat.last_send_time = time(nullptr);
this->stat.send_byte_sum += bytes_transferred;
this->stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
++this->stat.send_msg_sum;
if (!last_send_msg.empty())
{
assert(bytes_transferred == last_send_msg.size());
this->stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
#ifdef ASCS_SYNC_SEND
if (last_send_msg.cv) last_send_msg.cv->notify_one();
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
this->on_msg_send(last_send_msg);
this->on_msg_send(last_send_msg);
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
if (this->send_msg_buffer.empty())
this->on_all_msg_send(last_send_msg);
if (this->send_msg_buffer.empty())
this->on_all_msg_send(last_send_msg);
#endif
}
}
else
this->on_send_error(ec);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册