提交 e4be5094 编写于 作者: Y youngwolf

1.3.4 release.

A container holding messages that were failed to send will be provided via on_send_error virtual function.
Failure of binding or listening in server_base will not stop the service_pump any more.
Virtual function client_socket_base::prepare_reconnect() now only control the retry times and delay time after reconnecting failed.
Expose server_base's acceptor via next_layer().
Prefix suffix packer and unpacker support heartbeat.
New demo socket_management demonstrates how to manage sockets if you use other keys rather than the original id.
Control reconnecting more flexibly, see function client_socket_base::open_reconnect and client_socket_base::close_reconnect for more details.
上级 96477b3e
......@@ -25,6 +25,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "concurrent_client", "concur
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "concurrent_server", "concurrent_server\concurrent_server.vcxproj", "{827343A9-D15E-4070-909D-2DFCC7CA8574}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "socket_management", "socket_management\socket_management.vcxproj", "{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Win32 = Debug|Win32
......@@ -121,6 +123,14 @@ Global
{827343A9-D15E-4070-909D-2DFCC7CA8574}.Release|Win32.Build.0 = Release|Win32
{827343A9-D15E-4070-909D-2DFCC7CA8574}.Release|x64.ActiveCfg = Release|x64
{827343A9-D15E-4070-909D-2DFCC7CA8574}.Release|x64.Build.0 = Release|x64
{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}.Debug|Win32.ActiveCfg = Debug|Win32
{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}.Debug|Win32.Build.0 = Debug|Win32
{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}.Debug|x64.ActiveCfg = Debug|x64
{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}.Debug|x64.Build.0 = Debug|x64
{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}.Release|Win32.ActiveCfg = Release|Win32
{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}.Release|Win32.Build.0 = Release|Win32
{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}.Release|x64.ActiveCfg = Release|x64
{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......
......@@ -19,6 +19,7 @@ release debug clean :
cd pingpong_client && ${ST_MAKE}
cd concurrent_server && ${ST_MAKE}
cd concurrent_client && ${ST_MAKE}
cd socket_management && ${ST_MAKE}
cd udp_test && ${ST_MAKE}
cd ssl_test && ${ST_MAKE}
#ifndef _CLIENT_H_
#define _CLIENT_H_
extern bool add_link(const std::string& name, uint_fast64_t id);
extern bool del_link(const std::string& name);
extern uint_fast64_t find_link(const std::string& name);
extern uint_fast64_t find_and_del_link(const std::string& name);
class my_client_socket : public client_socket
{
public:
my_client_socket(asio::io_context& io_context_) : client_socket(io_context_)
{
std::dynamic_pointer_cast<prefix_suffix_packer>(packer())->prefix_suffix("", "\n");
std::dynamic_pointer_cast<prefix_suffix_unpacker>(unpacker())->prefix_suffix("", "\n");
}
void name(const std::string& name_) {_name = name_;}
const std::string& name() const {return _name;}
protected:
//disable reconnecting
//disconnect(bool), force_shutdown(bool) and graceful_shutdown(bool, bool) can overwrite reconnecting behavior, and reset() virtual function will
// open reconnecting too, so we close reconnecting repeatedly at here. you also can rewrite after_close() virtual function and do nothing in it.
virtual void on_connect() {close_reconnect();}
//msg handling
virtual bool on_msg_handle(out_msg_type& msg) {printf("received: %s, I'm %s\n", msg.data(), _name.data()); return true;}
//msg handling end
virtual void on_recv_error(const asio::error_code& ec) {del_link(_name); client_socket::on_recv_error(ec);}
private:
std::string _name;
};
class my_client : public multi_client_base<my_client_socket>
{
public:
my_client(service_pump& service_pump_) : multi_client_base<my_client_socket>(service_pump_) {}
bool add_link(const std::string& name)
{
auto socket_ptr = create_object();
assert(socket_ptr);
if (::add_link(name, socket_ptr->id()))
{
//socket_ptr->set_server_addr(9527, "127.0.0.1"); //if you want to set server ip, do it at here like this
if (!add_socket(socket_ptr)) //exceed ST_ASIO_MAX_OBJECT_NUM
::del_link(name);
else
{
socket_ptr->name(name);
return true;
}
}
return false;
}
bool del_link(const std::string& name)
{
auto socket_ptr = find(find_and_del_link(name));
return socket_ptr ? (socket_ptr->force_shutdown(false), true) : false;
}
bool send_msg(const std::string& name, const std::string& msg)
{
auto socket_ptr = find(find_link(name));
return socket_ptr ? socket_ptr->send_msg(msg) : false;
}
};
#endif //#define _CLIENT_H_
module = socket_management
include ../config.mk
#ifndef _SERVER_H_
#define _SERVER_H_
class my_server_socket : public server_socket
{
public:
my_server_socket(i_server& server_) : server_socket(server_)
{
std::dynamic_pointer_cast<prefix_suffix_packer>(packer())->prefix_suffix("", "\n");
std::dynamic_pointer_cast<prefix_suffix_unpacker>(unpacker())->prefix_suffix("", "\n");
}
protected:
//msg handling
virtual bool on_msg_handle(out_msg_type& msg) {return send_msg(msg + " (from the server)");}
//msg handling end
};
typedef server_base<my_server_socket> my_server;
#endif //#define _SERVER_H_
#include <iostream>
#include <map>
//configuration
#define ASCS_REUSE_OBJECT //use objects pool
#define ASCS_HEARTBEAT_INTERVAL 5
#define ASCS_AVOID_AUTO_STOP_SERVICE
#define ASCS_DEFAULT_PACKER prefix_suffix_packer
#define ASCS_DEFAULT_UNPACKER prefix_suffix_unpacker
//configuration
#include <ascs/ext/tcp.h>
using namespace ascs;
using namespace ascs::tcp;
using namespace ascs::ext;
using namespace ascs::ext::tcp;
#include "server.h"
#include "client.h"
static std::map<std::string, uint_fast64_t> link_map;
static std::mutex link_map_mutex;
bool add_link(const std::string& name, uint_fast64_t id)
{
std::lock_guard<std::mutex> lock(link_map_mutex);
if (link_map.count(name) > 0)
{
printf("%s already exists.\n", name.data());
return false;
}
printf("add socket %s.\n", name.data());
link_map[name] = id;
return true;
}
bool del_link(const std::string& name)
{
std::lock_guard<std::mutex> lock(link_map_mutex);
return link_map.erase(name) > 0;
}
uint_fast64_t find_link(const std::string& name)
{
std::lock_guard<std::mutex> lock(link_map_mutex);
auto iter = link_map.find(name);
return iter != std::end(link_map) ? iter->second : -1;
}
uint_fast64_t find_and_del_link(const std::string& name)
{
uint_fast64_t id = -1;
std::lock_guard<std::mutex> lock(link_map_mutex);
auto iter = link_map.find(name);
if (iter != std::end(link_map))
{
id = iter->second;
link_map.erase(iter);
}
return id;
}
int main(int argc, const char* argv[])
{
service_pump sp;
my_server server(sp);
my_client client(sp);
sp.start_service();
while(sp.is_running())
{
std::string str;
std::getline(std::cin, str);
if (str.empty())
;
else if ("quit" == str)
sp.stop_service();
else
{
auto parameters = split_string(str);
auto iter = std::begin(parameters);
if (iter == std::end(parameters))
continue;
if ("add" == *iter)
{
++iter;
if (iter != std::end(parameters))
client.add_link(*iter);
}
else if ("del" == *iter)
{
++iter;
if (iter != std::end(parameters))
client.del_link(*iter);
}
else
{
std::string name = *iter++;
for (; iter != std::end(parameters); ++iter)
client.send_msg(name, *iter);
}
}
}
return 0;
}
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{6CCBD6A3-D5BF-4568-9ED5-860D19B6A2C7}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>socket_management</RootNamespace>
<WindowsTargetPlatformVersion>10.0.16299.0</WindowsTargetPlatformVersion>
<ProjectName>socket_management</ProjectName>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<CharacterSet>MultiByte</CharacterSet>
<PlatformToolset>v141</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<CharacterSet>MultiByte</CharacterSet>
<PlatformToolset>v141</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>MultiByte</CharacterSet>
<PlatformToolset>v141</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>MultiByte</CharacterSet>
<PlatformToolset>v141</PlatformToolset>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="PropertySheets">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="PropertySheets">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>C:\Users\wolf\Documents\GitHub\asio\asio\include\;C:\Users\wolf\Documents\GitHub\ascs\include\;$(IncludePath)</IncludePath>
<LibraryPath>$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>C:\Users\wolf\Documents\GitHub\asio\asio\include\;C:\Users\wolf\Documents\GitHub\ascs\include\;$(IncludePath)</IncludePath>
<LibraryPath>$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>C:\Users\wolf\Documents\GitHub\asio\asio\include\;C:\Users\wolf\Documents\GitHub\ascs\include\;$(IncludePath)</IncludePath>
<LibraryPath>$(LibraryPath)</LibraryPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>C:\Users\wolf\Documents\GitHub\asio\asio\include\;C:\Users\wolf\Documents\GitHub\ascs\include\;$(IncludePath)</IncludePath>
<LibraryPath>$(LibraryPath)</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PrecompiledHeaderFile />
<PrecompiledHeaderOutputFile />
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PrecompiledHeaderFile />
<PrecompiledHeaderOutputFile />
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PrecompiledHeaderFile />
<PrecompiledHeaderOutputFile />
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;ASIO_STANDALONE;ASIO_NO_DEPRECATED;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<PrecompiledHeaderFile />
<PrecompiledHeaderOutputFile />
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="socket_management.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="client.h" />
<ClInclude Include="server.h" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
\ No newline at end of file
......@@ -212,7 +212,7 @@ protected:
public:
virtual void reset() {}
//heartbeat must not be included in msg_can, otherwise you must handle heartbeat at where you handle normal messges.
//heartbeat must not be included in msg_can, otherwise you must handle heartbeat at where you handle normal messages.
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) = 0;
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) {return 0;}
virtual buffer_type prepare_next_recv() = 0;
......
......@@ -12,14 +12,14 @@
* license: http://think-async.com/ (current is www.boost.org/LICENSE_1_0.txt)
*
* Known issues:
* 1. since 1.1.0 until 1.3, concurrentqueue is not a FIFO queue (it is by design), navigate to the following links for more deatils:
* 1. since 1.1.0 until 1.3, concurrentqueue is not a FIFO queue (it is by design), navigate to the following links for more details:
* https://github.com/cameron314/concurrentqueue/issues/6
* https://github.com/cameron314/concurrentqueue/issues/52
* if you're using concurrentqueue, please play attention, this is by design.
* 2. since 1.1.5 until 1.2, heartbeat function cannot work properly between windows (at least win-10) and Ubuntu (at least Ubuntu-16.04).
* 3. since 1.1.5 until 1.2, UDP doesn't support heartbeat because UDP doesn't support OOB data.
* 4. since 1.1.5 until 1.2, SSL doesn't support heartbeat because SSL doesn't support OOB data.
* 5. with old openssl (at least 0.9.7), ssl::client_socket_base and ssl_server_socket_base are not reuable, i'm not sure in which version,
* 5. with old openssl (at least 0.9.7), ssl::client_socket_base and ssl_server_socket_base are not reusable, I'm not sure in which version,
* they became available, seems it's 1.0.0.
*
* 2016.9.25 version 1.0.0
......@@ -97,7 +97,7 @@
* SPECIAL ATTENTION (incompatible with old editions):
* Virtual function reset_state in i_packer and i_unpacker have been renamed to reset.
* Virtual function is_send_allowed has been renamed to is_ready, it also means ready to receive messages
* since message sending is not suspendable any more.
* since message sending cannot be suspended any more.
* Virtual function on_msg_handle has been changed, the link_down variable will not be presented any more.
* Interface i_server::del_client has been renamed to i_server::del_socket.
* Function inner_packer and inner_unpacker have been renamed to packer and unpacker.
......@@ -239,7 +239,7 @@
*
* FIX:
* If start the same timer and return false in the timer's call_back, its status will be set to TIMER_CANCELED (the right value should be TIMER_OK).
* In old compilers (for example gcc 4.7), std::list::splice needs a non-const iterator as the insert point.
* In old compilers (for example gcc 4.7), std::list::splice needs a non-constant iterator as the insert point.
* If call stop_service after service_pump stopped, timer TIMER_DELAY_CLOSE will be left behind and be triggered after the next start_service,
* this will bring disorders to ascs::socket.
*
......@@ -257,7 +257,7 @@
* Drop useless variables which need macro ASCS_DECREASE_THREAD_AT_RUNTIME to be defined.
*
* REFACTORING:
* Move variable last_send_time and last_recv_time from ascs::socket to ascs::socet::stat (a statistic object).
* Move variable last_send_time and last_recv_time from ascs::socket to ascs::socket::stat (a statistic object).
* Move common operations in client_socket_base::do_start and server_socket_base::do_start to tcp::socket_base::do_start and socket::do_start.
*
* REPLACEMENTS:
......@@ -289,15 +289,15 @@
* 2018.4.10 version 1.2.6
*
* SPECIAL ATTENTION (incompatible with old editions):
* Do reconnecting in client_socket_base::after_close rather in client_socket_base::on_close.
* Do reconnecting in client_socket_base::after_close rather than in client_socket_base::on_close.
*
* HIGHLIGHT:
*
* FIX:
* Reconnectiong may happen in ascs::socket::reset, it's not a right behavior.
* Reconnecting may happen in ascs::socket::reset, it's not a right behavior.
*
* ENHANCEMENTS:
* Add ascs::socket::after_close virtual function, a good case for using it is to reconnect to the server in client_socket_base.
* Add ascs::socket::after_close virtual function, a good case for using it is to reconnect the server in client_socket_base.
*
* DELETION:
*
......@@ -329,7 +329,7 @@
*
* HIGHLIGHT:
* After introduced asio::io_context::strand (which is required, see FIX section for more details), we wiped two atomic in ascs::socket.
* Introduced macro ASCS_DISPATCH_BATCH_MSG, then all messages will be dispatched via on_handle_msg with a variable-length contianer.
* Introduced macro ASCS_DISPATCH_BATCH_MSG, then all messages will be dispatched via on_handle_msg with a variable-length container.
*
* FIX:
* Wiped race condition between async_read and async_write on the same ascs::socket, so sync sending mode will not be supported any more.
......@@ -363,7 +363,7 @@
*
* HIGHLIGHT:
* Support Cygwin and Mingw.
* Dynamically allocate timers when needed (multithreading releated behaviors kept as before, so we must introduce a mutex for ascs::timer object).
* Dynamically allocate timers when needed (multi-threading related behaviors kept as before, so we must introduce a mutex for ascs::timer object).
*
* FIX:
*
......@@ -390,7 +390,7 @@
*
* HIGHLIGHT:
* Fully support sync message sending and receiving (even be able to mix with async message sending and receiving without any limitations), but please note
* that this feature will slightly impact efficiency even if you always use async message sending and receiving, so only open this feature when realy needed.
* that this feature will slightly impact efficiency even if you always use async message sending and receiving, so only open this feature when really needed.
*
* FIX:
* Fix race condition when aligning timers, see macro ASCS_ALIGNED_TIMER for more details.
......@@ -441,6 +441,31 @@
*
* REPLACEMENTS:
*
* ===============================================================
* 2019.1.1 version 1.3.4
*
* SPECIAL ATTENTION (incompatible with old editions):
* The virtual function socket::on_send_error has been moved to tcp::socket_base and udp::socket_base.
* The signature of virtual function socket::on_send_error has been changed, a container holding messages that were failed to send will be provided.
* Failure of binding or listening in server_base will not stop the service_pump any more.
* Virtual function client_socket_base::prepare_reconnect() now only control the retry times and delay time after reconnecting failed.
*
* HIGHLIGHT:
*
* FIX:
*
* ENHANCEMENTS:
* Expose server_base's acceptor via next_layer().
* Prefix suffix packer and unpacker support heartbeat.
* New demo socket_management demonstrates how to manage sockets if you use other keys rather than the original id.
* Control reconnecting more flexibly, see function client_socket_base::open_reconnect and client_socket_base::close_reconnect for more details.
*
* DELETION:
*
* REFACTORING:
*
* REPLACEMENTS:
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -450,8 +475,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10303 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.3.3"
#define ASCS_VER 10304 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.3.4"
//asio and compiler check
#ifdef _MSC_VER
......@@ -649,7 +674,7 @@ static_assert(ASCS_ASYNC_ACCEPT_NUM > 0, "async accept number must be bigger tha
#ifndef ASCS_OUTPUT_CONTAINER
#define ASCS_OUTPUT_CONTAINER list
#endif
//we also can control the queues (and their containers) via template parameters on calss 'client_socket_base'
//we also can control the queues (and their containers) via template parameters on class 'client_socket_base'
//'server_socket_base', 'ssl::client_socket_base' and 'ssl::server_socket_base'.
//we even can let a socket to use different queue (and / or different container) for input and output via template parameters.
......@@ -707,7 +732,7 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
//this value can be changed via ascs::socket::msg_handling_interval(size_t) at runtime.
//#define ASCS_PASSIVE_RECV
//to gain the ability of changing the unpacker at runtime, with this mcro, ascs will not do message receiving automatically (except the firt one),
//to gain the ability of changing the unpacker at runtime, with this macro, ascs will not do message receiving automatically (except the first one),
// so you need to manually call recv_msg(), if you need to change the unpacker, do it before recv_msg() invocation, please note.
//during async message receiving, calling recv_msg() will fail, this is by design to avoid asio::io_context using up all virtual memory.
//because user can greedily call recv_msg(), it's your responsibility to keep the recv buffer from overflowed, please pay special attention.
......@@ -722,7 +747,7 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
//#define ASCS_ALIGNED_TIMER
//for example, start a timer at xx:xx:xx, interval is 10 seconds, the callback will be called at (xx:xx:xx + 10), and suppose that the callback
//returned at (xx:xx:xx + 11), then the interval will be temporarily changed to 9 seconds to make the next callback to be called at (xx:xx:xx + 20),
//if you don't define this macro, the next callback will be called at (xx:xx:xx + 21), plase note.
//if you don't define this macro, the next callback will be called at (xx:xx:xx + 21), please note.
//#define ASCS_SYNC_SEND
#ifdef ASCS_SYNC_SEND
......@@ -736,7 +761,7 @@ static_assert(ASIO_HAS_STD_FUTURE == 1, "sync message sending needs std::future.
// sync_safe_send_native_msg
// sync_recv_msg
//please note that:
// this feature will slightly impact efficiency even if you always use async message sending and receiving, so only open this feature when realy needed.
// this feature will slightly impact efficiency even if you always use async message sending and receiving, so only open this feature when really needed.
// we must avoid to do sync message sending and receiving in service threads.
// if prior sync_recv_msg() not returned, the second sync_recv_msg() will return false immediately.
// with macro ASCS_PASSIVE_RECV, in sync_recv_msg(), recv_msg() will be automatically called.
......
......@@ -171,9 +171,8 @@ public:
return msg;
}
//not support heartbeat because prefix_suffix_unpacker cannot recognize heartbeat message, but it's possible to make
//prefix_suffix_unpacker to be able to recognize heartbeat message, just need some changes.
virtual msg_type pack_heartbeat() {return msg_type(1, '\n');}
virtual char* raw_data(msg_type& msg) const {return const_cast<char*>(std::next(msg.data(), _prefix.size()));}
virtual const char* raw_data(msg_ctype& msg) const {return std::next(msg.data(), _prefix.size());}
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - _prefix.size() - _suffix.size();}
......
......@@ -394,11 +394,14 @@ public:
auto min_len = _prefix.size() + _suffix.size();
while (0 == peek_msg(remain_len, pnext) && (size_t) -1 != cur_msg_len && 0 != cur_msg_len)
{
assert(cur_msg_len > min_len);
if (stripped())
msg_can.emplace_back(std::next(pnext, _prefix.size()), cur_msg_len - min_len);
else
msg_can.emplace_back(pnext, cur_msg_len);
assert(cur_msg_len >= min_len);
if (cur_msg_len > min_len) //exclude heartbeat
{
if (stripped())
msg_can.emplace_back(std::next(pnext, _prefix.size()), cur_msg_len - min_len);
else
msg_can.emplace_back(pnext, cur_msg_len);
}
remain_len -= cur_msg_len;
std::advance(pnext, cur_msg_len);
cur_msg_len = -1;
......
......@@ -96,8 +96,8 @@ protected:
//change object_ptr's id to id, and reinsert it into object_can.
//there MUST exist an object in invalid_object_can whose id is equal to id to guarantee the id has been abandoned
// (checking existence of such object in object_can is NOT enough, because there're some sockets used by async
// acception, they don't exist in object_can nor invalid_object_can), further more, the invalid object MUST be
// (checking existence of such object in object_can is NOT enough, because there are some sockets used by async
// acceptance, they don't exist in object_can nor invalid_object_can), further more, the invalid object MUST be
//obsoleted and has no additional reference.
//return the invalid object (null means failure), please note that the invalid object has been removed from invalid_object_can.
object_type change_object_id(object_ctype& object_ptr, uint_fast64_t id)
......@@ -314,11 +314,11 @@ private:
std::mutex object_can_mutex;
size_t max_size_;
//because all objects are dynamic created and stored in object_can, maybe when receiving error occur
//(you are recommended to delete the object from object_can, for example via i_server::del_socket), some other asynchronous calls are still queued in asio::io_context,
//and will be dequeued in the future, we must guarantee these objects not be freed from the heap or reused, so we move these objects from object_can to invalid_object_can,
//and free them from the heap or reuse them in the near future.
//if ASCS_CLEAR_OBJECT_INTERVAL been defined, clear_obsoleted_object() will be invoked automatically and periodically to move all invalid objects into invalid_object_can.
//because all objects are dynamic created and stored in object_can, after receiving error occurred (you are recommended to delete the object from object_can,
//for example via i_server::del_socket), maybe some other asynchronous calls are still queued in asio::io_context, and will be dequeued in the future,
//we must guarantee these objects not be freed from the heap or reused, so we move these objects from object_can to invalid_object_can, and free them
//from the heap or reuse them in the near future. if ASCS_CLEAR_OBJECT_INTERVAL been defined, clear_obsoleted_object() will be invoked automatically and
//periodically to move all invalid objects into invalid_object_can.
list<object_type> invalid_object_can;
std::mutex invalid_object_can_mutex;
};
......
......@@ -277,8 +277,12 @@ private:
{
assert(nullptr != i_service_);
std::lock_guard<std::mutex> lock(service_can_mutex);
std::unique_lock<std::mutex> lock(service_can_mutex);
service_can.emplace_back(i_service_);
lock.unlock();
if (is_service_started())
unified_out::warning_out("service been added, please remember to call start_service for it!");
}
private:
......
......@@ -266,9 +266,7 @@ protected:
return true;
}
//generally, you don't have to rewrite this to maintain the status of connections (TCP)
virtual void on_send_error(const asio::error_code& ec) {unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
virtual void on_recv_error(const asio::error_code& ec) = 0; //receiving error or peer endpoint quit(false ec means ok)
virtual void on_recv_error(const asio::error_code& ec) = 0; //receiving error or peer endpoint quit(false ec means okay)
virtual bool on_heartbeat_error() = 0; //heartbeat timed out, return true to continue heartbeat function (useful for UDP)
//if ASCS_DELAY_CLOSE is equal to zero, in this callback, socket guarantee that there's no any other async call associated it,
......@@ -276,7 +274,7 @@ protected:
// in this socket except this socket itself, because this socket maybe is being maintained by object_pool.
//otherwise (bigger than zero), socket simply call this callback ASCS_DELAY_CLOSE seconds later after link down, no any guarantees.
virtual void on_close() {unified_out::info_out("on_close()");}
virtual void after_close() {} //a good case for using this is to reconnect to the server, please refer to client_socket_base.
virtual void after_close() {} //a good case for using this is to reconnect the server, please refer to client_socket_base.
#ifdef ASCS_SYNC_DISPATCH
//return the number of handled msg, if some msg left behind, socket will re-dispatch them asynchronously
......
......@@ -34,11 +34,9 @@ public:
template<typename Arg>
client_socket_base(asio::io_context& io_context_, Arg& arg) : super(io_context_, arg), need_reconnect(true) {set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
//reset all, be ensure that there's no any operations performed on this socket when invoke it
//subclass must re-write this function to initialize itself, and then do not forget to invoke superclass' reset function too
//notice, when reusing this socket, object_pool will invoke this function
//reset all, be ensure that no operations performed on this socket when invoke it, subclass must rewrite this function to initialize itself, and then
// call superclass' reset function, before reusing this socket, object_pool will invoke this function
virtual void reset() {need_reconnect = true; super::reset();}
virtual bool obsoleted() {return !need_reconnect && super::obsoleted();}
bool set_server_addr(unsigned short port, const std::string& ip = ASCS_SERVER_IP)
{
......@@ -56,7 +54,14 @@ public:
}
const asio::ip::tcp::endpoint& get_server_addr() const {return server_addr;}
//if the connection is broken unexpectedly, client_socket_base will try to reconnect to the server automatically.
//if you don't want to reconnect to the server after link broken, call close_reconnect() or rewrite after_close() virtual function and do nothing in it,
//if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function.
//disconnect(bool), force_shutdown(bool) and graceful_shutdown(bool, bool) can overwrite reconnecting behavior, please note.
//reset() virtual function will open reconnecting, please note.
void open_reconnect() {need_reconnect = true;}
void close_reconnect() {need_reconnect = false;}
//if the connection is broken unexpectedly, client_socket_base will try to reconnect to the server automatically (if need_reconnect is true).
void disconnect(bool reconnect = false) {force_shutdown(reconnect);}
void force_shutdown(bool reconnect = false)
{
......@@ -95,11 +100,11 @@ protected:
{
if (!ec) //already started, so cannot call start()
super::do_start();
else
else if (need_reconnect)
prepare_next_reconnect(ec);
}
//after how much time(ms), client_socket_base will try to reconnect to the server, negative value means give up.
//after how much time (ms), client_socket_base will try to reconnect the server, negative value means give up.
virtual int prepare_reconnect(const asio::error_code& ec) {return ASCS_RECONNECT_INTERVAL;}
virtual void on_connect() {unified_out::info_out("connecting success.");}
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_shutdown();}
......@@ -107,7 +112,7 @@ protected:
{
this->show_info("client link:", "broken/been shut down", ec);
force_shutdown(this->is_shutting_down() ? need_reconnect : prepare_reconnect(ec) >= 0);
force_shutdown(need_reconnect);
this->status = super::link_status::BROKEN;
}
......@@ -115,16 +120,19 @@ protected:
virtual bool on_heartbeat_error()
{
this->show_info("client link:", "broke unexpectedly.");
force_shutdown(this->is_shutting_down() ? need_reconnect : prepare_reconnect(asio::error_code(asio::error::network_down)) >= 0);
force_shutdown(need_reconnect);
return false;
}
//reconnect at here rather than in on_recv_error to make sure that there's no any async invocations performed on this socket before reconnectiong
//reconnect at here rather than in on_recv_error to make sure no async invocations performed on this socket before reconnecting.
//if you don't want to reconnect the server after link broken, rewrite this virtual function and do nothing in it or call close_reconnt().
//if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function.
virtual void after_close() {if (need_reconnect) this->start();}
bool prepare_next_reconnect(const asio::error_code& ec)
{
if (this->started() && (asio::error::operation_aborted != ec || need_reconnect) && !this->stopped())
if (this->started() && !this->stopped())
{
#ifdef _WIN32
if (asio::error::connection_refused != ec && asio::error::network_unreachable != ec && asio::error::timed_out != ec)
......
......@@ -50,6 +50,9 @@ public:
void stop_listen() {asio::error_code ec; acceptor.cancel(ec); acceptor.close(ec);}
bool is_listening() const {return acceptor.is_open();}
asio::ip::tcp::acceptor& next_layer() {return acceptor;}
const asio::ip::tcp::acceptor& next_layer() const {return acceptor;}
//implement i_server's pure virtual functions
virtual service_pump& get_service_pump() {return Pool::get_service_pump();}
virtual const service_pump& get_service_pump() const {return Pool::get_service_pump();}
......@@ -63,7 +66,7 @@ public:
return this->del_object(raw_socket_ptr);
}
//restore the invalid socket whose id is equal to id, if successful, socket_ptr's take_over function will be invoked,
//you can restore the invalid socket to socket_ptr, everything is restorable except socket::next_layer_ (on the other
//you can restore the invalid socket to socket_ptr, everything can be restored except socket::next_layer_ (on the other
//hand, restore socket::next_layer_ doesn't make any sense).
virtual bool restore_socket(const std::shared_ptr<tracked_executor>& socket_ptr, uint_fast64_t id)
{
......@@ -116,7 +119,7 @@ protected:
acceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true), ec); assert(!ec);
#endif
acceptor.bind(server_addr, ec); assert(!ec);
if (ec) {get_service_pump().stop(); unified_out::error_out("bind failed."); return false;}
if (ec) {unified_out::error_out("bind failed."); return false;}
auto num = async_accept_num();
assert(num > 0);
......@@ -143,7 +146,7 @@ protected:
#else
acceptor.listen(asio::ip::tcp::acceptor::max_connections, ec); assert(!ec);
#endif
if (ec) {get_service_pump().stop(); unified_out::error_out("listen failed."); return false;}
if (ec) {unified_out::error_out("listen failed."); return false;}
ascs::do_something_to_all(sockets, [this](typename Pool::object_ctype& item) {this->do_async_accept(item);});
this->start();
......
......@@ -179,6 +179,12 @@ protected:
return super::do_start();
}
//generally, you don't have to rewrite this to maintain the status of connections
//msg_can contains messages that were failed to send and tcp::socket_base will not hold them any more, if you want to re-send them in the future,
// you must take over them and re-send (at any time) them via direct_send_msg.
//DO NOT hold msg_can for future using, just swap its content with your own container in this virtual function.
virtual void on_send_error(const asio::error_code& ec, list<typename super::in_msg>& msg_can) {unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
#ifdef ASCS_SYNC_SEND
virtual void on_close() {ascs::do_something_to_all(last_send_msg,
[](typename super::in_msg& msg) {if (msg.p) msg.p->set_value(sync_call_result::NOT_APPLICABLE);}); super::on_close();}
......@@ -325,7 +331,7 @@ private:
}
else
{
this->on_send_error(ec);
on_send_error(ec, last_send_msg);
last_send_msg.clear(); //clear sending messages after on_send_error, then user can decide how to deal with them in on_send_error
sending = false;
......
......@@ -122,6 +122,11 @@ public:
///////////////////////////////////////////////////
protected:
//msg was failed to send and udp::socket_base will not hold it any more, if you want to re-send it in the future,
// you must take over it and re-send (at any time) it via direct_send_msg.
//DO NOT hold msg for future using, just swap its content with your own message in this virtual function.
virtual void on_send_error(const asio::error_code& ec, typename super::in_msg& msg) {unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
virtual void on_recv_error(const asio::error_code& ec)
{
if (asio::error::operation_aborted != ec)
......@@ -250,7 +255,7 @@ private:
#endif
}
else
this->on_send_error(ec);
on_send_error(ec, last_send_msg);
last_send_msg.clear(); //clear sending message after on_send_error, then user can decide how to deal with it in on_send_error
if (ec && (asio::error::not_socket == ec || asio::error::bad_descriptor == ec))
......@@ -258,7 +263,7 @@ private:
//send msg in sequence
//on windows, sending a msg to addr_any may cause errors, please note
//for UDP, sending error will not stop subsequent sendings.
//for UDP, sending error will not stop subsequent sending.
if (!do_send_msg(true) && !send_msg_buffer.empty())
do_send_msg(true); //just make sure no pending msgs
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册