提交 69faa009 编写于 作者: Y YangLi

1.3.1 release.

Support Cygwin and Mingw.
Dynamically allocate timers when needed (multithreading releated behaviors kept as before).
Sharply reduced the memory occupation of timers.
Expand the range of timer ID from [0, 256) to [0, 65536).
Add new macro ASCS_ALIGNED_TIMER to align timers.
Realign member variables to save a few memory.
Amend doc.
Supplement English docs.
Avoid chinese file names.
Simplified config.mk
Make demos easier to use.
Optimize file_server and file_client.
Supplement _LARGEFILE_SOURCE definition for file_server and file_client.
上级 7d079d4d
doc/ClassDiagram.jpg

132.1 KB | W: | H:

doc/ClassDiagram.jpg

131.9 KB | W: | H:

doc/ClassDiagram.jpg
doc/ClassDiagram.jpg
doc/ClassDiagram.jpg
doc/ClassDiagram.jpg
  • 2-up
  • Swipe
  • Onion skin
......@@ -40,9 +40,6 @@ strncmp(msg->msg.cc, "abc", sizeof(msg->msg.cc));
...; //初始其它的成员
client.direct_send_msg(msg); //不能再调用以send_开头的消息了,因为它们需要调用打包器来打包,我们在这里用的是dumm_packer
最后,当你收到消息的时候,在on_msg_handle里面得到的消息类型的ascs::auto_buffer<message>,你调用msg.raw_buffer()
即得到了一个message对象指针。
5. 解包器是必须要写的
因为解包器还负责提供接收数据时需要的缓存。好在提供它非常简单,如下:
class my_unpacker : public ascs::tcp::i_unpacker<auto_buffer<message>>
......@@ -52,10 +49,10 @@ public:
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
if (bytes_transferred != raw_buff.size())
{
raw_buff.clear();
return false;
}
msg_can.emplace_back(std::move(raw_buff));
return true;
......@@ -77,6 +74,9 @@ private:
auto_buffer<message> raw_buff;
};
最后,当你收到消息的时候,在on_msg_handle里面得到的消息类型是ascs::auto_buffer<message>&,你调用msg.raw_buffer()
即得到了一个message对象指针。
6. 最后的代码如下:
class message;
#define ASCS_DEFAULT_PACKER ascs::dummy_packer<ascs::auto_buffer<message>>
......@@ -104,7 +104,3 @@ int main(int argc, const char* argv[])
}
}
除了main函数,有效代码不超过15行!但注意,大小端未考虑,一般也不需要考虑。
Suppose you want to introduce ascs to an old project.
Before, you always sent structures directly, what should you do now after introduced ascs?
Base on <<ascs development documentation (en).docx>>, we should:
1. You already have a structure:
struct native_msg
{
char c;
short s;
int i;
long long ll;
char cc[100];
short ss[100];
};
2. Implement a class named message (can be any other names):
class message
{
public:
char* data() { return (char*) &msg; } //unpacker will use this to directly write msg, and avoid replication for msg
const char* data() const { return (const char*) &msg; }
size_t size() const { return sizeof(native_msg); }
bool empty() const { return false; }
public:
native_msg msg;
};
3. You must have realized that class message doesn't support swap operation, this will introduce memory replication (so we will fail to
compile ascs, it's by design), we can use ascs::auto_buffer to wrap message, then all problems gone:
ascs::auto_buffer<messge> really_used_msg;
4. Should we implement a packer?
No, because ascs can only see i_packer interface, you cannot make a native_msg object from any function in i_packer, but ascs always need a packer,
so we can use dummy_packer:
#define ASCS_DEFAULT_PACKER ascs::dummy_packer<ascs::auto_buffer<message>>
Then, when we need to send messages:
ascs::auto_buffer<message> msg(new message);
strncmp(msg->msg.cc, "abc", sizeof(msg->msg.cc));
...; //other initializations
client.direct_send_msg(msg); //cannot call functions whoes name starts with 'send_', because they need packers, here we used dumm_packer
5. unpacker is necessary
Because unpacker also provide buffers during network data reading:
class my_unpacker : public ascs::tcp::i_unpacker<auto_buffer<message>>
{
public:
virtual void reset() { raw_buff.clear(); }
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
if (bytes_transferred != raw_buff.size())
{
raw_buff.clear();
return false;
}
msg_can.emplace_back(std::move(raw_buff));
return true;
}
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred)
{
return ec || bytes_transferred == raw_buff.size() ? 0 : asio::detail::default_max_transfer_size;
}
virtual buffer_type prepare_next_recv()
{
if (raw_buff.empty())
raw_buff.raw_buffer(new message);
return asio::buffer(raw_buff.raw_buffer()->data(), raw_buff.size()); //write native_msg object directly
}
private:
auto_buffer<message> raw_buff;
};
Finally, when a whole message been received, in on_msg_handle, the type of msg is ascs::auto_buffer<message>&,
you call msg.raw_buffer() and get a pointer of message object.
6. The main function should be:
class message;
#define ASCS_DEFAULT_PACKER ascs::dummy_packer<ascs::auto_buffer<message>>
class my_unpacker;
#define ASCS_DEFAULT_UNPACKER my_unpacker
int main(int argc, const char* argv[])
{
service_pump sp;
single_client client(sp);
sp.start_service();
while(sp.is_running())
{
std::string str;
std::cin >> str;
if (!str.empty())
{
ascs::auto_buffer<message> msg(new message);
strncmp(msg->msg.cc, str.data(), sizeof(msg->msg.cc));
...; //other initializations
client.direct_send_msg(msg);
}
}
}
PS. no Big-endian or little-endian been considered.
......@@ -7,6 +7,7 @@
//so, define this to avoid hooks for async call (and slightly improve efficiency),
//any value which is bigger than zero is okay.
#define ASCS_DISPATCH_BATCH_MSG
#define ASCS_ALIGNED_TIMER
#define ASCS_CUSTOM_LOG
#define ASCS_DEFAULT_UNPACKER non_copy_unpacker
//#define ASCS_DEFAULT_UNPACKER stream_unpacker
......@@ -49,7 +50,7 @@ using namespace ascs::ext::tcp;
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define RECONNECT_COMMAND "reconnect"
#define RECONNECT "reconnect"
int main(int argc, const char* argv[])
{
......@@ -83,7 +84,7 @@ int main(int argc, const char* argv[])
sp.stop_service();
sp.start_service();
}
else if (RECONNECT_COMMAND == str)
else if (RECONNECT == str)
client.graceful_shutdown(true);
else
client.safe_send_msg(str, false);
......
......@@ -19,11 +19,11 @@ using namespace ascs::ext;
using namespace ascs::ext::tcp;
#define QUIT_COMMAND "quit"
#define LIST_ALL_CLIENT "list_all_client"
#define STATISTIC "statistic"
#define STATUS "status"
#define INCREASE_THREAD "increase_thread"
#define DECREASE_THREAD "decrease_thread"
#define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client"
#define INCREASE_THREAD "increase thread"
#define DECREASE_THREAD "decrease thread"
class echo_socket : public client_socket
{
......@@ -51,10 +51,10 @@ protected:
}
//msg handling
virtual bool on_msg_handle(out_msg_type& msg) {handle_msg(std::move(msg)); return true;}
virtual bool on_msg_handle(out_msg_type& msg) {handle_msg(msg); return true;}
private:
void handle_msg(out_msg_type&& msg)
void handle_msg(out_msg_type& msg)
{
last_send_time.restart();
direct_send_msg(std::move(msg), true);
......
......@@ -18,11 +18,11 @@ using namespace ascs::tcp;
using namespace ascs::ext::tcp;
#define QUIT_COMMAND "quit"
#define LIST_ALL_CLIENT "list_all_client"
#define STATISTIC "statistic"
#define STATUS "status"
#define INCREASE_THREAD "increase_thread"
#define DECREASE_THREAD "decrease_thread"
#define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client"
#define INCREASE_THREAD "increase thread"
#define DECREASE_THREAD "decrease thread"
class echo_socket : public server_socket
{
......@@ -68,7 +68,7 @@ int main(int argc, const char* argv[])
while(sp.is_running())
{
std::string str;
std::cin >> str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
sp.stop_service();
else if (STATISTIC == str)
......
......@@ -22,13 +22,22 @@ cflag += -DASIO_STANDALONE -DASIO_NO_DEPRECATED
kernel = ${shell uname -s}
ifeq (${kernel}, SunOS)
cflag += -pthreads ${ext_cflag} ${ext_location} -I../../include/
lflag += -pthreads -lsocket -lnsl ${ext_libs}
cflag += -pthreads
lflag += -pthreads -lsocket -lnsl
else
cflag += -pthread ${ext_cflag} ${ext_location} -I../../include/
lflag += -pthread ${ext_libs}
cflag += -pthread
lflag += -pthread
cygwin = ${findstring CYGWIN, ${kernel}}
ifeq (${cygwin}, CYGWIN)
cflag += -D__USE_W32_SOCKETS -D_WIN32_WINNT=0x0501
lflag += -lws2_32 -lwsock32
endif
endif
cflag += ${ext_cflag} ${ext_location} -I../../include/
lflag += ${ext_libs}
target = ${dir}/${module}
sources = ${shell ls *.cpp}
objects = ${patsubst %.cpp,${dir}/%.o,${sources}}
......
......@@ -60,11 +60,11 @@ using namespace ascs::ext::tcp;
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define LIST_ALL_CLIENT "list_all_client"
#define STATISTIC "statistic"
#define STATUS "status"
#define INCREASE_THREAD "increase_thread"
#define DECREASE_THREAD "decrease_thread"
#define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client"
#define INCREASE_THREAD "increase thread"
#define DECREASE_THREAD "decrease thread"
static bool check_msg;
......
......@@ -9,6 +9,7 @@
#define ASCS_ENHANCED_STABILITY
//#define ASCS_FULL_STATISTIC //full statistic will slightly impact efficiency
//#define ASCS_USE_STEADY_TIMER
#define ASCS_ALIGNED_TIMER
#define ASCS_AVOID_AUTO_STOP_SERVICE
#define ASCS_DECREASE_THREAD_AT_RUNTIME
//#define ASCS_MAX_MSG_NUM 16
......@@ -51,11 +52,11 @@ using namespace ascs::ext::tcp;
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define LIST_ALL_CLIENT "list_all_client"
#define STATISTIC "statistic"
#define STATUS "status"
#define INCREASE_THREAD "increase_thread"
#define DECREASE_THREAD "decrease_thread"
#define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client"
#define INCREASE_THREAD "increase thread"
#define DECREASE_THREAD "decrease thread"
//demonstrate how to use custom packer
//under the default behavior, each tcp::socket has their own packer, and cause memory waste
......@@ -194,7 +195,7 @@ int main(int argc, const char* argv[])
while(sp.is_running())
{
std::string str;
std::cin >> str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
sp.stop_service();
else if (RESTART_COMMAND == str)
......
......@@ -16,6 +16,9 @@
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define STATUS "status"
#define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client"
#define REQUEST_FILE "get"
int link_num = 1;
......@@ -61,18 +64,20 @@ int main(int argc, const char* argv[])
sp.stop_service();
sp.start_service();
}
else if (STATISTIC == str)
{
printf("link #: " ASCS_SF ", valid links: " ASCS_SF ", invalid links: " ASCS_SF "\n", client.size(), client.valid_size(), client.invalid_object_size());
puts("");
puts(client.get_statistic().to_string().data());
}
else if (STATUS == str)
client.list_all_status();
else if (LIST_ALL_CLIENT == str)
client.list_all_object();
else if (str.size() > sizeof(REQUEST_FILE) && !strncmp(REQUEST_FILE, str.data(), sizeof(REQUEST_FILE) - 1) && isspace(str[sizeof(REQUEST_FILE) - 1]))
{
str.erase(0, sizeof(REQUEST_FILE));
auto files = split_string(str);
do_something_to_all(files, [&](const std::string& item) {
file_size = -1;
received_size = 0;
if (client.get_file(item))
while (client.is_transferring())
std::this_thread::sleep_for(std::chrono::milliseconds(50));
});
client.get_file(split_string(str));
}
else
client.at(0)->talk(str);
......
......@@ -171,17 +171,37 @@ class file_client : public multi_client_base<file_socket>
public:
static const tid TIMER_BEGIN = multi_client_base<file_socket>::TIMER_END;
static const tid UPDATE_PROGRESS = TIMER_BEGIN;
static const tid TIMER_END = TIMER_BEGIN + 10;
static const tid TIMER_END = TIMER_BEGIN + 5;
file_client(service_pump& service_pump_) : multi_client_base<file_socket>(service_pump_) {}
bool is_transferring() const {return is_timer(UPDATE_PROGRESS);}
bool get_file(const std::string& file_name)
void get_file(std::list<std::string>&& files)
{
if (is_transferring())
printf("file transfer is ongoing for file %s", file_name.data());
else
std::unique_lock<std::mutex> lock(file_list_mutex);
file_list.splice(std::end(file_list), files);
lock.unlock();
get_file();
}
void get_file(const std::list<std::string>& files) {get_file(std::list<std::string>(files));}
private:
void get_file()
{
std::unique_lock<std::mutex> lock(file_list_mutex);
if (is_timer(UPDATE_PROGRESS))
return;
while (!file_list.empty())
{
std::string file_name(std::move(file_list.front()));
file_list.pop_front();
file_size = -1;
received_size = 0;
printf("transfer %s begin.\n", file_name.data());
if (find(0)->get_file(file_name))
{
......@@ -191,16 +211,13 @@ public:
begin_time.restart();
set_timer(UPDATE_PROGRESS, 50, [this](tid id)->bool {return this->update_progress_handler(id, -1);});
return true;
break;
}
else
printf("transfer %s failed!\n", file_name.data());
}
return false;
}
private:
bool update_progress_handler(tid id, unsigned last_percent)
{
assert(UPDATE_PROGRESS == id);
......@@ -215,7 +232,7 @@ private:
printf("\r%u%%", new_percent);
fflush(stdout);
update_timer_info(id, 50, [new_percent, this](tid id)->bool {return this->update_progress_handler(id, new_percent);});
change_timer_call_back(id, [new_percent, this](tid id)->bool {return this->update_progress_handler(id, new_percent);});
}
}
......@@ -223,11 +240,17 @@ private:
return true;
printf("\r100%%\nend, speed: %f MBps.\n", file_size / begin_time.elapsed() / 1024 / 1024);
change_timer_status(id, timer_info::TIMER_CANCELED);
get_file();
return false;
}
protected:
cpu_timer begin_time;
std::list<std::string> file_list;
std::mutex file_list_mutex;
};
#endif //#ifndef FILE_CLIENT_H_
module = file_client
ext_cflag = -D_FILE_OFFSET_BITS=64
ext_cflag = -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE
include ../config.mk
......@@ -36,7 +36,7 @@ public:
return false;
}
msg_can.emplace_back(); //need empty message to trigger the next message receiving
msg_can.emplace_back(); //need empty message to trigger the next receiving
return true;
}
......
......@@ -25,7 +25,9 @@
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define LIST_ALL_CLIENT "list_all_client"
#define STATUS "status"
#define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client"
int main(int argc, const char* argv[])
{
......@@ -56,6 +58,14 @@ int main(int argc, const char* argv[])
sp.stop_service();
sp.start_service();
}
else if (STATISTIC == str)
{
printf("link #: " ASCS_SF ", invalid links: " ASCS_SF "\n", file_server_.size(), file_server_.invalid_object_size());
puts("");
puts(file_server_.get_statistic().to_string().data());
}
else if (STATUS == str)
file_server_.list_all_status();
else if (LIST_ALL_CLIENT == str)
file_server_.list_all_object();
}
......
module = file_server
ext_cflag = -D_FILE_OFFSET_BITS=64
ext_cflag = -D_FILE_OFFSET_BITS=64 -D_LARGEFILE_SOURCE
include ../config.mk
@echo off
set cflag=-Wall -fexceptions -std=c++11 -pthread -O2 -DNDEBUG -DASIO_STANDALONE -DASIO_NO_DEPRECATED -I../../../asio/asio/include -I../../include/
set lflag=-pthread -s -lstdc++ -lws2_32 -lwsock32
for /d %%i in (*) do call:build %%i
goto:eof
:build
@echo on
cd %~1
g++ %cflag% *.cpp -o %~1.exe %lflag%
cd ..
@echo off
goto:eof
......@@ -24,11 +24,11 @@ using namespace ascs::ext::tcp;
#endif
#define QUIT_COMMAND "quit"
#define LIST_ALL_CLIENT "list_all_client"
#define STATISTIC "statistic"
#define STATUS "status"
#define INCREASE_THREAD "increase_thread"
#define DECREASE_THREAD "decrease_thread"
#define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client"
#define INCREASE_THREAD "increase thread"
#define DECREASE_THREAD "decrease thread"
cpu_timer begin_time;
std::atomic_ushort completed_session_num;
......
......@@ -8,7 +8,7 @@
#define ASCS_MSG_BUFFER_SIZE 65536
#define ASCS_INPUT_QUEUE non_lock_queue
#define ASCS_INPUT_CONTAINER list
//if pingpong_client only send message in on_msg() or on_msg_handle(), which means a responsive system, a real pingpong test,
//if pingpong_client only send message in on_msg_handle(), which means a responsive system, a real pingpong test,
//then, before pingpong_server send each message, the previous message has been sent to pingpong_client,
//so sending buffer will always be empty, which means we will never operate sending buffer concurrently, so need no locks.
//
......@@ -24,11 +24,11 @@ using namespace ascs::tcp;
using namespace ascs::ext::tcp;
#define QUIT_COMMAND "quit"
#define LIST_ALL_CLIENT "list_all_client"
#define STATISTIC "statistic"
#define STATUS "status"
#define INCREASE_THREAD "increase_thread"
#define DECREASE_THREAD "decrease_thread"
#define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client"
#define INCREASE_THREAD "increase thread"
#define DECREASE_THREAD "decrease thread"
class echo_socket : public server_socket
{
......@@ -74,7 +74,7 @@ int main(int argc, const char* argv[])
while(sp.is_running())
{
std::string str;
std::cin >> str;
std::getline(std::cin, str);
if (QUIT_COMMAND == str)
sp.stop_service();
else if (STATISTIC == str)
......
......@@ -22,7 +22,7 @@ using namespace ascs::ext::ssl;
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define RECONNECT_COMMAND "reconnect"
#define RECONNECT "reconnect"
#define SHOW_ALL_LINKS "show_all_links"
#define SHUTDOWN_LINK "shutdown"
......@@ -93,7 +93,7 @@ int main(int argc, const char* argv[])
client_.list_all_object();
}
#ifndef ASCS_REUSE_SSL_STREAM
else if (RESTART_COMMAND == str || RECONNECT_COMMAND == str)
else if (RESTART_COMMAND == str || RECONNECT == str)
puts("please define macro ASCS_REUSE_SSL_STREAM to test this feature.");
else if (SHUTDOWN_LINK == str)
// server_.at(0)->graceful_shutdown();
......@@ -115,7 +115,7 @@ int main(int argc, const char* argv[])
sp.start_service();
}
else if (RECONNECT_COMMAND == str)
else if (RECONNECT == str)
// server_.graceful_shutdown();
client_.graceful_shutdown(true);
else if (SHUTDOWN_LINK == str)
......
......@@ -13,6 +13,10 @@
#ifndef _ASCS_BASE_H_
#define _ASCS_BASE_H_
#if defined(__MINGW32__) || defined(__MINGW64__) //terrible Mingw
#include <bits/c++config.h> //for printf in stdio.h, it needs macro __USE_MINGW_ANSI_STDIO
#include <pthread.h> //for ctime_r in time.h, it needs macro _POSIX_THREAD_SAFE_FUNCTIONS
#endif
#include <stdio.h>
#include <stdarg.h>
......
......@@ -355,6 +355,30 @@
*
* REPLACEMENTS:
*
* ===============================================================
* 2018.8.1 version 1.3.1
*
* SPECIAL ATTENTION (incompatible with old editions):
* The data type of timer ID has been changed from unsigned char to unsigned short.
*
* HIGHLIGHT:
* Support Cygwin and Mingw.
* Dynamically allocate timers when needed (multithreading releated behaviors kept as before, so we must introduce a mutex for ascs::timer object).
*
* FIX:
*
* ENHANCEMENTS:
* The range of timer ID has been expanded from [0, 256) to [0, 65536).
* Add new macro ASCS_ALIGNED_TIMER to align timers.
*
* DELETION:
*
* REFACTORING:
* Realigned member variables for ascs::socket to save a few memory.
* Make demos more easier to use.
*
* REPLACEMENTS:
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -364,18 +388,14 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10300 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.3.0"
#define ASCS_VER 10301 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.3.1"
//asio and compiler check
#ifdef _MSC_VER
#define ASCS_SF "%Iu" //format used to print 'size_t'
static_assert(_MSC_VER >= 1800, "ascs needs Visual C++ 12.0 (2013) or higher.");
#elif defined(__GNUC__)
#define ASCS_SF "%zu" //format used to print 'size_t'
#ifdef __x86_64__
#define ASCS_LLF "%lu" //format used to print 'uint_fast64_t'
#endif
#ifdef __clang__
static_assert(__clang_major__ > 3 || (__clang_major__ == 3 && __clang_minor__ >= 1), "ascs needs Clang 3.1 or higher.");
#else
......@@ -385,6 +405,11 @@
#if !defined(__GXX_EXPERIMENTAL_CXX0X__) && (!defined(__cplusplus) || __cplusplus < 201103L)
#error ascs needs c++11 or higher.
#endif
#define ASCS_SF "%zu" //format used to print 'size_t'
#if defined(__x86_64__) && !defined(__MINGW64__) //terrible mingw
#define ASCS_LLF "%lu" //format used to print 'uint_fast64_t'
#endif
#else
#error ascs only support Visual C++, GCC and Clang.
#endif
......@@ -635,6 +660,11 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
//it's very useful if you want to re-dispatch message in your own logic or with very simple message handling (such as echo server).
//it's your responsibility to remove handled messages from the container (can be part of them).
//#define ASCS_ALIGNED_TIMER
//for example, start a timer at xx:xx:xx, interval is 10 seconds, the callback will be called at (xx:xx:xx + 10), and suppose that the callback
//returned at (xx:xx:xx + 11), then the interval will be temporarily changed to 9 seconds to make the next callback to be called at (xx:xx:xx + 20),
//if you don't define this macro, the next callback will be called at (xx:xx:xx + 21), plase note.
//configurations
#endif /* _ASCS_CONFIG_H_ */
......@@ -19,8 +19,11 @@ namespace ascs
{
//ascs requires that container must take one and only one template argument.
#if defined(_MSC_VER) || defined(__clang__) || __GNUC__ >= 5
#if defined(_MSC_VER) || defined(__clang__) || (!defined(__CYGWIN__) && !defined(__MINGW32__) && !defined(__MINGW64__) && __GNUC__ >= 5)
template<typename T> using list = std::list<T>;
//on cygwin and mingw, even gcc 7 and 8 still have not made list::size() to be O(1) complexity, which also means function size() and empty() are
// not thread safe, but ascs::queue needs them to be thread safe no matter itself is lockable or dummy lockable (see ascs::queue for more details),
//so, we must use ascs::list instead of std::list for cygwin and mingw (terrible cygwin and mingw).
#else
//a substitute of std::list (before gcc 5), it's size() function has O(1) complexity
//BTW, the naming rule is not mine, I copied them from std::list in Visual C++ 14.0
......
......@@ -13,6 +13,8 @@
#ifndef _ASCS_EXECUTOR_H_
#define _ASCS_EXECUTOR_H_
#include <functional>
#include <asio.hpp>
#include "config.h"
......
......@@ -13,8 +13,6 @@
#ifndef _ASCS_EXT_H_
#define _ASCS_EXT_H_
#include <chrono>
#include "../base.h"
//the size of the buffer used when receiving msg, must equal to or larger than the biggest msg size,
......
......@@ -16,8 +16,8 @@
#include <unordered_map>
#include "timer.h"
#include "executor.h"
#include "timer.h"
#include "container.h"
#include "service_pump.h"
......
......@@ -13,8 +13,8 @@
#ifndef _ASCS_SOCKET_H_
#define _ASCS_SOCKET_H_
#include "timer.h"
#include "tracked_executor.h"
#include "timer.h"
namespace ascs
{
......@@ -24,8 +24,11 @@ template<typename Socket, typename Packer, typename Unpacker, typename InMsgType
template<typename, typename> class OutQueue, template<typename> class OutContainer>
class socket : public timer<tracked_executor>
{
private:
typedef timer<tracked_executor> super;
public:
static const tid TIMER_BEGIN = timer<tracked_executor>::TIMER_END;
static const tid TIMER_BEGIN = super::TIMER_END;
static const tid TIMER_CHECK_RECV = TIMER_BEGIN;
static const tid TIMER_DISPATCH_MSG = TIMER_BEGIN + 1;
static const tid TIMER_DELAY_CLOSE = TIMER_BEGIN + 2;
......@@ -33,8 +36,8 @@ public:
static const tid TIMER_END = TIMER_BEGIN + 10;
protected:
socket(asio::io_context& io_context_) : timer<tracked_executor>(io_context_), next_layer_(io_context_), strand(io_context_) {first_init();}
template<typename Arg> socket(asio::io_context& io_context_, Arg& arg) : timer<tracked_executor>(io_context_), next_layer_(io_context_, arg), strand(io_context_) {first_init();}
socket(asio::io_context& io_context_) : super(io_context_), next_layer_(io_context_), strand(io_context_) {first_init();}
template<typename Arg> socket(asio::io_context& io_context_, Arg& arg) : super(io_context_), next_layer_(io_context_, arg), strand(io_context_) {first_init();}
//helper function, just call it in constructor
void first_init()
......@@ -151,8 +154,8 @@ public:
bool is_dispatching() const {return dispatching;}
bool is_recv_idle() const {return recv_idle_began;}
void msg_resuming_interval(size_t interval) {msg_resuming_interval_ = interval;}
size_t msg_resuming_interval() const {return msg_resuming_interval_;}
void msg_resuming_interval(unsigned interval) {msg_resuming_interval_ = interval;}
unsigned msg_resuming_interval() const {return msg_resuming_interval_;}
void msg_handling_interval(size_t interval) {msg_handling_interval_ = interval;}
size_t msg_handling_interval() const {return msg_handling_interval_;}
......@@ -458,36 +461,35 @@ private:
}
protected:
struct statistic stat;
std::shared_ptr<i_packer<typename Packer::msg_type>> packer_;
volatile bool sending;
in_queue_type send_msg_buffer;
volatile bool sending;
#ifdef ASCS_PASSIVE_RECV
volatile bool reading;
#endif
struct statistic stat;
private:
bool recv_idle_began;
volatile bool dispatching;
volatile bool started_; //has started or not
typename statistic::stat_time recv_idle_begin_time;
out_queue_type recv_msg_buffer;
uint_fast64_t _id;
Socket next_layer_;
volatile bool started_; //has started or not
std::atomic_flag start_atomic;
#ifndef ASCS_DISPATCH_BATCH_MSG
out_msg last_dispatch_msg;
#endif
out_queue_type recv_msg_buffer;
typename statistic::stat_time recv_idle_begin_time;
bool recv_idle_began;
volatile bool dispatching;
std::atomic_flag start_atomic;
asio::io_context::strand strand;
size_t msg_resuming_interval_, msg_handling_interval_;
unsigned msg_resuming_interval_, msg_handling_interval_;
};
} //namespace
......
......@@ -28,7 +28,7 @@ private:
public:
static const typename super::tid TIMER_BEGIN = super::TIMER_END;
static const typename super::tid TIMER_CONNECT = TIMER_BEGIN;
static const typename super::tid TIMER_END = TIMER_BEGIN + 10;
static const typename super::tid TIMER_END = TIMER_BEGIN + 5;
client_socket_base(asio::io_context& io_context_) : super(io_context_), need_reconnect(true) {set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
template<typename Arg>
......
......@@ -44,7 +44,7 @@ protected:
public:
static const typename super::tid TIMER_BEGIN = super::TIMER_END;
static const typename super::tid TIMER_ASYNC_SHUTDOWN = TIMER_BEGIN;
static const typename super::tid TIMER_END = TIMER_BEGIN + 10;
static const typename super::tid TIMER_END = TIMER_BEGIN + 5;
virtual bool obsoleted() {return !is_shutting_down() && super::obsoleted();}
virtual bool is_ready() {return is_connected();}
......@@ -319,7 +319,7 @@ private:
--loop_num;
if (loop_num > 0)
{
this->update_timer_info(TIMER_ASYNC_SHUTDOWN, 10, [loop_num, this](typename super::tid id)->bool {return this->async_shutdown_handler(loop_num);});
this->change_timer_call_back(TIMER_ASYNC_SHUTDOWN, [loop_num, this](typename super::tid id)->bool {return this->async_shutdown_handler(loop_num);});
return true;
}
else
......
......@@ -44,113 +44,142 @@ public:
typedef asio::system_timer timer_type;
#endif
typedef unsigned char tid;
static const tid TIMER_END = 0; //user timer's id must begin from parent class' TIMER_END
typedef unsigned short tid;
static const tid TIMER_END = 0; //subclass' id must begin from parent class' TIMER_END
struct timer_info
{
enum timer_status {TIMER_FAKE, TIMER_OK, TIMER_CANCELED};
enum timer_status : char {TIMER_CREATED, TIMER_STARTED, TIMER_CANCELED};
tid id;
unsigned char seq;
timer_status status;
size_t interval_ms;
unsigned interval_ms;
timer_type timer;
std::function<bool(tid)> call_back; //return true from call_back to continue the timer, or the timer will stop
std::shared_ptr<timer_type> timer;
timer_info() : seq(-1), status(TIMER_FAKE), interval_ms(0) {}
timer_info(tid id_, asio::io_context& io_context_) : id(id_), seq(-1), status(TIMER_CREATED), interval_ms(0), timer(io_context_) {}
bool operator ==(const timer_info& other) {return id == other.id;}
bool operator ==(tid id_) {return id == id_;}
};
typedef const timer_info timer_cinfo;
typedef std::vector<timer_info> container_type;
timer(asio::io_context& io_context_) : Executor(io_context_), timer_can((tid) -1) {tid id = -1; do_something_to_all([&id](timer_info& item) {item.id = ++id;});}
timer(asio::io_context& io_context_) : Executor(io_context_) {}
~timer() {stop_all_timer();}
bool update_timer_info(tid id, size_t interval, std::function<bool(tid)>&& call_back, bool start = false)
bool create_or_update_timer(tid id, unsigned interval, std::function<bool(tid)>&& call_back, bool start = false)
{
timer_info& ti = timer_can[id];
timer_info* ti = nullptr;
{
std::lock_guard<std::mutex> lock(timer_can_mutex);
auto iter = std::find(std::begin(timer_can), std::end(timer_can), id);
if (iter == std::end(timer_can))
{
try {timer_can.emplace_back(id, io_context_); ti = &timer_can.back();}
catch (const std::exception& e) {unified_out::error_out("cannot create timer %d (%s)", id, e.what()); return false;}
}
else
ti = &*iter;
}
assert (nullptr != ti);
if (timer_info::TIMER_FAKE == ti.status)
try {ti.timer = std::make_shared<timer_type>(io_context_);}
catch (const std::exception& e) {unified_out::error_out("cannot create timer %d (%s)", ti.id, e.what()); return false;}
ti.status = timer_info::TIMER_OK;
ti.interval_ms = interval;
ti.call_back.swap(call_back);
ti->interval_ms = interval;
ti->call_back.swap(call_back);
if (start)
start_timer(ti);
start_timer(*ti);
return true;
}
bool update_timer_info(tid id, size_t interval, const std::function<bool(tid)>& call_back, bool start = false)
{return update_timer_info(id, interval, std::function<bool(tid)>(call_back), start);}
bool create_or_update_timer(tid id, unsigned interval, const std::function<bool(tid)>& call_back, bool start = false)
{return create_or_update_timer(id, interval, std::function<bool(tid)>(call_back), start);}
void change_timer_status(tid id, typename timer_info::timer_status status) {timer_can[id].status = status;}
void change_timer_interval(tid id, size_t interval) {timer_can[id].interval_ms = interval;}
bool change_timer_status(tid id, typename timer_info::timer_status status) {auto ti = find_timer(id); return nullptr != ti ? ti->status = status, true : false;}
bool change_timer_interval(tid id, size_t interval) {auto ti = find_timer(id); return nullptr != ti ? ti->interval_ms = interval, true : false;}
void change_timer_call_back(tid id, std::function<bool(tid)>&& call_back) {timer_can[id].call_back.swap(call_back);}
void change_timer_call_back(tid id, const std::function<bool(tid)>& call_back) {change_timer_call_back(id, std::function<bool(tid)>(call_back));}
bool change_timer_call_back(tid id, std::function<bool(tid)>&& call_back) {auto ti = find_timer(id); return nullptr != ti ? ti->call_back.swap(call_back), true : false;}
bool change_timer_call_back(tid id, const std::function<bool(tid)>& call_back) {return change_timer_call_back(id, std::function<bool(tid)>(call_back));}
bool set_timer(tid id, size_t interval, std::function<bool(tid)>&& call_back) {return update_timer_info(id, interval, std::move(call_back), true);}
bool set_timer(tid id, size_t interval, const std::function<bool(tid)>& call_back) {return update_timer_info(id, interval, call_back, true);}
bool set_timer(tid id, unsigned interval, std::function<bool(tid)>&& call_back) {return create_or_update_timer(id, interval, std::move(call_back), true);}
bool set_timer(tid id, unsigned interval, const std::function<bool(tid)>& call_back) {return create_or_update_timer(id, interval, call_back, true);}
bool start_timer(tid id)
timer_info* find_timer(tid id)
{
timer_info& ti = timer_can[id];
std::lock_guard<std::mutex> lock(timer_can_mutex);
auto iter = std::find(std::begin(timer_can), std::end(timer_can), id);
if (iter != std::end(timer_can))
return &*iter;
if (timer_info::TIMER_FAKE == ti.status)
return false;
ti.status = timer_info::TIMER_OK;
start_timer(ti); //if timer already started, this will cancel it first
return true;
return nullptr;
}
timer_info find_timer(tid id) const {return timer_can[id];}
bool is_timer(tid id) const {return timer_info::TIMER_OK == timer_can[id].status;}
void stop_timer(tid id) {stop_timer(timer_can[id]);}
bool is_timer(tid id) {auto ti = find_timer(id); return nullptr != ti ? timer_info::TIMER_STARTED == ti->status : false;}
bool start_timer(tid id) {auto ti = find_timer(id); return nullptr != ti ? start_timer(*ti) : false;}
void stop_timer(tid id) {auto ti = find_timer(id); if (nullptr != ti) stop_timer(*ti);}
void stop_all_timer() {do_something_to_all([this](timer_info& item) {this->stop_timer(item);});}
void stop_all_timer(tid excepted_id) {do_something_to_all([=](timer_info& item) {if (excepted_id != item.id) this->stop_timer(item);});}
DO_SOMETHING_TO_ALL(timer_can)
DO_SOMETHING_TO_ONE(timer_can)
DO_SOMETHING_TO_ALL_MUTEX(timer_can, timer_can_mutex)
DO_SOMETHING_TO_ONE_MUTEX(timer_can, timer_can_mutex)
protected:
void start_timer(timer_info& ti)
bool start_timer(timer_info& ti)
{
assert(timer_info::TIMER_OK == ti.status);
if (!ti.call_back)
return false;
ti.status = timer_info::TIMER_STARTED;
#if ASIO_VERSION >= 101100
ti.timer->expires_after(milliseconds(ti.interval_ms));
ti.timer.expires_after(milliseconds(ti.interval_ms));
#else
ti.timer->expires_from_now(milliseconds(ti.interval_ms));
ti.timer.expires_from_now(milliseconds(ti.interval_ms));
#endif
//if timer already started, this will cancel it first
#if (defined(_MSC_VER) && _MSC_VER > 1800) || (defined(__cplusplus) && __cplusplus > 201103L)
ti.timer->async_wait(this->make_handler_error([this, &ti, prev_seq(++ti.seq)](const asio::error_code& ec) {
ti.timer.async_wait(this->make_handler_error([this, &ti, prev_seq(++ti.seq)](const asio::error_code& ec) {
#else
auto prev_seq = ++ti.seq;
ti.timer->async_wait(this->make_handler_error([this, &ti, prev_seq](const asio::error_code& ec) {
ti.timer.async_wait(this->make_handler_error([this, &ti, prev_seq](const asio::error_code& ec) {
#endif
if (!ec && ti.call_back(ti.id) && timer_info::TIMER_OK == ti.status)
#ifdef ASCS_ALIGNED_TIMER
auto begin_time = std::chrono::system_clock::now();
if (!ec && ti.call_back(ti.id) && timer_info::TIMER_STARTED == ti.status)
{
auto elapsed_ms = (unsigned) std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - begin_time).count();
if (elapsed_ms > ti.interval_ms)
elapsed_ms %= ti.interval_ms;
ti.interval_ms -= elapsed_ms;
this->start_timer(ti);
ti.interval_ms += elapsed_ms;
}
#else
if (!ec && ti.call_back(ti.id) && timer_info::TIMER_STARTED == ti.status)
this->start_timer(ti);
#endif
else if (prev_seq == ti.seq) //exclude a particular situation--start the same timer in call_back and return false
ti.status = timer_info::TIMER_CANCELED;
}));
return true;
}
void stop_timer(timer_info& ti)
{
if (timer_info::TIMER_OK == ti.status) //enable stopping timers that has been stopped
if (timer_info::TIMER_STARTED == ti.status) //enable stopping timers that has been stopped
{
try {ti.timer->cancel();} catch (const asio::system_error& e) {unified_out::error_out("cannot stop timer %d (%d %s)", ti.id, e.code().value(), e.what());}
try {ti.timer.cancel();}
catch (const asio::system_error& e) {unified_out::error_out("cannot stop timer %d (%d %s)", ti.id, e.code().value(), e.what());}
ti.status = timer_info::TIMER_CANCELED;
}
}
private:
typedef std::list<timer_info> container_type;
container_type timer_can;
std::mutex timer_can_mutex;
using Executor::io_context_;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册