提交 fd6c9977 编写于 作者: Y youngwolf

1.2.4 release.

上级 01d43fe5
......@@ -27,7 +27,7 @@ using namespace ascs::ext::tcp;
class echo_socket : public client_socket
{
public:
echo_socket(asio::io_service& io_service_) : client_socket(io_service_), msg_len(ASCS_MSG_BUFFER_SIZE - ASCS_HEAD_LEN) {unpacker()->stripped(false);}
echo_socket(asio::io_context& io_context_) : client_socket(io_context_), msg_len(ASCS_MSG_BUFFER_SIZE - ASCS_HEAD_LEN) {unpacker()->stripped(false);}
void begin(size_t msg_len_) {msg_len = msg_len_;}
void check_delay(float max_delay) {if (is_connected() && last_send_time.elapsed() > max_delay) force_shutdown();}
......@@ -72,14 +72,6 @@ class echo_client : public multi_client_base<echo_socket>
public:
echo_client(service_pump& service_pump_) : multi_client_base<echo_socket>(service_pump_) {}
statistic get_statistic()
{
statistic stat;
do_something_to_all([&stat](object_ctype& item) {stat += item->get_statistic();});
return stat;
}
void begin(float max_delay, size_t msg_len)
{
do_something_to_all([msg_len](object_ctype& item) {item->begin(msg_len);});
......
......@@ -42,14 +42,6 @@ class echo_server : public server_base<echo_socket>
public:
echo_server(service_pump& service_pump_) : server_base<echo_socket>(service_pump_) {}
statistic get_statistic()
{
statistic stat;
do_something_to_all([&stat](object_ctype& item) {stat += item->get_statistic();});
return stat;
}
protected:
virtual bool on_accept(object_ctype& socket_ptr) {asio::ip::tcp::no_delay option(true); socket_ptr->lowest_layer().set_option(option); return true;}
};
......
......@@ -98,7 +98,7 @@ TCP_SEND_MSG_CALL_SWITCH(FUNNAME, void)
class echo_socket : public client_socket
{
public:
echo_socket(asio::io_service& io_service_) : client_socket(io_service_), recv_bytes(0), recv_index(0)
echo_socket(asio::io_context& io_context_) : client_socket(io_context_), recv_bytes(0), recv_index(0)
{
#if 2 == PACKER_UNPACKER_TYPE
std::dynamic_pointer_cast<ASCS_DEFAULT_UNPACKER>(unpacker())->fixed_length(1024);
......@@ -184,14 +184,6 @@ public:
return total_recv_bytes;
}
statistic get_statistic()
{
statistic stat;
do_something_to_all([&stat](object_ctype& item) {stat += item->get_statistic();});
return stat;
}
void clear_status() {do_something_to_all([](object_ctype& item) {item->clear_status();});}
void begin(size_t msg_num, size_t msg_len, char msg_fill) {do_something_to_all([=](object_ctype& item) {item->begin(msg_num, msg_len, msg_fill);});}
......
......@@ -157,14 +157,6 @@ class echo_server : public server_base<echo_socket, object_pool<echo_socket>, i_
public:
echo_server(service_pump& service_pump_) : server_base(service_pump_) {}
statistic get_statistic()
{
statistic stat;
do_something_to_all([&stat](object_ctype& item) {stat += item->get_statistic();});
return stat;
}
//from i_echo_server, pure virtual function, we must implement it.
virtual void test() {/*puts("in echo_server::test()");*/}
};
......
......@@ -17,7 +17,7 @@ extern fl_type file_size;
class file_socket : public base_socket, public client_socket
{
public:
file_socket(asio::io_service& io_service_) : client_socket(io_service_), index(-1) {}
file_socket(asio::io_context& io_context_) : client_socket(io_context_), index(-1) {}
virtual ~file_socket() {clear();}
//reset all; ensure that no operations are being performed on this file_socket when you invoke it
......
......@@ -54,7 +54,7 @@ std::atomic_ushort completed_session_num;
class echo_socket : public client_socket
{
public:
echo_socket(asio::io_service& io_service_) : client_socket(io_service_) {}
echo_socket(asio::io_context& io_context_) : client_socket(io_context_) {}
void begin(size_t msg_num, const char* msg, size_t msg_len)
{
......@@ -119,14 +119,6 @@ class echo_client : public multi_client_base<echo_socket>
public:
echo_client(service_pump& service_pump_) : multi_client_base<echo_socket>(service_pump_) {}
statistic get_statistic()
{
statistic stat;
do_something_to_all([&stat](object_ctype& item) {stat += item->get_statistic();});
return stat;
}
void begin(size_t msg_num, const char* msg, size_t msg_len) {do_something_to_all([=](object_ctype& item) {item->begin(msg_num, msg, msg_len);});}
};
......
......@@ -96,14 +96,6 @@ class echo_server : public server_base<echo_socket>
public:
echo_server(service_pump& service_pump_) : server_base<echo_socket>(service_pump_) {}
statistic get_statistic()
{
statistic stat;
do_something_to_all([&stat](object_ctype& item) {stat += item->get_statistic();});
return stat;
}
protected:
virtual bool on_accept(object_ctype& socket_ptr) {asio::ip::tcp::no_delay option(true); socket_ptr->lowest_layer().set_option(option); return true;}
};
......
......@@ -274,7 +274,21 @@ struct statistic
#endif
statistic() {reset();}
void reset_number() {send_msg_sum = send_byte_sum = 0; recv_msg_sum = recv_byte_sum = 0;}
//zero all counters and time stamps: message/byte totals plus the heartbeat and
//link establish/break times; elapsed durations are reset separately by reset_duration().
void reset_number()
{
send_msg_sum = 0;
send_byte_sum = 0;
recv_msg_sum = 0;
recv_byte_sum = 0;
last_send_time = 0; //include heartbeat
last_recv_time = 0; //include heartbeat
establish_time = 0; //time of link establishment
break_time = 0; //time of link broken
}
#ifdef ASCS_FULL_STATISTIC
void reset() {reset_number(); reset_duration();}
void reset_duration()
......@@ -362,6 +376,12 @@ struct statistic
#endif
stat_duration handle_time_2_sum; //on_msg_handle consumed time, this indicate the efficiency of msg handling
stat_duration unpack_time_sum; //udp::socket_base will not gather this item
time_t last_send_time; //include heartbeat
time_t last_recv_time; //include heartbeat
time_t establish_time; //time of link establishment
time_t break_time; //time of link broken
};
class auto_duration
......
......@@ -172,7 +172,7 @@
* 2017.7.9 version 1.2.2
*
* SPECIAL ATTENTION (incompatible with old editions):
* No error_code will be presented anymore when call io_service::run, suggest to define macro ASCS_ENHANCED_STABILITY.
* No error_code will be presented anymore when call io_context::run, suggest to define macro ASCS_ENHANCED_STABILITY.
*
* HIGHLIGHT:
* Add two demos for concurrent test.
......@@ -227,6 +227,42 @@
* Rename tcp::client_base to tcp::multi_client_base, ext::tcp::client to ext::tcp::multi_client, udp::service_base to udp::multi_service_base,
 * ext::udp::service to ext::udp::multi_service. Old ones are still available, but have become aliases.
*
* ===============================================================
* 2017.9.17 version 1.2.4
*
* SPECIAL ATTENTION (incompatible with old editions):
 * Function object_pool::invalid_object_pop only pops obsoleted objects that have no additional references.
* socket::stat.last_recv_time will not be updated before tcp::socket_base::on_connect anymore.
* For ssl socket, on_handshake will be invoked before on_connect (before, on_connect is before on_handshake).
*
* HIGHLIGHT:
*
* FIX:
* If start the same timer and return false in the timer's call_back, its status will be set to TIMER_CANCELED (the right value should be TIMER_OK).
* In old compilers (for example gcc 4.7), std::list::splice needs a non-const iterator as the insert point.
 * If stop_service is called after the service_pump has already stopped, timer TIMER_DELAY_CLOSE will be left behind and triggered after the next start_service;
 * this will bring disorder to ascs::socket.
*
* ENHANCEMENTS:
* During congestion controlling, retry interval can be changed at runtime, you can use this feature for performance tuning,
* see macro ASCS_MSG_HANDLING_INTERVAL_STEP1 and ASCS_MSG_HANDLING_INTERVAL_STEP2 for more details.
* Avoid decreasing the number of service thread to less than one.
* Add a helper function object_pool::get_statistic.
* Add another overload of function object_pool::invalid_object_pop.
* Introduce asio::defer to object, be careful to use it.
* Add link's break time and establish time to the statistic object.
* Move virtual function client_socket_base::on_connect to tcp::socket_base, so server_socket_base will have it too (and ssl sockets).
*
* DELETION:
* Drop useless variables which need macro ASCS_DECREASE_THREAD_AT_RUNTIME to be defined.
*
* REFACTORING:
 * Move variables last_send_time and last_recv_time from ascs::socket to ascs::socket::stat (a statistic object).
* Move common operations in client_socket_base::do_start and server_socket_base::do_start to tcp::socket_base::do_start and socket::do_start.
*
* REPLACEMENTS:
* Always use io_context instead of io_service (before asio 1.11, io_context will be a typedef of io_service).
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -236,8 +272,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10203 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.2.3"
#define ASCS_VER 10204 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.2.4"
//asio and compiler check
#ifdef _MSC_VER
......@@ -267,12 +303,13 @@
static_assert(ASIO_VERSION >= 101001, "ascs needs asio 1.10.1 or higher.");
#if ASIO_VERSION >= 101100 && defined(ASIO_NO_DEPRECATED)
#define io_service io_context
#if ASIO_VERSION < 101100
namespace asio {typedef io_service io_context;}
#endif
//asio and compiler check
//configurations
#ifndef ASCS_SERVER_IP
#define ASCS_SERVER_IP "127.0.0.1"
#endif
......@@ -298,7 +335,7 @@ static_assert(ASCS_MAX_MSG_NUM > 0, "message capacity must be bigger than zero."
//don't write any logs.
//#define ASCS_NO_UNIFIED_OUT
//if defined, service_pump will catch exceptions for asio::io_service::run().
//if defined, service_pump will catch exceptions for asio::io_context::run().
//#define ASCS_ENHANCED_STABILITY
//if defined, asio::steady_timer will be used in ascs::timer, otherwise, asio::system_timer will be used.
......@@ -487,6 +524,24 @@ static_assert(ASCS_HEARTBEAT_MAX_ABSENCE > 0, "heartbeat absence must be bigger
//#define ASCS_DECREASE_THREAD_AT_RUNTIME
//enable decreasing service thread at runtime.
#ifndef ASCS_MSG_HANDLING_INTERVAL_STEP1
#define ASCS_MSG_HANDLING_INTERVAL_STEP1 50 //milliseconds
#endif
static_assert(ASCS_MSG_HANDLING_INTERVAL_STEP1 >= 0, "the interval of msg handling step 1 must be bigger than or equal to zero.");
//msg handling step 1
//move msg from temp_msg_buffer to recv_msg_buffer (because on_msg returned false or macro ASCS_FORCE_TO_USE_MSG_RECV_BUFFER has been defined)
//if above process failed, retry it after ASCS_MSG_HANDLING_INTERVAL_STEP1 milliseconds later.
//this value can be changed via msg_handling_interval_step1(size_t) at runtime.
#ifndef ASCS_MSG_HANDLING_INTERVAL_STEP2
#define ASCS_MSG_HANDLING_INTERVAL_STEP2 50 //milliseconds
#endif
static_assert(ASCS_MSG_HANDLING_INTERVAL_STEP2 >= 0, "the interval of msg handling step 2 must be bigger than or equal to zero.");
//msg handling step 2
//call on_msg_handle, if failed, retry it after ASCS_MSG_HANDLING_INTERVAL_STEP2 milliseconds later.
//this value can be changed via msg_handling_interval_step2(size_t) at runtime.
//configurations
#endif /* _ASCS_CONFIG_H_ */
......@@ -97,7 +97,11 @@ public:
void splice(iterator _Where, _Mybase& _Right) {s += _Right.size(); impl.splice(_Where, _Right);} //just satisfy old compilers (for example gcc 4.7)
void splice(const_iterator _Where, _Mybase& _Right) {s += _Right.size(); impl.splice(_Where, _Right);}
//please add corresponding overloads which take non-const iterators for following 5 functions,
void splice(iterator _Where, _Myt& _Right) {s += _Right.size(); _Right.s = 0; impl.splice(_Where, _Right.impl);} //just satisfy old compilers (for example gcc 4.7)
void splice(const_iterator _Where, _Myt& _Right) {s += _Right.size(); _Right.s = 0; impl.splice(_Where, _Right.impl);}
//please add corresponding overloads which take non-const iterators for following 4 functions,
//i didn't provide them because ascs doesn't use them and it violated the standard, just some old compilers need them.
void splice(const_iterator _Where, _Mybase& _Right, const_iterator _First) {++s; impl.splice(_Where, _Right, _First);}
void splice(const_iterator _Where, _Mybase& _Right, const_iterator _First, const_iterator _Last)
......@@ -109,7 +113,6 @@ public:
impl.splice(_Where, _Right, _First, _Last);
}
void splice(const_iterator _Where, _Myt& _Right) {s += _Right.size(); _Right.s = 0; impl.splice(_Where, _Right.impl);}
void splice(const_iterator _Where, _Myt& _Right, const_iterator _First) {++s; --_Right.s; impl.splice(_Where, _Right.impl, _First);}
void splice(const_iterator _Where, _Myt& _Right, const_iterator _First, const_iterator _Last)
{
......
......@@ -24,7 +24,7 @@ protected:
virtual ~object() {}
public:
bool stopped() const {return io_service_.stopped();}
bool stopped() const {return io_context_.stopped();}
#if 0 == ASCS_DELAY_CLOSE
typedef std::function<void(const asio::error_code&)> handler_with_error;
......@@ -32,11 +32,14 @@ public:
#if (defined(_MSC_VER) && _MSC_VER > 1800) || (defined(__cplusplus) && __cplusplus > 201103L)
#if ASIO_VERSION >= 101100
template<typename F> void post(F&& handler) {asio::post(io_service_, [unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post(const F& handler) {asio::post(io_service_, [unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void post(F&& handler) {asio::post(io_context_, [unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post(const F& handler) {asio::post(io_context_, [unused(this->async_call_indicator), handler]() {handler();});}
//Don't use defer function unless you fully understand asio::defer, which was introduced in asio 1.11
template<typename F> void defer(F&& handler) {asio::defer(io_context_, [unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void defer(const F& handler) {asio::defer(io_context_, [unused(this->async_call_indicator), handler]() {handler();});}
#else
template<typename F> void post(F&& handler) {io_service_.post([unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post(const F& handler) {io_service_.post([unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void post(F&& handler) {io_context_.post([unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post(const F& handler) {io_context_.post([unused(this->async_call_indicator), handler]() {handler();});}
#endif
template<typename F> handler_with_error make_handler_error(F&& handler) const {return [unused(this->async_call_indicator), handler(std::move(handler))](const auto& ec) {handler(ec);};}
......@@ -48,9 +51,11 @@ public:
{return [unused(this->async_call_indicator), handler](const auto& ec, auto bytes_transferred) {handler(ec, bytes_transferred);};}
#else
#if ASIO_VERSION >= 101100
template<typename F> void post(const F& handler) {auto unused(async_call_indicator); asio::post(io_service_, [=]() {handler();});}
template<typename F> void post(const F& handler) {auto unused(async_call_indicator); asio::post(io_context_, [=]() {handler();});}
//Don't use defer function unless you fully understand asio::defer, which was introduced in asio 1.11
template<typename F> void defer(const F& handler) {auto unused(async_call_indicator); asio::defer(io_context_, [=]() {handler();});}
#else
template<typename F> void post(const F& handler) {auto unused(async_call_indicator); io_service_.post([=]() {handler();});}
template<typename F> void post(const F& handler) {auto unused(async_call_indicator); io_context_.post([=]() {handler();});}
#endif
template<typename F> handler_with_error make_handler_error(const F& handler) const {auto unused(async_call_indicator); return [=](const asio::error_code& ec) {handler(ec);};}
template<typename F> handler_with_error_size make_handler_error_size(const F& handler) const
......@@ -62,15 +67,18 @@ public:
inline void set_async_calling(bool) {}
protected:
object(asio::io_service& _io_service_) : async_call_indicator(std::make_shared<char>('\0')), io_service_(_io_service_) {}
object(asio::io_context& _io_context_) : async_call_indicator(std::make_shared<char>('\0')), io_context_(_io_context_) {}
std::shared_ptr<char> async_call_indicator;
#else
#if ASIO_VERSION >= 101100
template<typename F> void post(F&& handler) {asio::post(io_service_, std::move(handler));}
template<typename F> void post(const F& handler) {asio::post(io_service_, handler);}
template<typename F> void post(F&& handler) {asio::post(io_context_, std::move(handler));}
template<typename F> void post(const F& handler) {asio::post(io_context_, handler);}
//Don't use defer function unless you fully understand asio::defer, which was introduced in asio 1.11
template<typename F> void defer(F&& handler) {asio::defer(io_context_, std::move(handler));}
template<typename F> void defer(const F& handler) {asio::defer(io_context_, handler);}
#else
template<typename F> void post(F&& handler) {io_service_.post(std::move(handler));}
template<typename F> void post(const F& handler) {io_service_.post(handler);}
template<typename F> void post(F&& handler) {io_context_.post(std::move(handler));}
template<typename F> void post(const F& handler) {io_context_.post(handler);}
#endif
template<typename F> inline F&& make_handler_error(F&& f) const {return std::move(f);}
......@@ -84,11 +92,11 @@ protected:
inline void set_async_calling(bool value) {async_calling = value;}
protected:
object(asio::io_service& _io_service_) : async_calling(false), io_service_(_io_service_) {}
object(asio::io_context& _io_context_) : async_calling(false), io_context_(_io_context_) {}
bool async_calling;
#endif
asio::io_service& io_service_;
asio::io_context& io_context_;
};
} //namespace
......
......@@ -101,44 +101,28 @@ protected:
{
assert(object_ptr && !object_ptr->is_equal_to(-1));
std::unique_lock<std::mutex> lock(invalid_object_can_mutex);
auto iter = std::find_if(std::begin(invalid_object_can), std::end(invalid_object_can), [id](object_ctype& item) {return item->is_equal_to(id);});
//cannot use invalid_object_pop(uint_fast64_t), it's too arbitrary
if (iter != std::end(invalid_object_can) && (*iter).unique() && (*iter)->obsoleted())
auto old_object_ptr = invalid_object_pop(id);
if (old_object_ptr)
{
auto invalid_object_ptr(std::move(*iter));
invalid_object_can.erase(iter);
lock.unlock();
assert(!find(id));
std::lock_guard<std::mutex> lock(object_can_mutex);
object_can.erase(object_ptr->id());
object_ptr->id(id);
object_can.emplace(id, object_ptr); //must succeed
return invalid_object_ptr;
}
return object_type();
return old_object_ptr;
}
#if defined(ASCS_REUSE_OBJECT) && !defined(ASCS_RESTORE_OBJECT)
object_type reuse_object()
{
std::unique_lock<std::mutex> lock(invalid_object_can_mutex);
for (auto iter = std::begin(invalid_object_can); iter != std::end(invalid_object_can); ++iter)
if ((*iter).unique() && (*iter)->obsoleted())
{
auto object_ptr(std::move(*iter));
invalid_object_can.erase(iter);
lock.unlock();
object_ptr->reset();
return object_ptr;
}
auto object_ptr = invalid_object_pop();
if (object_ptr)
object_ptr->reset();
return object_type();
return object_ptr;
}
template<typename Arg>
......@@ -237,7 +221,7 @@ public:
{
std::lock_guard<std::mutex> lock(invalid_object_can_mutex);
auto iter = std::find_if(std::begin(invalid_object_can), std::end(invalid_object_can), [id](object_ctype& item) {return item->is_equal_to(id);});
if (iter != std::end(invalid_object_can))
if (iter != std::end(invalid_object_can) && (*iter).unique() && (*iter)->obsoleted())
{
auto object_ptr(std::move(*iter));
invalid_object_can.erase(iter);
......@@ -246,7 +230,19 @@ public:
return object_type();
}
void list_all_object() {do_something_to_all([](object_ctype& item) {item->show_info("", "");});}
//pop the first invalid object that is obsoleted and has no additional reference
//(so it is safe to reuse or free it); returns an empty object_type if none qualifies.
//this method has linear complexity, please note.
object_type invalid_object_pop()
{
std::unique_lock<std::mutex> lock(invalid_object_can_mutex);
for (auto iter = std::begin(invalid_object_can); iter != std::end(invalid_object_can); ++iter)
if ((*iter).unique() && (*iter)->obsoleted()) //unique() means nobody else holds this shared_ptr
{
auto object_ptr(std::move(*iter));
invalid_object_can.erase(iter);
return object_ptr;
}
return object_type();
}
//Kick out obsoleted objects
//Consider the following assumptions:
......@@ -282,7 +278,7 @@ public:
//free a specific number of objects
//if you used object pool(define ASCS_REUSE_OBJECT or ASCS_RESTORE_OBJECT), you can manually call this function to free some objects
// after the object pool(invalid_object_size()) goes big enough for memory saving (because the objects in invalid_object_can
// after the object pool(invalid_object_size()) gets big enough for memory saving (because the objects in invalid_object_can
// are waiting for reusing and will never be freed).
//if you didn't use the object pool, object_pool will invoke this function automatically and periodically, so you don't need to invoke it explicitly
//return affected object number.
......@@ -308,6 +304,9 @@ public:
return num_affected;
}
void list_all_object() {do_something_to_all([](object_ctype& item) {item->show_info("", "");});}
//aggregate the statistic objects of all valid objects in object_can into a single one.
statistic get_statistic() {statistic stat; do_something_to_all([&](object_ctype& item) {stat += item->get_statistic();}); return stat;}
template<typename _Predicate> void do_something_to_all(const _Predicate& __pred)
{std::lock_guard<std::mutex> lock(object_can_mutex); for (typename container_type::value_type& item : object_can) __pred(item.second);}
......@@ -322,7 +321,7 @@ protected:
size_t max_size_;
//because all objects are dynamic created and stored in object_can, maybe when receiving error occur
//(you are recommended to delete the object from object_can, for example via i_server::del_socket), some other asynchronous calls are still queued in asio::io_service,
//(you are recommended to delete the object from object_can, for example via i_server::del_socket), some other asynchronous calls are still queued in asio::io_context,
//and will be dequeued in the future, we must guarantee these objects not be freed from the heap or reused, so we move these objects from object_can to invalid_object_can,
//and free them from the heap or reuse them in the near future.
//if ASCS_CLEAR_OBJECT_INTERVAL been defined, clear_obsoleted_object() will be invoked automatically and periodically to move all invalid objects into invalid_object_can.
......
......@@ -18,7 +18,7 @@
namespace ascs
{
class service_pump : public asio::io_service
class service_pump : public asio::io_context
{
public:
class i_service
......@@ -60,7 +60,18 @@ public:
typedef const object_type object_ctype;
typedef std::list<object_type> container_type;
service_pump() : started(false), real_thread_num(0), del_thread_num(0), del_thread_req(false) {}
//conditional member initialization: thread bookkeeping members exist only when
//ASCS_DECREASE_THREAD_AT_RUNTIME is defined; the work guard (or legacy
//io_service::work for asio before 1.11) exists only when ASCS_AVOID_AUTO_STOP_SERVICE
//is defined, and keeps run() from returning while no handlers are pending.
service_pump() : started(false)
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
, real_thread_num(0), del_thread_num(0), del_thread_req(false)
#endif
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION >= 101100
, work(asio::make_work_guard(*this))
#else
, work(std::make_shared<asio::io_service::work>(*this))
#endif
#endif
{}
virtual ~service_pump() {stop_service();}
object_type find(int id)
......@@ -159,20 +170,13 @@ public:
void add_service_thread(int thread_num) {for (auto i = 0; i < thread_num; ++i) service_threads.emplace_back([this]() {this->run();});}
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
void del_service_thread(int thread_num) {if (thread_num > 0) {del_thread_num.fetch_add(thread_num, std::memory_order_relaxed); del_thread_req = true;}}
int service_thread_num() const {return real_thread_num.load(std::memory_order_relaxed);}
void del_service_thread(int thread_num) {if (thread_num > 0) {del_thread_num += thread_num; del_thread_req = true;}}
int service_thread_num() const {return real_thread_num;}
#endif
protected:
void do_service(int thread_num)
{
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION >= 101100
work = std::make_shared<asio::executor_work_guard<executor_type>>(get_executor());
#else
work = std::make_shared<asio::io_service::work>(*this);
#endif
#endif
started = true;
unified_out::info_out("service pump started.");
......@@ -191,8 +195,9 @@ protected:
service_threads.clear();
started = false;
del_thread_num.store(0, std::memory_order_relaxed);
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
del_thread_num = 0;
#endif
unified_out::info_out("service pump end.");
}
......@@ -217,8 +222,9 @@ protected:
size_t run()
{
size_t n = 0;
std::stringstream os;
std::unique_lock<std::mutex> lock(del_thread_mutex, std::defer_lock);
std::stringstream os;
os << "service thread[" << std::this_thread::get_id() << "] begin.";
unified_out::info_out(os.str().data());
++real_thread_num;
......@@ -226,21 +232,27 @@ protected:
{
if (del_thread_req)
{
if (del_thread_num.fetch_sub(1, std::memory_order_relaxed) > 0)
break;
if (--del_thread_num >= 0)
{
lock.lock();
if (real_thread_num > 1)
break;
else
lock.unlock();
}
else
{
del_thread_req = false;
del_thread_num.fetch_add(1, std::memory_order_relaxed);
++del_thread_num;
}
}
//we cannot always decrease service thread timely (because run_one can block).
size_t this_n = 0;
#ifdef ASCS_ENHANCED_STABILITY
try {this_n = asio::io_service::run_one();} catch (const asio::system_error& e) {if (!on_exception(e)) break;}
try {this_n = asio::io_context::run_one();} catch (const asio::system_error& e) {if (!on_exception(e)) break;}
#else
this_n = asio::io_service::run_one();
this_n = asio::io_context::run_one();
#endif
if (this_n > 0)
n += this_n; //n can overflow, please note.
......@@ -248,14 +260,15 @@ protected:
break;
}
--real_thread_num;
os.str(""); os << "service thread[" << std::this_thread::get_id() << "] end.";
os.str("");
os << "service thread[" << std::this_thread::get_id() << "] end.";
unified_out::info_out(os.str().data());
return n;
}
#else
#ifdef ASCS_ENHANCED_STABILITY
size_t run() {while (true) {try {return asio::io_service::run();} catch (const asio::system_error& e) {if (!on_exception(e)) return 0;}}}
size_t run() {while (true) {try {return asio::io_context::run();} catch (const asio::system_error& e) {if (!on_exception(e)) return 0;}}}
#endif
#endif
......@@ -273,18 +286,20 @@ private:
protected:
bool started;
container_type service_can;
std::mutex service_can_mutex;
std::list<std::thread> service_threads;
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
std::atomic_int_fast32_t real_thread_num;
std::atomic_int_fast32_t del_thread_num;
std::mutex del_thread_mutex;
bool del_thread_req;
#endif
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION >= 101100
std::shared_ptr<asio::executor_work_guard<executor_type>> work;
asio::executor_work_guard<executor_type> work;
#else
std::shared_ptr<asio::io_service::work> work;
#endif
......
......@@ -32,8 +32,8 @@ public:
static const tid TIMER_END = TIMER_BEGIN + 10;
protected:
socket(asio::io_service& io_service_) : timer(io_service_), next_layer_(io_service_) {first_init();}
template<typename Arg> socket(asio::io_service& io_service_, Arg& arg) : timer(io_service_), next_layer_(io_service_, arg) {first_init();}
socket(asio::io_context& io_context_) : timer(io_context_), next_layer_(io_context_) {first_init();}
template<typename Arg> socket(asio::io_context& io_context_, Arg& arg) : timer(io_context_), next_layer_(io_context_, arg) {first_init();}
//helper function, just call it in constructor
void first_init()
......@@ -44,8 +44,9 @@ protected:
dispatching = false;
congestion_controlling = false;
started_ = false;
last_send_time = 0;
last_recv_time = 0;
recv_idle_began = false;
msg_handling_interval_step1_ = ASCS_MSG_HANDLING_INTERVAL_STEP1;
msg_handling_interval_step2_ = ASCS_MSG_HANDLING_INTERVAL_STEP2;
send_atomic.clear(std::memory_order_relaxed);
dispatch_atomic.clear(std::memory_order_relaxed);
start_atomic.clear(std::memory_order_relaxed);
......@@ -53,13 +54,21 @@ protected:
void reset()
{
packer_->reset();
auto need_clean_up = is_timer(TIMER_DELAY_CLOSE);
stop_all_timer(); //just in case, theoretically, timer TIMER_DELAY_CLOSE and TIMER_ASYNC_SHUTDOWN (used by tcp::socket_base) can left behind.
if (need_clean_up)
{
on_close();
set_async_calling(false);
}
clear_buffer();
packer_->reset();
sending = false;
dispatching = false;
congestion_controlling = false;
last_recv_time = 0;
stat.reset();
recv_idle_began = false;
}
void clear_buffer()
......@@ -118,13 +127,14 @@ public:
{
assert(interval > 0 && max_absence > 0);
if (last_recv_time > 0 && is_ready()) //check of last_recv_time is essential, because user may call check_heartbeat before do_start
if (stat.last_recv_time > 0 && is_ready()) //check of last_recv_time is essential, because user may call check_heartbeat before do_start
{
auto now = time(nullptr);
if (now - last_recv_time >= interval * max_absence)
return on_heartbeat_error();
if (now - stat.last_recv_time >= interval * max_absence)
if (!on_heartbeat_error())
return false;
if (!is_sending_msg() && now - last_send_time >= interval) //don't need to send heartbeat if we're sending messages
if (!is_sending_msg() && now - stat.last_send_time >= interval) //don't need to send heartbeat if we're sending messages
send_heartbeat();
}
......@@ -134,9 +144,15 @@ public:
bool is_sending_msg() const {return sending;}
bool is_dispatching_msg() const {return dispatching;}
void congestion_control(bool enable) {congestion_controlling = enable;}
void congestion_control(bool enable) {congestion_controlling = enable;} //enable congestion controlling in on_msg, disable it in on_msg_handle, please note.
bool congestion_control() const {return congestion_controlling;}
void msg_handling_interval_step1(size_t interval) {msg_handling_interval_step1_ = interval;}
size_t msg_handling_interval_step1() const {return msg_handling_interval_step1_;}
void msg_handling_interval_step2(size_t interval) {msg_handling_interval_step2_ = interval;}
size_t msg_handling_interval_step2() const {return msg_handling_interval_step2_;}
//in ascs, it's thread safe to access stat without mutex, because for a specific member of stat, ascs will never access it concurrently.
//in other words, in a specific thread, ascs just access only one member of stat.
//but user can access stat out of ascs via get_statistic function, although user can only read it, there's still a potential risk,
......@@ -171,7 +187,18 @@ public:
POP_ALL_PENDING_MSG(pop_all_pending_recv_msg, recv_msg_buffer, out_container_type)
protected:
virtual bool do_start() = 0;
//default startup logic moved here from subclasses (see 1.2.4 changelog):
//stamp the receive time, start heartbeat if enabled, flush pending sends, begin receiving.
virtual bool do_start()
{
stat.last_recv_time = time(nullptr); //must be non-zero before check_heartbeat can do anything
#if ASCS_HEARTBEAT_INTERVAL > 0
start_heartbeat(ASCS_HEARTBEAT_INTERVAL);
#endif
send_msg(); //send buffer may have msgs, send them
do_recv_msg();
return true;
}
virtual bool do_send_msg() = 0;
virtual bool do_send_msg(InMsgType&& msg) = 0;
virtual void do_recv_msg() = 0;
......@@ -235,10 +262,17 @@ protected:
{
asio::error_code ec;
lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
stat.break_time = time(nullptr);
}
set_async_calling(true);
set_timer(TIMER_DELAY_CLOSE, ASCS_DELAY_CLOSE * 1000 + 50, [this](tid id)->bool {return this->timer_handler(TIMER_DELAY_CLOSE);});
if (stopped())
on_close();
else
{
set_async_calling(true);
set_timer(TIMER_DELAY_CLOSE, ASCS_DELAY_CLOSE * 1000 + 50, [this](tid id)->bool {return this->timer_handler(TIMER_DELAY_CLOSE);});
}
return true;
}
......@@ -269,11 +303,24 @@ protected:
}
if (temp_msg_buffer.empty() && recv_msg_buffer.size() < ASCS_MAX_MSG_NUM)
{
if (recv_idle_began)
{
recv_idle_began = false;
stat.recv_idle_sum += statistic::now() - recv_idle_begin_time;
}
do_recv_msg(); //receive msg in sequence
}
else
{
recv_idle_begin_time = statistic::now();
set_timer(TIMER_HANDLE_MSG, 50, [this](tid id)->bool {return this->timer_handler(TIMER_HANDLE_MSG);});
if (!recv_idle_began)
{
recv_idle_began = true;
recv_idle_begin_time = statistic::now();
}
set_timer(TIMER_HANDLE_MSG, msg_handling_interval_step1_, [this](tid id)->bool {return this->timer_handler(TIMER_HANDLE_MSG);});
}
}
......@@ -345,7 +392,6 @@ private:
switch (id)
{
case TIMER_HANDLE_MSG:
stat.recv_idle_sum += statistic::now() - recv_idle_begin_time;
handle_msg();
break;
case TIMER_DISPATCH_MSG:
......@@ -386,7 +432,7 @@ private:
{
last_dispatch_msg.restart(end_time);
dispatching = false;
set_timer(TIMER_DISPATCH_MSG, 50, [this](tid id)->bool {return this->timer_handler(TIMER_DISPATCH_MSG);});
set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_step2_, [this](tid id)->bool {return this->timer_handler(TIMER_DISPATCH_MSG);});
}
else //dispatch msg in sequence
{
......@@ -411,7 +457,7 @@ protected:
out_container_type recv_msg_buffer;
std::list<out_msg> temp_msg_buffer; //the size of this list is always very small, so std::list is enough (std::list::size maybe has linear complexity)
//subclass will invoke handle_msg() when got some msgs. if these msgs can't be dispatched via on_msg() because of congestion control opened,
//socket will delay 50 milliseconds(non-blocking) to invoke handle_msg() again, temp_msg_buffer is used to hold these msgs temporarily.
//socket will delay 'msg_handling_interval_step1_' milliseconds(non-blocking) to invoke handle_msg() again, temp_msg_buffer is used to hold these msgs temporarily.
volatile bool sending;
std::atomic_flag send_atomic;
......@@ -426,9 +472,9 @@ protected:
struct statistic stat;
typename statistic::stat_time recv_idle_begin_time;
bool recv_idle_began;
//used by heartbeat function, subclass need to refresh them
time_t last_send_time, last_recv_time;
size_t msg_handling_interval_step1_, msg_handling_interval_step2_;
};
} //namespace
......
......@@ -30,9 +30,9 @@ public:
static const timer::tid TIMER_CONNECT = TIMER_BEGIN;
static const timer::tid TIMER_END = TIMER_BEGIN + 10;
client_socket_base(asio::io_service& io_service_) : super(io_service_), need_reconnect(true) {set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
client_socket_base(asio::io_context& io_context_) : super(io_context_), need_reconnect(true) {set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
template<typename Arg>
client_socket_base(asio::io_service& io_service_, Arg& arg) : super(io_service_, arg), need_reconnect(true) {set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
client_socket_base(asio::io_context& io_context_, Arg& arg) : super(io_context_, arg), need_reconnect(true) {set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
//reset all, be ensure that there's no any operations performed on this socket when invoke it
//subclass must re-write this function to initialize itself, and then do not forget to invoke superclass' reset function too
......@@ -99,24 +99,23 @@ public:
}
protected:
virtual bool do_start() //connect or receive
virtual bool do_start() //connect
{
if (!this->is_connected())
this->lowest_layer().async_connect(server_addr, this->make_handler_error([this](const asio::error_code& ec) {this->connect_handler(ec);}));
else
{
this->last_recv_time = time(nullptr);
#if ASCS_HEARTBEAT_INTERVAL > 0
this->start_heartbeat(ASCS_HEARTBEAT_INTERVAL);
#endif
this->send_msg(); //send buffer may have msgs, send them
this->do_recv_msg();
}
assert(!this->is_connected());
this->lowest_layer().async_connect(server_addr, this->make_handler_error([this](const asio::error_code& ec) {this->connect_handler(ec);}));
return true;
}
//after how much time(ms), client_socket_base will try to reconnect to the server, negative means give up.
//completion handler for the async_connect issued in do_start()
//on success, delegate to the superclass' do_start() (which marks the link CONNECTED and
//fires on_connect() — see tcp::socket_base::do_start); on failure, try to reconnect
virtual void connect_handler(const asio::error_code& ec)
{
if (!ec) //already started, so cannot call start()
super::do_start();
else
prepare_next_reconnect(ec);
}
//after how much time(ms), client_socket_base will try to reconnect to the server, negative value means give up.
virtual int prepare_reconnect(const asio::error_code& ec) {return ASCS_RECONNECT_INTERVAL;}
//invoked once the link has been established; override to do per-connection setup
virtual void on_connect() {unified_out::info_out("connecting success.");}
//unpack failure is unrecoverable for a stream socket — log and tear the link down
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_shutdown();}
......@@ -162,19 +161,6 @@ protected:
return false;
}
private:
//pre-1.2.4 connect completion handler (removed by this commit): it set the link status,
//fired on_connect() and re-entered do_start() itself; the replacement moves that
//bookkeeping into tcp::socket_base::do_start and keeps only the dispatch here
void connect_handler(const asio::error_code& ec)
{
if (!ec)
{
this->status = super::link_status::CONNECTED;
on_connect();
do_start();
}
else
prepare_next_reconnect(ec);
}
protected:
asio::ip::tcp::endpoint server_addr;
bool need_reconnect;
......
......@@ -77,20 +77,7 @@ public:
}
protected:
virtual bool do_start()
{
this->status = super::link_status::CONNECTED;
this->last_recv_time = time(nullptr);
#if ASCS_HEARTBEAT_INTERVAL > 0
this->start_heartbeat(ASCS_HEARTBEAT_INTERVAL);
#endif
this->send_msg(); //send buffer may have msgs, send them
this->do_recv_msg();
return true;
}
virtual void on_unpack_error() {unified_out::error_out("can not unpack msg."); this->force_shutdown();}
virtual void on_unpack_error() {unified_out::error_out("can not unpack msg."); force_shutdown();}
//do not forget to force_shutdown this socket(in del_socket(), there's a force_shutdown() invocation)
virtual void on_recv_error(const asio::error_code& ec)
{
......
......@@ -35,8 +35,8 @@ private:
protected:
enum link_status {CONNECTED, FORCE_SHUTTING_DOWN, GRACEFUL_SHUTTING_DOWN, BROKEN};
socket_base(asio::io_service& io_service_) : super(io_service_) {first_init();}
template<typename Arg> socket_base(asio::io_service& io_service_, Arg& arg) : super(io_service_, arg) {first_init();}
socket_base(asio::io_context& io_context_) : super(io_context_) {first_init();}
template<typename Arg> socket_base(asio::io_context& io_context_, Arg& arg) : super(io_context_, arg) {first_init();}
//helper function, just call it in constructor
void first_init() {status = link_status::BROKEN; unpacker_ = std::make_shared<Unpacker>();}
......@@ -140,6 +140,15 @@ protected:
return ec ? -1 : send_size;
}
//common post-connect startup: mark the link CONNECTED, record the establish time in the
//statistics, notify the subclass via on_connect(), then continue with the superclass'
//do_start() (which begins heartbeat/receiving — not visible in this hunk, confirm in super)
virtual bool do_start()
{
status = link_status::CONNECTED;
this->stat.establish_time = time(nullptr);
on_connect(); //in this virtual function, this->stat.last_recv_time has not been updated, please note
return super::do_start();
}
//return false if send buffer is empty
virtual bool do_send_msg()
{
......@@ -194,6 +203,7 @@ protected:
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);}));
}
//hook fired by do_start() right after the link becomes CONNECTED; default does nothing
virtual void on_connect() {}
//msg can not be unpacked
//the link is still available, so don't need to shutdown this tcp::socket_base at both client and server endpoint
virtual void on_unpack_error() = 0;
......@@ -223,7 +233,7 @@ private:
{
if (!ec && bytes_transferred > 0)
{
this->last_recv_time = time(nullptr);
this->stat.last_recv_time = time(nullptr);
typename Unpacker::container_type temp_msg_can;
auto_duration dur(this->stat.unpack_time_sum);
......@@ -255,7 +265,7 @@ private:
{
if (!ec)
{
this->last_send_time = time(nullptr);
this->stat.last_send_time = time(nullptr);
this->stat.send_byte_sum += bytes_transferred;
if (last_send_msg.empty()) //send message with sync mode
......@@ -292,7 +302,7 @@ private:
bool async_shutdown_handler(size_t loop_num)
{
if (link_status::GRACEFUL_SHUTTING_DOWN == this->status)
if (link_status::GRACEFUL_SHUTTING_DOWN == status)
{
--loop_num;
if (loop_num > 0)
......
......@@ -32,25 +32,20 @@ class socket : public Socket
public:
template<typename Arg>
socket(Arg& arg, asio::ssl::context& ctx) : Socket(arg, ctx), authorized_(false) {}
virtual bool is_ready() {return authorized_ && Socket::is_ready();}
virtual void reset() {authorized_ = false;}
bool authorized() const {return authorized_;}
socket(Arg& arg, asio::ssl::context& ctx) : Socket(arg, ctx) {}
protected:
virtual void on_recv_error(const asio::error_code& ec)
{
if (is_ready())
if (this->is_ready())
{
authorized_ = false;
#ifndef ASCS_REUSE_SSL_STREAM
this->status = Socket::link_status::GRACEFUL_SHUTTING_DOWN;
this->show_info("ssl link:", "been shut down.");
asio::error_code ec;
this->next_layer().shutdown(ec);
if (ec && asio::error::eof != ec) //the endpoint who initiated a shutdown will get error eof.
if (ec && asio::error::eof != ec) //the endpoint who initiated a shutdown operation will get error eof.
unified_out::info_out("shutdown ssl link failed (maybe intentionally because of reusing)");
#endif
}
......@@ -66,24 +61,21 @@ protected:
unified_out::error_out("handshake failed: %s", ec.message().data());
}
void handle_handshake(const asio::error_code& ec) {on_handshake(ec); if (!ec) {authorized_ = true; Socket::do_start();}}
void shutdown_ssl(bool sync = true)
{
if (!is_ready())
if (!this->is_ready())
{
Socket::force_shutdown();
return;
}
authorized_ = false;
this->status = Socket::link_status::GRACEFUL_SHUTTING_DOWN;
if (!sync)
{
this->show_info("ssl link:", "been shutting down.");
this->next_layer().async_shutdown(this->make_handler_error([this](const asio::error_code& ec) {
if (ec && asio::error::eof != ec) //the endpoint who initiated a shutdown will get error eof.
if (ec && asio::error::eof != ec) //the endpoint who initiated a shutdown operation will get error eof.
unified_out::info_out("async shutdown ssl link failed (maybe intentionally because of reusing)");
}));
}
......@@ -93,13 +85,10 @@ protected:
asio::error_code ec;
this->next_layer().shutdown(ec);
if (ec && asio::error::eof != ec) //the endpoint who initiated a shutdown will get error eof.
if (ec && asio::error::eof != ec) //the endpoint who initiated a shutdown operation will get error eof.
unified_out::info_out("shutdown ssl link failed (maybe intentionally because of reusing)");
}
}
protected:
volatile bool authorized_;
};
template <typename Packer, typename Unpacker, typename Socket = asio::ssl::stream<asio::ip::tcp::socket>,
......@@ -111,13 +100,10 @@ private:
typedef socket<tcp::client_socket_base<Packer, Unpacker, Socket, InQueue, InContainer, OutQueue, OutContainer>> super;
public:
client_socket_base(asio::io_service& io_service_, asio::ssl::context& ctx) : super(io_service_, ctx) {}
client_socket_base(asio::io_context& io_context_, asio::ssl::context& ctx) : super(io_context_, ctx) {}
#ifndef ASCS_REUSE_SSL_STREAM
void disconnect(bool reconnect = false) {force_shutdown(reconnect);}
#ifdef ASCS_REUSE_SSL_STREAM
void force_shutdown(bool reconnect = false) {this->authorized_ = false; super::force_shutdown(reconnect);}
void graceful_shutdown(bool reconnect = false, bool sync = true) {this->authorized_ = false; super::graceful_shutdown(reconnect, sync);}
#else
void force_shutdown(bool reconnect = false) {graceful_shutdown(reconnect);}
void graceful_shutdown(bool reconnect = false, bool sync = true)
{
......@@ -130,24 +116,30 @@ public:
#endif
protected:
virtual bool do_start() //add handshake
virtual void connect_handler(const asio::error_code& ec) //intercept tcp::client_socket_base::connect_handler
{
if (!this->is_connected())
super::do_start();
else if (!this->authorized())
if (!ec)
this->next_layer().async_handshake(asio::ssl::stream_base::client, this->make_handler_error([this](const asio::error_code& ec) {this->handle_handshake(ec);}));
return true;
else
super::connect_handler(ec);
}
#ifndef ASCS_REUSE_SSL_STREAM
virtual int prepare_reconnect(const asio::error_code& ec) {return -1;}
virtual void on_recv_error(const asio::error_code& ec) {this->need_reconnect = false; super::on_recv_error(ec);}
#endif
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_shutdown();}
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); this->force_shutdown();}
private:
void handle_handshake(const asio::error_code& ec) {super::handle_handshake(ec); if (ec) force_shutdown();}
//ssl client handshake completion: always report the result via on_handshake(); on
//success resume the normal tcp startup path through the base class' connect_handler,
//on failure drop the link entirely
void handle_handshake(const asio::error_code& ec)
{
this->on_handshake(ec);
if (!ec)
super::connect_handler(ec); //return to tcp::client_socket_base::connect_handler
else
this->force_shutdown();
}
};
template<typename Object>
......@@ -179,28 +171,31 @@ private:
public:
server_socket_base(Server& server_, asio::ssl::context& ctx) : super(server_, ctx) {}
#ifndef ASCS_REUSE_SSL_STREAM
void disconnect() {force_shutdown();}
#ifdef ASCS_REUSE_SSL_STREAM
void force_shutdown() {this->authorized_ = false; super::force_shutdown();}
void graceful_shutdown(bool sync = false) {this->authorized_ = false; super::graceful_shutdown(sync);}
#else
void force_shutdown() {graceful_shutdown();} //must with async mode (the default value), because server_base::uninit will call this function
void graceful_shutdown(bool sync = false) {this->shutdown_ssl(sync);}
#endif
protected:
virtual bool do_start() //add handshake
virtual bool do_start() //intercept tcp::server_socket_base::do_start (to add handshake)
{
if (!this->authorized())
this->next_layer().async_handshake(asio::ssl::stream_base::server, this->make_handler_error([this](const asio::error_code& ec) {this->handle_handshake(ec);}));
this->next_layer().async_handshake(asio::ssl::stream_base::server, this->make_handler_error([this](const asio::error_code& ec) {this->handle_handshake(ec);}));
return true;
}
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_shutdown();}
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); this->force_shutdown();}
private:
void handle_handshake(const asio::error_code& ec) {super::handle_handshake(ec); if (ec) this->server.del_socket(this->shared_from_this());}
//ssl server handshake completion: report the result via on_handshake(); on success
//resume the normal server-side startup through the base class' do_start, on failure
//ask the owning server to remove this socket from its pool
void handle_handshake(const asio::error_code& ec)
{
this->on_handshake(ec);
if (!ec)
super::do_start(); //return to tcp::server_socket_base::do_start
else
this->server.del_socket(this->shared_from_this());
}
};
template<typename Socket, typename Pool = object_pool<Socket>, typename Server = tcp::i_server> using server_base = tcp::server_base<Socket, Pool, Server>;
......
......@@ -51,25 +51,26 @@ public:
enum timer_status {TIMER_FAKE, TIMER_OK, TIMER_CANCELED};
tid id;
unsigned char seq;
timer_status status;
size_t interval_ms;
std::function<bool(tid)> call_back; //return true from call_back to continue the timer, or the timer will stop
std::shared_ptr<timer_type> timer;
timer_info() : id(0), status(TIMER_FAKE), interval_ms(0) {}
timer_info() : seq(-1), status(TIMER_FAKE), interval_ms(0) {}
};
typedef const timer_info timer_cinfo;
typedef std::vector<timer_info> container_type;
timer(asio::io_service& _io_service_) : object(_io_service_), timer_can(256) {tid id = -1; do_something_to_all([&id](timer_info& item) {item.id = ++id;});}
timer(asio::io_context& io_context_) : object(io_context_), timer_can((tid) -1) {tid id = -1; do_something_to_all([&id](timer_info& item) {item.id = ++id;});}
void update_timer_info(tid id, size_t interval, std::function<bool(tid)>&& call_back, bool start = false)
{
timer_info& ti = timer_can[id];
if (timer_info::TIMER_FAKE == ti.status)
ti.timer = std::make_shared<timer_type>(io_service_);
ti.timer = std::make_shared<timer_type>(io_context_);
ti.status = timer_info::TIMER_OK;
ti.interval_ms = interval;
ti.call_back.swap(call_back);
......@@ -121,10 +122,16 @@ protected:
#else
ti.timer->expires_from_now(milliseconds(ti.interval_ms));
#endif
ti.timer->async_wait(make_handler_error([this, &ti](const asio::error_code& ec) {
#if (defined(_MSC_VER) && _MSC_VER > 1800) || (defined(__cplusplus) && __cplusplus > 201103L)
ti.timer->async_wait(make_handler_error([this, &ti, prev_seq(++ti.seq)](const asio::error_code& ec) {
#else
auto prev_seq = ++ti.seq;
ti.timer->async_wait(make_handler_error([this, &ti, prev_seq](const asio::error_code& ec) {
#endif
if (!ec && ti.call_back(ti.id) && timer_info::TIMER_OK == ti.status)
this->start_timer(ti);
else
else if (prev_seq == ti.seq) //exclude a particular situation--start the same timer in call_back and return false
ti.status = timer_info::TIMER_CANCELED;
}));
}
......@@ -142,7 +149,7 @@ protected:
container_type timer_can;
private:
using object::io_service_;
using object::io_context_;
};
} //namespace
......
......@@ -33,7 +33,7 @@ private:
typedef socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
public:
socket_base(asio::io_service& io_service_) : super(io_service_), unpacker_(std::make_shared<Unpacker>()) {}
socket_base(asio::io_context& io_context_) : super(io_context_), unpacker_(std::make_shared<Unpacker>()) {}
virtual bool is_ready() {return this->lowest_layer().is_open();}
virtual void send_heartbeat()
......@@ -107,17 +107,6 @@ public:
void show_info(const char* head, const char* tail) const {unified_out::info_out("%s %s:%hu %s", head, local_addr.address().to_string().data(), local_addr.port(), tail);}
protected:
//pre-1.2.4 udp startup (removed by this commit): refreshed the receive timestamp,
//optionally armed the heartbeat timer, and kicked off the first async receive;
//this logic moves into the shared socket base in the new version
virtual bool do_start()
{
this->last_recv_time = time(nullptr);
#if ASCS_HEARTBEAT_INTERVAL > 0
this->start_heartbeat(ASCS_HEARTBEAT_INTERVAL);
#endif
do_recv_msg();
return true;
}
//send message with sync mode
//return -1 means error occurred, otherwise the number of bytes been sent
size_t do_sync_send_msg(typename Packer::msg_ctype& msg) {return do_sync_send_msg(peer_addr, msg);}
......@@ -177,7 +166,7 @@ protected:
virtual bool on_heartbeat_error()
{
this->last_recv_time = time(nullptr); //avoid repetitive warnings
this->stat.last_recv_time = time(nullptr); //avoid repetitive warnings
unified_out::warning_out("%s:%hu is not available", peer_addr.address().to_string().data(), peer_addr.port());
return true;
}
......@@ -208,7 +197,7 @@ private:
{
if (!ec && bytes_transferred > 0)
{
this->last_recv_time = time(nullptr);
this->stat.last_recv_time = time(nullptr);
auto msg = this->unpacker_->parse_msg(bytes_transferred);
if (!msg.empty())
......@@ -231,7 +220,7 @@ private:
{
if (!ec)
{
this->last_send_time = time(nullptr);
this->stat.last_send_time = time(nullptr);
this->stat.send_byte_sum += bytes_transferred;
++this->stat.send_msg_sum;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册