提交 3ce69fb3 编写于 作者: Y youngwolf 提交者: youngowlf

Memory pool optimization.

上级 7d0730d9
......@@ -5,7 +5,7 @@
#define ST_ASIO_SERVER_PORT 9527
#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define ST_ASIO_CUSTOM_LOG
#define ST_ASIO_DEFAULT_UNPACKER buffer_free_unpacker
#define ST_ASIO_DEFAULT_UNPACKER non_copy_unpacker
//#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
//the following three macros demonstrate how to support huge msg(exceed 65535 - 2).
......
......@@ -190,7 +190,7 @@ int main(int argc, const char* argv[])
printf("normal server, link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", server_.size(), server_.invalid_object_size());
printf("echo server, link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", echo_server_.size(), echo_server_.invalid_object_size());
#if 4 == PACKER_UNPACKER_TYPE
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size());
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.available_size(), pool.available_buffer_size());
#endif
puts("");
puts(echo_server_.get_statistic().to_string().data());
......@@ -228,6 +228,10 @@ int main(int argc, const char* argv[])
}
}
#if 4 == PACKER_UNPACKER_TYPE
pool.stop();
#endif
return 0;
}
......
......@@ -15,6 +15,7 @@
#include <string>
#include <boost/array.hpp>
#include <boost/container/set.hpp>
#include "../st_asio_wrapper_base.h"
......@@ -29,13 +30,13 @@ public:
virtual const char* data() const {return std::string::data();}
};
class most_primitive_buffer
class basic_buffer
{
public:
most_primitive_buffer() {do_detach();}
most_primitive_buffer(size_t len) {do_detach(); assign(len);}
most_primitive_buffer(most_primitive_buffer&& other) {do_attach(other.buff, other.len, other.buff_len); other.do_detach();}
~most_primitive_buffer() {free();}
basic_buffer() {do_detach();}
basic_buffer(size_t len) {do_detach(); assign(len);}
basic_buffer(basic_buffer&& other) {do_attach(other.buff, other.len, other.buff_len); other.do_detach();}
~basic_buffer() {free();}
void assign(size_t len) {free(); do_attach(new char[len], len, len);}
void free() {delete[] buff; do_detach();}
......@@ -44,7 +45,7 @@ public:
bool empty() const {return 0 == len || nullptr == buff;}
size_t size() const {return len;}
const char* data() const {return buff;}
void swap(most_primitive_buffer& other) {std::swap(buff, other.buff); std::swap(len, other.len);}
void swap(basic_buffer& other) {std::swap(buff, other.buff); std::swap(len, other.len);}
void clear() {free();}
//functions needed by packer and unpacker
......@@ -61,64 +62,85 @@ protected:
size_t len, buff_len;
};
//not thread safe, please pay attention
class memory_pool
class pooled_buffer : public basic_buffer
{
public:
typedef most_primitive_buffer raw_object_type;
typedef const raw_object_type raw_object_ctype;
typedef boost::shared_ptr<most_primitive_buffer> object_type;
typedef const object_type object_ctype;
class i_memory_pool
{
public:
typedef pooled_buffer raw_buffer_type;
typedef const raw_buffer_type raw_buffer_ctype;
typedef boost::shared_ptr<raw_buffer_type> buffer_type;
typedef const buffer_type buffer_ctype;
virtual buffer_type checkout(size_t block_size, bool best_fit = false) = 0;
virtual void checkin(raw_buffer_type& buff) = 0;
};
memory_pool() {}
memory_pool(size_t block_count, size_t block_size) {init_pool(block_count, block_size);}
//not thread safe
void init_pool(size_t block_count, size_t block_size) {for (size_t i = 0; i < block_count; ++i) pool.push_back(boost::make_shared<most_primitive_buffer>(block_size));}
public:
pooled_buffer(i_memory_pool& _pool) : pool(_pool) {}
pooled_buffer(i_memory_pool& _pool, size_t len) : basic_buffer(len), pool(_pool) {}
pooled_buffer(pooled_buffer&& other) : basic_buffer(std::move(other)), pool(other.pool) {}
~pooled_buffer() {pool.checkin(*this);}
size_t size() {boost::shared_lock<boost::shared_mutex> lock(mutex); return pool.size();} //memory block amount
uint_fast64_t buffer_size() {uint_fast64_t size = 0; do_something_to_all(pool, mutex, [&](object_ctype& item) {size += item->buffer_size();}); return size;}
protected:
i_memory_pool& pool;
};
object_type ask_memory(size_t block_size, bool best_fit = false)
class memory_pool : public pooled_buffer::i_memory_pool
{
public:
memory_pool() : stopped_(false) {}
memory_pool(size_t block_count, size_t block_size) : stopped_(false) {init_pool(block_count, block_size);}
void stop() {stopped_ = true;}
bool stopped() const {return stopped_;}
//not thread safe, and can call many times before using this memory pool
void init_pool(size_t block_count, size_t block_size) {for (size_t i = 0; i < block_count; ++i) pool.insert(boost::make_shared<raw_buffer_type>(*this, block_size));}
size_t available_size() {boost::shared_lock<boost::shared_mutex> lock(mutex); return pool.size();}
uint_fast64_t available_buffer_size() {uint_fast64_t size = 0; do_something_to_all(pool, mutex, [&](buffer_ctype& item) {size += item->buffer_size();}); return size;}
public:
virtual buffer_type checkout(size_t block_size, bool best_fit = false)
{
if (stopped())
return buffer_type();
auto find_buffer_predicate = [block_size](buffer_ctype& item)->bool {return item->buffer_size() >= block_size;};
boost::unique_lock<boost::shared_mutex> lock(mutex);
auto iter = best_fit ? std::prev(std::find_if(pool.rbegin(), pool.rend(), find_buffer_predicate).base()) :
std::find_if(std::begin(pool), std::end(pool), find_buffer_predicate);
object_type re;
if (best_fit)
{
object_type candidate;
size_t gap = -1;
for (auto iter = std::begin(pool); iter != std::end(pool); ++iter)
if (iter->unique() && (*iter)->buffer_size() >= block_size)
{
auto this_gap = (*iter)->buffer_size() - block_size;
if (this_gap < gap)
{
candidate = *iter;
if (0 == (gap = this_gap))
break;
}
}
re = candidate;
}
else
for (auto iter = std::begin(pool); !re && iter != std::end(pool); ++iter)
if (iter->unique() && (*iter)->buffer_size() >= block_size)
re = *iter;
if (re)
re->size(block_size);
else
if (iter != std::end(pool))
{
re = boost::make_shared<most_primitive_buffer>(block_size);
pool.push_back(re);
auto buff(std::move(*iter));
buff->size(block_size);
pool.erase(iter);
return buff;
}
lock.unlock();
return re;
return boost::make_shared<raw_buffer_type>(*this, block_size);
}
virtual void checkin(raw_buffer_type& buff)
{
if (stopped())
return;
boost::unique_lock<boost::shared_mutex> lock(mutex);
pool.insert(boost::make_shared<raw_buffer_type>(std::move(buff)));
}
protected:
boost::container::list<object_type> pool;
struct buffer_compare {bool operator()(buffer_ctype& left, buffer_ctype& right) const {return left->buffer_size() > right->buffer_size();}};
boost::container::multiset<buffer_type, buffer_compare> pool;
boost::shared_mutex mutex;
bool stopped_;
};
}} //namespace
......
......@@ -100,7 +100,7 @@ public:
//protocol: length + body
//use memory pool
class pooled_packer : public i_packer<shared_buffer<memory_pool::raw_object_type>>
class pooled_packer : public i_packer<shared_buffer<memory_pool::raw_buffer_type>>
{
public:
static size_t get_max_msg_size() {return ST_ASIO_MSG_BUFFER_SIZE - ST_ASIO_HEAD_LEN;}
......@@ -128,12 +128,12 @@ public:
return msg;
}
msg.raw_buffer(pool->ask_memory(total_len));
msg.raw_buffer(pool->checkout(total_len));
head_len = ST_ASIO_HEAD_H2N(head_len);
memcpy(msg.raw_buffer()->data(), (const char*) &head_len, ST_ASIO_HEAD_LEN);
}
else
msg.raw_buffer(pool->ask_memory(total_len));
msg.raw_buffer(pool->checkout(total_len));
total_len = pre_len;
for (size_t i = 0; i < num; ++i)
......@@ -217,7 +217,7 @@ private:
//protocol: stream (non-protocol)
//use memory pool
class pooled_stream_packer : public i_packer<shared_buffer<memory_pool::raw_object_type>>
class pooled_stream_packer : public i_packer<shared_buffer<memory_pool::raw_buffer_type>>
{
public:
pooled_stream_packer() : pool(nullptr) {}
......@@ -233,7 +233,7 @@ public:
return msg;
else if (total_len > 0)
{
msg.raw_buffer(pool->ask_memory(total_len));
msg.raw_buffer(pool->checkout(total_len));
total_len = 0;
for (size_t i = 0; i < num; ++i)
......
......@@ -130,12 +130,12 @@ protected:
//protocol: length + body
//use memory pool
class pooled_unpacker : public i_unpacker<shared_buffer<most_primitive_buffer>>, public unpacker
class pooled_unpacker : public i_unpacker<shared_buffer<memory_pool::raw_buffer_type>>, public unpacker
{
public:
using i_unpacker<shared_buffer<most_primitive_buffer>>::msg_type;
using i_unpacker<shared_buffer<most_primitive_buffer>>::msg_ctype;
using i_unpacker<shared_buffer<most_primitive_buffer>>::container_type;
using i_unpacker<shared_buffer<memory_pool::raw_buffer_type>>::msg_type;
using i_unpacker<shared_buffer<memory_pool::raw_buffer_type>>::msg_ctype;
using i_unpacker<shared_buffer<memory_pool::raw_buffer_type>>::container_type;
pooled_unpacker() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
......@@ -147,9 +147,9 @@ public:
boost::container::list<std::pair<const char*, size_t>> msg_pos_can;
auto unpack_ok = unpacker::parse_msg(bytes_transferred, msg_pos_can);
do_something_to_all(msg_pos_can, [this, &msg_can](decltype(*std::begin(msg_pos_can))& item) {
auto buff = pool->ask_memory(item.second);
auto buff = pool->checkout(item.second);
memcpy(buff->data(), item.first, item.second);
msg_can.push_back(shared_buffer<most_primitive_buffer>(buff));
msg_can.push_back(msg_type(buff));
});
if (unpack_ok && remain_len > 0)
......@@ -226,10 +226,10 @@ protected:
//protocol: length + body
//this unpacker demonstrate how to forbid memory copying while parsing msgs (let asio write msg directly).
class buffer_free_unpacker : public i_unpacker<most_primitive_buffer>
class non_copy_unpacker : public i_unpacker<basic_buffer>
{
public:
buffer_free_unpacker() {reset_state();}
non_copy_unpacker() {reset_state();}
size_t current_msg_length() const {return raw_buff.size();} //current msg's total length(not include the head), 0 means not available
public:
......@@ -509,7 +509,7 @@ protected:
//protocol: stream (non-protocol)
//use memory pool
class pooled_stream_unpacker : public i_unpacker<shared_buffer<most_primitive_buffer>>
class pooled_stream_unpacker : public i_unpacker<shared_buffer<memory_pool::raw_buffer_type>>
{
public:
pooled_stream_unpacker() : pool(nullptr) {}
......@@ -525,19 +525,19 @@ public:
assert(bytes_transferred <= ST_ASIO_MSG_BUFFER_SIZE);
buff->size(bytes_transferred);
msg_can.push_back(shared_buffer<most_primitive_buffer>(buff));
msg_can.push_back(msg_type(buff));
return true;
}
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred > 0 ? 0 : boost::asio::detail::default_max_transfer_size;}
virtual boost::asio::mutable_buffers_1 prepare_next_recv()
{
buff = pool->ask_memory(ST_ASIO_MSG_BUFFER_SIZE);
buff = pool->checkout(ST_ASIO_MSG_BUFFER_SIZE);
return boost::asio::buffer(buff->data(), buff->buffer_size());
}
protected:
msg_type::buffer_type buff; //equal to memory_pool::object_type
msg_type::buffer_type buff; //equal to memory_pool::buffer_type
memory_pool* pool;
};
......
......@@ -64,6 +64,7 @@ namespace st_asio_wrapper
virtual const char* data() const = 0;
};
//convert '->' operation to '.' operation
template<typename T>
class shared_buffer
{
......
......@@ -7,11 +7,11 @@
#define ST_ASIO_SERVER_PORT 9527
#define ST_ASIO_REUSE_OBJECT //use objects pool
//#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
#define ST_ASIO_WANT_MSG_SEND_NOTIFY
//#define ST_ASIO_WANT_MSG_SEND_NOTIFY
#define ST_ASIO_MSG_BUFFER_SIZE 65536
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 2
#define PACKER_UNPACKER_TYPE 1
//1-stream unpacker (non-protocol)
//2-pooled_stream_packer and pooled_stream_unpacker (non-protocol)
......@@ -169,7 +169,7 @@ int main(int argc, const char* argv[])
{
printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client.size(), client.valid_size(), client.invalid_object_size());
#if 2 == PACKER_UNPACKER_TYPE
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size());
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.available_size(), pool.available_buffer_size());
#endif
puts("");
puts(client.get_statistic().to_string().data());
......@@ -208,6 +208,10 @@ int main(int argc, const char* argv[])
}
}
#if 2 == PACKER_UNPACKER_TYPE
pool.stop();
#endif
return 0;
}
......
......@@ -8,7 +8,7 @@
#define ST_ASIO_MSG_BUFFER_SIZE 65536
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 2
#define PACKER_UNPACKER_TYPE 1
//1-stream unpacker (non-protocol)
//2-pooled_stream_packer and pooled_stream_unpacker (non-protocol)
......@@ -98,13 +98,17 @@ int main(int argc, const char* argv[])
{
printf("link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", echo_server_.size(), echo_server_.invalid_object_size());
#if 2 == PACKER_UNPACKER_TYPE
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size());
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.available_size(), pool.available_buffer_size());
#endif
puts("");
puts(echo_server_.get_statistic().to_string().data());
}
}
#if 2 == PACKER_UNPACKER_TYPE
pool.stop();
#endif
return 0;
}
......
......@@ -284,7 +284,7 @@ int main(int argc, const char* argv[])
{
printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client.size(), client.valid_size(), client.invalid_object_size());
#if 4 == PACKER_UNPACKER_TYPE
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size());
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.available_size(), pool.available_buffer_size());
#endif
puts("");
puts(client.get_statistic().to_string().data());
......@@ -447,6 +447,10 @@ int main(int argc, const char* argv[])
}
}
#if 4 == PACKER_UNPACKER_TYPE
pool.stop();
#endif
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册