提交 976c3817 编写于 作者: Y youngwolf

Add resend_msg and sync_resend_msg interface to ascs::socket, it takes packed...

Add resend_msg and sync_resend_msg interface to ascs::socket, it takes packed messages and insert them into the front of the send buffer.
上级 5ee20c5f
......@@ -611,7 +611,8 @@
* If defined macro ASCS_WANT_ALL_MSG_SEND_NOTIFY, virtual function ascs::socket::on_all_msg_send(InMsgType& msg) must be implemented.
*
* ENHANCEMENTS:
* Add socket_exist interface to i_matrix, exist function to object_pool to just check the existence of a socket by an given id.
* Add resend_msg and sync_resend_msg interface to ascs::socket, it takes packed messages and insert them into the front of the send buffer.
* Add socket_exist interface to i_matrix, it calls object_pool::exist to check the existence of a socket by an given id.
* Add following 3 interfaces to i_unpacker as i_packer did (the default implementation is meaningless as i_packer, just satisfy compilers):
* virtual char* raw_data(msg_type& msg) const
* virtual const char* raw_data(msg_ctype& msg) const
......
......@@ -50,6 +50,8 @@ private:
// swap
// template<typename T> emplace_back(const T& item), if you call direct_(sync_)send_msg which accepts other than rvalue reference
// template<typename T> emplace_back(T&& item)
// template<typename T> emplace_front(const T& item), if you call (sync_)resend_msg which accepts other than rvalue reference
// template<typename T> emplace_front(T&& item), if you call (sync_)resend_msg
// splice(iter, Container&)
// splice(iter, Container&, iter, iter)
// front
......@@ -85,6 +87,8 @@ public:
template<typename T> bool enqueue(T&& item) {typename Lockable::lock_guard lock(*this); return enqueue_(std::forward<T>(item));}
void move_items_in(Container& src, size_t size_in_byte = 0) {typename Lockable::lock_guard lock(*this); move_items_in_(src, size_in_byte);}
template<typename T> bool enqueue_front(T&& item) {typename Lockable::lock_guard lock(*this); return enqueue_front_(std::forward<T>(item));}
void move_items_in_front(Container& src, size_t size_in_byte = 0) {typename Lockable::lock_guard lock(*this); move_items_in_front_(src, size_in_byte);}
bool try_dequeue(reference item) {typename Lockable::lock_guard lock(*this); return try_dequeue_(item);}
void move_items_out(Container& dest, size_t max_item_num = -1) {typename Lockable::lock_guard lock(*this); move_items_out_(dest, max_item_num);}
void move_items_out(size_t max_size_in_byte, Container& dest) {typename Lockable::lock_guard lock(*this); move_items_out_(max_size_in_byte, dest);}
......@@ -121,6 +125,34 @@ public:
total_size += size_in_byte;
}
template<typename T> bool enqueue_front_(T&& item)
{
try
{
auto size = item.size();
this->emplace_front(std::forward<T>(item));
total_size += size;
}
catch (const std::exception & e)
{
unified_out::error_out("cannot hold more objects (%s)", e.what());
return false;
}
return true;
}
void move_items_in_front_(Container& src, size_t size_in_byte = 0)
{
if (0 == size_in_byte)
size_in_byte = ascs::get_size_in_byte(src);
else
assert(ascs::get_size_in_byte(src) == size_in_byte);
this->splice(this->begin(), src);
total_size += size_in_byte;
}
bool try_dequeue_(reference item) {if (this->empty()) return false; item.swap(this->front()); this->pop_front(); total_size -= item.size(); return true;}
void move_items_out_(Container& dest, size_t max_item_num = -1)
......
......@@ -222,12 +222,24 @@ public:
bool direct_send_msg(list<InMsgType>& msg_can, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_direct_send_msg(msg_can) : false;}
//don't use the packer but insert into the front of the send buffer directly
template<typename T> bool resend_msg(T&& msg, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_resend_msg(std::forward<T>(msg)) : false;}
bool resend_msg(list<InMsgType>& msg_can, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_resend_msg(msg_can) : false;}
#ifdef ASCS_SYNC_SEND
//don't use the packer but insert into send buffer directly, then wait for the sending to finish, unit of the duration is millisecond, 0 means wait infinitely
//don't use the packer but insert into send buffer directly, then wait the sending to finish, unit of the duration is millisecond, 0 means wait infinitely
template<typename T> sync_call_result direct_sync_send_msg(T&& msg, unsigned duration = 0, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_direct_sync_send_msg(std::forward<T>(msg), duration) : sync_call_result::NOT_APPLICABLE;}
sync_call_result direct_sync_send_msg(list<InMsgType>& msg_can, unsigned duration = 0, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_direct_sync_send_msg(msg_can, duration) : sync_call_result::NOT_APPLICABLE;}
//don't use the packer but insert into the front of the send buffer directly, then wait the sending to finish, unit of the duration is millisecond, 0 means wait infinitely
template<typename T> sync_call_result sync_resend_msg(T&& msg, unsigned duration = 0, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_sync_resend_msg(std::forward<T>(msg), duration) : sync_call_result::NOT_APPLICABLE;}
sync_call_result sync_resend_msg(list<InMsgType>& msg_can, unsigned duration = 0, bool can_overflow = false)
{return can_overflow || is_send_buffer_available() ? do_sync_resend_msg(msg_can, duration) : sync_call_result::NOT_APPLICABLE;}
#endif
#ifdef ASCS_SYNC_RECV
......@@ -467,6 +479,30 @@ protected:
return true;
}
template<typename T> bool do_resend_msg(T&& msg)
{
if (msg.empty())
unified_out::error_out(ASCS_LLF " found an empty message, please check your packer.", id());
else if (send_buffer.enqueue_front(std::forward<T>(msg)))
send_msg();
//even if we meet an empty message (because of too big message or insufficient memory, most likely), we still return true, why?
//please think about the function safe_send_(native_)msg, if we keep returning false, it will enter a dead loop.
//the packer provider has the responsibility to write detailed reasons down when packing message failed.
return true;
}
bool do_resend_msg(list<InMsgType>& msg_can)
{
size_t size_in_byte = 0;
in_container_type temp_buffer;
ascs::do_something_to_all(msg_can, [&size_in_byte, &temp_buffer](InMsgType& msg) {size_in_byte += msg.size(); temp_buffer.emplace_back(std::move(msg));});
send_buffer.move_items_in_front(temp_buffer, size_in_byte);
send_msg();
return true;
}
#ifdef ASCS_SYNC_SEND
template<typename T> sync_call_result do_direct_sync_send_msg(T&& msg, unsigned duration = 0)
{
......@@ -507,6 +543,46 @@ protected:
send_msg();
return 0 == duration || std::future_status::ready == f.wait_for(std::chrono::milliseconds(duration)) ? f.get() : sync_call_result::TIMEOUT;
}
template<typename T> sync_call_result do_sync_resend_msg(T&& msg, unsigned duration = 0)
{
if (stopped())
return sync_call_result::NOT_APPLICABLE;
else if (msg.empty())
{
unified_out::error_out(ASCS_LLF " found an empty message, please check your packer.", id());
return sync_call_result::SUCCESS;
}
auto unused = in_msg(std::forward<T>(msg), true);
auto p = unused.p;
auto f = p->get_future();
if (!send_buffer.enqueue_front(std::move(unused)))
return sync_call_result::NOT_APPLICABLE;
send_msg();
return 0 == duration || std::future_status::ready == f.wait_for(std::chrono::milliseconds(duration)) ? f.get() : sync_call_result::TIMEOUT;
}
sync_call_result do_sync_resend_msg(list<InMsgType>& msg_can, unsigned duration = 0)
{
if (stopped())
return sync_call_result::NOT_APPLICABLE;
else if (msg_can.empty())
return sync_call_result::SUCCESS;
size_t size_in_byte = 0;
in_container_type temp_buffer;
ascs::do_something_to_all(msg_can, [&size_in_byte, &temp_buffer](InMsgType& msg) {size_in_byte += msg.size(); temp_buffer.emplace_back(std::move(msg));});
temp_buffer.back().check_and_create_promise(true);
auto p = temp_buffer.back().p;
auto f = p->get_future();
send_buffer.move_items_in_front(temp_buffer, size_in_byte);
send_msg();
return 0 == duration || std::future_status::ready == f.wait_for(std::chrono::milliseconds(duration)) ? f.get() : sync_call_result::TIMEOUT;
}
#endif
private:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册