提交 3bbb64b0 编写于 作者: Y youngwolf

Sync docs.

上级 3bf3c2a7
......@@ -41,8 +41,8 @@ public:
typedef obj_with_begin_time<OutMsgType> out_msg;
typedef InContainer<in_msg> in_container_type;
typedef OutContainer<out_msg> out_container_type;
typedef InQueue<in_msg, in_container_type> in_queue_type;
typedef OutQueue<out_msg, out_container_type> out_queue_type;
typedef InQueue<in_container_type> in_queue_type;
typedef OutQueue<out_container_type> out_queue_type;
uint_fast64_t id() const;
bool is_equal_to(uint_fast64_t id) const;
......@@ -65,17 +65,22 @@ asio::ssl::stream<asio::ip::tcp::socket>对象及其从它们继承的对象。
virtual void send_heartbeat() = 0;
发送心跳包,由子类实现。
virtual const char* type_name() const = 0;
virtual int type_id() const = 0;
如果你拿到一个ascs::socket,通过这两个函数判断它是,TCP/UDP,client/server, ssl(client/server)。
bool started() const;
是否已经开始,已经开始意思是已经调用过start()了。
是否已经开始,已经开始意思是已经调用过start()了,关闭连接之后,started()变为false
void start();
开始,在多线程保护的情况下调用do_start函数。
开始,在多线程保护的情况下调用do_start函数。
void start_heartbeat(int interval, int max_absence = ASCS_HEARTBEAT_MAX_ABSENCE);
开始定时器调用check_heartbeat。
bool check_heartbeat(int interval, int max_absence = ASCS_HEARTBEAT_MAX_ABSENCE);
检测心跳,如果未超时的话将发送一个心跳包。
检测心跳,如果未超时(在 ASCS_HEARTBEAT_INTERVAL * ASCS_HEARTBEAT_MAX_ABSENCE 秒之内未收到任何数据)
的话将发送一个心跳包。
bool is_sending() const;
是否正在发送数据。
......@@ -83,7 +88,7 @@ asio::ssl::stream<asio::ip::tcp::socket>对象及其从它们继承的对象。
#ifdef ASCS_PASSIVE_RECV
bool is_reading() const;
#endif
是否正在接收数据,如果未定义ASCS_PASSIVE_RECV宏,则总是处于接收数据的状态。
是否正在接收数据,如果未定义ASCS_PASSIVE_RECV宏,则总是处于接收数据的状态,所以无需该函数
bool is_dispatching() const;
是否正在派发数据。
......@@ -93,7 +98,7 @@ asio::ssl::stream<asio::ip::tcp::socket>对象及其从它们继承的对象。
void msg_resuming_interval(size_t interval);
size_t msg_resuming_interval() const;
接收缓存满时,重复检查间隔。
接收缓存满时,重复检查(以便在缓存可用时继承读取套接字)间隔。
void msg_handling_interval(size_t interval);
size_t msg_handling_interval() const;
......@@ -104,40 +109,38 @@ on_msg_handle返回false或者0时,延时多久重新派发消息。
std::shared_ptr<i_packer<typename Packer::msg_type>> packer();
std::shared_ptr<const i_packer<typename Packer::msg_type>> packer() const;
void packer(const std::shared_ptr<i_packer<typename Packer::msg_type>>& _packer_);
获取/修改打包器。
注意,运行时修改打包器是非线程安全的,它会与消息发送冲突,由于消息发送和打包器修改都是使用者触发的,所以如果有资源竞争,使用者
有义务解决冲突问题。不支持多线程一是为了效率,二是这个功能用得很少。
bool is_send_buffer_available();
判断消息发送缓存是否可用,即里面的消息数量是否小于ASCS_MAX_MSG_NUM条,如果以can_overflow为true调用任何消息发送函数(如send_msg),
判断消息发送缓存是否可用(参看宏ASCS_MAX_SEND_BUF),如果以can_overflow为true调用任何消息发送函数(如send_msg),
将马上成功而无论消息发送缓存是否可用,所以可能会造成消息发送缓存大小不可控。
bool is_recv_buffer_available() const;
接收缓存是否可用,即里面的消息数量是否小于ASCS_MAX_MSG_NUM条
接收缓存是否可用。
bool direct_send_msg(const InMsgType& msg, bool can_overflow = false);
bool direct_send_msg(InMsgType&& msg, bool can_overflow = false);
直接发送消息(放入消息发送缓存)而不再调用i_packer::pack_msg函数,其实socket内部在发送消息时也是调用这个函数,只是在调用
之前先调用了i_packer::pack_msg而已。
template<typename T> bool direct_send_msg(T&& msg, bool can_overflow = false);
bool direct_send_msg(list<InMsgType>& msg_can, bool can_overflow = false);
直接发送消息(放入消息发送缓存)而不再调用i_packer::pack_msg函数。
#ifdef ASCS_SYNC_SEND
sync_call_result direct_sync_send_msg(const InMsgType& msg, unsigned duration = 0, bool can_overflow = false);
sync_call_result direct_sync_send_msg(InMsgType&& msg, unsigned duration = 0, bool can_overflow = false);
template<typename T> sync_call_result direct_sync_send_msg(T&& msg, unsigned duration = 0, bool can_overflow = false);
sync_call_result direct_sync_send_msg(list<InMsgType>& msg_can, unsigned duration = 0, bool can_overflow = false);
#endif
同步直接发送消息,其它的direct_send_msg一样,只是多一个duration参数,表示等待时间,0为永远等待直到消息发送成功
同步直接发送消息,其它的和direct_send_msg一样,只是多一个duration参数,表示等待时间,0为永远等待直到消息发送成功或者失败
注意超时返回不代表是错误,库用异步写来模拟同步写,超时返回之后,消息仍然可以在将来的某个时候被成功发送,此时on_msg_send仍然会
被调用(如果定义了相应的宏的话)。
#ifdef ASCS_SYNC_RECV
sync_call_result sync_recv_msg(std::list<OutMsgType>& msg_can, unsigned duration = 0);
sync_call_result sync_recv_msg(list<OutMsgType>& msg_can, unsigned duration = 0);
#endif
同步接收消息,使用者必须消费掉所有从msg_can返回的消息,库不再维护它们。
注意库只是append操作msg_can容器,duration参数表示等待时间,0为永远等待直到成功接收到至少一条消息,超时返回不代表是错误,库用异步读
来模拟同步读,超时返回之后,库仍然可以在将来某个时刻成功的接收到消息,此时如果使用者已经发起了另一个同步读请求,则消息将通过另一个
sync_recv_msg返回且不再继承尝试on_msg和on_msg_handle,如果没有,则消息将通过on_msg同步派发(如果定义了相应宏的话),最后如果还有剩余
消息的话,将通过on_msg_handle异步派发。
注意库只是append操作msg_can容器,duration参数表示等待时间,0为永远等待直到成功接收到至少一条消息或者失败,超时返回不代表是错误,
库用异步读来模拟同步读,超时返回之后,库仍然可以在将来某个时刻成功的接收到消息,此时如果使用者已经发起了另一个同步读请求,
则消息将通过另一个sync_recv_msg返回且不再继续尝试on_msg和on_msg_handle,如果没有,则消息将通过on_msg同步派发(如果定义了相应宏
的话),最后如果还有剩余消息的话,将通过on_msg_handle异步派发。
size_t get_pending_send_msg_num();
size_t get_pending_recv_msg_num();
......@@ -156,33 +159,33 @@ protected:
virtual bool do_start();
记录上次开始接收消息的时间,调用start_heartbeat(如果需要心跳的话),开始收发消息。
virtual void on_send_error(const error_code& ec);
virtual void on_recv_error(const asio::error_code& ec) = 0;
发送接收失败时回调,对于tcp::socket,如果需要连接断开事件,建议重写on_recv_error。
接收失败时回调,对于tcp,如果需要连接断开事件,建议重写on_recv_error。
virtual bool on_heartbeat_error() = 0;
心跳包超时时回调。
virtual void on_close();
当对象真正被close之前,会调用这个回调,用户可以在这里面释放资源(除了socket自身),在这之后,对象可能会被重用或者释放。
当对象真正被close之前,会调用这个回调,用户可以在这里面释放资源(除了socket自身),在这之后,对象可能会被重用或者释放。
virtual void after_close();
重写这个函数的最好的例子就是客户端做重连接。
#ifdef ASCS_SYNC_DISPATCH
virtual size_t on_msg(std::list<OutMsgType>& msg_can);
virtual size_t on_msg(list<OutMsgType>& msg_can);
#endif
同步派发消息,在on_msg返回之后,本套接字上的数据接收暂停,如果使用者不能消费掉所有msg_can里面的消息,则剩余的消息将进入异步派发
模式(通过on_msg_handle派发),这就带来一个消息乱序的问题,on_msg_handle将和on_msg(下次收到消息时调用)并发的被调用,所以如果
使用者不能消费掉所有消息,则不能开启这个功能,在简单的数据往返业务中(如类似乒乓测试一样的业务逻辑),这个功能会提高效率。
同步派发消息,在on_msg返回之前,本套接字上的数据接收暂停,如果使用者不能消费掉所有msg_can里面的消息,则剩余的消息将进入异步派发
模式(通过on_msg_handle派发),这就带来一个消息乱序的问题,on_msg_handle将和on_msg(下次收到消息时调用)并发地被调用,所以如果
使用者不能消费掉所有消息,则不能开启这个功能(除非你无所有乱序),在简单的数据往返业务中(如类似乒乓测试一样的业务逻辑),
这个功能会提高效率。
#ifdef ASCS_DISPATCH_BATCH_MSG
virtual size_t on_msg_handle(out_queue_type& can);
#else
virtual bool on_msg_handle(OutMsgType& msg);
#endif
从接收缓存派发一条(或者所有)消息,返回true(或者大于0)表示消息被成功处理,返回false(或者0)表示消息无法立即处理,于是将暂停一小段时间之后继续重试(异步)。
如果派发所有消息,用户需要把处理过的消息从queue里面弹出来,剩下的将在下次on_msg_handle里面继续派发。
从接收缓存派发一条(或者所有)消息,返回true(或者大于0)表示消息被成功处理,返回false(或者0)表示消息无法立即处理,于是将暂停
一段时间(异步)之后继续重试。如果派发所有消息,用户需要把处理过的消息从queue里面弹出来,剩下的将在下次on_msg_handle里面继续派发。
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
virtual void on_msg_send(InMsgType& msg);
......@@ -197,18 +200,20 @@ protected:
void close();
开启close流程,由继承者调用。ascs::socket会定时检测自己是否可以安全的被重用或被释放(即所有异步调用都已结束,包括正常结束和非正常结束),
如果是,调用上面的on_close()和after_close(), 然后object_pool将完全接管这个socket,以便在适当的时候重用或者释放它。
如果定义了ASCS_DELAY_CLOSE宏且其值等于0,则socket将保证以上说的行为,如果没有定义,则简单地在ASCS_DELAY_CLOSE秒后,调用on_close()和after_close(),
然后同样的道理,object_pool将完全接管这个socket,以便在适当的时候重用或者释放它。
如果定义了ASCS_DELAY_CLOSE宏且其值等于0,则socket将保证以上说的行为,如果没有定义,则简单地在ASCS_DELAY_CLOSE秒后,调用on_close()和
after_close(),然后同样的道理,object_pool将完全接管这个socket,以便在适当的时候重用或者释放它。
bool handle_msg();
子类收到消息之后,调用这个函数来派发消息,如果当前有同步接收,则通知sync_recv_msg函数,否则如果定义了ASCS_SYNC_DISPATCH宏就调用on_msg,否则就把消息存入消息
接收缓存,然后调用dispatch_msg。
子类收到消息之后,调用这个函数来派发消息,如果当前有同步接收,则通知sync_recv_msg函数,否则如果定义了ASCS_SYNC_DISPATCH宏就调用on_msg,
否则就把消息存入消息接收缓存,然后调用dispatch_msg。
bool do_direct_send_msg(InMsgType&& msg);
template<typename T> bool do_direct_send_msg(T&& msg);
bool do_direct_send_msg(list<InMsgType>& msg_can);
将消息插入发送队列并开始异步发送数据(如果当前没有异步发送的话),内部使用。
#ifdef ASCS_SYNC_SEND
sync_call_result do_direct_sync_send_msg(InMsgType&& msg, unsigned duration = 0);
template<typename T> sync_call_result do_direct_sync_send_msg(T&& msg, unsigned duration = 0);
sync_call_result do_direct_sync_send_msg(list<InMsgType>& msg_can, unsigned duration = 0);
#endif
将消息插入发送队列并开始异步发送数据(如果当前没有异步发送的话),然后等待发送结束,内部使用。
......
......@@ -2,42 +2,73 @@
namespace ascs { namespace tcp {
带连接功能的tcp::socket_base,算是一个真正的客户端了
template <typename Packer, typename Unpacker, typename Socket = asio::ip::tcp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
template <typename Packer, typename Unpacker, typename Matrix = i_matrix, typename Socket = asio::ip::tcp::socket,
template<typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class client_socket_base : public socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer>
{
private:
typedef socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer> super;
public:
static const typename super::tid TIMER_BEGIN = super::TIMER_END;
static const typename super::tid TIMER_CONNECT = TIMER_BEGIN;
static const typename super::tid TIMER_END = TIMER_BEGIN + 5;
client_socket_base(asio::io_service& io_service_);
template<typename Arg> client_socket_base(asio::io_context& io_context_, Arg& arg);
template<typename Arg>
client_socket_base(asio::io_service& io_service_, Arg& arg);
client_socket_base(Matrix& matrix_);
template<typename Arg> client_socket_base(Matrix& matrix_, Arg& arg);
ssl使用。
public:
virtual void reset();
重置所有,object_pool在重用时会调用。client_socket_base的子类可重写它以重置自己的状态,记得最后需要调用本类的reset。
virtual const char* type_name() const;
返回 "TCP (client endpoint)"。
virtual int type_id() const;
返回 1。
virtual bool obsoleted();
在调用父类同名函数的基础上,增加了对是否重连接的判断,如果需要重连接则返回假
virtual void reset();
重置重连状态为ASCS_RECONNECT,然后调用父类同名函数
void set_server_addr(unsigned short port, const std::string& ip);
const boost::asio::ip::tcp::endpoint& get_server_addr() const;
设置服务端地址用于连接之用,需要在do_start之前被调用。
bool set_local_addr(unsigned short port, const std::string& ip = std::string());
const asio::ip::tcp::endpoint& get_local_addr() const;
设备本端地址,需要在do_start之前被调用。
void open_reconnect();
void close_reconnect();
开启关闭自动重连接。
如果你想关闭重连接,则:
1.定义ASCS_RECONNECT为false;
2.在on_connect()中调用close_reconnect();
3.重写after_close()并且什么也不做。
如果你想控制重连接失败之后的重试次数,则重写prepare_reconnect虚函数。
disconnect(bool), force_shutdown(bool) 和 graceful_shutdown(bool, bool) 可以覆盖宏定义的定义。
reset()函数会重置重连接行为为ASCS_RECONNECT。
void disconnect(bool reconnect = false);
直接调用force_shutdown。
void force_shutdown(bool reconnect = false);
强制关闭————调用父类的shutdown,如果reconnect为true,则关闭之后,马上重新连接服务器
强制关闭,记录重连接状态,输出一些提示信息,调用父类的shutdown
void graceful_shutdown(bool reconnect = false, bool sync = true);
优雅关闭,调用父类的graceful_shutdown函数,reconnect参数的意义同上,sync参数直接传递给父类
优雅关闭,记录重连接状态,输出一些提示信息,调用父类的graceful_shutdown函数
在on_msg中,请以sync为false调用该函数,在其它所有service线程中,推荐也用sync为false调用该函数。
protected:
void first_init(Matrix* matrix_ = nullptr);
构造时调用,仅仅是为了节省代码量而已,因为我们有多个构造函数都将调用它。
Matrix* get_matrix();
const Matrix* get_matrix() const;
virtual bool do_start();
发起异步连接。
创建套接字,绑定本端端口,开始异步连接。
virtual void connect_handler(const asio::error_code& ec);
异步连接回调函数,如果失败则调用prepare_next_reconnect(),成功则调用父类do_start()。
......@@ -63,17 +94,22 @@ protected:
virtual void after_close();
如果需要重连接,则执行重连接。
private:
bool prepare_next_reconnect(const asio::error_code& ec);
如果允许(io_context仍然在运行且prepare_reconnect返回大于等于0),启动定时器以延时一小段时间之后重新连接服务器。
protected:
private:
bool need_reconnect;
是否重新连接(当连接断开后)。
private:
asio::ip::tcp::endpoint server_addr;
服务器地址。
asio::ip::tcp::endpoint local_addr;
本端地址。
Matrix* matrix;
用于操控multi_client_base,multi_client_base在创建client_socket_base的时候,会把自己this指针通过构造函数传入。
};
}} //namespace
......@@ -3,20 +3,22 @@ namespace ascs { namespace tcp {
服务端套接字类
template<typename Packer, typename Unpacker, typename Server = i_server, typename Socket = asio::ip::tcp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
template<typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class server_socket_base : public socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer>,
public std::enable_shared_from_this<server_socket_base<Packer, Unpacker, Server, Socket, InQueue, InContainer, OutQueue, OutContainer>>
{
public:
server_socket_base(Server& server_);
template<typename Arg>
server_socket_base(Server& server_, Arg& arg);
template<typename Arg> server_socket_base(Server& server_, Arg& arg);
ssl使用。
virtual void reset();
重置所有,object_pool在重用时会调用。server_socket_base的子类可重写它以重置自己的状态,记得最后需要调用本类的reset。
virtual const char* type_name() const;
返回 "TCP (server endpoint)"。
virtual int type_id() const;
返回 2。
virtual void take_over(std::shared_ptr<st_server_socket_base> socket_ptr);
恢复对象socket_ptr到这个st_server_socket,所以你的用户数据最好是指针(智能指针),这样就不需要拷贝了,否则必须执行深拷贝。
......@@ -25,10 +27,10 @@ ssl使用。
直接调用force_shutdown。
void force_shutdown();
强制退出————调用父类的同名函数
强制关闭,记录重连接状态,输出一些提示信息,调用父类的shutdown
void graceful_shutdown(bool sync = true);
优雅关闭————调用父类的同名函数。
优雅关闭,记录重连接状态,输出一些提示信息,调用父类的graceful_shutdown函数。
在on_msg中,请以sync为false调用该函数,在其它所有service线程中,推荐也用sync为false调用该函数。
protected:
......@@ -39,7 +41,7 @@ protected:
解包错误,默认的行为是关闭连接,可以重写以自定义行为。
virtual void on_recv_error(const error_code& ec);
连接断开,默认的行为是调用i_server::del_client,可以重写以自定义行为。
连接断开,默认的行为是调用i_server::del_socket,可以重写以自定义行为。
virtual void on_async_shutdown_error();
异步关闭出错时回调。
......
......@@ -3,8 +3,7 @@ namespace ascs { namespace tcp {
tcp套接字类,实现tcp数据的收发
template <typename Socket, typename Packer, typename Unpacker,
template<typename, typename> class InQueue, template<typename> class InContainer,
template<typename, typename> class OutQueue, template<typename> class OutContainer>
template<typename> class InQueue, template<typename> class InContainer, template<typename> class OutQueue, template<typename> class OutContainer>
class socket_base : public socket<Socket, Packer, typename Packer::msg_type, typename Unpacker::msg_type, InQueue, InContainer, OutQueue, OutContainer>
{
public:
......@@ -40,8 +39,9 @@ public:
virtual void send_heartbeat();
发送一个心跳包。
void reset();
重置所有并调用同类同名函数。
virtual void reset();
重置所有并调用同类同名函数,如果你在构造函数里面做了额外的配置工作,那么那些配置工作也需要在这里做,因为在对象
被重用时,不会再调用构造函数,而是这个reset函数。
bool is_broken() const;
连接是否已经断开。
......@@ -61,7 +61,7 @@ public:
获取/修改解包器。
注意,运行时修改解包器是非线程安全的,只能在如下几个地方修改,切记:
1. 构造函数;
2. 子类的reset函数(虚的那个)
2. 子类的reset函数;
3. on_msg(需要定义ASCS_SYNC_DISPATCH宏);
4. 定义ASCS_PASSIVE_RECV宏,并在调用(sync_)recv_msg之前修改。
不支持多线程一是为了效率,二是这个功能用得很少,三是可以通过在unpacker里面加标志位来达到同步的目的。
......@@ -70,26 +70,39 @@ public:
post一个异步调用到do_recv_msg()。
#endif
template<typename Buffer> bool send_msg(const Buffer& buffer, bool can_overflow);
bool send_msg(in_msg_type&& msg, bool can_overflow = false);
bool send_msg(in_msg_type&& msg1, in_msg_type&& msg2, bool can_overflow = false);
bool send_msg(typename Packer::container_type& msg_can, bool can_overflow = false);
bool send_msg(const char* pstr, size_t len, bool can_overflow);
bool send_msg(const char* const pstr[], const size_t len[], size_t num, bool can_overflow);
发送消息,前两个是helper函数,最后一个才是真正的发送消息(放入消息发送缓存);第一个调用第二个,第二个调用第三个。
template<typename Buffer> bool send_native_msg(const Buffer& buffer, bool can_overflow);
template<typename Buffer> bool send_msg(const Buffer& buffer, bool can_overflow = fase);
bool send_msg(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = fase);
发送消息,前三个会为每一个消息打一个只包涵头的包,另一个直接move到库的缓存里面,减少一次内存拷贝,
第四、五个是helper函数,第五个调用第四个,第四个调用第六个。
bool send_native_msg(in_msg_type&& msg, bool can_overflow = false);
bool send_native_msg(in_msg_type&& msg1, in_msg_type&& msg2, bool can_overflow = false);
bool send_native_msg(typename Packer::container_type& msg_can, bool can_overflow = false);
bool send_native_msg(const char* pstr, size_t len, bool can_overflow);
bool send_native_msg(const char* const pstr[], const size_t len[], size_t num, bool can_overflow);
同上,只是以native为true调用i_packer::pack_msg接口。
template<typename Buffer> bool send_native_msg(const Buffer& buffer, bool can_overflow = fase);
bool send_native_msg(const char* const pstr[], const size_t len[], size_t num, bool can_overflow = fase);
同上,只是以native为true调用i_packer::pack_msg接口,对于前三个,则干脆不调用i_packer::pack_msg函数。
template<typename Buffer> bool safe_send_msg(const Buffer& buffer, bool can_overflow);
bool safe_send_msg(in_msg_type&& msg, bool can_overflow = false);
bool safe_send_msg(in_msg_type&& msg1, in_msg_type&& msg2, bool can_overflow = false);
bool safe_send_msg(typename Packer::container_type& msg_can, bool can_overflow = false);
bool safe_send_msg(const char* pstr, size_t len, bool can_overflow);
template<typename Buffer> bool safe_send_msg(const Buffer& buffer, bool can_overflow);
bool safe_send_msg(const char* const pstr[], const size_t len[], size_t num, bool can_overflow);
同send_msg,只是在消息发送缓存溢出的时候会等待直到缓存可用;如果is_ready()返回false或者io_context已经停止,则马上放弃等待返回失败。
safe系列函数,在on_msg和om_msg_handle里面调用时需要特别谨慎,因为它会阻塞service线程。
template<typename Buffer> bool safe_send_native_msg(const Buffer& buffer, bool can_overflow);
bool safe_send_native_msg(in_msg_type&& msg, bool can_overflow = false);
bool safe_send_native_msg(in_msg_type&& msg1, in_msg_type&& msg2, bool can_overflow = false);
bool safe_send_native_msg(typename Packer::container_type& msg_can, bool can_overflow = false);
bool safe_send_native_msg(const char* pstr, size_t len, bool can_overflow);
template<typename Buffer> bool safe_send_native_msg(const Buffer& buffer, bool can_overflow);
bool safe_send_native_msg(const char* const pstr[], const size_t len[], size_t num, bool can_overflow);
同上,只是以native为true调用i_packer::pack_msg接口。
同上,只是以native为true调用i_packer::pack_msg接口,对于前三个,则干脆不调用i_packer::pack_msg函数
以上所有函数都有对应的一个以sync_开头的版本,表示同步发送,同时带一个超时间隔,为0则永远等待直到发送成功。
注意超时返回不代表是错误,库用异步写来模拟同步写,超时返回之后,消息仍然可以在将来的某个时候被成功发送,此时on_msg_send仍然会
......@@ -99,16 +112,18 @@ protected:
void force_shutdown();
void graceful_shutdown(bool sync);
第一个直接直接调用shutdown()。
第二个函数优雅关闭套接字,所谓优雅关闭,就是先关闭自己的数据发送,然后接收完遗留数据之后,才完全关闭套接字。当sync为假时,graceful_shutdown马上返回,
优雅关闭将在后台继承进行,当回调到on_recv_error的时候,关闭结束(有可能优雅关闭成功,有可能超时被强制关闭,超时由ASCS_GRACEFUL_SHUTDOWN_MAX_DURATION宏决定)。
第二个函数优雅关闭套接字,所谓优雅关闭,就是先关闭自己的数据发送,然后接收完遗留数据之后,才完全关闭套接字。当sync为假时,
graceful_shutdown马上返回,优雅关闭将在后台继承进行,当回调到on_recv_error的时候,关闭结束(有可能优雅关闭成功,有可能超时
被强制关闭,超时由ASCS_GRACEFUL_SHUTDOWN_MAX_DURATION宏决定)。
virtual bool do_start();
记录连接建立时间,更新连接状态为已连接,调用on_connect(),调用父类同名函数。
#ifdef ASCS_SYNC_SEND
virtual void on_send_error(const error_code& ec);
发送失败时回调。
virtual void on_close();
#endif
通知所有同步发送结束。
更新连接状态为断开状态,通知所有同步发送结束,调用父类同名函数。
virtual void on_connect();
连接建立后回调。
......@@ -117,7 +132,7 @@ virtual bool do_start();
解包出错时回调。
virtual void on_async_shutdown_error() = 0;
异步关闭连接失败时架设
异步关闭连接失败时回调
private:
#ifndef ASCS_PASSIVE_RECV
......@@ -137,7 +152,7 @@ post一个异步调用到do_send_msg()。
调用async_read。
void recv_handler(const error_code& ec, size_t bytes_transferred);
收到数据后asio回调。
收到消息后由asio回调。
bool do_send_msg(bool in_strand);
调用async_write。
......@@ -164,7 +179,7 @@ private:
#endif
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
list<typename super::in_msg> last_send_msg;
typename super::in_containter_type last_send_msg;
asio::io_context::strand strand;
};
......
......@@ -2,9 +2,9 @@
namespace ascs { namespace udp {
udp套接字类,实现udp数据的收发
template <typename Packer, typename Unpacker, typename Socket = asio::ip::udp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
template <typename Packer, typename Unpacker, typename Matrix = i_matrix, typename Socket = asio::ip::udp::socket,
template<typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class socket_base : public socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer>
{
public:
......@@ -15,6 +15,7 @@ public:
public:
socket_base(asio::io_context& io_context_);
socket_base(Matrix& matrix_);
public:
virtual bool is_ready();
......@@ -23,8 +24,15 @@ public:
virtual void send_heartbeat();
发送一个心跳包。
virtual const char* type_name() const;
返回 “UDP”。
virtual int type_id() const;
返回 0。
virtual void reset();
重置所有,object_pool在重用时会调用。socket_base的子类可重写它以重置自己的状态,记得最后需要调用本类的reset。
重置所有并调用同类同名函数,如果你在构造函数里面做了额外的配置工作,那么那些配置工作也需要在这里做,因为在对象
被重用时,不会再调用构造函数,而是这个reset函数。
void set_local_addr(unsigned short port, const std::string& ip = std::string());
const asio::ip::udp::endpoint& get_local_addr() const;
......@@ -32,7 +40,7 @@ public:
bool set_peer_addr(unsigned short port, const std::string& ip = std::string());
const asio::ip::udp::endpoint& get_peer_addr() const;
设置获取对端地址。
设置获取对端地址,发送消息时也可以指定对端地址,那时这里的对端地址将不使用
void disconnect();
void force_close();
......@@ -83,6 +91,15 @@ safe系列函数,在on_msg和om_msg_handle里面调用时需要特别谨慎,
以上所有函数都有同步发送版本,请参考tcp::socket_base。
protected:
void first_init(Matrix* matrix_ = nullptr);
构造时调用,仅仅是为了节省代码量而已,因为我们有两个构造函数都将调用它。
Matrix* get_matrix();
const Matrix* get_matrix() const;
virtual bool do_start();
创建套接字,绑定到指定端口,然后调用父类的同名函数。
virtual void on_recv_error(const error_code& ec);
接收消息出错时回调。
......@@ -92,18 +109,19 @@ protected:
#ifdef ASCS_SYNC_SEND
virtual void on_close();
#endif
通知所有同步发送结束。
通知所有同步发送结束,然后调用父类同名函数
private:
#ifndef ASCS_PASSIVE_RECV
virtual void recv_msg();
post一个异步调用到do_recv_msg()。
#endif
virtual void send_msg();
post一个异步调用到do_send_msg()。
void shutdown();
关闭套接字,停止所有定时器,直接派发所有剩余消息,重置所有状态(调用reset_state)
调用父类close函数
void do_recv_msg();
调用async_receive_from()。
......@@ -132,6 +150,9 @@ private:
using super::reading;
#endif
bool has_bound;
是否已经绑定到指定地址。
typename super::in_msg last_send_msg;
std::shared_ptr<i_unpacker<typename Unpacker::msg_type>> unpacker_;
asio::ip::udp::endpoint local_addr;
......@@ -141,6 +162,7 @@ private:
它只代表上一次接收udp消息时的对端地址,对于已经接收到的udp消息,对端地址保存在out_msg_type里面。
如果发送消息时不指定对端地址,则使用peer_addr。
Matrix* matrix;
asio::io_context::strand strand;
};
......
......@@ -94,8 +94,7 @@ protected:
#ifndef ASCS_DISPATCH_BATCH_MSG
last_dispatch_msg.clear();
#endif
in_container_type can;
pop_all_pending_send_msg(can);
send_msg_buffer.clear();
recv_msg_buffer.clear();
}
......
......@@ -28,8 +28,7 @@ private:
public:
server_socket_base(Server& server_) : super(server_.get_service_pump()), server(server_) {}
template<typename Arg>
server_socket_base(Server& server_, Arg& arg) : super(server_.get_service_pump(), arg), server(server_) {}
template<typename Arg> server_socket_base(Server& server_, Arg& arg) : super(server_.get_service_pump(), arg), server(server_) {}
virtual const char* type_name() const {return "TCP (server endpoint)";}
virtual int type_id() const {return 2;}
......
......@@ -194,19 +194,7 @@ private:
using super::do_direct_sync_send_msg;
#endif
void shutdown()
{
this->stop_all_timer();
close();
auto& lowest_object = this->lowest_layer();
if (lowest_object.is_open())
{
asio::error_code ec;
lowest_object.shutdown(asio::ip::udp::socket::shutdown_both, ec);
lowest_object.close(ec);
}
}
void shutdown() {close();}
void do_recv_msg()
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册