提交 501a4c47 编写于 作者: Y youngwolf

Use post_strand instead of dispatch_strand in send_msg and recv_msg.

More guarantee on object reusing.
A small refactoring.
上级 85c3ac09
Subproject commit f0a1e1c7c0387ad16358c81eb52528f190df625c
Subproject commit 5302adea05c8a1c5d3923fa93afd7cf437df5937
......@@ -236,7 +236,7 @@ public:
using typename i_unpacker<MsgType>::container_type;
using typename i_unpacker<MsgType>::buffer_type;
virtual void reset() {assert(false);}
virtual void reset() {}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) {assert(false); return false;}
virtual buffer_type prepare_next_recv() {assert(false); return buffer_type();}
};
......
......@@ -809,6 +809,8 @@
*
* SPECIAL ATTENTION (incompatible with old editions):
* Graceful shutdown does not support sync mode anymore.
* Use post_strand instead of dispatch_strand in send_msg and recv_msg, because we don't synchronize socket's member variable sending and reading,
* there's still a race condition even in the same strand because of memory synchronization.
*
* HIGHLIGHT:
* Make shutdown thread safe.
......
......@@ -54,6 +54,7 @@ protected:
sr_status = sync_recv_status::NOT_REQUESTED;
#endif
started_ = false;
obsoleted_ = false;
dispatching = false;
recv_idle_began = false;
send_buf_size_ = ASCS_MAX_SEND_BUF;
......@@ -78,15 +79,12 @@ protected:
void reset()
{
reset_io_context_refs();
assert(!is_timer(TIMER_DELAY_CLOSE));
if (is_timer(TIMER_DELAY_CLOSE))
throw std::runtime_error("invalid resetting or object reusing");
auto need_clean_up = is_timer(TIMER_DELAY_CLOSE);
reset_io_context_refs();
stop_all_timer(); //just in case, theoretically, timer TIMER_DELAY_CLOSE and TIMER_ASYNC_SHUTDOWN (used by tcp::socket_base) can left behind.
if (need_clean_up)
{
on_close();
set_async_calling(false);
}
stat.reset();
packer_->reset();
......@@ -98,6 +96,7 @@ protected:
#ifdef ASCS_SYNC_RECV
sr_status = sync_recv_status::NOT_REQUESTED;
#endif
obsoleted_ = false;
dispatching = false;
recv_idle_began = false;
clear_buffer();
......@@ -112,6 +111,16 @@ protected:
recv_buffer.clear();
}
//execute in the IO strand -- rw_strand
void post_in_io_strand(const std::function<void()>& handler) {post_strand(rw_strand, handler);}
//execute in the IO strand -- rw_strand, or current thead, use it carefully
void dispatch_in_io_strand(const std::function<void()>& handler) {dispatch_strand(rw_strand, handler);}
//execute in the dispatch strand -- dis_strand
void post_in_dis_strand(const std::function<void()>& handler) {post_strand(dis_strand, handler);}
//execute in the dispatch strand -- dis_strand, or current thead, use it carefully
void dispatch_in_dis_strand(const std::function<void()>& handler) {dispatch_strand(dis_strand, handler);}
public:
#ifdef ASCS_SYNC_SEND
typedef obj_with_begin_time_promise<InMsgType> in_msg;
......@@ -136,7 +145,7 @@ public:
typename Socket::lowest_layer_type& lowest_layer() {return next_layer().lowest_layer();}
const typename Socket::lowest_layer_type& lowest_layer() const {return next_layer().lowest_layer();}
virtual bool obsoleted() {return !started_ && !is_async_calling();}
virtual bool obsoleted() {return obsoleted_ && !is_async_calling();}
virtual bool is_ready() = 0; //is ready for sending and receiving messages
virtual void send_heartbeat() = 0;
virtual const char* type_name() const = 0;
......@@ -159,19 +168,19 @@ public:
#ifdef ASCS_PASSIVE_RECV
bool is_reading() const {return reading;}
void recv_msg() {if (!reading && is_ready()) dispatch_strand(rw_strand, [this]() {this->do_recv_msg();});}
void recv_msg() {if (!reading && is_ready()) post_in_io_strand([this]() {this->do_recv_msg();});}
#else
private:
void recv_msg() {dispatch_strand(rw_strand, [this]() {this->do_recv_msg();});}
void recv_msg() {post_in_io_strand([this]() {this->do_recv_msg();});}
public:
#endif
#ifndef ASCS_EXPOSE_SEND_INTERFACE
protected:
#endif
#ifdef ASCS_ARBITRARY_SEND
void send_msg() {dispatch_strand(rw_strand, [this]() {this->do_send_msg();});}
void send_msg() {post_in_io_strand([this]() {this->do_send_msg();});}
#else
void send_msg() {if (!sending && is_ready()) dispatch_strand(rw_strand, [this]() {this->do_send_msg();});}
void send_msg() {if (!sending && is_ready()) post_in_io_strand([this]() {this->do_send_msg();});}
#endif
public:
......@@ -442,11 +451,12 @@ protected:
unpacker_->reset(); //very important, otherwise, the unpacker will never be able to parse any more messages if its buffer has legacy data
on_close();
after_close();
obsoleted_ = true;
}
else
{
set_async_calling(true);
set_timer(TIMER_DELAY_CLOSE, ASCS_DELAY_CLOSE * 1000 + 50, [this](tid id)->bool {return this->timer_handler(TIMER_DELAY_CLOSE);});
set_timer(TIMER_DELAY_CLOSE, ASCS_DELAY_CLOSE * 1000 + 50, [this](tid id)->bool {return this->timer_handler(id);});
}
return true;
......@@ -655,7 +665,7 @@ private:
}
//do not use dispatch_strand at here, because the handler (do_dispatch_msg) may call this function, which can lead stack overflow.
void dispatch_msg() {if (!dispatching) post_strand(dis_strand, [this]() {this->do_dispatch_msg();});}
void dispatch_msg() {if (!dispatching) post_in_dis_strand([this]() {this->do_dispatch_msg();});}
void do_dispatch_msg()
{
#ifdef ASCS_DISPATCH_BATCH_MSG
......@@ -675,7 +685,7 @@ private:
#ifdef ASCS_FULL_STATISTIC
recv_buffer.do_something_to_all([&](out_msg& msg) {msg.restart(end_time);});
#endif
set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_, [this](tid id)->bool {return this->timer_handler(TIMER_DISPATCH_MSG);}); //hold dispatching
set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_, [this](tid id)->bool {return this->timer_handler(id);}); //hold dispatching
}
else
{
......@@ -692,7 +702,7 @@ private:
if (!re) //dispatch failed, re-dispatch
{
dispatching_msg.restart(end_time);
set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_, [this](tid id)->bool {return this->timer_handler(TIMER_DISPATCH_MSG);}); //hold dispatching
set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_, [this](tid id)->bool {return this->timer_handler(id);}); //hold dispatching
}
else
{
......@@ -711,12 +721,12 @@ private:
switch (id)
{
case TIMER_DISPATCH_MSG:
post_strand(dis_strand, [this]() {this->do_dispatch_msg();});
post_in_dis_strand([this]() {this->do_dispatch_msg();});
break;
case TIMER_DELAY_CLOSE:
if (!is_last_async_call())
{
stop_all_timer(TIMER_DELAY_CLOSE);
stop_all_timer(id);
return true;
}
else if (lowest_layer().is_open())
......@@ -726,9 +736,10 @@ private:
}
unpacker_->reset(); //very important, otherwise, the unpacker will never be able to parse any more messages if its buffer has legacy data
on_close();
change_timer_status(TIMER_DELAY_CLOSE, timer_info::TIMER_CANCELED);
change_timer_status(id, timer_info::TIMER_CANCELED);
after_close();
set_async_calling(false);
obsoleted_ = true;
break;
default:
assert(false);
......@@ -756,6 +767,8 @@ private:
bool recv_idle_began;
volatile bool started_; //has started or not
volatile bool obsoleted_;
volatile bool dispatching;
#ifndef ASCS_DISPATCH_BATCH_MSG
out_msg dispatching_msg;
......
......@@ -64,6 +64,7 @@ public:
virtual const char* type_name() const {return "TCP (client endpoint)";}
virtual int type_id() const {return 1;}
virtual bool obsoleted() {return !need_reconnect && super::obsoleted();}
virtual void reset() {need_reconnect = ASCS_RECONNECT; super::reset();}
#ifdef _MSC_VER
......@@ -77,7 +78,7 @@ public:
//if you don't want to reconnect to the server after link broken, define macro ASCS_RECONNECT as false, call set_reconnect(false) in on_connect()
// or rewrite after_close() virtual function and do nothing in it.
//if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function.
//disconnect(bool), force_shutdown(bool) and graceful_shutdown(bool, bool) can overwrite reconnecting behavior, please note.
//disconnect(bool), force_shutdown(bool) and graceful_shutdown(bool) can overwrite reconnecting behavior, please note.
//reset() virtual function will set reconnecting behavior according to macro ASCS_RECONNECT, please note.
//if prepare_reconnect returns negative value, reconnecting will be closed, please note.
void set_reconnect(bool reconnect) {need_reconnect = reconnect;}
......@@ -95,7 +96,7 @@ public:
else if (super::link_status::FORCE_SHUTTING_DOWN != this->status)
this->show_info("client link:", "been shut down.");
this->force_shutdown_in_strand();
super::force_shutdown();
}
//this function is not thread safe, please note.
......@@ -110,7 +111,7 @@ public:
else if (!this->is_shutting_down())
this->show_info("client link:", "being shut down gracefully.");
this->graceful_shutdown_in_strand();
super::graceful_shutdown();
}
protected:
......@@ -138,38 +139,25 @@ protected:
virtual int prepare_reconnect(const asio::error_code& ec) {return ASCS_RECONNECT_INTERVAL;}
virtual void on_connect() {unified_out::info_out(ASCS_LLF " connecting success.", this->id());}
virtual void on_unpack_error() {unified_out::info_out(ASCS_LLF " can not unpack msg.", this->id()); this->unpacker()->dump_left_data(); force_shutdown(need_reconnect);}
virtual void on_recv_error(const asio::error_code& ec)
{
this->show_info(ec, "client link:", "broken/been shut down");
force_shutdown(need_reconnect);
this->status = super::link_status::BROKEN;
#ifndef ASCS_CLEAR_OBJECT_INTERVAL
if (!need_reconnect && nullptr != matrix)
matrix->del_socket(this->id());
#endif
}
virtual void on_recv_error(const asio::error_code& ec) {this->show_info(ec, "client link:", "broken/been shut down"); force_shutdown(need_reconnect);}
virtual void on_async_shutdown_error() {force_shutdown(need_reconnect);}
virtual bool on_heartbeat_error()
{
this->show_info("client link:", "broke unexpectedly.");
force_shutdown(need_reconnect);
return false;
}
virtual bool on_heartbeat_error() {this->show_info("client link:", "broke unexpectedly."); force_shutdown(need_reconnect); return false;}
virtual void on_close()
{
if (!need_reconnect)
{
this->clear_io_context_refs();
#ifndef ASCS_CLEAR_OBJECT_INTERVAL
if (nullptr != matrix)
matrix->del_socket(this->id());
#endif
}
super::on_close();
}
//reconnect at here rather than in on_recv_error to make sure no async invocations performed on this socket before reconnecting.
//if you don't want to reconnect the server after link broken, rewrite this virtual function and do nothing in it or call close_reconnt().
//if you don't want to reconnect the server after link broken, rewrite this virtual function and do nothing in it or call set_reconnect(false).
//if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function.
virtual void after_close() {if (need_reconnect) this->start();}
......
......@@ -38,7 +38,7 @@ public:
if (super::link_status::FORCE_SHUTTING_DOWN != this->status)
this->show_info("server link:", "been shut down.");
this->force_shutdown_in_strand();
super::force_shutdown();
}
//this function is not thread safe, please note.
......@@ -49,7 +49,7 @@ public:
else if (!this->is_shutting_down())
this->show_info("server link:", "being shut down gracefully.");
this->graceful_shutdown_in_strand();
super::graceful_shutdown();
}
protected:
......@@ -57,22 +57,16 @@ protected:
const Server& get_server() const {return server;}
virtual void on_unpack_error() {unified_out::error_out(ASCS_LLF " can not unpack msg.", this->id()); this->unpacker()->dump_left_data(); force_shutdown();}
//do not forget to force_shutdown this socket(in del_socket(), there's a force_shutdown() invocation)
virtual void on_recv_error(const asio::error_code& ec)
{
this->show_info(ec, "server link:", "broken/been shut down");
force_shutdown();
#ifndef ASCS_CLEAR_OBJECT_INTERVAL
server.del_socket(this->shared_from_this());
#endif
}
virtual void on_recv_error(const asio::error_code& ec) {this->show_info(ec, "server link:", "broken/been shut down"); force_shutdown();}
virtual void on_async_shutdown_error() {force_shutdown();}
virtual bool on_heartbeat_error() {this->show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;}
virtual void on_close()
{
this->clear_io_context_refs();
#ifndef ASCS_CLEAR_OBJECT_INTERVAL
server.del_socket(this->shared_from_this(), false);
#endif
super::on_close();
}
......
......@@ -86,7 +86,6 @@ public:
static const typename super::tid TIMER_ASYNC_SHUTDOWN = TIMER_BEGIN;
static const typename super::tid TIMER_END = TIMER_BEGIN + 5;
virtual bool obsoleted() {return !is_shutting_down() && super::obsoleted();}
virtual bool is_ready() {return is_connected();}
virtual void send_heartbeat()
{
......@@ -227,30 +226,8 @@ public:
///////////////////////////////////////////////////
protected:
//do something in the read/write strand -- rw_strand
void do_something_in_strand(const std::function<void()>& handler) {this->dispatch_strand(rw_strand, handler);}
void force_shutdown_in_strand() {do_something_in_strand([this]() {this->force_shutdown();});}
void graceful_shutdown_in_strand() {do_something_in_strand([this]() {this->graceful_shutdown();});}
//following two functions must be called in the read/write strand
void force_shutdown() {if (link_status::FORCE_SHUTTING_DOWN != status) shutdown();}
void graceful_shutdown()
{
if (is_broken())
shutdown();
else if (!is_shutting_down())
{
status = link_status::GRACEFUL_SHUTTING_DOWN;
asio::error_code ec;
this->lowest_layer().shutdown(asio::socket_base::shutdown_send, ec);
if (ec) //graceful shutdown is impossible
shutdown();
else
this->set_timer(TIMER_ASYNC_SHUTDOWN, 10, [this](typename super::tid id)->bool {return this->shutdown_handler(ASCS_GRACEFUL_SHUTDOWN_MAX_DURATION * 100);});
}
}
void force_shutdown() {this->dispatch_in_io_strand([this]() {this->_force_shutdown();});}
void graceful_shutdown() {this->dispatch_in_io_strand([this]() {this->_graceful_shutdown();});}
//used by ssl and websocket
void start_graceful_shutdown_monitoring()
......@@ -313,6 +290,24 @@ private:
using super::do_direct_sync_send_msg;
#endif
void _force_shutdown() {if (link_status::FORCE_SHUTTING_DOWN != status) shutdown();}
void _graceful_shutdown()
{
if (is_broken())
shutdown();
else if (!is_shutting_down())
{
status = link_status::GRACEFUL_SHUTTING_DOWN;
asio::error_code ec;
this->lowest_layer().shutdown(asio::socket_base::shutdown_send, ec);
if (ec) //graceful shutdown is impossible
shutdown();
else
this->set_timer(TIMER_ASYNC_SHUTDOWN, 10, [this](typename super::tid id)->bool {return this->shutdown_handler(ASCS_GRACEFUL_SHUTDOWN_MAX_DURATION * 100);});
}
}
void shutdown()
{
if (is_broken())
......
......@@ -45,7 +45,7 @@ protected:
void shutdown_ssl()
{
this->status = Socket::link_status::GRACEFUL_SHUTTING_DOWN;
this->do_something_in_strand([this]() {
this->dispatch_in_io_strand([this]() {
this->show_info("ssl link:", "been shutting down.");
this->start_graceful_shutdown_monitoring();
this->next_layer().async_shutdown(this->make_handler_error([this](const asio::error_code& ec) {
......@@ -91,10 +91,10 @@ public:
protected:
virtual void on_unpack_error() {unified_out::info_out(ASCS_LLF " can not unpack msg.", this->id()); this->unpacker()->dump_left_data(); force_shutdown(this->is_reconnect());}
virtual void after_close()
virtual void on_close()
{
this->reset_next_layer(this->get_context());
super::after_close();
super::on_close();
}
private:
......
......@@ -97,7 +97,7 @@ public:
const typename Family::endpoint& get_peer_addr() const {return peer_addr;}
void disconnect() {force_shutdown();}
void force_shutdown() {show_info("link:", "been shutting down."); this->dispatch_strand(rw_strand, [this]() {this->close(true);});}
void force_shutdown() {show_info("link:", "been shutting down."); this->dispatch_in_io_strand([this]() {this->close(true);});}
void graceful_shutdown() {force_shutdown();}
std::string endpoint_to_string(const asio::ip::udp::endpoint& ep) const {return ep.address().to_string() + ':' + std::to_string(ep.port());}
......@@ -215,10 +215,6 @@ protected:
{
if (asio::error::operation_aborted != ec)
unified_out::error_out(ASCS_LLF " recv msg error (%d %s)", this->id(), ec.value(), ec.message().data());
#ifndef ASCS_CLEAR_OBJECT_INTERVAL
else if (nullptr != matrix)
matrix->del_socket(this->id());
#endif
}
virtual bool on_heartbeat_error()
......@@ -235,6 +231,10 @@ protected:
sending_msg.p->set_value(sync_call_result::NOT_APPLICABLE);
#endif
this->clear_io_context_refs();
#ifndef ASCS_CLEAR_OBJECT_INTERVAL
if (nullptr != matrix)
matrix->del_socket(this->id());
#endif
super::on_close();
}
......@@ -342,7 +342,7 @@ private:
this->next_layer().async_send_to(asio::buffer(sending_msg.data(), sending_msg.size()), sending_msg.peer_addr, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
else if (do_send_msg(sending_msg))
this->post_strand(rw_strand, [this]() {this->send_handler(asio::error_code(), sending_msg.size());});
this->post_in_io_strand([this]() {this->send_handler(asio::error_code(), sending_msg.size());});
else
this->next_layer().async_send(asio::buffer(sending_msg.data(), sending_msg.size()), make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册