提交 2adb7180 编写于 作者: Y youngwolf

Event callback support raw functions.

上级 b40c9146
......@@ -51,6 +51,7 @@ using namespace ascs::ext::tcp::proxy;
#define STATISTIC "statistic"
//we only want close reconnecting mechanism on these sockets, so it cannot be done by defining macro ASCS_RECONNECT to false
///*
//method 1
class short_client : public multi_client_base<callbacks::c_socket<socks4::client_socket>>
{
......@@ -79,20 +80,39 @@ private:
unsigned short port;
std::string ip;
};
//*/
//method 2
//class short_client : public multi_client_base<socks4::client_socket, callbacks::object_pool<object_pool<socks4::client_socket>>>
//{
//public:
// we now can not call register_on_connect on socket_ptr, since it's not wrapped by callbacks::c_socket
// socket_ptr->register_on_connect([](socks4::client_socket* socket) {socket->set_reconnect(false);}, true); //close reconnection mechanism
//};
//
//but now, we're able to call register_on_create on client2, since its object pool is wrapped by callbacks::object_pool
//short_client client2(...);
//client2.register_on_create([](object_pool<socks4::client_socket>*, object_pool<socks4::client_socket>::object_ctype& object_ptr) {
// object_ptr->set_reconnect(false); //close reconnection mechanism
//});
/*
class short_client : public multi_client_base<socks4::client_socket, callbacks::object_pool<object_pool<socks4::client_socket>>>
{
public:
short_client(service_pump& service_pump_) : multi_client_base(service_pump_) {set_server_addr(ASCS_SERVER_PORT);}
void set_server_addr(unsigned short _port, const std::string& _ip = ASCS_SERVER_IP) {port = _port; ip = _ip;}
bool send_msg(const std::string& msg) {return send_msg(std::string(msg), port, ip);}
bool send_msg(std::string&& msg) {return send_msg(std::move(msg), port, ip);}
bool send_msg(const std::string& msg, unsigned short port, const std::string& ip) {return send_msg(std::string(msg), port, ip);}
bool send_msg(std::string&& msg, unsigned short port, const std::string& ip)
{
auto socket_ptr = add_socket(port, ip);
if (!socket_ptr)
return false;
//we now can not call register_on_connect on socket_ptr, since socket_ptr is not wrapped by callbacks::c_socket
//register event callback from outside of the socket, it also can be done from inside of the socket, see echo_server for more details
//socket_ptr->register_on_connect([](socks4::client_socket* socket) {socket->set_reconnect(false);}, true); //close reconnection mechanism
//without following setting, socks4::client_socket will be downgraded to normal client_socket
//socket_ptr->set_target_addr(9527, "172.27.0.14"); //target server address, original server address becomes SOCK4 server address
return socket_ptr->send_msg(std::move(msg));
}
private:
unsigned short port;
std::string ip;
};
void on_create(object_pool<socks4::client_socket>* op, object_pool<socks4::client_socket>::object_ctype& socket_ptr) {socket_ptr->set_reconnect(false);}
*/
std::thread create_sync_recv_thread(client_socket& client)
{
......@@ -124,6 +144,16 @@ int main(int argc, const char* argv[])
single_service_pump<socks5::single_client> client;
//singel_service_pump also is a service_pump, this let us to control client2 via client
short_client client2(client); //without single_client, we need to define ASCS_AVOID_AUTO_STOP_SERVICE macro to forbid service_pump stopping services automatically
//method 2
/*
//close reconnection mechanism, 3 approaches
client2.register_on_create([](object_pool<socks4::client_socket>*, object_pool<socks4::client_socket>::object_ctype& socket_ptr) {
socket_ptr->set_reconnect(false);
});
client2.register_on_create(std::bind(on_create, std::placeholders::_1, std::placeholders::_2));
client2.register_on_create(on_create);
*/
//method 2
// argv[2] = "::1" //ipv6
// argv[2] = "127.0.0.1" //ipv4
......
......@@ -29,37 +29,77 @@ namespace ascs { namespace ext { namespace callbacks {
#define call_cb_1_return(super, type, fun, p) {auto re = type(); if (cb_##fun.first) re = cb_##fun.first(this, p); if (cb_##fun.second) re = super::fun(p); return re;}
#define call_cb_2_combine(super, fun, p1, p2) {auto re = cb_##fun.first ? cb_##fun.first(this, p1, p2) : true; if (re && cb_##fun.second) re = super::fun(p1, p2); return re;}
#define register_cb(fun, init) \
template<typename CallBack> void register_##fun(CallBack&& cb, bool pass_on = init) {cb_##fun.first = std::forward<CallBack>(cb); cb_##fun.second = pass_on;}
#define register_cb_1(fun, init) \
template<typename CallBack> void register_##fun(CallBack&& cb, bool pass_on = init) {cb_##fun.first = std::forward<CallBack>(cb); cb_##fun.second = pass_on;} \
void register_##fun(fo_##fun* cb, bool pass_on = init) {cb_##fun.first = std::bind(cb, std::placeholders::_1); cb_##fun.second = pass_on;}
#define register_cb_2(fun, init) \
template<typename CallBack> void register_##fun(CallBack&& cb, bool pass_on = init) {cb_##fun.first = std::forward<CallBack>(cb); cb_##fun.second = pass_on;} \
void register_##fun(fo_##fun* cb, bool pass_on = init) {cb_##fun.first = std::bind(cb, std::placeholders::_1, std::placeholders::_2); cb_##fun.second = pass_on;}
#define register_cb_3(fun, init) \
template<typename CallBack> void register_##fun(CallBack&& cb, bool pass_on = init) {cb_##fun.first = std::forward<CallBack>(cb); cb_##fun.second = pass_on;} \
void register_##fun(fo_##fun* cb, bool pass_on = init) \
{cb_##fun.first = std::bind(cb, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); cb_##fun.second = pass_on;}
template<typename Socket> class g_socket : public Socket //udp socket will use g_socket only
{
public:
typedef bool fo_obsoleted(Socket*);
typedef bool fo_is_ready(Socket*);
typedef void fo_send_heartbeat(Socket*);
typedef void fo_reset(Socket*);
typedef bool fo_on_heartbeat_error(Socket*);
typedef void fo_on_send_error(Socket*, const asio::error_code&, typename Socket::in_container_type&);
typedef void fo_on_recv_error(Socket*, const asio::error_code&);
typedef void fo_on_close(Socket*);
typedef void fo_after_close(Socket*);
#ifdef ASCS_SYNC_DISPATCH
typedef size_t fo_on_msg(Socket*, std::list<typename Socket::out_msg_type>&);
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
typedef size_t fo_on_msg_handle(Socket*, typename Socket::out_queue_type&);
#else
typedef bool fo_on_msg_handle(Socket*, typename Socket::out_msg_type&);
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
typedef void fo_on_msg_send(Socket*, typename Socket::in_msg_type&);
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
typedef void fo_on_all_msg_send(Socket*, typename Socket::in_msg_type&);
#endif
#ifdef ASCS_SHRINK_SEND_BUFFER
typedef size_t fo_calc_shrink_size(Socket*, size_t);
typedef void fo_on_msg_discard(Socket*, typename Socket::in_container_type&);
#endif
public:
template<typename Arg> g_socket(Arg& arg) : Socket(arg) {first_init();}
template<typename Arg1, typename Arg2> g_socket(Arg1& arg1, Arg2&& arg2) : Socket(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(obsoleted, true)
register_cb(is_ready, true)
register_cb(send_heartbeat, false)
register_cb(reset, true)
register_cb(on_heartbeat_error, true)
register_cb(on_send_error, true)
register_cb(on_recv_error, true)
register_cb(on_close, true)
register_cb(after_close, true)
register_cb_1(obsoleted, true)
register_cb_1(is_ready, true)
register_cb_1(send_heartbeat, false)
register_cb_1(reset, true)
register_cb_1(on_heartbeat_error, true)
register_cb_3(on_send_error, true)
register_cb_2(on_recv_error, true)
register_cb_1(on_close, true)
register_cb_1(after_close, true)
#ifdef ASCS_SYNC_DISPATCH
register_cb(on_msg, false)
register_cb_2(on_msg, false)
#endif
register_cb(on_msg_handle, false)
register_cb_2(on_msg_handle, false)
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
register_cb(on_msg_send, false)
register_cb_2(on_msg_send, false)
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
register_cb(on_all_msg_send, false)
register_cb_2(on_all_msg_send, false)
#endif
#ifdef ASCS_SHRINK_SEND_BUFFER
register_cb(calc_shrink_size, false)
register_cb(on_msg_discard, false)
register_cb_2(calc_shrink_size, false)
register_cb_2(on_msg_discard, false)
#endif
public:
......@@ -131,47 +171,47 @@ private:
}
private:
std::pair<std::function<bool(Socket*)>, bool> cb_obsoleted;
std::pair<std::function<bool(Socket*)>, bool> cb_is_ready;
std::pair<std::function<void(Socket*)>, bool> cb_send_heartbeat;
std::pair<std::function<void(Socket*)>, bool> cb_reset;
std::pair<std::function<bool(Socket*)>, bool> cb_on_heartbeat_error;
std::pair<std::function<void(Socket*, const asio::error_code&, typename Socket::in_container_type&)>, bool> cb_on_send_error;
std::pair<std::function<void(Socket*, const asio::error_code&)>, bool> cb_on_recv_error;
std::pair<std::function<void(Socket*)>, bool> cb_on_close;
std::pair<std::function<void(Socket*)>, bool> cb_after_close;
std::pair<std::function<fo_obsoleted>, bool> cb_obsoleted;
std::pair<std::function<fo_is_ready>, bool> cb_is_ready;
std::pair<std::function<fo_send_heartbeat>, bool> cb_send_heartbeat;
std::pair<std::function<fo_reset>, bool> cb_reset;
std::pair<std::function<fo_on_heartbeat_error>, bool> cb_on_heartbeat_error;
std::pair<std::function<fo_on_send_error>, bool> cb_on_send_error;
std::pair<std::function<fo_on_recv_error>, bool> cb_on_recv_error;
std::pair<std::function<fo_on_close>, bool> cb_on_close;
std::pair<std::function<fo_after_close>, bool> cb_after_close;
#ifdef ASCS_SYNC_DISPATCH
std::pair<std::function<size_t(Socket*, std::list<typename Socket::out_msg_type>&)>, bool> cb_on_msg;
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
std::pair<std::function<size_t(Socket*, typename Socket::out_queue_type&)>, bool> cb_on_msg_handle;
#else
std::pair<std::function<bool(Socket*, typename Socket::out_msg_type&)>, bool> cb_on_msg_handle;
std::pair<std::function<fo_on_msg>, bool> cb_on_msg;
#endif
std::pair<std::function<fo_on_msg_handle>, bool> cb_on_msg_handle;
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
std::pair<std::function<void(Socket*, typename Socket::in_msg_type&)>, bool> cb_on_msg_send;
std::pair<std::function<fo_on_msg_send>, bool> cb_on_msg_send;
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
std::pair<std::function<void(Socket*, typename Socket::in_msg_type&)>, bool> cb_on_all_msg_send;
std::pair<std::function<fo_on_all_msg_send>, bool> cb_on_all_msg_send;
#endif
#ifdef ASCS_SHRINK_SEND_BUFFER
std::pair<std::function<size_t(Socket*, size_t)>, bool> cb_calc_shrink_size;
std::pair<std::function<void(Socket*, typename Socket::in_container_type&)>, bool> cb_on_msg_discard;
std::pair<std::function<fo_calc_shrink_size>, bool> cb_calc_shrink_size;
std::pair<std::function<fo_on_msg_discard>, bool> cb_on_msg_discard;
#endif
};
template<typename Socket> class tcp_socket : public g_socket<Socket>
{
public:
typedef void fo_on_connect(Socket*);
typedef void fo_on_unpack_error(Socket*);
typedef void fo_on_async_shutdown_error(Socket*);
public:
template<typename Arg> tcp_socket(Arg& arg) : g_socket<Socket>(arg) {first_init();}
template<typename Arg1, typename Arg2> tcp_socket(Arg1& arg1, Arg2&& arg2) : g_socket<Socket>(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(on_connect, false)
register_cb(on_unpack_error, true)
register_cb(on_async_shutdown_error, true)
register_cb_1(on_connect, false)
register_cb_1(on_unpack_error, true)
register_cb_1(on_async_shutdown_error, true)
private:
call_cb_void(Socket, on_connect)
......@@ -186,18 +226,21 @@ private:
}
private:
std::pair<std::function<void(Socket*)>, bool> cb_on_connect;
std::pair<std::function<void(Socket*)>, bool> cb_on_unpack_error;
std::pair<std::function<void(Socket*)>, bool> cb_on_async_shutdown_error;
std::pair<std::function<fo_on_connect>, bool> cb_on_connect;
std::pair<std::function<fo_on_unpack_error>, bool> cb_on_unpack_error;
std::pair<std::function<fo_on_async_shutdown_error>, bool> cb_on_async_shutdown_error;
};
template<typename Socket> class c_socket : public tcp_socket<Socket> //for client socket
{
public:
typedef int fo_prepare_reconnect(Socket*, const asio::error_code&);
public:
template<typename Arg> c_socket(Arg& arg) : tcp_socket<Socket>(arg) {first_init();}
template<typename Arg1, typename Arg2> c_socket(Arg1& arg1, Arg2&& arg2) : tcp_socket<Socket>(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(prepare_reconnect, false)
register_cb_2(prepare_reconnect, false)
private:
virtual int prepare_reconnect(const asio::error_code& ec) call_cb_1_return(Socket, int, prepare_reconnect, ec)
......@@ -205,16 +248,19 @@ private:
void first_init() {cb_prepare_reconnect.second = true;}
private:
std::pair<std::function<int(Socket*, const asio::error_code&)>, bool> cb_prepare_reconnect;
std::pair<std::function<fo_prepare_reconnect>, bool> cb_prepare_reconnect;
};
template<typename Socket> class s_socket : public tcp_socket<Socket> //for server socket
{
public:
typedef void fo_take_over(Socket*, std::shared_ptr<typename Socket::type_of_object_restore>);
public:
template<typename Arg> s_socket(Arg& arg) : tcp_socket<Socket>(arg) {first_init();}
template<typename Arg1, typename Arg2> s_socket(Arg1& arg1, Arg2&& arg2) : tcp_socket<Socket>(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(take_over, false)
register_cb_2(take_over, false)
public:
virtual void take_over(std::shared_ptr<typename Socket::type_of_object_restore> socket_ptr) call_cb_1_void(Socket, take_over, socket_ptr)
......@@ -223,19 +269,25 @@ private:
void first_init() {cb_take_over.second = true;}
private:
std::pair<std::function<void(Socket*, std::shared_ptr<typename Socket::type_of_object_restore>)>, bool> cb_take_over;
std::pair<std::function<fo_take_over>, bool> cb_take_over;
};
template<typename Server> class server : public Server //for server
{
public:
typedef int fo_async_accept_num(Server*);
typedef void fo_start_next_accept(Server*);
typedef bool fo_on_accept(Server*, typename Server::object_ctype&);
typedef bool fo_on_accept_error(Server*, const asio::error_code&, typename Server::object_ctype&);
public:
template<typename Arg> server(Arg& arg) : Server(arg) {first_init();}
template<typename Arg1, typename Arg2> server(Arg1& arg1, Arg2&& arg2) : Server(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(async_accept_num, false)
register_cb(start_next_accept, true)
register_cb(on_accept, true)
register_cb(on_accept_error, true)
register_cb_1(async_accept_num, false)
register_cb_1(start_next_accept, true)
register_cb_2(on_accept, true)
register_cb_3(on_accept_error, true)
private:
call_cb_return(Server, int, async_accept_num)
......@@ -252,18 +304,21 @@ private:
}
private:
std::pair<std::function<int(Server*)>, bool> cb_async_accept_num;
std::pair<std::function<void(Server*)>, bool> cb_start_next_accept;
std::pair<std::function<bool(Server*, typename Server::object_ctype&)>, bool> cb_on_accept;
std::pair<std::function<bool(Server*, const asio::error_code&, typename Server::object_ctype&)>, bool> cb_on_accept_error;
std::pair<std::function<fo_async_accept_num>, bool> cb_async_accept_num;
std::pair<std::function<fo_start_next_accept>, bool> cb_start_next_accept;
std::pair<std::function<fo_on_accept>, bool> cb_on_accept;
std::pair<std::function<fo_on_accept_error>, bool> cb_on_accept_error;
};
template<typename ObjectPool> class object_pool : public ObjectPool
{
public:
typedef void fo_on_create(ObjectPool*, typename ObjectPool::object_ctype&);
public:
template<typename Arg> object_pool(Arg& arg) : ObjectPool(arg) {first_init();}
register_cb(on_create, false)
register_cb_2(on_create, false)
private:
virtual void on_create(typename ObjectPool::object_ctype& object_ptr) call_cb_1_void(ObjectPool, on_create, object_ptr)
......@@ -271,7 +326,7 @@ private:
void first_init() {cb_on_create.second = true;}
private:
std::pair<std::function<void(ObjectPool*, typename ObjectPool::object_ctype&)>, bool> cb_on_create;
std::pair<std::function<fo_on_create>, bool> cb_on_create;
};
}}} //namespace
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册