提交 2aabeebb 编写于 作者: Y youngwolf

1.1.1 release.

Support non-lock queue, it's totally not thread safe and lock-free, it can improve IO throughput with particular business.
Demonstrate how and when to use non-lock queue as the input and output message buffer.
Queues (and their internal containers) used as input and output message buffer are now configurable (by macros or template arguments).
New macros--ASCS_INPUT_QUEUE, ASCS_INPUT_CONTAINER, ASCS_OUTPUT_QUEUE and ASCS_OUTPUT_CONTAINER.
Drop macro ASCS_USE_CONCURRENT_QUEUE, rename macro ASCS_USE_CONCURRE to ASCS_HAS_CONCURRENT_QUEUE.
In contrast to non_lock_queue, split message_queue into lock_queue and lock_free_queue.
Move container related classes and functions from st_asio_wrapper_base.h to st_asio_wrapper_container.h.
Improve efficiency in scenarios of low throughput like pingpong test.
Replaceable packer/unpacker now support replaceable_buffer (an alias of auto_buffer) and shared_buffer to be their message type.
Move class statistic and obj_with_begin_time out of ascs::socket to reduce template tiers.
上级 232247da
......@@ -31,15 +31,12 @@ udp解包器必须实现这个接口。
dummy_packer:
仅仅提供发送消息的类型以便通过编译,无法真正的打包,所以只能通过direct_send_msg等发送消息。
std_list:
封装自std::list,优化了其size()的复杂度,达到O(1),只在gcc 5.1以下才生效。
auto_buffer:
如果想要运行时替换打包解包器,则打包解包器必须以auto_buffer或者shared_buffer作为消息类型(你写个类似的也可,但没有必要,如果需要更多的功能可以用继承)。
这个类是不允许被复制的,用户只负责分配内存。
shared_buffer:
同auto_buffer,除了允许被复制。
同auto_buffer,除了允许被复制(表面上的被复制,不是深度复制,我们要尽量避免深度复制,std::string就深度复制)
dummy_packer:
这个类只提供消息类型的定义,不做真正的打包,所以用户必须以native方式发送消息,或者用direct_send_msg等发送消息。
......@@ -47,6 +44,28 @@ dummy_packer:
udp::udp_msg:
udp消息,其实就是在tcp消息上加了一个对端地址。
statistic:
性能统计,如果定义了ASCS_FULL_STATISTIC,则统计数量和时间(对性能有些影响),否则只统计数量。
消息发送相关的统计
uint_fast64_t send_msg_sum; 成功发送到asio的消息条数
uint_fast64_t send_byte_sum; 成功发送到asio的消息总字节数
stat_duration send_delay_sum; 从消息发送(send_(native_)msg,不包括打包时间)到真正发送(asio::async_write)的延迟时间
stat_duration send_time_sum; 从真正的消息发送(asio::async_write)到发送完成(发送到SOCKET缓存),这一项体现了你的网络吞吐率,注意吞吐率低不代表是你的问题,也有可能是接收方慢了。
消息接收相关统计
uint_fast64_t recv_msg_sum; 收到的消息条数
uint_fast64_t recv_byte_sum; 收到的消息总字节数
stat_duration dispatch_dealy_sum; 从消息解包完成之后,到on_msg_handle的时间延迟,如果这项偏大,可能是因为service线程总不够用
stat_duration recv_idle_sum; 暂停消息接收的总时间,在接收缓存满,消息派发被暂停或者拥塞控制时,都将会暂停消息接收。
#ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
stat_duration handle_time_1_sum; 调用on_msg花费的总时间
#endif
stat_duration handle_time_2_sum; 调用on_msg_handle花费的总时间
obj_with_begin_time:
可包装任何对象,并且加上一个时间(用于时间统计)。
log_formater:
log打印函数,只是打印到屏幕,如果需要更详细的功能需要自己实现。
......@@ -60,6 +79,3 @@ do_something_to_all:
do_something_to_one:
同do_something_to_all,如果对某一个对象执行的动作返回true,则跳过所有剩下的对象,如果全部对象都返回false,则等于do_something_to_all,但效率上稍慢。
splice_helper:
拼接两个list,并且将结果(list::size)限制在一个指定的大小之内。

