提交 7b42c4b5 编写于 作者: Y youngwolf

1.2.6 release.

Fix: Reconnectiong may happen in ascs::socket::reset, it's not a right behavior.
Do reconnecting in client_socket_base::after_close rather in client_socket_base::on_close.
上级 4b5ad024
......@@ -2,25 +2,29 @@ ascs (a successor of st_asio_wrapper)
===============
Overview
-
ascs is an asynchronous c/s framework based on Asio(non-Boost), besides all benefits brought by Asio, it also contains: </br>
ascs is an asynchronous c/s framework based on Asio(non-Boost edition, http://think-async.com/), besides all benefits brought by Asio, it also contains: </br>
1. Based on message just like UDP with several couple of build-in packer and unpacker;</br>
2. Support packer and unpacker customization, and replacing packer and unpacker at run-time;</br>
3. Automatically reconnect to the server after link broken;</br>
4. Widely support timers;</br>
5. Support object pool, object reusing;</br>
4. Support object pool, object reusing and restoration;</br>
5. Worker thread management; </br>
6. Support message buffer;</br>
7. Support ssl;</br>
8. Support TCP/UDP.</br>
7. Widely support timers;</br>
8. Support TCP/UDP;</br>
9. Support ssl;</br>
10. Can work alone or with Boost.</br>
Suggest using ascs if c++0x or higher is available (even you used Boost, but not Boost.Asio), otherwise st_asio_wrapper.
And Asio(non-Boost edition) needs macro ASIO_STANDALONE to be defined, please note.
Quick start:
-
### server:
Derive your own socket from `server_socket_base`, you must at least re-write the `on_msg` or `on_msg_handle` virtual function and handle messages in it;</br>
Create a `service_pump` object, create a `server_base` object, call `service_pump::start_service`;</br>
Create a `service_pump` object, create a `server_base<your_socket>` object, call `service_pump::start_service`;</br>
Call `server_socket_base::send_msg` when you have messages need to send.</br>
### client:
Derive your own socket from `client_socket_base`, you must at least re-write the `on_msg` or `on_msg_handle` virtual function and handle messages in it;</br>
Create a `service_pump` object, create a `tcp::client_base` object, set server address via `client_socket_base::set_server_addr`, call `service_pump::start_service`;</br>
Create a `service_pump` object, create a `multi_client_base<your_socket>` object, add some socket via `multi_client_base::add_socket`, call `service_pump::start_service`;</br>
Call `client_socket_base::send_msg` when you have messages need to send.</br>
Directory structure:
......
......@@ -351,7 +351,8 @@ void send_msg_concurrently(echo_client& client, size_t send_thread_num, size_t m
do_something_to_all(threads, [](std::thread& t) {t.join();});
begin_time.stop();
printf(" finished in %f seconds, speed: %f(*2) MBps.\n", begin_time.elapsed(), total_msg_bytes / begin_time.elapsed() / 1024 / 1024);
printf(" finished in %f seconds, TPS: %f(*2), speed: %f(*2) MBps.\n",
begin_time.elapsed(), link_num * msg_num / begin_time.elapsed(), total_msg_bytes / begin_time.elapsed() / 1024 / 1024);
}
static bool is_testing;
......
......@@ -110,7 +110,7 @@ protected:
{
//the type of tcp::server_socket_base::server now can be controlled by derived class(echo_socket),
//which is actually i_echo_server, so, we can invoke i_echo_server::test virtual function.
server.test();
get_server().test();
server_socket_base::on_recv_error(ec);
}
......@@ -171,14 +171,9 @@ public:
normal_socket(i_server& server_) : server_socket_base(server_) {}
protected:
virtual bool do_start()
{
//demo client needs heartbeat (macro ASCS_HEARTBEAT_INTERVAL been defined), pleae note that the interval (here is 5) must be equal to
//macro ASCS_HEARTBEAT_INTERVAL defined in demo client, and macro ASCS_HEARTBEAT_MAX_ABSENCE must has the same value as demo client's.
start_heartbeat(5);
return server_socket_base::do_start();
}
//demo client needs heartbeat (macro ASCS_HEARTBEAT_INTERVAL been defined), pleae note that the interval (here is 5) must be equal to
//macro ASCS_HEARTBEAT_INTERVAL defined in demo client, and macro ASCS_HEARTBEAT_MAX_ABSENCE must has the same value as demo client's.
virtual void on_connect() {start_heartbeat(5);}
};
#endif
......
......@@ -124,7 +124,7 @@ void file_socket::handle_msg(out_msg_ctype& msg)
{
uint_fast64_t id;
memcpy(&id, std::next(msg.data(), ORDER_LEN), sizeof(uint_fast64_t));
server.restore_socket(this->shared_from_this(), id);
get_server().restore_socket(this->shared_from_this(), id);
}
default:
break;
......
......@@ -198,7 +198,8 @@ int main(int argc, const char* argv[])
std::this_thread::sleep_for(std::chrono::milliseconds(50));
uint64_t total_msg_bytes = link_num; total_msg_bytes *= msg_len; total_msg_bytes *= msg_num;
printf("finished in %f seconds, speed: %f(*2) MBps.\n", begin_time.elapsed(), total_msg_bytes / begin_time.elapsed() / 1024 / 1024);
printf("finished in %f seconds, TPS: %f(*2), speed: %f(*2) MBps.\n",
begin_time.elapsed(), link_num * msg_num / begin_time.elapsed(), total_msg_bytes / begin_time.elapsed() / 1024 / 1024);
delete[] init_msg;
}
......
......@@ -17,13 +17,13 @@
#include <stdarg.h>
#include <list>
#include <mutex>
#include <vector>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
#include <atomic>
#include <mutex>
#include <sstream>
#include <iomanip>
......@@ -57,8 +57,8 @@ private:
std::atomic_flag& atomic;
};
class service_pump;
class object;
class service_pump;
namespace tcp
{
class i_server
......
......@@ -285,6 +285,26 @@
*
* REPLACEMENTS:
*
* ===============================================================
* 2018.4.10 version 1.2.6
*
* SPECIAL ATTENTION (incompatible with old editions):
* Do reconnecting in client_socket_base::after_close rather in client_socket_base::on_close.
*
* HIGHLIGHT:
*
* FIX:
* Reconnectiong may happen in ascs::socket::reset, it's not a right behavior.
*
* ENHANCEMENTS:
* Add ascs::socket::after_close virtual function, a good case for using it is to reconnect to the server in client_socket_base.
*
* DELETION:
*
* REFACTORING:
*
* REPLACEMENTS:
*
*/
#ifndef _ASCS_CONFIG_H_
......@@ -294,8 +314,8 @@
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ASCS_VER 10205 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.2.5"
#define ASCS_VER 10206 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ASCS_VERSION "1.2.6"
//asio and compiler check
#ifdef _MSC_VER
......
......@@ -54,8 +54,8 @@ public:
{
while (s < _Newsize)
{
++s;
impl.emplace_back();
++s;
}
if (s > _Newsize)
......@@ -86,7 +86,7 @@ public:
void push_back(const _Ty& _Val) {++s; impl.push_back(_Val);}
void push_back(_Ty&& _Val) {++s; impl.push_back(std::move(_Val));}
template<class... _Valty>
void emplace_back(_Valty&&... _Val) {++s; impl.emplace_back(std::forward<_Valty>(_Val)...);}
void emplace_back(_Valty&&... _Val) {impl.emplace_back(std::forward<_Valty>(_Val)...); ++s;}
void pop_back() {--s; impl.pop_back();}
reference back() {return impl.back();}
iterator end() {return impl.end();}
......@@ -219,8 +219,10 @@ public:
bool try_dequeue(T& item) {typename Lockable::lock_guard lock(*this); return try_dequeue_(item);}
//not thread safe
bool enqueue_(const T& item) {this->emplace_back(item); return true;}
bool enqueue_(T&& item) {this->emplace_back(std::move(item)); return true;}
bool enqueue_(const T& item)
{try {this->emplace_back(item);} catch (const std::exception& e) {unified_out::error_out("cannot hold more objects (%s)", e.what()); return false;} return true;}
bool enqueue_(T&& item)
{try {this->emplace_back(std::move(item));} catch (const std::exception& e) {unified_out::error_out("cannot hold more objects (%s)", e.what()); return false;} return true;}
void move_items_in_(std::list<T>& can) {this->splice(std::end(*this), can);}
bool try_dequeue_(T& item) {if (this->empty()) return false; item.swap(this->front()); this->pop_front(); return true;}
};
......
......@@ -55,6 +55,9 @@ protected:
{
assert(object_ptr && !object_ptr->is_equal_to(-1));
if (!object_ptr)
return false;
std::lock_guard<std::mutex> lock(object_can_mutex);
return object_can.size() < max_size_ ? object_can.emplace(object_ptr->id(), object_ptr).second : false;
}
......@@ -72,7 +75,7 @@ protected:
if (exist)
{
std::lock_guard<std::mutex> lock(invalid_object_can_mutex);
invalid_object_can.emplace_back(object_ptr);
try {invalid_object_can.emplace_back(object_ptr);} catch (const std::exception& e) {unified_out::error_out("cannot hold more objects (%s)", e.what());}
}
return exist;
......@@ -115,6 +118,20 @@ protected:
return old_object_ptr;
}
#define CREATE_OBJECT_1_ARG(first_way) \
auto object_ptr = first_way(); \
if (!object_ptr) \
try {object_ptr = std::make_shared<Object>(arg);} catch (const std::exception& e) {unified_out::error_out("cannot create object (%s)", e.what());} \
init_object(object_ptr); \
return object_ptr;
#define CREATE_OBJECT_2_ARG(first_way) \
auto object_ptr = first_way(); \
if (!object_ptr) \
try {object_ptr = std::make_shared<Object>(arg1, arg2);} catch (const std::exception& e) {unified_out::error_out("cannot create object (%s)", e.what());} \
init_object(object_ptr); \
return object_ptr;
#if defined(ASCS_REUSE_OBJECT) && !defined(ASCS_RESTORE_OBJECT)
object_type reuse_object()
{
......@@ -125,46 +142,14 @@ protected:
return object_ptr;
}
template<typename Arg>
object_type create_object(Arg& arg)
{
auto object_ptr = reuse_object();
if (!object_ptr)
object_ptr = std::make_shared<Object>(arg);
init_object(object_ptr);
return object_ptr;
}
template<typename Arg1, typename Arg2>
object_type create_object(Arg1& arg1, Arg2& arg2)
{
auto object_ptr = reuse_object();
if (!object_ptr)
object_ptr = std::make_shared<Object>(arg1, arg2);
init_object(object_ptr);
return object_ptr;
}
template<typename Arg> object_type create_object(Arg& arg) {CREATE_OBJECT_1_ARG(reuse_object);}
template<typename Arg1, typename Arg2> object_type create_object(Arg1& arg1, Arg2& arg2) {CREATE_OBJECT_2_ARG(reuse_object);}
#else
template<typename Arg>
object_type create_object(Arg& arg)
{
auto object_ptr = std::make_shared<Object>(arg);
init_object(object_ptr);
return object_ptr;
}
template<typename Arg1, typename Arg2>
object_type create_object(Arg1& arg1, Arg2& arg2)
{
auto object_ptr = std::make_shared<Object>(arg1, arg2);
init_object(object_ptr);
return object_ptr;
}
template<typename Arg> object_type create_object(Arg& arg) {CREATE_OBJECT_1_ARG(object_type);}
template<typename Arg1, typename Arg2> object_type create_object(Arg1& arg1, Arg2& arg2) {CREATE_OBJECT_2_ARG(object_type);}
#endif
object_type create_object() {return create_object(sp);}
object_type create_object() {return create_object(get_service_pump());}
public:
//to configure unordered_set(for example, set factor or reserved size), not thread safe, so must be called before service_pump startup.
......@@ -257,7 +242,7 @@ public:
for (auto iter = std::begin(object_can); iter != std::end(object_can);)
if (iter->second->obsoleted())
{
objects.emplace_back(std::move(iter->second));
try {objects.emplace_back(std::move(iter->second));} catch (const std::exception& e) {unified_out::error_out("cannot hold more objects (%s)", e.what());}
iter = object_can.erase(iter);
}
else
......
......@@ -62,11 +62,11 @@ public:
service_pump() : started(false)
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
, real_thread_num(0), del_thread_num(0), del_thread_req(false)
, real_thread_num(0), del_thread_num(0)
#endif
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION >= 101100
, work(asio::make_work_guard(*this))
, work(get_executor())
#else
, work(std::make_shared<asio::io_service::work>(*this))
#endif
......@@ -170,7 +170,7 @@ public:
void add_service_thread(int thread_num) {for (auto i = 0; i < thread_num; ++i) service_threads.emplace_back([this]() {this->run();});}
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
void del_service_thread(int thread_num) {if (thread_num > 0) {del_thread_num += thread_num; del_thread_req = true;}}
void del_service_thread(int thread_num) {if (thread_num > 0) {del_thread_num += thread_num;}}
int service_thread_num() const {return real_thread_num;}
#endif
......@@ -222,7 +222,6 @@ protected:
size_t run()
{
size_t n = 0;
std::unique_lock<std::mutex> lock(del_thread_mutex, std::defer_lock);
std::stringstream os;
os << "service thread[" << std::this_thread::get_id() << "] begin.";
......@@ -230,21 +229,17 @@ protected:
++real_thread_num;
while (true)
{
if (del_thread_req)
if (del_thread_num > 0)
{
if (--del_thread_num >= 0)
{
lock.lock();
if (real_thread_num > 1)
if (--real_thread_num > 0) //forbid to stop all service thread
break;
else
lock.unlock();
++real_thread_num;
}
else
{
del_thread_req = false;
++del_thread_num;
}
}
//we cannot always decrease service thread timely (because run_one can block).
......@@ -257,9 +252,11 @@ protected:
if (this_n > 0)
n += this_n; //n can overflow, please note.
else
{
--real_thread_num;
break;
}
}
--real_thread_num;
os.str("");
os << "service thread[" << std::this_thread::get_id() << "] end.";
unified_out::info_out(os.str().data());
......@@ -293,8 +290,6 @@ protected:
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
std::atomic_int_fast32_t real_thread_num;
std::atomic_int_fast32_t del_thread_num;
std::mutex del_thread_mutex;
bool del_thread_req;
#endif
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
......
......@@ -62,13 +62,13 @@ protected:
set_async_calling(false);
}
clear_buffer();
stat.reset();
packer_->reset();
sending = false;
dispatching = false;
congestion_controlling = false;
stat.reset();
recv_idle_began = false;
congestion_controlling = false;
clear_buffer();
}
void clear_buffer()
......@@ -120,7 +120,7 @@ public:
}
//interval's unit is second
//if macro ST_ASIO_HEARTBEAT_INTERVAL is bigger than zero, subclass will call start_heartbeat automatically with interval equal to ST_ASIO_HEARTBEAT_INTERVAL,
//if macro ST_ASIO_HEARTBEAT_INTERVAL been defined and is bigger than zero, start_heartbeat will be called automatically with interval equal to ST_ASIO_HEARTBEAT_INTERVAL,
//and max_absence equal to ST_ASIO_HEARTBEAT_MAX_ABSENCE (so check_heartbeat will be called regularly). otherwise, you can call check_heartbeat with you own logic.
//return false for timeout (timeout check will only be performed on valid links), otherwise true (even the link has not established yet).
bool check_heartbeat(int interval, int max_absence = ASCS_HEARTBEAT_MAX_ABSENCE)
......@@ -202,7 +202,7 @@ protected:
virtual bool do_send_msg() = 0;
virtual bool do_send_msg(InMsgType&& msg) = 0;
virtual void do_recv_msg() = 0;
//socket will guarantee not call these 4 functions in more than one thread concurrently.
//socket will guarantee not call these 4 functions (include do_start()) in more than one thread concurrently.
//generally, you don't have to rewrite this to maintain the status of connections (TCP)
virtual void on_send_error(const asio::error_code& ec) {unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
......@@ -210,10 +210,11 @@ protected:
virtual bool on_heartbeat_error() = 0; //heartbeat timed out, return true to continue heartbeat function (useful for UDP)
//if ASCS_DELAY_CLOSE is equal to zero, in this callback, socket guarantee that there's no any other async call associated it,
// include user timers(created by set_timer()) and user async calls(started via post()), this means you can clean up any resource
// include user timers(created by set_timer()) and user async calls(started via post() or defer()), this means you can clean up any resource
// in this socket except this socket itself, because this socket maybe is being maintained by object_pool.
//otherwise (bigger than zero), socket simply call this callback ASCS_DELAY_CLOSE seconds later after link down, no any guarantees.
virtual void on_close() {unified_out::info_out("on_close()");}
virtual void after_close() {} //a good case for using this is to reconnect to the server, please refer to client_socket_base.
#ifndef ASCS_FORCE_TO_USE_MSG_RECV_BUFFER
//if you want to use your own receive buffer, you can move the msg to your own receive buffer, then handle them as your own strategy(may be you'll need a msg dispatch thread),
......@@ -267,7 +268,10 @@ protected:
}
if (stopped())
{
on_close();
after_close();
}
else
{
set_async_calling(true);
......@@ -363,7 +367,7 @@ protected:
{
scope_atomic_lock lock(send_atomic);
if (!sending && lock.locked())
return sending = true;
return (sending = true);
}
return false;
......@@ -375,7 +379,7 @@ protected:
{
scope_atomic_lock lock(dispatch_atomic);
if (!dispatching && lock.locked())
return dispatching = true;
return (dispatching = true);
}
return false;
......@@ -410,6 +414,7 @@ private:
}
change_timer_status(TIMER_DELAY_CLOSE, timer_info::TIMER_CANCELED);
on_close();
after_close();
set_async_calling(false);
break;
default:
......
......@@ -47,8 +47,13 @@ public:
typename Pool::object_type add_socket(unsigned short port, const std::string& ip = ASCS_SERVER_IP)
{
auto socket_ptr(this->create_object());
socket_ptr->set_server_addr(port, ip);
return this->add_socket(socket_ptr, false) ? socket_ptr : typename Pool::object_type();
if (!socket_ptr)
return typename Pool::object_type();
else
{
socket_ptr->set_server_addr(port, ip);
return this->add_socket(socket_ptr, false) ? socket_ptr : typename Pool::object_type();
}
}
///////////////////////////////////////////////////
......
......@@ -61,7 +61,7 @@ public:
void force_shutdown(bool reconnect = false)
{
if (super::link_status::FORCE_SHUTTING_DOWN != this->status)
show_info("client link:", "been shut down.");
this->show_info("client link:", "been shut down.");
need_reconnect = reconnect;
super::force_shutdown();
......@@ -76,28 +76,12 @@ public:
if (this->is_broken())
return force_shutdown(reconnect);
else if (!this->is_shutting_down())
show_info("client link:", "being shut down gracefully.");
this->show_info("client link:", "being shut down gracefully.");
need_reconnect = reconnect;
super::graceful_shutdown(sync);
}
void show_info(const char* head, const char* tail) const
{
asio::error_code ec;
auto ep = this->lowest_layer().local_endpoint(ec);
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().data(), ep.port(), tail);
}
void show_info(const char* head, const char* tail, const asio::error_code& ec) const
{
asio::error_code ec2;
auto ep = this->lowest_layer().local_endpoint(ec2);
if (!ec2)
unified_out::info_out("%s %s:%hu %s (%d %s)", head, ep.address().to_string().data(), ep.port(), tail, ec.value(), ec.message().data());
}
protected:
virtual bool do_start() //connect
{
......@@ -121,7 +105,7 @@ protected:
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_shutdown();}
virtual void on_recv_error(const asio::error_code& ec)
{
show_info("client link:", "broken/been shut down", ec);
this->show_info("client link:", "broken/been shut down", ec);
force_shutdown(this->is_shutting_down() ? need_reconnect : prepare_reconnect(ec) >= 0);
this->status = super::link_status::BROKEN;
......@@ -130,13 +114,13 @@ protected:
virtual void on_async_shutdown_error() {force_shutdown(need_reconnect);}
virtual bool on_heartbeat_error()
{
show_info("client link:", "broke unexpectedly.");
this->show_info("client link:", "broke unexpectedly.");
force_shutdown(this->is_shutting_down() ? need_reconnect : prepare_reconnect(asio::error_code(asio::error::network_down)) >= 0);
return false;
}
//reconnect at here rather than in on_recv_error to make sure that there's no any async invocations performed on this socket before reconnectiong
virtual void on_close() {if (need_reconnect) this->start(); else super::on_close();}
virtual void after_close() {if (need_reconnect) this->start();}
bool prepare_next_reconnect(const asio::error_code& ec)
{
......
......@@ -41,7 +41,7 @@ public:
void force_shutdown()
{
if (super::link_status::FORCE_SHUTTING_DOWN != this->status)
show_info("server link:", "been shut down.");
this->show_info("server link:", "been shut down.");
super::force_shutdown();
}
......@@ -55,33 +55,20 @@ public:
if (this->is_broken())
return force_shutdown();
else if (!this->is_shutting_down())
show_info("server link:", "being shut down gracefully.");
this->show_info("server link:", "being shut down gracefully.");
super::graceful_shutdown(sync);
}
void show_info(const char* head, const char* tail) const
{
asio::error_code ec;
auto ep = this->lowest_layer().remote_endpoint(ec);
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().data(), ep.port(), tail);
}
void show_info(const char* head, const char* tail, const asio::error_code& ec) const
{
asio::error_code ec2;
auto ep = this->lowest_layer().remote_endpoint(ec2);
if (!ec2)
unified_out::info_out("%s %s:%hu %s (%d %s)", head, ep.address().to_string().data(), ep.port(), tail, ec.value(), ec.message().data());
}
protected:
Server& get_server() {return server;}
const Server& get_server() const {return server;}
virtual void on_unpack_error() {unified_out::error_out("can not unpack msg."); force_shutdown();}
//do not forget to force_shutdown this socket(in del_socket(), there's a force_shutdown() invocation)
virtual void on_recv_error(const asio::error_code& ec)
{
show_info("server link:", "broken/been shut down", ec);
this->show_info("server link:", "broken/been shut down", ec);
#ifdef ASCS_CLEAR_OBJECT_INTERVAL
force_shutdown();
......@@ -92,9 +79,9 @@ protected:
}
virtual void on_async_shutdown_error() {force_shutdown();}
virtual bool on_heartbeat_error() {show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;}
virtual bool on_heartbeat_error() {this->show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;}
protected:
private:
Server& server;
};
......
......@@ -64,6 +64,34 @@ public:
bool is_connected() const {return link_status::CONNECTED == status;}
bool is_shutting_down() const {return link_status::FORCE_SHUTTING_DOWN == status || link_status::GRACEFUL_SHUTTING_DOWN == status;}
void show_info(const char* head, const char* tail) const
{
asio::error_code ec;
auto local_ep = this->lowest_layer().local_endpoint(ec);
if (!ec)
{
auto remote_ep = this->lowest_layer().remote_endpoint(ec);
if (!ec)
unified_out::info_out("%s (%s:%hu %s:%hu) %s", head,
local_ep.address().to_string().data(), local_ep.port(),
remote_ep.address().to_string().data(), remote_ep.port(), tail);
}
}
void show_info(const char* head, const char* tail, const asio::error_code& ec) const
{
asio::error_code ec2;
auto local_ep = this->lowest_layer().local_endpoint(ec2);
if (!ec2)
{
auto remote_ep = this->lowest_layer().remote_endpoint(ec2);
if (!ec2)
unified_out::info_out("%s (%s:%hu %s:%hu) %s (%d %s)", head,
local_ep.address().to_string().data(), local_ep.port(),
remote_ep.address().to_string().data(), remote_ep.port(), tail, ec.value(), ec.message().data());
}
}
//get or change the unpacker at runtime
//changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
......
......@@ -10,16 +10,16 @@
* make ascs support asio::ssl
*/
#ifndef _ASICS_SSL_H_
#define _ASICS_SSL_H_
#ifndef _ASCS_SSL_H_
#define _ASCS_SSL_H_
#include <asio/ssl.hpp>
#include "../../object_pool.h"
#include "../client_socket.h"
#include "../client.h"
#include "../server_socket.h"
#include "../server.h"
#include "../client_socket.h"
#include "../server_socket.h"
#include "../../object_pool.h"
namespace ascs { namespace ssl {
......@@ -37,9 +37,9 @@ public:
protected:
virtual void on_recv_error(const asio::error_code& ec)
{
#ifndef ASCS_REUSE_SSL_STREAM
if (this->is_ready())
{
#ifndef ASCS_REUSE_SSL_STREAM
this->status = Socket::link_status::GRACEFUL_SHUTTING_DOWN;
this->show_info("ssl link:", "been shut down.");
asio::error_code ec;
......@@ -47,8 +47,8 @@ protected:
if (ec && asio::error::eof != ec) //the endpoint who initiated a shutdown operation will get error eof.
unified_out::info_out("shutdown ssl link failed (maybe intentionally because of reusing)");
#endif
}
#endif
Socket::on_recv_error(ec);
}
......@@ -152,9 +152,8 @@ public:
object_pool(service_pump& service_pump_, const asio::ssl::context::method& m) : super(service_pump_), ctx(m) {}
asio::ssl::context& context() {return ctx;}
typename object_pool::object_type create_object() {return create_object(this->sp);}
template<typename Arg>
typename object_pool::object_type create_object(Arg& arg) {return super::create_object(arg, ctx);}
typename object_pool::object_type create_object() {return create_object(this->get_service_pump());}
template<typename Arg> typename object_pool::object_type create_object(Arg& arg) {return super::create_object(arg, ctx);}
protected:
asio::ssl::context ctx;
......@@ -194,7 +193,7 @@ private:
if (!ec)
super::do_start(); //return to tcp::server_socket_base::do_start
else
this->server.del_socket(this->shared_from_this());
this->get_server().del_socket(this->shared_from_this());
}
};
......@@ -204,4 +203,4 @@ template<typename Socket, typename Pool = object_pool<Socket>> using multi_clien
}} //namespace
#endif /* _ASICS_SSL_H_ */
#endif /* _ASCS_SSL_H_ */
......@@ -65,21 +65,24 @@ public:
timer(asio::io_context& io_context_) : object(io_context_), timer_can((tid) -1) {tid id = -1; do_something_to_all([&id](timer_info& item) {item.id = ++id;});}
void update_timer_info(tid id, size_t interval, std::function<bool(tid)>&& call_back, bool start = false)
bool update_timer_info(tid id, size_t interval, std::function<bool(tid)>&& call_back, bool start = false)
{
timer_info& ti = timer_can[id];
if (timer_info::TIMER_FAKE == ti.status)
ti.timer = std::make_shared<timer_type>(io_context_);
try {ti.timer = std::make_shared<timer_type>(io_context_);}
catch (const std::exception& e) {unified_out::error_out("cannot create timer %d (%s)", ti.id, e.what()); return false;}
ti.status = timer_info::TIMER_OK;
ti.interval_ms = interval;
ti.call_back.swap(call_back);
if (start)
start_timer(ti);
return true;
}
void update_timer_info(tid id, size_t interval, const std::function<bool(tid)>& call_back, bool start = false)
{update_timer_info(id, interval, std::function<bool(tid)>(call_back), start);}
bool update_timer_info(tid id, size_t interval, const std::function<bool(tid)>& call_back, bool start = false)
{return update_timer_info(id, interval, std::function<bool(tid)>(call_back), start);}
void change_timer_status(tid id, timer_info::timer_status status) {timer_can[id].status = status;}
void change_timer_interval(tid id, size_t interval) {timer_can[id].interval_ms = interval;}
......@@ -87,8 +90,8 @@ public:
void change_timer_call_back(tid id, std::function<bool(tid)>&& call_back) {timer_can[id].call_back.swap(call_back);}
void change_timer_call_back(tid id, const std::function<bool(tid)>& call_back) {change_timer_call_back(id, std::function<bool(tid)>(call_back));}
void set_timer(tid id, size_t interval, std::function<bool(tid)>&& call_back) {update_timer_info(id, interval, std::move(call_back), true);}
void set_timer(tid id, size_t interval, const std::function<bool(tid)>& call_back) {update_timer_info(id, interval, call_back, true);}
bool set_timer(tid id, size_t interval, std::function<bool(tid)>&& call_back) {return update_timer_info(id, interval, std::move(call_back), true);}
bool set_timer(tid id, size_t interval, const std::function<bool(tid)>& call_back) {return update_timer_info(id, interval, call_back, true);}
bool start_timer(tid id)
{
......@@ -140,7 +143,7 @@ protected:
{
if (timer_info::TIMER_OK == ti.status) //enable stopping timers that has been stopped
{
try {ti.timer->cancel();} catch (const asio::system_error& e) {}
try {ti.timer->cancel();} catch (const asio::system_error& e) {unified_out::error_out("cannot stop timer %d (%d %s)", ti.id, e.code().value(), e.what());}
ti.status = timer_info::TIMER_CANCELED;
}
}
......
......@@ -32,8 +32,13 @@ public:
typename Pool::object_type add_socket(unsigned short port, const std::string& ip = std::string())
{
auto socket_ptr(this->create_object());
socket_ptr->set_local_addr(port, ip);
return this->add_socket(socket_ptr) ? socket_ptr : typename Pool::object_type();
if (!socket_ptr)
return typename Pool::object_type();
else
{
socket_ptr->set_local_addr(port, ip);
return this->add_socket(socket_ptr) ? socket_ptr : typename Pool::object_type();
}
}
//functions with a socket_ptr parameter will remove the link from object pool first, then call corresponding function
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册