提交 606eb881 编写于 作者: Y youngwolf

Fix -- in extreme circumstances, messages can leave behind in the send buffer...

Fix -- in extreme circumstances, messages can leave behind in the send buffer until the next message sending.
Fix -- basic_buffer doesn't support fixed arrays (just in the constructor) in GCC and Clang.
Introduce macro ASCS_CAN_EMPTY_NOT_SAFE for containers used in send/recv buffer whose empty function is not thread safe.
上级 409ac32b
......@@ -579,12 +579,12 @@ template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (au
}
#define GET_PENDING_MSG_SIZE(FUNNAME, CAN) size_t FUNNAME() const {return CAN.size_in_byte();}
#define POP_FIRST_PENDING_MSG(FUNNAME, CAN, MSGTYPE) void FUNNAME(MSGTYPE& msg) {msg.clear(); CAN.try_dequeue(msg);}
#define POP_FIRST_PENDING_MSG(FUNNAME, CAN, MSGTYPE) void FUNNAME(MSGTYPE& msg) {msg.clear(); if (CAN.try_dequeue(msg)) _send_msg();}
#define POP_FIRST_PENDING_MSG_NOTIFY(FUNNAME, CAN, MSGTYPE) void FUNNAME(MSGTYPE& msg) \
{msg.clear(); if (CAN.try_dequeue(msg) && msg.p) msg.p->set_value(sync_call_result::NOT_APPLICABLE);}
#define POP_ALL_PENDING_MSG(FUNNAME, CAN, CANTYPE) void FUNNAME(CANTYPE& can) {can.clear(); CAN.swap(can);}
{msg.clear(); if (CAN.try_dequeue(msg)) {_send_msg(); if (msg.p) msg.p->set_value(sync_call_result::NOT_APPLICABLE);}}
#define POP_ALL_PENDING_MSG(FUNNAME, CAN, CANTYPE) void FUNNAME(CANTYPE& can) {can.clear(); CAN.swap(can); _send_msg();}
#define POP_ALL_PENDING_MSG_NOTIFY(FUNNAME, CAN, CANTYPE) void FUNNAME(CANTYPE& can) \
{can.clear(); CAN.swap(can); ascs::do_something_to_all(can, [](typename CANTYPE::reference msg) {if (msg.p) msg.p->set_value(sync_call_result::NOT_APPLICABLE);});}
{can.clear(); CAN.swap(can); _send_msg(); ascs::do_something_to_all(can, [](typename CANTYPE::reference msg) {if (msg.p) msg.p->set_value(sync_call_result::NOT_APPLICABLE);});}
///////////////////////////////////////////////////
//TCP msg sending interface
......
......@@ -810,7 +810,7 @@
* SPECIAL ATTENTION (incompatible with old editions):
* Graceful shutdown does not support sync mode anymore.
* Introduce memory fence to synchronize socket's status -- sending and reading, dispatch_strand is not enough, except post_strand,
* but dispatch_strand is more efficient than post_strand.
* but dispatch_strand is more efficient than post_strand in specific circumstances.
*
* HIGHLIGHT:
* Make shutdown thread safe.
......@@ -819,12 +819,20 @@
* Fix alias for tcp and ssl.
* Fix -- in Windows, a TCP client must explicitly specify a full IP address (not only the port) to connect to.
* Fix -- reliable UDP sometimes cannot send messages successfully.
* Fix -- in extreme circumstances, following functions may cause messages leave behind in the send buffer until the next message sending:
* resume_sending, reliable UDP socket needs it, while general UDP socket doesn't.
* pop_first/all_pending_send_msg and pop_first/all_pending_recv_msg.
* socket::send_msg().
* Fix -- basic_buffer doesn't support fixed arrays (just in the constructor) in GCC and Clang.
*
* ENHANCEMENTS:
* heartbeat(ext) optimization.
* Add error check during file reading in file_server/file_client.
* Enhance basic_buffer to support more comprehensive buffers.
* Add ability to change the socket id if it's not managed by object_pool nor single_socket_service.
* ascs needs the container's empty() function (used by the queue for message sending and receiving) to be thread safe (here, thread safe does not means
* correctness, but just no memory access violation), almost all implementations of list::empty() in the world are thread safe, but not for list::size().
* if your container's empty() function is not thread safe, please define macro ASCS_CAN_EMPTY_NOT_SAFE, then ascs will make it thread safe for you.
*
* DELETION:
*
......@@ -1105,6 +1113,10 @@ static_assert(ASCS_RELIABLE_UDP_NSND_QUE >= 0, "kcp send queue must be bigger th
//'server_socket_base', 'ssl::client_socket_base' and 'ssl::server_socket_base'.
//we even can let a socket to use different queue (and / or different container) for input and output via template parameters.
//if your container's empty() function (used by the queue for message sending and receiving) is not thread safe, please define this macro,
// then ascs will make it thread safe for you.
//#define ASCS_CAN_EMPTY_NOT_SAFE
//buffer type used when receiving messages (unpacker's prepare_next_recv() need to return this type)
#ifndef ASCS_RECV_BUFFER_TYPE
#if ASIO_VERSION > 101100
......
......@@ -68,7 +68,6 @@ public:
using typename Container::reference;
using typename Container::const_reference;
//since some implementations (such as gcc before 5.0) of std::list::size() have linear complexity, we don't expose this size() function
//even this function is thread safe, if you rely it, you may still need a memory fence as we do for the empty() function.
//using Container::size;
queue() : total_size(0) {}
......@@ -76,9 +75,11 @@ public:
//thread safe
bool is_thread_safe() const {return Lockable::is_lockable();}
size_t size_in_byte() const {return total_size;}
//almost all implementations of list::empty() in the world are thread safe (not means correctness, but just no memory access violation),
// but we need a memory fence to synchronize memory with IO threads, so we use a lock here.
#ifdef ASCS_CAN_EMPTY_NOT_SAFE //container's empty() function is not thread safe
bool empty() {typename Lockable::lock_guard lock(*this); return Container::empty();}
#else
using Container::empty; //almost all implementations of list::empty() in the world are thread safe (not means correctness, but just no memory access violation)
#endif
void clear() {typename Lockable::lock_guard lock(*this); Container::clear(); total_size = 0;}
void swap(Container& can)
{
......
......@@ -68,6 +68,8 @@ public:
basic_buffer(char* buff) {do_detach(); operator+=(buff);}
basic_buffer(const char* buff) {do_detach(); operator+=(buff);}
basic_buffer(const char* buff, size_t len) {do_detach(); append(buff, len);}
template<size_t size> basic_buffer(char(&buff)[size], size_t len) {do_detach(); append(buff, len);}
template<size_t size> basic_buffer(const char(&buff)[size], size_t len) {do_detach(); append(buff, len);}
basic_buffer(std::initializer_list<char> ilist) {do_detach(); operator+=(ilist);}
basic_buffer(const basic_buffer& other) {do_detach(); append(other);}
template<typename Buff> basic_buffer(const Buff& other) {do_detach(); append(other);}
......@@ -114,7 +116,7 @@ public:
basic_buffer& append(std::initializer_list<char> ilist) {return append(std::begin(ilist), ilist.size());}
template<typename Buff> basic_buffer& append(const Buff& other) {return append(other.data(), other.size());}
basic_buffer& append(char* buff) {return append(buff, strlen(buff));}
basic_buffer& append(char* buff) {return append((const char*) buff);}
basic_buffer& append(const char* buff) {return append(buff, strlen(buff));}
basic_buffer& append(const char* _buff, size_t _len)
{
......@@ -132,6 +134,8 @@ public:
return *this;
}
template<size_t size> basic_buffer& append(char(&buff)[size], size_t len) {return append((const char*) buff, std::min(size, len));}
template<size_t size> basic_buffer & append(const char(&buff)[size], size_t len) {return append((const char*) buff, std::min(size, len));}
//nonstandard function append2 -- delete the last character if it's '\0' before appending another string.
//this feature makes basic_buffer to be able to works as std::string, which will append '\0' automatically.
......
......@@ -178,9 +178,9 @@ public:
protected:
#endif
#ifdef ASCS_ARBITRARY_SEND
void send_msg() {dispatch_in_io_strand([this]() {this->do_send_msg();});}
void send_msg() {_send_msg();}
#else
void send_msg() {if (is_ready() && !is_sending()) dispatch_in_io_strand([this]() {this->do_send_msg();});}
void send_msg() {if (is_ready() && 1 != sending.load(std::memory_order_acquire)) _send_msg();} //here we cannot use is_sending(), because we need memory fence
#endif
public:
......@@ -208,7 +208,7 @@ public:
return false;
#ifndef ASCS_ALWAYS_SEND_HEARTBEAT
if (!is_sending() && now - stat.last_send_time >= interval) //don't need to send heartbeat if we're sending messages
if (now - stat.last_send_time >= interval && !is_sending()) //don't need to send heartbeat if we're sending messages
#endif
send_heartbeat();
}
......@@ -624,6 +624,8 @@ private:
void reset_next_layer(const asio::any_io_executor& executor, Arg&& arg) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(executor, std::forward<Arg>(arg));}
#endif
void _send_msg() {dispatch_in_io_strand([this]() {this->do_send_msg();});}
#ifdef ASCS_SYNC_RECV
sync_call_result sync_recv_waiting(std::unique_lock<std::mutex>& lock, unsigned duration)
{
......
......@@ -380,7 +380,14 @@ private:
virtual bool do_send_msg(bool in_strand = false)
{
if (!in_strand && this->test_and_set_sending())
if (send_buffer.empty()) //without this, in extreme circumstances, messages can leave behind in the send buffer until the next message sending
{
if (in_strand)
this->clear_sending();
return false;
}
else if (!in_strand && this->test_and_set_sending())
return true;
auto end_time = statistic::now();
......
......@@ -327,7 +327,14 @@ private:
virtual bool do_send_msg(bool in_strand = false)
{
if (!in_strand && this->test_and_set_sending())
if (send_buffer.empty()) //without this, in extreme circumstances, messages can leave behind in the send buffer until the next message sending
{
if (in_strand)
this->clear_sending();
return false;
}
else if (!in_strand && this->test_and_set_sending())
return true;
else if (is_connected && !check_send_cc())
;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册