Commit 8356dc4b authored by youngwolf

1.1.3 release.

Introduce a lock-free mechanism for some appropriate logic (many requesters, only one can succeed; the others fail rather than wait).
Remove all mutexes (except those in object_pool, service_pump, lock_queue and udp::socket).
Sharply simplify the timer class.
Parent afbf6243
......@@ -28,6 +28,9 @@ udp::i_unpacker:
The udp unpacker must implement this interface.
2. Classes
scope_atomic_lock:
Behaves like std::scoped_lock, except that it performs atomic ++ and -- operations instead of calling lock and unlock on a mutex.
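A minimal usage sketch of the "many requesters, only one succeeds" pattern this class enables (the names send_atomic and try_start_sending are illustrative; the pattern mirrors how ascs::socket guards its sending logic):

    std::atomic_size_t send_atomic(0); //must be initialized to zero, as the constructor requires
    void try_start_sending()
    {
        ascs::scope_atomic_lock<> lock(send_atomic);
        if (lock.locked()) //this thread incremented the atomic from 0 to 1, so it won the race
        {
            //only the winner gets here and does the real work
        }
        //all other requesters fall through and fail immediately instead of waiting
    } //the destructor decrements the atomic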
dummy_packer:
Only provides the message type for sending so that the code compiles; it cannot actually pack anything, so messages can only be sent via direct_send_msg and the like.
......
......@@ -53,19 +53,13 @@ static_assert(ASCS_MAX_MSG_NUM > 0, "message capacity must be bigger than zero."
//after this duration, this socket can be freed from the heap or reused,
//you must define this macro as a value, not just define it; the value is the duration in seconds.
//if the ASCS_ENHANCED_STABILITY macro is defined, this macro will always be zero.
#ifdef ASCS_ENHANCED_STABILITY
#if defined(ASCS_DELAY_CLOSE) && ASCS_DELAY_CLOSE != 0
#warning ASCS_DELAY_CLOSE will always be zero if the ASCS_ENHANCED_STABILITY macro is defined.
#endif
#undef ASCS_DELAY_CLOSE
#define ASCS_DELAY_CLOSE 0
#else
//a value equal to zero will cause ascs to use a mechanism to guarantee 100% safety when reusing or freeing this socket,
//ascs will hook all async calls to prevent this socket from being reused or freed before all async calls finish
//or are interrupted (of course, this mechanism will slightly impact efficiency).
#ifndef ASCS_DELAY_CLOSE
#define ASCS_DELAY_CLOSE 5 //seconds
#endif
static_assert(ASCS_DELAY_CLOSE > 0, "ASCS_DELAY_CLOSE must be bigger than zero.");
#define ASCS_DELAY_CLOSE 0 //seconds, guarantee 100% safety when reusing or freeing this socket
#endif
static_assert(ASCS_DELAY_CLOSE >= 0, "delay close duration must be bigger than or equal to zero.");
//full statistics include time consumption; otherwise only countable information will be gathered
//#define ASCS_FULL_STATISTIC
......@@ -165,11 +159,40 @@ namespace std {typedef shared_timed_mutex shared_mutex;}
#endif
//ConcurrentQueue is lock-free, please refer to https://github.com/cameron314/concurrentqueue
#ifdef ASCS_USE_CUSTOM_QUEUE
#elif defined(ASCS_USE_CONCURRENT_QUEUE) //if the ASCS_USE_CONCURRENT_QUEUE macro is not defined, ascs will use 'list' as the message queue; it's not thread safe, so locks are needed.
#ifdef ASCS_HAS_CONCURRENT_QUEUE
#include <concurrentqueue.h>
template<typename T> using concurrent_queue = moodycamel::ConcurrentQueue<T>;
#ifndef ASCS_INPUT_QUEUE
#define ASCS_INPUT_QUEUE lock_free_queue
#endif
#ifndef ASCS_INPUT_CONTAINER
#define ASCS_INPUT_CONTAINER concurrent_queue
#endif
#ifndef ASCS_OUTPUT_QUEUE
#define ASCS_OUTPUT_QUEUE lock_free_queue
#endif
#ifndef ASCS_OUTPUT_CONTAINER
#define ASCS_OUTPUT_CONTAINER concurrent_queue
#endif
#else
#ifndef ASCS_INPUT_QUEUE
#define ASCS_INPUT_QUEUE lock_queue
#endif
#ifndef ASCS_INPUT_CONTAINER
#define ASCS_INPUT_CONTAINER list
#endif
#ifndef ASCS_OUTPUT_QUEUE
#define ASCS_OUTPUT_QUEUE lock_queue
#endif
#ifndef ASCS_OUTPUT_CONTAINER
#define ASCS_OUTPUT_CONTAINER list
#endif
#endif
Define the ASCS_USE_CUSTOM_QUEUE macro to adopt a custom message queue; for how to implement your own, see message_queue, you only need to implement the corresponding functions (enqueue, dequeue, etc.).
By default ascs offered two message queues to choose from: one is the concurrent queue, which is lock-free, see https://github.com/cameron314/concurrentqueue for details; the other is an ordinary locking
message queue, implemented with ascs::list plus std::shared_mutex. Define the ASCS_USE_CONCURRENT_QUEUE macro to enable the concurrent queue, otherwise the ordinary locking queue is used.
By default ascs now provides three message queues to choose from: first, the concurrent queue, which is lock-free and thread safe, see https://github.com/cameron314/concurrentqueue for details;
second, an ordinary locking message queue, implemented with ascs::list plus std::shared_mutex; third, a lock-free non-thread-safe queue, also implemented with ascs::list.
When using the lock-free non-thread-safe queue, you must review your business logic very carefully; not every use case can adopt it. See the demos for concrete usage.
Both the queue and the container it uses internally can be specified via macros or template parameters, which naturally supports custom queues (and containers). For how to implement a custom queue (and container),
see ascs::lock_free_queue, ascs::non_lock_queue and ascs::lock_queue; a queue must accept two template parameters.
A container must implement certain interfaces (which the queue calls); see moodycamel::ConcurrentQueue and ascs::list; a container must accept one template parameter. Exactly which interfaces are required
differs from queue to queue; the best approach is to compile and let the compiler tell you which interfaces are missing, since their names make their purpose perfectly clear. A skeleton is sketched below.
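A bare-bones sketch of a custom queue (the member functions shown are assumptions inferred from the text above, not an authoritative list; let the compiler be the judge):

    //two template parameters, as required: the message type and a concrete container type
    template<typename T, typename Container>
    class my_queue : protected Container
    {
    public:
        bool enqueue(T&& msg) {/* store msg, return false on failure */ return true;}
        bool try_dequeue(T& msg) {/* fetch one msg if any, return false when empty */ return false;}
        //plus whatever else the compiler asks for (size, empty, lock/unlock, ...)
    };

Then select it with #define ASCS_USE_CUSTOM_QUEUE plus #define ASCS_INPUT_QUEUE my_queue (and the matching OUTPUT macros).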
//configurations

#ifdef ASCS_HUGE_MSG
#define ASCS_HEAD_TYPE uint32_t
#define ASCS_HEAD_H2N htonl
......

#ifdef ASCS_HUGE_MSG
#define ASCS_HEAD_TYPE uint32_t
#define ASCS_HEAD_N2H ntohl
......
......@@ -6,4 +6,4 @@ asio::async_write(socket, buffer, [this](const auto& ec, auto bytes_transferred)
then after hooking it should look like this:
asio::async_write(socket, buffer, this->make_handler_error_size([this](const auto& ec, auto bytes_transferred) {this->send_handler(ec, bytes_transferred);}));
If the ASCS_ENHANCED_STABILITY macro is defined, all function objects used for async calls in ascs will be hooked automatically.
If the ASCS_DELAY_CLOSE macro is defined with a value of 0, all function objects used for async calls in ascs will be hooked automatically.
......@@ -32,11 +32,8 @@ for ssl use.
Clear all buffers.
public:
void id(uint_fast64_t id);
uint_fast64_t id() const;
Set and get the id. Note that users must not set the id; only the socket's creator (object_pool or its subclasses) may set it,
unless this socket is not managed by any object pool.
Get the id.
Socket& next_layer();
const Socket& next_layer() const;
......@@ -69,7 +66,6 @@ the socket has an internal message send buffer; even when the connection is not yet established, users can still
Pause/resume message sending. This pauses/resumes the actual sending, so even while sending is paused, send_msg and safe_send_msg
can still be called successfully as long as the send buffer has room.
Note that while sending is paused, safe_send_msg fails immediately when the send buffer overflows, instead of waiting until the buffer becomes available.
post_msg is not affected by this property, which is why, to stress it once more, post_msg must only be called inside on_msg and on_msg_handle.
void suspend_dispatch_msg(bool suspend);
bool suspend_dispatch_msg() const;
......@@ -97,26 +93,18 @@ post_msg is not affected by this property, so post_msg must only be called inside on_msg and on_m
Send the message directly (put it into the send buffer) without calling i_packer::pack_msg. Internally the socket calls this very function when
sending messages too; it merely calls i_packer::pack_msg first.
bool direct_post_msg(const InMsgType& msg, bool can_overflow = false);
bool direct_post_msg(InMsgType&& msg, bool can_overflow = false);
Same as above; the difference between them is the same as the difference between send_msg and post_msg.
size_t get_pending_post_msg_num();
size_t get_pending_send_msg_num();
size_t get_pending_recv_msg_num();
Get the number of messages in the buffers; messages in the post and send buffers are packed, while messages in the recv buffer are unpacked. Same below.
void peek_first_pending_post_msg(InMsgType& msg);
void peek_first_pending_send_msg(InMsgType& msg);
void peek_first_pending_recv_msg(OutMsgType& msg);
Peek at the first message in the buffer; if you get an empty message (msg.empty() is true), the buffer holds no messages.
void pop_first_pending_post_msg(InMsgType& msg);
void pop_first_pending_send_msg(InMsgType& msg);
void pop_first_pending_recv_msg(OutMsgType& msg);
Pop the first message from the buffer; if you get an empty message (msg.empty() is true), the buffer holds no messages.
void pop_all_pending_post_msg(in_container_type& msg_list);
void pop_all_pending_send_msg(in_container_type& msg_list);
void pop_all_pending_recv_msg(out_container_type& msg_list);
Pop all messages from the buffer, which effectively clears it; see the sketch below.
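An illustrative snippet tying these calls together (socket_type, sock and handle_msg are placeholders for your concrete types and code):

    typename socket_type::out_msg_type msg;
    sock->pop_first_pending_recv_msg(msg);
    if (!msg.empty()) //an empty message means the recv buffer held nothing
        handle_msg(msg);

    typename socket_type::out_container_type left_over;
    sock->pop_all_pending_recv_msg(left_over); //drains (clears) the recv buffer in one go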
......@@ -163,10 +151,10 @@ the socket's own judgement, which finally confirms whether data can be sent. See tcp::socket,
#endif
void close();
Start the close procedure; called by subclasses. st_socket periodically checks whether it can be safely reused or freed (i.e. all async calls have finished, normally or abnormally);
if so, it calls on_close() above, after which st_object_pool takes full control of this st_socket so as to reuse or free it at the proper time.
If the ST_ASIO_ENHANCED_STABILITY macro is defined, st_socket guarantees the behavior described above; if it is not, st_socket simply calls on_close() after ST_ASIO_DELAY_CLOSE seconds,
and likewise st_object_pool then takes full control of this st_socket so as to reuse or free it at the proper time.
Start the close procedure; called by subclasses. ascs::socket periodically checks whether it can be safely reused or freed (i.e. all async calls have finished, normally or abnormally);
if so, it calls on_close() above, after which object_pool takes full control of this socket so as to reuse or free it at the proper time.
If the ASCS_DELAY_CLOSE macro is defined with a value of 0, the socket guarantees the behavior described above; otherwise it simply calls on_close() after ASCS_DELAY_CLOSE seconds,
and likewise object_pool then takes full control of this socket so as to reuse or free it at the proper time.
void dispatch_msg();
Dispatch messages: either call on_msg directly, or put the messages into the receive buffer and finally call do_dispatch_msg; once the messages are fully handled (via on_msg)
......@@ -175,12 +163,12 @@ the socket's own judgement, which finally confirms whether data can be sent. See tcp::socket,
void do_dispatch_msg(bool need_lock);
Issue an async call via io_service::post; when it is scheduled, msg_handler is called back.
bool do_direct_send_msg(InMsgType&& msg);
bool do_direct_post_msg(InMsgType&& msg);
Put the message directly into the send buffer; called from direct_send_msg and direct_post_msg. A tiny sketch follows.
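A tiny sketch combining dummy_packer with direct_send_msg (the message type std::string and the pointer name sock are assumptions for illustration):

    //the payload is assumed to be packed already, e.g. relayed from another socket
    std::string msg("already-packed bytes");
    sock->direct_send_msg(std::move(msg)); //bypasses i_packer::pack_msg entirely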
private:
bool timer_handler(unsigned char id);
template<typename Object> friend class object_pool;
void id(uint_fast64_t id);
Set the id. Note that users must not set the id; only the socket's creator (object_pool or its subclasses) may set it, unless this socket is not managed by any object pool.
bool timer_handler(timer::tid id);
Handles all timers.
void msg_handler();
......@@ -197,37 +185,20 @@ protected:
std::shared_ptr<i_packer<MsgDataType>> packer_;
The packer.
in_container_type post_msg_buffer, send_msg_buffer;
out_container_type recv_msg_buffer, temp_msg_buffer;
std::shared_mutex post_msg_buffer_mutex, send_msg_buffer_mutex;
std::shared_mutex recv_msg_buffer_mutex;
The buffers and the mutexes needed when operating on them. Accessing temp_msg_buffer needs no mutex as it is only accessed internally; its purpose is that when a received message cannot be stored
into the receive buffer (message dispatching is paused, or a message is being posted, i.e. post_msg_buffer is not empty), the message is kept in temp_msg_buffer and no more messages are received until temp_msg_buffer
in_container_type send_msg_buffer;
out_container_type recv_msg_buffer;
list<out_msg> temp_msg_buffer;
The send and receive buffers. Accessing temp_msg_buffer needs no mutex as it is only accessed internally; its purpose is that when a received message cannot be stored
into the receive buffer (message dispatching is paused, or congestion control is in effect), the message is kept in temp_msg_buffer and no more messages are received until temp_msg_buffer
has been fully drained (its messages handled or moved into recv_msg_buffer); the socket periodically retries the above.
Both post_msg_buffer and send_msg_buffer hold messages waiting to be sent. Messages sent via send_msg only ever enter send_msg_buffer; messages sent via post_msg
enter send_msg_buffer if the send buffer has room, and post_msg_buffer otherwise. While post_msg_buffer is not empty,
tcp::socket or udp::socket pauses message receiving until every message in post_msg_buffer has been moved into send_msg_buffer. What is this design good for?
Consider the following situation: you need to send a message, but you neither want the send buffer to overflow nor want to wait until it becomes available.
What would you do? You would have to stash the message somewhere, return from on_msg or on_msg_handle immediately, and start a timer that periodically retries
sending the stashed message. Since you returned from on_msg or on_msg_handle before the result of the first message reached the send buffer, the second message
gets dispatched next, and its result must be stashed as well, so don't you need a list to hold all these messages that have not yet reached the send buffer?
That list is post_msg_buffer. The socket does this work for you via post_msg_buffer and post_msg; you only need to call post_msg instead of send_msg. This call
never fails, never blocks the thread, and still keeps the send buffer under control (see the sketch below).
post_msg_buffer has no size limit, so why doesn't the send buffer get out of control? As stressed in the tutorial, post_msg may only be called inside on_msg or
on_msg_handle; once a message enters post_msg_buffer, message dispatching is paused (which eventually fills the receive buffer and in turn pauses message receiving),
so on_msg and on_msg_handle are no longer called, and post_msg_buffer is eventually drained (moved into send_msg_buffer); the socket retries this periodically.
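A sketch of the intended pattern (the on_msg_handle signature and helper names here are illustrative, not the exact ascs prototypes):

    //inside on_msg or on_msg_handle, the only places post_msg may be called:
    virtual bool on_msg_handle(out_msg_type& msg)
    {
        auto reply = build_reply(msg);        //build_reply is hypothetical
        post_msg(reply.data(), reply.size()); //never fails, never blocks; if the send buffer
                                              //is full, the reply goes to post_msg_buffer and
                                              //dispatching pauses until it drains
        return true;
    }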
bool posting;
bool sending, suspend_send_msg_;
bool dispatching, suspend_dispatch_msg_;
bool sending, paused_sending;
std::shared_mutex send_mutex;
bool dispatching, paused_dispatching, congestion_controlling;
std::shared_mutex dispatch_mutex;
Some internally used states; their names should make their meaning obvious.
#ifndef ST_ASIO_ENHANCED_STABILITY
bool closing;
#endif
Currently closing, i.e. periodically checking whether this st_socket can be safely reused or freed. If the ST_ASIO_ENHANCED_STABILITY macro is not defined, no checking is possible;
the socket simply waits ST_ASIO_DELAY_CLOSE seconds (asynchronously).
bool started_;
bool started_; //has started or not
std::shared_mutex start_mutex;
Whether it has started; what 'started' means is up to subclasses, the socket merely records whether the start function has been called.
......

namespace ascs { namespace tcp {
A tcp client supporting multiple connections.
......

namespace ascs { namespace tcp {
tcp::socket_base with connecting capability; this counts as a real client.
......@@ -69,7 +69,7 @@ virtual int prepare_reconnect(const asio::error_code& ec);
If allowed (io_service is still running and prepare_reconnect returned a value greater than or equal to 0), start a timer to reconnect to the server after a short delay.
private:
bool async_shutdown_handler(unsigned char id, ssize_t loop_num);
bool async_shutdown_handler(timer::tid id, ssize_t loop_num);
Timeout timer for the async graceful shutdown.
void connect_handler(const error_code& ec);
......

namespace ascs { namespace tcp {
The server-side server class; its main job is listening for and accepting connections, with connection management inherited from Pool.
......

namespace ascs { namespace tcp {
The server-side socket class.
......

namespace ascs { namespace tcp {
The tcp socket class, implementing sending and receiving of tcp data.
......@@ -118,9 +118,9 @@ protected:
protected:
typename super::in_container_type last_send_msg;
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
int shutdown_state; //2-the first step of graceful close, 1-force close, 0-normal state
boost::shared_mutex shutdown_mutex;
shutdown_states shutdown_state;
std::atomic_size_t shutdown_atomic;
Makes the shutdown function thread safe.
};
......
......@@ -5,7 +5,7 @@ namespace ascs
The timer class.
class timer
{
protected:
public:
typedef std::chrono::milliseconds milliseconds;
#ifdef ASCS_USE_STEADY_TIMER
typedef asio::steady_timer timer_type;
......@@ -13,61 +13,57 @@ protected:
typedef asio::system_timer timer_type;
#endif
protected:
typedef unsigned char tid;
static const tid TIMER_END = 0;
A subclass's timer ids must start from its parent class's TIMER_END, and it should define a TIMER_END of its own as well if the class itself may be inherited from.
struct timer_info
{
enum timer_status {TIMER_OK, TIMER_CANCELED};
enum timer_status {TIMER_FAKE, TIMER_OK, TIMER_CANCELED};
unsigned char id;
tid id;
timer_status status;
size_t milliseconds;
std::function<bool (unsigned char)> call_back;
std::function<bool (tid)> call_back;
When the timer fires, call_back is invoked, and its return value decides whether the timer continues (true means continue); for the same timer, call_back invocations are sequential.
std::shared_ptr<timer_type> timer;
bool operator <(const timer_info& other) const {return id < other.id;}
timer_info() : id(0), status(TIMER_FAKE), milliseconds(0) {}
};
The timer data structure.
static const unsigned char TIMER_END = 0;
A subclass's timer ids must start from its parent class's TIMER_END, and it should define a TIMER_END of its own as well if the class itself may be inherited from.
timer(io_service& _io_service_);
public:
typedef const timer_info timer_cinfo;
typedef std::set<timer_info> container_type;
typedef std::vector<timer_info> container_type;
void update_timer_info(unsigned char id, size_t milliseconds, std::function<bool(unsigned char)>&& call_back, bool start = false);
void update_timer_info(unsigned char id, size_t milliseconds, const std::function<bool(unsigned char)>& call_back, bool start = false)
void update_timer_info(tid id, size_t milliseconds, std::function<bool(tid)>&& call_back, bool start = false);
void update_timer_info(tid id, size_t milliseconds, const std::function<bool(tid)>& call_back, bool start = false)
Update a timer; if start is true, the timer is started or restarted immediately after the update.
Note that operating on the same timer id within the same timer object is not thread safe.
void set_timer(unsigned char id, size_t milliseconds, std::function<bool(unsigned char)>&& call_back);
void set_timer(unsigned char id, size_t milliseconds, const std::function<bool(unsigned char)>& call_back);
void set_timer(tid id, size_t milliseconds, std::function<bool(tid)>&& call_back);
void set_timer(tid id, size_t milliseconds, const std::function<bool(tid)>& call_back);
Start a timer; timers are partitioned by id, and if the timer already exists, its timing restarts. This function is simply update_timer_info called with start set to true; see the sketch below.
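A minimal sketch, assuming a class derived from ascs::timer (tid comes from the base class; MY_TIMER and do_periodic_work are illustrative):

    static const tid MY_TIMER = TIMER_BEGIN; //ids must start from the parent's TIMER_END
    set_timer(MY_TIMER, 1000, [this](tid id)->bool {
        this->do_periodic_work();
        return true; //true restarts the timer, false lets it stop
    });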
timer_info find_timer(unsigned char id);
timer_info find_timer(tid id);
Find a timer.
bool start_timer(unsigned char id);
bool start_timer(tid id);
Start a timer that already exists. Note that after stop_timer the timer still exists; it is just no longer running.
void stop_timer(unsigned char id);
void stop_timer(tid id);
Stop a timer. If the timer has already expired and been posted to io_service's queue, on_timer will still be called back at some future time; this is by asio's design.
asio::io_service& get_io_service();
const asio::io_service& get_io_service() const;
void stop_all_timer();
Stop all timers.
template<typename _Predicate> void do_something_to_all(const _Predicate& __pred);
Perform an operation on all timers; the operation is defined by __pred, the ascs library merely invokes __pred().
template<typename _Predicate> void do_something_to_one(const _Predicate& __pred);
Similar to do_something_to_all, except that once __pred() returns true the loop stops processing the remaining timers (if you always return false it is
equivalent to do_something_to_all); it resembles a find operation.
void stop_all_timer();
Stop all timers.
Similar to do_something_to_all, except that once __pred() returns true the loop stops processing the remaining timers (if you always return false it is equivalent to do_something_to_all); it resembles a find operation, as sketched below.
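For example, to stop at the first running timer (a sketch, assuming it runs inside a class derived from ascs::timer):

    do_something_to_one([](const auto& item) {return timer_info::TIMER_OK == item.status;});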
protected:
void start_timer(timer_cinfo& ti);
......@@ -77,9 +73,11 @@ protected:
Internally used helper function that really stops the timer (by calling timer_type::cancel).
protected:
asio::io_service& io_service_;
container_type timer_can;
shared_mutex timer_can_mutex;
private:
using object::io_service_;
Hides io_service_.
};
} //namespace

namespace ascs { namespace udp {
The udp socket class, implementing sending and receiving of udp data.
......

namespace ascs { namespace udp {
A udp service supporting multiple sockets.
......
......@@ -218,6 +218,138 @@ public:
///////////////////////////////////////////////////
};
void send_msg_one_by_one(echo_client& client, size_t msg_num, size_t msg_len, char msg_fill)
{
check_msg = true;
uint64_t total_msg_bytes = msg_num * msg_len * client.size();
cpu_timer begin_time;
client.begin(msg_num, msg_len, msg_fill);
unsigned percent = 0;
do
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto new_percent = (unsigned) (100 * client.get_recv_bytes() / total_msg_bytes);
if (percent != new_percent)
{
percent = new_percent;
printf("\r%u%%", percent);
fflush(stdout);
}
} while (100 != percent);
begin_time.stop();
printf("\r100%%\ntime spent statistics: %f seconds.\n", begin_time.elapsed());
printf("speed: %.0f(*2)kB/s.\n", total_msg_bytes / begin_time.elapsed() / 1024);
}
void send_msg_randomly(echo_client& client, size_t msg_num, size_t msg_len, char msg_fill)
{
check_msg = false;
uint64_t send_bytes = 0;
uint64_t total_msg_bytes = msg_num * msg_len;
auto buff = new char[msg_len];
memset(buff, msg_fill, msg_len);
cpu_timer begin_time;
unsigned percent = 0;
for (size_t i = 0; i < msg_num; ++i)
{
memcpy(buff, &i, sizeof(size_t)); //seq
//congestion control, method #1, the peer needs its own congestion control too.
client.safe_random_send_msg(buff, msg_len); //can_overflow is false, it's important
send_bytes += msg_len;
auto new_percent = (unsigned) (100 * send_bytes / total_msg_bytes);
if (percent != new_percent)
{
percent = new_percent;
printf("\r%u%%", percent);
fflush(stdout);
}
}
while(client.get_recv_bytes() != total_msg_bytes)
std::this_thread::sleep_for(std::chrono::milliseconds(50));
begin_time.stop();
delete[] buff;
printf("\r100%%\ntime spent statistics: %f seconds.\n", begin_time.elapsed());
printf("speed: %.0f(*2)kB/s.\n", total_msg_bytes / begin_time.elapsed() / 1024);
}
//use up to 16 (hard code) worker threads to send messages concurrently
void send_msg_concurrently(echo_client& client, size_t msg_num, size_t msg_len, char msg_fill)
{
check_msg = true;
auto link_num = client.size();
auto group_num = std::min((size_t) 16, link_num);
auto group_link_num = link_num / group_num;
auto left_link_num = link_num - group_num * group_link_num;
uint64_t total_msg_bytes = msg_num * msg_len * link_num;
auto group_index = (size_t) -1;
size_t this_group_link_num = 0;
std::vector<std::list<echo_client::object_type>> link_groups(group_num);
client.do_something_to_all([&](auto& item) {
if (0 == this_group_link_num)
{
this_group_link_num = group_link_num;
if (left_link_num > 0)
{
++this_group_link_num;
--left_link_num;
}
++group_index;
}
--this_group_link_num;
link_groups[group_index].push_back(item);
});
cpu_timer begin_time;
std::list<std::thread> threads;
do_something_to_all(link_groups, [&threads, msg_num, msg_len, msg_fill](const auto& item) {
threads.push_back(std::thread([&item, msg_num, msg_len, msg_fill]() {
auto buff = new char[msg_len];
memset(buff, msg_fill, msg_len);
for (size_t i = 0; i < msg_num; ++i)
{
memcpy(buff, &i, sizeof(size_t)); //seq
//congestion control, method #1, the peer needs its own congestion control too.
do_something_to_all(item, [buff, msg_len](const auto& item2) {item2->safe_send_msg(buff, msg_len);}); //can_overflow is false, it's important
}
delete[] buff;
}));
});
unsigned percent = 0;
do
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto new_percent = (unsigned) (100 * client.get_recv_bytes() / total_msg_bytes);
if (percent != new_percent)
{
percent = new_percent;
printf("\r%u%%", percent);
fflush(stdout);
}
} while (100 != percent);
begin_time.stop();
printf("\r100%%\ntime spent statistics: %f seconds.\n", begin_time.elapsed());
printf("speed: %.0f(*2)kB/s.\n", total_msg_bytes / begin_time.elapsed() / 1024);
do_something_to_all(threads, [](auto& t) {t.join();});
}
int main(int argc, const char* argv[])
{
printf("usage: %s [<service thread number=1> [<port=%d> [<ip=%s> [link num=16]]]]\n", argv[0], ASCS_SERVER_PORT, ASCS_SERVER_IP);
......@@ -335,7 +467,7 @@ int main(int argc, const char* argv[])
auto iter = std::begin(parameters);
if (iter != std::end(parameters)) msg_num = std::max((size_t) atoll(iter++->data()), (size_t) 1);
#if 1 == PACKER_UNPACKER_TYPE
#if 0 == PACKER_UNPACKER_TYPE || 1 == PACKER_UNPACKER_TYPE
if (iter != std::end(parameters)) msg_len = std::min(packer::get_max_msg_size(),
std::max((size_t) atoi(iter++->data()), sizeof(size_t))); //include seq
#elif 2 == PACKER_UNPACKER_TYPE
......@@ -343,99 +475,32 @@ int main(int argc, const char* argv[])
msg_len = 1024; //we hard code this because we fixedly initialized the length of fixed_length_unpacker to 1024
#elif 3 == PACKER_UNPACKER_TYPE
if (iter != std::end(parameters)) msg_len = std::min((size_t) ASCS_MSG_BUFFER_SIZE,
std::max((size_t) atoi(iter++->data()), sizeof(size_t)));
std::max((size_t) atoi(iter++->data()), sizeof(size_t))); //include seq
#endif
if (iter != std::end(parameters)) msg_fill = *iter++->data();
if (iter != std::end(parameters)) model = *iter++->data() - '0';
unsigned percent = 0;
uint64_t total_msg_bytes;
switch (model)
if (0 != model && 1 != model)
{
case 0:
check_msg = true;
total_msg_bytes = msg_num * link_num; break;
case 1:
check_msg = false;
srand(time(nullptr));
total_msg_bytes = msg_num; break;
default:
total_msg_bytes = 0; break;
puts("unrecognized model!");
continue;
}
if (total_msg_bytes > 0)
{
printf("test parameters after adjustment: " ASCS_SF " " ASCS_SF " %c %d\n", msg_num, msg_len, msg_fill, model);
puts("performance test begin, this application will have no response during the test!");
client.clear_status();
total_msg_bytes *= msg_len;
cpu_timer begin_time;
printf("test parameters after adjustment: " ASCS_SF " " ASCS_SF " %c %d\n", msg_num, msg_len, msg_fill, model);
puts("performance test begin, this application will have no response during the test!");
client.clear_status();
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
if (0 == model)
{
client.begin(msg_num, msg_len, msg_fill);
unsigned new_percent = 0;
do
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
new_percent = (unsigned) (100 * client.get_recv_bytes() / total_msg_bytes);
if (percent != new_percent)
{
percent = new_percent;
printf("\r%u%%", percent);
fflush(stdout);
}
} while (100 != new_percent);
printf("\r100%%\ntime spent statistics: %.1f seconds.\n", begin_time.elapsed());
printf("speed: %.0f(*2)kB/s.\n", total_msg_bytes / begin_time.elapsed() / 1024);
}
else
puts("if ASCS_WANT_MSG_SEND_NOTIFY defined, only support model 0!");
if (0 == model)
send_msg_one_by_one(client, msg_num, msg_len, msg_fill);
else
puts("if ASCS_WANT_MSG_SEND_NOTIFY defined, only support model 0!");
#else
auto buff = new char[msg_len];
memset(buff, msg_fill, msg_len);
uint64_t send_bytes = 0;
for (size_t i = 0; i < msg_num; ++i)
{
memcpy(buff, &i, sizeof(size_t)); //seq
//congestion control, method #1, the peer needs its own congestion control too.
switch (model)
{
case 0:
client.safe_broadcast_msg(buff, msg_len); //can_overflow is false, it's important
send_bytes += link_num * msg_len;
break;
case 1:
client.safe_random_send_msg(buff, msg_len); //can_overflow is false, it's important
send_bytes += msg_len;
break;
default:
break;
}
auto new_percent = (unsigned) (100 * send_bytes / total_msg_bytes);
if (percent != new_percent)
{
percent = new_percent;
printf("\r%u%%", percent);
fflush(stdout);
}
}
delete[] buff;
while(client.get_recv_bytes() != total_msg_bytes)
std::this_thread::sleep_for(std::chrono::milliseconds(50));
printf("\r100%%\ntime spent statistics: %.1f seconds.\n", begin_time.elapsed());
printf("speed: %.0f(*2)kB/s.\n", total_msg_bytes / begin_time.elapsed() / 1024);
if (0 == model)
send_msg_concurrently(client, msg_num, msg_len, msg_fill);
else
send_msg_randomly(client, msg_num, msg_len, msg_fill);
#endif
} // if (total_data_num > 0)
}
}
......
......@@ -163,9 +163,9 @@ private:
class file_client : public client_base<file_socket>
{
public:
static const unsigned char TIMER_BEGIN = client_base<file_socket>::TIMER_END;
static const unsigned char UPDATE_PROGRESS = TIMER_BEGIN;
static const unsigned char TIMER_END = TIMER_BEGIN + 10;
static const tid TIMER_BEGIN = client_base<file_socket>::TIMER_END;
static const tid UPDATE_PROGRESS = TIMER_BEGIN;
static const tid TIMER_END = TIMER_BEGIN + 10;
file_client(service_pump& service_pump_) : client_base<file_socket>(service_pump_) {}
......@@ -191,7 +191,7 @@ public:
}
private:
bool update_progress_handler(unsigned char id, unsigned last_percent)
bool update_progress_handler(tid id, unsigned last_percent)
{
assert(UPDATE_PROGRESS == id);
......
......@@ -19,6 +19,7 @@
#include <memory>
#include <string>
#include <thread>
#include <atomic>
#include <sstream>
#include <iomanip>
......@@ -30,6 +31,23 @@
namespace ascs
{
template<typename atomic_type = std::atomic_size_t>
class scope_atomic_lock : public asio::detail::noncopyable
{
public:
scope_atomic_lock(atomic_type& atomic_) : added(false), atomic(atomic_) {lock();} //atomic_ must have been initialized to zero
~scope_atomic_lock() {unlock();}
void lock() {if (!added) _locked = 1 == ++atomic; added = true;}
void unlock() {if (added) --atomic; _locked = false, added = false;}
bool locked() const {return _locked;}
private:
bool added;
bool _locked;
atomic_type& atomic;
};
class service_pump;
class timer;
class i_server
......
......@@ -11,6 +11,11 @@
*
* license: www.boost.org/LICENSE_1_0.txt
*
* Known issues:
* 1. concurrentqueue is not a FIFO (it is by design), navigate to the following links for more details:
* https://github.com/cameron314/concurrentqueue/issues/6
* https://github.com/cameron314/concurrentqueue/issues/52
*
* 2016.9.25 version 1.0.0
* Based on st_asio_wrapper 1.2.0.
* Directory structure refactoring.
......@@ -47,7 +52,7 @@
* Replaceable packer/unpacker now support replaceable_buffer (an alias of auto_buffer) and shared_buffer to be their message type.
* Move class statistic and obj_with_begin_time out of ascs::socket to reduce template tiers.
*
* 2016.1.1 version 1.1.2
* 2016.11.1 version 1.1.2
* Fix bug: ascs::list cannot be moved properly via moving constructor.
* Use ASCS_DELAY_CLOSE instead of ASCS_ENHANCED_STABILITY macro to control delay close duration,
* 0 is an equivalent of defining ASCS_ENHANCED_STABILITY, other values keep the same meanings as before.
......@@ -57,6 +62,11 @@
* Add move capture in lambda.
* Optimize lambda expressions.
*
* 2016.11.13 version 1.1.3
* Introduce a lock-free mechanism for some appropriate logic (many requesters, only one can succeed; the others fail rather than wait).
* Remove all mutexes (except those in object_pool, service_pump, lock_queue and udp::socket).
* Sharply simplify the timer class.
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -66,8 +76,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10102 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.1.2"
#define ASCS_VER 10103 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.1.3"
//asio and compiler check
#ifdef _MSC_VER
......
......@@ -116,7 +116,6 @@ public:
//lockable, dummy
void lock() const {}
void unlock() const {}
bool idle() const {return true;} //locked or not
};
class lockable
......@@ -127,7 +126,6 @@ public:
//lockable
void lock() {mutex.lock();}
void unlock() {mutex.unlock();}
bool idle() {std::unique_lock<std::shared_mutex> lock(mutex, std::try_to_lock); return lock.owns_lock();} //locked or not
private:
std::shared_mutex mutex;
......
......@@ -36,9 +36,10 @@ protected:
static const tid TIMER_DELAY_CLOSE = TIMER_BEGIN + 2;
static const tid TIMER_END = TIMER_BEGIN + 10;
socket(asio::io_service& io_service_) : timer(io_service_), _id(-1), next_layer_(io_service_), packer_(std::make_shared<Packer>()), started_(false) {reset_state();}
template<typename Arg>
socket(asio::io_service& io_service_, Arg& arg) : timer(io_service_), _id(-1), next_layer_(io_service_, arg), packer_(std::make_shared<Packer>()), started_(false) {reset_state();}
socket(asio::io_service& io_service_) : timer(io_service_), _id(-1), next_layer_(io_service_), packer_(std::make_shared<Packer>()),
send_atomic(0), dispatch_atomic(0), started_(false), start_atomic(0) {reset_state();}
template<typename Arg> socket(asio::io_service& io_service_, Arg& arg) : timer(io_service_), _id(-1), next_layer_(io_service_, arg), packer_(std::make_shared<Packer>()),
send_atomic(0), dispatch_atomic(0), started_(false), start_atomic(0) {reset_state();}
void reset()
{
......@@ -55,7 +56,6 @@ protected:
sending = paused_sending = false;
dispatching = paused_dispatching = congestion_controlling = false;
// started_ = false;
}
void clear_buffer()
......@@ -81,9 +81,12 @@ public:
bool started() const {return started_;}
void start()
{
std::unique_lock<std::shared_mutex> lock(start_mutex);
if (!started_)
started_ = do_start();
{
scope_atomic_lock<> lock(start_atomic);
if (!started_ && lock.locked())
started_ = do_start();
}
}
//return false if send buffer is empty or sending not allowed or io_service stopped
......@@ -91,8 +94,8 @@ public:
{
if (!sending)
{
std::unique_lock<std::shared_mutex> lock(send_mutex);
if (!sending)
scope_atomic_lock<> lock(send_atomic);
if (!sending && lock.locked())
{
sending = true;
lock.unlock();
......@@ -192,13 +195,18 @@ protected:
virtual void on_all_msg_send(InMsgType& msg) {}
#endif
//subclasses notify the socket of the shutdown event.
//subclasses notify the socket of the shutdown event, not thread safe
void close()
{
if (is_closable())
if (started_)
{
set_async_calling(true);
set_timer(TIMER_DELAY_CLOSE, ASCS_DELAY_CLOSE * 1000 + 50, [this](auto id)->bool {return this->timer_handler(id);});
started_ = false;
if (is_closable())
{
set_async_calling(true);
set_timer(TIMER_DELAY_CLOSE, ASCS_DELAY_CLOSE * 1000 + 50, [this](auto id)->bool {return this->timer_handler(id);});
}
}
}
......@@ -240,8 +248,8 @@ protected:
{
if (!dispatching)
{
std::unique_lock<std::shared_mutex> lock(dispatch_mutex);
if (!dispatching)
scope_atomic_lock<> lock(dispatch_atomic);
if (!dispatching && lock.locked())
{
dispatching = true;
lock.unlock();
......@@ -298,7 +306,7 @@ private:
//please do not change the id at runtime via the following function, except when this socket is not managed by object_pool;
//it should only be used by object_pool when reusing or creating a new socket.
template<typename Object> friend class object_pool;
void id(uint_fast64_t id) {assert(!started_); if (started_) unified_out::error_out("id is unchangeable!"); else _id = id;}
void id(uint_fast64_t id) {_id = id;}
bool timer_handler(tid id)
{
......@@ -373,12 +381,12 @@ protected:
//ascs::socket will delay 50 milliseconds (non-blocking) before invoking handle_msg() again; as you know, temp_msg_buffer is used to hold these msgs temporarily.
bool sending, paused_sending;
std::shared_mutex send_mutex;
std::atomic_size_t send_atomic;
bool dispatching, paused_dispatching, congestion_controlling;
std::shared_mutex dispatch_mutex;
std::atomic_size_t dispatch_atomic;
bool started_; //has started or not
std::shared_mutex start_mutex;
std::atomic_size_t start_atomic;
struct statistic stat;
typename statistic::stat_time recv_idle_begin_time;
......
......@@ -13,8 +13,6 @@
#ifndef _ASCS_TCP_SOCKET_H_
#define _ASCS_TCP_SOCKET_H_
#include <vector>
#include "../socket.h"
namespace ascs { namespace tcp {
......@@ -37,9 +35,10 @@ protected:
enum shutdown_states {NONE, FORCE, GRACEFUL};
socket_base(asio::io_service& io_service_) : super(io_service_), unpacker_(std::make_shared<Unpacker>()), shutdown_state(shutdown_states::NONE) {}
template<typename Arg>
socket_base(asio::io_service& io_service_, Arg& arg) : super(io_service_, arg), unpacker_(std::make_shared<Unpacker>()), shutdown_state(shutdown_states::NONE) {}
socket_base(asio::io_service& io_service_) : super(io_service_), unpacker_(std::make_shared<Unpacker>()),
shutdown_state(shutdown_states::NONE), shutdown_atomic(0) {}
template<typename Arg> socket_base(asio::io_service& io_service_, Arg& arg) : super(io_service_, arg), unpacker_(std::make_shared<Unpacker>()),
shutdown_state(shutdown_states::NONE), shutdown_atomic(0) {}
public:
virtual bool obsoleted() {return !is_shutting_down() && super::obsoleted();}
......@@ -172,12 +171,12 @@ protected:
void shutdown()
{
std::unique_lock<std::shared_mutex> lock(shutdown_mutex);
scope_atomic_lock<> lock(shutdown_atomic);
if (!lock.locked())
return;
shutdown_state = shutdown_states::FORCE;
this->stop_all_timer();
this->started_ = false;
// reset_state();
if (this->lowest_layer().is_open())
{
......@@ -252,9 +251,9 @@ private:
protected:
list<typename super::in_msg> last_send_msg;
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
shutdown_states shutdown_state;
std::shared_mutex shutdown_mutex;
shutdown_states shutdown_state;
std::atomic_size_t shutdown_atomic;
};
}} //namespace
......
......@@ -13,15 +13,15 @@
#ifndef _ASCS_TIMER_H_
#define _ASCS_TIMER_H_
#include <vector>
#include <chrono>
#ifdef ASCS_USE_STEADY_TIMER
#include <asio/steady_timer.hpp>
#else
#include <asio/system_timer.hpp>
#endif
#include <set>
#include <chrono>
#include "object.h"
//If you inherit a class from class X, your own timer ids must begin from X::TIMER_END
......@@ -29,13 +29,13 @@ namespace ascs
{
//timers are identified by id.
//for the same timer id in the same timer object, set_timer and stop_timer are not thread safe, please pay special attention.
//for the same timer id in the same timer object, any manipulations are not thread safe, please pay special attention.
//to resolve this defect, we would have to add a mutex member variable to timer_info, which is not worth it
//
//suppose you have more than one service thread(see service_pump for service thread number control), then:
//same timer, same timer, on_timer is called sequentially
//same timer, different timer, on_timer is called concurrently
//different timer, on_timer is always called concurrently
//for the same timer object: same timer id, on_timer is called sequentially
//for the same timer object: different timer ids, on_timer is called concurrently
//for different timer objects: on_timer is always called concurrently
class timer : public object
{
public:
......@@ -51,42 +51,34 @@ public:
struct timer_info
{
enum timer_status {TIMER_OK, TIMER_CANCELED};
enum timer_status {TIMER_FAKE, TIMER_OK, TIMER_CANCELED};
tid id;
mutable timer_status status;
mutable size_t milliseconds;
mutable std::function<bool(tid)> call_back;
mutable std::shared_ptr<timer_type> timer;
timer_status status;
size_t milliseconds;
std::function<bool(tid)> call_back;
std::shared_ptr<timer_type> timer;
bool operator <(const timer_info& other) const {return id < other.id;}
timer_info() : id(0), status(TIMER_FAKE), milliseconds(0) {}
};
typedef const timer_info timer_cinfo;
typedef std::set<timer_info> container_type;
typedef std::vector<timer_info> container_type;
timer(asio::io_service& _io_service_) : object(_io_service_) {}
timer(asio::io_service& _io_service_) : object(_io_service_), timer_can(256) {tid id = -1; do_something_to_all([&id](auto& item) {item.id = ++id;});}
void update_timer_info(tid id, size_t milliseconds, std::function<bool(tid)>&& call_back, bool start = false)
{
timer_info ti = {id};
std::unique_lock<std::shared_mutex> lock(timer_can_mutex);
auto iter = timer_can.find(ti);
if (iter == std::end(timer_can))
{
iter = timer_can.insert(ti).first;
iter->timer = std::make_shared<timer_type>(io_service_);
}
lock.unlock();
timer_info& ti = timer_can[id];
//items in timer_can are not locked
iter->status = timer_info::TIMER_OK;
iter->milliseconds = milliseconds;
iter->call_back.swap(call_back);
if (timer_info::TIMER_FAKE == ti.status)
ti.timer = std::make_shared<timer_type>(io_service_);
ti.status = timer_info::TIMER_OK;
ti.milliseconds = milliseconds;
ti.call_back.swap(call_back);
if (start)
start_timer(*iter);
start_timer(ti);
}
void update_timer_info(tid id, size_t milliseconds, const std::function<bool(tid)>& call_back, bool start = false)
{update_timer_info(id, milliseconds, std::function<bool(tid)>(call_back), start);}
......@@ -94,69 +86,50 @@ public:
void set_timer(tid id, size_t milliseconds, std::function<bool(tid)>&& call_back) {update_timer_info(id, milliseconds, std::move(call_back), true);}
void set_timer(tid id, size_t milliseconds, const std::function<bool(tid)>& call_back) {update_timer_info(id, milliseconds, call_back, true);}
timer_info find_timer(tid id)
{
timer_info ti = {id, timer_info::TIMER_CANCELED, 0};
std::shared_lock<std::shared_mutex> lock(timer_can_mutex);
auto iter = timer_can.find(ti);
if (iter == std::end(timer_can))
return ti;
else
return *iter;
}
timer_info find_timer(tid id) const {return timer_can[id];}
bool start_timer(tid id)
{
timer_info ti = {id};
timer_info& ti = timer_can[id];
std::shared_lock<std::shared_mutex> lock(timer_can_mutex);
auto iter = timer_can.find(ti);
if (iter == std::end(timer_can))
if (timer_info::TIMER_FAKE == ti.status)
return false;
lock.unlock();
start_timer(*iter);
return true;
}
void stop_timer(tid id)
{
timer_info ti = {id};
ti.status = timer_info::TIMER_OK;
start_timer(ti); //if timer already started, this will cancel it first
std::shared_lock<std::shared_mutex> lock(timer_can_mutex);
auto iter = timer_can.find(ti);
if (iter != std::end(timer_can))
{
lock.unlock();
stop_timer(*iter);
}
return true;
}
DO_SOMETHING_TO_ALL_MUTEX(timer_can, timer_can_mutex)
DO_SOMETHING_TO_ONE_MUTEX(timer_can, timer_can_mutex)
void stop_timer(tid id) {stop_timer(timer_can[id]);}
void stop_all_timer() {do_something_to_all([this](auto& item) {this->stop_timer(item);});}
void stop_all_timer() {do_something_to_all([this](const auto& item) {this->stop_timer(item);});}
DO_SOMETHING_TO_ALL(timer_can)
DO_SOMETHING_TO_ONE(timer_can)
protected:
void reset() {object::reset();}
void start_timer(timer_cinfo& ti)
{
assert(timer_info::TIMER_OK == ti.status);
ti.timer->expires_from_now(milliseconds(ti.milliseconds));
//return true from call_back to continue the timer, or the timer will stop
ti.timer->async_wait(make_handler_error([this, &ti](const auto& ec) {if (!ec && ti.call_back(ti.id) && timer_info::TIMER_OK == ti.status) this->start_timer(ti);}));
}
void stop_timer(timer_cinfo& ti)
void stop_timer(timer_info& ti)
{
asio::error_code ec;
ti.timer->cancel(ec);
ti.status = timer_info::TIMER_CANCELED;
if (timer_info::TIMER_OK == ti.status) //allows stopping a timer that has already been stopped
{
asio::error_code ec;
ti.timer->cancel(ec);
ti.status = timer_info::TIMER_CANCELED;
}
}
container_type timer_can;
std::shared_mutex timer_can_mutex;
private:
using object::io_service_;
......
......@@ -166,8 +166,6 @@ protected:
std::unique_lock<std::shared_mutex> lock(shutdown_mutex);
this->stop_all_timer();
this->started_ = false;
// reset_state();
if (this->lowest_layer().is_open())
{
......