提交 46adf6f0 编写于 作者: Y youngwolf

Enhance object restoration.

上级 1c8f545a
......@@ -37,7 +37,7 @@ int main(int argc, const char* argv[])
file_client client(sp);
if (argc > 3)
link_num = std::min(256, std::max(atoi(argv[3]), 1));
link_num = std::min(256, std::max(atoi(argv[3]), 1)); //link number cannot exceed 500, becuase file_server's macro ASCS_START_OBJECT_ID is defined as 500
for (auto i = 0; i < link_num; ++i)
{
......
......@@ -92,7 +92,7 @@
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;ASCS_SERVER_PORT=5050;ASCS_RESTORE_OBJECT;ASCS_ENHANCED_STABILITY;ASCS_WANT_MSG_SEND_NOTIFY;ASCS_INPUT_QUEUE=non_lock_queue;ASCS_RECV_BUFFER_TYPE=std::vector&lt;asio::mutable_buffer&gt;;ASCS_SCATTERED_RECV_BUFFER;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;ASCS_RESTORE_OBJECT;ASCS_SERVER_PORT=5050;ASCS_START_OBJECT_ID=500;ASCS_SCATTERED_RECV_BUFFER;ASCS_WANT_MSG_SEND_NOTIFY;ASCS_INPUT_QUEUE=non_lock_queue;ASCS_RECV_BUFFER_TYPE=std::vector&lt;asio::mutable_buffer&gt;;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PrecompiledHeaderFile />
<PrecompiledHeaderOutputFile />
</ClCompile>
......@@ -106,7 +106,7 @@
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;ASCS_SERVER_PORT=5050;ASCS_RESTORE_OBJECT;ASCS_ENHANCED_STABILITY;ASCS_WANT_MSG_SEND_NOTIFY;ASCS_INPUT_QUEUE=non_lock_queue;ASCS_RECV_BUFFER_TYPE=std::vector&lt;asio::mutable_buffer&gt;;ASCS_SCATTERED_RECV_BUFFER;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;ASCS_RESTORE_OBJECT;ASCS_SERVER_PORT=5050;ASCS_START_OBJECT_ID=500;ASCS_SCATTERED_RECV_BUFFER;ASCS_WANT_MSG_SEND_NOTIFY;ASCS_INPUT_QUEUE=non_lock_queue;ASCS_RECV_BUFFER_TYPE=std::vector&lt;asio::mutable_buffer&gt;;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PrecompiledHeaderFile />
<PrecompiledHeaderOutputFile />
</ClCompile>
......@@ -122,7 +122,7 @@
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;ASCS_SERVER_PORT=5050;ASCS_RESTORE_OBJECT;ASCS_ENHANCED_STABILITY;ASCS_WANT_MSG_SEND_NOTIFY;ASCS_INPUT_QUEUE=non_lock_queue;ASCS_RECV_BUFFER_TYPE=std::vector&lt;asio::mutable_buffer&gt;;ASCS_SCATTERED_RECV_BUFFER;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;ASCS_RESTORE_OBJECT;ASCS_SERVER_PORT=5050;ASCS_START_OBJECT_ID=500;ASCS_SCATTERED_RECV_BUFFER;ASCS_WANT_MSG_SEND_NOTIFY;ASCS_INPUT_QUEUE=non_lock_queue;ASCS_RECV_BUFFER_TYPE=std::vector&lt;asio::mutable_buffer&gt;;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PrecompiledHeaderFile />
<PrecompiledHeaderOutputFile />
</ClCompile>
......@@ -140,7 +140,7 @@
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;ASCS_SERVER_PORT=5050;ASCS_RESTORE_OBJECT;ASCS_ENHANCED_STABILITY;ASCS_WANT_MSG_SEND_NOTIFY;ASCS_INPUT_QUEUE=non_lock_queue;ASCS_RECV_BUFFER_TYPE=std::vector&lt;asio::mutable_buffer&gt;;ASCS_SCATTERED_RECV_BUFFER;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;ASCS_RESTORE_OBJECT;ASCS_SERVER_PORT=5050;ASCS_START_OBJECT_ID=500;ASCS_SCATTERED_RECV_BUFFER;ASCS_WANT_MSG_SEND_NOTIFY;ASCS_INPUT_QUEUE=non_lock_queue;ASCS_RECV_BUFFER_TYPE=std::vector&lt;asio::mutable_buffer&gt;;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PrecompiledHeaderFile />
<PrecompiledHeaderOutputFile />
</ClCompile>
......
......@@ -26,9 +26,9 @@ file_socket::~file_socket() {trans_end();}
void file_socket::reset() {trans_end(); server_socket::reset();}
//socket_ptr actually is a pointer of file_socket, use std::dynamic_pointer_cast to convert it.
void file_socket::take_over(std::shared_ptr<server_socket> socket_ptr) {printf("restore user data from invalid object (" ASCS_LLF ").\n", socket_ptr->id());}
void file_socket::take_over(std::shared_ptr<server_socket> socket_ptr) {if (socket_ptr) printf("restore user data from invalid object (" ASCS_LLF ").\n", socket_ptr->id());}
//this works too, but brings warnings with -Woverloaded-virtual option.
//void file_socket::take_over(std::shared_ptr<file_socket> socket_ptr) {printf("restore user data from invalid object (" ASCS_LLF ").\n", socket_ptr->id());}
//void file_socket::take_over(std::shared_ptr<file_socket> socket_ptr) {if (socket_ptr) printf("restore user data from invalid object (" ASCS_LLF ").\n", socket_ptr->id());}
//msg handling
bool file_socket::on_msg_handle(out_msg_type& msg) {handle_msg(msg); return true;}
......@@ -117,7 +117,7 @@ void file_socket::handle_msg(out_msg_ctype& msg)
{
uint_fast64_t id;
memcpy(&id, std::next(msg.data(), ORDER_LEN), sizeof(uint_fast64_t));
get_server().restore_socket(this->shared_from_this(), id);
get_server().restore_socket(this->shared_from_this(), id, true);
}
default:
break;
......
module = file_server
ext_cflag = -DASCS_SERVER_PORT=5050 -DASCS_RESTORE_OBJECT -DASCS_WANT_MSG_SEND_NOTIFY -DASCS_INPUT_QUEUE=non_lock_queue
ext_cflag = -DASCS_RESTORE_OBJECT -DASCS_SERVER_PORT=5050 -DASCS_START_OBJECT_ID=500 -DASCS_WANT_MSG_SEND_NOTIFY -DASCS_INPUT_QUEUE=non_lock_queue
ext_cflag += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE
include ../config.mk
......
......@@ -81,7 +81,7 @@ namespace tcp
{
public:
virtual bool del_socket(const std::shared_ptr<tracked_executor>& socket_ptr) = 0;
virtual bool restore_socket(const std::shared_ptr<tracked_executor>& socket_ptr, uint_fast64_t id) = 0;
virtual bool restore_socket(const std::shared_ptr<tracked_executor>& socket_ptr, uint_fast64_t id, bool init) = 0;
};
} //namespace
......
......@@ -662,8 +662,14 @@
*
* SPECIAL ATTENTION (incompatible with old editions):
* Rename function i_service::is_started() to i_service::service_started().
* A additional parameter named 'init' is added to interface i_server::restore_socket, now it has following signature:
* virtual bool i_server::restore_socket(const std::shared_ptr<tracked_executor>& socket_ptr, uint_fast64_t id, bool init)
* you use init = true to initialize the server_socket's id (use macro ASCS_START_OBJECT_ID to separate your id rang from object_pool's id rang),
* use init = false to restore a server_socket after client_socket reconnected to the server.
*
* HIGHLIGHT:
* Introduce new macro ASCS_START_OBJECT_ID to set the start id that object_pool used to assign object ids, and the new function
* object_pool::set_start_object_id(uint_fast64_t) with the same purpose, please call it as immediately as possible.
* Support changing the size of send buffer and recv buffer at runtime.
* Make function server_base::start_listen() and server_base::stop_listen() thread safe.
* packer2 now can customize the real message type (before, it's always string_buffer).
......@@ -817,6 +823,11 @@ static_assert(ASCS_DELAY_CLOSE >= 0, "delay close duration must be bigger than o
//after sending buffer became empty, call ascs::socket::on_all_msg_send(InMsgType& msg)
//#define ASCS_WANT_ALL_MSG_SEND_NOTIFY
//object_pool will asign object ids (used to distinguish objects) from this
#ifndef ASCS_START_OBJECT_ID
#define ASCS_START_OBJECT_ID 0
#endif
//max number of objects object_pool can hold.
#ifndef ASCS_MAX_OBJECT_NUM
#define ASCS_MAX_OBJECT_NUM 4096
......
......@@ -36,8 +36,11 @@ public:
static const tid TIMER_CLEAR_SOCKET = TIMER_BEGIN + 1;
static const tid TIMER_END = TIMER_BEGIN + 10;
public:
void set_start_object_id(uint_fast64_t id) {cur_id.store(id - 1, std::memory_order_relaxed);} //call this right after object_pool been constructed
protected:
object_pool(service_pump& service_pump_) : i_service(service_pump_), timer<executor>(service_pump_), cur_id(-1), max_size_(ASCS_MAX_OBJECT_NUM) {}
object_pool(service_pump& service_pump_) : i_service(service_pump_), timer<executor>(service_pump_), cur_id(ASCS_START_OBJECT_ID - 1), max_size_(ASCS_MAX_OBJECT_NUM) {}
void start()
{
......@@ -94,25 +97,41 @@ protected:
unified_out::error_out("create object failed!");
}
bool init_object_id(object_ctype& object_ptr, uint_fast64_t id)
{
assert(object_ptr && !object_ptr->is_equal_to(-1));
if (object_ptr->is_equal_to(id))
return true;
std::lock_guard<ASCS_SHARED_MUTEX_TYPE> lock(object_can_mutex);
auto& stub = object_can[id];
if (stub)
return false;
object_can.erase(object_ptr->id());
object_ptr->id(id);
stub = object_ptr; //must succeed
return true;
}
//change object_ptr's id to id, and reinsert it into object_can.
//there MUST exist an object in invalid_object_can whose id is equal to id to guarantee the id has been abandoned
// (checking existence of such object in object_can is NOT enough, because there are some sockets used by async
// acceptance, they don't exist in object_can nor invalid_object_can), further more, the invalid object MUST be
//obsoleted and has no additional reference.
// obsoleted and has no additional reference.
//return the invalid object (null means failure), please note that the invalid object has been removed from invalid_object_can.
object_type change_object_id(object_ctype& object_ptr, uint_fast64_t id)
{
assert(object_ptr && !object_ptr->is_equal_to(-1));
auto old_object_ptr = invalid_object_pop(id);
if (old_object_ptr)
if (old_object_ptr && !init_object_id(object_ptr, id))
{
assert(!find(id));
std::lock_guard<ASCS_SHARED_MUTEX_TYPE> lock(object_can_mutex);
object_can.erase(object_ptr->id());
object_ptr->id(id);
object_can.emplace(id, object_ptr); //must succeed
std::lock_guard<std::mutex> lock(invalid_object_can_mutex);
invalid_object_can.push_back(old_object_ptr);
old_object_ptr.reset();
}
return old_object_ptr;
......
......@@ -113,16 +113,18 @@ public:
raw_socket_ptr->force_shutdown();
return this->del_object(raw_socket_ptr);
}
//restore the invalid socket whose id is equal to id, if successful, socket_ptr's take_over function will be invoked,
//restore the invalid socket whose id is equal to 'id', if successful, socket_ptr's take_over function will be invoked,
//you can restore the invalid socket to socket_ptr, everything can be restored except socket::next_layer_ (on the other
//hand, restore socket::next_layer_ doesn't make any sense).
virtual bool restore_socket(const std::shared_ptr<tracked_executor>& socket_ptr, uint_fast64_t id)
virtual bool restore_socket(const std::shared_ptr<tracked_executor>& socket_ptr, uint_fast64_t id, bool init)
{
auto raw_socket_ptr(std::dynamic_pointer_cast<Socket>(socket_ptr));
if (!raw_socket_ptr)
return false;
auto this_id = raw_socket_ptr->id();
if (!init)
{
auto old_socket_ptr = this->change_object_id(raw_socket_ptr, id);
if (old_socket_ptr)
{
......@@ -133,6 +135,12 @@ public:
return true;
}
}
else if (this->init_object_id(raw_socket_ptr, id))
{
unified_out::info_out("object id " ASCS_LLF " been set to " ASCS_LLF, this_id, raw_socket_ptr->id());
return true;
}
return false;
}
......@@ -194,8 +202,7 @@ protected:
return true;
}
socket_ptr->show_info("client:", "been refused because of too many clients.");
socket_ptr->force_shutdown();
socket_ptr->show_info("client:", "been refused because of too many clients or id conflict.");
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册