提交 e0a6be20 编写于 作者: Y youngwolf 提交者: youngwolf

More convenient message sending interfaces.

A small optimization for message sending (use message replication rather than parsing message).
Drop direct_broadcast_msg, use broadcast_native_msg instead.
上级 84af74de
......@@ -313,8 +313,6 @@ int main(int argc, const char* argv[])
((normal_server&) normal_server_).do_something_to_all([&str](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(str);});
//or
normal_server_.broadcast_native_msg(str);
//or
normal_server_.direct_broadcast_msg(str);
*/
}
}
......
......@@ -161,7 +161,7 @@ public:
//no native parameter anymore, which means it's always false, if true, you should call direct_(sync_)send_msg instead
virtual bool pack_msg(msg_type&& msg, container_type& msg_can) {return false;}
virtual bool pack_msg(msg_type&& msg1, msg_type&& msg2, container_type& msg_can) {return false;}
virtual bool pack_msg(container_type&& in, container_type& out) {return false;}
virtual bool pack_msg(container_type& in, container_type& out) {return false;}
virtual msg_type pack_heartbeat() {return msg_type();}
//this default implementation is meaningless, just satisfy compilers
......@@ -171,6 +171,8 @@ public:
msg_type pack_msg(const char* pstr, size_t len, bool native = false) {return pack_msg(&pstr, &len, 1, native);}
msg_type pack_msg(const std::string& str, bool native = false) {return pack_msg(str.data(), str.size(), native);}
bool pack_msg(msg_ctype& msg, container_type& msg_can) {return pack_msg(msg_type(msg), msg_can);}
bool pack_msg(msg_ctype& msg1, msg_ctype& msg2, container_type& msg_can) {return pack_msg(msg_type(msg1), msg_type(msg2), msg_can);}
};
//packer concept
......@@ -585,6 +587,18 @@ bool FUNNAME(in_msg_type&& msg, bool can_overflow = false, bool prior = false) \
dur.end(); \
return re && do_direct_send_msg(msg_can, prior); \
} \
bool FUNNAME(in_msg_ctype& msg, bool can_overflow = false, bool prior = false) \
{ \
if (!can_overflow && !this->shrink_send_buffer()) \
return false; \
else if (NATIVE) \
return do_direct_send_msg(msg, prior); \
typename Packer::container_type msg_can; \
auto_duration dur(stat.pack_time_sum); \
auto re = this->packer()->pack_msg(msg, msg_can); \
dur.end(); \
return re && do_direct_send_msg(msg_can, prior); \
} \
bool FUNNAME(in_msg_type&& msg1, in_msg_type&& msg2, bool can_overflow = false, bool prior = false) \
{ \
if (!can_overflow && !this->shrink_send_buffer()) \
......@@ -600,7 +614,22 @@ bool FUNNAME(in_msg_type&& msg1, in_msg_type&& msg2, bool can_overflow = false,
dur.end(); \
return re && do_direct_send_msg(msg_can, prior); \
} \
bool FUNNAME(typename Packer::container_type&& msg_can, bool can_overflow = false, bool prior = false) \
bool FUNNAME(in_msg_ctype& msg1, in_msg_ctype& msg2, bool can_overflow = false, bool prior = false) \
{ \
if (!can_overflow && !this->shrink_send_buffer()) \
return false; \
typename Packer::container_type msg_can; \
if (NATIVE) \
{ \
msg_can.emplace_back(msg1); msg_can.emplace_back(msg2); \
return do_direct_send_msg(msg_can, prior); \
} \
auto_duration dur(stat.pack_time_sum); \
auto re = this->packer()->pack_msg(msg1, msg2, msg_can); \
dur.end(); \
return re && do_direct_send_msg(msg_can, prior); \
} \
bool FUNNAME(typename Packer::container_type& msg_can, bool can_overflow = false, bool prior = false) \
{ \
if (!can_overflow && !this->shrink_send_buffer()) \
return false; \
......@@ -608,7 +637,7 @@ bool FUNNAME(typename Packer::container_type&& msg_can, bool can_overflow = fals
return do_direct_send_msg(msg_can, prior); \
typename Packer::container_type out; \
auto_duration dur(stat.pack_time_sum); \
auto re = this->packer()->pack_msg(std::move(msg_can), out); \
auto re = this->packer()->pack_msg(msg_can, out); \
dur.end(); \
return re && do_direct_send_msg(out, prior); \
} \
......@@ -628,15 +657,23 @@ TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
#define TCP_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
bool FUNNAME(in_msg_type&& msg, bool can_overflow = false, bool prior = false) \
{while (!SEND_FUNNAME(std::move(msg), can_overflow, prior)) SAFE_SEND_MSG_CHECK(false) return true;} \
bool FUNNAME(in_msg_ctype& msg, bool can_overflow = false, bool prior = false) \
{while (!SEND_FUNNAME(msg, can_overflow, prior)) SAFE_SEND_MSG_CHECK(false) return true;} \
bool FUNNAME(in_msg_type&& msg1, in_msg_type&& msg2, bool can_overflow = false, bool prior = false) \
{while (!SEND_FUNNAME(std::move(msg1), std::move(msg2), can_overflow, prior)) SAFE_SEND_MSG_CHECK(false) return true;} \
bool FUNNAME(typename Packer::container_type&& msg_can, bool can_overflow = false, bool prior = false) \
{while (!SEND_FUNNAME(std::move(msg_can), can_overflow, prior)) SAFE_SEND_MSG_CHECK(false) return true;} \
bool FUNNAME(in_msg_ctype& msg1, in_msg_ctype& msg2, bool can_overflow = false, bool prior = false) \
{while (!SEND_FUNNAME(msg1, msg2, can_overflow, prior)) SAFE_SEND_MSG_CHECK(false) return true;} \
bool FUNNAME(typename Packer::container_type& msg_can, bool can_overflow = false, bool prior = false) \
{while (!SEND_FUNNAME(msg_can, can_overflow, prior)) SAFE_SEND_MSG_CHECK(false) return true;} \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false, bool prior = false) \
{while (!SEND_FUNNAME(pstr, len, num, can_overflow, prior)) SAFE_SEND_MSG_CHECK(false) return true;} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
#define TCP_BROADCAST_MSG(FUNNAME, SEND_FUNNAME) \
void FUNNAME(typename Pool::in_msg_ctype& msg, bool can_overflow = false, bool prior = false) \
{this->do_something_to_all([&](typename Pool::object_ctype& item) {item->SEND_FUNNAME(msg, can_overflow, prior);});} \
void FUNNAME(typename Pool::in_msg_ctype& msg1, typename Pool::in_msg_ctype& msg2, bool can_overflow = false, bool prior = false) \
{this->do_something_to_all([&](typename Pool::object_ctype& item) {item->SEND_FUNNAME(msg1, msg2, can_overflow, prior);});} \
void FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false, bool prior = false) \
{this->do_something_to_all([&](typename Pool::object_ctype& item) {item->SEND_FUNNAME(pstr, len, num, can_overflow, prior);});} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, void)
......@@ -667,6 +704,18 @@ sync_call_result FUNNAME(in_msg_type&& msg, unsigned duration = 0, bool can_over
dur.end(); \
return re ? do_direct_sync_send_msg(msg_can, duration, prior) : sync_call_result::NOT_APPLICABLE; \
} \
sync_call_result FUNNAME(in_msg_ctype& msg, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{ \
if (!can_overflow && !this->shrink_send_buffer()) \
return sync_call_result::NOT_APPLICABLE; \
else if (NATIVE) \
return do_direct_sync_send_msg(msg, duration, prior); \
typename Packer::container_type msg_can; \
auto_duration dur(stat.pack_time_sum); \
auto re = this->packer()->pack_msg(msg, msg_can); \
dur.end(); \
return re ? do_direct_sync_send_msg(msg_can, duration, prior) : sync_call_result::NOT_APPLICABLE; \
} \
sync_call_result FUNNAME(in_msg_type&& msg1, in_msg_type&& msg2, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{ \
if (!can_overflow && !this->shrink_send_buffer()) \
......@@ -682,7 +731,22 @@ sync_call_result FUNNAME(in_msg_type&& msg1, in_msg_type&& msg2, unsigned durati
dur.end(); \
return re ? do_direct_sync_send_msg(msg_can, duration, prior) : sync_call_result::NOT_APPLICABLE; \
} \
sync_call_result FUNNAME(typename Packer::container_type&& msg_can, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
sync_call_result FUNNAME(in_msg_ctype& msg1, in_msg_ctype& msg2, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{ \
if (!can_overflow && !this->shrink_send_buffer()) \
return sync_call_result::NOT_APPLICABLE; \
typename Packer::container_type msg_can; \
if (NATIVE) \
{ \
msg_can.emplace_back(msg1); msg_can.emplace_back(msg2); \
return do_direct_sync_send_msg(msg_can, duration, prior); \
} \
auto_duration dur(stat.pack_time_sum); \
auto re = this->packer()->pack_msg(msg1, msg2, msg_can); \
dur.end(); \
return re ? do_direct_sync_send_msg(msg_can, duration, prior) : sync_call_result::NOT_APPLICABLE; \
} \
sync_call_result FUNNAME(typename Packer::container_type& msg_can, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{ \
if (!can_overflow && !this->shrink_send_buffer()) \
return sync_call_result::NOT_APPLICABLE; \
......@@ -690,7 +754,7 @@ sync_call_result FUNNAME(typename Packer::container_type&& msg_can, unsigned dur
return do_direct_sync_send_msg(msg_can, duration, prior); \
typename Packer::container_type out; \
auto_duration dur(stat.pack_time_sum); \
auto re = this->packer()->pack_msg(std::move(msg_can), out); \
auto re = this->packer()->pack_msg(msg_can, out); \
dur.end(); \
return re ? do_direct_sync_send_msg(out, duration, prior) : sync_call_result::NOT_APPLICABLE; \
} \
......@@ -711,11 +775,17 @@ TCP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, sync_call_result)
sync_call_result FUNNAME(in_msg_type&& msg, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{while (sync_call_result::SUCCESS != SEND_FUNNAME(std::move(msg), duration, can_overflow, prior)) \
SAFE_SEND_MSG_CHECK(sync_call_result::NOT_APPLICABLE) return sync_call_result::SUCCESS;} \
sync_call_result FUNNAME(in_msg_ctype& msg, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{while (sync_call_result::SUCCESS != SEND_FUNNAME(msg, duration, can_overflow, prior)) \
SAFE_SEND_MSG_CHECK(sync_call_result::NOT_APPLICABLE) return sync_call_result::SUCCESS;} \
sync_call_result FUNNAME(in_msg_type&& msg1, in_msg_type&& msg2, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{while (sync_call_result::SUCCESS != SEND_FUNNAME(std::move(msg1), std::move(msg2), duration, can_overflow, prior)) \
SAFE_SEND_MSG_CHECK(sync_call_result::NOT_APPLICABLE) return sync_call_result::SUCCESS;} \
sync_call_result FUNNAME(typename Packer::container_type&& msg_can, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{while (sync_call_result::SUCCESS != SEND_FUNNAME(std::move(msg_can), duration, can_overflow, prior)) \
sync_call_result FUNNAME(in_msg_ctype& msg1, in_msg_ctype& msg2, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{while (sync_call_result::SUCCESS != SEND_FUNNAME(msg1, msg2, duration, can_overflow, prior)) \
SAFE_SEND_MSG_CHECK(sync_call_result::NOT_APPLICABLE) return sync_call_result::SUCCESS;} \
sync_call_result FUNNAME(typename Packer::container_type& msg_can, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{while (sync_call_result::SUCCESS != SEND_FUNNAME(msg_can, duration, can_overflow, prior)) \
SAFE_SEND_MSG_CHECK(sync_call_result::NOT_APPLICABLE) return sync_call_result::SUCCESS;} \
sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false, bool prior = false) \
{while (sync_call_result::SUCCESS != SEND_FUNNAME(pstr, len, num, duration, can_overflow, prior)) \
......
......@@ -117,7 +117,7 @@ public:
return true;
}
virtual bool pack_msg(container_type&& in, container_type& out)
virtual bool pack_msg(container_type& in, container_type& out)
{
auto len = ascs::get_size_in_byte(in);
if (len > get_max_msg_size()) //not considered overflow
......@@ -185,7 +185,7 @@ public:
return true;
}
virtual bool pack_msg(typename super::container_type&& in, typename super::container_type& out)
virtual bool pack_msg(typename super::container_type& in, typename super::container_type& out)
{
auto len = ascs::get_size_in_byte(in);
if (len > get_max_msg_size()) //not considered overflow
......@@ -221,7 +221,7 @@ public:
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = true) {return packer::pack_msg(pstr, len, num, true);}
virtual bool pack_msg(msg_type&& msg, container_type& msg_can) {msg_can.emplace_back(std::move(msg)); return true;}
virtual bool pack_msg(msg_type&& msg1, msg_type&& msg2, container_type& msg_can) {msg_can.emplace_back(std::move(msg1)); msg_can.emplace_back(std::move(msg2)); return true;}
virtual bool pack_msg(container_type&& in, container_type& out) {in.swap(out); return true;}
virtual bool pack_msg(container_type& in, container_type& out) {in.swap(out); return true;}
//not support heartbeat because fixed_length_unpacker cannot recognize heartbeat message
};
......@@ -284,7 +284,7 @@ public:
return true;
}
virtual bool pack_msg(container_type&& in, container_type& out)
virtual bool pack_msg(container_type& in, container_type& out)
{
auto len = _prefix.size() + _suffix.size() + ascs::get_size_in_byte(in);
if (len > ASCS_MSG_BUFFER_SIZE) //not considered overflow
......
......@@ -27,6 +27,10 @@ template<typename Object>
class object_pool : public service_pump::i_service, protected timer<executor>
{
public:
typedef typename Object::in_msg_type in_msg_type;
typedef typename Object::in_msg_ctype in_msg_ctype;
typedef typename Object::out_msg_type out_msg_type;
typedef typename Object::out_msg_ctype out_msg_ctype;
typedef std::shared_ptr<Object> object_type;
typedef const object_type object_ctype;
typedef std::unordered_map<uint_fast64_t, object_type> container_type;
......
......@@ -71,9 +71,6 @@ public:
//success at here just means put the msg into tcp::socket_base's send buffer
TCP_BROADCAST_MSG(safe_broadcast_msg, safe_send_msg)
TCP_BROADCAST_MSG(safe_broadcast_native_msg, safe_send_native_msg)
template<typename T> void direct_broadcast_msg(const T& msg, bool can_overflow = false, bool prior = false)
{this->do_something_to_all([&](typename Pool::object_ctype& item) {item->direct_send_msg(msg, can_overflow, prior);});}
//msg sending interface
///////////////////////////////////////////////////
......
......@@ -156,9 +156,6 @@ public:
//success at here just means putting the msg into tcp::socket_base's send buffer
TCP_BROADCAST_MSG(safe_broadcast_msg, safe_send_msg)
TCP_BROADCAST_MSG(safe_broadcast_native_msg, safe_send_native_msg)
template<typename T> void direct_broadcast_msg(const T& msg, bool can_overflow = false, bool prior = false)
{this->do_something_to_all([&](typename Pool::object_ctype& item) {item->direct_send_msg(msg, can_overflow, prior);});}
//msg sending interface
///////////////////////////////////////////////////
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册