提交 e83231f8 编写于 作者: Y youngowlf

Fix bug: st_connector::started() will return false after reconnected to the server;

Differentiated passive close and active close;
Record more details when link broken;
Fix typos in demo test_client.
上级 d2e29d4d
......@@ -67,10 +67,6 @@ public:
return false;
}
void disconnect(typename Pool::object_ctype& client_ptr) {force_close(client_ptr);}
void force_close(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->force_close();}
void graceful_close(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->graceful_close();}
};
} //namespace
......
......@@ -76,19 +76,54 @@ public:
//the following three functions can only be used when the connection is OK and you want to reconnect to the server.
//if the connection is broken unexpectedly, st_connector will try to reconnect to the server automatically.
void disconnect(bool reconnect = false) {force_close(reconnect);}
void force_close(bool reconnect = false) {reconnecting = reconnect; connected = false; st_tcp_socket_base<Socket, Packer, Unpacker>::force_close();}
void graceful_close(bool reconnect = false)
void force_close(bool reconnect = false)
{
if (!ST_THIS is_closing())
{
show_info("link:", "been closed.");
reconnecting = reconnect;
connected = false;
ST_THIS close_state = 1;
}
else if (1 == ST_THIS close_state)
return;
st_tcp_socket_base<Socket, Packer, Unpacker>::force_close();
}
void graceful_close(bool reconnect = false, bool sync = true)
{
if (ST_THIS is_closing())
return;
if (!is_connected())
force_close(reconnect);
else
{
show_info("link:", "been closing gracefully.");
reconnecting = reconnect;
connected = false;
st_tcp_socket_base<Socket, Packer, Unpacker>::graceful_close();
ST_THIS close_state = 2;
st_tcp_socket_base<Socket, Packer, Unpacker>::graceful_close(sync);
}
}
void show_info(const char* head, const char* tail) const
{
boost::system::error_code ec;
BOOST_AUTO(ep, ST_THIS lowest_layer().local_endpoint(ec));
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().c_str(), ep.port(), tail);
}
void show_info(const char* head, const char* tail, const boost::system::error_code& ec) const
{
boost::system::error_code ec2;
BOOST_AUTO(ep, ST_THIS lowest_layer().local_endpoint(ec2));
if (!ec2)
unified_out::info_out("%s %s:%hu %s (%d %s)", head, ep.address().to_string().c_str(), ep.port(), tail, ec.value(), ec.message().data());
}
protected:
virtual bool do_start() //connect or receive
{
......@@ -110,7 +145,7 @@ protected:
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_close();}
virtual void on_recv_error(const boost::system::error_code& ec)
{
unified_out::error_out("connection closed.");
show_info("link:", "broken/closed", ec);
bool reconnect = reconnecting;
if (ST_THIS is_closing())
......@@ -122,8 +157,9 @@ protected:
reconnect = false;
}
ST_THIS close_state = 0;
if (reconnect)
do_start();
ST_THIS start();
}
virtual bool on_timer(unsigned char id, const void* user_data)
......
......@@ -69,10 +69,7 @@ public:
{
BOOST_AUTO(raw_client_ptr, boost::dynamic_pointer_cast<Socket>(client_ptr));
if (ST_THIS del_object(raw_client_ptr))
{
raw_client_ptr->show_info("client:", "quit.");
raw_client_ptr->force_close();
}
}
void close_all_client()
......@@ -81,10 +78,7 @@ public:
//but in on_recv_error(), we need to lock object_can_mutex too(in del_object()), this will cause dead lock
boost::shared_lock<boost::shared_mutex> lock(ST_THIS object_can_mutex);
for (BOOST_AUTO(iter, ST_THIS object_can.begin()); iter != ST_THIS object_can.end(); ++iter)
{
(*iter)->show_info("client:", "been closed.");
(*iter)->force_close();
}
}
///////////////////////////////////////////////////
......
......@@ -41,6 +41,46 @@ public:
//and then do not forget to invoke st_server_socket_base::reset() to initialize father's member variables
virtual void reset() {st_tcp_socket_base<Socket, Packer, Unpacker>::reset();}
void disconnect() {force_close();}
void force_close()
{
if (!ST_THIS is_closing())
{
show_info("link:", "been closed.");
ST_THIS close_state = 1;
}
else if (1 == ST_THIS close_state)
return;
st_tcp_socket_base<Socket, Packer, Unpacker>::force_close();
}
void graceful_close()
{
if (ST_THIS is_closing())
return;
show_info("link:", "been closing gracefully.");
ST_THIS close_state = 2;
st_tcp_socket_base<Socket, Packer, Unpacker>::graceful_close();
}
void show_info(const char* head, const char* tail) const
{
boost::system::error_code ec;
BOOST_AUTO(ep, ST_THIS lowest_layer().remote_endpoint(ec));
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().c_str(), ep.port(), tail);
}
void show_info(const char* head, const char* tail, const boost::system::error_code& ec) const
{
boost::system::error_code ec2;
BOOST_AUTO(ep, ST_THIS lowest_layer().remote_endpoint(ec2));
if (!ec2)
unified_out::info_out("%s %s:%hu %s (%d %s)", head, ep.address().to_string().c_str(), ep.port(), tail, ec.value(), ec.message().data());
}
protected:
virtual bool do_start()
{
......@@ -57,12 +97,15 @@ protected:
//do not forget to force_close this socket(in del_client(), there's a force_close() invocation)
virtual void on_recv_error(const boost::system::error_code& ec)
{
ST_THIS show_info("link:", "broken/closed", ec);
#ifdef AUTO_CLEAR_CLOSED_SOCKET
ST_THIS show_info("client:", "quit.");
ST_THIS force_close();
#else
server.del_client(boost::dynamic_pointer_cast<st_timer>(ST_THIS shared_from_this()));
#endif
ST_THIS close_state = 0;
}
protected:
......
......@@ -102,7 +102,7 @@ public:
bool suspend_dispatch_msg() const {return suspend_dispatch_msg_;}
//get or change the packer at runtime
//changing packer at runtime is no thread-safe, you should avoid it from sending msg or posting msg, please pay special attention
//changing packer at runtime is not thread-safe, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
boost::shared_ptr<i_packer<typename Packer::msg_type> > inner_packer() {return packer_;}
boost::shared_ptr<const i_packer<typename Packer::msg_type> > inner_packer() const {return packer_;}
......@@ -161,7 +161,7 @@ protected:
virtual bool is_send_allowed() const {return !suspend_send_msg_;} //can send msg or not(just put into send buffer)
//generally, you don't have to rewrite this to maintain the status of connections(TCP)
virtual void on_send_error(const boost::system::error_code& ec) {unified_out::error_out("send msg error: %d %s", ec.value(), ec.message().data());}
virtual void on_send_error(const boost::system::error_code& ec) {unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
//receiving error or peer endpoint quit(false ec means ok)
virtual void on_recv_error(const boost::system::error_code& ec) = 0;
......
......@@ -65,7 +65,13 @@ public:
///////////////////////////////////////////////////
protected:
virtual void uninit() {ST_THIS stop(); ST_THIS do_something_to_all(boost::bind(&Socket::graceful_close, _1, false));}
virtual void uninit() {ST_THIS stop(); ST_THIS do_something_to_all(boost::bind(&Socket::graceful_close, _1, false, true));}
public:
void disconnect(typename Pool::object_ctype& client_ptr, bool reconnect = false) {if (!reconnect) ST_THIS del_object(client_ptr); client_ptr->disconnect(reconnect);}
void force_close(typename Pool::object_ctype& client_ptr, bool reconnect = false) {if (!reconnect) ST_THIS del_object(client_ptr); client_ptr->force_close(reconnect);}
void graceful_close(typename Pool::object_ctype& client_ptr, bool reconnect = false, bool sync = true)
{if (!reconnect) ST_THIS del_object(client_ptr); client_ptr->graceful_close(reconnect, sync);}
};
typedef st_tcp_client_base<> st_tcp_client;
......
......@@ -51,32 +51,13 @@ public:
{
unpacker_->reset_state();
st_socket<Socket, Packer, Unpacker>::reset_state();
closing = false;
close_state = 0;
}
void disconnect() {force_close();}
void force_close() {clean_up();}
void graceful_close() //will block until closing success or time out
{
closing = true;
boost::system::error_code ec;
ST_THIS lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
if (ec) //graceful disconnecting is impossible
clean_up();
else
{
int loop_num = GRACEFUL_CLOSE_MAX_DURATION * 100; //seconds to 10 milliseconds
while (--loop_num >= 0 && closing)
boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(10));
if (loop_num < 0) //graceful disconnecting is impossible
clean_up();
}
}
bool is_closing() const {return closing;}
bool is_closing() const {return 0 != close_state;}
//get or change the unpacker at runtime
//changing unpacker at runtime is no thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
boost::shared_ptr<i_unpacker<out_msg_type> > inner_unpacker() {return unpacker_;}
boost::shared_ptr<const i_unpacker<out_msg_type> > inner_unpacker() const {return unpacker_;}
......@@ -97,15 +78,34 @@ public:
//msg sending interface
///////////////////////////////////////////////////
void show_info(const char* head, const char* tail)
protected:
void disconnect() {force_close();}
void force_close() {clean_up();}
void graceful_close(bool sync = true) //will block until closing success or time out if sync equal to true
{
boost::system::error_code ec;
BOOST_AUTO(ep, ST_THIS lowest_layer().remote_endpoint(ec));
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().c_str(), ep.port(), tail);
ST_THIS lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
if (ec) //graceful closing is impossible
{
clean_up();
return;
}
int loop_num = GRACEFUL_CLOSE_MAX_DURATION * 100; //seconds to 10 milliseconds
if (sync)
{
while (--loop_num >= 0 && is_closing())
boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(10));
if (loop_num < 0) //graceful closing is impossible
{
unified_out::info_out("failed to graceful close within %d seconds", GRACEFUL_CLOSE_MAX_DURATION);
clean_up();
}
}
else
ST_THIS set_timer(9, 10, reinterpret_cast<const void*>(loop_num));
}
protected:
//must mutex send_msg_buffer before invoke this function
virtual bool do_send_msg()
{
......@@ -136,6 +136,36 @@ protected:
virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_timer(unsigned char id, const void* user_data)
{
switch (id)
{
case 9:
if (is_closing())
{
int loop_num = reinterpret_cast<int>(user_data);
--loop_num;
if (loop_num > 0)
{
ST_THIS update_timer_info(9, 10, reinterpret_cast<const void*>(loop_num));
return true;
}
else
{
unified_out::info_out("failed to graceful close within %d seconds", GRACEFUL_CLOSE_MAX_DURATION);
clean_up();
}
}
break;
default:
return st_socket<Socket, Packer, Unpacker>::on_timer(id, user_data);
break;
}
return false;
}
//start the asynchronous read
//it's child's responsibility to invoke this properly, because st_tcp_socket_base doesn't know any of the connection status
void do_recv_msg()
......@@ -208,7 +238,7 @@ protected:
protected:
boost::shared_ptr<i_unpacker<out_msg_type> > unpacker_;
bool closing;
int close_state; //2-the first step of graceful close, 1-force close, 0-normal state
};
} //namespace st_tcp
......
......@@ -72,10 +72,11 @@ public:
typedef const object_type object_ctype;
typedef boost::container::set<object_type> container_type;
void set_timer(unsigned char id, size_t milliseconds, const void* user_data)
void update_timer_info(unsigned char id, size_t milliseconds, const void* user_data, bool start = false)
{
object_type ti = {id};
//lock timer_can
timer_can_mutex.lock_upgrade();
BOOST_AUTO(iter, timer_can.find(ti));
if (iter == timer_can.end())
......@@ -89,11 +90,41 @@ public:
else
timer_can_mutex.unlock_upgrade();
//items in timer_can not locked
iter->status = object_type::TIMER_OK;
iter->milliseconds = milliseconds;
iter->user_data = user_data;
if (start)
start_timer(*iter);
}
void set_timer(unsigned char id, size_t milliseconds, const void* user_data) {update_timer_info(id, milliseconds, user_data, true);}
object_type find_timer(unsigned char id)
{
object_type ti = {id, object_type::TIMER_CANCELED, 0, NULL};
boost::shared_lock<boost::shared_mutex> lock(timer_can_mutex);
BOOST_AUTO(iter, timer_can.find(ti));
if (iter == timer_can.end())
return *iter;
else
return ti;
}
bool start_timer(unsigned char id)
{
object_type ti = {id};
boost::shared_lock<boost::shared_mutex> lock(timer_can_mutex);
BOOST_AUTO(iter, timer_can.find(ti));
if (iter == timer_can.end())
return false;
lock.unlock();
start_timer(*iter);
return true;
}
void stop_timer(unsigned char id)
......
......@@ -37,6 +37,11 @@ public:
protected:
virtual void uninit() {ST_THIS stop(); ST_THIS do_something_to_all(boost::mem_fn(&Socket::graceful_close));}
public:
void disconnect(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->disconnect();}
void force_close(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->force_close();}
void graceful_close(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->graceful_close();}
};
typedef st_udp_client_base<> st_udp_client;
......
......@@ -90,7 +90,7 @@ public:
void graceful_close() {clean_up();}
//get or change the unpacker at runtime
//changing unpacker at runtime is no thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
boost::shared_ptr<i_udp_unpacker<typename Packer::msg_type> > inner_unpacker() {return unpacker_;}
boost::shared_ptr<const i_udp_unpacker<typename Packer::msg_type> > inner_unpacker() const {return unpacker_;}
......@@ -111,13 +111,7 @@ public:
//msg sending interface
///////////////////////////////////////////////////
void show_info(const char* head, const char* tail)
{
boost::system::error_code ec;
BOOST_AUTO(ep, ST_THIS lowest_layer().local_endpoint(ec));
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().c_str(), ep.port(), tail);
}
void show_info(const char* head, const char* tail) const {unified_out::info_out("%s %s:%hu %s", head, local_addr.address().to_string().c_str(), local_addr.port(), tail);}
protected:
virtual bool do_start()
......@@ -156,7 +150,7 @@ protected:
virtual void on_recv_error(const boost::system::error_code& ec)
{
if (boost::asio::error::operation_aborted != ec)
unified_out::error_out("recv msg error: %d %s", ec.value(), ec.message().data());
unified_out::error_out("recv msg error (%d %s)", ec.value(), ec.message().data());
}
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
......
......@@ -114,18 +114,24 @@ public:
void close_some_client(size_t n)
{
#ifdef AUTO_CLEAR_CLOSED_SOCKET
//method #1
// for (BOOST_AUTO(iter, object_can.begin()); n-- > 0 && iter != object_can.end(); ++iter)
// (*iter)->graceful_close();
//notice: this method need to define AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro, because it just closed the st_socket,
//not really removed them from object pool, this will cause test_client still send data via them, and wait responses from them.
//for this scenario, the smaller CLEAR_CLOSED_SOCKET_INTERVAL is, the better experience you will get, so set it to 1 second.
for (BOOST_AUTO(iter, object_can.begin()); n-- > 0 && iter != object_can.end(); ++iter)
(*iter)->graceful_close();
//notice: this method need to define AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro, because it just close the st_socket,
//not really remove them from object pool, this will cause test_client still send data via them, and wait responses from them.
//for this scenario, the smaller CLEAR_CLOSED_SOCKET_INTERVAL macro is, the better experience you will get, so set it to 1 second.
#else
//method #2
while (n-- > 0)
graceful_close(at(0));
//notice: this method directly remove clients from object pool, and close them, not require AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro
//this is a equivalence of calling i_server::del_client in st_server_socket_base::on_recv_error(see st_server_socket_base for more details).
//if you just want to reconnect to the server, you should do it like this:
// while (n-- > 0)
// graceful_close(at(n), true, false); //if parameter 'reconnect' is true, st_tcp_client will not remove clients from object pool
#endif
}
///////////////////////////////////////////////////
......@@ -140,7 +146,7 @@ public:
int main(int argc, const char* argv[])
{
///////////////////////////////////////////////////////////
printf("usage: test_client [<port=%d> [<ip=%s> [link num=1]]]\n", SERVER_PORT, SERVER_IP);
printf("usage: test_client [<port=%d> [<ip=%s> [link num=16]]]\n", SERVER_PORT, SERVER_IP);
size_t link_num = 16;
if (argc > 3)
......@@ -201,19 +207,21 @@ int main(int argc, const char* argv[])
{
if (n > client.size())
n = client.size();
link_num -= n;
client.close_some_client(n);
link_num = client.size();
}
continue;
}
#ifdef AUTO_CLEAR_CLOSED_SOCKET
if (client.size() != link_num)
{
puts("some closed links have not been cleared, did you defined AUTO_CLEAR_CLOSED_SOCKET macro?");
puts("some closed links have not been cleared, please define CLEAR_CLOSED_SOCKET_INTERVAL macro as 1 to speed up the auto cleaning, or wait for more time");
continue;
}
#endif
size_t msg_num = 1024;
size_t msg_len = 1024; //must greater than or equal to sizeof(size_t)
......
......@@ -67,10 +67,6 @@ public:
return false;
}
void disconnect(typename Pool::object_ctype& client_ptr) {force_close(client_ptr);}
void force_close(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->force_close();}
void graceful_close(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->graceful_close();}
};
} //namespace
......
......@@ -76,19 +76,54 @@ public:
//the following three functions can only be used when the connection is OK and you want to reconnect to the server.
//if the connection is broken unexpectedly, st_connector will try to reconnect to the server automatically.
void disconnect(bool reconnect = false) {force_close(reconnect);}
void force_close(bool reconnect = false) {reconnecting = reconnect; connected = false; st_tcp_socket_base<Socket, Packer, Unpacker>::force_close();}
void graceful_close(bool reconnect = false)
void force_close(bool reconnect = false)
{
if (!ST_THIS is_closing())
{
show_info("link:", "been closed.");
reconnecting = reconnect;
connected = false;
ST_THIS close_state = 1;
}
else if (1 == ST_THIS close_state)
return;
st_tcp_socket_base<Socket, Packer, Unpacker>::force_close();
}
void graceful_close(bool reconnect = false, bool sync = true)
{
if (ST_THIS is_closing())
return;
if (!is_connected())
force_close(reconnect);
else
{
show_info("link:", "been closing gracefully.");
reconnecting = reconnect;
connected = false;
st_tcp_socket_base<Socket, Packer, Unpacker>::graceful_close();
ST_THIS close_state = 2;
st_tcp_socket_base<Socket, Packer, Unpacker>::graceful_close(sync);
}
}
void show_info(const char* head, const char* tail) const
{
boost::system::error_code ec;
auto ep = ST_THIS lowest_layer().local_endpoint(ec);
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().c_str(), ep.port(), tail);
}
void show_info(const char* head, const char* tail, const boost::system::error_code& ec) const
{
boost::system::error_code ec2;
auto ep = ST_THIS lowest_layer().local_endpoint(ec2);
if (!ec2)
unified_out::info_out("%s %s:%hu %s (%d %s)", head, ep.address().to_string().c_str(), ep.port(), tail, ec.value(), ec.message().data());
}
protected:
virtual bool do_start() //connect or receive
{
......@@ -110,7 +145,7 @@ protected:
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_close();}
virtual void on_recv_error(const boost::system::error_code& ec)
{
unified_out::error_out("connection closed.");
show_info("link:", "broken/closed", ec);
auto reconnect = reconnecting;
if (ST_THIS is_closing())
......@@ -122,8 +157,9 @@ protected:
reconnect = false;
}
ST_THIS close_state = 0;
if (reconnect)
do_start();
ST_THIS start();
}
virtual bool on_timer(unsigned char id, const void* user_data)
......
......@@ -69,10 +69,7 @@ public:
{
auto raw_client_ptr(boost::dynamic_pointer_cast<Socket>(client_ptr));
if (ST_THIS del_object(raw_client_ptr))
{
raw_client_ptr->show_info("client:", "quit.");
raw_client_ptr->force_close();
}
}
void close_all_client()
......@@ -80,7 +77,6 @@ public:
//do not use graceful_close() as client does, because in this function, object_can_mutex has been locked, graceful_close will wait until on_recv_error() been invoked,
//but in on_recv_error(), we need to lock object_can_mutex too(in del_object()), this will cause dead lock
ST_THIS do_something_to_all([](typename Pool::object_ctype& item) {
item->show_info("client:", "been closed.");
item->force_close();
});
}
......
......@@ -41,6 +41,46 @@ public:
//and then do not forget to invoke st_server_socket_base::reset() to initialize father's member variables
virtual void reset() {st_tcp_socket_base<Socket, Packer, Unpacker>::reset();}
void disconnect() {force_close();}
void force_close()
{
if (!ST_THIS is_closing())
{
show_info("link:", "been closed.");
ST_THIS close_state = 1;
}
else if (1 == ST_THIS close_state)
return;
st_tcp_socket_base<Socket, Packer, Unpacker>::force_close();
}
void graceful_close()
{
if (ST_THIS is_closing())
return;
show_info("link:", "been closing gracefully.");
ST_THIS close_state = 2;
st_tcp_socket_base<Socket, Packer, Unpacker>::graceful_close();
}
void show_info(const char* head, const char* tail) const
{
boost::system::error_code ec;
auto ep = ST_THIS lowest_layer().remote_endpoint(ec);
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().c_str(), ep.port(), tail);
}
void show_info(const char* head, const char* tail, const boost::system::error_code& ec) const
{
boost::system::error_code ec2;
auto ep = ST_THIS lowest_layer().remote_endpoint(ec2);
if (!ec2)
unified_out::info_out("%s %s:%hu %s (%d %s)", head, ep.address().to_string().c_str(), ep.port(), tail, ec.value(), ec.message().data());
}
protected:
virtual bool do_start()
{
......@@ -57,12 +97,15 @@ protected:
//do not forget to force_close this socket(in del_client(), there's a force_close() invocation)
virtual void on_recv_error(const boost::system::error_code& ec)
{
ST_THIS show_info("link:", "broken/closed", ec);
#ifdef AUTO_CLEAR_CLOSED_SOCKET
ST_THIS show_info("client:", "quit.");
ST_THIS force_close();
#else
server.del_client(boost::dynamic_pointer_cast<st_timer>(ST_THIS shared_from_this()));
#endif
ST_THIS close_state = 0;
}
protected:
......
......@@ -102,7 +102,7 @@ public:
bool suspend_dispatch_msg() const {return suspend_dispatch_msg_;}
//get or change the packer at runtime
//changing packer at runtime is no thread-safe, you should avoid it from sending msg or posting msg, please pay special attention
//changing packer at runtime is not thread-safe, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
boost::shared_ptr<i_packer<typename Packer::msg_type>> inner_packer() {return packer_;}
boost::shared_ptr<const i_packer<typename Packer::msg_type>> inner_packer() const {return packer_;}
......@@ -159,7 +159,7 @@ protected:
virtual bool is_send_allowed() const {return !suspend_send_msg_;} //can send msg or not(just put into send buffer)
//generally, you don't have to rewrite this to maintain the status of connections(TCP)
virtual void on_send_error(const boost::system::error_code& ec) {unified_out::error_out("send msg error: %d %s", ec.value(), ec.message().data());}
virtual void on_send_error(const boost::system::error_code& ec) {unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
//receiving error or peer endpoint quit(false ec means ok)
virtual void on_recv_error(const boost::system::error_code& ec) = 0;
......
......@@ -66,6 +66,12 @@ public:
protected:
virtual void uninit() {ST_THIS stop(); ST_THIS do_something_to_all([](typename Pool::object_ctype& item) {item->graceful_close();});}
public:
void disconnect(typename Pool::object_ctype& client_ptr, bool reconnect = false) {if (!reconnect) ST_THIS del_object(client_ptr); client_ptr->disconnect(reconnect);}
void force_close(typename Pool::object_ctype& client_ptr, bool reconnect = false) {if (!reconnect) ST_THIS del_object(client_ptr); client_ptr->force_close(reconnect);}
void graceful_close(typename Pool::object_ctype& client_ptr, bool reconnect = false, bool sync = true)
{if (!reconnect) ST_THIS del_object(client_ptr); client_ptr->graceful_close(reconnect, sync);}
};
typedef st_tcp_client_base<> st_tcp_client;
......
......@@ -51,32 +51,13 @@ public:
{
unpacker_->reset_state();
st_socket<Socket, Packer, Unpacker>::reset_state();
closing = false;
close_state = 0;
}
void disconnect() {force_close();}
void force_close() {clean_up();}
void graceful_close() //will block until closing success or time out
{
closing = true;
boost::system::error_code ec;
ST_THIS lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
if (ec) //graceful disconnecting is impossible
clean_up();
else
{
auto loop_num = GRACEFUL_CLOSE_MAX_DURATION * 100; //seconds to 10 milliseconds
while (--loop_num >= 0 && closing)
boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(10));
if (loop_num < 0) //graceful disconnecting is impossible
clean_up();
}
}
bool is_closing() const {return closing;}
bool is_closing() const {return 0 != close_state;}
//get or change the unpacker at runtime
//changing unpacker at runtime is no thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
boost::shared_ptr<i_unpacker<out_msg_type>> inner_unpacker() {return unpacker_;}
boost::shared_ptr<const i_unpacker<out_msg_type>> inner_unpacker() const {return unpacker_;}
......@@ -97,15 +78,34 @@ public:
//msg sending interface
///////////////////////////////////////////////////
void show_info(const char* head, const char* tail)
protected:
void disconnect() {force_close();}
void force_close() {clean_up();}
void graceful_close(bool sync = true) //will block until closing success or time out if sync equal to true
{
boost::system::error_code ec;
auto ep = ST_THIS lowest_layer().remote_endpoint(ec);
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().c_str(), ep.port(), tail);
ST_THIS lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
if (ec) //graceful closing is impossible
{
clean_up();
return;
}
auto loop_num = GRACEFUL_CLOSE_MAX_DURATION * 100; //seconds to 10 milliseconds
if (sync)
{
while (--loop_num >= 0 && is_closing())
boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(10));
if (loop_num < 0) //graceful closing is impossible
{
unified_out::info_out("failed to graceful close within %d seconds", GRACEFUL_CLOSE_MAX_DURATION);
clean_up();
}
}
else
ST_THIS set_timer(9, 10, reinterpret_cast<const void*>(loop_num));
}
protected:
//must mutex send_msg_buffer before invoke this function
virtual bool do_send_msg()
{
......@@ -136,6 +136,36 @@ protected:
virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_timer(unsigned char id, const void* user_data)
{
switch (id)
{
case 9:
if (is_closing())
{
auto loop_num = reinterpret_cast<int>(user_data);
--loop_num;
if (loop_num > 0)
{
ST_THIS update_timer_info(9, 10, reinterpret_cast<const void*>(loop_num));
return true;
}
else
{
unified_out::info_out("failed to graceful close within %d seconds", GRACEFUL_CLOSE_MAX_DURATION);
clean_up();
}
}
break;
default:
return st_socket<Socket, Packer, Unpacker>::on_timer(id, user_data);
break;
}
return false;
}
//start the asynchronous read
//it's child's responsibility to invoke this properly, because st_tcp_socket_base doesn't know any of the connection status
void do_recv_msg()
......@@ -208,7 +238,7 @@ protected:
protected:
boost::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
bool closing;
int close_state; //2-the first step of graceful close, 1-force close, 0-normal state
};
} //namespace st_tcp
......
......@@ -72,10 +72,11 @@ public:
typedef const object_type object_ctype;
typedef boost::container::set<object_type> container_type;
void set_timer(unsigned char id, size_t milliseconds, const void* user_data)
void update_timer_info(unsigned char id, size_t milliseconds, const void* user_data, bool start = false)
{
object_type ti = {id};
//lock timer_can
timer_can_mutex.lock_upgrade();
auto iter = timer_can.find(ti);
if (iter == std::end(timer_can))
......@@ -89,11 +90,41 @@ public:
else
timer_can_mutex.unlock_upgrade();
//items in timer_can not locked
iter->status = object_type::TIMER_OK;
iter->milliseconds = milliseconds;
iter->user_data = user_data;
if (start)
start_timer(*iter);
}
void set_timer(unsigned char id, size_t milliseconds, const void* user_data) {update_timer_info(id, milliseconds, user_data, true);}
object_type find_timer(unsigned char id)
{
object_type ti = {id, object_type::TIMER_CANCELED, 0, nullptr};
boost::shared_lock<boost::shared_mutex> lock(timer_can_mutex);
auto iter = timer_can.find(ti);
if (iter == std::end(timer_can))
return *iter;
else
return ti;
}
bool start_timer(unsigned char id)
{
object_type ti = {id};
boost::shared_lock<boost::shared_mutex> lock(timer_can_mutex);
auto iter = timer_can.find(ti);
if (iter == std::end(timer_can))
return false;
lock.unlock();
start_timer(*iter);
return true;
}
void stop_timer(unsigned char id)
......
......@@ -37,6 +37,11 @@ public:
protected:
virtual void uninit() {ST_THIS stop(); ST_THIS do_something_to_all([](typename Pool::object_ctype& item) {item->graceful_close();});}
public:
void disconnect(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->disconnect();}
void force_close(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->force_close();}
void graceful_close(typename Pool::object_ctype& client_ptr) {if (ST_THIS del_object(client_ptr)) client_ptr->graceful_close();}
};
typedef st_udp_client_base<> st_udp_client;
......
......@@ -90,7 +90,7 @@ public:
void graceful_close() {clean_up();}
//get or change the unpacker at runtime
//changing unpacker at runtime is no thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
boost::shared_ptr<i_udp_unpacker<typename Packer::msg_type>> inner_unpacker() {return unpacker_;}
boost::shared_ptr<const i_udp_unpacker<typename Packer::msg_type>> inner_unpacker() const {return unpacker_;}
......@@ -111,13 +111,7 @@ public:
//msg sending interface
///////////////////////////////////////////////////
void show_info(const char* head, const char* tail)
{
boost::system::error_code ec;
auto ep = ST_THIS lowest_layer().local_endpoint(ec);
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().c_str(), ep.port(), tail);
}
void show_info(const char* head, const char* tail) const {unified_out::info_out("%s %s:%hu %s", head, local_addr.address().to_string().c_str(), local_addr.port(), tail);}
protected:
virtual bool do_start()
......@@ -156,7 +150,7 @@ protected:
virtual void on_recv_error(const boost::system::error_code& ec)
{
if (boost::asio::error::operation_aborted != ec)
unified_out::error_out("recv msg error: %d %s", ec.value(), ec.message().data());
unified_out::error_out("recv msg error (%d %s)", ec.value(), ec.message().data());
}
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
......
......@@ -112,17 +112,23 @@ public:
void close_some_client(size_t n)
{
#ifdef AUTO_CLEAR_CLOSED_SOCKET
//method #1
// do_something_to_one([&n](object_ctype& item) {return n-- > 0 ? item->graceful_close(), false : true;});
//notice: this method need to define AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro, because it just closed the st_socket,
//not really removed them from object pool, this will cause test_client still send data via them, and wait responses from them.
//for this scenario, the smaller CLEAR_CLOSED_SOCKET_INTERVAL is, the better experience you will get, so set it to 1 second.
do_something_to_one([&n](object_ctype& item) {return n-- > 0 ? item->graceful_close(), false : true;});
//notice: this method need to define AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro, because it just close the st_socket,
//not really remove them from object pool, this will cause test_client still send data via them, and wait responses from them.
//for this scenario, the smaller CLEAR_CLOSED_SOCKET_INTERVAL macro is, the better experience you will get, so set it to 1 second.
#else
//method #2
while (n-- > 0)
graceful_close(at(0));
//notice: this method directly remove clients from object pool, and close them, not require AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro
//this is a equivalence of calling i_server::del_client in st_server_socket_base::on_recv_error(see st_server_socket_base for more details).
//if you just want to reconnect to the server, you should do it like this:
// while (n-- > 0)
// graceful_close(at(n), true, false); //if parameter 'reconnect' is true, st_tcp_client will not remove clients from object pool
#endif
}
///////////////////////////////////////////////////
......@@ -137,7 +143,7 @@ public:
int main(int argc, const char* argv[])
{
///////////////////////////////////////////////////////////
printf("usage: test_client [<port=%d> [<ip=%s> [link num=1]]]\n", SERVER_PORT, SERVER_IP);
printf("usage: test_client [<port=%d> [<ip=%s> [link num=16]]]\n", SERVER_PORT, SERVER_IP);
size_t link_num = 16;
if (argc > 3)
......@@ -198,19 +204,21 @@ int main(int argc, const char* argv[])
{
if (n > client.size())
n = client.size();
link_num -= n;
client.close_some_client(n);
link_num = client.size();
}
continue;
}
#ifdef AUTO_CLEAR_CLOSED_SOCKET
if (client.size() != link_num)
{
puts("some closed links have not been cleared, did you defined AUTO_CLEAR_CLOSED_SOCKET macro?");
puts("some closed links have not been cleared, please define CLEAR_CLOSED_SOCKET_INTERVAL macro as 1 to speed up the auto cleaning, or wait for more time");
continue;
}
#endif
size_t msg_num = 1024;
size_t msg_len = 1024; //must greater than or equal to sizeof(size_t)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册