Commit f79297d8 authored by youngwolf, committed by youngwolf

Support one io_context per thread.

Parent d777b489
Subproject commit 57577c6db46a4e2de5351af2b185bf52696699a9
Subproject commit d038fb3c2fb56fb91ff1d17b0715cff7887aa09e
......@@ -92,11 +92,7 @@ protected:
//msg handling end
private:
void handle_msg(out_msg_type& msg)
{
if (check_delay(true))
direct_send_msg(std::move(msg), true);
}
void handle_msg(out_msg_type& msg) {if (check_delay(true)) direct_send_msg(std::move(msg), true);}
private:
float max_delay;
......
......@@ -5,6 +5,7 @@
#define ASCS_SERVER_PORT 9527
#define ASCS_REUSE_OBJECT //use objects pool
#define ASCS_MAX_OBJECT_NUM 1024
#define ASCS_AVOID_AUTO_STOP_SERVICE
//configuration
#include <ascs/ext/tcp.h>
......@@ -111,6 +112,12 @@ int main(int argc, const char* argv[])
"type " QUIT_COMMAND " to end.");
service_pump sp;
#ifndef ASCS_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service threads at runtime, you cannot use multiple io_context; if somebody really needs both, please let me know.
//with multiple io_context, the number of service threads must be greater than or equal to the number of io_context.
//with multiple io_context, please also define the macro ASCS_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(8);
#endif
server_base<echo_socket, timed_object_pool<object_pool<echo_socket>>> echo_server(sp);
server_base<echo_stream_socket, timed_object_pool<object_pool<echo_stream_socket>>> echo_stream_server(sp);
single_udp_service udp_service(sp);
......
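Editor's note: this hunk (and the similar hunks in the other demos below) all apply the same pattern. A minimal, consolidated sketch, assuming the stock ascs::ext::tcp::server typedef and that service_pump::run_service(thread_num) blocks until the service is stopped; names not appearing in this diff are assumptions:

#define ASCS_SERVER_PORT 9527
#define ASCS_AVOID_AUTO_STOP_SERVICE //required whenever more than one io_context is used
//configuration
#include <ascs/ext/tcp.h>
using namespace ascs;
using namespace ascs::ext::tcp;

int main()
{
    service_pump sp;
    sp.set_io_context_num(4); //must be called before any service is added to sp
    server echo_server(sp); //the server (and every socket it creates) is bound to the least-referenced io_context
    //the number of service threads must be greater than or equal to the number of io_context (4 here)
    sp.run_service(4);
    return 0;
}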
......@@ -411,6 +411,12 @@ int main(int argc, const char* argv[])
///////////////////////////////////////////////////////////
service_pump sp;
#ifndef ASCS_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service threads at runtime, you cannot use multiple io_context; if somebody really needs both, please let me know.
//with multiple io_context, the number of service threads must be greater than or equal to the number of io_context.
//with multiple io_context, please also define the macro ASCS_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(4);
#endif
echo_client client(sp);
//the echo client is meant to cooperate with the echo server during performance tests; it will not send msgs back as the echo server does,
//otherwise a dead loop would occur and network resources would be exhausted.
......@@ -468,8 +474,10 @@ int main(int argc, const char* argv[])
client.list_all_object();
else if (INCREASE_THREAD == str)
sp.add_service_thread(1);
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
else if (DECREASE_THREAD == str)
sp.del_service_thread(1);
#endif
else if (is_testing)
puts("testing has not finished yet!");
else if (QUIT_COMMAND == str)
......
......@@ -228,6 +228,12 @@ int main(int argc, const char* argv[])
puts("type " QUIT_COMMAND " to end.");
service_pump sp;
#ifndef ASCS_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service threads at runtime, you cannot use multiple io_context; if somebody really needs both, please let me know.
//with multiple io_context, the number of service threads must be greater than or equal to the number of io_context.
//with multiple io_context, please also define the macro ASCS_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(4);
#endif
echo_server echo_server_(sp); //echo server
//demonstrate how to use single_service
......@@ -298,8 +304,10 @@ int main(int argc, const char* argv[])
}
else if (INCREASE_THREAD == str)
sp.add_service_thread(1);
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
else if (DECREASE_THREAD == str)
sp.del_service_thread(1);
#endif
else
{
// /*
......
......@@ -32,6 +32,12 @@ int main(int argc, const char* argv[])
puts("type " QUIT_COMMAND " to end.");
service_pump sp;
#ifndef ASCS_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service threads at runtime, you cannot use multiple io_context; if somebody really needs both, please let me know.
//with multiple io_context, the number of service threads must be greater than or equal to the number of io_context.
//with multiple io_context, please also define the macro ASCS_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(4);
#endif
file_client client(sp);
if (argc > 3)
......
......@@ -5,9 +5,9 @@
#define ASCS_DEFAULT_PACKER packer2<>
//#define ASCS_RECV_BUFFER_TYPE std::vector<asio::mutable_buffer> //scatter-gather buffer, it's very useful under certain situations (for example, ring buffer).
//#define ASCS_SCATTERED_RECV_BUFFER //used by unpackers, not belongs to ascs
//note, these two macro are not requisite, i'm just showing how to use them.
//note, these two macros are not required, I'm just showing how to use them.
//all other definitions are in the makefile, because we have two cpp files, defining them in more than one place is riskful (
//all other definitions are in the makefile, because we have two cpp files, defining them in more than one place is risky (
// we may define them to different values between the two cpp files)
//configuration
......@@ -44,6 +44,12 @@ int main(int argc, const char* argv[])
}
service_pump sp;
#ifndef ASCS_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service threads at runtime, you cannot use multiple io_context; if somebody really needs both, please let me know.
//with multiple io_context, the number of service threads must be greater than or equal to the number of io_context.
//with multiple io_context, please also define the macro ASCS_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(8);
#endif
server_base<file_socket> file_server_(sp);
if (argc > 2 + index)
......
......@@ -3,9 +3,9 @@
#define ASCS_DEFAULT_PACKER packer2<>
//#define ASCS_RECV_BUFFER_TYPE std::vector<asio::mutable_buffer> //scatter-gather buffer, it's very useful under certain situations (for example, ring buffer).
//#define ASCS_SCATTERED_RECV_BUFFER //used by unpackers, not belongs to ascs
//note, these two macro are not requisite, i'm just showing how to use them.
//note, these two macros are not required, I'm just showing how to use them.
//all other definitions are in the makefile, because we have two cpp files, defining them in more than one place is riskful (
//all other definitions are in the makefile, because we have two cpp files, defining them in more than one place is risky (
// we may define them to different values between the two cpp files)
//configuration
......
......@@ -756,8 +756,11 @@
* 2021.x.x version 1.6.0
*
* SPECIAL ATTENTION (incompatible with old editions):
* client_socket's functions open_reconnect and close_reconnect have been replaced by set_reconnect(bool).
*
* HIGHLIGHT:
* service_pump supports multiple io_context; the number of service threads just needs to be greater than or equal to the number of io_context.
* Support reliable UDP (based on KCP -- https://github.com/skywind3000/kcp.git).
* Support connected UDP sockets: set the macro ASCS_UDP_CONNECT_MODE to true to enable it (you must also provide the peer's IP address via set_peer_addr);
* the set_connect_mode function can enable it too (before start_service). For a connected UDP socket, the peer_addr parameter in the send_msg series
* will be ignored, please note.
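Editor's note: the SPECIAL ATTENTION entry above implies a one-line change for code migrating from 1.5.x. A hedged sketch, where socket_ptr is assumed to be a pointer to an ascs client socket:

//before (1.5.x): socket_ptr->open_reconnect();  socket_ptr->close_reconnect();
//after (1.6.0):
socket_ptr->set_reconnect(true);  //keep reconnecting after the link breaks (was open_reconnect)
socket_ptr->set_reconnect(false); //stop reconnecting; after_close() then returns the io_context to the service_pump (was close_reconnect)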
......@@ -772,8 +775,10 @@
* Delete macro ASCS_REUSE_SSL_STREAM, now ascs' ssl sockets can be reused just as normal socket.
*
* REFACTORING:
* Re-implement the reusability (object reuse and reconnecting) of ascs' ssl sockets.
*
* REPLACEMENTS:
* client_socket's functions open_reconnect and close_reconnect have been replaced by set_reconnect(bool).
*
*/
......@@ -784,8 +789,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10502 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.5.2"
#define ASCS_VER 10600 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.6.0"
//asio and compiler check
#ifdef _MSC_VER
......@@ -820,13 +825,17 @@
#define ASCS_LLF "%lld" //format used to print 'uint_fast64_t'
#endif
static_assert(ASIO_VERSION >= 101001, "ascs needs asio 1.10.1 or higher.");
#if ASIO_VERSION < 101100
namespace asio {typedef io_service io_context;}
#define make_strand_handler(S, F) S.wrap(F)
namespace asio {typedef io_service io_context; typedef io_context execution_context;}
#define make_strand_handler(S, F) S.wrap(F)
#elif ASIO_VERSION == 101100
namespace asio {typedef io_service io_context;}
#define make_strand_handler(S, F) asio::wrap(S, F)
#elif ASIO_VERSION < 101700
namespace asio {typedef executor any_io_executor;}
#define make_strand_handler(S, F) asio::bind_executor(S, F)
#else
#define make_strand_handler(S, F) asio::bind_executor(S, F)
#define make_strand_handler(S, F) asio::bind_executor(S, F)
#endif
//asio and compiler check
......@@ -1039,7 +1048,7 @@ static_assert(ASCS_ASYNC_ACCEPT_NUM > 0, "async accept number must be bigger tha
//buffer type used when receiving messages (unpacker's prepare_next_recv() need to return this type)
#ifndef ASCS_RECV_BUFFER_TYPE
#if ASIO_VERSION >= 101100
#if ASIO_VERSION > 101100
#define ASCS_RECV_BUFFER_TYPE asio::mutable_buffer
#else
#define ASCS_RECV_BUFFER_TYPE asio::mutable_buffers_1
......
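Editor's note: a hedged usage sketch of the make_strand_handler macro defined above. The snippet assumes asio 1.11 or newer so that asio::post is available (on asio 1.10 the wrapped handler would be posted through io_service::post instead); the include order is an assumption:

#include <asio.hpp>
#include <ascs/config.h> //defines make_strand_handler according to the detected ASIO_VERSION
#include <cstdio>

int main()
{
    asio::io_context io_ctx;
    asio::io_context::strand strand(io_ctx);
    //the macro expands to strand.wrap(f) on asio 1.10, asio::wrap(strand, f) on 1.11.0 and
    //asio::bind_executor(strand, f) on newer versions; in every case the handler runs through the strand
    asio::post(io_ctx, make_strand_handler(strand, []() { std::puts("runs inside the strand"); }));
    io_ctx.run();
    return 0;
}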
......@@ -157,7 +157,7 @@ public:
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return typename super::buffer_type(1, asio::buffer(raw_buff) + remain_len);}
#elif ASIO_VERSION < 101100
#elif ASIO_VERSION <= 101100
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(asio::buffer(raw_buff) + remain_len);}
#else
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(raw_buff) + remain_len;}
......@@ -319,7 +319,7 @@ public:
return typename super::buffer_type(1, asio::buffer(const_cast<char*>(big_msg.data()), big_msg.size()) + remain_len);
}
#elif ASIO_VERSION < 101100
#elif ASIO_VERSION <= 101100
virtual typename super::buffer_type prepare_next_recv()
{
assert(remain_len < (big_msg.empty() ? raw_buff.size() : big_msg.size()));
......@@ -706,7 +706,7 @@ public:
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
#ifdef ASCS_SCATTERED_RECV_BUFFER
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return buffer_type(1, asio::buffer(raw_buff) + remain_len);}
#elif ASIO_VERSION < 101100
#elif ASIO_VERSION <= 101100
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(asio::buffer(raw_buff) + remain_len);}
#else
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(raw_buff) + remain_len;}
......
......@@ -18,7 +18,7 @@
namespace ascs
{
class service_pump : public asio::io_context
class service_pump
{
public:
class i_service
......@@ -55,29 +55,121 @@ public:
void* data; //magic data, you can use it in any way
};
protected:
struct context
{
asio::io_context io_context;
uint_fast64_t refs;
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION > 101100
asio::executor_work_guard<typename asio::io_context::executor_type> work;
#else
std::shared_ptr<asio::io_service::work> work;
#endif
#endif
std::list<std::thread> threads;
#if ASIO_VERSION >= 101200
context(int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE) : io_context(concurrency_hint), refs(0)
#else
context() : refs(0)
#endif
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION > 101100
, work(io_context.get_executor())
#else
, work(std::make_shared<asio::io_service::work>(io_context))
#endif
#endif
{}
};
public:
typedef i_service* object_type;
typedef const object_type object_ctype;
typedef std::list<object_type> container_type;
#if ASIO_VERSION >= 101200
service_pump(int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE) : asio::io_context(concurrency_hint), started(false)
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
service_pump(int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE) : started(false), real_thread_num(0), del_thread_num(0), single_io_context(true)
{context_can.emplace_back(concurrency_hint);}
#else
service_pump() : started(false)
service_pump(int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE) : started(false), single_io_context(true) {context_can.emplace_back(concurrency_hint);}
bool set_io_context_num(int io_context_num, int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE) //call this before adding any services to this service_pump
{
if (io_context_num < 1 || is_service_started() || context_can.size() > 1) //can only be called once
return false;
for (auto i = 1; i < io_context_num; ++i)
context_can.emplace_back(concurrency_hint);
single_io_context = context_can.size() < 2;
return true;
}
#endif
#else
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
, real_thread_num(0), del_thread_num(0)
#endif
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION >= 101100
, work(get_executor())
service_pump() : started(false), real_thread_num(0), del_thread_num(0), single_io_context(true), context_can(1) {}
#else
, work(std::make_shared<asio::io_service::work>(*this))
service_pump() : started(false), single_io_context(true), context_can(1) {}
bool set_io_context_num(int io_context_num) //call this before adding any services to this service_pump
{
if (io_context_num < 1 || is_service_started() || context_can.size() > 1) //can only be called once
return false;
context_can.resize(io_context_num);
single_io_context = context_can.size() < 2;
return true;
}
#endif
#endif
{}
int get_io_context_num() const {return (int) context_can.size();}
virtual ~service_pump() {stop_service();}
operator asio::io_context& () {return assign_io_context();}
#if ASIO_VERSION > 101100
asio::io_context::executor_type get_executor() {return assign_io_context().get_executor();}
#endif
asio::io_context& assign_io_context() //pick the context which has the least references
{
if (single_io_context)
return context_can.front().io_context;
context* ctx = nullptr;
uint_fast64_t refs = 0;
std::lock_guard<std::mutex> lock(context_can_mutex);
ascs::do_something_to_one(context_can, [&](context& item) {
if (0 == item.refs || 0 == refs || refs > item.refs)
{
refs = item.refs;
ctx = &item;
}
return 0 == item.refs;
});
if (nullptr != ctx)
{
++ctx->refs;
return ctx->io_context;
}
throw "no available io_context!";
}
void return_io_context(const asio::execution_context& io_context)
{
if (!single_io_context)
ascs::do_something_to_one(context_can, context_can_mutex, [&](context& item) {return &io_context != &item.io_context ? false : (--item.refs, true);});
}
void assign_io_context(const asio::execution_context& io_context)
{
if (!single_io_context)
ascs::do_something_to_one(context_can, context_can_mutex, [&](context& item) {return &io_context != &item.io_context ? false : (++item.refs, true);});
}
object_type find(int id)
{
std::lock_guard<std::mutex> lock(service_can_mutex);
......@@ -154,8 +246,7 @@ public:
{
if (!is_service_started())
{
do_service(thread_num - 1);
run();
do_service(thread_num, true);
wait_service();
}
}
......@@ -168,40 +259,64 @@ public:
if (is_service_started())
{
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
work.reset();
ascs::do_something_to_all(context_can, [](context& item) {item.work.reset();});
#endif
do_something_to_all([](object_type& item) {item->stop_service();});
}
}
bool is_running() const {return !stopped();}
bool is_running() const
{
auto running = false;
ascs::do_something_to_one(context_can, [&](const context& item) {return (running = !item.io_context.stopped());});
return running;
}
bool is_service_started() const {return started;}
void add_service_thread(int thread_num) {for (auto i = 0; i < thread_num; ++i) service_threads.emplace_back([this]() {this->run();});}
//not thread safe
void add_service_thread(int thread_num, bool block = false)
{
for (auto i = 0; i < thread_num; ++i)
{
auto ctx = assign_thread();
if (nullptr == ctx)
unified_out::error_out("no available io_context!");
else if (block && i + 1 == thread_num)
run(ctx); //block at here
else
ctx->threads.emplace_back([this, ctx]() {this->run(ctx);});
}
}
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
void del_service_thread(int thread_num) {if (thread_num > 0) {del_thread_num += thread_num;}}
void del_service_thread(int thread_num) {if (thread_num > 0) del_thread_num += thread_num;}
int service_thread_num() const {return real_thread_num;}
#endif
protected:
void do_service(int thread_num)
void do_service(int thread_num, bool block = false)
{
if (thread_num <= 0 || (size_t) thread_num < context_can.size())
{
unified_out::error_out("thread_num must be bigger than zero and bigger than or equal to io_context_num.");
return;
}
started = true;
unified_out::info_out("service pump started.");
#if ASIO_VERSION >= 101100
restart(); //this is needed when restart service
ascs::do_something_to_all(context_can, [](context& item) {item.io_context.restart();}); //this is needed when restart service
#else
reset(); //this is needed when restart service
ascs::do_something_to_all(context_can, [](context& item) {item.io_context.reset();}); //this is needed when restart service
#endif
do_something_to_all([](object_type& item) {item->start_service();});
add_service_thread(thread_num);
add_service_thread(thread_num, block);
}
void wait_service()
{
ascs::do_something_to_all(service_threads, [](std::thread& t) {t.join();});
service_threads.clear();
ascs::do_something_to_all(context_can, [](context& item) {ascs::do_something_to_all(item.threads, [](std::thread& t) {t.join();});});
started = false;
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
......@@ -228,7 +343,7 @@ protected:
#endif
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
size_t run()
size_t run(context* ctx)
{
size_t n = 0;
......@@ -254,9 +369,9 @@ protected:
//we cannot always decrease service thread timely (because run_one can block).
size_t this_n = 0;
#ifdef ASCS_NO_TRY_CATCH
this_n = asio::io_context::run_one();
this_n = ctx->io_context.run_one();
#else
try {this_n = asio::io_context::run_one();} catch (const std::exception& e) {if (!on_exception(e)) break;}
try {this_n = ctx->io_context.run_one();} catch (const std::exception& e) {if (!on_exception(e)) break;}
#endif
if (this_n > 0)
n += this_n; //n can overflow, please note.
......@@ -273,13 +388,34 @@ protected:
return n;
}
#elif !defined(ASCS_NO_TRY_CATCH)
size_t run() {while (true) {try {return asio::io_context::run();} catch (const std::exception& e) {if (!on_exception(e)) return 0;}}}
size_t run(context* ctx) {while (true) {try {return ctx->io_context.run();} catch (const std::exception& e) {if (!on_exception(e)) return 0;}}}
#else
size_t run(context* ctx) {return ctx->io_context.run();}
#endif
DO_SOMETHING_TO_ALL_MUTEX(service_can, service_can_mutex, std::lock_guard<std::mutex>)
DO_SOMETHING_TO_ONE_MUTEX(service_can, service_can_mutex, std::lock_guard<std::mutex>)
private:
context* assign_thread() //pick the context which has the least threads
{
context* ctx = nullptr;
size_t num = 0;
ascs::do_something_to_one(context_can, [&](context& item) {
auto this_num = item.threads.size();
if (0 == this_num || 0 == num || num > this_num)
{
num = this_num;
ctx = &item;
}
return 0 == this_num;
});
return ctx;
}
void add(object_type i_service_)
{
assert(nullptr != i_service_);
......@@ -296,20 +432,15 @@ private:
bool started;
container_type service_can;
std::mutex service_can_mutex;
std::list<std::thread> service_threads;
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
std::atomic_int_fast32_t real_thread_num;
std::atomic_int_fast32_t del_thread_num;
#endif
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION >= 101100
asio::executor_work_guard<executor_type> work;
#else
std::shared_ptr<asio::io_service::work> work;
#endif
#endif
bool single_io_context;
std::list<context> context_can;
std::mutex context_can_mutex;
};
} //namespace
......
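Editor's note: a hedged sketch of the assignment contract introduced above, using only members shown in this hunk (set_io_context_num, assign_io_context, return_io_context). The include and the ASCS_SERVER_PORT definition mirror the demos and are assumptions; in real code the sockets changed later in this commit do this bookkeeping for you in reset() and after_close():

#define ASCS_SERVER_PORT 9527
#define ASCS_AVOID_AUTO_STOP_SERVICE
#include <ascs/ext/tcp.h>
using namespace ascs;

void demo_io_context_assignment()
{
    service_pump sp;
    sp.set_io_context_num(2); //before any service is added
    asio::io_context& ctx = sp.assign_io_context(); //picks the least-referenced context and bumps its reference count
    asio::steady_timer timer(ctx); //anything constructed from ctx runs on the threads serving that context
    sp.return_io_context(ctx); //drop the reference so future assignments stay balanced across contexts
}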
......@@ -64,9 +64,27 @@ protected:
}
//guarantee that no operations (including asynchronous operations) are performed on this socket while calling the following functions.
void reset_next_layer(asio::io_context& io_context_) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(io_context_);}
#if ASIO_VERSION <= 101100
void reset_next_layer() {reset_next_layer(next_layer_.get_io_service());}
template<typename Arg> void reset_next_layer(Arg&& arg) {reset_next_layer(next_layer_.get_io_service(), std::forward<Arg>(arg));}
#elif ASIO_VERSION < 101300
void reset_next_layer() {reset_next_layer(static_cast<asio::io_context&>(next_layer_.get_executor().context()));}
template<typename Arg>
void reset_next_layer(asio::io_context& io_context_, Arg&& arg) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(io_context_, std::forward<Arg>(arg));}
void reset_next_layer(Arg&& arg) {reset_next_layer(static_cast<asio::io_context&>(next_layer_.get_executor().context()), std::forward<Arg>(arg));}
#else
void reset_next_layer() {reset_next_layer((const asio::any_io_executor&) next_layer_.get_executor());}
template<typename Arg> void reset_next_layer(Arg&& arg) {reset_next_layer(next_layer_.get_executor(), std::forward<Arg>(arg));}
#endif
#if ASIO_VERSION < 101300
void reset_next_layer(asio::io_context& io_context) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(io_context);}
template<typename Arg>
void reset_next_layer(asio::io_context& io_context, Arg&& arg) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(io_context, std::forward<Arg>(arg));}
#else
void reset_next_layer(const asio::any_io_executor& executor) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(executor);}
template<typename Arg>
void reset_next_layer(const asio::any_io_executor& executor, Arg&& arg) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(executor, std::forward<Arg>(arg));}
#endif
void reset()
{
......@@ -319,7 +337,7 @@ protected:
// in this socket except this socket itself, because this socket maybe is being maintained by object_pool.
//otherwise (bigger than zero), socket simply call this callback ASCS_DELAY_CLOSE seconds later after link down, no any guarantees.
virtual void on_close() {unified_out::info_out(ASCS_LLF " on_close()", id());}
virtual void after_close() {} //a good case for using this is to reconnect the server, please refer to client_socket_base.
virtual void after_close() = 0; //typical uses are reconnecting to the server (please refer to client_socket_base) and returning the io_context to the service_pump.
#ifdef ASCS_SYNC_DISPATCH
//return positive value if handled some messages (include all messages), if some msg left behind, socket will re-dispatch them asynchronously
......
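Editor's note: after_close() is now pure virtual in the base socket class (see the hunk above), so classes deriving from ascs::socket directly must implement it, while subclasses of client_socket_base / server_socket_base should delegate to the base so that reconnect handling and the return_io_context() call keep working. A hedged sketch, assuming the stock ascs::ext::tcp::client_socket typedef:

#define ASCS_SERVER_PORT 9527
#include <ascs/ext/tcp.h>

class my_socket : public ascs::ext::tcp::client_socket
{
public:
    using ascs::ext::tcp::client_socket::client_socket; //inherit the constructors

protected:
    virtual void after_close()
    {
        //custom cleanup goes here
        ascs::ext::tcp::client_socket::after_close(); //preserves reconnecting and io_context bookkeeping
    }
};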
......@@ -65,7 +65,17 @@ public:
virtual const char* type_name() const {return "TCP (client endpoint)";}
virtual int type_id() const {return 1;}
virtual void reset() {need_reconnect = ASCS_RECONNECT; super::reset();}
virtual void reset()
{
need_reconnect = ASCS_RECONNECT;
if (nullptr != matrix)
#if ASIO_VERSION < 101100
matrix->get_service_pump().assign_io_context(this->next_layer().get_io_service());
#else
matrix->get_service_pump().assign_io_context(this->next_layer().get_executor().context());
#endif
super::reset();
}
bool set_server_addr(unsigned short port, const std::string& ip = ASCS_SERVER_IP) {return set_addr(server_addr, port, ip);}
bool set_server_addr(const std::string& file_name) {server_addr = typename Family::endpoint(file_name); return true;}
......@@ -162,7 +172,17 @@ protected:
//reconnect here rather than in on_recv_error to make sure no async invocations are performed on this socket before reconnecting.
//if you don't want to reconnect to the server after the link breaks, override this virtual function and do nothing in it, or call close_reconnect().
//if you want to control the retry times and the delay after a failed reconnect, override the prepare_reconnect virtual function.
virtual void after_close() {if (need_reconnect) this->start();}
virtual void after_close()
{
if (need_reconnect)
this->start();
else if (nullptr != matrix)
#if ASIO_VERSION < 101100
matrix->get_service_pump().return_io_context(this->next_layer().get_io_service());
#else
matrix->get_service_pump().return_io_context(this->next_layer().get_executor().context());
#endif
}
virtual bool bind() {return true;}
......
......@@ -43,7 +43,11 @@ public:
buff[0] = 4;
buff[1] = 1;
*((unsigned short*) std::next(buff, 2)) = htons(target_addr.port());
#if ASIO_VERSION == 101100
memcpy(std::next(buff, 4), asio::ip::address_cast<asio::ip::address_v4>(target_addr.address()).to_bytes().data(), 4);
#else
memcpy(std::next(buff, 4), target_addr.address().to_v4().to_bytes().data(), 4);
#endif
memcpy(std::next(buff, 8), "ascs", sizeof("ascs"));
req_len = 8 + sizeof("ascs");
......@@ -221,14 +225,22 @@ private:
else if (target_addr.address().is_v4())
{
buff[3] = 1;
#if ASIO_VERSION == 101100
memcpy(std::next(buff, 4), asio::ip::address_cast<asio::ip::address_v4>(target_addr.address()).to_bytes().data(), 4);
#else
memcpy(std::next(buff, 4), target_addr.address().to_v4().to_bytes().data(), 4);
#endif
*((unsigned short*) std::next(buff, 8)) = htons(target_addr.port());
req_len = 10;
}
else //ipv6
{
buff[3] = 4;
#if ASIO_VERSION == 101100
memcpy(std::next(buff, 4), asio::ip::address_cast<asio::ip::address_v6>(target_addr.address()).to_bytes().data(), 16);
#else
memcpy(std::next(buff, 4), target_addr.address().to_v6().to_bytes().data(), 16);
#endif
*((unsigned short*) std::next(buff, 20)) = htons(target_addr.port());
req_len = 22;
}
......@@ -347,7 +359,7 @@ private:
this->force_shutdown(false);
}
else
this->next_layer().async_read_some(asio::buffer(buff, sizeof(buff)) + res_len,
this->next_layer().async_read_some(asio::buffer(std::next(buff, res_len), sizeof(buff) - res_len),
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);}));
}
}
......
......@@ -82,7 +82,7 @@ public:
else
unified_out::info_out("finished pre-creating server sockets.");
#if ASIO_VERSION >= 101100
#if ASIO_VERSION > 101100
acceptor.listen(asio::socket_base::max_listen_connections, ec); assert(!ec);
#else
acceptor.listen(asio::socket_base::max_connections, ec); assert(!ec);
......
......@@ -33,6 +33,15 @@ public:
virtual const char* type_name() const {return "TCP (server endpoint)";}
virtual int type_id() const {return 2;}
virtual void reset()
{
#if ASIO_VERSION < 101100
server.get_service_pump().assign_io_context(this->lowest_layer().get_io_service());
#else
server.get_service_pump().assign_io_context(this->lowest_layer().get_executor().context());
#endif
super::reset();
}
virtual void take_over(std::shared_ptr<generic_server_socket> socket_ptr) {} //restore this socket from socket_ptr
void disconnect() {force_shutdown();}
......@@ -78,6 +87,11 @@ protected:
virtual void on_async_shutdown_error() {force_shutdown();}
virtual bool on_heartbeat_error() {this->show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;}
#if ASIO_VERSION < 101100
virtual void after_close() {server.get_service_pump().return_io_context(this->lowest_layer().get_io_service());}
#else
virtual void after_close() {server.get_service_pump().return_io_context(this->lowest_layer().get_executor().context());}
#endif
private:
Server& server;
......
......@@ -79,8 +79,8 @@ private:
typedef socket<tcp::client_socket_base<Packer, Unpacker, Matrix, asio::ssl::stream<asio::ip::tcp::socket>, InQueue, InContainer, OutQueue, OutContainer>> super;
public:
client_socket_base(asio::io_context& io_context_, asio::ssl::context& ctx_) : super(io_context_, ctx_), io_context(io_context_), ctx(ctx_) {}
client_socket_base(Matrix& matrix_, asio::ssl::context& ctx_) : super(matrix_, ctx_), io_context(matrix_.get_service_pump()), ctx(ctx_) {}
client_socket_base(asio::io_context& io_context_, asio::ssl::context& ctx_) : super(io_context_, ctx_), ctx(ctx_) {}
client_socket_base(Matrix& matrix_, asio::ssl::context& ctx_) : super(matrix_, ctx_), ctx(ctx_) {}
virtual const char* type_name() const {return "SSL (client endpoint)";}
virtual int type_id() const {return 3;}
......@@ -103,7 +103,7 @@ protected:
virtual void after_close()
{
if (this->is_reconnect())
this->reset_next_layer(io_context, ctx);
this->reset_next_layer(ctx);
super::after_close();
}
......@@ -133,7 +133,6 @@ private:
using super::shutdown_ssl;
private:
asio::io_context& io_context;
asio::ssl::context& ctx;
};
......@@ -170,7 +169,7 @@ public:
virtual void reset()
{
this->reset_next_layer(this->get_server().get_service_pump(), ctx);
this->reset_next_layer(ctx);
super::reset();
}
......
......@@ -78,8 +78,14 @@ public:
virtual void reset()
{
is_connected = is_bound = false;
sending_msg.clear();
if (nullptr != matrix)
#if ASIO_VERSION < 101100
matrix->get_service_pump().assign_io_context(this->lowest_layer().get_io_service());
#else
matrix->get_service_pump().assign_io_context(this->lowest_layer().get_executor().context());
#endif
super::reset();
}
......@@ -230,6 +236,11 @@ protected:
#ifdef ASCS_SYNC_SEND
virtual void on_close() {if (sending_msg.p) sending_msg.p->set_value(sync_call_result::NOT_APPLICABLE); super::on_close();}
#endif
#if ASIO_VERSION < 101100
virtual void after_close() {if (nullptr != matrix) matrix->get_service_pump().return_io_context(this->lowest_layer().get_io_service());}
#else
virtual void after_close() {if (nullptr != matrix) matrix->get_service_pump().return_io_context(this->lowest_layer().get_executor().context());}
#endif
private:
using super::close;
......