提交 1df8bec3 编写于 作者: Y youngwolf 提交者: youngwolf

Support call back registration.

上级 3dfd5c84
......@@ -38,6 +38,7 @@ public:
};
#include <ascs/ext/tcp.h>
#include <ascs/ext/socket.h>
using namespace ascs;
using namespace ascs::tcp;
using namespace ascs::ext;
......@@ -49,17 +50,8 @@ using namespace ascs::ext::tcp::proxy;
#define RECONNECT "reconnect"
#define STATISTIC "statistic"
//we only want close reconnecting mechanism on this socket, so we don't define macro ASCS_RECONNECT
class short_connection : public socks4::client_socket
{
public:
short_connection(i_matrix& matrix_) : socks4::client_socket(matrix_) {}
protected:
virtual void on_connect() {set_reconnect(false); client_socket::on_connect();} //close reconnecting mechanism
};
class short_client : public multi_client_base<short_connection>
//we only want close reconnecting mechanism on these sockets, so it cannot be done by defining macro ASCS_RECONNECT to false
class short_client : public multi_client_base<c_socket<socks4::client_socket>>
{
public:
short_client(service_pump& service_pump_) : multi_client_base(service_pump_) {set_server_addr(ASCS_SERVER_PORT);}
......@@ -74,6 +66,9 @@ public:
if (!socket_ptr)
return false;
//register event callback from outside of the socket, it also can be done from inside of the socket, see echo_server for more details
socket_ptr->register_on_connect([](socks4::client_socket* socket) {socket->set_reconnect(false);}, true); //close reconnection mechanism
//without following setting, socks4::client_socket will be downgraded to normal client_socket
//socket_ptr->set_target_addr(9527, "172.27.0.14"); //target server address, original server address becomes SOCK4 server address
return socket_ptr->send_msg(std::move(msg));
......
......@@ -48,6 +48,7 @@
//configuration
#include <ascs/ext/tcp.h>
#include <ascs/ext/socket.h>
using namespace ascs;
using namespace ascs::tcp;
using namespace ascs::ext;
......@@ -192,31 +193,34 @@ protected:
virtual bool on_accept(object_ctype& socket_ptr) {stop_listen(); return true;}
};
class short_connection : public server_socket_base<packer<>, unpacker<>>
class short_connection : public s_socket<server_socket_base<packer<>, unpacker<>>>
{
private:
typedef server_socket_base<ext::packer<>, ext::unpacker<>> super;
typedef s_socket<server_socket_base<ext::packer<>, ext::unpacker<>>> super;
public:
short_connection(i_server& server_) : super(server_) {}
protected:
//msg handling
short_connection(i_server& server_) : super(server_)
{
//register msg handling from inside of the socket, it also can be done from outside of the socket, see client for more details
//since we're in the socket, so the 'raw_socket* socket' actually is 'this'
//in xxxx callback, do not call super::xxxx, call raw_socket::xxxx instead, otherwise, dead loop will occur.
//in this demo, the raw_socket is 'server_socket_base<ext::packer<>, ext::unpacker<>>', it is defined by the s_socket.
#ifdef ASCS_SYNC_DISPATCH
//do not hold msg_can for further usage, return from on_msg as quickly as possible
//access msg_can freely within this callback, it's always thread safe.
virtual size_t on_msg(std::list<out_msg_type>& msg_can) {auto re = super::on_msg(msg_can); force_shutdown(); return re;}
//do not hold msg_can for further usage, return from on_msg as quickly as possible
//access msg_can freely within this callback, it's always thread safe.
register_on_msg([this](raw_socket* socket, std::list<out_msg_type>& msg_can) {auto re = raw_socket::on_msg(msg_can); socket->force_shutdown(); return re;});
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
//do not hold msg_can for further usage, access msg_can and return from on_msg_handle as quickly as possible
//can only access msg_can via functions that marked as 'thread safe', if you used non-lock queue, its your responsibility to guarantee
// that new messages will not come until we returned from this callback (for example, pingpong test).
virtual size_t on_msg_handle(out_queue_type& msg_can) {auto re = super::on_msg_handle(msg_can); force_shutdown(); return re;}
//do not hold msg_can for further usage, access msg_can and return from on_msg_handle as quickly as possible
//can only access msg_can via functions that marked as 'thread safe', if you used non-lock queue, its your responsibility to guarantee
// that new messages will not come until we returned from this callback (for example, pingpong test).
register_on_msg_handle([this](raw_socket* socket, out_queue_type& msg_can) {auto re = raw_socket::on_msg_handle(msg_can); socket->force_shutdown(); return re;});
#else
virtual bool on_msg_handle(out_msg_type& msg) {auto re = super::on_msg_handle(msg); force_shutdown(); return re;}
register_on_msg_handle([this](raw_socket* socket, out_msg_type& msg) {auto re = raw_socket::on_msg_handle(msg); socket->force_shutdown(); return re;});
#endif
//msg handling end
//register msg handling end
}
};
void dump_io_context_refs(service_pump& sp)
......
......@@ -811,9 +811,11 @@
* Graceful shutdown does not support sync mode anymore.
* Introduce memory fence to synchronize socket's status -- sending and reading, dispatch_strand is not enough, except post_strand,
* but dispatch_strand is more efficient than post_strand in specific circumstances.
* stop_timer will also invalidate cumulative timers.
*
* HIGHLIGHT:
* Make shutdown thread safe.
* Support call back registration, then we can get event notify and customize our socket without inhirt ascs socket and overwrite its virtual functions.
*
* FIX:
* Fix alias for tcp and ssl.
......@@ -824,6 +826,7 @@
* pop_first/all_pending_send_msg and pop_first/all_pending_recv_msg.
* socket::send_msg().
* Fix -- basic_buffer doesn't support fixed arrays (just in the constructor) in GCC and Clang.
* Fix ssl graceful shutdown monitoring.
*
* ENHANCEMENTS:
* heartbeat(ext) optimization.
......@@ -833,6 +836,8 @@
* ascs needs the container's empty() function (used by the queue for message sending and receiving) to be thread safe (here, thread safe does not means
* correctness, but just no memory access violation), almost all implementations of list::empty() in the world are thread safe, but not for list::size().
* if your container's empty() function is not thread safe, please define macro ASCS_CAN_EMPTY_NOT_SAFE, then ascs will make it thread safe for you.
* Maintain the ssl::context in ssl::single_client_base.
* Make function endpoint_to_string static.
*
* DELETION:
*
......
......@@ -30,8 +30,11 @@
namespace ascs { namespace ext { namespace udp {
typedef ascs::udp::reliable_socket_base<ASCS_DEFAULT_PACKER, ASCS_DEFAULT_UDP_UNPACKER> reliable_socket;
template<typename Matrix = i_matrix> using reliable_socket2 = ascs::udp::reliable_socket_base<ASCS_DEFAULT_PACKER, ASCS_DEFAULT_UDP_UNPACKER, Matrix>;
typedef ascs::udp::single_socket_service_base<reliable_socket> single_reliable_socket_service;
typedef ascs::udp::multi_socket_service_base<reliable_socket> multi_reliable_socket_service;
template<typename Socket, typename Matrix = i_matrix> using multi_reliable_socket_service2 = ascs::udp::multi_socket_service_base<Socket, object_pool<Socket>, Matrix>;
typedef multi_reliable_socket_service reliable_socket_service;
}}} //namespace
......
/*
* socket.h
*
* Created on: 2023-1-1
* Author: youngwolf
* email: mail2tao@163.com
* QQ: 676218192
* Community on QQ: 198941541
*
* customize socket by event registration instead of overwrite virtual functions.
*/
#ifndef _ASCS_EXT_SOCKET_H_
#define _ASCS_EXT_SOCKET_H_
#include <functional>
#include "../base.h"
namespace ascs { namespace ext {
#define call_cb_void(fun) virtual void fun() {if (cb_##fun.first) cb_##fun.first(this); if (cb_##fun.second) Socket::fun();}
#define call_cb_1_void(fun, p) {if (cb_##fun.first) cb_##fun.first(this, p); if (cb_##fun.second) Socket::fun(p);}
#define call_cb_2_void(fun, p1, p2) {if (cb_##fun.first) cb_##fun.first(this, p1, p2); if (cb_##fun.second) Socket::fun(p1, p2);}
#define call_cb_combine(fun) virtual bool fun() {auto re = cb_##fun.first ? cb_##fun.first(this) : true; if (re && cb_##fun.second) re = Socket::fun(); return re;}
#define call_cb_1_return(init, fun, p) {auto re = init; if (cb_##fun.first) re = cb_##fun.first(this, p); if (cb_##fun.second) re = Socket::fun(p); return re;}
#define register_cb(fun, init) \
template<typename CallBack> void register_##fun(CallBack&& cb, bool pass_on = init) {cb_##fun.first = std::forward<CallBack>(cb); cb_##fun.second = pass_on;}
template<typename Socket> class g_socket : public Socket //udp socket will use g_socket only
{
public:
typedef Socket raw_socket;
public:
template<typename Arg> g_socket(Arg& arg) : Socket(arg) {first_init();}
template<typename Arg1, typename Arg2> g_socket(Arg1& arg1, Arg2&& arg2) : Socket(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(obsoleted, true)
register_cb(is_ready, true)
register_cb(send_heartbeat, false)
register_cb(reset, true)
register_cb(on_heartbeat_error, true)
register_cb(on_send_error, true)
register_cb(on_recv_error, true)
register_cb(on_close, true)
register_cb(after_close, true)
#ifdef ASCS_SYNC_DISPATCH
register_cb(on_msg, false)
#endif
register_cb(on_msg_handle, false)
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
register_cb(on_msg_send, false)
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
register_cb(on_all_msg_send, false)
#endif
#ifdef ASCS_SHRINK_SEND_BUFFER
register_cb(calc_shrink_size, false)
register_cb(on_msg_discard, false)
#endif
public:
call_cb_combine(obsoleted)
call_cb_combine(is_ready)
call_cb_void(send_heartbeat)
call_cb_void(reset)
protected:
call_cb_combine(on_heartbeat_error)
virtual void on_send_error(const asio::error_code& ec, typename Socket::in_container_type& msg_can) call_cb_2_void(on_send_error, ec, msg_can)
virtual void on_recv_error(const asio::error_code& ec) call_cb_1_void(on_recv_error, ec)
call_cb_void(on_close)
call_cb_void(after_close)
#ifdef ASCS_SYNC_DISPATCH
virtual size_t on_msg(std::list<typename Socket::out_msg_type>& msg_can) call_cb_1_return((size_t) 0, on_msg, msg_can)
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
virtual size_t on_msg_handle(typename Socket::out_queue_type& msg_can) call_cb_1_return((size_t) 0, on_msg_handle, msg_can)
#else
virtual bool on_msg_handle(typename Socket::out_msg_type& msg) call_cb_1_return(false, on_msg_handle, msg)
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
virtual void on_msg_send(typename Socket::in_msg_type& msg) call_cb_1_void(on_msg_send, msg)
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
virtual void on_all_msg_send(typename Socket::in_msg_type& msg) call_cb_1_void(on_all_msg_send, msg)
#endif
#ifdef ASCS_SHRINK_SEND_BUFFER
virtual size_t calc_shrink_size(size_t current_size) call_cb_1_return((size_t) 0, calc_shrink_size, current_size)
virtual void on_msg_discard(typename Socket::in_container_type& msg_can) call_cb_1_void(on_msg_discard, msg_can)
#endif
private:
void first_init()
{
cb_obsoleted.second = true;
cb_is_ready.second = true;
cb_send_heartbeat.second = true;
cb_reset.second = true;
cb_on_heartbeat_error.second = true;
cb_on_send_error.second = true;
cb_on_recv_error.second = true;
cb_on_close.second = true;
cb_after_close.second = true;
#ifdef ASCS_SYNC_DISPATCH
cb_on_msg.second = true;
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
cb_on_msg_handle.second = true;
#else
cb_on_msg_handle.second = true;
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
cb_on_msg_send.second = true;
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
cb_on_all_msg_send.second = true;
#endif
#ifdef ASCS_SHRINK_SEND_BUFFER
cb_calc_shrink_size.second = true;
cb_on_msg_discard.second = true;
#endif
}
private:
std::pair<std::function<bool(Socket*)>, bool> cb_obsoleted;
std::pair<std::function<bool(Socket*)>, bool> cb_is_ready;
std::pair<std::function<void(Socket*)>, bool> cb_send_heartbeat;
std::pair<std::function<void(Socket*)>, bool> cb_reset;
std::pair<std::function<bool(Socket*)>, bool> cb_on_heartbeat_error;
std::pair<std::function<void(Socket*, const asio::error_code&, typename Socket::in_container_type&)>, bool> cb_on_send_error;
std::pair<std::function<void(Socket*, const asio::error_code&)>, bool> cb_on_recv_error;
std::pair<std::function<void(Socket*)>, bool> cb_on_close;
std::pair<std::function<void(Socket*)>, bool> cb_after_close;
#ifdef ASCS_SYNC_DISPATCH
std::pair<std::function<size_t(Socket*, std::list<typename Socket::out_msg_type>&)>, bool> cb_on_msg;
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
std::pair<std::function<size_t(Socket*, typename Socket::out_queue_type&)>, bool> cb_on_msg_handle;
#else
std::pair<std::function<bool(Socket*, typename Socket::out_msg_type&)>, bool> cb_on_msg_handle;
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
std::pair<std::function<void(Socket*, typename Socket::in_msg_type&)>, bool> cb_on_msg_send;
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
std::pair<std::function<void(Socket*, typename Socket::in_msg_type&)>, bool> cb_on_all_msg_send;
#endif
#ifdef ASCS_SHRINK_SEND_BUFFER
std::pair<std::function<size_t(Socket*, size_t)>, bool> cb_calc_shrink_size;
std::pair<std::function<void(Socket*, typename Socket::in_container_type&)>, bool> cb_on_msg_discard;
#endif
};
template<typename Socket> class tcp_socket : public g_socket<Socket>
{
public:
template<typename Arg> tcp_socket(Arg& arg) : g_socket<Socket>(arg) {first_init();}
template<typename Arg1, typename Arg2> tcp_socket(Arg1& arg1, Arg2&& arg2) : g_socket<Socket>(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(on_connect, false)
register_cb(on_unpack_error, true)
register_cb(on_async_shutdown_error, true)
protected:
call_cb_void(on_connect)
call_cb_void(on_unpack_error)
call_cb_void(on_async_shutdown_error)
private:
void first_init()
{
cb_on_connect.second = true;
cb_on_unpack_error.second = true;
cb_on_async_shutdown_error.second = true;
}
private:
std::pair<std::function<void(Socket*)>, bool> cb_on_connect;
std::pair<std::function<void(Socket*)>, bool> cb_on_unpack_error;
std::pair<std::function<void(Socket*)>, bool> cb_on_async_shutdown_error;
};
template<typename Socket> class c_socket : public tcp_socket<Socket> //for client socket
{
public:
template<typename Arg> c_socket(Arg& arg) : tcp_socket<Socket>(arg) {first_init();}
template<typename Arg1, typename Arg2> c_socket(Arg1& arg1, Arg2&& arg2) : tcp_socket<Socket>(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(prepare_reconnect, false)
protected:
virtual int prepare_reconnect(const asio::error_code& ec) call_cb_1_return(0, prepare_reconnect, ec)
private:
void first_init() {cb_prepare_reconnect.second = true;}
private:
std::pair<std::function<int(Socket*, const asio::error_code&)>, bool> cb_prepare_reconnect;
};
template<typename Socket> class s_socket : public tcp_socket<Socket> //for server socket
{
public:
template<typename Arg> s_socket(Arg& arg) : tcp_socket<Socket>(arg) {first_init();}
template<typename Arg1, typename Arg2> s_socket(Arg1& arg1, Arg2&& arg2) : tcp_socket<Socket>(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(take_over, false)
public:
virtual void take_over(std::shared_ptr<Socket> socket_ptr) call_cb_1_void(take_over, socket_ptr)
private:
void first_init() {cb_take_over.second = true;}
private:
std::pair<std::function<void(Socket*, std::shared_ptr<Socket>)>, bool> cb_take_over;
};
}} //namespace
#endif /* _ASCS_EXT_SOCKET_H_ */
......@@ -364,7 +364,7 @@ private:
else if (!ec)
{
assert(false);
unified_out::error_out(ASCS_LLF " read 0 byte without any errors which is unexpected, please check your unpacker!", this->id());
unified_out::error_out(ASCS_LLF " read 0 byte without any errors is unexpected, please check your unpacker!", this->id());
}
if (ec)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册