包括了容器相关的类和方法等。
类:
list:
一个size()复杂为O(1)的链表类,如果std::list::size()已经是复杂度为O(1)了,则它就是std::list的别名,只是省略了第二个参数,
否则将基于std::list实现一个链表,保证size()有O(1)复杂度。
dummy_lockable:
实现了lock()和unlock()接口,但什么也不做。
lockable:
实现了lock()和unlock()接口,分别对std::shared_mutex执行加锁和解锁操作。
template<typename T, typename Container>
class lock_free_queue : public Container, public dummy_lockable
一个lock-free队列,其容器必须是concurrent queue (https://github.com/cameron314/concurrentqueue),或者是有相同接口的自定义容器,
必须保证无锁且线程安全。
template<typename T, typename Container, typename Lockable>
class queue : public Container, public Lockable
一个队列基类,可选择容器(list和deque,或者是有相同接口的定义容器)和锁类型(dummy_lockable和lockable,或者是有相同接口的定义锁)。
如果你想用自己的容器,那么这个容器必须提供必要的接口,具体需要哪些接口,我的建议是,先用上你的容器,然后由编译器告诉你缺少什么接口,
接口需要做的工作从名字便知(比如push_back,pop_front等)。
template<typename T, typename Container> using non_lock_queue = queue<T, Container, dummy_lockable>;
无锁不安全队列(在某些特定的业务逻辑下,队列无需加锁,由业务保证其线程安全性,比如绝对的一应一答式逻辑)。
template<typename T, typename Container> using lock_queue = queue<T, Container, lockable>;
有锁安全队列。
方法:
template<typename Q>
size_t move_items_in(Q& dest, Q& other, size_t max_size = ASCS_MAX_MSG_NUM)
移动队列里项,不会拷贝。
template<typename Q, typename Q2>
size_t move_items_in(Q& dest, Q2& other, size_t max_size = ASCS_MAX_MSG_NUM)
移动队列里项,不会拷贝。
splice_helper:
拼接两个list,并且将结果(size())限制在一个指定的大小之内。
......@@ -28,13 +28,14 @@ class packer : public i_packer<std::string>;
当你想在运行时替换打包器的话,可以把这个打包器设置为默认打包器,这个打包器返回replaceable_buffer对象,由于replaceable_buffer对象
保存了一个i_buffer指针,所以只要是实现了i_buffer接口的对象,都能赋予replaceable_buffer,具体请参看replaceable_buffer及i_buffer的定义。
class replaceable_packer : public i_packer<replaceable_buffer>;
template<typename T = replaceable_buffer>
class replaceable_packer : public i_packer<T>;
固定长度的打包器。
class fixed_length_packer : public packer;
带固定头和固定尾的打包器,头可以为空,尾不能为空。
class prefix_suffix_packer : public i_packer<std::string>;
注意,没有固定长度的打包器(固定长度的解包器有),固定长度的打包器实际意义不大,因为它实际并没有协议相关的东西在包里面,所以每次发送
必须已经是一个完整的包,直接发送即可(ascs::socket::direct_send_msg,ascs::socket::direct_post_msg)。
}} //namespace
......@@ -19,8 +19,10 @@ class unpacker : public i_unpacker<std::string>;
class udp_unpacker : public i_udp_unpacker<std::string>;
作用参看replaceable_packer。
class replaceable_unpacker : public i_unpacker<replaceable_buffer>, public unpacker;
class replaceable_udp_unpacker : public i_udp_unpacker<replaceable_buffer>;
template<typename T = replaceable_buffer>
class replaceable_unpacker : public ascs::tcp::i_unpacker<T>;
template<typename T = replaceable_buffer>
class replaceable_udp_unpacker : public ascs::udp::i_unpacker<T>;
这个解包器与unpacker的不同之处在于,它不需要一个固定大小的缓存,而是先接收包头,再根据包头得到消息的长度信息,
然后分配(new)适当的缓存,这样当缓存写满了之后,一个完整的消息就接收完毕了,所以省掉了消息的拷贝;但也有一个坏处,
......
......@@ -2,66 +2,22 @@
namespace ascs
{
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType = typename Packer::msg_type, typename OutMsgType = typename Unpacker::msg_type>
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType, typename OutMsgType,
template<typename, typename> class InQueue, template<typename> class InContainer,
template<typename, typename> class OutQueue, template<typename> class OutContainer>
class socket: public timer
{
public:
效率统计,注意这个功能会轻微的影响性能,默认关闭,可以通过ASCS_FULL_STATISTIC宏来开启。在关闭情况下,所有整数项统计(uint_fast64_t)仍然有效,
所有时间统计项将无效(stat_duration)。在打开情况下,时间统计的数据类型其实是std::chrono::system_clock::time_point。
struct statistic
{
#ifdef ASCS_FULL_STATISTIC
static bool enabled();
typedef std::chrono::system_clock::time_point stat_time;
static stat_time now();
typedef std::chrono::system_clock::duration stat_duration;
#else
struct dummy_duration;
struct dummy_time;
static bool enabled();
typedef dummy_time stat_time;
static stat_time now();
typedef dummy_duration stat_duration;
#endif
statistic();
void reset();
由于统计涉及多个方面,并且是多线程修改不同的部分,这个函数只是在某些特殊情况下才可以调用,比如在构造函数里面,或者只有一个service线程,
所以这个函数基本上还是用在对象重用时。
statistic& operator +=(const struct statistic& other);
std::string to_string() const;
消息发送相关的统计
uint_fast64_t send_msg_sum; 成功发送到asio的消息条数
uint_fast64_t send_byte_sum; 成功发送到asio的消息总字节数
stat_duration send_delay_sum; 从消息发送(send_(native_)msg, post_(native_)msg,不包括打包时间)到真正发送(asio::async_write)的延迟时间
stat_duration send_time_sum; 从真正的消息发送(asio::async_write)到发送完成(发送到SOCKET缓存),这一项体现了你的网络吞吐率,注意吞吐率低
不代表是你的问题,也有可能是接收方慢了。
消息接收相关统计
uint_fast64_t recv_msg_sum; 收到的消息条数
uint_fast64_t recv_byte_sum; 收到的消息总字节数
stat_duration dispatch_dealy_sum; 从消息解包完成之后,到on_msg_handle的时间延迟,如果这项偏大,可能是因为service线程总不够用
stat_duration recv_idle_sum; 暂停消息接收的总时间,如果接收缓存满,或者在发送缓存满的情况下调用了post_(native_)msg,将会暂停消息接收,
注意,在回调on_msg期间,也算是暂停了消息接收。
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
stat_duration handle_time_1_sum; 调用on_msg花费的总时间
#endif
stat_duration handle_time_2_sum; 调用on_msg_handle花费的总时间
};
protected:
struct in_msg : public InMsgType;
struct out_msg : public OutMsgType;
在InMsgType或OutMsgType的基础上增加了一个stat_time,用于统计时间消耗
typedef std::list<in_msg> in_container_type;
typedef std::list<out_msg> out_container_type;
static const unsigned char TIMER_BEGIN = st_timer::TIMER_END;
static const unsigned char TIMER_END = TIMER_BEGIN+ 10;
typedef obj_with_begin_time<InMsgType> in_msg;
typedef obj_with_begin_time<OutMsgType> out_msg;
typedef InQueue<in_msg, InContainer<in_msg>> in_container_type;
typedef OutQueue<out_msg, OutContainer<out_msg>> out_container_type;
static const tid TIMER_BEGIN = timer::TIMER_END;
static const tid TIMER_HANDLE_MSG = TIMER_BEGIN;
static const tid TIMER_DISPATCH_MSG = TIMER_BEGIN + 1;
static const tid TIMER_DELAY_CLOSE = TIMER_BEGIN + 2;
static const tid TIMER_END = TIMER_BEGIN + 10;
socket(asio::io_service& io_service_);
......
......@@ -2,8 +2,10 @@
namespace ascs { namespace tcp {
带连接功能的tcp::socket_base,算是一个真正的客户端了
template <typename Packer, typename Unpacker, typename Socket = asio::ip::tcp::socket>
class connector_base : public socket_base<Socket, Packer, Unpacker>
template <typename Packer, typename Unpacker, typename Socket = asio::ip::tcp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class connector_base : public socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer>
{
public:
connector_base(asio::io_service& io_service_);
......
......@@ -2,8 +2,11 @@
namespace ascs { namespace tcp {
服务端套接字类
template<typename Packer, typename Unpacker, typename Server = i_server, typename Socket = asio::ip::tcp::socket>
class server_socket_base : public socket_base<Socket, Packer, Unpacker>, public std::enable_shared_from_this<server_socket_base<Packer, Unpacker, Server, Socket>>
template<typename Packer, typename Unpacker, typename Server = i_server, typename Socket = asio::ip::tcp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class server_socket_base : public socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer>,
public std::enable_shared_from_this<server_socket_base<Packer, Unpacker, Server, Socket, InQueue, InContainer, OutQueue, OutContainer>>
{
public:
server_socket_base(Server& server_);
......
......@@ -2,8 +2,10 @@
namespace ascs { namespace tcp {
tcp套接字类,实现tcp数据的收发
template <typename Socket, typename Packer, typename Unpacker>
class socket_base : public socket<Socket, Packer, Unpacker>
template <typename Socket, typename Packer, typename Unpacker,
template<typename, typename> class InQueue, template<typename> class InContainer,
template<typename, typename> class OutQueue, template<typename> class OutContainer>
class socket_base : public socket<Socket, Packer, Unpacker, typename Packer::msg_type, typename Unpacker::msg_type, InQueue, InContainer, OutQueue, OutContainer>
{
public:
typedef typename Packer::msg_type in_msg_type;
......@@ -12,6 +14,8 @@ public:
typedef typename Unpacker::msg_ctype out_msg_ctype;
protected:
enum shutdown_states {NONE, FORCE, GRACEFUL};
socket_base(asio::io_service& io_service_);
template<typename Arg>
......
......@@ -2,8 +2,10 @@
namespace ascs { namespace udp {
udp套接字类,实现udp数据的收发
template <typename Packer, typename Unpacker, typename Socket = asio::ip::udp::socket>
class socket_base : public socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>>
template <typename Packer, typename Unpacker, typename Socket = asio::ip::udp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class socket_base : public socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer>
{
public:
typedef udp_msg<typename Packer::msg_type> in_msg_type;
......
......@@ -5,7 +5,6 @@
#define ASCS_SERVER_PORT 9527
#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define ASCS_CUSTOM_LOG
#define ASCS_USE_CONCURRENT_QUEUE
#define ASCS_DEFAULT_UNPACKER non_copy_unpacker
//#define ASCS_DEFAULT_UNPACKER stream_unpacker
......@@ -94,15 +93,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
#undef ASCS_CUSTOM_LOG
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_UNPACKER
//#undef ASCS_HUGE_MSG
//#undef ASCS_MAX_MSG_LEN
//#undef ASCS_MAX_MSG_NUM
//restore configuration
# If your compiler cannot find asio, please specify it explicitly like this:
#asio_location = -I/usr/local/include/
#ext_location = -I/path of asio/
# asio.hpp and asio directory should be available in this place.
# If possible, open c++17 (-std=c++17) would be better.
cflag = -Wall -fexceptions -std=c++1y
ifeq (${MAKECMDGOALS}, debug)
cflag += -g -DDEBUG
......@@ -15,7 +16,11 @@ endif
cflag += -DASIO_STANDALONE -DASIO_HAS_STD_CHRONO
# If your compiler detected duplicated 'shared_mutex' definition, please define ASCS_HAS_STD_SHARED_MUTEX macro:
#cflag += -DASCS_HAS_STD_SHARED_MUTEX
cflag += -pthread ${ext_cflag} ${asio_location} -I../../include/
# If you used concurrent queue (https://github.com/cameron314/concurrentqueue), please define ASCS_HAS_CONCURRENT_QUEUE macro:
#cflag += -DASCS_HAS_CONCURRENT_QUEUE
# And guarantee header file concurrentqueue.h is reachable, for example, add its path to ext_location:
#ext_location += -I/path of concurrent queue/
cflag += -pthread ${ext_cflag} ${ext_location} -I../../include/
lflag += -pthread ${ext_libs}
target = ${dir}/${module}
......
......@@ -8,7 +8,10 @@
//#define ASCS_CLEAR_OBJECT_INTERVAL 1
//#define ASCS_WANT_MSG_SEND_NOTIFY
#define ASCS_FULL_STATISTIC //full statistic will slightly impact efficiency
#define ASCS_USE_CONCURRENT_QUEUE
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
#define ASCS_INPUT_QUEUE non_lock_queue //we will never operate sending buffer concurrently, so need no locks.
#define ASCS_INPUT_CONTAINER list
#endif
//configuration
//use the following macro to control the type of packer and unpacker
......@@ -19,8 +22,8 @@
//3-prefix and suffix packer and unpacker
#if 1 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER replaceable_packer
#define ASCS_DEFAULT_UNPACKER replaceable_unpacker
#define ASCS_DEFAULT_PACKER replaceable_packer<>
#define ASCS_DEFAULT_UNPACKER replaceable_unpacker<>
#elif 2 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER fixed_length_packer
#define ASCS_DEFAULT_UNPACKER fixed_length_unpacker
......@@ -130,9 +133,7 @@ protected:
++send_index;
memcpy(pstr, &send_index, sizeof(size_t)); //seq
send_msg(pstr, msg_len);
//this invocation has no chance to fail (by insufficient sending buffer), even can_overflow is false
//this is because here is the only place that will send msgs and here also means the receiving buffer at least can hold one more msg.
send_msg(pstr, msg_len, true);
}
#endif
......@@ -167,9 +168,9 @@ public:
return total_recv_bytes;
}
echo_socket::statistic get_statistic()
statistic get_statistic()
{
echo_socket::statistic stat;
statistic stat;
do_something_to_all([&stat](const auto& item) {stat += item->get_statistic();});
return stat;
......@@ -439,15 +440,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_REUSE_OBJECT
#undef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
#undef ASCS_CLEAR_OBJECT_INTERVAL
#undef ASCS_WANT_MSG_SEND_NOTIFY
#undef ASCS_FULL_STATISTIC
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_PACKER
#undef ASCS_DEFAULT_UNPACKER
//restore configuration
......@@ -5,12 +5,11 @@
#define ASCS_SERVER_PORT 9527
#define ASCS_ASYNC_ACCEPT_NUM 5
#define ASCS_REUSE_OBJECT //use objects pool
//#define ASCS_FREE_OBJECT_INTERVAL 60 //it's useless if ST_ASIO_REUSE_OBJECT macro been defined
//#define ASCS_FREE_OBJECT_INTERVAL 60 //it's useless if ASCS_REUSE_OBJECT macro been defined
//#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define ASCS_ENHANCED_STABILITY
//#define ASCS_FULL_STATISTIC //full statistic will slightly impact efficiency
//#define ASCS_USE_STEADY_TIMER
#define ASCS_USE_CONCURRENT_QUEUE
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 0
......@@ -20,8 +19,8 @@
//3-prefix and suffix packer and unpacker
#if 1 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER replaceable_packer
#define ASCS_DEFAULT_UNPACKER replaceable_unpacker
#define ASCS_DEFAULT_PACKER replaceable_packer<>
#define ASCS_DEFAULT_UNPACKER replaceable_unpacker<>
#elif 2 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER fixed_length_packer
#define ASCS_DEFAULT_UNPACKER fixed_length_unpacker
......@@ -139,9 +138,9 @@ class echo_server : public server_base<echo_socket, object_pool<echo_socket>, i_
public:
echo_server(service_pump& service_pump_) : server_base(service_pump_) {}
echo_socket::statistic get_statistic()
statistic get_statistic()
{
echo_socket::statistic stat;
statistic stat;
do_something_to_all([&stat](const auto& item) {stat += item->get_statistic();});
return stat;
......@@ -243,17 +242,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_ASYNC_ACCEPT_NUM
#undef ASCS_REUSE_OBJECT
#undef ASCS_FREE_OBJECT_INTERVAL
#undef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
#undef ASCS_ENHANCED_STABILITY
#undef ASCS_FULL_STATISTIC
#undef ASCS_USE_STEADY_TIMER
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_PACKER
#undef ASCS_DEFAULT_UNPACKER
//restore configuration
......@@ -3,8 +3,8 @@
//configuration
#define ASCS_SERVER_PORT 5050
#define ASCS_USE_CONCURRENT_QUEUE
#define ASCS_DEFAULT_UNPACKER replaceable_unpacker
//we cannot use non_lock_queue, because we also send messages (talking messages) out of ascs::socket::on_msg_send().
#define ASCS_DEFAULT_UNPACKER replaceable_unpacker<>
//configuration
#include "file_client.h"
......@@ -86,9 +86,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_UNPACKER
//restore configuration
......@@ -70,10 +70,11 @@ public:
protected:
//msg handling
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
//we can handle msg very fast, so we don't use recv buffer
//we always handle messages in on_msg(), so we don't care the type of input queue and input container at all.
virtual bool on_msg(out_msg_type& msg) {handle_msg(msg); return true;}
#endif
//we will change unpacker at runtime, this operation can only be done in on_msg(), reset() or constructor
//we will change unpacker at runtime, this operation can only be done in on_msg(), reset() or constructor,
//so we must guarantee all messages to be handled in on_msg()
//virtual bool on_msg_handle(out_msg_type& msg, bool link_down) {handle_msg(msg); return true;}
//msg handling end
......
......@@ -7,8 +7,13 @@
#define ASCS_CLEAR_OBJECT_INTERVAL 60
#define ASCS_ENHANCED_STABILITY
#define ASCS_WANT_MSG_SEND_NOTIFY
#define ASCS_USE_CONCURRENT_QUEUE
#define ASCS_DEFAULT_PACKER replaceable_packer
#define ASCS_INPUT_QUEUE non_lock_queue
//file_server / file_client is a responsive system, before file_server send each message (except talking message,
//but file_server only receive talking message, not send talking message proactively), the previous message has been
//sent to file_client, so sending buffer will always be empty, which means we will never operate sending buffer concurrently,
//so need no locks.
#define ASCS_INPUT_CONTAINER list
#define ASCS_DEFAULT_PACKER replaceable_packer<>
//configuration
#include <ascs/tcp/server.h>
......@@ -56,13 +61,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_ASYNC_ACCEPT_NUM
#undef ASCS_CLEAR_OBJECT_INTERVAL
#undef ASCS_ENHANCED_STABILITY
#undef ASCS_WANT_MSG_SEND_NOTIFY
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_PACKER
//restore configuration
......@@ -5,8 +5,13 @@
#define ASCS_CLEAR_OBJECT_INTERVAL 60
#define ASCS_ENHANCED_STABILITY
#define ASCS_WANT_MSG_SEND_NOTIFY
#define ASCS_USE_CONCURRENT_QUEUE
#define ASCS_DEFAULT_PACKER replaceable_packer
#define ASCS_INPUT_QUEUE non_lock_queue
//file_server / file_client is a responsive system, before file_server send each message (except talking message,
//but file_server only receive talking message, not send talking message proactively), the previous message has been
//sent to file_client, so sending buffer will always be empty, which means we will never operate sending buffer concurrently,
//so need no locks.
#define ASCS_INPUT_CONTAINER list
#define ASCS_DEFAULT_PACKER replaceable_packer<>
//configuration
#include "file_socket.h"
......@@ -106,13 +111,3 @@ void file_socket::handle_msg(out_msg_ctype& msg)
break;
}
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_ASYNC_ACCEPT_NUM
#undef ASCS_CLEAR_OBJECT_INTERVAL
#undef ASCS_ENHANCED_STABILITY
#undef ASCS_WANT_MSG_SEND_NOTIFY
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_PACKER
//restore configuration
......@@ -7,7 +7,8 @@
//#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
//#define ASCS_WANT_MSG_SEND_NOTIFY
#define ASCS_MSG_BUFFER_SIZE 65536
#define ASCS_USE_CONCURRENT_QUEUE
#define ASCS_INPUT_QUEUE non_lock_queue //we will never operate sending buffer concurrently, so need no locks.
#define ASCS_INPUT_CONTAINER list
#define ASCS_DEFAULT_UNPACKER stream_unpacker //non-protocol
//configuration
......@@ -41,7 +42,7 @@ std::atomic_ushort completed_session_num;
//2. for sender, if responses are available (like pingpong test), send msgs in on_msg()/on_msg_handle().
// this will reduce IO throughput because, SOCKET's sliding window is not fully used, pleae note.
//
//pingpong_client will choose method #1 if defined ST_ASIO_WANT_MSG_SEND_NOTIFY, otherwise #2
//pingpong_client will choose method #1 if defined ASCS_WANT_MSG_SEND_NOTIFY, otherwise #2
//BTW, if pingpong_client chose method #2, then pingpong_server can work properly without any congestion control,
//which means pingpong_server can send msgs back with can_overflow parameter equal to true, and memory occupation
//will be under control.
......@@ -75,9 +76,7 @@ protected:
{
send_bytes += msg.size();
if (send_bytes < total_bytes)
direct_send_msg(std::move(msg));
//this invocation has no chance to fail (by insufficient sending buffer), even can_overflow is false
//this is because here is the only place that will send msgs and here also means the receiving buffer at least can hold one more msg.
direct_send_msg(std::move(msg), true);
}
private:
......@@ -103,10 +102,7 @@ private:
begin_time.stop();
}
else
direct_send_msg(std::move(msg));
//this invocation has no chance to fail (by insufficient sending buffer), even can_overflow is false
//this is because pingpong_server never send msgs initiatively, and,
//here is the only place that will send msgs and here also means the receiving buffer at least can hold one more msg.
direct_send_msg(std::move(msg), true);
}
#endif
......@@ -119,9 +115,9 @@ class echo_client : public client_base<echo_socket>
public:
echo_client(service_pump& service_pump_) : client_base<echo_socket>(service_pump_) {}
echo_socket::statistic get_statistic()
statistic get_statistic()
{
echo_socket::statistic stat;
statistic stat;
do_something_to_all([&stat](const auto& item) {stat += item->get_statistic(); });
return stat;
......@@ -215,13 +211,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_REUSE_OBJECT
#undef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
#undef ASCS_WANT_MSG_SEND_NOTIFY
#undef ASCS_MSG_BUFFER_SIZE
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_UNPACKER
//restore configuration
......@@ -7,7 +7,14 @@
#define ASCS_REUSE_OBJECT //use objects pool
//#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
#define ASCS_MSG_BUFFER_SIZE 65536
#define ASCS_USE_CONCURRENT_QUEUE
#define ASCS_INPUT_QUEUE non_lock_queue
#define ASCS_INPUT_CONTAINER list
//if pingpong_client only send message in on_msg() or on_msg_handle(), which means a responsive system, a real pingpong test,
//then, before pingpong_server send each message, the previous message has been sent to pingpong_client,
//so sending buffer will always be empty, which means we will never operate sending buffer concurrently, so need no locks.
//
//if pingpong_client send message in on_msg_send(), then using non_lock_queue as input queue in pingpong_server will lead
//undefined behavior, please note.
#define ASCS_DEFAULT_UNPACKER stream_unpacker //non-protocol
//configuration
......@@ -51,7 +58,7 @@ protected:
//msg handling: send the original msg back(echo server)
//congestion control, method #1, the peer needs its own congestion control too.
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
//this virtual function doesn't exists if ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER been defined
//this virtual function doesn't exists if ASCS_FORCE_TO_USE_MSG_RECV_BUFFER been defined
virtual bool on_msg(out_msg_type& msg)
{
auto re = direct_send_msg(std::move(msg));
......@@ -85,9 +92,9 @@ class echo_server : public server_base<echo_socket>
public:
echo_server(service_pump& service_pump_) : server_base(service_pump_) {}
echo_socket::statistic get_statistic()
statistic get_statistic()
{
echo_socket::statistic stat;
statistic stat;
do_something_to_all([&stat](const auto& item) {stat += item->get_statistic();});
return stat;
......@@ -134,13 +141,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_ASYNC_ACCEPT_NUM
#undef ASCS_REUSE_OBJECT
#undef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
#undef ASCS_MSG_BUFFER_SIZE
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_UNPACKER
//restore configuration
......@@ -6,9 +6,8 @@
#define ASCS_ASYNC_ACCEPT_NUM 5
//#define ASCS_FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define ASCS_ENHANCED_STABILITY
#define ASCS_USE_CONCURRENT_QUEUE
//#define ASCS_DEFAULT_PACKER replaceable_packer
//#define ASCS_DEFAULT_UNPACKER replaceable_unpacker
//#define ASCS_DEFAULT_PACKER replaceable_packer<>
//#define ASCS_DEFAULT_UNPACKER replaceable_unpacker<>
//configuration
#include <ascs/ext/ssl.h>
......@@ -97,13 +96,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_SERVER_PORT
#undef ASCS_ASYNC_ACCEPT_NUM
#undef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
#undef ASCS_ENHANCED_STABILITY
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_PACKER
#undef ASCS_DEFAULT_UNPACKER
//restore configuration
......@@ -2,9 +2,8 @@
#include <iostream>
//configuration
#define ASCS_USE_CONCURRENT_QUEUE
//#define ASCS_DEFAULT_PACKER replaceable_packer
//#define ASCS_DEFAULT_UDP_UNPACKER replaceable_udp_unpacker
//#define ASCS_DEFAULT_PACKER replaceable_packer<>
//#define ASCS_DEFAULT_UDP_UNPACKER replaceable_udp_unpacker<>
//configuration
#include <ascs/ext/udp.h>
......@@ -51,9 +50,3 @@ int main(int argc, const char* argv[])
return 0;
}
//restore configuration
#undef ASCS_USE_CONCURRENT_QUEUE
#undef ASCS_DEFAULT_PACKER
#undef ASCS_DEFAULT_UNPACKER
//restore configuration
......@@ -7,7 +7,7 @@
* QQ: 676218192
* Community on QQ: 198941541
*
* this is a global head file
* interfaces, free functions, convenience and logs etc.
*/
#ifndef _ASCS_BASE_H_
......@@ -16,17 +16,16 @@
#include <stdio.h>
#include <stdarg.h>
#include <list>
#include <memory>
#include <string>
#include <thread>
#include <sstream>
#include <shared_mutex>
#include <iomanip>
#include <asio.hpp>
#include <asio/detail/noncopyable.hpp>
#include "config.h"
#include "container.h"
namespace ascs
{
......@@ -94,6 +93,7 @@ public:
typedef const buffer_type buffer_ctype;
shared_buffer() {}
shared_buffer(T* _buffer) {buffer.reset(_buffer);}
shared_buffer(buffer_type _buffer) : buffer(_buffer) {}
shared_buffer(const shared_buffer& other) : buffer(other.buffer) {}
shared_buffer(shared_buffer&& other) : buffer(std::move(other.buffer)) {}
......@@ -103,6 +103,7 @@ public:
shared_buffer& operator=(shared_buffer&& other) {clear(); swap(other); return *this;}
buffer_type raw_buffer() const {return buffer;}
void raw_buffer(T* _buffer) {buffer.reset(_buffer);}
void raw_buffer(buffer_ctype _buffer) {buffer = _buffer;}
//the following five functions are needed by ascs
......@@ -151,217 +152,6 @@ public:
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false) {assert(false); return msg_type();}
};
#if !defined(__clang__) && defined(__GNUC__) && __GNUC__ < 5
//a substitute of std::list (gcc before 5.1), it's size() function has O(1) complexity
//BTW, the naming rule is not mine, I copied them from std::list in Visual C++ 14.0
template<typename _Ty, typename _Alloc = std::allocator<_Ty>>
class list
{
public:
typedef list<_Ty, _Alloc> _Myt;
typedef std::list<_Ty, _Alloc> _Mybase;
typedef typename _Mybase::size_type size_type;
typedef typename _Mybase::reference reference;
typedef typename _Mybase::const_reference const_reference;
typedef typename _Mybase::iterator iterator;
typedef typename _Mybase::const_iterator const_iterator;
typedef typename _Mybase::reverse_iterator reverse_iterator;
typedef typename _Mybase::const_reverse_iterator const_reverse_iterator;
list() : s(0) {}
void swap(list& other) {impl.swap(other.impl); std::swap(s, other.s);}
bool empty() const {return 0 == s;}
size_type size() const {return s;}
void resize(size_type _Newsize)
{
while (s < _Newsize)
{
++s;
impl.emplace_back();
}
if (s > _Newsize)
{
auto end_iter = std::end(impl);
auto begin_iter = _Newsize <= s / 2 ? std::next(std::begin(impl), _Newsize) : std::prev(end_iter, s - _Newsize); //minimize iterator movement
s = _Newsize;
impl.erase(begin_iter, end_iter);
}
}
void clear() {s = 0; impl.clear();}
iterator erase(const_iterator _Where) {--s; return impl.erase(_Where);}
void push_front(const _Ty& _Val) {++s; impl.push_front(_Val);}
void push_front(_Ty&& _Val) {++s; impl.push_front(std::move(_Val));}
void pop_front() {--s; impl.pop_front();}
reference front() {return impl.front();}
iterator begin() {return impl.begin();}
reverse_iterator rbegin() {return impl.rbegin();}
const_reference front() const {return impl.front();}
const_iterator begin() const {return impl.begin();}
const_reverse_iterator rbegin() const {return impl.rbegin();}
void push_back(const _Ty& _Val) {++s; impl.push_back(_Val);}
void push_back(_Ty&& _Val) {++s; impl.push_back(std::move(_Val));}
void pop_back() {--s; impl.pop_back();}
reference back() {return impl.back();}
iterator end() {return impl.end();}
reverse_iterator rend() {return impl.rend();}
const_reference back() const {return impl.back();}
const_iterator end() const {return impl.end();}
const_reverse_iterator rend() const {return impl.rend();}
void splice(const_iterator _Where, _Myt& _Right) {s += _Right.size(); _Right.s = 0; impl.splice(_Where, _Right.impl);}
void splice(const_iterator _Where, _Myt& _Right, const_iterator _First) {++s; --_Right.s; impl.splice(_Where, _Right.impl, _First);}
void splice(const_iterator _Where, _Myt& _Right, const_iterator _First, const_iterator _Last)
{
auto size = std::distance(_First, _Last);
//this std::distance invocation is the penalty for making complexity of size() constant.
s += size;
_Right.s -= size;
impl.splice(_Where, _Right.impl, _First, _Last);
}
private:
size_type s;
_Mybase impl;
};
#else
template<typename T, typename _Alloc = std::allocator<T>> using list = std::list<T, _Alloc>;
#endif
#ifdef ASCS_USE_CUSTOM_QUEUE
#elif defined(ASCS_USE_CONCURRENT_QUEUE)
template<typename T>
class message_queue_ : public moodycamel::ConcurrentQueue<T>
{
public:
typedef moodycamel::ConcurrentQueue<T> super;
typedef message_queue_<T> me;
typedef std::lock_guard<me> lock_guard;
message_queue_() {}
message_queue_(size_t size) : super(size) {}
size_t size() const {return super::size_approx();}
bool empty() const {return 0 == size();}
//not thread-safe
void clear() {super(std::move(*this));}
void swap(me& other) {super::swap(other);}
//lockable, dummy
void lock() const {}
void unlock() const {}
bool idle() const {return true;}
bool enqueue_(const T& item) {return this->enqueue(item);}
bool enqueue_(T&& item) {return this->enqueue(std::move(item));}
bool try_enqueue_(const T& item) {return this->try_enqueue(item);}
bool try_enqueue_(T&& item) {return this->try_enqueue(std::move(item));}
bool try_dequeue_(T& item) {return this->try_dequeue(item);}
};
#else
template<typename T>
class message_queue_ : public list<T>
{
public:
typedef list<T> super;
typedef message_queue_<T> me;
typedef std::lock_guard<me> lock_guard;
message_queue_() {}
message_queue_(size_t) {}
//not thread-safe
void clear() {super::clear();}
void swap(me& other) {super::swap(other);}
//lockable
void lock() {mutex.lock();}
void unlock() {mutex.unlock();}
bool idle() {std::unique_lock<std::shared_mutex> lock(mutex, std::try_to_lock); return lock.owns_lock();}
bool enqueue(const T& item) {lock_guard lock(*this); return enqueue_(item);}
bool enqueue(T&& item) {lock_guard lock(*this); return enqueue_(std::move(item));}
bool try_enqueue(const T& item) {lock_guard lock(*this); return try_enqueue_(item);}
bool try_enqueue(T&& item) {lock_guard lock(*this); return try_enqueue_(std::move(item));}
bool try_dequeue(T& item) {lock_guard lock(*this); return try_dequeue_(item);}
bool enqueue_(const T& item) {this->push_back(item); return true;}
bool enqueue_(T&& item) {this->push_back(std::move(item)); return true;}
bool try_enqueue_(const T& item) {return enqueue_(item);}
bool try_enqueue_(T&& item) {return enqueue_(std::move(item));}
bool try_dequeue_(T& item) {if (this->empty()) return false; item.swap(this->front()); this->pop_front(); return true;}
private:
std::shared_mutex mutex;
};
#endif
#ifndef ASCS_USE_CUSTOM_QUEUE
template<typename T>
class message_queue : public message_queue_<T>
{
public:
typedef message_queue_<T> super;
message_queue() {}
message_queue(size_t size) : super(size) {}
//it's not thread safe for 'other', please note.
size_t move_items_in(typename super::me& other, size_t max_size = ASCS_MAX_MSG_NUM)
{
typename super::lock_guard lock(*this);
auto cur_size = this->size();
if (cur_size >= max_size)
return 0;
size_t num = 0;
while (cur_size < max_size)
{
T item;
if (!other.try_dequeue_(item)) //not thread safe for 'other', because we called 'try_dequeue_'
break;
this->enqueue_(std::move(item));
++cur_size;
++num;
}
return num;
}
//it's no thread safe for 'other', please note.
size_t move_items_in(list<T>& other, size_t max_size = ASCS_MAX_MSG_NUM)
{
typename super::lock_guard lock(*this);
auto cur_size = this->size();
if (cur_size >= max_size)
return 0;
size_t num = 0;
while (cur_size < max_size && !other.empty())
{
this->enqueue_(std::move(other.front()));
other.pop_front();
++cur_size;
++num;
}
return num;
}
};
#endif
//unpacker concept
namespace tcp
{
......@@ -418,6 +208,121 @@ namespace udp
} //namespace
//unpacker concept
struct statistic
{
#ifdef ASCS_FULL_STATISTIC
static bool enabled() {return true;}
typedef std::chrono::system_clock::time_point stat_time;
static stat_time now() {return std::chrono::system_clock::now();}
typedef std::chrono::system_clock::duration stat_duration;
#else
struct dummy_duration {const dummy_duration& operator +=(const dummy_duration& other) {return *this;}}; //not a real duration, just satisfy compiler(d1 += d2)
struct dummy_time {dummy_duration operator -(const dummy_time& other) {return dummy_duration();}}; //not a real time, just satisfy compiler(t1 - t2)
static bool enabled() {return false;}
typedef dummy_time stat_time;
static stat_time now() {return stat_time();}
typedef dummy_duration stat_duration;
#endif
statistic() {reset();}
void reset_number() {send_msg_sum = send_byte_sum = 0; recv_msg_sum = recv_byte_sum = 0;}
#ifdef ASCS_FULL_STATISTIC
void reset() {reset_number(); reset_duration();}
void reset_duration()
{
send_delay_sum = send_time_sum = stat_duration(0);
dispatch_dealy_sum = recv_idle_sum = stat_duration(0);
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
handle_time_1_sum = stat_duration(0);
#endif
handle_time_2_sum = stat_duration(0);
}
#else
void reset() {reset_number();}
#endif
statistic& operator +=(const struct statistic& other)
{
send_msg_sum += other.send_msg_sum;
send_byte_sum += other.send_byte_sum;
send_delay_sum += other.send_delay_sum;
send_time_sum += other.send_time_sum;
recv_msg_sum += other.recv_msg_sum;
recv_byte_sum += other.recv_byte_sum;
dispatch_dealy_sum += other.dispatch_dealy_sum;
recv_idle_sum += other.recv_idle_sum;
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
handle_time_1_sum += other.handle_time_1_sum;
#endif
handle_time_2_sum += other.handle_time_2_sum;
return *this;
}
std::string to_string() const
{
std::ostringstream s;
#ifdef ASCS_FULL_STATISTIC
s << "send corresponding statistic:\n"
<< "message sum: " << send_msg_sum << std::endl
<< "size in bytes: " << send_byte_sum << std::endl
<< "send delay: " << std::chrono::duration_cast<std::chrono::duration<float>>(send_delay_sum).count() << std::endl
<< "send duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(send_time_sum).count() << std::endl
<< "\nrecv corresponding statistic:\n"
<< "message sum: " << recv_msg_sum << std::endl
<< "size in bytes: " << recv_byte_sum << std::endl
<< "dispatch delay: " << std::chrono::duration_cast<std::chrono::duration<float>>(dispatch_dealy_sum).count() << std::endl
<< "recv idle duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(recv_idle_sum).count() << std::endl
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
<< "on_msg duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(handle_time_1_sum).count() << std::endl
#endif
<< "on_msg_handle duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(handle_time_2_sum).count();
#else
s << std::setfill('0') << "send corresponding statistic:\n"
<< "message sum: " << send_msg_sum << std::endl
<< "size in bytes: " << send_byte_sum << std::endl
<< "\nrecv corresponding statistic:\n"
<< "message sum: " << recv_msg_sum << std::endl
<< "size in bytes: " << recv_byte_sum;
#endif
return s.str();
}
//send corresponding statistic
uint_fast64_t send_msg_sum; //not counted msgs in sending buffer
uint_fast64_t send_byte_sum; //not counted msgs in sending buffer
stat_duration send_delay_sum; //from send_(native_)msg (exclude msg packing) to asio::async_write
stat_duration send_time_sum; //from asio::async_write to send_handler
//above two items indicate your network's speed or load
//recv corresponding statistic
uint_fast64_t recv_msg_sum; //include msgs in receiving buffer
uint_fast64_t recv_byte_sum; //include msgs in receiving buffer
stat_duration dispatch_dealy_sum; //from parse_msg(exclude msg unpacking) to on_msg_handle
stat_duration recv_idle_sum;
//during this duration, socket suspended msg reception (receiving buffer overflow, msg dispatching suspended or doing congestion control)
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
stat_duration handle_time_1_sum; //on_msg consumed time, this indicate the efficiency of msg handling
#endif
stat_duration handle_time_2_sum; //on_msg_handle consumed time, this indicate the efficiency of msg handling
};
template<typename T>
struct obj_with_begin_time : public T
{
obj_with_begin_time() {restart();}
obj_with_begin_time(T&& msg) : T(std::move(msg)) {restart();}
void restart() {restart(statistic::now());}
void restart(const typename statistic::stat_time& begin_time_) {begin_time = begin_time_;}
using T::swap;
void swap(obj_with_begin_time& other) {T::swap(other); std::swap(begin_time, other.begin_time);}
typename statistic::stat_time begin_time;
};
//free functions, used to do something to any container(except map and multimap) optionally with any mutex
template<typename _Can, typename _Mutex, typename _Predicate>
void do_something_to_all(_Can& __can, _Mutex& __mutex, const _Predicate& __pred) {std::shared_lock<std::shared_mutex> lock(__mutex); for (auto& item : __can) __pred(item);}
......@@ -435,30 +340,6 @@ void do_something_to_one(_Can& __can, _Mutex& __mutex, const _Predicate& __pred)
template<typename _Can, typename _Predicate>
void do_something_to_one(_Can& __can, const _Predicate& __pred) {for (auto iter = std::begin(__can); iter != std::end(__can); ++iter) if (__pred(*iter)) break;}
template<typename _Can>
bool splice_helper(_Can& dest_can, _Can& src_can, size_t max_size = ASCS_MAX_MSG_NUM)
{
if (src_can.empty())
return false;
auto size = dest_can.size();
if (size >= max_size) //dest_can can hold more items.
return false;
size = max_size - size; //maximum items this time can handle
if (src_can.size() > size) //some items left behind
{
auto begin_iter = std::begin(src_can);
auto left_size = src_can.size() - size;
auto end_iter = left_size > size ? std::next(begin_iter, size) : std::prev(std::end(src_can), left_size); //minimize iterator movement
dest_can.splice(std::end(dest_can), src_can, begin_iter, end_iter);
}
else
dest_can.splice(std::end(dest_can), src_can);
return true;
}
//member functions, used to do something to any member container(except map and multimap) optionally with any member mutex
#define DO_SOMETHING_TO_ALL_MUTEX(CAN, MUTEX) DO_SOMETHING_TO_ALL_MUTEX_NAME(do_something_to_all, CAN, MUTEX)
#define DO_SOMETHING_TO_ALL(CAN) DO_SOMETHING_TO_ALL_NAME(do_something_to_all, CAN)
......
......@@ -35,6 +35,18 @@
* Add a new packer--fixed_length_packer.
* Add a new class--message_queue.
*
* 2016.10.16 version 1.3.1
* Support non-lock queue, it's totally not thread safe and lock-free, it can improve IO throughput with particular business.
* Demonstrate how and when to use non-lock queue as the input and output message buffer.
* Queues (and their internal containers) used as input and output message buffer are now configurable (by macros or template arguments).
* New macros--ASCS_INPUT_QUEUE, ASCS_INPUT_CONTAINER, ASCS_OUTPUT_QUEUE and ASCS_OUTPUT_CONTAINER.
* Drop macro ASCS_USE_CONCURRENT_QUEUE, rename macro ASCS_USE_CONCURRE to ASCS_HAS_CONCURRENT_QUEUE.
* In contrast to non_lock_queue, split message_queue into lock_queue and lock_free_queue.
* Move container related classes and functions from st_asio_wrapper_base.h to st_asio_wrapper_container.h.
* Improve efficiency in scenarios of low throughput like pingpong test.
* Replaceable packer/unpacker now support replaceable_buffer (an alias of auto_buffer) and shared_buffer to be their message type.
* Move class statistic and obj_with_begin_time out of ascs::socket to reduce template tiers.
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -44,26 +56,31 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10100 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.1.0"
#define ASCS_VER 10101 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.1.1"
//asio and compiler check
#ifdef _MSC_VER
#define ASCS_SF "%Iu" //printing format for 'size_t'
static_assert(_MSC_VER >= 1900, "ascs need Visual C++ 14.0 or higher.");
#ifdef _HAS_SHARED_MUTEX
#define ASCS_HAS_STD_SHARED_MUTEX
#endif
#elif defined(__GNUC__)
#define ASCS_SF "%zu" //printing format for 'size_t'
#ifdef __clang__
static_assert(__clang_major__ > 3 || (__clang_major__ == 3 && __clang_minor__ >= 4), "ascs need Clang 3.4 or higher.");
#else
static_assert(__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 9), "ascs need GCC 4.9 or higher.");
#if __GNUC__ > 5
#define ASCS_HAS_STD_SHARED_MUTEX
#if __GNUC__ > 5 && __cplusplus <= 201402L
#warning your compiler maybe support c++17, please open it (-std=c++17), then ascs will be able to use std::shared_mutex.
#endif
#endif
#if !defined(__cplusplus) || __cplusplus <= 201103L
#error ascs at least need c++14.
#elif __cplusplus > 201402L //TBD
#define ASCS_HAS_STD_SHARED_MUTEX
#endif
#else
#error ascs only support Visual C++, GCC and Clang.
......@@ -107,7 +124,7 @@ static_assert(ASCS_MAX_MSG_NUM > 0, "message capacity must be bigger than zero."
//after this duration, this socket can be freed from the heap or reused,
//you must define this macro as a value, not just define it, the value means the duration, unit is second.
//if macro ST_ASIO_ENHANCED_STABILITY been defined, this macro will always be zero.
//if macro ASCS_ENHANCED_STABILITY been defined, this macro will always be zero.
#ifdef ASCS_ENHANCED_STABILITY
#if defined(ASCS_DELAY_CLOSE) && ASCS_DELAY_CLOSE != 0
#warning ASCS_DELAY_CLOSE will always be zero if ASCS_ENHANCED_STABILITY macro been defined.
......@@ -151,7 +168,7 @@ static_assert(ASCS_MAX_OBJECT_NUM > 0, "object capacity must be bigger than zero
#ifndef ASCS_REUSE_OBJECT
#ifndef ASCS_FREE_OBJECT_INTERVAL
#define ASCS_FREE_OBJECT_INTERVAL 60 //seconds
#elif ST_ASIO_FREE_OBJECT_INTERVAL <= 0
#elif ASCS_FREE_OBJECT_INTERVAL <= 0
#error free object interval must be bigger than zero.
#endif
#endif
......@@ -209,10 +226,38 @@ namespace std {typedef shared_timed_mutex shared_mutex;}
#endif
//ConcurrentQueue is lock-free, please refer to https://github.com/cameron314/concurrentqueue
#ifdef ASCS_USE_CUSTOM_QUEUE
#elif defined(ASCS_USE_CONCURRENT_QUEUE) //if ASCS_USE_CONCURRENT_QUEUE macro not defined, ascs will use 'list' as the message queue, it's not thread safe, so need locks.
#ifdef ASCS_HAS_CONCURRENT_QUEUE
#include <concurrentqueue.h>
template<typename T> using concurrent_queue = moodycamel::ConcurrentQueue<T>;
#ifndef ASCS_INPUT_QUEUE
#define ASCS_INPUT_QUEUE lock_free_queue
#endif
#ifndef ASCS_INPUT_CONTAINER
#define ASCS_INPUT_CONTAINER concurrent_queue
#endif
#ifndef ASCS_OUTPUT_QUEUE
#define ASCS_OUTPUT_QUEUE lock_free_queue
#endif
#ifndef ASCS_OUTPUT_CONTAINER
#define ASCS_OUTPUT_CONTAINER concurrent_queue
#endif
#else
#ifndef ASCS_INPUT_QUEUE
#define ASCS_INPUT_QUEUE lock_queue
#endif
#ifndef ASCS_INPUT_CONTAINER
#define ASCS_INPUT_CONTAINER list
#endif
#ifndef ASCS_OUTPUT_QUEUE
#define ASCS_OUTPUT_QUEUE lock_queue
#endif
#ifndef ASCS_OUTPUT_CONTAINER
#define ASCS_OUTPUT_CONTAINER list
#endif
#endif
//we also can control the queues (and their containers) via template parameters on calss 'connector_base'
//'server_socket_base', 'ssl::connector_base' and 'ssl::server_socket_base'.
//we even can let a socket to use different queue (and / or different container) for input and output via template parameters.
//configurations
#endif /* _ASCS_CONFIG_H_ */
/*
* container.h
*
* Created on: 2016-10-10
* Author: youngwolf
* email: mail2tao@163.com
* QQ: 676218192
* Community on QQ: 198941541
*
* containers.
*/
#ifndef _ASCS_CONTAINER_H_
#define _ASCS_CONTAINER_H_
#include <list>
#include <shared_mutex>
#include "config.h"
namespace ascs
{
//ascs requires that container must take one and only one template argument.
#if defined(_MSC_VER) || defined(__clang__) || __GNUC__ >= 5
template<typename T> using list = std::list<T>;
#else
//a substitute of std::list (before gcc 5), it's size() function has O(1) complexity
//BTW, the naming rule is not mine, I copied them from std::list in Visual C++ 14.0
template<typename _Ty>
class list
{
public:
typedef list<_Ty> _Myt;
typedef std::list<_Ty> _Mybase;
typedef typename _Mybase::size_type size_type;
typedef typename _Mybase::reference reference;
typedef typename _Mybase::const_reference const_reference;
typedef typename _Mybase::iterator iterator;
typedef typename _Mybase::const_iterator const_iterator;
typedef typename _Mybase::reverse_iterator reverse_iterator;
typedef typename _Mybase::const_reverse_iterator const_reverse_iterator;
list() : s(0) {}
void swap(list& other) {impl.swap(other.impl); std::swap(s, other.s);}
bool empty() const {return 0 == s;}
size_type size() const {return s;}
void resize(size_type _Newsize)
{
while (s < _Newsize)
{
++s;
impl.emplace_back();
}
if (s > _Newsize)
{
auto end_iter = std::end(impl);
auto begin_iter = _Newsize <= s / 2 ? std::next(std::begin(impl), _Newsize) : std::prev(end_iter, s - _Newsize); //minimize iterator movement
s = _Newsize;
impl.erase(begin_iter, end_iter);
}
}
void clear() {s = 0; impl.clear();}
iterator erase(const_iterator _Where) {--s; return impl.erase(_Where);}
void push_front(const _Ty& _Val) {++s; impl.push_front(_Val);}
void push_front(_Ty&& _Val) {++s; impl.push_front(std::move(_Val));}
void pop_front() {--s; impl.pop_front();}
reference front() {return impl.front();}
iterator begin() {return impl.begin();}
reverse_iterator rbegin() {return impl.rbegin();}
const_reference front() const {return impl.front();}
const_iterator begin() const {return impl.begin();}
const_reverse_iterator rbegin() const {return impl.rbegin();}
void push_back(const _Ty& _Val) {++s; impl.push_back(_Val);}
void push_back(_Ty&& _Val) {++s; impl.push_back(std::move(_Val));}
void pop_back() {--s; impl.pop_back();}
reference back() {return impl.back();}
iterator end() {return impl.end();}
reverse_iterator rend() {return impl.rend();}
const_reference back() const {return impl.back();}
const_iterator end() const {return impl.end();}
const_reverse_iterator rend() const {return impl.rend();}
void splice(const_iterator _Where, _Myt& _Right) {s += _Right.size(); _Right.s = 0; impl.splice(_Where, _Right.impl);}
void splice(const_iterator _Where, _Myt& _Right, const_iterator _First) {++s; --_Right.s; impl.splice(_Where, _Right.impl, _First);}
void splice(const_iterator _Where, _Myt& _Right, const_iterator _First, const_iterator _Last)
{
auto size = std::distance(_First, _Last);
//this std::distance invocation is the penalty for making complexity of size() constant.
s += size;
_Right.s -= size;
impl.splice(_Where, _Right.impl, _First, _Last);
}
private:
size_type s;
_Mybase impl;
};
#endif
class dummy_lockable
{
public:
typedef std::lock_guard<dummy_lockable> lock_guard;
//lockable, dummy
void lock() const {}
void unlock() const {}
bool idle() const {return true;} //locked or not
};
class lockable
{
public:
typedef std::lock_guard<lockable> lock_guard;
//lockable
void lock() {mutex.lock();}
void unlock() {mutex.unlock();}
bool idle() {std::unique_lock<std::shared_mutex> lock(mutex, std::try_to_lock); return lock.owns_lock();} //locked or not
private:
std::shared_mutex mutex;
};
//Container must at least has the following functions:
// Container(size) and Container() constructor
// move constructor
// swap
// size_approx
// enqueue(const T& item)
// enqueue(T&& item)
// try_dequeue(T& item)
template<typename T, typename Container>
class lock_free_queue : public Container, public dummy_lockable
{
public:
typedef T data_type;
typedef Container super;
typedef lock_free_queue<T, Container> me;
lock_free_queue() {}
lock_free_queue(size_t size) : super(size) {}
size_t size() const {return this->size_approx();}
bool empty() const {return 0 == size();}
//not thread-safe
void clear() {super(std::move(*this));}
void swap(me& other) {super::swap(other);}
bool enqueue_(const T& item) {return this->enqueue(item);}
bool enqueue_(T&& item) {return this->enqueue(std::move(item));}
bool try_dequeue_(T& item) {return this->try_dequeue(item);}
};
//Container must at least has the following functions:
// Container() constructor
// size
// empty
// clear
// swap
// push_back(const T& item)
// push_back(T&& item)
// front
// pop_front
template<typename T, typename Container, typename Lockable>
class queue : public Container, public Lockable
{
public:
typedef T data_type;
typedef Container super;
typedef queue<T, Container, Lockable> me;
queue() {}
queue(size_t size) : super(size) {}
//not thread-safe
void clear() {super::clear();}
void swap(me& other) {super::swap(other);}
bool enqueue(const T& item) {typename Lockable::lock_guard lock(*this); return enqueue_(item);}
bool enqueue(T&& item) {typename Lockable::lock_guard lock(*this); return enqueue_(std::move(item));}
bool try_dequeue(T& item) {typename Lockable::lock_guard lock(*this); return try_dequeue_(item);}
bool enqueue_(const T& item) {this->push_back(item); return true;}
bool enqueue_(T&& item) {this->push_back(std::move(item)); return true;}
bool try_dequeue_(T& item) {if (this->empty()) return false; item.swap(this->front()); this->pop_front(); return true;}
};
template<typename T, typename Container> using non_lock_queue = queue<T, Container, dummy_lockable>; //totally not thread safe
template<typename T, typename Container> using lock_queue = queue<T, Container, lockable>;
//it's not thread safe for 'other', please note. for this queue, depends on 'Q'
template<typename Q>
size_t move_items_in(Q& dest, Q& other, size_t max_size = ASCS_MAX_MSG_NUM)
{
if (other.empty())
return 0;
auto cur_size = dest.size();
if (cur_size >= max_size)
return 0;
size_t num = 0;
typename Q::data_type item;
typename Q::lock_guard lock(dest);
while (cur_size < max_size && other.try_dequeue_(item)) //size not controlled accurately
{
dest.enqueue_(std::move(item));
++cur_size;
++num;
}
return num;
}
//it's not thread safe for 'other', please note. for this queue, depends on 'Q'
template<typename Q, typename Q2>
size_t move_items_in(Q& dest, Q2& other, size_t max_size = ASCS_MAX_MSG_NUM)
{
if (other.empty())
return 0;
auto cur_size = dest.size();
if (cur_size >= max_size)
return 0;
size_t num = 0;
typename Q::lock_guard lock(dest);
while (cur_size < max_size && !other.empty()) //size not controlled accurately
{
dest.enqueue_(std::move(other.front()));
other.pop_front();
++cur_size;
++num;
}
return num;
}
template<typename _Can>
bool splice_helper(_Can& dest_can, _Can& src_can, size_t max_size = ASCS_MAX_MSG_NUM)
{
if (src_can.empty())
return false;
auto size = dest_can.size();
if (size >= max_size) //dest_can can hold more items.
return false;
size = max_size - size; //maximum items this time can handle
if (src_can.size() > size) //some items left behind
{
auto begin_iter = std::begin(src_can);
auto left_size = src_can.size() - size;
auto end_iter = left_size > size ? std::next(begin_iter, size) : std::prev(std::end(src_can), left_size); //minimize iterator movement
dest_can.splice(std::end(dest_can), src_can, begin_iter, end_iter);
}
else
dest_can.splice(std::end(dest_can), src_can);
return true;
}
} //namespace
#endif /* _ASCS_CONTAINER_H_ */
\ No newline at end of file
......@@ -29,7 +29,7 @@ static_assert(ASCS_MSG_BUFFER_SIZE > 0, "message buffer size must be bigger than
namespace ascs { namespace ext {
//buffers who implemented i_buffer interface can be wrapped by replaceable_buffer
//implement i_buffer interface, then string_buffer can be wrapped by replaceable_buffer
class string_buffer : public std::string, public i_buffer
{
public:
......
......@@ -99,21 +99,26 @@ public:
};
//protocol: length + body
class replaceable_packer : public i_packer<replaceable_buffer>
//T can be replaceable_buffer (an alias of auto_buffer) or shared_buffer, the latter makes output messages seemingly copyable.
template<typename T = replaceable_buffer>
class replaceable_packer : public i_packer<T>
{
protected:
typedef i_packer<T> super;
public:
using i_packer<msg_type>::pack_msg;
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
using super::pack_msg;
virtual typename super::msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
{
auto raw_msg = new string_buffer();
auto str = packer().pack_msg(pstr, len, num, native);
raw_msg->swap(str);
return msg_type(raw_msg);
return typename super::msg_type(raw_msg);
}
virtual char* raw_data(msg_type& msg) const {return const_cast<char*>(std::next(msg.data(), ASCS_HEAD_LEN));}
virtual const char* raw_data(msg_ctype& msg) const {return std::next(msg.data(), ASCS_HEAD_LEN);}
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ASCS_HEAD_LEN;}
virtual char* raw_data(typename super::msg_type& msg) const {return const_cast<char*>(std::next(msg.data(), ASCS_HEAD_LEN));}
virtual const char* raw_data(typename super::msg_ctype& msg) const {return std::next(msg.data(), ASCS_HEAD_LEN);}
virtual size_t raw_data_len(typename super::msg_ctype& msg) const {return msg.size() - ASCS_HEAD_LEN;}
};
//protocol: fixed lenght
......
......@@ -139,19 +139,24 @@ protected:
};
//protocol: length + body
class replaceable_unpacker : public tcp::i_unpacker<replaceable_buffer>
//T can be replaceable_buffer (an alias of auto_buffer) or shared_buffer, the latter makes output messages seemingly copyable,
template<typename T = replaceable_buffer>
class replaceable_unpacker : public ascs::tcp::i_unpacker<T>
{
protected:
typedef ascs::tcp::i_unpacker<T> super;
public:
virtual void reset_state() {unpacker_.reset_state();}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
virtual bool parse_msg(size_t bytes_transferred, typename super::container_type& msg_can)
{
unpacker::container_type tmp_can;
auto unpack_ok = unpacker_.parse_msg(bytes_transferred, tmp_can);
do_something_to_all(tmp_can, [&msg_can](auto& item) {
auto raw_buffer = new string_buffer();
raw_buffer->swap(item);
auto raw_msg = new string_buffer();
raw_msg->swap(item);
msg_can.resize(msg_can.size() + 1);
msg_can.back().raw_buffer(raw_buffer);
msg_can.back().raw_buffer(raw_msg);
});
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
......@@ -166,16 +171,21 @@ protected:
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
class replaceable_udp_unpacker : public udp::i_unpacker<replaceable_buffer>
//T can be replaceable_buffer (an alias of auto_buffer) or shared_buffer, the latter makes output messages seemingly copyable.
template<typename T = replaceable_buffer>
class replaceable_udp_unpacker : public ascs::udp::i_unpacker<T>
{
protected:
typedef ascs::udp::i_unpacker<T> super;
public:
virtual msg_type parse_msg(size_t bytes_transferred)
virtual typename super::msg_type parse_msg(size_t bytes_transferred)
{
assert(bytes_transferred <= ASCS_MSG_BUFFER_SIZE);
auto raw_msg = new string_buffer();
raw_msg->assign(raw_buff.data(), bytes_transferred);
return msg_type(raw_msg);
return typename super::msg_type(raw_msg);
}
virtual asio::mutable_buffers_1 prepare_next_recv() {return asio::buffer(raw_buff);}
......
......@@ -13,138 +13,22 @@
#ifndef _ASCS_SOCKET_H_
#define _ASCS_SOCKET_H_
#include <iomanip>
#include "base.h"
#include "timer.h"
namespace ascs
{
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType, typename OutMsgType>
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType, typename OutMsgType,
template<typename, typename> class InQueue, template<typename> class InContainer,
template<typename, typename> class OutQueue, template<typename> class OutContainer>
class socket: public timer
{
public:
struct statistic
{
#ifdef ASCS_FULL_STATISTIC
static bool enabled() {return true;}
typedef std::chrono::system_clock::time_point stat_time;
static stat_time now() {return std::chrono::system_clock::now();}
typedef std::chrono::system_clock::duration stat_duration;
#else
struct dummy_duration {const dummy_duration& operator +=(const dummy_duration& other) {return *this;}}; //not a real duration, just satisfy compiler(d1 += d2)
struct dummy_time {dummy_duration operator -(const dummy_time& other) {return dummy_duration();}}; //not a real time, just satisfy compiler(t1 - t2)
static bool enabled() {return false;}
typedef dummy_time stat_time;
static stat_time now() {return stat_time();}
typedef dummy_duration stat_duration;
#endif
statistic() {reset();}
void reset_number() {send_msg_sum = send_byte_sum = 0; recv_msg_sum = recv_byte_sum = 0;}
#ifdef ASCS_FULL_STATISTIC
void reset() {reset_number(); reset_duration();}
void reset_duration()
{
send_delay_sum = send_time_sum = stat_duration(0);
dispatch_dealy_sum = recv_idle_sum = stat_duration(0);
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
handle_time_1_sum = stat_duration(0);
#endif
handle_time_2_sum = stat_duration(0);
}
#else
void reset() {reset_number();}
#endif
statistic& operator +=(const struct statistic& other)
{
send_msg_sum += other.send_msg_sum;
send_byte_sum += other.send_byte_sum;
send_delay_sum += other.send_delay_sum;
send_time_sum += other.send_time_sum;
recv_msg_sum += other.recv_msg_sum;
recv_byte_sum += other.recv_byte_sum;
dispatch_dealy_sum += other.dispatch_dealy_sum;
recv_idle_sum += other.recv_idle_sum;
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
handle_time_1_sum += other.handle_time_1_sum;
#endif
handle_time_2_sum += other.handle_time_2_sum;
return *this;
}
std::string to_string() const
{
std::ostringstream s;
#ifdef ASCS_FULL_STATISTIC
s << "send corresponding statistic:\n"
<< "message sum: " << send_msg_sum << std::endl
<< "size in bytes: " << send_byte_sum << std::endl
<< "send delay: " << std::chrono::duration_cast<std::chrono::duration<float>>(send_delay_sum).count() << std::endl
<< "send duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(send_time_sum).count() << std::endl
<< "\nrecv corresponding statistic:\n"
<< "message sum: " << recv_msg_sum << std::endl
<< "size in bytes: " << recv_byte_sum << std::endl
<< "dispatch delay: " << std::chrono::duration_cast<std::chrono::duration<float>>(dispatch_dealy_sum).count() << std::endl
<< "recv idle duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(recv_idle_sum).count() << std::endl
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
<< "on_msg duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(handle_time_1_sum).count() << std::endl
#endif
<< "on_msg_handle duration: " << std::chrono::duration_cast<std::chrono::duration<float>>(handle_time_2_sum).count();
#else
s << std::setfill('0') << "send corresponding statistic:\n"
<< "message sum: " << send_msg_sum << std::endl
<< "size in bytes: " << send_byte_sum << std::endl
<< "\nrecv corresponding statistic:\n"
<< "message sum: " << recv_msg_sum << std::endl
<< "size in bytes: " << recv_byte_sum;
#endif
return s.str();
}
//send corresponding statistic
uint_fast64_t send_msg_sum; //not counted msgs in sending buffer
uint_fast64_t send_byte_sum; //not counted msgs in sending buffer
stat_duration send_delay_sum; //from send_(native_)msg (exclude msg packing) to asio::async_write
stat_duration send_time_sum; //from asio::async_write to send_handler
//above two items indicate your network's speed or load
//recv corresponding statistic
uint_fast64_t recv_msg_sum; //include msgs in receiving buffer
uint_fast64_t recv_byte_sum; //include msgs in receiving buffer
stat_duration dispatch_dealy_sum; //from parse_msg(exclude msg unpacking) to on_msg_handle
stat_duration recv_idle_sum;
//during this duration, socket suspended msg reception (receiving buffer overflow, msg dispatching suspended or doing congestion control)
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
stat_duration handle_time_1_sum; //on_msg consumed time, this indicate the efficiency of msg handling
#endif
stat_duration handle_time_2_sum; //on_msg_handle consumed time, this indicate the efficiency of msg handling
};
protected:
template<typename T>
struct obj_with_begin_time : public T
{
obj_with_begin_time() {restart();}
obj_with_begin_time(T&& msg) : T(std::move(msg)) {restart();}
void restart() {restart(statistic::now());}
void restart(const typename statistic::stat_time& begin_time_) {begin_time = begin_time_;}
using T::swap;
void swap(obj_with_begin_time& other) {T::swap(other); std::swap(begin_time, other.begin_time);}
typename statistic::stat_time begin_time;
};
typedef obj_with_begin_time<InMsgType> in_msg;
typedef obj_with_begin_time<OutMsgType> out_msg;
typedef message_queue<in_msg> in_container_type;
typedef message_queue<out_msg> out_container_type;
typedef InQueue<in_msg, InContainer<in_msg>> in_container_type;
typedef OutQueue<out_msg, OutContainer<out_msg>> out_container_type;
static const tid TIMER_BEGIN = timer::TIMER_END;
static const tid TIMER_HANDLE_MSG = TIMER_BEGIN;
......@@ -353,11 +237,8 @@ protected:
auto temp_buffer(std::move(temp_msg_buffer));
#endif
if (!temp_buffer.empty())
{
recv_msg_buffer.move_items_in(temp_buffer, -1);
if (move_items_in(recv_msg_buffer, temp_buffer, -1) > 0)
dispatch_msg();
}
if (temp_msg_buffer.empty() && recv_msg_buffer.size() < ASCS_MAX_MSG_NUM)
do_recv_msg(); //receive msg sequentially, which means second receiving only after first receiving success
......@@ -480,7 +361,8 @@ private:
if (!do_dispatch_msg())
{
dispatching = false;
dispatch_msg(); //just make sure no pending msgs
if (!recv_msg_buffer.empty())
dispatch_msg(); //just make sure no pending msgs
}
}
}
......
......@@ -27,11 +27,13 @@
namespace ascs { namespace ssl {
template <typename Packer, typename Unpacker, typename Socket = asio::ssl::stream<asio::ip::tcp::socket>>
class connector_base : public tcp::connector_base<Packer, Unpacker, Socket>
template <typename Packer, typename Unpacker, typename Socket = asio::ssl::stream<asio::ip::tcp::socket>,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class connector_base : public tcp::connector_base<Packer, Unpacker, Socket, InQueue, InContainer, OutQueue, OutContainer>
{
protected:
typedef tcp::connector_base<Packer, Unpacker, Socket> super;
typedef tcp::connector_base<Packer, Unpacker, Socket, InQueue, InContainer, OutQueue, OutContainer> super;
public:
using super::TIMER_BEGIN;
......@@ -162,8 +164,10 @@ protected:
asio::ssl::context ctx;
};
template<typename Packer, typename Unpacker, typename Server = i_server, typename Socket = asio::ssl::stream<asio::ip::tcp::socket>>
using server_socket_base = tcp::server_socket_base<Packer, Unpacker, Server, Socket>;
template<typename Packer, typename Unpacker, typename Server = i_server, typename Socket = asio::ssl::stream<asio::ip::tcp::socket>,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
using server_socket_base = tcp::server_socket_base<Packer, Unpacker, Server, Socket, InQueue, InContainer, OutQueue, OutContainer>;
template<typename Socket, typename Pool = object_pool<Socket>, typename Server = i_server>
class server_base : public tcp::server_base<Socket, Pool, Server>
......
/*
* sconnector.h
* connector.h
*
* Created on: 2012-3-2
* Author: youngwolf
......@@ -17,11 +17,13 @@
namespace ascs { namespace tcp {
template <typename Packer, typename Unpacker, typename Socket = asio::ip::tcp::socket>
class connector_base : public socket_base<Socket, Packer, Unpacker>
template <typename Packer, typename Unpacker, typename Socket = asio::ip::tcp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class connector_base : public socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer>
{
protected:
typedef socket_base<Socket, Packer, Unpacker> super;
typedef socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer> super;
public:
static const timer::tid TIMER_BEGIN = super::TIMER_END;
......
......@@ -17,11 +17,14 @@
namespace ascs { namespace tcp {
template<typename Packer, typename Unpacker, typename Server = i_server, typename Socket = asio::ip::tcp::socket>
class server_socket_base : public socket_base<Socket, Packer, Unpacker>, public std::enable_shared_from_this<server_socket_base<Packer, Unpacker, Server, Socket>>
template<typename Packer, typename Unpacker, typename Server = i_server, typename Socket = asio::ip::tcp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class server_socket_base : public socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer>,
public std::enable_shared_from_this<server_socket_base<Packer, Unpacker, Server, Socket, InQueue, InContainer, OutQueue, OutContainer>>
{
protected:
typedef socket_base<Socket, Packer, Unpacker> super;
typedef socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer> super;
public:
static const timer::tid TIMER_BEGIN = super::TIMER_END;
......
......@@ -19,8 +19,10 @@
namespace ascs { namespace tcp {
template <typename Socket, typename Packer, typename Unpacker>
class socket_base : public socket<Socket, Packer, Unpacker, typename Packer::msg_type, typename Unpacker::msg_type>
template <typename Socket, typename Packer, typename Unpacker,
template<typename, typename> class InQueue, template<typename> class InContainer,
template<typename, typename> class OutQueue, template<typename> class OutContainer>
class socket_base : public socket<Socket, Packer, Unpacker, typename Packer::msg_type, typename Unpacker::msg_type, InQueue, InContainer, OutQueue, OutContainer>
{
public:
typedef typename Packer::msg_type in_msg_type;
......@@ -29,7 +31,7 @@ public:
typedef typename Unpacker::msg_ctype out_msg_ctype;
protected:
typedef socket<Socket, Packer, Unpacker, typename Packer::msg_type, typename Unpacker::msg_type> super;
typedef socket<Socket, Packer, Unpacker, typename Packer::msg_type, typename Unpacker::msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
using super::TIMER_BEGIN;
using super::TIMER_END;
......@@ -118,7 +120,7 @@ protected:
#endif
size_t size = 0;
typename super::in_msg msg;
auto end_time = super::statistic::now();
auto end_time = statistic::now();
typename super::in_container_type::lock_guard lock(this->send_msg_buffer);
while (this->send_msg_buffer.try_dequeue_(msg))
......@@ -221,7 +223,7 @@ private:
{
if (!ec)
{
this->stat.send_time_sum += super::statistic::now() - last_send_msg.front().begin_time;
this->stat.send_time_sum += statistic::now() - last_send_msg.front().begin_time;
this->stat.send_byte_sum += bytes_transferred;
this->stat.send_msg_sum += last_send_msg.size();
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
......@@ -236,10 +238,13 @@ private:
this->on_send_error(ec);
last_send_msg.clear();
if (ec || !do_send_msg()) //send msg sequentially, which means second sending only after first sending success
if (ec)
this->sending = false;
else if (!do_send_msg()) //send msg sequentially, which means second sending only after first sending success
{
this->sending = false;
this->send_msg(); //just make sure no pending msgs
if (!this->send_msg_buffer.empty())
this->send_msg(); //just make sure no pending msgs
}
}
......
......@@ -17,11 +17,13 @@
namespace ascs { namespace udp {
template <typename Packer, typename Unpacker, typename Socket = asio::ip::udp::socket>
class socket_base : public socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>>
template <typename Packer, typename Unpacker, typename Socket = asio::ip::udp::socket,
template<typename, typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename, typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class socket_base : public socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer>
{
protected:
typedef socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>> super;
typedef socket<Socket, Packer, Unpacker, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer> super;
public:
typedef udp_msg<typename Packer::msg_type> in_msg_type;
......@@ -121,7 +123,7 @@ protected:
{
if (is_send_allowed() && !this->stopped() && !this->send_msg_buffer.empty() && this->send_msg_buffer.try_dequeue(last_send_msg))
{
this->stat.send_delay_sum += super::statistic::now() - last_send_msg.begin_time;
this->stat.send_delay_sum += statistic::now() - last_send_msg.begin_time;
last_send_msg.restart();
std::shared_lock<std::shared_mutex> lock(shutdown_mutex);
......@@ -201,7 +203,7 @@ private:
{
assert(bytes_transferred == last_send_msg.size());
this->stat.send_time_sum += super::statistic::now() - last_send_msg.begin_time;
this->stat.send_time_sum += statistic::now() - last_send_msg.begin_time;
this->stat.send_byte_sum += bytes_transferred;
++this->stat.send_msg_sum;
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
......@@ -222,7 +224,8 @@ private:
if (!do_send_msg())
{
this->sending = false;
this->send_msg(); //just make sure no pending msgs
if (!this->send_msg_buffer.empty())
this->send_msg(); //just make sure no pending msgs
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册