提交 e60cf2a8 编写于 作者: Y youngwolf

Generalize function send_msg, send_native_msg, safe_send_msg,...

Generalize function send_msg, send_native_msg, safe_send_msg, safe_send_native_msg, broadcast_msg, broadcast_native_msg, safe_broadcast_msg and safe_broadcast_native_msg.
上级 9157c083
......@@ -86,7 +86,7 @@ int main(int argc, const char* argv[])
else if (RECONNECT_COMMAND == str)
client.graceful_shutdown(true);
else
client.safe_send_msg(str);
client.safe_send_msg(str, false);
}
return 0;
......
......@@ -71,7 +71,7 @@ static bool check_msg;
///////////////////////////////////////////////////
//msg sending interface
#define TCP_RANDOM_SEND_MSG(FUNNAME, SEND_FUNNAME) \
void FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
void FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow) \
{ \
auto index = (size_t) ((uint64_t) rand() * (size() - 1) / RAND_MAX); \
at(index)->SEND_FUNNAME(pstr, len, num, can_overflow); \
......@@ -106,7 +106,7 @@ public:
memset(buff, msg_fill, msg_len);
memcpy(buff, &recv_index, sizeof(size_t)); //seq
send_msg(buff, msg_len);
send_msg(buff, msg_len, false);
delete[] buff;
}
......@@ -255,7 +255,7 @@ void send_msg_randomly(echo_client& client, size_t msg_num, size_t msg_len, char
{
memcpy(buff, &i, sizeof(size_t)); //seq
client.safe_random_send_msg(buff, msg_len); //can_overflow is false, it's important
client.safe_random_send_msg(buff, msg_len, false); //can_overflow is false, it's important
send_bytes += msg_len;
auto new_percent = (unsigned) (100 * send_bytes / total_msg_bytes);
......@@ -318,7 +318,7 @@ void send_msg_concurrently(echo_client& client, size_t send_thread_num, size_t m
{
memcpy(buff, &i, sizeof(size_t)); //seq
do_something_to_all(item, [buff, msg_len](echo_client::object_ctype& item2) {item2->safe_send_msg(buff, msg_len);}); //can_overflow is false, it's important
do_something_to_all(item, [buff, msg_len](echo_client::object_ctype& item2) {item2->safe_send_msg(buff, msg_len, false);}); //can_overflow is false, it's important
}
delete[] buff;
});
......
......@@ -109,11 +109,11 @@ protected:
out_container_type tmp_can;
can.swap(tmp_can);
ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->send_msg(msg.data(), msg.size(), true);});
ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->send_msg(msg, true);});
return tmp_can.size();
}
#else
virtual bool on_msg_handle(out_msg_type& msg) {return send_msg(msg.data(), msg.size());}
virtual bool on_msg_handle(out_msg_type& msg) {return send_msg(msg, false);}
#endif
//msg handling end
};
......@@ -220,7 +220,7 @@ int main(int argc, const char* argv[])
{
// /*
//broadcast series functions call pack_msg for each client respectively, because clients may used different protocols(so different type of packers, of course)
server_.broadcast_msg(str.data(), str.size() + 1);
server_.broadcast_msg(str.data(), str.size() + 1, false);
//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
//so need \0 character when printing it.
// */
......@@ -231,12 +231,12 @@ int main(int argc, const char* argv[])
//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
//so need \0 character when printing it.
if (!msg.empty())
server_.do_something_to_all([&msg](server_base<normal_server_socket>::object_ctype& item) {item->direct_send_msg(msg);});
server_.do_something_to_all([&msg](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(msg);});
*/
/*
//if demo client is using stream_unpacker
if (!str.empty())
server_.do_something_to_all([&str](server_base<normal_server_socket>::object_ctype& item) {item->direct_send_msg(str);});
server_.do_something_to_all([&str](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(str);});
*/
}
}
......
......@@ -44,7 +44,7 @@ public:
total_bytes *= msg_num;
send_bytes = recv_bytes = 0;
send_native_msg(msg, msg_len);
send_native_msg(msg, msg_len, false);
}
protected:
......
......@@ -150,7 +150,7 @@ int main(int argc, const char* argv[])
// client_.graceful_shutdown(false, false); //if you used single_client
#endif
else
server_.broadcast_msg(str);
server_.broadcast_msg(str, false);
}
return 0;
......
......@@ -57,7 +57,7 @@ int main(int argc, const char* argv[])
sp.start_service();
}
else
service.safe_send_native_msg(str); //to send to different endpoints, use overloads that take a const asio::ip::udp::endpoint& parameter
service.safe_send_native_msg(str, false); //to send to different endpoints, use overloads that take a const asio::ip::udp::endpoint& parameter
}
return 0;
......
......@@ -482,11 +482,11 @@ template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (au
///////////////////////////////////////////////////
//TCP msg sending interface
#define TCP_SEND_MSG_CALL_SWITCH(FUNNAME, TYPE) \
TYPE FUNNAME(const char* pstr, size_t len, bool can_overflow = false) {return FUNNAME(&pstr, &len, 1, can_overflow);} \
TYPE FUNNAME(const std::string& str, bool can_overflow = false) {return FUNNAME(str.data(), str.size(), can_overflow);}
TYPE FUNNAME(const char* pstr, size_t len, bool can_overflow) {return FUNNAME(&pstr, &len, 1, can_overflow);} \
template<typename Buffer> TYPE FUNNAME(const Buffer& buffer, bool can_overflow) {return FUNNAME(buffer.data(), buffer.size(), can_overflow);}
#define TCP_SEND_MSG(FUNNAME, NATIVE) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow) \
{ \
if (!can_overflow && !this->is_send_buffer_available()) \
return false; \
......@@ -500,11 +500,11 @@ TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into tcp::socket_base's send buffer successfully
//if can_overflow equal to false and the buffer is not available, will wait until it becomes available
#define TCP_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) {while (!SEND_FUNNAME(pstr, len, num, can_overflow)) SAFE_SEND_MSG_CHECK return true;} \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow) {while (!SEND_FUNNAME(pstr, len, num, can_overflow)) SAFE_SEND_MSG_CHECK return true;} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
#define TCP_BROADCAST_MSG(FUNNAME, SEND_FUNNAME) \
void FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
void FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow) \
{this->do_something_to_all([=](typename Pool::object_ctype& item) {item->SEND_FUNNAME(pstr, len, num, can_overflow);});} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, void)
//TCP msg sending interface
......@@ -513,14 +513,15 @@ TCP_SEND_MSG_CALL_SWITCH(FUNNAME, void)
///////////////////////////////////////////////////
//UDP msg sending interface
#define UDP_SEND_MSG_CALL_SWITCH(FUNNAME, TYPE) \
TYPE FUNNAME(const char* pstr, size_t len, bool can_overflow = false) {return FUNNAME(peer_addr, pstr, len, can_overflow);} \
TYPE FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* pstr, size_t len, bool can_overflow = false) {return FUNNAME(peer_addr, &pstr, &len, 1, can_overflow);} \
TYPE FUNNAME(const std::string& str, bool can_overflow = false) {return FUNNAME(peer_addr, str, can_overflow);} \
TYPE FUNNAME(const asio::ip::udp::endpoint& peer_addr, const std::string& str, bool can_overflow = false) {return FUNNAME(peer_addr, str.data(), str.size(), can_overflow);}
TYPE FUNNAME(const char* pstr, size_t len, bool can_overflow) {return FUNNAME(peer_addr, pstr, len, can_overflow);} \
TYPE FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* pstr, size_t len, bool can_overflow) {return FUNNAME(peer_addr, &pstr, &len, 1, can_overflow);} \
template<typename Buffer> TYPE FUNNAME(const Buffer& buffer, bool can_overflow) {return FUNNAME(peer_addr, buffer, can_overflow);} \
template<typename Buffer> TYPE FUNNAME(const asio::ip::udp::endpoint& peer_addr, const Buffer& buffer, bool can_overflow) \
{return FUNNAME(peer_addr, buffer.data(), buffer.size(), can_overflow);}
#define UDP_SEND_MSG(FUNNAME, NATIVE) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) {return FUNNAME(peer_addr, pstr, len, num, can_overflow);} \
bool FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow) {return FUNNAME(peer_addr, pstr, len, num, can_overflow);} \
bool FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow) \
{ \
if (!can_overflow && !this->is_send_buffer_available()) \
return false; \
......@@ -532,8 +533,8 @@ UDP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into udp::socket_base's send buffer successfully
//if can_overflow equal to false and the buffer is not available, will wait until it becomes available
#define UDP_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) {return FUNNAME(peer_addr, pstr, len, num, can_overflow);} \
bool FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow) {return FUNNAME(peer_addr, pstr, len, num, can_overflow);} \
bool FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow) \
{while (!SEND_FUNNAME(peer_addr, pstr, len, num, can_overflow)) SAFE_SEND_MSG_CHECK return true;} \
UDP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
//UDP msg sending interface
......
......@@ -321,6 +321,10 @@
* ascs::socket::is_dispatching_msg() has been renamed to is_dispatching().
* typedef ascs::socket::in_container_type has been renamed to in_queue_type.
* typedef ascs::socket::out_container_type has been renamed to out_queue_type.
* Wipe default value for parameter can_overflow in function send_msg, send_native_msg, safe_send_msg, safe_send_native_msg,
* broadcast_msg, broadcast_native_msg, safe_broadcast_msg and safe_broadcast_native_msg, this is because we added template parameter to some of them,
* and the compiler will complain (ambiguity) if we omit the can_overflow parameter. So take send_msg function for example, if you omitted can_overflow
* before, then in 1.3, you must supplement it, like send_msg(...) -> send_msg(..., false).
*
* HIGHLIGHT:
* After introduced asio::io_context::strand (which is required, see FIX section for more details), we wiped two atomic in ascs::socket.
......@@ -335,6 +339,8 @@
* Add function ascs::socket::is_recv_buffer_available(), you can use it before calling recv_msg() to avoid receiving buffer overflow.
* Add typedef ascs::socket::in_container_type to represent the container type used by in_queue_type (sending buffer).
* Add typedef ascs::socket::out_container_type to represent the container type used by out_queue_type (receiving buffer).
* Generalize function send_msg, send_native_msg, safe_send_msg, safe_send_native_msg, broadcast_msg, broadcast_native_msg,
* safe_broadcast_msg and safe_broadcast_native_msg.
*
* DELETION:
* Deleted virtual function bool ascs::socket::on_msg().
......
......@@ -191,8 +191,8 @@ public:
POP_FIRST_PENDING_MSG(pop_first_pending_recv_msg, recv_msg_buffer, out_msg)
//clear all pending msgs
POP_ALL_PENDING_MSG(pop_all_pending_send_msg, send_msg_buffer, InContainer<in_msg>)
POP_ALL_PENDING_MSG(pop_all_pending_recv_msg, recv_msg_buffer, OutContainer<out_msg>)
POP_ALL_PENDING_MSG(pop_all_pending_send_msg, send_msg_buffer, in_container_type)
POP_ALL_PENDING_MSG(pop_all_pending_recv_msg, recv_msg_buffer, out_container_type)
protected:
virtual bool do_start()
......@@ -376,7 +376,7 @@ private:
recv_msg_buffer.front().restart(end_time);
set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_, [this](tid id)->bool {return this->timer_handler(TIMER_DISPATCH_MSG);}); //hold dispatching
}
else //dispatch msg in sequence
else
{
#else
if ((dispatching = !last_dispatch_msg.empty() || recv_msg_buffer.try_dequeue(last_dispatch_msg)))
......@@ -392,12 +392,12 @@ private:
last_dispatch_msg.restart(end_time);
set_timer(TIMER_DISPATCH_MSG, msg_handling_interval_, [this](tid id)->bool {return this->timer_handler(TIMER_DISPATCH_MSG);}); //hold dispatching
}
else //dispatch msg in sequence
else
{
last_dispatch_msg.clear();
#endif
dispatching = false;
dispatch_msg();
dispatch_msg(); //dispatch msg in sequence
}
}
else if (!recv_msg_buffer.empty()) //just make sure no pending msgs
......
......@@ -64,9 +64,6 @@ public:
//success at here just means put the msg into tcp::socket_base's send buffer
TCP_BROADCAST_MSG(safe_broadcast_msg, safe_send_msg)
TCP_BROADCAST_MSG(safe_broadcast_native_msg, safe_send_native_msg)
//send message with sync mode
TCP_BROADCAST_MSG(sync_broadcast_msg, sync_send_msg)
TCP_BROADCAST_MSG(sync_broadcast_native_msg, sync_send_native_msg)
//msg sending interface
///////////////////////////////////////////////////
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册