提交 10f050d9 编写于 作者: Y youngwolf 提交者: youngowlf

Optimization: use dispatch instead of post.

Tidy ascs::object.
上级 c4fe82bd
......@@ -27,10 +27,11 @@ public:
bool stopped() const {return io_context_.stopped();}
#if ASIO_VERSION >= 101100
template <typename F> inline asio::executor_binder<typename asio::decay<F>::type, asio::io_context::strand> make_strand(ASIO_MOVE_ARG(F) f)
template <typename F> inline static asio::executor_binder<typename asio::decay<F>::type, asio::io_context::strand> make_strand(asio::io_context::strand& strand, ASIO_MOVE_ARG(F) f)
{return asio::bind_executor(strand, ASIO_MOVE_CAST(F)(f));}
#else
template <typename F> asio::detail::wrapped_handler<asio::io_context::strand, F, asio::detail::is_continuation_if_running> make_strand(F f) {return strand.wrap(f);}
template <typename F> static asio::detail::wrapped_handler<asio::io_context::strand, F, asio::detail::is_continuation_if_running> make_strand(asio::io_context::strand& strand, F f)
{return strand.wrap(f);}
#endif
#if 0 == ASCS_DELAY_CLOSE
......@@ -39,49 +40,37 @@ public:
#if (defined(_MSC_VER) && _MSC_VER > 1800) || (defined(__cplusplus) && __cplusplus > 201103L)
#if ASIO_VERSION >= 101100
template<typename F> void post(F&& handler) {asio::post(io_context_, [unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post(const F& handler) {asio::post(io_context_, [unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void post_strand(F&& handler) {asio::post(strand, [unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {asio::post(strand, [unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post_strand(const F& handler) {asio::post(strand, [unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {asio::post(strand, [unused(this->async_call_indicator), handler]() {handler();});}
//Don't use defer function unless you fully understand asio::defer, which was introduced in asio 1.11
template<typename F> void defer(F&& handler) {asio::defer(io_context_, [unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void defer(const F& handler) {asio::defer(io_context_, [unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void defer_strand(F&& handler) {strand.defer([unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, F&& handler) {strand.defer([unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void defer_strand(const F& handler) {strand.defer([unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, const F& handler) {strand.defer([unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void post(F&& handler) {asio::post(io_context_, [unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void defer(F&& handler) {asio::defer(io_context_, [unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch(F&& handler) {asio::dispatch(io_context_, [unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {asio::post(strand, [unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, F&& handler) {asio::defer(strand, [unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler) {asio::dispatch(strand, [unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
#else
template<typename F> void post(F&& handler) {io_context_.post([unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post(const F& handler) {io_context_.post([unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void post_strand(F&& handler) {strand.post([unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {strand.post([unused(this->async_call_indicator), handler(std::move(handler))]() {handler();});}
template<typename F> void post_strand(const F& handler) {strand.post([unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {strand.post([unused(this->async_call_indicator), handler]() {handler();});}
template<typename F> void post(F&& handler) {io_context_.post([unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch(F&& handler) {io_context_.dispatch([unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {strand.post([unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler) {strand.dispatch([unused(this->async_call_indicator), handler(std::forward<F>(handler))]() {handler();});}
#endif
template<typename F> handler_with_error make_handler_error(F&& handler) const {return [unused(this->async_call_indicator), handler(std::move(handler))](const auto& ec) {handler(ec);};}
template<typename F> handler_with_error make_handler_error(const F& handler) const {return [unused(this->async_call_indicator), handler](const auto& ec) {handler(ec);};}
template<typename F> handler_with_error make_handler_error(F&& handler) const {return [unused(this->async_call_indicator), handler(std::forward<F>(handler))](const auto& ec) {handler(ec);};}
template<typename F> handler_with_error_size make_handler_error_size(F&& handler) const
{return [unused(this->async_call_indicator), handler(std::move(handler))](const auto& ec, auto bytes_transferred) {handler(ec, bytes_transferred);};}
template<typename F> handler_with_error_size make_handler_error_size(const F& handler) const
{return [unused(this->async_call_indicator), handler](const auto& ec, auto bytes_transferred) {handler(ec, bytes_transferred);};}
{return [unused(this->async_call_indicator), handler(std::forward<F>(handler))](const auto& ec, auto bytes_transferred) {handler(ec, bytes_transferred);};}
#else
#if ASIO_VERSION >= 101100
template<typename F> void post(const F& handler) {auto unused(async_call_indicator); asio::post(io_context_, [=]() {handler();});}
template<typename F> void post_strand(const F& handler) {auto unused(async_call_indicator); asio::post(strand, [=]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {auto unused(async_call_indicator); asio::post(strand, [=]() {handler();});}
//Don't use defer function unless you fully understand asio::defer, which was introduced in asio 1.11
template<typename F> void defer(const F& handler) {auto unused(async_call_indicator); asio::defer(io_context_, [=]() {handler();});}
template<typename F> void defer_strand(const F& handler) {auto unused(async_call_indicator); strand.defer([=]() {handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, const F& handler) {auto unused(async_call_indicator); strand.defer([=]() {handler();});}
template<typename F> void dispatch(const F& handler) {auto unused(async_call_indicator); asio::dispatch(io_context_, [=]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {auto unused(async_call_indicator); asio::post(strand, [=]() {handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, const F& handler) {auto unused(async_call_indicator); asio::defer(strand, [=]() {handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, const F& handler) {auto unused(async_call_indicator); asio::dispatch(strand, [=]() {handler();});}
#else
template<typename F> void post(const F& handler) {auto unused(async_call_indicator); io_context_.post([=]() {handler();});}
template<typename F> void post_strand(const F& handler) {auto unused(async_call_indicator); strand.post([=]() {handler();});}
template<typename F> void dispatch(const F& handler) {auto unused(async_call_indicator); io_context_.dispatch([=]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {auto unused(async_call_indicator); strand.post([=]() {handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, const F& handler) {auto unused(async_call_indicator); strand.dispatch([=]() {handler();});}
#endif
template<typename F> handler_with_error make_handler_error(const F& handler) const {auto unused(async_call_indicator); return [=](const asio::error_code& ec) {handler(ec);};}
template<typename F> handler_with_error_size make_handler_error_size(const F& handler) const
{auto unused(async_call_indicator); return [=](const asio::error_code& ec, size_t bytes_transferred) {handler(ec, bytes_transferred);};}
......@@ -92,49 +81,36 @@ public:
inline void set_async_calling(bool) {}
protected:
object(asio::io_context& _io_context_) : async_call_indicator(std::make_shared<char>('\0')), io_context_(_io_context_), strand(_io_context_) {}
object(asio::io_context& _io_context_) : async_call_indicator(std::make_shared<char>('\0')), io_context_(_io_context_) {}
std::shared_ptr<char> async_call_indicator;
#else
#if ASIO_VERSION >= 101100
template<typename F> void post(F&& handler) {asio::post(io_context_, std::move(handler));}
template<typename F> void post(const F& handler) {asio::post(io_context_, handler);}
template<typename F> void post_strand(F&& handler) {asio::post(strand, std::move(handler));}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {asio::post(strand, std::move(handler));}
template<typename F> void post_strand(const F& handler) {asio::post(strand, handler);}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {asio::post(strand, handler);}
//Don't use defer function unless you fully understand asio::defer, which was introduced in asio 1.11
template<typename F> void defer(F&& handler) {asio::defer(io_context_, std::move(handler));}
template<typename F> void defer(const F& handler) {asio::defer(io_context_, handler);}
template<typename F> void defer_strand(F&& handler) {strand.defer(std::move(handler));}
template<typename F> void defer_strand(asio::io_context::strand& strand, F&& handler) {strand.defer(std::move(handler));}
template<typename F> void defer_strand(const F& handler) {strand.defer(handler);}
template<typename F> void defer_strand(asio::io_context::strand& strand, const F& handler) {strand.defer(handler);}
template<typename F> void post(F&& handler) {asio::post(io_context_, std::forward<F>(handler));}
template<typename F> void defer(F&& handler) {asio::defer(io_context_, std::forward<F>(handler));}
template<typename F> void dispatch(F&& handler) {asio::dispatch(io_context_, std::forward<F>(handler));}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {asio::post(strand, std::forward<F>(handler));}
template<typename F> void defer_strand(asio::io_context::strand& strand, F&& handler) {asio::defer(strand, std::forward<F>(handler));}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler) {asio::dispatch(strand, std::forward<F>(handler));}
#else
template<typename F> void post(F&& handler) {io_context_.post(std::move(handler));}
template<typename F> void post(const F& handler) {io_context_.post(handler);}
template<typename F> void post_strand(F&& handler) {strand.post(std::move(handler));}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {strand.post(std::move(handler));}
template<typename F> void post_strand(const F& handler) {strand.post(handler);}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {strand.post(handler);}
template<typename F> void post(F&& handler) {io_context_.post(std::forward<F>(handler));}
template<typename F> void dispatch(F&& handler) {io_context_.dispatch(std::forward<F>(handler));}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {strand.post(std::forward<F>(handler));}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler) {strand.dispatch(std::forward<F>(handler));}
#endif
template<typename F> inline F&& make_handler_error(F&& f) const {return std::move(f);}
template<typename F> inline const F& make_handler_error(const F& f) const {return f;}
template<typename F> inline F&& make_handler_error_size(F&& f) const {return std::move(f);}
template<typename F> inline const F& make_handler_error_size(const F& f) const {return f;}
template<typename F> inline F&& make_handler_error(F&& f) const {return std::forward<F>(f);}
template<typename F> inline F&& make_handler_error_size(F&& f) const {return std::forward<F>(f);}
inline bool is_async_calling() const {return async_calling;}
inline bool is_last_async_call() const {return true;}
inline void set_async_calling(bool value) {async_calling = value;}
protected:
object(asio::io_context& _io_context_) : async_calling(false), io_context_(_io_context_), strand(_io_context_) {}
object(asio::io_context& _io_context_) : async_calling(false), io_context_(_io_context_) {}
bool async_calling;
#endif
asio::io_context& io_context_;
asio::io_context::strand strand;
};
} //namespace
......
......@@ -196,7 +196,7 @@ protected:
virtual bool on_heartbeat_error() = 0; //heartbeat timed out, return true to continue heartbeat function (useful for UDP)
//if ASCS_DELAY_CLOSE is equal to zero, in this callback, socket guarantee that there's no any other async call associated it,
// include user timers(created by set_timer()) and user async calls(started via post() or defer()), this means you can clean up any resource
// include user timers(created by set_timer()) and user async calls(started via post(), dispatch() or defer()), this means you can clean up any resource
// in this socket except this socket itself, because this socket maybe is being maintained by object_pool.
//otherwise (bigger than zero), socket simply call this callback ASCS_DELAY_CLOSE seconds later after link down, no any guarantees.
virtual void on_close() {unified_out::info_out("on_close()");}
......@@ -329,7 +329,7 @@ private:
return false;
}
void dispatch_msg() {if (!dispatching) this->post_strand(strand, [this]() {this->do_dispatch_msg();});}
void dispatch_msg() {if (!dispatching) this->dispatch_strand(strand, [this]() {this->do_dispatch_msg();});}
void do_dispatch_msg()
{
if (dispatching)
......
......@@ -35,8 +35,8 @@ private:
protected:
enum link_status {CONNECTED, FORCE_SHUTTING_DOWN, GRACEFUL_SHUTTING_DOWN, BROKEN};
socket_base(asio::io_context& io_context_) : super(io_context_) {first_init();}
template<typename Arg> socket_base(asio::io_context& io_context_, Arg& arg) : super(io_context_, arg) {first_init();}
socket_base(asio::io_context& io_context_) : super(io_context_), strand(io_context_) {first_init();}
template<typename Arg> socket_base(asio::io_context& io_context_, Arg& arg) : super(io_context_, arg), strand(io_context_) {first_init();}
//helper function, just call it in constructor
void first_init() {status = link_status::BROKEN; unpacker_ = std::make_shared<Unpacker>();}
......@@ -109,7 +109,7 @@ public:
#ifdef ASCS_RECV_AFTER_HANDLING
//changing unpacker must before calling ascs::socket::recv_msg, and define ASCS_RECV_AFTER_HANDLING macro.
void unpacker(const std::shared_ptr<i_unpacker<out_msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
virtual void recv_msg() {this->post_strand([this]() {this->do_recv_msg();});}
virtual void recv_msg() {this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
///////////////////////////////////////////////////
......@@ -172,9 +172,9 @@ protected:
private:
#ifndef ASCS_RECV_AFTER_HANDLING
virtual void recv_msg() {this->post_strand([this]() {this->do_recv_msg();});}
virtual void recv_msg() {this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
virtual void send_msg() {if (!this->sending) this->post_strand([this]() {this->do_send_msg(false);});}
virtual void send_msg() {if (!this->sending) this->dispatch_strand(strand, [this]() {this->do_send_msg(false);});}
void shutdown()
{
......@@ -197,7 +197,7 @@ private:
unified_out::error_out("The unpacker returned an empty buffer, quit receiving!");
else
asio::async_read(this->next_layer(), recv_buff,
[this](const asio::error_code& ec, size_t bytes_transferred)->size_t {return this->completion_checker(ec, bytes_transferred);}, this->make_strand(
[this](const asio::error_code& ec, size_t bytes_transferred)->size_t {return this->completion_checker(ec, bytes_transferred);}, this->make_strand(strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
}
......@@ -253,7 +253,7 @@ private:
if ((this->sending = !bufs.empty()))
{
last_send_msg.front().restart();
asio::async_write(this->next_layer(), bufs, this->make_strand(
asio::async_write(this->next_layer(), bufs, this->make_strand(strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
return true;
}
......@@ -315,6 +315,9 @@ protected:
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
volatile link_status status;
private:
asio::io_context::strand strand;
};
}} //namespace
......
......@@ -33,7 +33,7 @@ private:
typedef socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
public:
socket_base(asio::io_context& io_context_) : super(io_context_), unpacker_(std::make_shared<Unpacker>()) {}
socket_base(asio::io_context& io_context_) : super(io_context_), unpacker_(std::make_shared<Unpacker>()), strand(io_context_) {}
virtual bool is_ready() {return this->lowest_layer().is_open();}
virtual void send_heartbeat()
......@@ -90,7 +90,7 @@ public:
#ifdef ASCS_RECV_AFTER_HANDLING
//changing unpacker must before calling ascs::socket::recv_msg, and define ASCS_RECV_AFTER_HANDLING macro.
void unpacker(const std::shared_ptr<i_unpacker<typename Unpacker::msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
virtual void recv_msg() {this->post_strand([this]() {this->do_recv_msg();});}
virtual void recv_msg() {this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
///////////////////////////////////////////////////
......@@ -122,13 +122,13 @@ protected:
private:
#ifndef ASCS_RECV_AFTER_HANDLING
virtual void recv_msg() {this->post_strand([this]() {this->do_recv_msg();});}
virtual void recv_msg() {this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
virtual void send_msg() {if (!this->sending) this->post_strand([this]() {this->do_send_msg(false);});}
virtual void send_msg() {if (!this->sending) this->dispatch_strand(strand, [this]() {this->do_send_msg(false);});}
void shutdown()
{
this->post_strand([this]() {
this->dispatch_strand(strand, [this]() {
this->stop_all_timer();
this->close();
......@@ -148,7 +148,7 @@ private:
if (0 == asio::buffer_size(recv_buff))
unified_out::error_out("The unpacker returned an empty buffer, quit receiving!");
else
this->next_layer().async_receive_from(recv_buff, temp_addr, this->make_strand(
this->next_layer().async_receive_from(recv_buff, temp_addr, this->make_strand(strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
}
......@@ -188,7 +188,7 @@ private:
this->stat.send_delay_sum += statistic::now() - last_send_msg.begin_time;
last_send_msg.restart();
this->next_layer().async_send_to(ASCS_SEND_BUFFER_TYPE(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr, this->make_strand(
this->next_layer().async_send_to(ASCS_SEND_BUFFER_TYPE(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr, this->make_strand(strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
return true;
}
......@@ -259,6 +259,9 @@ protected:
asio::ip::udp::endpoint local_addr;
asio::ip::udp::endpoint temp_addr; //used when receiving messages
asio::ip::udp::endpoint peer_addr;
private:
asio::io_context::strand strand;
};
}} //namespace
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册