提交 009accc4 编写于 作者: W wolf

Packer and unpacker can have different msg types.

上级 64b2000f
......@@ -3,8 +3,7 @@
#define SERVER_PORT 9527
#define FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define CUSTOM_LOG
#define DEFAULT_PACKER inflexible_packer
#define DEFAULT_UNPACKER inflexible_unpacker
#define DEFAULT_UNPACKER unbuffered_unpacker
//the following three macros demonstrate how to support huge msg(exceed 65535 - 2).
//huge msg will consume huge memory, for example, if we want to support 1M msg size, because every st_tcp_socket has a
......
......@@ -81,7 +81,7 @@ protected:
//msg handling: send the original msg back(echo server)
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
//this virtual function doesn't exist if FORCE_TO_USE_MSG_RECV_BUFFER has been defined
virtual bool on_msg(msg_type& msg)
virtual bool on_msg(out_msg_type& msg)
{
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
......@@ -92,7 +92,7 @@ protected:
}
#endif
//we should handle msg in on_msg_handle for time-consuming task like this:
virtual bool on_msg_handle(msg_type& msg, bool link_down)
virtual bool on_msg_handle(out_msg_type& msg, bool link_down)
{
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
......
......@@ -3,8 +3,7 @@
#define SERVER_PORT 9528
#define FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define CUSTOM_LOG
#define DEFAULT_PACKER inflexible_packer
#define DEFAULT_UNPACKER inflexible_unpacker
#define DEFAULT_UNPACKER unbuffered_unpacker
//the following three macros demonstrate how to support huge msg(exceed 65535 - 2).
//huge msg will consume huge memory, for example, if we want to support 1M msg size, because every st_tcp_socket has a
......
......@@ -81,7 +81,7 @@ protected:
//msg handling: send the original msg back(echo server)
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
//this virtual function doesn't exist if FORCE_TO_USE_MSG_RECV_BUFFER has been defined
virtual bool on_msg(msg_type& msg)
virtual bool on_msg(out_msg_type& msg)
{
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
......@@ -92,7 +92,7 @@ protected:
}
#endif
//we should handle msg in on_msg_handle for time-consuming task like this:
virtual bool on_msg_handle(msg_type& msg, bool link_down)
virtual bool on_msg_handle(out_msg_type& msg, bool link_down)
{
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
......@@ -184,7 +184,7 @@ int main(int argc, const char* argv[])
//send \0 character too, because asio_client used inflexible_buffer as its msg type, it will not append \0 character automatically as std::string does,
//so need \0 character when printing it.
if (p.pack_msg(msg, str.data(), str.size() + 1))
server_.do_something_to_all(boost::bind((bool (normal_server_socket::*)(normal_server_socket::msg_ctype&, bool)) &normal_server_socket::direct_send_msg, _1, boost::cref(msg), false));
server_.do_something_to_all(boost::bind((bool (normal_server_socket::*)(packer::msg_ctype&, bool)) &normal_server_socket::direct_send_msg, _1, boost::cref(msg), false));
}
}
......
......@@ -75,9 +75,10 @@ protected:
//msg handling
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
//we can handle msg very fast, so we don't use recv buffer
virtual bool on_msg(msg_type& msg) {handle_msg(msg); return true;}
virtual bool on_msg(out_msg_type& msg) {handle_msg(msg); return true;}
#endif
virtual bool on_msg_handle(msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//we will change the unpacker at runtime; this operation must be done in on_msg, do not do it in on_msg_handle
//virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//msg handling end
private:
......@@ -94,7 +95,7 @@ private:
}
void trans_end() {clear(); ++completed_client_num;}
void handle_msg(msg_ctype& msg)
void handle_msg(out_msg_ctype& msg)
{
if (TRANS_BUSY == state)
{
......
......@@ -14,13 +14,13 @@ void file_socket::reset() {trans_end(); st_server_socket::reset();}
//msg handling
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
//we can handle msg very fast, so we don't use recv buffer
bool file_socket::on_msg(msg_type& msg) {handle_msg(msg); return true;}
bool file_socket::on_msg(out_msg_type& msg) {handle_msg(msg); return true;}
#endif
bool file_socket::on_msg_handle(msg_type& msg, bool link_down) {handle_msg(msg); return true;}
bool file_socket::on_msg_handle(out_msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//msg handling end
#ifdef WANT_MSG_SEND_NOTIFY
void file_socket::on_msg_send(msg_type& msg)
void file_socket::on_msg_send(in_msg_type& msg)
{
BOOST_AUTO(buffer, boost::dynamic_pointer_cast<file_buffer>(msg.raw_buffer()));
if (NULL != buffer)
......@@ -44,7 +44,7 @@ void file_socket::trans_end()
}
}
void file_socket::handle_msg(msg_ctype& msg)
void file_socket::handle_msg(out_msg_ctype& msg)
{
if (msg.size() <= ORDER_LEN)
{
......
......@@ -21,18 +21,18 @@ protected:
//msg handling
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
//we can handle msg very fast, so we don't use recv buffer
virtual bool on_msg(msg_type& msg);
virtual bool on_msg(out_msg_type& msg);
#endif
virtual bool on_msg_handle(msg_type& msg, bool link_down);
virtual bool on_msg_handle(out_msg_type& msg, bool link_down);
//msg handling end
#ifdef WANT_MSG_SEND_NOTIFY
virtual void on_msg_send(msg_type& msg);
virtual void on_msg_send(in_msg_type& msg);
#endif
private:
void trans_end();
void handle_msg(msg_ctype& msg);
void handle_msg(out_msg_ctype& msg);
};
#endif //#ifndef FILE_SOCKET_H_
......@@ -385,12 +385,17 @@
* Fixed the possibility of race condition in on_all_msg_send.
* Dropped fixed_length_packer, use packer(the default packer) instead.
* i_service::init() will now return boolean value to indicate whether the initialization was succeeded or not.
*
* 5.2.2 2015.8.20
* Packer and unpacker now can have different msg types, for example, use std::string when sending msgs, use inflexible_buffer when receiving msgs,
* see asio_client for more details.
* Dropped inflexible_packer, changed inflexible_unpacker to unbuffered_unpacker, just the class name.
*/
#ifndef ST_ASIO_WRAPPER_H_
#define ST_ASIO_WRAPPER_H_
#define ST_ASIO_WRAPPER_VERSION 50201
#define ST_ASIO_WRAPPER_VERSION 50202
#if !defined _MSC_VER && !defined __GNUC__
#error st_asio_wrapper only support vc and gcc.
......
......@@ -24,22 +24,20 @@
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/typeof/typeof.hpp>
#include "st_asio_wrapper.h"
#ifndef UNIFIED_OUT_BUF_NUM
#define UNIFIED_OUT_BUF_NUM 2048
#endif
//buffer size used when receiving msg, must equal to or larger than the biggest msg size,
//the size of the buffer used when receiving msg, must equal to or larger than the biggest msg size,
//the bigger this buffer is, the more msgs can be received in one time if there are enough msgs buffered in the SOCKET.
//every unpacker has a fixed buffer of this size, and every st_tcp_socket has an unpacker, so this size is not simply the bigger the better.
//if you customized the packer and unpacker, the above principle may not hold anymore; it depends on your implementation.
#ifndef MSG_BUFFER_SIZE
#define MSG_BUFFER_SIZE 4000
#endif
//msg send and recv buffer's maximum size (list::size()), corresponding buffers are expanded dynamicly,
//which means only allocate memories when needed.
//msg send and recv buffer's maximum size (list::size()); the corresponding buffers are expanded dynamically, which means memory is only allocated when needed.
#ifndef MAX_MSG_NUM
#define MAX_MSG_NUM 1024
#endif
......@@ -134,7 +132,7 @@ namespace st_asio_wrapper
size_t len;
};
//free functions, used to do something to any container optionally with any mutex
//free functions, used to do something to any container(except map and multimap) optionally with any mutex
template<typename _Can, typename _Mutex, typename _Predicate>
void do_something_to_all(_Can& __can, _Mutex& __mutex, const _Predicate& __pred)
{
......@@ -159,7 +157,7 @@ namespace st_asio_wrapper
bool splice_helper(_Can& dest_can, _Can& src_can, size_t max_size = MAX_MSG_NUM)
{
size_t size = dest_can.size();
if (size < max_size) //dest_can's buffer available
if (size < max_size) //dest_can can hold more items.
{
size = max_size - size; //maximum items this time can handle
BOOST_AUTO(begin_iter, src_can.begin()); BOOST_AUTO(end_iter, src_can.end());
......@@ -182,7 +180,7 @@ namespace st_asio_wrapper
return false;
}
//member functions, used to do something to any member container optionally with any member mutex
//member functions, used to do something to any member container(except map and multimap) optionally with any member mutex
#define DO_SOMETHING_TO_ALL_MUTEX(CAN, MUTEX) DO_SOMETHING_TO_ALL_MUTEX_NAME(do_something_to_all, CAN, MUTEX)
#define DO_SOMETHING_TO_ALL(CAN) DO_SOMETHING_TO_ALL_NAME(do_something_to_all, CAN)
......@@ -211,6 +209,33 @@ template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (BO
boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(50)); \
}
#define GET_PENDING_MSG_NUM(FUNNAME, CAN, MUTEX) size_t FUNNAME() {boost::shared_lock<boost::shared_mutex> lock(MUTEX); return CAN.size();}
#define PEEK_FIRST_PENDING_MSG(FUNNAME, CAN, MUTEX, MSGTYPE) \
void FUNNAME(MSGTYPE& msg) \
{ \
msg.clear(); \
boost::shared_lock<boost::shared_mutex> lock(MUTEX); \
if (!CAN.empty()) \
msg = CAN.front(); \
}
#define POP_FIRST_PENDING_MSG(FUNNAME, CAN, MUTEX, MSGTYPE) \
void FUNNAME(MSGTYPE& msg) \
{ \
msg.clear(); \
boost::unique_lock<boost::shared_mutex> lock(MUTEX); \
if (!CAN.empty()) \
{ \
msg.swap(CAN.front()); \
CAN.pop_front(); \
} \
}
#define POP_ALL_PENDING_MSG(FUNNAME, CAN, MUTEX, CANTYPE) \
void FUNNAME(CANTYPE& msg_list) \
{ \
boost::unique_lock<boost::shared_mutex> lock(MUTEX); \
msg_list.splice(msg_list.end(), CAN); \
}
///////////////////////////////////////////////////
//TCP msg sending interface
#define TCP_SEND_MSG_CALL_SWITCH(FUNNAME, TYPE) \
......@@ -220,8 +245,8 @@ TYPE FUNNAME(const std::string& str, bool can_overflow = false) {return FUNNAME(
#define TCP_SEND_MSG(FUNNAME, NATIVE) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
{ \
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex); \
if (can_overflow || send_msg_buffer.size() < MAX_MSG_NUM) \
boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex); \
if (can_overflow || ST_THIS send_msg_buffer.size() < MAX_MSG_NUM) \
{ \
typename Packer::msg_type msg; \
return ST_THIS packer_->pack_msg(msg, pstr, len, num, NATIVE) ? ST_THIS do_direct_send_msg(msg) : false; \
......@@ -238,7 +263,8 @@ bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_
} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into st_tcp_socket's send buffer
//guarantee the msg is sent successfully even if can_overflow equals false; success here just means putting the msg into st_tcp_socket's send buffer successfully
//if can_overflow equals false and the buffer is not available, this will wait until it becomes available
#define TCP_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) {while (!SEND_FUNNAME(pstr, len, num, can_overflow)) SAFE_SEND_MSG_CHECK return true;} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
......@@ -258,8 +284,8 @@ TYPE FUNNAME(const boost::asio::ip::udp::endpoint& peer_addr, const std::string&
#define UDP_SEND_MSG(FUNNAME, NATIVE) \
bool FUNNAME(const boost::asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
{ \
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex); \
if (can_overflow || send_msg_buffer.size() < MAX_MSG_NUM) \
boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex); \
if (can_overflow || ST_THIS send_msg_buffer.size() < MAX_MSG_NUM) \
{ \
udp_msg<typename Packer::msg_type> msg(peer_addr); \
return ST_THIS packer_->pack_msg(msg, pstr, len, num, NATIVE) ? ST_THIS do_direct_send_msg(msg) : false; \
......@@ -276,7 +302,8 @@ bool FUNNAME(const boost::asio::ip::udp::endpoint& peer_addr, const char* const
} \
UDP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into st_udp_socket's send buffer
//guarantee the msg is sent successfully even if can_overflow equals false; success here just means putting the msg into st_udp_socket's send buffer successfully
//if can_overflow equals false and the buffer is not available, this will wait until it becomes available
#define UDP_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
bool FUNNAME(const boost::asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
{while (!SEND_FUNNAME(peer_addr, pstr, len, num, can_overflow)) SAFE_SEND_MSG_CHECK return true;} \
......
......@@ -134,54 +134,6 @@ public:
}
};
class inflexible_packer : public i_packer<inflexible_buffer>
{
public:
static size_t get_max_msg_size() {return MSG_BUFFER_SIZE - HEAD_LEN;}
using i_packer<msg_type>::pack_msg;
virtual bool pack_msg(msg_type& msg, const char* const pstr[], const size_t len[], size_t num, bool native = false)
{
msg.clear();
size_t pre_len = native ? 0 : HEAD_LEN;
size_t total_len = packer_helper::msg_size_check(pre_len, pstr, len, num);
if ((size_t) -1 == total_len)
return false;
else if (total_len > pre_len)
{
char* buff = NULL;
size_t pos = 0;
if (!native)
{
HEAD_TYPE head_len = (HEAD_TYPE) total_len;
if (total_len != head_len)
{
unified_out::error_out("pack msg error: length exceeds the header's range!");
return false;
}
head_len = HEAD_H2N(head_len);
buff = new char[total_len];
memcpy(buff, (const char*) &head_len, HEAD_LEN);
pos = HEAD_LEN;
}
else
buff = new char[total_len];
for (size_t i = 0; i < num; ++i)
if (NULL != pstr[i])
{
memcpy(buff + pos, pstr[i], len[i]);
pos += len[i];
}
msg.attach(buff, total_len);
} //if (total_len > pre_len)
return true;
}
};
class prefix_suffix_packer : public i_packer<std::string>
{
public:
......
......@@ -29,22 +29,13 @@
namespace st_asio_wrapper
{
enum BufferType {POST_BUFFER, SEND_BUFFER, RECV_BUFFER};
#define post_msg_buffer ST_THIS msg_buffer[0]
#define post_msg_buffer_mutex ST_THIS msg_buffer_mutex[0]
#define send_msg_buffer ST_THIS msg_buffer[1]
#define send_msg_buffer_mutex ST_THIS msg_buffer_mutex[1]
#define recv_msg_buffer ST_THIS msg_buffer[2]
#define recv_msg_buffer_mutex ST_THIS msg_buffer_mutex[2]
#define temp_msg_buffer ST_THIS msg_buffer[3]
template<typename Socket, typename Packer, typename MsgType = typename Packer::msg_type>
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType = typename Packer::msg_type, typename OutMsgType = typename Unpacker::msg_type>
class st_socket: public st_timer
{
public:
//keep size() constant time would better, because we invoke it frequently, so don't use std::list(gcc)
typedef boost::container::list<MsgType> container_type;
typedef boost::container::list<InMsgType> in_container_type;
typedef typename Unpacker::container_type out_container_type;
protected:
st_socket(boost::asio::io_service& io_service_) : st_timer(io_service_), _id(-1), next_layer_(io_service_), packer_(boost::make_shared<Packer>()) {reset_state();}
......@@ -62,8 +53,10 @@ protected:
void clear_buffer()
{
for (size_t i = 0; i < sizeof(msg_buffer) / sizeof(container_type); ++i)
msg_buffer[i].clear();
post_msg_buffer.clear();
send_msg_buffer.clear();
recv_msg_buffer.clear();
temp_msg_buffer.clear();
}
public:
......@@ -126,17 +119,17 @@ public:
}
//don't use the packer but insert into send buffer directly
bool direct_send_msg(const MsgType& msg, bool can_overflow = false) {MsgType tmp_msg(msg); return direct_send_msg(tmp_msg, can_overflow);}
bool direct_send_msg(const InMsgType& msg, bool can_overflow = false) {InMsgType tmp_msg(msg); return direct_send_msg(tmp_msg, can_overflow);}
//after this call, msg becomes empty, please note.
bool direct_send_msg(MsgType& msg, bool can_overflow = false)
bool direct_send_msg(InMsgType& msg, bool can_overflow = false)
{
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex);
return can_overflow || send_msg_buffer.size() < MAX_MSG_NUM ? do_direct_send_msg(msg) : false;
}
bool direct_post_msg(const MsgType& msg, bool can_overflow = false) {MsgType tmp_msg(msg); return direct_post_msg(tmp_msg, can_overflow);}
bool direct_post_msg(const InMsgType& msg, bool can_overflow = false) {InMsgType tmp_msg(msg); return direct_post_msg(tmp_msg, can_overflow);}
//after this call, msg becomes empty, please note.
bool direct_post_msg(MsgType& msg, bool can_overflow = false)
bool direct_post_msg(InMsgType& msg, bool can_overflow = false)
{
if (direct_send_msg(msg, can_overflow))
return true;
......@@ -146,43 +139,22 @@ public:
}
//how many msgs waiting for sending or dispatching
size_t get_pending_msg_num(BufferType buffer_type = SEND_BUFFER)
{
boost::shared_lock<boost::shared_mutex> lock(msg_buffer_mutex[buffer_type]);
return msg_buffer[buffer_type].size();
}
//msgs in send buffer and post buffer are packed
//msgs in receive buffer are unpacked
void peek_first_pending_msg(MsgType& msg, BufferType buffer_type = SEND_BUFFER)
{
msg.clear();
boost::shared_lock<boost::shared_mutex> lock(msg_buffer_mutex[buffer_type]);
if (!msg_buffer[buffer_type].empty())
msg = msg_buffer[buffer_type].front();
}
GET_PENDING_MSG_NUM(get_pending_post_msg_num, post_msg_buffer, post_msg_buffer_mutex)
GET_PENDING_MSG_NUM(get_pending_send_msg_num, send_msg_buffer, send_msg_buffer_mutex)
GET_PENDING_MSG_NUM(get_pending_recv_msg_num, recv_msg_buffer, recv_msg_buffer_mutex)
//msgs in send buffer and post buffer are packed
//msgs in receive buffer are unpacked
void pop_first_pending_msg(MsgType& msg, BufferType buffer_type = SEND_BUFFER)
{
msg.clear();
PEEK_FIRST_PENDING_MSG(peek_first_pending_post_msg, post_msg_buffer, post_msg_buffer_mutex, InMsgType)
PEEK_FIRST_PENDING_MSG(peek_first_pending_send_msg, send_msg_buffer, send_msg_buffer_mutex, InMsgType)
PEEK_FIRST_PENDING_MSG(peek_first_pending_recv_msg, recv_msg_buffer, recv_msg_buffer_mutex, OutMsgType)
boost::unique_lock<boost::shared_mutex> lock(msg_buffer_mutex[buffer_type]);
if (!msg_buffer[buffer_type].empty())
{
msg.swap(msg_buffer[buffer_type].front());
msg_buffer[buffer_type].pop_front();
}
}
POP_FIRST_PENDING_MSG(pop_first_pending_post_msg, post_msg_buffer, post_msg_buffer_mutex, InMsgType)
POP_FIRST_PENDING_MSG(pop_first_pending_send_msg, send_msg_buffer, send_msg_buffer_mutex, InMsgType)
POP_FIRST_PENDING_MSG(pop_first_pending_recv_msg, recv_msg_buffer, recv_msg_buffer_mutex, OutMsgType)
//clear all pending msgs
void pop_all_pending_msg(container_type& msg_list, BufferType buffer_type = SEND_BUFFER)
{
boost::unique_lock<boost::shared_mutex> lock(msg_buffer_mutex[buffer_type]);
msg_list.splice(msg_list.end(), msg_buffer[buffer_type]);
}
POP_ALL_PENDING_MSG(pop_all_pending_post_msg, post_msg_buffer, post_msg_buffer_mutex, in_container_type)
POP_ALL_PENDING_MSG(pop_all_pending_send_msg, send_msg_buffer, send_msg_buffer_mutex, in_container_type)
POP_ALL_PENDING_MSG(pop_all_pending_recv_msg, recv_msg_buffer, recv_msg_buffer_mutex, out_container_type)
protected:
virtual bool do_start() = 0;
......@@ -205,7 +177,7 @@ protected:
//notice: on_msg_handle() will not be invoked from within this function
//
//notice: the msg is unpacked; a non-const reference is used for the convenience of swapping
virtual bool on_msg(MsgType& msg) = 0;
virtual bool on_msg(OutMsgType& msg) = 0;
#endif
//handling msg in om_msg_handle() will not block msg receiving on the same st_socket
......@@ -213,17 +185,17 @@ protected:
//if link_down is true, no matter whether you return true or false, st_socket will not maintain this msg anymore, and will continue to dispatch subsequent msgs
//
//notice: the msg is unpacked, using inconstant is for the convenience of swapping
virtual bool on_msg_handle(MsgType& msg, bool link_down) = 0;
virtual bool on_msg_handle(OutMsgType& msg, bool link_down) = 0;
#ifdef WANT_MSG_SEND_NOTIFY
//one msg has sent to the kernel buffer, msg is the right msg
//notice: the msg is packed, using inconstant is for the convenience of swapping
virtual void on_msg_send(MsgType& msg) {}
virtual void on_msg_send(InMsgType& msg) {}
#endif
#ifdef WANT_ALL_MSG_SEND_NOTIFY
//send buffer goes empty
//notice: the msg is packed, using inconstant is for the convenience of swapping
virtual void on_all_msg_send(MsgType& msg) {}
virtual void on_all_msg_send(InMsgType& msg) {}
#endif
virtual bool on_timer(unsigned char id, const void* user_data)
......@@ -311,7 +283,7 @@ protected:
dispatching = false;
if (!re) //dispatch failed, re-dispatch
{
recv_msg_buffer.push_front(MsgType());
recv_msg_buffer.push_front(OutMsgType());
recv_msg_buffer.front().swap(last_dispatch_msg);
set_timer(3, 50, NULL);
}
......@@ -368,7 +340,7 @@ protected:
}
//must mutex send_msg_buffer before invoke this function
bool do_direct_send_msg(MsgType& msg)
bool do_direct_send_msg(InMsgType& msg)
{
if (!msg.empty())
{
......@@ -381,7 +353,7 @@ protected:
}
//must mutex post_msg_buffer before invoke this function
bool do_direct_post_msg(MsgType& msg)
bool do_direct_post_msg(InMsgType& msg)
{
if (!msg.empty())
{
......@@ -401,13 +373,16 @@ protected:
boost::uint_fast64_t _id;
Socket next_layer_;
MsgType last_send_msg, last_dispatch_msg;
InMsgType last_send_msg;
OutMsgType last_dispatch_msg;
boost::shared_ptr<i_packer<typename Packer::msg_type> > packer_;
container_type msg_buffer[4];
in_container_type post_msg_buffer, send_msg_buffer;
out_container_type recv_msg_buffer, temp_msg_buffer;
//st_socket will invoke dispatch_msg() when it gets some msgs. if these msgs can't be pushed into recv_msg_buffer because of receive buffer overflow,
//st_socket will delay 50 milliseconds(non-blocking) before invoking dispatch_msg() again, and, as you know, temp_msg_buffer is used to hold these msgs temporarily.
boost::shared_mutex msg_buffer_mutex[3];
boost::shared_mutex post_msg_buffer_mutex, send_msg_buffer_mutex;
boost::shared_mutex recv_msg_buffer_mutex;
bool posting;
bool sending, suspend_send_msg_;
......
......@@ -34,17 +34,19 @@ namespace st_tcp
{
template <typename Socket, typename Packer, typename Unpacker>
class st_tcp_socket_base : public st_socket<Socket, Packer>
class st_tcp_socket_base : public st_socket<Socket, Packer, Unpacker>
{
public:
typedef typename Packer::msg_type msg_type;
typedef typename Packer::msg_ctype msg_ctype;
typedef typename Packer::msg_type in_msg_type;
typedef typename Packer::msg_ctype in_msg_ctype;
typedef typename Unpacker::msg_type out_msg_type;
typedef typename Unpacker::msg_ctype out_msg_ctype;
protected:
st_tcp_socket_base(boost::asio::io_service& io_service_) : st_socket<Socket, Packer>(io_service_), unpacker_(boost::make_shared<Unpacker>()) {reset_state();}
st_tcp_socket_base(boost::asio::io_service& io_service_) : st_socket<Socket, Packer, Unpacker>(io_service_), unpacker_(boost::make_shared<Unpacker>()) {reset_state();}
template<typename Arg>
st_tcp_socket_base(boost::asio::io_service& io_service_, Arg& arg) : st_socket<Socket, Packer>(io_service_, arg), unpacker_(boost::make_shared<Unpacker>()) {reset_state();}
st_tcp_socket_base(boost::asio::io_service& io_service_, Arg& arg) : st_socket<Socket, Packer, Unpacker>(io_service_, arg), unpacker_(boost::make_shared<Unpacker>()) {reset_state();}
public:
//reset all, be ensure that there's no any operations performed on this st_tcp_socket_base when invoke it
......@@ -52,7 +54,7 @@ public:
void reset_state()
{
unpacker_->reset_state();
st_socket<Socket, Packer>::reset_state();
st_socket<Socket, Packer, Unpacker>::reset_state();
closing = false;
}
......@@ -78,11 +80,11 @@ public:
bool is_closing() const {return closing;}
//get or change the unpacker at runtime
boost::shared_ptr<i_unpacker<msg_type> > inner_unpacker() {return unpacker_;}
boost::shared_ptr<const i_unpacker<msg_type> > inner_unpacker() const {return unpacker_;}
void inner_unpacker(const boost::shared_ptr<i_unpacker<msg_type> >& _unpacker_) {unpacker_ = _unpacker_;}
boost::shared_ptr<i_unpacker<out_msg_type> > inner_unpacker() {return unpacker_;}
boost::shared_ptr<const i_unpacker<out_msg_type> > inner_unpacker() const {return unpacker_;}
void inner_unpacker(const boost::shared_ptr<i_unpacker<out_msg_type> >& _unpacker_) {unpacker_ = _unpacker_;}
using st_socket<Socket, Packer>::send_msg;
using st_socket<Socket, Packer, Unpacker>::send_msg;
///////////////////////////////////////////////////
//msg sending interface
TCP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
......@@ -111,19 +113,19 @@ protected:
{
if (!is_send_allowed() || ST_THIS get_io_service().stopped())
ST_THIS sending = false;
else if (!ST_THIS sending && !send_msg_buffer.empty())
else if (!ST_THIS sending && !ST_THIS send_msg_buffer.empty())
{
ST_THIS sending = true;
ST_THIS last_send_msg.swap(send_msg_buffer.front());
ST_THIS last_send_msg.swap(ST_THIS send_msg_buffer.front());
boost::asio::async_write(ST_THIS next_layer(), boost::asio::buffer(ST_THIS last_send_msg.data(), ST_THIS last_send_msg.size()),
boost::bind(&st_tcp_socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
send_msg_buffer.pop_front();
ST_THIS send_msg_buffer.pop_front();
}
return ST_THIS sending;
}
virtual bool is_send_allowed() const {return !is_closing() && st_socket<Socket, Packer>::is_send_allowed();}
virtual bool is_send_allowed() const {return !is_closing() && st_socket<Socket, Packer, Unpacker>::is_send_allowed();}
//can send data or not(just put into send buffer)
//msg can not be unpacked
......@@ -131,10 +133,10 @@ protected:
virtual void on_unpack_error() = 0;
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
virtual bool on_msg(msg_type& msg) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_msg(out_msg_type& msg) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
#endif
virtual bool on_msg_handle(msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
//start the asynchronous read
//it's child's responsibility to invoke this properly, because st_tcp_socket_base doesn't know any of the connection status
......@@ -143,7 +145,7 @@ protected:
BOOST_AUTO(recv_buff, unpacker_->prepare_next_recv());
if (boost::asio::buffer_size(recv_buff) > 0)
boost::asio::async_read(ST_THIS next_layer(), recv_buff,
boost::bind(&i_unpacker<msg_type>::completion_condition, unpacker_, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred),
boost::bind(&i_unpacker<out_msg_type>::completion_condition, unpacker_, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred),
boost::bind(&st_tcp_socket_base::recv_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
......@@ -164,14 +166,13 @@ protected:
{
if (!ec && bytes_transferred > 0)
{
bool unpack_ok = unpacker_->parse_msg(bytes_transferred, temp_msg_buffer);
bool unpack_ok = unpacker_->parse_msg(bytes_transferred, ST_THIS temp_msg_buffer);
ST_THIS dispatch_msg();
if (!unpack_ok)
{
on_unpack_error();
//reset unpacker's state after on_unpack_error(),
//so user can get the left half-baked msg in on_unpack_error()
//reset unpacker's state after on_unpack_error(), so user can get the left half-baked msg in on_unpack_error()
unpacker_->reset_state();
}
}
......@@ -191,7 +192,7 @@ protected:
else
ST_THIS on_send_error(ec);
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex);
boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex);
ST_THIS sending = false;
//send msg sequentially, that means second send only after first send success
......@@ -208,7 +209,7 @@ protected:
}
protected:
boost::shared_ptr<i_unpacker<msg_type> > unpacker_;
boost::shared_ptr<i_unpacker<out_msg_type> > unpacker_;
bool closing;
};
......
......@@ -38,30 +38,21 @@ namespace st_asio_wrapper
namespace st_udp
{
template<typename MsgType>
class udp_msg : public MsgType
{
public:
boost::asio::ip::udp::endpoint peer_addr;
udp_msg() {}
udp_msg(const boost::asio::ip::udp::endpoint& _peer_addr) : peer_addr(_peer_addr) {}
void swap(udp_msg& other) {std::swap(peer_addr, other.peer_addr); MsgType::swap(other);}
};
template <typename Packer = DEFAULT_PACKER, typename Unpacker = DEFAULT_UDP_UNPACKER, typename Socket = boost::asio::ip::udp::socket>
class st_udp_socket_base : public st_socket<Socket, Packer, udp_msg<typename Packer::msg_type> >
class st_udp_socket_base : public st_socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type> >
{
public:
typedef udp_msg<typename Packer::msg_type> msg_type;
typedef const msg_type msg_ctype;
typedef udp_msg<typename Packer::msg_type> in_msg_type;
typedef const in_msg_type in_msg_ctype;
typedef udp_msg<typename Unpacker::msg_type> out_msg_type;
typedef const out_msg_type out_msg_ctype;
public:
st_udp_socket_base(boost::asio::io_service& io_service_) : st_socket<Socket, Packer, msg_type>(io_service_), unpacker_(boost::make_shared<Unpacker>()) {ST_THIS reset_state();}
st_udp_socket_base(boost::asio::io_service& io_service_) : st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>(io_service_), unpacker_(boost::make_shared<Unpacker>())
{ST_THIS reset_state();}
//reset all, be ensure that there's no any operations performed on this st_udp_socket when invoke it
//notice, when reuse this st_udp_socket, st_object_pool will invoke reset(), child must re-write this to initialize
//please note: when reusing this st_udp_socket, st_object_pool will invoke reset(); the child must re-write this to initialize
//all member variables, and then do not forget to invoke st_udp_socket::reset() to initialize father's
//member variables
virtual void reset()
......@@ -107,7 +98,7 @@ public:
boost::shared_ptr<const i_udp_unpacker<typename Packer::msg_type> > inner_unpacker() const {return unpacker_;}
void inner_unpacker(const boost::shared_ptr<i_udp_unpacker<typename Packer::msg_type> >& _unpacker_) {unpacker_ = _unpacker_;}
using st_socket<Socket, Packer, msg_type>::send_msg;
using st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>::send_msg;
///////////////////////////////////////////////////
//msg sending interface
UDP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
......@@ -149,19 +140,19 @@ protected:
{
if (!is_send_allowed() || ST_THIS get_io_service().stopped())
ST_THIS sending = false;
else if (!ST_THIS sending && !send_msg_buffer.empty())
else if (!ST_THIS sending && !ST_THIS send_msg_buffer.empty())
{
ST_THIS sending = true;
ST_THIS last_send_msg.swap(send_msg_buffer.front());
ST_THIS last_send_msg.swap(ST_THIS send_msg_buffer.front());
ST_THIS next_layer().async_send_to(boost::asio::buffer(ST_THIS last_send_msg.data(), ST_THIS last_send_msg.size()), ST_THIS last_send_msg.peer_addr,
boost::bind(&st_udp_socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
send_msg_buffer.pop_front();
ST_THIS send_msg_buffer.pop_front();
}
return ST_THIS sending;
}
virtual bool is_send_allowed() const {return ST_THIS lowest_layer().is_open() && st_socket<Socket, Packer, msg_type>::is_send_allowed();}
virtual bool is_send_allowed() const {return ST_THIS lowest_layer().is_open() && st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>::is_send_allowed();}
//can send data or not(just put into send buffer)
virtual void on_recv_error(const boost::system::error_code& ec)
......@@ -171,10 +162,10 @@ protected:
}
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
virtual bool on_msg(msg_type& msg) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_msg(out_msg_type& msg) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
#endif
virtual bool on_msg_handle(msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
void clean_up()
{
......@@ -193,8 +184,8 @@ protected:
{
if (!ec && bytes_transferred > 0)
{
temp_msg_buffer.push_back(udp_msg<typename Packer::msg_type>(peer_addr));
unpacker_->parse_msg(temp_msg_buffer.back(), bytes_transferred);
ST_THIS temp_msg_buffer.push_back(out_msg_type(peer_addr));
unpacker_->parse_msg(ST_THIS temp_msg_buffer.back(), bytes_transferred);
ST_THIS dispatch_msg();
}
#ifdef _MSC_VER
......@@ -217,7 +208,7 @@ protected:
else
ST_THIS on_send_error(ec);
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex);
boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex);
ST_THIS sending = false;
//send msg sequentially, that means second send only after first send success
......
......@@ -48,12 +48,25 @@ public:
virtual boost::asio::mutable_buffers_1 prepare_next_recv() = 0;
};
//wraps any msg type together with the remote peer's UDP endpoint, so the socket
//knows where a received msg came from and where an outgoing msg should be sent to.
//MsgType must provide a swap() member (see the swap implementation below).
template<typename MsgType>
class udp_msg : public MsgType
{
public:
//address of the remote peer this msg was received from / will be sent to
boost::asio::ip::udp::endpoint peer_addr;
udp_msg() {}
//constructs a msg with a known peer address and an empty payload
udp_msg(const boost::asio::ip::udp::endpoint& _peer_addr) : peer_addr(_peer_addr) {}
//exchanges both the peer address and the payload (the latter via MsgType::swap)
void swap(udp_msg& other) {std::swap(peer_addr, other.peer_addr); MsgType::swap(other);}
};
template<typename MsgType>
class i_udp_unpacker
{
public:
typedef MsgType msg_type;
typedef const msg_type msg_ctype;
typedef boost::container::list<udp_msg<msg_type> > container_type;
protected:
virtual ~i_udp_unpacker() {}
......@@ -80,9 +93,6 @@ public:
while (unpack_ok) //considering stick package problem, we need a loop
if ((size_t) -1 != cur_msg_len)
{
//cur_msg_len now can be assigned in the completion_condition function, or in the following 'else if',
//so, we must verify cur_msg_len at the very beginning of using it, not at the assignment as we do
//before, please pay special attention
if (cur_msg_len > MSG_BUFFER_SIZE || cur_msg_len <= HEAD_LEN)
unpack_ok = false;
else if (remain_len >= cur_msg_len) //one msg received
......@@ -121,7 +131,7 @@ public:
if (unpack_ok && remain_len > 0)
{
const char* pnext = boost::next(msg_pos_can.back().first, msg_pos_can.back().second);
memcpy(raw_buff.begin(), pnext, remain_len); //left behind unparsed msg
memcpy(raw_buff.begin(), pnext, remain_len); //left behind unparsed data
}
//if unpacking failed, successfully parsed msgs will still be returned via msg_can(stick package), please note.
......@@ -130,8 +140,7 @@ public:
//a return value of 0 indicates that the read operation is complete. a non-zero value indicates the maximum number
//of bytes to be read on the next call to the stream's async_read_some function. ---boost::asio::async_read
//read as many as possible to reduce asynchronous call-back(st_tcp_socket_base::recv_handler), and don't forget to handle
//stick package carefully in parse_msg function.
//read as many as possible to reduce asynchronous call-back, and don't forget to handle stick package carefully in parse_msg function.
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred)
{
if (ec)
......@@ -159,7 +168,7 @@ public:
protected:
boost::array<char, MSG_BUFFER_SIZE> raw_buff;
size_t cur_msg_len; //-1 means head has not received, so, doesn't know the whole msg length.
size_t cur_msg_len; //-1 means head has not received, so doesn't know the whole msg length.
size_t remain_len; //half-baked msg
};
......@@ -175,9 +184,15 @@ protected:
class replaceable_unpacker : public i_unpacker<replaceable_buffer>, public unpacker
{
public:
//overwrite the following three typedef defined by unpacker
using i_unpacker<replaceable_buffer>::msg_type;
using i_unpacker<replaceable_buffer>::msg_ctype;
using i_unpacker<replaceable_buffer>::container_type;
public:
virtual void reset_state() {unpacker::reset_state();}
virtual bool parse_msg(size_t bytes_transferred, i_unpacker<replaceable_buffer>::container_type& msg_can)
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
unpacker::container_type tmp_can;
bool unpack_ok = unpacker::parse_msg(bytes_transferred, tmp_can);
......@@ -189,7 +204,7 @@ public:
msg_can.back().raw_buffer(com);
}
//when unpack failed, some successfully parsed msgs may still returned via msg_can(stick package), please note.
//if unpacking failed, successfully parsed msgs will still be returned via msg_can(stick package), please note.
return unpack_ok;
}
......@@ -214,10 +229,10 @@ protected:
};
//this unpacker demonstrate how to forbid memory copy while parsing msgs.
class inflexible_unpacker : public i_unpacker<inflexible_buffer>
class unbuffered_unpacker : public i_unpacker<inflexible_buffer>
{
public:
inflexible_unpacker() {reset_state();}
unbuffered_unpacker() {reset_state();}
size_t current_msg_length() const {return raw_buff.size();} //current msg's total length(not include the head), 0 means don't know
public:
......@@ -279,12 +294,11 @@ public:
private:
HEAD_TYPE head_buff;
//please notice that we don't have a fixed size array with maximum size any more(like the default unpacker).
//please note that we don't have a fixed size array with maximum size any more(like the default unpacker).
//this is very useful if you have very few but very large msgs, fox example:
//you have a very large msg(1M size), but all others are very small, if you use a fixed size array to hold msgs in the unpackers,
//all the unpackers must have an array with at least 1M size, each st_socket will have a unpacker, this will cause your application occupy very large memory but with
//very low utilization ratio.
//this inflexible_unpacker will resolve the above problem, and with another benefit: no memory copying needed any more.
//all the unpackers must have an array with at least 1M size, each st_socket will have a unpacker, this will cause your application to occupy very large memory but with very low utilization ratio.
//this unbuffered_unpacker will resolve the above problem, and with another benefit: no memory replication needed any more.
msg_type raw_buff;
int step; //-1-error format, 0-want the head, 1-want the body
};
......
......@@ -9,15 +9,6 @@
//#define REUSE_OBJECT //use objects pool
//#define AUTO_CLEAR_CLOSED_SOCKET
//#define CLEAR_CLOSED_SOCKET_INTERVAL 1
//the following three macro demonstrate how to support huge msg(exceed 65535 - 2).
//huge msgs consume huge memory, for example, if we support 1M msg size, because every st_tcp_socket has a
//private unpacker which has a buffer of at least 1M size, so 1K st_tcp_sockets will consume 1G memory.
//if we consider the send buffer and recv buffer, the buffer's default max size is 1K, so, every st_tcp_socket
//can consume 2G(2 * 1M * 1K) memory when performance testing(both send buffer and recv buffer are full).
//#define HUGE_MSG
//#define MAX_MSG_LEN (1024 * 1024)
//#define MAX_MSG_NUM 8 //reduce buffer size to reduce memory occupation
//configuration
//use the following macro to control the type of packer and unpacker
......@@ -83,16 +74,16 @@ public:
protected:
//msg handling
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER //not force to use msg recv buffer(so on_msg() will make the decision)
//we can handle the msg very fast, so we don't use the recv buffer(return false)
virtual bool on_msg(msg_type& msg) {handle_msg(msg); return true;}
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER //not force to use msg recv buffer(so on_msg will make the decision)
//we can handle msg very fast, so we don't use recv buffer(return true)
virtual bool on_msg(out_msg_type& msg) {handle_msg(msg); return true;}
#endif
//we should handle the msg in on_msg_handle for time-consuming task like this:
virtual bool on_msg_handle(msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//we should handle msg in on_msg_handle for time-consuming task like this:
virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//msg handling end
private:
void handle_msg(msg_ctype& msg)
void handle_msg(out_msg_ctype& msg)
{
recv_bytes += msg.size();
if (check_msg && (msg.size() < sizeof(size_t) || recv_index != *(size_t*) msg.data()))
......@@ -122,27 +113,23 @@ public:
void close_some_client(size_t n)
{
//close some clients
//method #1
// for (BOOST_AUTO(iter, object_can.begin()); n-- > 0 && iter != object_can.end(); ++iter)
// (*iter)->graceful_close();
//notice: this method need to define AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro, because it just closed the st_socket,
//not really removed them from object pool, this will cause test_client still send data to them, and wait responses from them.
//not really removed them from object pool, this will cause test_client still send data via them, and wait responses from them.
//for this scenario, the smaller CLEAR_CLOSED_SOCKET_INTERVAL is, the better experience you will get, so set it to 1 second.
//method #2
while (n-- > 0)
graceful_close(at(0));
//notice: this method directly remove the client from object pool (and insert into list temp_object_can), and close the st_socket.
//clients in list temp_object_can will be reused if new clients needed (REUSE_OBJECT macro been defined), or be truly freed from memory
//CLOSED_SOCKET_MAX_DURATION seconds later (but check interval is SOCKET_FREE_INTERVAL seconds, so the maximum delay is CLOSED_SOCKET_MAX_DURATION + SOCKET_FREE_INTERVAL).
//this is a equivalence of calling i_server::del_client in st_server_socket_base::on_recv_error (see st_server_socket_base for more details).
//notice: this method directly remove clients from object pool, and close them, not require AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro
//this is a equivalence of calling i_server::del_client in st_server_socket_base::on_recv_error(see st_server_socket_base for more details).
}
///////////////////////////////////////////////////
//msg sending interface
//guarantee send msg successfully even if can_overflow equal to false
//success at here just means put the msg into st_tcp_socket's send buffer
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into st_tcp_socket's send buffer successfully
TCP_RANDOM_SEND_MSG(safe_random_send_msg, safe_send_msg)
TCP_RANDOM_SEND_MSG(safe_random_send_native_msg, safe_send_native_msg)
//msg sending interface
......
......@@ -68,9 +68,10 @@ protected:
//msg handling
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
//we can handle msg very fast, so we don't use recv buffer
virtual bool on_msg(msg_type& msg) {handle_msg(msg); return true;}
virtual bool on_msg(out_msg_type& msg) {handle_msg(msg); return true;}
#endif
virtual bool on_msg_handle(msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//we will change the unpacker at runtime, this operation must be done in on_msg, do not do it in on_msg_handle
//virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//msg handling end
private:
......@@ -87,7 +88,7 @@ private:
}
void trans_end() {clear(); ++completed_client_num;}
void handle_msg(msg_ctype& msg)
void handle_msg(out_msg_ctype& msg)
{
if (TRANS_BUSY == state)
{
......
......@@ -14,13 +14,13 @@ void file_socket::reset() {trans_end(); st_server_socket::reset();}
//msg handling
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
//we can handle msg very fast, so we don't use recv buffer
bool file_socket::on_msg(msg_type& msg) {handle_msg(msg); return true;}
bool file_socket::on_msg(out_msg_type& msg) {handle_msg(msg); return true;}
#endif
bool file_socket::on_msg_handle(msg_type& msg, bool link_down) {handle_msg(msg); return true;}
bool file_socket::on_msg_handle(out_msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//msg handling end
#ifdef WANT_MSG_SEND_NOTIFY
void file_socket::on_msg_send(msg_type& msg)
void file_socket::on_msg_send(in_msg_type& msg)
{
auto buffer = boost::dynamic_pointer_cast<file_buffer>(msg.raw_buffer());
if (nullptr != buffer)
......@@ -44,7 +44,7 @@ void file_socket::trans_end()
}
}
void file_socket::handle_msg(msg_ctype& msg)
void file_socket::handle_msg(out_msg_ctype& msg)
{
if (msg.size() <= ORDER_LEN)
{
......
......@@ -21,18 +21,18 @@ protected:
//msg handling
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
//we can handle msg very fast, so we don't use recv buffer
virtual bool on_msg(msg_type& msg);
virtual bool on_msg(out_msg_type& msg);
#endif
virtual bool on_msg_handle(msg_type& msg, bool link_down);
virtual bool on_msg_handle(out_msg_type& msg, bool link_down);
//msg handling end
#ifdef WANT_MSG_SEND_NOTIFY
virtual void on_msg_send(msg_type& msg);
virtual void on_msg_send(in_msg_type& msg);
#endif
private:
void trans_end();
void handle_msg(msg_ctype& msg);
void handle_msg(out_msg_ctype& msg);
};
#endif //#ifndef FILE_SOCKET_H_
......@@ -385,12 +385,17 @@
* Fixed the possibility of race condition in on_all_msg_send.
* Dropped fixed_length_packer, use packer(the default packer) instead.
* i_service::init() will now return boolean value to indicate whether the initialization was succeeded or not.
*
* 5.2.2 2015.8.20
* Packer and unpacker now can have different msg types, for example, use std::string when sending msgs, use inflexible_buffer when receiving msgs,
* see asio_client for more details.
* Dropped inflexible_packer, changed inflexible_unpacker to unbuffered_unpacker, just the class name.
*/
#ifndef ST_ASIO_WRAPPER_H_
#define ST_ASIO_WRAPPER_H_
#define ST_ASIO_WRAPPER_VERSION 50201
#define ST_ASIO_WRAPPER_VERSION 50202
#if !defined _MSC_VER && !defined __GNUC__
#error st_asio_wrapper only support vc and gcc.
......
......@@ -30,15 +30,14 @@
#ifndef UNIFIED_OUT_BUF_NUM
#define UNIFIED_OUT_BUF_NUM 2048
#endif
//buffer size used when receiving msg, must equal to or larger than the biggest msg size,
//the size of the buffer used when receiving msg, must equal to or larger than the biggest msg size,
//the bigger this buffer is, the more msgs can be received in one time if there are enough msgs buffered in the SOCKET.
//every unpackers have a fixed buffer with this size, every st_tcp_sockets have an unpacker, so, this size is not the bigger the better.
//if you customized the packer and unpacker, the above principle maybe not right anymore, it should depends on your implementations.
#ifndef MSG_BUFFER_SIZE
#define MSG_BUFFER_SIZE 4000
#endif
//msg send and recv buffer's maximum size (list::size()), corresponding buffers are expanded dynamicly,
//which means only allocate memories when needed.
//msg send and recv buffer's maximum size (list::size()), corresponding buffers are expanded dynamically, which means memory is only allocated when needed.
#ifndef MAX_MSG_NUM
#define MAX_MSG_NUM 1024
#endif
......@@ -134,7 +133,7 @@ namespace st_asio_wrapper
size_t len;
};
//free functions, used to do something to any container optionally with any mutex
//free functions, used to do something to any container(except map and multimap) optionally with any mutex
#if !defined _MSC_VER || _MSC_VER >= 1700
template<typename _Can, typename _Mutex, typename _Predicate>
void do_something_to_all(_Can& __can, _Mutex& __mutex, const _Predicate& __pred) {boost::shared_lock<boost::shared_mutex> lock(__mutex); for (auto& item : __can) __pred(item);}
......@@ -163,15 +162,14 @@ namespace st_asio_wrapper
bool splice_helper(_Can& dest_can, _Can& src_can, size_t max_size = MAX_MSG_NUM)
{
auto size = dest_can.size();
if (size < max_size) //dest_can's buffer available
if (size < max_size) //dest_can can hold more items.
{
size = max_size - size; //maximum items this time can handle
auto begin_iter = std::begin(src_can), end_iter = std::end(src_can);
if (src_can.size() > size) //some items left behind
{
auto left_num = src_can.size() - size;
//find the minimum movement
end_iter = left_num > size ? std::next(begin_iter, size) : std::prev(end_iter, left_num);
end_iter = left_num > size ? std::next(begin_iter, size) : std::prev(end_iter, left_num); //find the minimum movement
}
else
size = src_can.size();
......@@ -184,7 +182,7 @@ namespace st_asio_wrapper
return false;
}
//member functions, used to do something to any member container optionally with any member mutex
//member functions, used to do something to any member container(except map and multimap) optionally with any member mutex
#define DO_SOMETHING_TO_ALL_MUTEX(CAN, MUTEX) DO_SOMETHING_TO_ALL_MUTEX_NAME(do_something_to_all, CAN, MUTEX)
#define DO_SOMETHING_TO_ALL(CAN) DO_SOMETHING_TO_ALL_NAME(do_something_to_all, CAN)
......@@ -222,6 +220,33 @@ template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (au
boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(50)); \
}
#define GET_PENDING_MSG_NUM(FUNNAME, CAN, MUTEX) size_t FUNNAME() {boost::shared_lock<boost::shared_mutex> lock(MUTEX); return CAN.size();}
//generates a member function FUNNAME that copies (without removing) the first msg in
//buffer CAN under a shared (read) lock on MUTEX; msg is cleared first, so it stays
//empty if CAN is empty.
#define PEEK_FIRST_PENDING_MSG(FUNNAME, CAN, MUTEX, MSGTYPE) \
void FUNNAME(MSGTYPE& msg) \
{ \
msg.clear(); \
boost::shared_lock<boost::shared_mutex> lock(MUTEX); \
if (!CAN.empty()) \
msg = CAN.front(); \
}
//generates a member function FUNNAME that removes the first msg in buffer CAN and
//swaps it into msg under an exclusive (write) lock on MUTEX; msg is cleared first,
//so it stays empty if CAN is empty.
#define POP_FIRST_PENDING_MSG(FUNNAME, CAN, MUTEX, MSGTYPE) \
void FUNNAME(MSGTYPE& msg) \
{ \
msg.clear(); \
boost::unique_lock<boost::shared_mutex> lock(MUTEX); \
if (!CAN.empty()) \
{ \
msg.swap(CAN.front()); \
CAN.pop_front(); \
} \
}
//generates a member function FUNNAME that moves every msg from buffer CAN to the end
//of msg_list (via list::splice, so no element copying) under an exclusive (write)
//lock on MUTEX; CAN is left empty.
#define POP_ALL_PENDING_MSG(FUNNAME, CAN, MUTEX, CANTYPE) \
void FUNNAME(CANTYPE& msg_list) \
{ \
boost::unique_lock<boost::shared_mutex> lock(MUTEX); \
msg_list.splice(msg_list.end(), CAN); \
}
///////////////////////////////////////////////////
//TCP msg sending interface
#define TCP_SEND_MSG_CALL_SWITCH(FUNNAME, TYPE) \
......@@ -231,8 +256,8 @@ TYPE FUNNAME(const std::string& str, bool can_overflow = false) {return FUNNAME(
#define TCP_SEND_MSG(FUNNAME, NATIVE) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
{ \
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex); \
return (can_overflow || send_msg_buffer.size() < MAX_MSG_NUM) ? ST_THIS do_direct_send_msg(ST_THIS packer_->pack_msg(pstr, len, num, NATIVE)) : false; \
boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex); \
return (can_overflow || ST_THIS send_msg_buffer.size() < MAX_MSG_NUM) ? ST_THIS do_direct_send_msg(ST_THIS packer_->pack_msg(pstr, len, num, NATIVE)) : false; \
} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
......@@ -240,7 +265,8 @@ TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) {return ST_THIS direct_post_msg(ST_THIS packer_->pack_msg(pstr, len, num, NATIVE), can_overflow);} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into st_tcp_socket's send buffer
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into st_tcp_socket's send buffer successfully
//if can_overflow equal to false and the buffer is not available, will wait until it becomes available
#define TCP_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
bool FUNNAME(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) {while (!SEND_FUNNAME(pstr, len, num, can_overflow)) SAFE_SEND_MSG_CHECK return true;} \
TCP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
......@@ -261,8 +287,8 @@ TYPE FUNNAME(const boost::asio::ip::udp::endpoint& peer_addr, const std::string&
#define UDP_SEND_MSG(FUNNAME, NATIVE) \
bool FUNNAME(const boost::asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
{ \
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex); \
if (can_overflow || send_msg_buffer.size() < MAX_MSG_NUM) \
boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex); \
if (can_overflow || ST_THIS send_msg_buffer.size() < MAX_MSG_NUM) \
{ \
udp_msg<typename Packer::msg_type> msg(peer_addr, ST_THIS packer_->pack_msg(pstr, len, num, NATIVE)); \
return ST_THIS do_direct_send_msg(std::move(msg)); \
......@@ -279,7 +305,8 @@ bool FUNNAME(const boost::asio::ip::udp::endpoint& peer_addr, const char* const
} \
UDP_SEND_MSG_CALL_SWITCH(FUNNAME, bool)
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into st_udp_socket's send buffer
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into st_udp_socket's send buffer successfully
//if can_overflow equal to false and the buffer is not available, will wait until it becomes available
#define UDP_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
bool FUNNAME(const boost::asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, bool can_overflow = false) \
{while (!SEND_FUNNAME(peer_addr, pstr, len, num, can_overflow)) SAFE_SEND_MSG_CHECK return true;} \
......
......@@ -149,7 +149,7 @@ protected:
{
connected = reconnecting = true;
on_connect();
ST_THIS send_msg(); //send msg buffer may have msgs, send them
ST_THIS send_msg(); //send buffer may have msgs, send them
do_start();
}
else if ((boost::asio::error::operation_aborted != ec || reconnecting) && RE_CONNECT_CHECK && !ST_THIS get_io_service().stopped())
......
......@@ -127,54 +127,6 @@ public:
}
};
//packs msgs into an inflexible_buffer: allocates one exact-size heap buffer per msg
//instead of using a fixed-size array, then hands it to the msg via attach()
//(presumably transferring ownership to the buffer — TODO confirm against inflexible_buffer).
class inflexible_packer : public i_packer<inflexible_buffer>
{
public:
//maximum payload a single packed msg can carry (total buffer minus the length header)
static size_t get_max_msg_size() {return MSG_BUFFER_SIZE - HEAD_LEN;}
using i_packer<msg_type>::pack_msg;
//concatenates num fragments (pstr[i], len[i]) into one msg; when native is false,
//a HEAD_LEN-byte length header (network byte order) is prepended.
//returns an empty msg on failure (size check failed or length exceeds the header's range).
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
{
msg_type msg;
auto pre_len = native ? 0 : HEAD_LEN;
//(size_t) -1 signals that the combined length check failed
auto total_len = packer_helper::msg_size_check(pre_len, pstr, len, num);
if ((size_t) -1 == total_len)
return msg;
else if (total_len > pre_len)
{
char* buff = nullptr;
size_t pos = 0;
if (!native)
{
//narrow the total length into the header type and verify nothing was lost
auto head_len = (HEAD_TYPE) total_len;
if (total_len != head_len)
{
unified_out::error_out("pack msg error: length exceeds the header's range!");
return msg;
}
//convert to network byte order and write the header at the front of the buffer
head_len = HEAD_H2N(head_len);
buff = new char[total_len];
memcpy(buff, (const char*) &head_len, HEAD_LEN);
pos = HEAD_LEN;
}
else
buff = new char[total_len];
//append every non-null fragment after the (optional) header
for (size_t i = 0; i < num; ++i)
if (nullptr != pstr[i])
{
memcpy(buff + pos, pstr[i], len[i]);
pos += len[i];
}
msg.attach(buff, total_len);
} //if (total_len > pre_len)
return msg;
}
};
class prefix_suffix_packer : public i_packer<std::string>
{
public:
......
......@@ -36,10 +36,9 @@ public:
template<typename Arg>
st_server_socket_base(Server& server_, Arg& arg) : st_tcp_socket_base<Socket, Packer, Unpacker>(server_.get_service_pump(), arg), server(server_) {}
//reset all, be ensure that there's no any operations performed on this st_server_socket_base when invoke it
//notice, when reuse this st_server_socket_base, st_object_pool will invoke reset(), child must re-write this
//to initialize all member variables, and then do not forget to invoke st_server_socket_base::reset() to initialize father's
//member variables
//reset all, make sure that no operations are being performed on this socket when you invoke it
//please note, when reuse this socket, st_object_pool will invoke reset(), child must re-write it to initialize all member variables,
//and then do not forget to invoke st_server_socket_base::reset() to initialize father's member variables
virtual void reset() {st_tcp_socket_base<Socket, Packer, Unpacker>::reset();}
protected:
......@@ -55,7 +54,7 @@ protected:
}
virtual void on_unpack_error() {unified_out::error_out("can not unpack msg."); ST_THIS force_close();}
//do not forget to force_close this st_tcp_socket_base(in del_client(), there's a force_close() invocation)
//do not forget to force_close this socket(in del_client(), there's a force_close() invocation)
virtual void on_recv_error(const boost::system::error_code& ec)
{
#ifdef AUTO_CLEAR_CLOSED_SOCKET
......
......@@ -29,22 +29,13 @@
namespace st_asio_wrapper
{
enum BufferType {POST_BUFFER, SEND_BUFFER, RECV_BUFFER};
#define post_msg_buffer ST_THIS msg_buffer[0]
#define post_msg_buffer_mutex ST_THIS msg_buffer_mutex[0]
#define send_msg_buffer ST_THIS msg_buffer[1]
#define send_msg_buffer_mutex ST_THIS msg_buffer_mutex[1]
#define recv_msg_buffer ST_THIS msg_buffer[2]
#define recv_msg_buffer_mutex ST_THIS msg_buffer_mutex[2]
#define temp_msg_buffer ST_THIS msg_buffer[3]
template<typename Socket, typename Packer, typename MsgType = typename Packer::msg_type>
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType = typename Packer::msg_type, typename OutMsgType = typename Unpacker::msg_type>
class st_socket: public st_timer
{
public:
//keep size() constant time would better, because we invoke it frequently, so don't use std::list(gcc)
typedef boost::container::list<MsgType> container_type;
typedef boost::container::list<InMsgType> in_container_type;
typedef typename Unpacker::container_type out_container_type;
protected:
st_socket(boost::asio::io_service& io_service_) : st_timer(io_service_), _id(-1), next_layer_(io_service_), packer_(boost::make_shared<Packer>()) {reset_state();}
......@@ -60,7 +51,13 @@ protected:
started_ = false;
}
void clear_buffer() {st_asio_wrapper::do_something_to_all(msg_buffer, [](container_type& can) {can.clear();});}
//discard everything in all four msg buffers (post, send, recv and temp).
//NOTE(review): no mutexes are taken here — presumably only invoked while the
//st_socket is not started/has no pending async operations; verify against callers.
void clear_buffer()
{
post_msg_buffer.clear();
send_msg_buffer.clear();
recv_msg_buffer.clear();
temp_msg_buffer.clear();
}
public:
//please do not change id at runtime via the following function, except this st_socket is not managed by st_object_pool,
......@@ -122,15 +119,15 @@ public:
}
//don't use the packer but insert into send buffer directly
bool direct_send_msg(const MsgType& msg, bool can_overflow = false) {return direct_send_msg(MsgType(msg), can_overflow);}
bool direct_send_msg(MsgType&& msg, bool can_overflow = false)
//copying overload: makes a temporary copy of msg, then forwards to the rvalue overload below
bool direct_send_msg(const InMsgType& msg, bool can_overflow = false) {return direct_send_msg(InMsgType(msg), can_overflow);}
//moves the already-packed msg straight into the send buffer (bypassing the packer)
//under an exclusive lock; returns false if can_overflow is false and the send buffer
//already holds MAX_MSG_NUM msgs, otherwise whatever do_direct_send_msg returns.
bool direct_send_msg(InMsgType&& msg, bool can_overflow = false)
{
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex);
return can_overflow || send_msg_buffer.size() < MAX_MSG_NUM ? do_direct_send_msg(std::move(msg)) : false;
}
bool direct_post_msg(const MsgType& msg, bool can_overflow = false) {return direct_post_msg(MsgType(msg), can_overflow);}
bool direct_post_msg(MsgType&& msg, bool can_overflow = false)
bool direct_post_msg(const InMsgType& msg, bool can_overflow = false) {return direct_post_msg(InMsgType(msg), can_overflow);}
bool direct_post_msg(InMsgType&& msg, bool can_overflow = false)
{
if (direct_send_msg(std::move(msg), can_overflow))
return true;
......@@ -140,43 +137,22 @@ public:
}
//how many msgs waiting for sending or dispatching
size_t get_pending_msg_num(BufferType buffer_type = SEND_BUFFER)
{
boost::shared_lock<boost::shared_mutex> lock(msg_buffer_mutex[buffer_type]);
return msg_buffer[buffer_type].size();
}
//msgs in send buffer and post buffer are packed
//msgs in receive buffer are unpacked
void peek_first_pending_msg(MsgType& msg, BufferType buffer_type = SEND_BUFFER)
{
msg.clear();
GET_PENDING_MSG_NUM(get_pending_post_msg_num, post_msg_buffer, post_msg_buffer_mutex)
GET_PENDING_MSG_NUM(get_pending_send_msg_num, send_msg_buffer, send_msg_buffer_mutex)
GET_PENDING_MSG_NUM(get_pending_recv_msg_num, recv_msg_buffer, recv_msg_buffer_mutex)
boost::shared_lock<boost::shared_mutex> lock(msg_buffer_mutex[buffer_type]);
if (!msg_buffer[buffer_type].empty())
msg = msg_buffer[buffer_type].front();
}
//msgs in send buffer and post buffer are packed
//msgs in receive buffer are unpacked
void pop_first_pending_msg(MsgType& msg, BufferType buffer_type = SEND_BUFFER)
{
msg.clear();
PEEK_FIRST_PENDING_MSG(peek_first_pending_post_msg, post_msg_buffer, post_msg_buffer_mutex, InMsgType)
PEEK_FIRST_PENDING_MSG(peek_first_pending_send_msg, send_msg_buffer, send_msg_buffer_mutex, InMsgType)
PEEK_FIRST_PENDING_MSG(peek_first_pending_recv_msg, recv_msg_buffer, recv_msg_buffer_mutex, OutMsgType)
boost::unique_lock<boost::shared_mutex> lock(msg_buffer_mutex[buffer_type]);
if (!msg_buffer[buffer_type].empty())
{
msg.swap(msg_buffer[buffer_type].front());
msg_buffer[buffer_type].pop_front();
}
}
POP_FIRST_PENDING_MSG(pop_first_pending_post_msg, post_msg_buffer, post_msg_buffer_mutex, InMsgType)
POP_FIRST_PENDING_MSG(pop_first_pending_send_msg, send_msg_buffer, send_msg_buffer_mutex, InMsgType)
POP_FIRST_PENDING_MSG(pop_first_pending_recv_msg, recv_msg_buffer, recv_msg_buffer_mutex, OutMsgType)
//clear all pending msgs
void pop_all_pending_msg(container_type& msg_list, BufferType buffer_type = SEND_BUFFER)
{
boost::unique_lock<boost::shared_mutex> lock(msg_buffer_mutex[buffer_type]);
msg_list.splice(msg_list.end(), msg_buffer[buffer_type]);
}
POP_ALL_PENDING_MSG(pop_all_pending_post_msg, post_msg_buffer, post_msg_buffer_mutex, in_container_type)
POP_ALL_PENDING_MSG(pop_all_pending_send_msg, send_msg_buffer, send_msg_buffer_mutex, in_container_type)
POP_ALL_PENDING_MSG(pop_all_pending_recv_msg, recv_msg_buffer, recv_msg_buffer_mutex, out_container_type)
protected:
virtual bool do_start() = 0;
......@@ -199,7 +175,7 @@ protected:
//notice: on_msg_handle() will not be invoked from within this function
//
//notice: the msg is unpacked, using inconstant is for the convenience of swapping
virtual bool on_msg(MsgType& msg) = 0;
virtual bool on_msg(OutMsgType& msg) = 0;
#endif
//handling msg in om_msg_handle() will not block msg receiving on the same st_socket
......@@ -207,17 +183,17 @@ protected:
//if link_down is true, no matter return true or false, st_socket will not maintain this msg anymore, and continue dispatch the next msg continuously
//
//notice: the msg is unpacked, using inconstant is for the convenience of swapping
virtual bool on_msg_handle(MsgType& msg, bool link_down) = 0;
virtual bool on_msg_handle(OutMsgType& msg, bool link_down) = 0;
#ifdef WANT_MSG_SEND_NOTIFY
//one msg has sent to the kernel buffer, msg is the right msg
//notice: the msg is packed, using inconstant is for the convenience of swapping
virtual void on_msg_send(MsgType& msg) {}
virtual void on_msg_send(InMsgType& msg) {}
#endif
#ifdef WANT_ALL_MSG_SEND_NOTIFY
//send buffer goes empty
//notice: the msg is packed, using inconstant is for the convenience of swapping
virtual void on_all_msg_send(MsgType& msg) {}
virtual void on_all_msg_send(InMsgType& msg) {}
#endif
virtual bool on_timer(unsigned char id, const void* user_data)
......@@ -305,7 +281,7 @@ protected:
dispatching = false;
if (!re) //dispatch failed, re-dispatch
{
recv_msg_buffer.push_front(MsgType());
recv_msg_buffer.push_front(OutMsgType());
recv_msg_buffer.front().swap(last_dispatch_msg);
set_timer(3, 50, nullptr);
}
......@@ -354,7 +330,7 @@ protected:
recv_msg_buffer.splice(std::end(recv_msg_buffer), temp_msg_buffer);
#endif
#ifndef DISCARD_MSG_WHEN_LINK_DOWN
st_asio_wrapper::do_something_to_all(recv_msg_buffer, [this](MsgType& msg) {ST_THIS on_msg_handle(msg, true);});
st_asio_wrapper::do_something_to_all(recv_msg_buffer, [this](OutMsgType& msg) {ST_THIS on_msg_handle(msg, true);});
#endif
recv_msg_buffer.clear();
}
......@@ -362,7 +338,7 @@ protected:
}
//must mutex send_msg_buffer before invoke this function
bool do_direct_send_msg(MsgType&& msg)
bool do_direct_send_msg(InMsgType&& msg)
{
if (!msg.empty())
{
......@@ -375,7 +351,7 @@ protected:
}
//must mutex post_msg_buffer before invoke this function
bool do_direct_post_msg(MsgType&& msg)
bool do_direct_post_msg(InMsgType&& msg)
{
if (!msg.empty())
{
......@@ -395,13 +371,16 @@ protected:
uint_fast64_t _id;
Socket next_layer_;
MsgType last_send_msg, last_dispatch_msg;
InMsgType last_send_msg;
OutMsgType last_dispatch_msg;
boost::shared_ptr<i_packer<typename Packer::msg_type>> packer_;
container_type msg_buffer[4];
in_container_type post_msg_buffer, send_msg_buffer;
out_container_type recv_msg_buffer, temp_msg_buffer;
//st_socket will invoke dispatch_msg() when got some msgs. if these msgs can't push into recv_msg_buffer cause of receive buffer overflow,
//st_socket will delay 50 milliseconds(non-blocking) to invoke dispatch_msg() again, and now, as you known, temp_msg_buffer is used to hold these msgs temporarily.
boost::shared_mutex msg_buffer_mutex[3];
boost::shared_mutex post_msg_buffer_mutex, send_msg_buffer_mutex;
boost::shared_mutex recv_msg_buffer_mutex;
bool posting;
bool sending, suspend_send_msg_;
......
......@@ -28,12 +28,7 @@ class st_ssl_connector_base : public st_connector_base<Packer, Unpacker, Socket>
public:
st_ssl_connector_base(boost::asio::io_service& io_service_, boost::asio::ssl::context& ctx) : st_connector_base<Packer, Unpacker, Socket>(io_service_, ctx), authorized_(false) {}
//reset all, be ensure that there's no any operations performed on this st_ssl_connector_base when invoke it
//notice, when reuse this st_ssl_connector_base, st_object_pool will invoke reset(), child must re-write this to initialize
//all member variables, and then do not forget to invoke st_ssl_connector_base::reset() to initialize father's
//member variables
virtual void reset() {authorized_ = false; st_connector_base<Packer, Unpacker, Socket>::reset();}
bool authorized() const {return authorized_;}
protected:
......@@ -89,7 +84,7 @@ protected:
else
{
authorized_ = true;
ST_THIS send_msg(); //send msg buffer may have msgs, send them
ST_THIS send_msg(); //send buffer may have msgs, send them
do_start();
}
}
......@@ -107,9 +102,6 @@ public:
st_ssl_object_pool(st_service_pump& service_pump_, boost::asio::ssl::context::method m) : st_object_pool<Object>(service_pump_), ctx(m) {}
boost::asio::ssl::context& ssl_context() {return ctx;}
//this method simply create a class derived from st_socket from heap, secondly you must invoke
//bool add_client(typename st_client::object_ctype&, bool) before this socket can send or receive msgs.
//for st_udp_socket, you also need to invoke set_local_addr() before add_client(), please note
typename st_ssl_object_pool::object_type create_object()
{
auto client_ptr = ST_THIS reuse_object();
......
......@@ -34,17 +34,19 @@ namespace st_tcp
{
template <typename Socket, typename Packer, typename Unpacker>
class st_tcp_socket_base : public st_socket<Socket, Packer>
class st_tcp_socket_base : public st_socket<Socket, Packer, Unpacker>
{
public:
typedef typename Packer::msg_type msg_type;
typedef typename Packer::msg_ctype msg_ctype;
typedef typename Packer::msg_type in_msg_type;
typedef typename Packer::msg_ctype in_msg_ctype;
typedef typename Unpacker::msg_type out_msg_type;
typedef typename Unpacker::msg_ctype out_msg_ctype;
protected:
st_tcp_socket_base(boost::asio::io_service& io_service_) : st_socket<Socket, Packer>(io_service_), unpacker_(boost::make_shared<Unpacker>()) {reset_state();}
st_tcp_socket_base(boost::asio::io_service& io_service_) : st_socket<Socket, Packer, Unpacker>(io_service_), unpacker_(boost::make_shared<Unpacker>()) {reset_state();}
template<typename Arg>
st_tcp_socket_base(boost::asio::io_service& io_service_, Arg& arg) : st_socket<Socket, Packer>(io_service_, arg), unpacker_(boost::make_shared<Unpacker>()) {reset_state();}
st_tcp_socket_base(boost::asio::io_service& io_service_, Arg& arg) : st_socket<Socket, Packer, Unpacker>(io_service_, arg), unpacker_(boost::make_shared<Unpacker>()) {reset_state();}
public:
//reset all, be ensure that there's no any operations performed on this st_tcp_socket_base when invoke it
......@@ -52,7 +54,7 @@ public:
void reset_state()
{
unpacker_->reset_state();
st_socket<Socket, Packer>::reset_state();
st_socket<Socket, Packer, Unpacker>::reset_state();
closing = false;
}
......@@ -78,11 +80,11 @@ public:
bool is_closing() const {return closing;}
//get or change the unpacker at runtime
boost::shared_ptr<i_unpacker<msg_type>> inner_unpacker() {return unpacker_;}
boost::shared_ptr<const i_unpacker<msg_type>> inner_unpacker() const {return unpacker_;}
void inner_unpacker(const boost::shared_ptr<i_unpacker<msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
boost::shared_ptr<i_unpacker<out_msg_type>> inner_unpacker() {return unpacker_;}
boost::shared_ptr<const i_unpacker<out_msg_type>> inner_unpacker() const {return unpacker_;}
void inner_unpacker(const boost::shared_ptr<i_unpacker<out_msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
using st_socket<Socket, Packer>::send_msg;
using st_socket<Socket, Packer, Unpacker>::send_msg;
///////////////////////////////////////////////////
//msg sending interface
TCP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
......@@ -111,19 +113,19 @@ protected:
{
if (!is_send_allowed() || ST_THIS get_io_service().stopped())
ST_THIS sending = false;
else if (!ST_THIS sending && !send_msg_buffer.empty())
else if (!ST_THIS sending && !ST_THIS send_msg_buffer.empty())
{
ST_THIS sending = true;
ST_THIS last_send_msg.swap(send_msg_buffer.front());
ST_THIS last_send_msg.swap(ST_THIS send_msg_buffer.front());
boost::asio::async_write(ST_THIS next_layer(), boost::asio::buffer(ST_THIS last_send_msg.data(), ST_THIS last_send_msg.size()),
boost::bind(&st_tcp_socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
send_msg_buffer.pop_front();
ST_THIS send_msg_buffer.pop_front();
}
return ST_THIS sending;
}
virtual bool is_send_allowed() const {return !is_closing() && st_socket<Socket, Packer>::is_send_allowed();}
virtual bool is_send_allowed() const {return !is_closing() && st_socket<Socket, Packer, Unpacker>::is_send_allowed();}
//can send data or not(just put into send buffer)
//msg can not be unpacked
......@@ -131,10 +133,10 @@ protected:
virtual void on_unpack_error() = 0;
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
virtual bool on_msg(msg_type& msg) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_msg(out_msg_type& msg) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
#endif
virtual bool on_msg_handle(msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
//start the asynchronous read
//it's child's responsibility to invoke this properly, because st_tcp_socket_base doesn't know any of the connection status
......@@ -143,7 +145,7 @@ protected:
auto recv_buff = unpacker_->prepare_next_recv();
if (boost::asio::buffer_size(recv_buff) > 0)
boost::asio::async_read(ST_THIS next_layer(), recv_buff,
boost::bind(&i_unpacker<msg_type>::completion_condition, unpacker_, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred),
boost::bind(&i_unpacker<out_msg_type>::completion_condition, unpacker_, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred),
boost::bind(&st_tcp_socket_base::recv_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
......@@ -164,14 +166,13 @@ protected:
{
if (!ec && bytes_transferred > 0)
{
auto unpack_ok = unpacker_->parse_msg(bytes_transferred, temp_msg_buffer);
auto unpack_ok = unpacker_->parse_msg(bytes_transferred, ST_THIS temp_msg_buffer);
ST_THIS dispatch_msg();
if (!unpack_ok)
{
on_unpack_error();
//reset unpacker's state after on_unpack_error(),
//so user can get the left half-baked msg in on_unpack_error()
//reset unpacker's state after on_unpack_error(), so user can get the left half-baked msg in on_unpack_error()
unpacker_->reset_state();
}
}
......@@ -191,7 +192,7 @@ protected:
else
ST_THIS on_send_error(ec);
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex);
boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex);
ST_THIS sending = false;
//send msg sequentially, that means second send only after first send success
......@@ -208,7 +209,7 @@ protected:
}
protected:
boost::shared_ptr<i_unpacker<msg_type>> unpacker_;
boost::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
bool closing;
};
......
......@@ -41,9 +41,8 @@ namespace st_asio_wrapper
{
//timers are identified by id.
//for the same timer in the same st_timer, set_timer and stop_timer are not thread safe,
//please pay special attention. to resolve this defect, we must add a mutex member variable to timer_info,
//it's not worth
//for the same timer in the same st_timer, set_timer and stop_timer are not thread safe, please pay special attention.
//to resolve this defect, we must add a mutex member variable to timer_info, it's not worth
//
//suppose you have more than one service thread(see st_service_pump for service thread number control), then:
//same st_timer, same timer, on_timer is called sequentially
......@@ -73,7 +72,6 @@ public:
typedef const object_type object_ctype;
typedef boost::container::set<object_type> container_type;
//not thread safe for the same timer
void set_timer(unsigned char id, size_t milliseconds, const void* user_data)
{
object_type ti = {id};
......@@ -98,7 +96,6 @@ public:
start_timer(*iter);
}
//not thread safe for the same timer
void stop_timer(unsigned char id)
{
object_type ti = {id};
......
......@@ -38,31 +38,21 @@ namespace st_asio_wrapper
namespace st_udp
{
template<typename MsgType>
class udp_msg : public MsgType
{
public:
boost::asio::ip::udp::endpoint peer_addr;
udp_msg() {}
udp_msg(const boost::asio::ip::udp::endpoint& _peer_addr, MsgType&& msg) : MsgType(std::move(msg)), peer_addr(_peer_addr) {}
void swap(udp_msg& other) {std::swap(peer_addr, other.peer_addr); MsgType::swap(other);}
void swap(boost::asio::ip::udp::endpoint& addr, MsgType&& tmp_msg) {std::swap(peer_addr, addr); MsgType::swap(tmp_msg);}
};
template <typename Packer = DEFAULT_PACKER, typename Unpacker = DEFAULT_UDP_UNPACKER, typename Socket = boost::asio::ip::udp::socket>
class st_udp_socket_base : public st_socket<Socket, Packer, udp_msg<typename Packer::msg_type>>
class st_udp_socket_base : public st_socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>>
{
public:
typedef udp_msg<typename Packer::msg_type> msg_type;
typedef const msg_type msg_ctype;
typedef udp_msg<typename Packer::msg_type> in_msg_type;
typedef const in_msg_type in_msg_ctype;
typedef udp_msg<typename Unpacker::msg_type> out_msg_type;
typedef const out_msg_type out_msg_ctype;
public:
st_udp_socket_base(boost::asio::io_service& io_service_) : st_socket<Socket, Packer, msg_type>(io_service_), unpacker_(boost::make_shared<Unpacker>()) {ST_THIS reset_state();}
st_udp_socket_base(boost::asio::io_service& io_service_) : st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>(io_service_), unpacker_(boost::make_shared<Unpacker>())
{ST_THIS reset_state();}
//reset all, be ensure that there's no any operations performed on this st_udp_socket when invoke it
//notice, when reuse this st_udp_socket, st_object_pool will invoke reset(), child must re-write this to initialize
//please note, when reuse this st_udp_socket, st_object_pool will invoke reset(), child must re-write this to initialize
//all member variables, and then do not forget to invoke st_udp_socket::reset() to initialize father's
//member variables
virtual void reset()
......@@ -108,7 +98,7 @@ public:
boost::shared_ptr<const i_udp_unpacker<typename Packer::msg_type>> inner_unpacker() const {return unpacker_;}
void inner_unpacker(const boost::shared_ptr<i_udp_unpacker<typename Packer::msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
using st_socket<Socket, Packer, msg_type>::send_msg;
using st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>::send_msg;
///////////////////////////////////////////////////
//msg sending interface
UDP_SEND_MSG(send_msg, false) //use the packer with native = false to pack the msgs
......@@ -150,19 +140,19 @@ protected:
{
if (!is_send_allowed() || ST_THIS get_io_service().stopped())
ST_THIS sending = false;
else if (!ST_THIS sending && !send_msg_buffer.empty())
else if (!ST_THIS sending && !ST_THIS send_msg_buffer.empty())
{
ST_THIS sending = true;
ST_THIS last_send_msg.swap(send_msg_buffer.front());
ST_THIS last_send_msg.swap(ST_THIS send_msg_buffer.front());
ST_THIS next_layer().async_send_to(boost::asio::buffer(ST_THIS last_send_msg.data(), ST_THIS last_send_msg.size()), ST_THIS last_send_msg.peer_addr,
boost::bind(&st_udp_socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
send_msg_buffer.pop_front();
ST_THIS send_msg_buffer.pop_front();
}
return ST_THIS sending;
}
virtual bool is_send_allowed() const {return ST_THIS lowest_layer().is_open() && st_socket<Socket, Packer, msg_type>::is_send_allowed();}
virtual bool is_send_allowed() const {return ST_THIS lowest_layer().is_open() && st_socket<Socket, Packer, Unpacker, in_msg_type, out_msg_type>::is_send_allowed();}
//can send data or not(just put into send buffer)
virtual void on_recv_error(const boost::system::error_code& ec)
......@@ -172,10 +162,10 @@ protected:
}
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER
virtual bool on_msg(msg_type& msg) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_msg(out_msg_type& msg) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
#endif
virtual bool on_msg_handle(msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {unified_out::debug_out("recv(" size_t_format "): %s", msg.size(), msg.data()); return true;}
void clean_up()
{
......@@ -194,8 +184,8 @@ protected:
{
if (!ec && bytes_transferred > 0)
{
temp_msg_buffer.resize(temp_msg_buffer.size() + 1);
temp_msg_buffer.back().swap(peer_addr, unpacker_->parse_msg(bytes_transferred));
ST_THIS temp_msg_buffer.resize(ST_THIS temp_msg_buffer.size() + 1);
ST_THIS temp_msg_buffer.back().swap(peer_addr, unpacker_->parse_msg(bytes_transferred));
ST_THIS dispatch_msg();
}
#ifdef _MSC_VER
......@@ -218,7 +208,7 @@ protected:
else
ST_THIS on_send_error(ec);
boost::unique_lock<boost::shared_mutex> lock(send_msg_buffer_mutex);
boost::unique_lock<boost::shared_mutex> lock(ST_THIS send_msg_buffer_mutex);
ST_THIS sending = false;
//send msg sequentially, that means second send only after first send success
......
......@@ -48,12 +48,26 @@ public:
virtual boost::asio::mutable_buffers_1 prepare_next_recv() = 0;
};
template<typename MsgType>
class udp_msg : public MsgType
{
public:
boost::asio::ip::udp::endpoint peer_addr;
udp_msg() {}
udp_msg(const boost::asio::ip::udp::endpoint& _peer_addr, MsgType&& msg) : MsgType(std::move(msg)), peer_addr(_peer_addr) {}
void swap(udp_msg& other) {std::swap(peer_addr, other.peer_addr); MsgType::swap(other);}
void swap(boost::asio::ip::udp::endpoint& addr, MsgType&& tmp_msg) {std::swap(peer_addr, addr); MsgType::swap(tmp_msg);}
};
template<typename MsgType>
class i_udp_unpacker
{
public:
typedef MsgType msg_type;
typedef const msg_type msg_ctype;
typedef boost::container::list<udp_msg<msg_type> > container_type;
protected:
virtual ~i_udp_unpacker() {}
......@@ -80,9 +94,6 @@ public:
while (unpack_ok) //considering stick package problem, we need a loop
if ((size_t) -1 != cur_msg_len)
{
//cur_msg_len now can be assigned in the completion_condition function, or in the following 'else if',
//so, we must verify cur_msg_len at the very beginning of using it, not at the assignment as we do
//before, please pay special attention
if (cur_msg_len > MSG_BUFFER_SIZE || cur_msg_len <= HEAD_LEN)
unpack_ok = false;
else if (remain_len >= cur_msg_len) //one msg received
......@@ -120,7 +131,7 @@ public:
if (unpack_ok && remain_len > 0)
{
auto pnext = std::next(msg_pos_can.back().first, msg_pos_can.back().second);
memcpy(std::begin(raw_buff), pnext, remain_len); //left behind unparsed msg
memcpy(std::begin(raw_buff), pnext, remain_len); //left behind unparsed data
}
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
......@@ -129,8 +140,7 @@ public:
//a return value of 0 indicates that the read operation is complete. a non-zero value indicates the maximum number
//of bytes to be read on the next call to the stream's async_read_some function. ---boost::asio::async_read
//read as many as possible to reduce asynchronous call-back(st_tcp_socket_base::recv_handler), and don't forget to handle
//stick package carefully in parse_msg function.
//read as many as possible to reduce asynchronous call-back, and don't forget to handle stick package carefully in parse_msg function.
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred)
{
if (ec)
......@@ -158,7 +168,7 @@ public:
protected:
boost::array<char, MSG_BUFFER_SIZE> raw_buff;
size_t cur_msg_len; //-1 means head has not received, so, doesn't know the whole msg length.
size_t cur_msg_len; //-1 means head has not received, so doesn't know the whole msg length.
size_t remain_len; //half-baked msg
};
......@@ -174,9 +184,15 @@ protected:
class replaceable_unpacker : public i_unpacker<replaceable_buffer>, public unpacker
{
public:
//overwrite the following three typedef defined by unpacker
using i_unpacker<replaceable_buffer>::msg_type;
using i_unpacker<replaceable_buffer>::msg_ctype;
using i_unpacker<replaceable_buffer>::container_type;
public:
virtual void reset_state() {unpacker::reset_state();}
virtual bool parse_msg(size_t bytes_transferred, i_unpacker<replaceable_buffer>::container_type& msg_can)
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
unpacker::container_type tmp_can;
auto unpack_ok = unpacker::parse_msg(bytes_transferred, tmp_can);
......@@ -187,7 +203,7 @@ public:
msg_can.back().raw_buffer(com);
});
//when unpack failed, some successfully parsed msgs may still returned via msg_can(stick package), please note.
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
return unpack_ok;
}
......@@ -212,10 +228,10 @@ protected:
};
//this unpacker demonstrate how to forbid memory copy while parsing msgs.
class inflexible_unpacker : public i_unpacker<inflexible_buffer>
class unbuffered_unpacker : public i_unpacker<inflexible_buffer>
{
public:
inflexible_unpacker() {reset_state();}
unbuffered_unpacker() {reset_state();}
size_t current_msg_length() const {return raw_buff.size();} //current msg's total length(not include the head), 0 means don't know
public:
......@@ -277,12 +293,11 @@ public:
private:
HEAD_TYPE head_buff;
//please notice that we don't have a fixed size array with maximum size any more(like the default unpacker).
//please note that we don't have a fixed size array with maximum size any more(like the default unpacker).
//this is very useful if you have very few but very large msgs, fox example:
//you have a very large msg(1M size), but all others are very small, if you use a fixed size array to hold msgs in the unpackers,
//all the unpackers must have an array with at least 1M size, each st_socket will have a unpacker, this will cause your application occupy very large memory but with
//very low utilization ratio.
//this inflexible_unpacker will resolve the above problem, and with another benefit: no memory copying needed any more.
//all the unpackers must have an array with at least 1M size, each st_socket will have a unpacker, this will cause your application occupy very large memory but with very low utilization ratio.
//this unbuffered_unpacker will resolve above problem, and with another benefit: no memory replication needed any more.
msg_type raw_buff;
int step; //-1-error format, 0-want the head, 1-want the body
};
......
......@@ -7,15 +7,6 @@
//#define REUSE_OBJECT //use objects pool
//#define AUTO_CLEAR_CLOSED_SOCKET
//#define CLEAR_CLOSED_SOCKET_INTERVAL 1
//the following three macro demonstrate how to support huge msg(exceed 65535 - 2).
//huge msg consume huge memory, for example, if we support 1M msg size, because every st_tcp_socket has a
//private unpacker which has a buffer at lest 1M size, so 1K st_tcp_socket will consume 1G memory.
//if we consider the send buffer and recv buffer, the buffer's default max size is 1K, so, every st_tcp_socket
//can consume 2G(2 * 1M * 1K) memory when performance testing(both send buffer and recv buffer are full).
//#define HUGE_MSG
//#define MAX_MSG_LEN (1024 * 1024)
//#define MAX_MSG_NUM 8 //reduce buffer size to reduce memory occupation
//configuration
//use the following macro to control the type of packer and unpacker
......@@ -81,16 +72,16 @@ public:
protected:
//msg handling
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER //not force to use msg recv buffer(so on_msg() will make the decision)
//we can handle the msg very fast, so we don't use the recv buffer(return false)
virtual bool on_msg(msg_type& msg) {handle_msg(msg); return true;}
#ifndef FORCE_TO_USE_MSG_RECV_BUFFER //not force to use msg recv buffer(so on_msg will make the decision)
//we can handle msg very fast, so we don't use recv buffer(return true)
virtual bool on_msg(out_msg_type& msg) {handle_msg(msg); return true;}
#endif
//we should handle the msg in on_msg_handle for time-consuming task like this:
virtual bool on_msg_handle(msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//we should handle msg in on_msg_handle for time-consuming task like this:
virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//msg handling end
private:
void handle_msg(msg_ctype& msg)
void handle_msg(out_msg_ctype& msg)
{
recv_bytes += msg.size();
if (check_msg && (msg.size() < sizeof(size_t) || recv_index != *(size_t*) msg.data()))
......@@ -120,26 +111,22 @@ public:
void close_some_client(size_t n)
{
//close some clients
//method #1
// do_something_to_one([&n](object_ctype& item) {return n-- > 0 ? item->graceful_close(), false : true;});
//notice: this method need to define AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro, because it just closed the st_socket,
//not really removed them from object pool, this will cause test_client still send data to them, and wait responses from them.
//not really removed them from object pool, this will cause test_client still send data via them, and wait responses from them.
//for this scenario, the smaller CLEAR_CLOSED_SOCKET_INTERVAL is, the better experience you will get, so set it to 1 second.
//method #2
while (n-- > 0)
graceful_close(at(0));
//notice: this method directly remove the client from object pool (and insert into list temp_object_can), and close the st_socket.
//clients in list temp_object_can will be reused if new clients needed (REUSE_OBJECT macro been defined), or be truly freed from memory
//CLOSED_SOCKET_MAX_DURATION seconds later (but check interval is SOCKET_FREE_INTERVAL seconds, so the maximum delay is CLOSED_SOCKET_MAX_DURATION + SOCKET_FREE_INTERVAL).
//this is a equivalence of calling i_server::del_client in st_server_socket_base::on_recv_error (see st_server_socket_base for more details).
//notice: this method directly remove clients from object pool, and close them, not require AUTO_CLEAR_CLOSED_SOCKET and CLEAR_CLOSED_SOCKET_INTERVAL macro
//this is a equivalence of calling i_server::del_client in st_server_socket_base::on_recv_error(see st_server_socket_base for more details).
}
///////////////////////////////////////////////////
//msg sending interface
//guarantee send msg successfully even if can_overflow equal to false
//success at here just means put the msg into st_tcp_socket's send buffer
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into st_tcp_socket's send buffer successfully
TCP_RANDOM_SEND_MSG(safe_random_send_msg, safe_send_msg)
TCP_RANDOM_SEND_MSG(safe_random_send_native_msg, safe_send_native_msg)
//msg sending interface
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册