提交 6251fb2d 编写于 作者: Y youngowlf 提交者: youngwolf

1.1.5 release.

Support heartbeat (via OOB data), see ASCS_HEARTBEAT_INTERVAL macro for more details.
Support scatter-gather buffers when receiving messages, this feature needs modification of i_unpacker, you must explicitly define
 ASCS_SCATTERED_RECV_BUFFER macro to enable it, this is just for compatibility.
Simplify lock-free mechanism and use std::atomic_flag instead of std::atomic_size_t.
Optimize container insertion (use series of emplace functions instead).
Demo echo_client supports an alterable number of sending threads (before, it was a hard-coded 16).
Fix bug: In extreme cases, messages may get starved in receive buffer and will not be dispatched until arrival of next message.
Fix bug: In extreme cases, messages may get starved in send buffer and will not be sent until arrival of next message.
Fix bug: Sometimes, connector_base cannot reconnect to the server after link broken.

known issues:
1. heartbeat mechanism cannot work properly between Windows (at least Win-10) and Ubuntu (at least Ubuntu-16.04).
上级 e3a9f3f0
......@@ -8,6 +8,7 @@
//any value which is bigger than zero is okay.
#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define ASCS_CUSTOM_LOG
#define ASCS_HEARTBEAT_INTERVAL 0 //disable heartbeat just because echo_server disabled heartbeat.
#define ASCS_DEFAULT_UNPACKER non_copy_unpacker
//#define ASCS_DEFAULT_UNPACKER stream_unpacker
......
......@@ -14,14 +14,22 @@ else
dir = release
endif
cflag += -DASIO_STANDALONE
# If your compiler detects a duplicated 'shared_mutex' definition, please define the ASCS_HAS_STD_SHARED_MUTEX macro:
#cflag += -DASCS_HAS_STD_SHARED_MUTEX
# If you use the concurrent queue (https://github.com/cameron314/concurrentqueue), please define the ASCS_HAS_CONCURRENT_QUEUE macro:
#cflag += -DASCS_HAS_CONCURRENT_QUEUE
# And guarantee the header file concurrentqueue.h is reachable, for example, add its path to ext_location:
#ext_location += -I/path of concurrent queue/
kernel = ${shell uname -s}
ifeq (${kernel}, SunOS)
cflag += -pthreads ${ext_cflag} ${ext_location} -I../../include/
lflag += -pthreads -lsocket -lnsl ${ext_libs}
else
cflag += -pthread ${ext_cflag} ${ext_location} -I../../include/
lflag += -pthread ${ext_libs}
endif
target = ${dir}/${module}
sources = ${shell ls *.cpp}
......
......@@ -13,7 +13,11 @@
#define ASCS_INPUT_QUEUE non_lock_queue //we will never operate sending buffer concurrently, so need no locks
#define ASCS_INPUT_CONTAINER list
#endif
//configuration
#define ASCS_HEARTBEAT_INTERVAL 0 //disable heartbeat when doing performance test
//#define ASCS_MAX_MSG_NUM 16
//if there's a huge number of links, please reduce the message buffer via the ASCS_MAX_MSG_NUM macro.
//please consider: if we have 512 links, how much memory can we occupy at most with the default ASCS_MAX_MSG_NUM?
//it's 2 * 1024 * 1024 * 512 = 1G
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 0
......@@ -32,6 +36,7 @@
#define ASCS_DEFAULT_PACKER prefix_suffix_packer
#define ASCS_DEFAULT_UNPACKER prefix_suffix_unpacker
#endif
//configuration
#include <ascs/ext/tcp.h>
using namespace ascs;
......@@ -63,8 +68,8 @@ static bool check_msg;
// for sender, send msgs in on_msg_send() or use sending buffer limitation (like safe_send_msg(..., false)),
// but must not in service threads, please note.
//
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle().
// this will reduce IO throughput, because SOCKET's sliding window is not fully used, please note.
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle(),
// but this will reduce IO throughput because SOCKET's sliding window is not fully used, please note.
//
//test_client chose method #1
......@@ -281,15 +286,16 @@ void send_msg_randomly(echo_client& client, size_t msg_num, size_t msg_len, char
printf("speed: %.0f(*2)kB/s.\n", total_msg_bytes / begin_time.elapsed() / 1024);
}
//use up to 16 (hard code) worker threads to send messages concurrently
void send_msg_concurrently(echo_client& client, size_t msg_num, size_t msg_len, char msg_fill)
//use up to a specified number of worker threads to send messages concurrently
void send_msg_concurrently(echo_client& client, size_t send_thread_num, size_t msg_num, size_t msg_len, char msg_fill)
{
check_msg = true;
auto link_num = client.size();
auto group_num = std::min((size_t) 16, link_num);
auto group_num = std::min(send_thread_num, link_num);
auto group_link_num = link_num / group_num;
auto left_link_num = link_num - group_num * group_link_num;
uint64_t total_msg_bytes = msg_num * msg_len * link_num;
uint64_t total_msg_bytes = link_num * msg_len;
total_msg_bytes *= msg_num;
auto group_index = (size_t) -1;
size_t this_group_link_num = 0;
......@@ -309,13 +315,13 @@ void send_msg_concurrently(echo_client& client, size_t msg_num, size_t msg_len,
}
--this_group_link_num;
link_groups[group_index].push_back(item);
link_groups[group_index].emplace_back(item);
});
cpu_timer begin_time;
std::list<std::thread> threads;
do_something_to_all(link_groups, [&threads, msg_num, msg_len, msg_fill](const auto& item) {
threads.push_back(std::thread([&item, msg_num, msg_len, msg_fill]() {
threads.emplace_back([&item, msg_num, msg_len, msg_fill]() {
auto buff = new char[msg_len];
memset(buff, msg_fill, msg_len);
for (size_t i = 0; i < msg_num; ++i)
......@@ -326,7 +332,7 @@ void send_msg_concurrently(echo_client& client, size_t msg_num, size_t msg_len,
do_something_to_all(item, [buff, msg_len](const auto& item2) {item2->safe_send_msg(buff, msg_len);}); //can_overflow is false, it's important
}
delete[] buff;
}));
});
});
unsigned percent = 0;
......@@ -342,17 +348,16 @@ void send_msg_concurrently(echo_client& client, size_t msg_num, size_t msg_len,
fflush(stdout);
}
} while (100 != percent);
do_something_to_all(threads, [](auto& t) {t.join();});
begin_time.stop();
printf("\r100%%\ntime spent statistics: %f seconds.\n", begin_time.elapsed());
printf("speed: %.0f(*2)kB/s.\n", total_msg_bytes / begin_time.elapsed() / 1024);
do_something_to_all(threads, [](auto& t) {t.join();});
}
int main(int argc, const char* argv[])
{
printf("usage: %s [<service thread number=1> [<port=%d> [<ip=%s> [link num=16]]]]\n", argv[0], ASCS_SERVER_PORT, ASCS_SERVER_IP);
printf("usage: %s [<service thread number=1> [<send thread number=8> [<port=%d> [<ip=%s> [link num=16]]]]]\n", argv[0], ASCS_SERVER_PORT, ASCS_SERVER_IP);
if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
return 0;
else
......@@ -360,8 +365,8 @@ int main(int argc, const char* argv[])
///////////////////////////////////////////////////////////
size_t link_num = 16;
if (argc > 4)
link_num = std::min(ASCS_MAX_OBJECT_NUM, std::max(atoi(argv[4]), 1));
if (argc > 5)
link_num = std::min(ASCS_MAX_OBJECT_NUM, std::max(atoi(argv[5]), 1));
printf("exec: %s with " ASCS_SF " links\n", argv[0], link_num);
///////////////////////////////////////////////////////////
......@@ -371,10 +376,10 @@ int main(int argc, const char* argv[])
//echo client means to cooperate with echo server while doing performance test, it will not send msgs back as echo server does,
//otherwise, dead loop will occur, network resource will be exhausted.
// argv[2] = "::1" //ipv6
// argv[2] = "127.0.0.1" //ipv4
unsigned short port = argc > 2 ? atoi(argv[2]) : ASCS_SERVER_PORT;
std::string ip = argc > 3 ? argv[3] : ASCS_SERVER_IP;
// argv[4] = "::1" //ipv6
// argv[4] = "127.0.0.1" //ipv4
std::string ip = argc > 4 ? argv[4] : ASCS_SERVER_IP;
unsigned short port = argc > 3 ? atoi(argv[3]) : ASCS_SERVER_PORT;
//method #1, create and add clients manually.
auto client_ptr = client.create_object();
......@@ -386,12 +391,16 @@ int main(int argc, const char* argv[])
//method #2, add clients first without any arguments, then set the server address.
for (size_t i = 1; i < link_num / 2; ++i)
client.add_client();
client.do_something_to_all([argv, port, &ip](const auto& item) {item->set_server_addr(port, ip);});
client.do_something_to_all([port, &ip](const auto& item) {item->set_server_addr(port, ip);});
//method #3, add clients and set server address in one invocation.
for (auto i = std::max((size_t) 1, link_num / 2); i < link_num; ++i)
client.add_client(port, ip);
size_t send_thread_num = 8;
if (argc > 2)
send_thread_num = (size_t) std::max(1, std::min(16, atoi(argv[2])));
auto thread_num = 1;
if (argc > 1)
thread_num = std::min(16, std::max(thread_num, atoi(argv[1])));
......@@ -462,6 +471,7 @@ int main(int argc, const char* argv[])
size_t msg_len = 1024; //must greater than or equal to sizeof(size_t)
auto msg_fill = '0';
char model = 0; //0 broadcast, 1 randomly pick one link per msg
auto repeat_times = 1;
auto parameters = split_string(str);
auto iter = std::begin(parameters);
......@@ -479,6 +489,7 @@ int main(int argc, const char* argv[])
#endif
if (iter != std::end(parameters)) msg_fill = *iter++->data();
if (iter != std::end(parameters)) model = *iter++->data() - '0';
if (iter != std::end(parameters)) repeat_times = std::max(atoi(iter++->data()), 1);
if (0 != model && 1 != model)
{
......@@ -488,19 +499,22 @@ int main(int argc, const char* argv[])
printf("test parameters after adjustment: " ASCS_SF " " ASCS_SF " %c %d\n", msg_num, msg_len, msg_fill, model);
puts("performance test begin, this application will have no response during the test!");
client.clear_status();
for (int i = 0; i < repeat_times; ++i)
{
printf("thie is the %d / %d test.\n", i + 1, repeat_times);
client.clear_status();
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
if (0 == model)
send_msg_one_by_one(client, msg_num, msg_len, msg_fill);
else
puts("if ASCS_WANT_MSG_SEND_NOTIFY defined, only support model 0!");
if (0 == model)
send_msg_one_by_one(client, msg_num, msg_len, msg_fill);
else
puts("if ASCS_WANT_MSG_SEND_NOTIFY defined, only support model 0!");
#else
if (0 == model)
send_msg_concurrently(client, msg_num, msg_len, msg_fill);
else
send_msg_randomly(client, msg_num, msg_len, msg_fill);
if (0 == model)
send_msg_concurrently(client, send_thread_num, msg_num, msg_len, msg_fill);
else
send_msg_randomly(client, msg_num, msg_len, msg_fill);
#endif
}
}
}
......
......@@ -3,13 +3,17 @@
//configuration
#define ASCS_SERVER_PORT 9527
#define ASCS_ASYNC_ACCEPT_NUM 5
#define ASCS_REUSE_OBJECT //use objects pool
//#define ASCS_FREE_OBJECT_INTERVAL 60 //it's useless if ASCS_REUSE_OBJECT macro been defined
//#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define ASCS_ENHANCED_STABILITY
//#define ASCS_FULL_STATISTIC //full statistic will slightly impact efficiency
//#define ASCS_USE_STEADY_TIMER
#define ASCS_HEARTBEAT_INTERVAL 0 //disable heartbeat when doing performance test
//#define ASCS_MAX_MSG_NUM 16
//if there's a huge number of links, please reduce the message buffer via the ASCS_MAX_MSG_NUM macro.
//please consider: if we have 512 links, how much memory can we occupy at most with the default ASCS_MAX_MSG_NUM?
//it's 2 * 1024 * 1024 * 512 = 1G
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 0
......@@ -60,8 +64,8 @@ auto global_packer(std::make_shared<ASCS_DEFAULT_PACKER>());
// for sender, send msgs in on_msg_send() or use sending buffer limitation (like safe_send_msg(..., false)),
// but must not in service threads, please note.
//
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle().
// this will reduce IO throughput, because SOCKET's sliding window is not fully used, please note.
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle(),
// but this will reduce IO throughput because SOCKET's sliding window is not fully used, please note.
//
//asio_server chose method #1
......@@ -109,9 +113,12 @@ protected:
{
auto re = send_msg(msg.data(), msg.size());
if (!re)
congestion_control(true);
{
//cannot handle (send it back) this msg timely, begin congestion control
//'msg' will be put into receiving buffer, and be dispatched via on_msg_handle() in the future
congestion_control(true);
//unified_out::warning_out("open congestion control."); //too many prompts will affect efficiency
}
return re;
}
......@@ -120,9 +127,12 @@ protected:
{
auto re = send_msg(msg.data(), msg.size());
if (re)
congestion_control(false);
{
//successfully handled the only one msg in receiving buffer, end congestion control
//subsequent msgs will be dispatched via on_msg() again.
congestion_control(false);
//unified_out::warning_out("close congestion control."); //too many prompts will affect efficiency
}
return re;
}
......
......@@ -3,7 +3,6 @@
//configuration
#define ASCS_SERVER_PORT 5050
#define ASCS_ASYNC_ACCEPT_NUM 5
#define ASCS_CLEAR_OBJECT_INTERVAL 60
#define ASCS_ENHANCED_STABILITY
#define ASCS_WANT_MSG_SEND_NOTIFY
......
//configuration
#define ASCS_SERVER_PORT 5050
#define ASCS_ASYNC_ACCEPT_NUM 5
#define ASCS_CLEAR_OBJECT_INTERVAL 60
#define ASCS_ENHANCED_STABILITY
#define ASCS_WANT_MSG_SEND_NOTIFY
......
......@@ -102,7 +102,7 @@ public:
}
if (0 == _data_len)
msg_can.resize(msg_can.size() + 1);
msg_can.emplace_back();
return true;
}
......
......@@ -10,6 +10,7 @@
#define ASCS_MSG_BUFFER_SIZE 65536
#define ASCS_INPUT_QUEUE non_lock_queue //we will never operate sending buffer concurrently, so need no locks
#define ASCS_INPUT_CONTAINER list
#define ASCS_HEARTBEAT_INTERVAL 0 //disable heartbeat when doing performance test
#define ASCS_DEFAULT_UNPACKER stream_unpacker //non-protocol
//configuration
......@@ -40,8 +41,8 @@ std::atomic_ushort completed_session_num;
// for sender, send msgs in on_msg_send() or use sending buffer limitation (like safe_send_msg(..., false)),
// but must not in service threads, please note.
//
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle().
// this will reduce IO throughput because SOCKET's sliding window is not fully used, please note.
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle(),
// but this will reduce IO throughput because SOCKET's sliding window is not fully used, please note.
//
//pingpong_client will choose method #1 if defined ASCS_WANT_MSG_SEND_NOTIFY, otherwise #2
//BTW, if pingpong_client chose method #2, then pingpong_server can work properly without any congestion control,
......
......@@ -3,7 +3,6 @@
//configuration
#define ASCS_SERVER_PORT 9527
#define ASCS_ASYNC_ACCEPT_NUM 5
#define ASCS_REUSE_OBJECT //use objects pool
#define ASCS_DELAY_CLOSE 5 //define this to avoid hooks for async call (and slightly improve efficiency)
//#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
......@@ -16,6 +15,7 @@
//
//if pingpong_client send message in on_msg_send(), then using non_lock_queue as input queue in pingpong_server will lead
//undefined behavior, please note.
#define ASCS_HEARTBEAT_INTERVAL 0 //disable heartbeat when doing performance test
#define ASCS_DEFAULT_UNPACKER stream_unpacker //non-protocol
//configuration
......@@ -42,8 +42,8 @@ using namespace ascs::ext::tcp;
// for sender, send msgs in on_msg_send() or use sending buffer limitation (like safe_send_msg(..., false)),
// but must not in service threads, please note.
//
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle().
// this will reduce IO throughput, because SOCKET's sliding window is not fully used, please note.
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle(),
// but this will reduce IO throughput because SOCKET's sliding window is not fully used, please note.
//
//pingpong_server chose method #1
//BTW, if pingpong_client chose method #2, then pingpong_server can work properly without any congestion control,
......@@ -64,9 +64,12 @@ protected:
{
auto re = direct_send_msg(std::move(msg));
if (!re)
congestion_control(true);
{
//cannot handle (send it back) this msg timely, begin congestion control
//'msg' will be put into receiving buffer, and be dispatched via on_msg_handle() in the future
congestion_control(true);
//unified_out::warning_out("open congestion control."); //too many prompts will affect efficiency
}
return re;
}
......@@ -75,9 +78,12 @@ protected:
{
auto re = direct_send_msg(std::move(msg));
if (re)
congestion_control(false);
{
//successfully handled the only one msg in receiving buffer, end congestion control
//subsequent msgs will be dispatched via on_msg() again.
congestion_control(false);
//unified_out::warning_out("close congestion control."); //too many prompts will affect efficiency
}
return re;
}
......
......@@ -3,7 +3,6 @@
//configuration
#define ASCS_SERVER_PORT 9527
#define ASCS_ASYNC_ACCEPT_NUM 5
//#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define ASCS_ENHANCED_STABILITY
//#define ASCS_DEFAULT_PACKER replaceable_packer<>
......
......@@ -17,6 +17,7 @@
#include <stdarg.h>
#include <list>
#include <vector>
#include <chrono>
#include <memory>
#include <string>
......@@ -32,21 +33,19 @@
namespace ascs
{
template<typename atomic_type = std::atomic_size_t>
class scope_atomic_lock : public asio::detail::noncopyable
{
public:
scope_atomic_lock(atomic_type& atomic_) : added(false), atomic(atomic_) {lock();} //atomic_ must has been initialized to zero
scope_atomic_lock(std::atomic_flag& atomic_) : _locked(false), atomic(atomic_) {lock();} //atomic_ must has been initialized with false
~scope_atomic_lock() {unlock();}
void lock() {if (!added) _locked = 1 == ++atomic; added = true;}
void unlock() {if (added) --atomic; _locked = false, added = false;}
void lock() {if (!_locked) _locked = !atomic.test_and_set(std::memory_order_acq_rel);}
void unlock() {if (_locked) atomic.clear(std::memory_order_release); _locked = false;}
bool locked() const {return _locked;}
private:
bool added;
bool _locked;
atomic_type& atomic;
std::atomic_flag& atomic;
};
class service_pump;
......@@ -180,6 +179,11 @@ namespace tcp
public:
typedef MsgType msg_type;
typedef const msg_type msg_ctype;
#ifdef ASCS_SCATTERED_RECV_BUFFER
typedef std::vector<asio::mutable_buffers_1> buffer_type;
#else
typedef asio::mutable_buffers_1 buffer_type;
#endif
typedef std::list<msg_type> container_type;
protected:
......@@ -189,7 +193,7 @@ namespace tcp
virtual void reset_state() = 0;
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) = 0;
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) = 0;
virtual asio::mutable_buffers_1 prepare_next_recv() = 0;
virtual buffer_type prepare_next_recv() = 0;
};
} //namespace
......@@ -218,6 +222,7 @@ namespace udp
public:
typedef MsgType msg_type;
typedef const msg_type msg_ctype;
typedef asio::mutable_buffers_1 buffer_type;
typedef std::list<udp_msg<msg_type>> container_type;
protected:
......@@ -226,7 +231,7 @@ namespace udp
public:
virtual void reset_state() {}
virtual msg_type parse_msg(size_t bytes_transferred) = 0;
virtual asio::mutable_buffers_1 prepare_next_recv() = 0;
virtual buffer_type prepare_next_recv() = 0;
};
} //namespace
//unpacker concept
......@@ -433,7 +438,7 @@ template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (au
//used by both TCP and UDP
#define SAFE_SEND_MSG_CHECK \
{ \
if (!this->is_send_allowed() || this->stopped()) return false; \
if (!this->is_send_allowed()) return false; \
std::this_thread::sleep_for(std::chrono::milliseconds(50)); \
}
......
......@@ -70,10 +70,24 @@
* 2016.12.6 version 1.1.4
* Drop unnecessary macro definition (ASIO_HAS_STD_CHRONO).
* Simplify header files' dependence.
* Add Visual C++ solution and project files (Visuall C++ 14.0).
* Add Visual C++ solution and project files (Visual C++ 14.0).
* Monitor time consumptions for message packing and unpacking.
* Fix bug: pop_first_pending_send_msg and pop_first_pending_recv_msg cannot work.
*
* 2017.1.1 version 1.1.5
* Support heartbeat (via OOB data), see ASCS_HEARTBEAT_INTERVAL macro for more details.
 * Support scatter-gather buffers when receiving messages, this feature needs modification of i_unpacker, you must explicitly define
 *  ASCS_SCATTERED_RECV_BUFFER macro to enable it, this is just for compatibility.
* Simplify lock-free mechanism and use std::atomic_flag instead of std::atomic_size_t.
* Optimize container insertion (use series of emplace functions instead).
 * Demo echo_client supports an alterable number of sending threads (before, it was a hard-coded 16).
* Fix bug: In extreme cases, messages may get starved in receive buffer and will not be dispatched until arrival of next message.
* Fix bug: In extreme cases, messages may get starved in send buffer and will not be sent until arrival of next message.
* Fix bug: Sometimes, connector_base cannot reconnect to the server after link broken.
*
* known issues:
 * 1. heartbeat mechanism cannot work properly between Windows (at least Win-10) and Ubuntu (at least Ubuntu-16.04).
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -83,8 +97,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10104 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.1.4"
#define ASCS_VER 10105 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.1.5"
//asio and compiler check
#ifdef _MSC_VER
......@@ -227,7 +241,7 @@ static_assert(ASCS_GRACEFUL_SHUTDOWN_MAX_DURATION > 0, "graceful shutdown durati
//how many async_accept delivery concurrently
#ifndef ASCS_ASYNC_ACCEPT_NUM
#define ASCS_ASYNC_ACCEPT_NUM 1
#define ASCS_ASYNC_ACCEPT_NUM 16
#endif
static_assert(ASCS_ASYNC_ACCEPT_NUM > 0, "async accept number must be bigger than zero.");
......@@ -281,6 +295,24 @@ template<typename T> using concurrent_queue = moodycamel::ConcurrentQueue<T>;
//we also can control the queues (and their containers) via template parameters on calss 'connector_base'
//'server_socket_base', 'ssl::connector_base' and 'ssl::server_socket_base'.
//we even can let a socket to use different queue (and / or different container) for input and output via template parameters.
//#define ASCS_SCATTERED_RECV_BUFFER
//define this macro will let ascs to support scatter-gather buffers when doing async read,
//it's very useful under certain situations (for example, you're using ring buffer in unpacker).
#ifndef ASCS_HEARTBEAT_INTERVAL
#define ASCS_HEARTBEAT_INTERVAL 5 //second(s)
#endif
//at every ASCS_HEARTBEAT_INTERVAL second(s), send an OOB data (heartbeat) if no normal messages been sent,
//less than or equal to zero means disable heartbeat, then you can send and check heartbeat with your own logic by calling connector_base::check_heartbeat
// or server_socket_base::check_heartbeat, and you still need to define a valid ASCS_HEARTBEAT_MAX_ABSENCE macro, please note.
#ifndef ASCS_HEARTBEAT_MAX_ABSENCE
#define ASCS_HEARTBEAT_MAX_ABSENCE 3 //times of ASCS_HEARTBEAT_INTERVAL
#endif
static_assert(ASCS_HEARTBEAT_MAX_ABSENCE > 0, "heartbeat absence must be bigger than zero.");
//if no any data (include heartbeats) been received within ASCS_HEARTBEAT_INTERVAL * ASCS_HEARTBEAT_MAX_ABSENCE second(s), shut down the link.
//configurations
#endif /* _ASCS_CONFIG_H_ */
......@@ -70,6 +70,8 @@ public:
void push_front(const _Ty& _Val) {++s; impl.push_front(_Val);}
void push_front(_Ty&& _Val) {++s; impl.push_front(std::move(_Val));}
template<class... _Valty>
void emplace_front(_Valty&&... _Val) {++s; impl.emplace_front(std::forward<_Valty>(_Val)...);}
void pop_front() {--s; impl.pop_front();}
reference front() {return impl.front();}
iterator begin() {return impl.begin();}
......@@ -80,6 +82,8 @@ public:
void push_back(const _Ty& _Val) {++s; impl.push_back(_Val);}
void push_back(_Ty&& _Val) {++s; impl.push_back(std::move(_Val));}
template<class... _Valty>
void emplace_back(_Valty&&... _Val) {++s; impl.emplace_back(std::forward<_Valty>(_Val)...);}
void pop_back() {--s; impl.pop_back();}
reference back() {return impl.back();}
iterator end() {return impl.end();}
......@@ -143,8 +147,8 @@ private:
//Container must at least has the following functions:
// Container() and Container(size_t) constructor
// move constructor
// size_approx (must be thread safe, but doesn't have to be coherent)
// swap
// size_approx
// enqueue(const T& item)
// enqueue(T&& item)
// try_dequeue(T& item)
......@@ -161,9 +165,8 @@ public:
size_t size() const {return this->size_approx();}
bool empty() const {return 0 == size();}
//not thread-safe
void clear() {super(std::move(*this));}
void clear() {super(std::move(*this));} //not thread-safe
using Container::swap;
void move_items_in(std::list<T>& can) {move_items_in_(can);}
......@@ -175,12 +178,12 @@ public:
//Container must at least has the following functions:
// Container() and Container(size_t) constructor
// size
// empty
// size (must be thread safe, but doesn't have to be coherent, std::list before gcc 5 doesn't meet this requirement, ascs::list does)
// empty (must be thread safe, but doesn't have to be coherent)
// clear
// swap
// push_back(const T& item)
// push_back(T&& item)
// emplace_back(const T& item)
// emplace_back(T&& item)
// splice(Container::const_iterator, std::list<T>&), after this, std::list<T> must be empty
// front
// pop_front
......@@ -195,13 +198,19 @@ public:
queue() {}
queue(size_t size) : super(size) {}
using Container::size;
using Container::clear;
using Container::swap;
//thread safe
bool enqueue(const T& item) {typename Lockable::lock_guard lock(*this); return enqueue_(item);}
bool enqueue(T&& item) {typename Lockable::lock_guard lock(*this); return enqueue_(std::move(item));}
void move_items_in(std::list<T>& can) {typename Lockable::lock_guard lock(*this); move_items_in_(can);}
bool try_dequeue(T& item) {typename Lockable::lock_guard lock(*this); return try_dequeue_(item);}
bool enqueue_(const T& item) {this->push_back(item); return true;}
bool enqueue_(T&& item) {this->push_back(std::move(item)); return true;}
//not thread safe
bool enqueue_(const T& item) {this->emplace_back(item); return true;}
bool enqueue_(T&& item) {this->emplace_back(std::move(item)); return true;}
void move_items_in_(std::list<T>& can) {this->splice(std::end(*this), can);}
bool try_dequeue_(T& item) {if (this->empty()) return false; item.swap(this->front()); this->pop_front(); return true;}
};
......
......@@ -100,7 +100,7 @@ inline std::list<std::string> split_string(const std::string& str) //delimiters
{
if (std::string::npos != start_pos)
{
re.push_back(str.substr(start_pos, pos - start_pos));
re.emplace_back(std::next(str.data(), start_pos), pos - start_pos);
start_pos = std::string::npos;
}
}
......@@ -109,7 +109,7 @@ inline std::list<std::string> split_string(const std::string& str) //delimiters
}
if (std::string::npos != start_pos)
re.push_back(str.substr(start_pos));
re.emplace_back(std::next(str.data(), start_pos), str.size() - start_pos);
return re;
}
......
......@@ -43,14 +43,14 @@ public:
auto pnext = &*std::begin(raw_buff);
auto unpack_ok = true;
while (unpack_ok) //considering stick package problem, we need a loop
while (unpack_ok) //considering sticky package problem, we need a loop
if ((size_t) -1 != cur_msg_len)
{
if (cur_msg_len > ASCS_MSG_BUFFER_SIZE || cur_msg_len <= ASCS_HEAD_LEN)
unpack_ok = false;
else if (remain_len >= cur_msg_len) //one msg received
{
msg_can.push_back(std::make_pair(std::next(pnext, ASCS_HEAD_LEN), cur_msg_len - ASCS_HEAD_LEN));
msg_can.emplace_back(std::next(pnext, ASCS_HEAD_LEN), cur_msg_len - ASCS_HEAD_LEN);
remain_len -= cur_msg_len;
std::advance(pnext, cur_msg_len);
cur_msg_len = -1;
......@@ -58,7 +58,7 @@ public:
else
break;
}
else if (remain_len >= ASCS_HEAD_LEN) //the msg's head been received, stick package found
else if (remain_len >= ASCS_HEAD_LEN) //the msg's head been received, sticky package found
{
ASCS_HEAD_TYPE head;
memcpy(&head, pnext, ASCS_HEAD_LEN);
......@@ -79,7 +79,7 @@ public:
{
std::list<std::pair<const char*, size_t>> msg_pos_can;
auto unpack_ok = parse_msg(bytes_transferred, msg_pos_can);
do_something_to_all(msg_pos_can, [&msg_can](const auto& item) {msg_can.resize(msg_can.size() + 1); msg_can.back().assign(item.first, item.second);});
do_something_to_all(msg_pos_can, [&msg_can](const auto& item) {msg_can.emplace_back(item.first, item.second);});
if (unpack_ok && remain_len > 0)
{
......@@ -87,13 +87,13 @@ public:
memcpy(&*std::begin(raw_buff), pnext, remain_len); //left behind unparsed data
}
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
//if unpacking failed, successfully parsed msgs will still returned via msg_can(sticky package), please note.
return unpack_ok;
}
//a return value of 0 indicates that the read operation is complete. a non-zero value indicates the maximum number
//of bytes to be read on the next call to the stream's async_read_some function. ---asio::async_read
//read as many as possible to reduce asynchronous call-back, and don't forget to handle stick package carefully in parse_msg function.
//read as many as possible to reduce asynchronous call-back, and don't forget to handle sticky package carefully in parse_msg function.
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred)
{
if (ec)
......@@ -115,11 +115,13 @@ public:
//read as many as possible except that we have already got an entire msg
}
virtual asio::mutable_buffers_1 prepare_next_recv()
{
assert(remain_len < ASCS_MSG_BUFFER_SIZE);
return asio::buffer(asio::buffer(raw_buff) + remain_len);
}
#ifdef ASCS_SCATTERED_RECV_BUFFER
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return buffer_type(1, asio::buffer(asio::buffer(raw_buff) + remain_len));}
#else
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(asio::buffer(raw_buff) + remain_len);}
#endif
protected:
std::array<char, ASCS_MSG_BUFFER_SIZE> raw_buff;
......@@ -132,7 +134,7 @@ class udp_unpacker : public udp::i_unpacker<std::string>
{
public:
virtual msg_type parse_msg(size_t bytes_transferred) {assert(bytes_transferred <= ASCS_MSG_BUFFER_SIZE); return msg_type(raw_buff.data(), bytes_transferred);}
virtual asio::mutable_buffers_1 prepare_next_recv() {return asio::buffer(raw_buff);}
virtual buffer_type prepare_next_recv() {return asio::buffer(raw_buff);}
protected:
std::array<char, ASCS_MSG_BUFFER_SIZE> raw_buff;
......@@ -155,16 +157,15 @@ public:
do_something_to_all(tmp_can, [&msg_can](auto& item) {
auto raw_msg = new string_buffer();
raw_msg->swap(item);
msg_can.resize(msg_can.size() + 1);
msg_can.back().raw_buffer(raw_msg);
msg_can.emplace_back(raw_msg);
});
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
//if unpacking failed, successfully parsed msgs will still returned via msg_can(sticky package), please note.
return unpack_ok;
}
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) {return unpacker_.completion_condition(ec, bytes_transferred);}
virtual asio::mutable_buffers_1 prepare_next_recv() {return unpacker_.prepare_next_recv();}
virtual typename super::buffer_type prepare_next_recv() {return unpacker_.prepare_next_recv();}
protected:
unpacker unpacker_;
......@@ -187,7 +188,7 @@ public:
raw_msg->assign(raw_buff.data(), bytes_transferred);
return typename super::msg_type(raw_msg);
}
virtual asio::mutable_buffers_1 prepare_next_recv() {return asio::buffer(raw_buff);}
virtual typename super::buffer_type prepare_next_recv() {return asio::buffer(raw_buff);}
protected:
std::array<char, ASCS_MSG_BUFFER_SIZE> raw_buff;
......@@ -222,8 +223,7 @@ public:
if (bytes_transferred != raw_buff.size())
return false;
msg_can.resize(msg_can.size() + 1);
msg_can.back().swap(raw_buff);
msg_can.emplace_back(std::move(raw_buff));
step = 0;
}
......@@ -253,7 +253,13 @@ public:
return 0;
}
virtual asio::mutable_buffers_1 prepare_next_recv() {return raw_buff.empty() ? asio::buffer((char*) &head, ASCS_HEAD_LEN) : asio::buffer(raw_buff.data(), raw_buff.size());}
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
#ifdef ASCS_SCATTERED_RECV_BUFFER
virtual buffer_type prepare_next_recv() {return buffer_type(1, raw_buff.empty() ? asio::buffer((char*) &head, ASCS_HEAD_LEN) : asio::buffer(raw_buff.data(), raw_buff.size()));}
#else
virtual buffer_type prepare_next_recv() {return raw_buff.empty() ? asio::buffer((char*) &head, ASCS_HEAD_LEN) : asio::buffer(raw_buff.data(), raw_buff.size());}
#endif
private:
ASCS_HEAD_TYPE head;
......@@ -283,8 +289,7 @@ public:
if (bytes_transferred != raw_buff.size())
return false;
msg_can.resize(msg_can.size() + 1);
msg_can.back().swap(raw_buff);
msg_can.emplace_back(std::move(raw_buff));
return true;
}
......@@ -293,7 +298,13 @@ public:
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred)
{return ec || bytes_transferred == raw_buff.size() ? 0 : asio::detail::default_max_transfer_size;}
virtual asio::mutable_buffers_1 prepare_next_recv() {raw_buff.assign(_fixed_length); return asio::buffer(raw_buff.data(), raw_buff.size());}
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
#ifdef ASCS_SCATTERED_RECV_BUFFER
virtual buffer_type prepare_next_recv() {raw_buff.assign(_fixed_length); return buffer_type(1, asio::buffer(raw_buff.data(), raw_buff.size()));}
#else
virtual buffer_type prepare_next_recv() {raw_buff.assign(_fixed_length); return asio::buffer(raw_buff.data(), raw_buff.size());}
#endif
private:
basic_buffer raw_buff;
......@@ -372,8 +383,7 @@ public:
assert(first_msg_len > min_len);
auto msg_len = first_msg_len - min_len;
msg_can.resize(msg_can.size() + 1);
msg_can.back().assign(std::next(pnext, _prefix.size()), msg_len);
msg_can.emplace_back(std::next(pnext, _prefix.size()), msg_len);
remain_len -= first_msg_len;
std::advance(pnext, first_msg_len);
first_msg_len = -1;
......@@ -384,12 +394,13 @@ public:
else if (remain_len > 0)
memcpy(&*std::begin(raw_buff), pnext, remain_len); //left behind unparsed msg
return true;
//if unpacking failed, successfully parsed msgs will still be returned via msg_can (sticky package), please note.
return unpack_ok;
}
//a return value of 0 indicates that the read operation is complete. a non-zero value indicates the maximum number
//of bytes to be read on the next call to the stream's async_read_some function. ---asio::async_read
//read as many as possible to reduce asynchronous call-back, and don't forget to handle stick package carefully in parse_msg function.
//read as many as possible to reduce asynchronous call-back, and don't forget to handle sticky package carefully in parse_msg function.
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred)
{
if (ec)
......@@ -401,11 +412,13 @@ public:
return peek_msg(data_len, &*std::begin(raw_buff));
}
virtual asio::mutable_buffers_1 prepare_next_recv()
{
assert(remain_len < ASCS_MSG_BUFFER_SIZE);
return asio::buffer(asio::buffer(raw_buff) + remain_len);
}
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
#ifdef ASCS_SCATTERED_RECV_BUFFER
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return buffer_type(1, asio::buffer(asio::buffer(raw_buff) + remain_len));}
#else
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(asio::buffer(raw_buff) + remain_len);}
#endif
private:
std::array<char, ASCS_MSG_BUFFER_SIZE> raw_buff;
......@@ -426,13 +439,19 @@ public:
assert(bytes_transferred <= ASCS_MSG_BUFFER_SIZE);
msg_can.resize(msg_can.size() + 1);
msg_can.back().assign(raw_buff.data(), bytes_transferred);
msg_can.emplace_back(raw_buff.data(), bytes_transferred);
return true;
}
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred > 0 ? 0 : asio::detail::default_max_transfer_size;}
virtual asio::mutable_buffers_1 prepare_next_recv() {return asio::buffer(raw_buff);}
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
#ifdef ASCS_SCATTERED_RECV_BUFFER
virtual buffer_type prepare_next_recv() {return buffer_type(1, asio::buffer(raw_buff));}
#else
virtual buffer_type prepare_next_recv() {return asio::buffer(raw_buff);}
#endif
protected:
std::array<char, ASCS_MSG_BUFFER_SIZE> raw_buff;
......
......@@ -56,7 +56,7 @@ protected:
assert(object_ptr && !object_ptr->is_equal_to(-1));
std::unique_lock<std::shared_mutex> lock(object_can_mutex);
return object_can.size() < max_size_ ? object_can.insert(std::make_pair(object_ptr->id(), object_ptr)).second : false;
return object_can.size() < max_size_ ? object_can.emplace(object_ptr->id(), object_ptr).second : false;
}
//only add object_ptr to invalid_object_can when it's in object_can, this can avoid duplicated items in invalid_object_can, because invalid_object_can is a list,
......@@ -72,7 +72,7 @@ protected:
if (exist)
{
std::unique_lock<std::shared_mutex> lock(invalid_object_can_mutex);
invalid_object_can.push_back(object_ptr);
invalid_object_can.emplace_back(object_ptr);
}
return exist;
......@@ -84,7 +84,7 @@ protected:
{
if (object_ptr)
{
object_ptr->id(++cur_id);
object_ptr->id(1 + cur_id.fetch_add(1, std::memory_order_relaxed));
on_create(object_ptr);
}
else
......@@ -229,7 +229,7 @@ public:
for (auto iter = std::begin(object_can); iter != std::end(object_can);)
if (iter->second->obsoleted())
{
objects.push_back(std::move(iter->second));
objects.emplace_back(std::move(iter->second));
iter = object_can.erase(iter);
}
else
......
......@@ -148,7 +148,7 @@ public:
bool is_running() const {return !stopped();}
bool is_service_started() const {return started;}
void add_service_thread(int thread_num) {for (auto i = 0; i < thread_num; ++i) service_threads.push_back(std::thread([this]() {asio::error_code ec; this->run(ec);}));}
void add_service_thread(int thread_num) {for (auto i = 0; i < thread_num; ++i) service_threads.emplace_back([this]() {asio::error_code ec; this->run(ec);});}
protected:
void do_service(int thread_num)
......@@ -197,7 +197,7 @@ private:
assert(nullptr != i_service_);
std::unique_lock<std::shared_mutex> lock(service_can_mutex);
service_can.push_back(i_service_);
service_can.emplace_back(i_service_);
}
protected:
......
......@@ -21,7 +21,7 @@ namespace ascs
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType, typename OutMsgType,
template<typename, typename> class InQueue, template<typename> class InContainer,
template<typename, typename> class OutQueue, template<typename> class OutContainer>
class socket: public timer
class socket : public timer
{
protected:
static const tid TIMER_BEGIN = timer::TIMER_END;
......@@ -30,10 +30,22 @@ protected:
static const tid TIMER_DELAY_CLOSE = TIMER_BEGIN + 2;
static const tid TIMER_END = TIMER_BEGIN + 10;
socket(asio::io_service& io_service_) : timer(io_service_), _id(-1), next_layer_(io_service_), packer_(std::make_shared<Packer>()),
send_atomic(0), dispatch_atomic(0), started_(false), start_atomic(0) {reset_state();}
template<typename Arg> socket(asio::io_service& io_service_, Arg& arg) : timer(io_service_), _id(-1), next_layer_(io_service_, arg), packer_(std::make_shared<Packer>()),
send_atomic(0), dispatch_atomic(0), started_(false), start_atomic(0) {reset_state();}
socket(asio::io_service& io_service_) : timer(io_service_), next_layer_(io_service_) {first_init();}
template<typename Arg> socket(asio::io_service& io_service_, Arg& arg) : timer(io_service_), next_layer_(io_service_, arg) {first_init();}
//helper function, just call it in constructor
void first_init()
{
	_id = -1;
	packer_ = std::make_shared<Packer>();
	// start from a quiescent state: nothing is being sent or dispatched,
	// nothing is paused, congestion control is off and the socket has not started.
	sending = false;
	paused_sending = false;
	dispatching = false;
	paused_dispatching = false;
	congestion_controlling = false;
	started_ = false;
	// release the spin-lock flags; relaxed ordering suffices here because
	// no other thread can observe this object during construction.
	send_atomic.clear(std::memory_order_relaxed);
	dispatch_atomic.clear(std::memory_order_relaxed);
	start_atomic.clear(std::memory_order_relaxed);
}
void reset()
{
......@@ -82,18 +94,18 @@ public:
{
if (!started_)
{
scope_atomic_lock<> lock(start_atomic);
scope_atomic_lock lock(start_atomic);
if (!started_ && lock.locked())
started_ = do_start();
}
}
//return false if send buffer is empty or sending not allowed or io_service stopped
//return false if send buffer is empty or sending not allowed
bool send_msg()
{
if (!sending)
{
scope_atomic_lock<> lock(send_atomic);
scope_atomic_lock lock(send_atomic);
if (!sending && lock.locked())
{
sending = true;
......@@ -109,12 +121,14 @@ public:
void suspend_send_msg(bool suspend) {if (!(paused_sending = suspend)) send_msg();}
bool suspend_send_msg() const {return paused_sending;}
bool is_sending_msg() const {return sending;}
//for a socket that has been shut down, resuming message dispatching will not take effect for left messages.
void suspend_dispatch_msg(bool suspend) {if (!(paused_dispatching = suspend) && started()) dispatch_msg();}
void suspend_dispatch_msg(bool suspend) {if (!(paused_dispatching = suspend)) dispatch_msg();}
bool suspend_dispatch_msg() const {return paused_dispatching;}
bool is_dispatching_msg() const {return dispatching;}
void congestion_control(bool enable) {congestion_controlling = enable; unified_out::warning_out("%s congestion control.", enable ? "open" : "close");}
void congestion_control(bool enable) {congestion_controlling = enable;}
bool congestion_control() const {return congestion_controlling;}
//in ascs, it's thread safe to access stat without mutex, because for a specific member of stat, ascs will never access it concurrently.
......@@ -156,7 +170,7 @@ protected:
virtual void do_recv_msg() = 0;
virtual bool is_closable() {return true;}
virtual bool is_send_allowed() {return !paused_sending;} //can send msg or not(just put into send buffer)
virtual bool is_send_allowed() {return !paused_sending && !stopped();} //can send msg or not(just put into send buffer)
//generally, you don't have to rewrite this to maintain the status of connections(TCP)
virtual void on_send_error(const asio::error_code& ec) {unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
......@@ -248,12 +262,12 @@ protected:
}
}
//return false if receiving buffer is empty or dispatching not allowed or io_service stopped
//return false if receiving buffer is empty or dispatching not allowed (include io_service stopped)
bool dispatch_msg()
{
if (!dispatching)
{
scope_atomic_lock<> lock(dispatch_atomic);
scope_atomic_lock lock(dispatch_atomic);
if (!dispatching && lock.locked())
{
dispatching = true;
......@@ -267,7 +281,7 @@ protected:
return dispatching;
}
//return false if receiving buffer is empty or dispatching not allowed or io_service stopped
//return false if receiving buffer is empty or dispatching not allowed (include io_service stopped)
bool do_dispatch_msg()
{
if (paused_dispatching)
......@@ -276,16 +290,15 @@ protected:
{
#ifndef ASCS_DISCARD_MSG_WHEN_LINK_DOWN
if (!last_dispatch_msg.empty())
{
on_msg_handle(last_dispatch_msg, true);
last_dispatch_msg.clear();
}
out_msg msg;
#endif
typename out_container_type::lock_guard lock(recv_msg_buffer);
while (recv_msg_buffer.try_dequeue_(msg))
on_msg_handle(msg, true);
#ifndef ST_ASIO_DISCARD_MSG_WHEN_LINK_DOWN
while (recv_msg_buffer.try_dequeue_(last_dispatch_msg))
on_msg_handle(last_dispatch_msg, true);
#endif
recv_msg_buffer.clear();
last_dispatch_msg.clear();
}
else if (!last_dispatch_msg.empty() || recv_msg_buffer.try_dequeue(last_dispatch_msg))
{
......@@ -298,12 +311,17 @@ protected:
bool do_direct_send_msg(InMsgType&& msg)
{
if (!msg.empty())
if (msg.empty())
unified_out::error_out("found an empty message, please check your packer.");
else
{
send_msg_buffer.enqueue(in_msg(std::move(msg)));
send_msg();
}
//even if we meet an empty message (most likely, this is because message length is too long, or insufficient memory), we still return true, why?
//please think about the function safe_send_(native_)msg, if we keep returning false, it will enter a dead loop.
//the packer provider has the responsibility to write detailed reasons down when packing message failed.
return true;
}
......@@ -325,8 +343,13 @@ private:
dispatch_msg();
break;
case TIMER_DELAY_CLOSE:
if (!this->is_last_async_call())
if (!is_last_async_call())
{
stop_all_timer();
revive_timer(TIMER_DELAY_CLOSE);
return true;
}
else if (lowest_layer().is_open())
{
asio::error_code ec;
......@@ -385,13 +408,18 @@ protected:
// 2. congestion control opened;
//ascs::socket will delay 50 milliseconds(non-blocking) to invoke handle_msg() again, and now, as you known, temp_msg_buffer is used to hold these msgs temporarily.
bool sending, paused_sending;
std::atomic_size_t send_atomic;
bool dispatching, paused_dispatching, congestion_controlling;
std::atomic_size_t dispatch_atomic;
volatile bool sending;
bool paused_sending;
std::atomic_flag send_atomic;
volatile bool dispatching;
bool paused_dispatching;
std::atomic_flag dispatch_atomic;
volatile bool congestion_controlling;
bool started_; //has started or not
std::atomic_size_t start_atomic;
volatile bool started_; //has started or not
std::atomic_flag start_atomic;
struct statistic stat;
typename statistic::stat_time recv_idle_begin_time;
......
......@@ -29,13 +29,12 @@ public:
static const timer::tid TIMER_BEGIN = super::TIMER_END;
static const timer::tid TIMER_CONNECT = TIMER_BEGIN;
static const timer::tid TIMER_ASYNC_SHUTDOWN = TIMER_BEGIN + 1;
static const timer::tid TIMER_HEARTBEAT_CHECK = TIMER_BEGIN + 2;
static const timer::tid TIMER_END = TIMER_BEGIN + 10;
connector_base(asio::io_service& io_service_) : super(io_service_), connected(false), reconnecting(true)
{set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
connector_base(asio::io_service& io_service_) : super(io_service_), connected(false), reconnecting(true) {set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
template<typename Arg>
connector_base(asio::io_service& io_service_, Arg& arg) : super(io_service_, arg), connected(false), reconnecting(true)
{set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
connector_base(asio::io_service& io_service_, Arg& arg) : super(io_service_, arg), connected(false), reconnecting(true) {set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
//reset all, be ensure that there's no any operations performed on this connector_base when invoke it
//notice, when reusing this connector_base, object_pool will invoke reset(), child must re-write this to initialize
......@@ -162,6 +161,28 @@ protected:
return false;
}
//unit is second
//if macro ASCS_HEARTBEAT_INTERVAL is bigger than zero, connector_base will start a timer to call this automatically with interval equal to ASCS_HEARTBEAT_INTERVAL.
//otherwise, you can call check_heartbeat with your own logic, but you still need to define a valid ASCS_HEARTBEAT_MAX_ABSENCE macro, please note.
bool check_heartbeat(int interval)
{
	assert(interval > 0);

	auto current_time = time(nullptr);
	// the client takes the initiative: if nothing has been exchanged for a whole
	// interval, push a heartbeat byte ('c') to the server as OOB data.
	if (current_time - this->last_interact_time >= interval) //client send heartbeat on its own initiative
		this->send_heartbeat('c');

	if (this->clean_heartbeat() > 0)
		this->last_interact_time = current_time; // peer responded, refresh the liveness timestamp
	else if (current_time - this->last_interact_time >= interval * ASCS_HEARTBEAT_MAX_ABSENCE)
	{
		// silence for too many intervals -- treat the link as broken and shut it
		// down, deciding on the way whether a reconnect should be attempted.
		show_info("client link:", "broke unexpectedly.");
		force_shutdown(this->is_shutting_down() ? reconnecting : prepare_reconnect(asio::error_code(asio::error::network_down)) >= 0);
	}

	return this->started(); //always keep this timer
}
private:
bool async_shutdown_handler(timer::tid id, size_t loop_num)
{
......@@ -192,6 +213,9 @@ private:
connected = reconnecting = true;
this->reset_state();
on_connect();
this->last_interact_time = time(nullptr);
if (ASCS_HEARTBEAT_INTERVAL > 0)
this->set_timer(TIMER_HEARTBEAT_CHECK, ASCS_HEARTBEAT_INTERVAL * 1000, [this](auto id)->bool {return this->check_heartbeat(ASCS_HEARTBEAT_INTERVAL);});
this->send_msg(); //send buffer may have msgs, send them
do_start();
}
......
......@@ -29,6 +29,7 @@ protected:
public:
static const timer::tid TIMER_BEGIN = super::TIMER_END;
static const timer::tid TIMER_ASYNC_SHUTDOWN = TIMER_BEGIN;
static const timer::tid TIMER_HEARTBEAT_CHECK = TIMER_BEGIN + 1;
static const timer::tid TIMER_END = TIMER_BEGIN + 10;
server_socket_base(Server& server_) : super(server_.get_service_pump()), server(server_) {}
......@@ -83,6 +84,9 @@ protected:
{
if (!this->stopped())
{
this->last_interact_time = time(nullptr);
if (ASCS_HEARTBEAT_INTERVAL > 0)
this->set_timer(TIMER_HEARTBEAT_CHECK, ASCS_HEARTBEAT_INTERVAL * 1000, [this](auto id)->bool {return this->check_heartbeat(ASCS_HEARTBEAT_INTERVAL);});
this->do_recv_msg();
return true;
}
......@@ -104,6 +108,30 @@ protected:
this->shutdown_state = super::shutdown_states::NONE;
}
//unit is second
//if macro ASCS_HEARTBEAT_INTERVAL is bigger than zero, server_socket_base will start a timer to call this automatically with interval equal to ASCS_HEARTBEAT_INTERVAL.
//otherwise, you can call check_heartbeat with your own logic, but you still need to define a valid ASCS_HEARTBEAT_MAX_ABSENCE macro, please note.
bool check_heartbeat(int interval)
{
	assert(interval > 0);

	auto current_time = time(nullptr);
	if (this->clean_heartbeat() > 0)
	{
		// the server only answers heartbeats ('s') and never initiates them;
		// reply only when the last interaction is at least one interval old.
		if (current_time - this->last_interact_time >= interval) //server never send heartbeat on its own initiative
			this->send_heartbeat('s');
		this->last_interact_time = current_time; // client proved to be alive
	}
	else if (current_time - this->last_interact_time >= interval * ASCS_HEARTBEAT_MAX_ABSENCE)
	{
		// no heartbeat from the client for too many intervals -- drop the link.
		show_info("server link:", "broke unexpectedly.");
		force_shutdown();
	}

	return this->started(); //always keep this timer
}
private:
bool async_shutdown_handler(timer::tid id, size_t loop_num)
{
......
......@@ -13,8 +13,6 @@
#ifndef _ASCS_TCP_SOCKET_H_
#define _ASCS_TCP_SOCKET_H_
#include <vector>
#include "../socket.h"
#include "../container.h"
......@@ -38,10 +36,16 @@ protected:
enum shutdown_states {NONE, FORCE, GRACEFUL};
socket_base(asio::io_service& io_service_) : super(io_service_), unpacker_(std::make_shared<Unpacker>()),
shutdown_state(shutdown_states::NONE), shutdown_atomic(0) {}
template<typename Arg> socket_base(asio::io_service& io_service_, Arg& arg) : super(io_service_, arg), unpacker_(std::make_shared<Unpacker>()),
shutdown_state(shutdown_states::NONE), shutdown_atomic(0) {}
socket_base(asio::io_service& io_service_) : super(io_service_) {first_init();}
template<typename Arg> socket_base(asio::io_service& io_service_, Arg& arg) : super(io_service_, arg) {first_init();}
//helper function, just call it in constructor
void first_init()
{
	// fresh state: no shutdown in progress and the spin-lock flag released;
	// relaxed ordering is fine, the object is not yet visible to other threads.
	shutdown_state = shutdown_states::NONE;
	shutdown_atomic.clear(std::memory_order_relaxed);
	unpacker_ = std::make_shared<Unpacker>();
}
public:
virtual bool obsoleted() {return !is_shutting_down() && super::obsoleted();}
......@@ -108,12 +112,12 @@ protected:
}
//ascs::socket will guarantee not call this function in more than one thread concurrently.
//return false if send buffer is empty or sending not allowed or io_service stopped
//return false if send buffer is empty or sending not allowed
virtual bool do_send_msg()
{
if (is_send_allowed() && !this->stopped() && !this->send_msg_buffer.empty())
if (is_send_allowed())
{
std::vector<asio::const_buffer> bufs;
std::list<asio::const_buffer> bufs;
{
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
const size_t max_send_size = 0;
......@@ -129,8 +133,8 @@ protected:
{
this->stat.send_delay_sum += end_time - msg.begin_time;
size += msg.size();
last_send_msg.push_back(std::move(msg));
bufs.push_back(asio::buffer(last_send_msg.back().data(), last_send_msg.back().size()));
last_send_msg.emplace_back(std::move(msg));
bufs.emplace_back(last_send_msg.back().data(), last_send_msg.back().size());
if (size >= max_send_size)
break;
}
......@@ -174,27 +178,61 @@ protected:
void shutdown()
{
scope_atomic_lock<> lock(shutdown_atomic);
scope_atomic_lock lock(shutdown_atomic);
if (!lock.locked())
return;
shutdown_state = shutdown_states::FORCE;
this->stop_all_timer();
this->close();
if (this->lowest_layer().is_open())
{
asio::error_code ec;
this->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
}
}
this->close(); //call this at the end of 'shutdown', it's very important
//drain all pending heartbeat (OOB/urgent) data from the underlying socket and
//return the number of heartbeat bytes consumed; 0 means no heartbeat arrived
//since the last call. used by check_heartbeat to decide whether the peer is alive.
int clean_heartbeat()
{
	auto heartbeat_len = 0;
	auto s = this->lowest_layer().native_handle();
	while (true)
	{
#ifdef _WIN32
		//Windows path: the code treats a zero SIOCATMARK result as "OOB data
		//pending" and reads it one byte at a time with MSG_OOB -- TODO confirm
		//against MSDN, the commit notes heartbeat interop issues with Linux.
		char oob_data;
		unsigned long no_oob_data = 1;
		ioctlsocket(s, SIOCATMARK, &no_oob_data);
		if (0 == no_oob_data && recv(s, &oob_data, 1, MSG_OOB) > 0)
			++heartbeat_len;
#else
		//POSIX path: SIOCATMARK flag of 1 is taken to mean OOB data is
		//available; read it in bulk -- a short read means the queue is drained.
		char oob_data[1024];
		auto has_oob_data = 0;
		int recv_len;
		ioctl(s, SIOCATMARK, &has_oob_data);
		if (1 == has_oob_data && (recv_len = recv(s, oob_data, sizeof(oob_data), MSG_OOB)) > 0)
		{
			heartbeat_len += recv_len;
			if (recv_len < (int) sizeof(oob_data))
				break; //short read -- nothing more to drain
		}
#endif
		//note: this 'else' binds to whichever 'if' the preprocessor kept above;
		//reached when no (more) OOB data could be read, so stop polling.
		else
			break;
	}
	return heartbeat_len;
}
void send_heartbeat(const char c) {send(this->lowest_layer().native_handle(), &c, 1, MSG_OOB);}
private:
void recv_handler(const asio::error_code& ec, size_t bytes_transferred)
{
if (!ec && bytes_transferred > 0)
{
last_interact_time = time(nullptr);
typename Unpacker::container_type temp_msg_can;
auto_duration dur(this->stat.unpack_time_sum);
auto unpack_ok = unpacker_->parse_msg(bytes_transferred, temp_msg_can);
......@@ -205,10 +243,10 @@ private:
this->stat.recv_msg_sum += msg_num;
this->temp_msg_buffer.resize(this->temp_msg_buffer.size() + msg_num);
auto op_iter = this->temp_msg_buffer.rbegin();
for (auto iter = temp_msg_can.rbegin(); iter != temp_msg_can.rend();)
for (auto iter = temp_msg_can.rbegin(); iter != temp_msg_can.rend(); ++op_iter, ++iter)
{
this->stat.recv_byte_sum += (++iter).base()->size();
(++op_iter).base()->swap(*iter.base());
this->stat.recv_byte_sum += iter->size();
op_iter->swap(*iter);
}
}
this->handle_msg();
......@@ -228,6 +266,8 @@ private:
{
if (!ec)
{
last_interact_time = time(nullptr);
this->stat.send_time_sum += statistic::now() - last_send_msg.front().begin_time;
this->stat.send_byte_sum += bytes_transferred;
this->stat.send_msg_sum += last_send_msg.size();
......@@ -257,8 +297,11 @@ protected:
list<typename super::in_msg> last_send_msg;
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
shutdown_states shutdown_state;
std::atomic_size_t shutdown_atomic;
volatile shutdown_states shutdown_state;
std::atomic_flag shutdown_atomic;
//heartbeat
time_t last_interact_time;
};
}} //namespace
......
......@@ -13,8 +13,6 @@
#ifndef _ASCS_TIMER_H_
#define _ASCS_TIMER_H_
#include <vector>
#ifdef ASCS_USE_STEADY_TIMER
#include <asio/steady_timer.hpp>
#else
......@@ -54,11 +52,11 @@ public:
tid id;
timer_status status;
size_t milliseconds;
size_t interval_ms;
std::function<bool(tid)> call_back;
std::shared_ptr<timer_type> timer;
timer_info() : id(0), status(TIMER_FAKE), milliseconds(0) {}
timer_info() : id(0), status(TIMER_FAKE), interval_ms(0) {}
};
typedef const timer_info timer_cinfo;
......@@ -66,26 +64,35 @@ public:
timer(asio::io_service& _io_service_) : object(_io_service_), timer_can(256) {tid id = -1; do_something_to_all([&id](auto& item) {item.id = ++id;});}
void update_timer_info(tid id, size_t milliseconds, std::function<bool(tid)>&& call_back, bool start = false)
void update_timer_info(tid id, size_t interval, std::function<bool(tid)>&& call_back, bool start = false)
{
timer_info& ti = timer_can[id];
if (timer_info::TIMER_FAKE == ti.status)
ti.timer = std::make_shared<timer_type>(io_service_);
ti.status = timer_info::TIMER_OK;
ti.milliseconds = milliseconds;
ti.interval_ms = interval;
ti.call_back.swap(call_back);
if (start)
start_timer(ti);
}
void update_timer_info(tid id, size_t milliseconds, const std::function<bool(tid)>& call_back, bool start = false)
{update_timer_info(id, milliseconds, std::function<bool(tid)>(call_back), start);}
void update_timer_info(tid id, size_t interval, const std::function<bool(tid)>& call_back, bool start = false)
{update_timer_info(id, interval, std::function<bool(tid)>(call_back), start);}
void set_timer(tid id, size_t milliseconds, std::function<bool(tid)>&& call_back) {update_timer_info(id, milliseconds, std::move(call_back), true);}
void set_timer(tid id, size_t milliseconds, const std::function<bool(tid)>& call_back) {update_timer_info(id, milliseconds, call_back, true);}
void change_timer_interval(tid id, size_t interval) {timer_can[id].interval_ms = interval;}
timer_info find_timer(tid id) const {return timer_can[id];}
bool revive_timer(tid id)
{
if (timer_info::TIMER_FAKE == timer_can[id].status)
return false;
timer_can[id].status = timer_info::TIMER_OK;
return true;
}
void set_timer(tid id, size_t interval, std::function<bool(tid)>&& call_back) {update_timer_info(id, interval, std::move(call_back), true);}
void set_timer(tid id, size_t interval, const std::function<bool(tid)>& call_back) {update_timer_info(id, interval, call_back, true);}
bool start_timer(tid id)
{
......@@ -100,6 +107,7 @@ public:
return true;
}
timer_info find_timer(tid id) const {return timer_can[id];}
void stop_timer(tid id) {stop_timer(timer_can[id]);}
void stop_all_timer() {do_something_to_all([this](auto& item) {this->stop_timer(item);});}
......@@ -113,7 +121,7 @@ protected:
{
assert(timer_info::TIMER_OK == ti.status);
ti.timer->expires_from_now(milliseconds(ti.milliseconds));
ti.timer->expires_from_now(milliseconds(ti.interval_ms));
//return true from call_back to continue the timer, or the timer will stop
ti.timer->async_wait(make_handler_error([this, &ti](const auto& ec) {if (!ec && ti.call_back(ti.id) && timer_info::TIMER_OK == ti.status) this->start_timer(ti);}));
}
......
......@@ -122,7 +122,7 @@ protected:
//return false if send buffer is empty or sending not allowed or io_service stopped
virtual bool do_send_msg()
{
if (is_send_allowed() && !this->stopped() && !this->send_msg_buffer.empty() && this->send_msg_buffer.try_dequeue(last_send_msg))
if (!this->send_msg_buffer.empty() && is_send_allowed() && this->send_msg_buffer.try_dequeue(last_send_msg))
{
this->stat.send_delay_sum += statistic::now() - last_send_msg.begin_time;
......@@ -167,6 +167,7 @@ protected:
std::unique_lock<std::shared_mutex> lock(shutdown_mutex);
this->stop_all_timer();
this->close();
if (this->lowest_layer().is_open())
{
......@@ -174,8 +175,6 @@ protected:
this->lowest_layer().shutdown(asio::ip::udp::socket::shutdown_both, ec);
this->lowest_layer().close(ec);
}
this->close(); //call this at the end of 'shutdown', it's very important
}
private:
......@@ -185,7 +184,7 @@ private:
{
++this->stat.recv_msg_sum;
this->stat.recv_byte_sum += bytes_transferred;
this->temp_msg_buffer.resize(this->temp_msg_buffer.size() + 1);
this->temp_msg_buffer.emplace_back();
this->temp_msg_buffer.back().swap(peer_addr, unpacker_->parse_msg(bytes_transferred));
this->handle_msg();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册