提交 4642f312 编写于 作者: Y yang li

Rollback ascs::list.

Optimize class queue.
上级 36a81b3e
......@@ -161,7 +161,118 @@ protected:
//not like auto_buffer, shared_buffer is copyable, but auto_buffer is a bit more efficient.
//packer or/and unpacker who used auto_buffer or shared_buffer as its msg type will be replaceable.
//ascs requires that container must take one and only one template argument.
#if defined(_MSC_VER) || defined(__clang__) || _GLIBCXX_USE_CXX11_ABI
template<typename T> using list = std::list<T>;
//for list::size() and empty(), ascs::queue needs them to be thread safe no matter itself is lockable or dummy lockable (see ascs::queue for more details).
#else
//a substitute of std::list, it's size() function has O(1) complexity and is thread safe (but doesn't have to be consistent)
//BTW, the naming rule is not mine, I copied them from std::list in Visual C++ 14.0
template<typename _Ty>
class list
{
public:
typedef list<_Ty> _Myt;
typedef std::list<_Ty> _Mybase;
typedef typename _Mybase::value_type value_type;
typedef typename _Mybase::size_type size_type;
typedef typename _Mybase::reference reference;
typedef typename _Mybase::const_reference const_reference;
typedef typename _Mybase::iterator iterator;
typedef typename _Mybase::const_iterator const_iterator;
typedef typename _Mybase::reverse_iterator reverse_iterator;
typedef typename _Mybase::const_reverse_iterator const_reverse_iterator;
#if __GNUC__ > 4 || __GNUC_MINOR__ > 8
typedef const_iterator Iter;
#else
typedef iterator Iter; //just satisfy old gcc compilers (before gcc 4.9)
#endif
list() : s(0) {}
list(size_type n) : s(n), impl(n) {}
list(list&& other) : s(0) {swap(other);}
list& operator=(list&& other) {clear(); swap(other); return *this;}
void swap(list& other) {impl.swap(other.impl); std::swap(s, other.s);}
bool empty() const {return 0 == s;}
size_type size() const {return s;}
void resize(size_type _Newsize)
{
while (s < _Newsize)
{
impl.emplace_back();
++s;
}
if (s > _Newsize)
{
auto end_iter = std::end(impl);
auto begin_iter = _Newsize <= s / 2 ? std::next(std::begin(impl), _Newsize) : std::prev(end_iter, s - _Newsize); //minimize iterator movement
s = _Newsize;
impl.erase(begin_iter, end_iter);
}
}
void clear() {s = 0; impl.clear();}
iterator erase(Iter _Where) {--s; return impl.erase(_Where);}
void push_front(const _Ty& _Val) {++s; impl.push_front(_Val);}
void push_front(_Ty&& _Val) {++s; impl.push_front(std::move(_Val));}
template<class... _Valty>
void emplace_front(_Valty&&... _Val) {++s; impl.emplace_front(std::forward<_Valty>(_Val)...);}
void pop_front() {--s; impl.pop_front();}
reference front() {return impl.front();}
iterator begin() {return impl.begin();}
reverse_iterator rbegin() {return impl.rbegin();}
const_reference front() const {return impl.front();}
const_iterator begin() const {return impl.begin();}
const_reverse_iterator rbegin() const {return impl.rbegin();}
void push_back(const _Ty& _Val) {++s; impl.push_back(_Val);}
void push_back(_Ty&& _Val) {++s; impl.push_back(std::move(_Val));}
template<class... _Valty>
void emplace_back(_Valty&&... _Val) {impl.emplace_back(std::forward<_Valty>(_Val)...); ++s;}
void pop_back() {--s; impl.pop_back();}
reference back() {return impl.back();}
iterator end() {return impl.end();}
reverse_iterator rend() {return impl.rend();}
const_reference back() const {return impl.back();}
const_iterator end() const {return impl.end();}
const_reverse_iterator rend() const {return impl.rend();}
void splice(Iter _Where, _Mybase& _Right) {s += _Right.size(); impl.splice(_Where, _Right);}
void splice(Iter _Where, _Mybase& _Right, Iter _First) {++s; impl.splice(_Where, _Right, _First);}
void splice(Iter _Where, _Mybase& _Right, Iter _First, Iter _Last)
{
auto size = std::distance(_First, _Last);
//this std::distance invocation is the penalty for making complexity of size() constant.
s += size;
impl.splice(_Where, _Right, _First, _Last);
}
void splice(Iter _Where, _Myt& _Right) {s += _Right.size(); _Right.s = 0; impl.splice(_Where, _Right.impl);}
void splice(Iter _Where, _Myt& _Right, Iter _First) {++s; --_Right.s; impl.splice(_Where, _Right.impl, _First);}
void splice(Iter _Where, _Myt& _Right, Iter _First, Iter _Last)
{
auto size = std::distance(_First, _Last);
//this std::distance invocation is the penalty for making complexity of size() constant.
s += size;
_Right.s -= size;
impl.splice(_Where, _Right.impl, _First, _Last);
}
private:
volatile size_type s;
_Mybase impl;
};
#endif
//packer concept
template<typename MsgType>
......
......@@ -837,7 +837,7 @@ static_assert(ASIO_HAS_STD_FUTURE == 1, "sync message sending needs std::future.
//If both sync message receiving and async message receiving exist, sync receiving has the priority no matter it was initiated before async receiving or not.
//#define ASCS_SYNC_DISPATCH
//with this macro, virtual size_t on_msg(std::list<OutMsgType>& msg_can) will be provided, you can rewrite it and handle all or a part of the
//with this macro, virtual size_t on_msg(list<OutMsgType>& msg_can) will be provided, you can rewrite it and handle all or a part of the
// messages like virtual function on_msg_handle (with macro ASCS_DISPATCH_BATCH_MSG), if your logic is simple enough (like echo or pingpong test),
// this feature is recommended because it can slightly improve efficiency.
//now we have three ways to handle messages (sync_recv_msg, on_msg and on_msg_handle), the invocation order is the same as listed, if messages been successfully
......
......@@ -45,12 +45,14 @@ private:
//Container must at least has the following functions (like std::list):
// Container() and Container(size_t) constructor
// size
// empty
// clear
// swap
// emplace_back(const T& item)
// emplace_back(T&& item)
// splice(iter, list<T>&), after this, list<T> must be empty
// splice(iter, list<T>&, iter, iter), if macro ASCS_DISPATCH_BATCH_MSG been defined
// splice(iter, Container&), after this, the source container must be empty
// splice(iter, Container&, iter, iter)
// front
// pop_front
// back
......@@ -82,7 +84,7 @@ public:
//thread safe
bool enqueue(const T& item) {typename Lockable::lock_guard lock(*this); return enqueue_(item);}
bool enqueue(T&& item) {typename Lockable::lock_guard lock(*this); return enqueue_(std::move(item));}
void move_items_in(list<T>& src, size_t size_in_byte = 0) {typename Lockable::lock_guard lock(*this); move_items_in_(src, size_in_byte);}
void move_items_in(Container& src, size_t size_in_byte = 0) {typename Lockable::lock_guard lock(*this); move_items_in_(src, size_in_byte);}
bool try_dequeue(T& item) {typename Lockable::lock_guard lock(*this); return try_dequeue_(item);}
//thread safe
......@@ -120,7 +122,7 @@ public:
return true;
}
void move_items_in_(list<T>& src, size_t size_in_byte = 0)
void move_items_in_(Container& src, size_t size_in_byte = 0)
{
if (0 == size_in_byte)
do_something_to_all(src, [&size_in_byte](const T& item) {size_in_byte += item.size();});
......@@ -132,8 +134,9 @@ public:
bool try_dequeue_(T& item) {if (this->empty()) return false; item.swap(this->front()); this->pop_front(); buff_size -= item.size(); return true;}
//not thread safe
#ifdef ASCS_DISPATCH_BATCH_MSG
void move_items_out(Container& dest, size_t max_item_num = -1) {typename Lockable::lock_guard lock(*this); move_items_out_(dest, max_item_num);} //thread safe
void move_items_out(size_t max_size_in_byte, Container& dest) {typename Lockable::lock_guard lock(*this); move_items_out_(max_size_in_byte, dest);} //thread safe
void move_items_out_(Container& dest, size_t max_item_num = -1) //not thread safe
{
if ((size_t) -1 == max_item_num)
......@@ -155,9 +158,29 @@ public:
}
}
void move_items_out_(size_t max_size_in_byte, Container& dest) //not thread safe
{
if ((size_t) -1 == max_size_in_byte)
{
dest.splice(std::end(dest), *this);
buff_size = 0;
}
else
{
size_t s = 0;
auto end_iter = this->begin();
do_something_to_one(*this, [&](const T& item) {s += item.size(); ++end_iter; if (s >= max_size_in_byte) return true; return false;});
if (end_iter == this->end())
dest.splice(std::end(dest), *this);
else
dest.splice(std::end(dest), *this, this->begin(), end_iter);
buff_size -= s;
}
}
using Container::begin;
using Container::end;
#endif
private:
size_t buff_size; //in use
......
......@@ -321,7 +321,7 @@ private:
//we must guarantee these objects not be freed from the heap or reused, so we move these objects from object_can to invalid_object_can, and free them
//from the heap or reuse them in the near future. if ASCS_CLEAR_OBJECT_INTERVAL been defined, clear_obsoleted_object() will be invoked automatically and
//periodically to move all invalid objects into invalid_object_can.
list<object_type> invalid_object_can;
std::list<object_type> invalid_object_can;
std::mutex invalid_object_can_mutex;
};
......
......@@ -58,7 +58,7 @@ public:
public:
typedef i_service* object_type;
typedef const object_type object_ctype;
typedef list<object_type> container_type;
typedef std::list<object_type> container_type;
service_pump() : started(false)
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
......@@ -289,7 +289,7 @@ private:
bool started;
container_type service_can;
std::mutex service_can_mutex;
list<std::thread> service_threads;
std::list<std::thread> service_threads;
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
std::atomic_int_fast32_t real_thread_num;
......
......@@ -405,7 +405,7 @@ protected:
#endif
if (msg_num > 0)
{
list<out_msg> temp_buffer(msg_num);
out_container_type temp_buffer(msg_num);
auto op_iter = temp_buffer.begin();
for (auto iter = temp_msg_can.begin(); iter != temp_msg_can.end(); ++op_iter, ++iter)
op_iter->swap(*iter);
......@@ -438,7 +438,7 @@ protected:
bool do_direct_send_msg(list<InMsgType>& msg_can)
{
size_t size_in_byte = 0;
list<in_msg> temp_buffer;
in_container_type temp_buffer;
ascs::do_something_to_all(msg_can, [&size_in_byte, &temp_buffer](InMsgType& msg) {size_in_byte += msg.size(); temp_buffer.emplace_back(std::move(msg));});
send_msg_buffer.move_items_in(temp_buffer, size_in_byte);
if (!sending && is_ready())
......@@ -475,7 +475,7 @@ protected:
return sync_call_result::SUCCESS;
size_t size_in_byte = 0;
list<in_msg> temp_buffer;
in_container_type temp_buffer;
ascs::do_something_to_all(msg_can, [&size_in_byte, &temp_buffer](InMsgType& msg) {size_in_byte += msg.size(); temp_buffer.emplace_back(std::move(msg));});
temp_buffer.back().check_and_create_promise(true);
......
......@@ -128,7 +128,7 @@ protected:
if (num <= 0)
num = 16;
list<typename Pool::object_type> sockets;
std::list<typename Pool::object_type> sockets;
unified_out::info_out("begin to pre-create %d server socket...", num);
while (--num >= 0)
{
......
......@@ -184,7 +184,8 @@ protected:
//msg_can contains messages that were failed to send and tcp::socket_base will not hold them any more, if you want to re-send them in the future,
// you must take over them and re-send (at any time) them via direct_send_msg.
//DO NOT hold msg_can for future using, just swap its content with your own container in this virtual function.
virtual void on_send_error(const asio::error_code& ec, list<typename super::in_msg>& msg_can) {unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
virtual void on_send_error(const asio::error_code& ec, typename super::in_container_type& msg_can)
{unified_out::error_out("send msg error (%d %s)", ec.value(), ec.message().data());}
virtual void on_close()
{
......@@ -284,27 +285,18 @@ private:
if (!in_strand && sending)
return true;
list<asio::const_buffer> bufs;
{
auto end_time = statistic::now();
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
const size_t max_send_size = 1;
send_msg_buffer.move_items_out(0, last_send_msg);
#else
const size_t max_send_size = asio::detail::default_max_transfer_size;
send_msg_buffer.move_items_out(asio::detail::default_max_transfer_size, last_send_msg);
#endif
size_t size = 0;
typename super::in_msg msg;
auto end_time = statistic::now();
typename super::in_queue_type::lock_guard lock(send_msg_buffer);
while (send_msg_buffer.try_dequeue_(msg))
{
stat.send_delay_sum += end_time - msg.begin_time;
size += msg.size();
last_send_msg.emplace_back(std::move(msg));
bufs.emplace_back(last_send_msg.back().data(), last_send_msg.back().size());
if (size >= max_send_size)
break;
}
std::vector<asio::const_buffer> bufs;
bufs.reserve(last_send_msg.size());
for (auto iter = std::begin(last_send_msg); iter != std::end(last_send_msg); ++iter)
{
stat.send_delay_sum += end_time - iter->begin_time;
bufs.emplace_back(iter->data(), iter->size());
}
if ((sending = !bufs.empty()))
......@@ -386,7 +378,7 @@ private:
#endif
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
list<typename super::in_msg> last_send_msg;
typename super::in_container_type last_send_msg;
asio::io_context::strand strand;
};
......
......@@ -174,7 +174,7 @@ protected:
}
private:
typedef list<timer_info> container_type;
typedef std::list<timer_info> container_type;
container_type timer_can;
std::mutex timer_can_mutex;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册