提交 b2a82702 编写于 作者: Y YangLi

Use future instead of condition_variable when doing sync message sending.

上级 7016c8a8
......@@ -62,16 +62,16 @@ std::thread create_sync_recv_thread(single_client& client)
{
return std::thread([&client]() {
typename ASCS_DEFAULT_UNPACKER::container_type msg_can;
single_client::sync_call_result re = single_client::sync_call_result::SUCCESS;
sync_call_result re = sync_call_result::SUCCESS;
do
{
re = client.sync_recv_msg(msg_can, 50); //ascs will not maintain messages in msg_can anymore after sync_recv_msg return, please note.
if (single_client::sync_call_result::SUCCESS == re)
if (sync_call_result::SUCCESS == re)
{
do_something_to_all(msg_can, [](single_client::out_msg_type& msg) {printf("sync recv(" ASCS_SF ") : %s\n", msg.size(), msg.data());});
msg_can.clear(); //sync_recv_msg just append new message(s) to msg_can, please note.
}
} while (single_client::sync_call_result::SUCCESS == re || single_client::sync_call_result::TIMEOUT == re);
} while (sync_call_result::SUCCESS == re || sync_call_result::TIMEOUT == re);
puts("sync recv end.");
});
}
......
......@@ -85,7 +85,7 @@
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>C:\Users\wolf\Documents\GitHub\concurrentqueue\;C:\Users\wolf\Documents\GitHub\asio\asio\include\;C:\Users\wolf\Documents\GitHub\ascs\include\;$(IncludePath)</IncludePath>
<IncludePath>C:\Users\wolf\Documents\GitHub\asio\asio\include\;C:\Users\wolf\Documents\GitHub\ascs\include\;$(IncludePath)</IncludePath>
<LibraryPath>$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
......
......@@ -27,16 +27,16 @@ std::thread create_sync_recv_thread(single_service& service)
{
return std::thread([&service]() {
std::list<single_service::out_msg_type> msg_can;
single_service::sync_call_result re = single_service::sync_call_result::SUCCESS;
sync_call_result re = sync_call_result::SUCCESS;
do
{
re = service.sync_recv_msg(msg_can, 50); //ascs will not maintain messages in msg_can anymore after sync_recv_msg return, please note.
if (single_service::sync_call_result::SUCCESS == re)
if (sync_call_result::SUCCESS == re)
{
do_something_to_all(msg_can, [](single_service::out_msg_type& msg) {printf("sync recv(" ASCS_SF ") : %s\n", msg.size(), msg.data());});
msg_can.clear(); //sync_recv_msg just append new message(s) to msg_can, please note.
}
} while (single_service::sync_call_result::SUCCESS == re || single_service::sync_call_result::TIMEOUT == re);
} while (sync_call_result::SUCCESS == re || sync_call_result::TIMEOUT == re);
puts("sync recv end.");
});
}
......
......@@ -30,7 +30,9 @@
#include <atomic>
#include <sstream>
#include <iomanip>
#if defined(ASCS_SYNC_SEND) || defined(ASCS_SYNC_RECV)
#ifdef ASCS_SYNC_SEND
#include <future>
#elif defined(ASCS_SYNC_RECV)
#include <condition_variable>
#endif
......@@ -367,6 +369,8 @@ private:
statistic::stat_duration& duration;
};
enum sync_call_result {SUCCESS, NOT_APPLICABLE, DUPLICATE, TIMEOUT};
template<typename T> struct obj_with_begin_time : public T
{
obj_with_begin_time() {}
......@@ -384,26 +388,20 @@ template<typename T> struct obj_with_begin_time : public T
};
#ifdef ASCS_SYNC_SEND
struct condition_variable : public std::condition_variable
{
bool signaled;
condition_variable() : signaled(false) {}
};
template<typename T> struct obj_with_begin_time_cv : public obj_with_begin_time<T>
template<typename T> struct obj_with_begin_time_promise : public obj_with_begin_time<T>
{
obj_with_begin_time_cv(bool need_cv = false) {check_and_create_cv(need_cv);}
obj_with_begin_time_cv(T&& obj, bool need_cv = false) : obj_with_begin_time<T>(std::move(obj)) {check_and_create_cv(need_cv);}
obj_with_begin_time_cv(obj_with_begin_time_cv&& other) : obj_with_begin_time<T>(std::move(other)), cv(std::move(other.cv)) {}
obj_with_begin_time_cv& operator=(obj_with_begin_time_cv&& other) {obj_with_begin_time<T>::operator=(std::move(other)); cv = std::move(other.cv); return *this;}
obj_with_begin_time_promise(bool need_promise = false) {check_and_create_promise(need_promise);}
obj_with_begin_time_promise(T&& obj, bool need_promise = false) : obj_with_begin_time<T>(std::move(obj)) {check_and_create_promise(need_promise);}
obj_with_begin_time_promise(obj_with_begin_time_promise&& other) : obj_with_begin_time<T>(std::move(other)), p(std::move(other.p)) {}
obj_with_begin_time_promise& operator=(obj_with_begin_time_promise&& other) {obj_with_begin_time<T>::operator=(std::move(other)); p = std::move(other.p); return *this;}
void swap(T& obj, bool need_cv = false) {obj_with_begin_time<T>::swap(obj); check_and_create_cv(need_cv);}
void swap(obj_with_begin_time_cv& other) {obj_with_begin_time<T>::swap(other); cv.swap(other.cv);}
void swap(T& obj, bool need_promise = false) {obj_with_begin_time<T>::swap(obj); check_and_create_promise(need_promise);}
void swap(obj_with_begin_time_promise& other) {obj_with_begin_time<T>::swap(other); p.swap(other.p);}
void clear() {cv.reset(); T::clear();}
void check_and_create_cv(bool need_cv) {if (!need_cv) cv.reset(); else if (!cv) cv = std::make_shared<condition_variable>();}
void clear() {p.reset(); T::clear();}
void check_and_create_promise(bool need_promise) {if (!need_promise) p.reset(); else if (!p) p = std::make_shared<std::promise<sync_call_result>>();}
std::shared_ptr<condition_variable> cv;
std::shared_ptr<std::promise<sync_call_result>> p;
};
#endif
......@@ -455,10 +453,11 @@ template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (au
#define GET_PENDING_MSG_NUM(FUNNAME, CAN) size_t FUNNAME() const {return CAN.size();}
#define POP_FIRST_PENDING_MSG(FUNNAME, CAN, MSGTYPE) void FUNNAME(MSGTYPE& msg) {msg.clear(); CAN.try_dequeue(msg);}
#define POP_FIRST_PENDING_MSG_CV(FUNNAME, CAN, MSGTYPE) void FUNNAME(MSGTYPE& msg) {msg.clear(); if (CAN.try_dequeue(msg) && msg.cv) msg.cv->notify_all();}
#define POP_FIRST_PENDING_MSG_NOTIFY(FUNNAME, CAN, MSGTYPE) void FUNNAME(MSGTYPE& msg) \
{msg.clear(); if (CAN.try_dequeue(msg) && msg.p) msg.p->set_value(sync_call_result::NOT_APPLICABLE);}
#define POP_ALL_PENDING_MSG(FUNNAME, CAN, CANTYPE) void FUNNAME(CANTYPE& can) {can.clear(); CAN.swap(can);}
#define POP_ALL_PENDING_MSG_CV(FUNNAME, CAN, CANTYPE) void FUNNAME(CANTYPE& can) { \
can.clear(); CAN.swap(can); ascs::do_something_to_all(can, [](decltype(can.front()) msg) {if (msg.cv) msg.cv->notify_all();});}
#define POP_ALL_PENDING_MSG_NOTIFY(FUNNAME, CAN, CANTYPE) void FUNNAME(CANTYPE& can) \
{can.clear(); CAN.swap(can); ascs::do_something_to_all(can, [](decltype(can.front()) msg) {if (msg.p) msg.p->set_value(sync_call_result::NOT_APPLICABLE);});}
///////////////////////////////////////////////////
//TCP msg sending interface
......@@ -501,24 +500,24 @@ template<typename Buffer> TYPE FUNNAME(const Buffer& buffer, unsigned duration =
{return FUNNAME(buffer.data(), buffer.size(), duration, can_overflow);}
#define TCP_SYNC_SEND_MSG(FUNNAME, NATIVE, SEND_FUNNAME) \
typename super::sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false) \
sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false) \
{ \
if (!can_overflow && !this->is_send_buffer_available()) \
return super::sync_call_result::NOT_APPLICABLE; \
return sync_call_result::NOT_APPLICABLE; \
auto_duration dur(this->stat.pack_time_sum); \
auto msg = this->packer_->pack_msg(pstr, len, num, NATIVE); \
dur.end(); \
return this->SEND_FUNNAME(std::move(msg), duration); \
} \
TCP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, typename super::sync_call_result)
TCP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, sync_call_result)
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into tcp::socket_base's send buffer successfully
//if can_overflow equal to false and the buffer is not available, will wait until it becomes available
#define TCP_SYNC_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
typename super::sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false) \
{while (super::sync_call_result::SUCCESS != SEND_FUNNAME(pstr, len, num, duration, can_overflow)) \
SAFE_SEND_MSG_CHECK(super::sync_call_result::NOT_APPLICABLE) return super::sync_call_result::SUCCESS;} \
TCP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, typename super::sync_call_result)
sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false) \
{while (sync_call_result::SUCCESS != SEND_FUNNAME(pstr, len, num, duration, can_overflow)) \
SAFE_SEND_MSG_CHECK(sync_call_result::NOT_APPLICABLE) return sync_call_result::SUCCESS;} \
TCP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, sync_call_result)
//TCP sync msg sending interface
///////////////////////////////////////////////////
#endif
......@@ -565,28 +564,28 @@ template<typename Buffer> TYPE FUNNAME(const asio::ip::udp::endpoint& peer_addr,
{return FUNNAME(peer_addr, buffer.data(), buffer.size(), duration, can_overflow);}
#define UDP_SYNC_SEND_MSG(FUNNAME, NATIVE, SEND_FUNNAME) \
typename super::sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false) \
sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false) \
{return FUNNAME(peer_addr, pstr, len, num, duration, can_overflow);} \
typename super::sync_call_result FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, \
sync_call_result FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, \
unsigned duration = 0, bool can_overflow = false) \
{ \
if (!can_overflow && !this->is_send_buffer_available()) \
return super::sync_call_result::NOT_APPLICABLE; \
return sync_call_result::NOT_APPLICABLE; \
in_msg_type msg(peer_addr, this->packer_->pack_msg(pstr, len, num, NATIVE)); \
return this->SEND_FUNNAME(std::move(msg), duration); \
} \
UDP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, typename super::sync_call_result)
UDP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, sync_call_result)
//guarantee send msg successfully even if can_overflow equal to false, success at here just means putting the msg into udp::socket_base's send buffer successfully
//if can_overflow equal to false and the buffer is not available, will wait until it becomes available
#define UDP_SYNC_SAFE_SEND_MSG(FUNNAME, SEND_FUNNAME) \
typename super::sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false) \
sync_call_result FUNNAME(const char* const pstr[], const size_t len[], size_t num, unsigned duration = 0, bool can_overflow = false) \
{return FUNNAME(peer_addr, pstr, len, num, duration, can_overflow);} \
typename super::sync_call_result FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, \
sync_call_result FUNNAME(const asio::ip::udp::endpoint& peer_addr, const char* const pstr[], const size_t len[], size_t num, \
unsigned duration = 0, bool can_overflow = false) \
{while (super::sync_call_result::SUCCESS != SEND_FUNNAME(peer_addr, pstr, len, num, duration, can_overflow)) \
SAFE_SEND_MSG_CHECK(super::sync_call_result::NOT_APPLICABLE) return super::sync_call_result::SUCCESS;} \
UDP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, typename super::sync_call_result)
{while (sync_call_result::SUCCESS != SEND_FUNNAME(peer_addr, pstr, len, num, duration, can_overflow)) \
SAFE_SEND_MSG_CHECK(sync_call_result::NOT_APPLICABLE) return sync_call_result::SUCCESS;} \
UDP_SYNC_SEND_MSG_CALL_SWITCH(FUNNAME, sync_call_result)
//UDP sync msg sending interface
///////////////////////////////////////////////////
#endif
......
......@@ -709,6 +709,9 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
//if you don't define this macro, the next callback will be called at (xx:xx:xx + 21), plase note.
//#define ASCS_SYNC_SEND
#ifdef ASCS_SYNC_SEND
static_assert(ASIO_HAS_STD_FUTURE == 1, "sync message sending needs std::future.");
#endif
//#define ASCS_SYNC_RECV
//define these macro to gain additional series of sync message sending and receiving, they are:
// sync_send_msg
......
......@@ -159,13 +159,13 @@ private:
//Container must at least has the following functions (like list):
// Container() and Container(size_t) constructor
// size (must be thread safe, but doesn't have to be coherent, std::list before gcc 5 doesn't meet this requirement, ascs::list does)
// empty (must be thread safe, but doesn't have to be coherent)
// size (must be thread safe, but doesn't have to be consistent, std::list before gcc 5 doesn't meet this requirement, ascs::list does)
// empty (must be thread safe, but doesn't have to be consistent)
// clear
// swap
// emplace_back(const T& item)
// emplace_back(T&& item)
// splice(Container::const_iterator, std::list<T>&), after this, std::list<T> must be empty
// splice(decltype(Container::end()), std::list<T>&), after this, std::list<T> must be empty
// front
// pop_front
// end
......
......@@ -101,7 +101,7 @@ protected:
public:
#ifdef ASCS_SYNC_SEND
typedef obj_with_begin_time_cv<InMsgType> in_msg;
typedef obj_with_begin_time_promise<InMsgType> in_msg;
#else
typedef obj_with_begin_time<InMsgType> in_msg;
#endif
......@@ -111,8 +111,6 @@ public:
typedef InQueue<in_msg, in_container_type> in_queue_type;
typedef OutQueue<out_msg, out_container_type> out_queue_type;
enum sync_call_result {SUCCESS, NOT_APPLICABLE, DUPLICATE, TIMEOUT};
uint_fast64_t id() const {return _id;}
bool is_equal_to(uint_fast64_t id) const {return _id == id;}
......@@ -244,8 +242,8 @@ public:
GET_PENDING_MSG_NUM(get_pending_recv_msg_num, recv_msg_buffer)
#ifdef ASCS_SYNC_SEND
POP_FIRST_PENDING_MSG_CV(pop_first_pending_send_msg, send_msg_buffer, in_msg)
POP_ALL_PENDING_MSG_CV(pop_all_pending_send_msg, send_msg_buffer, in_container_type)
POP_FIRST_PENDING_MSG_NOTIFY(pop_first_pending_send_msg, send_msg_buffer, in_msg)
POP_ALL_PENDING_MSG_NOTIFY(pop_all_pending_send_msg, send_msg_buffer, in_container_type)
#else
POP_FIRST_PENDING_MSG(pop_first_pending_send_msg, send_msg_buffer, in_msg)
POP_ALL_PENDING_MSG(pop_all_pending_send_msg, send_msg_buffer, in_container_type)
......@@ -442,13 +440,12 @@ protected:
}
auto unused = in_msg(std::move(msg), true);
auto cv = unused.cv;
auto f = unused.p->get_future();
send_msg_buffer.enqueue(std::move(unused));
if (!sending && is_ready())
send_msg();
std::unique_lock<std::mutex> lock(sync_send_mutex);
return sync_send_waiting(lock, cv, duration);
return 0 == duration || std::future_status::ready == f.wait_for(std::chrono::milliseconds(duration)) ? f.get() : sync_call_result::TIMEOUT;
}
#endif
......@@ -474,19 +471,6 @@ private:
}
#endif
#ifdef ASCS_SYNC_SEND
sync_call_result sync_send_waiting(std::unique_lock<std::mutex>& lock, const std::shared_ptr<condition_variable>& cv, unsigned duration)
{
auto pred = [this, &cv]() {return !this->started_ || cv->signaled;};
if (0 == duration)
cv->wait(lock, std::move(pred));
else if (!cv->wait_for(lock, std::chrono::milliseconds(duration), std::move(pred)))
return sync_call_result::TIMEOUT;
return cv->signaled ? sync_call_result::SUCCESS : sync_call_result::NOT_APPLICABLE;
}
#endif
bool check_receiving(bool raise_recv)
{
if (is_recv_buffer_available())
......@@ -642,10 +626,6 @@ private:
std::atomic_flag start_atomic;
asio::io_context::strand strand;
#ifdef ASCS_SYNC_SEND
std::mutex sync_send_mutex;
#endif
#ifdef ASCS_SYNC_RECV
enum sync_recv_status {NOT_REQUESTED, REQUESTED, RESPONDED};
sync_recv_status sr_status;
......
......@@ -180,7 +180,8 @@ protected:
}
#ifdef ASCS_SYNC_SEND
virtual void on_close() {ascs::do_something_to_all(last_send_msg, [](typename super::in_msg& msg) {if (msg.cv) msg.cv->notify_all();}); super::on_close();}
virtual void on_close() {ascs::do_something_to_all(last_send_msg,
[](typename super::in_msg& msg) {if (msg.p) msg.p->set_value(sync_call_result::NOT_APPLICABLE);}); super::on_close();}
#endif
virtual void on_connect() {}
......@@ -309,7 +310,7 @@ private:
stat.send_time_sum += statistic::now() - last_send_msg.front().begin_time;
stat.send_msg_sum += last_send_msg.size();
#ifdef ASCS_SYNC_SEND
ascs::do_something_to_all(last_send_msg, [](typename super::in_msg& item) {if (item.cv) {item.cv->signaled = true; item.cv->notify_one();}});
ascs::do_something_to_all(last_send_msg, [](typename super::in_msg& item) {if (item.p) {item.p->set_value(sync_call_result::SUCCESS);}});
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
this->on_msg_send(last_send_msg.front());
......
......@@ -136,7 +136,7 @@ protected:
}
#ifdef ASCS_SYNC_SEND
virtual void on_close() {if (last_send_msg.cv) last_send_msg.cv->notify_all(); super::on_close();}
virtual void on_close() {if (last_send_msg.p) last_send_msg.p->set_value(sync_call_result::NOT_APPLICABLE); super::on_close();}
#endif
private:
......@@ -238,11 +238,8 @@ private:
stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
++stat.send_msg_sum;
#ifdef ASCS_SYNC_SEND
if (last_send_msg.cv)
{
last_send_msg.cv->signaled = true;
last_send_msg.cv->notify_one();
}
if (last_send_msg.p)
last_send_msg.p->set_value(sync_call_result::SUCCESS);
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
this->on_msg_send(last_send_msg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册