提交 7d0730d9 编写于 作者: Y youngwolf 提交者: youngowlf

Make memory_pool thread safe.

上级 d500ded7
......@@ -10,10 +10,11 @@
//#define ST_ASIO_FULL_STATISTIC //full statistic will slightly impact efficiency.
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 1
#define PACKER_UNPACKER_TYPE 4
//1-default packer and unpacker, head(length) + body
//2-fixed length unpacker
//3-prefix and suffix packer and unpacker
//4-pooled packer and unpacker, head(length) + body
#if 1 == PACKER_UNPACKER_TYPE
//#define ST_ASIO_DEFAULT_PACKER replaceable_packer
......@@ -23,6 +24,9 @@
#elif 3 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_PACKER prefix_suffix_packer
#define ST_ASIO_DEFAULT_UNPACKER prefix_suffix_unpacker
#elif 4 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_PACKER pooled_packer
#define ST_ASIO_DEFAULT_UNPACKER pooled_unpacker
#endif
//configuration
......@@ -41,10 +45,9 @@ using namespace st_asio_wrapper::ext;
//under the default behavior, each st_tcp_socket has their own packer, and cause memory waste
//at here, we make each echo_socket use the same global packer for memory saving
//notice: do not do this for unpacker, because unpacker has member variables and can't share each other
#if 1 == PACKER_UNPACKER_TYPE || 2 == PACKER_UNPACKER_TYPE
auto global_packer(boost::make_shared<ST_ASIO_DEFAULT_PACKER>());
#elif 3 == PACKER_UNPACKER_TYPE
auto global_packer(boost::make_shared<prefix_suffix_packer>());
#if 4 == PACKER_UNPACKER_TYPE
memory_pool pool;
#endif
//demonstrate how to control the type of st_server_socket_base::server from template parameter
......@@ -62,9 +65,11 @@ public:
inner_packer(global_packer);
#if 2 == PACKER_UNPACKER_TYPE
dynamic_cast<fixed_length_unpacker*>(&*inner_unpacker())->fixed_length(1024);
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->fixed_length(1024);
#elif 3 == PACKER_UNPACKER_TYPE
dynamic_cast<prefix_suffix_unpacker*>(&*inner_unpacker())->prefix_suffix("begin", "end");
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->prefix_suffix("begin", "end");
#elif 4 == PACKER_UNPACKER_TYPE
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->mem_pool(pool);
#endif
}
......@@ -161,6 +166,13 @@ int main(int argc, const char* argv[])
auto thread_num = 1;
if (argc > 1)
thread_num = std::min(16, std::max(thread_num, atoi(argv[1])));
#if 3 == PACKER_UNPACKER_TYPE
global_packer->prefix_suffix("begin", "end");
#elif 4 == PACKER_UNPACKER_TYPE
global_packer->mem_pool(pool);
#endif
service_pump.start_service(thread_num);
while(service_pump.is_running())
{
......@@ -177,6 +189,10 @@ int main(int argc, const char* argv[])
{
printf("normal server, link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", server_.size(), server_.invalid_object_size());
printf("echo server, link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", echo_server_.size(), echo_server_.invalid_object_size());
#if 4 == PACKER_UNPACKER_TYPE
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size());
#endif
puts("");
puts(echo_server_.get_statistic().to_string().data());
}
//the following two commands demonstrate how to suspend msg dispatching, no matter recv buffer been used or not
......
......@@ -65,14 +65,23 @@ protected:
class memory_pool
{
public:
typedef most_primitive_buffer raw_object_type;
typedef const raw_object_type raw_object_ctype;
typedef boost::shared_ptr<most_primitive_buffer> object_type;
typedef const object_type object_ctype;
memory_pool() {}
memory_pool(size_t block_count, size_t block_size) {for (size_t i = 0; i < block_count; ++i) pool.push_back(boost::make_shared<most_primitive_buffer>(block_size));}
memory_pool(size_t block_count, size_t block_size) {init_pool(block_count, block_size);}
//not thread safe
void init_pool(size_t block_count, size_t block_size) {for (size_t i = 0; i < block_count; ++i) pool.push_back(boost::make_shared<most_primitive_buffer>(block_size));}
size_t size() {boost::shared_lock<boost::shared_mutex> lock(mutex); return pool.size();} //memory block amount
uint_fast64_t buffer_size() {uint_fast64_t size = 0; do_something_to_all(pool, mutex, [&](object_ctype& item) {size += item->buffer_size();}); return size;}
size_t size() const {return pool.size();}
object_type ask_memory(size_t block_size, bool best_fit = false)
{
boost::unique_lock<boost::shared_mutex> lock(mutex);
object_type re;
if (best_fit)
{
......@@ -109,6 +118,7 @@ public:
protected:
boost::container::list<object_type> pool;
boost::shared_mutex mutex;
};
}} //namespace
......
......@@ -53,6 +53,7 @@ public:
}
};
//protocol: length + body
class packer : public i_packer<std::string>
{
public:
......@@ -97,6 +98,64 @@ public:
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;}
};
//protocol: length + body
//use memory pool
class pooled_packer : public i_packer<shared_buffer<memory_pool::raw_object_type>>
{
public:
static size_t get_max_msg_size() {return ST_ASIO_MSG_BUFFER_SIZE - ST_ASIO_HEAD_LEN;}
pooled_packer() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
memory_pool& mem_pool() {return *pool;}
using i_packer<msg_type>::pack_msg;
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
{
msg_type msg;
auto pre_len = native ? 0 : ST_ASIO_HEAD_LEN;
auto total_len = packer_helper::msg_size_check(pre_len, pstr, len, num);
if ((size_t) -1 == total_len)
return msg;
else if (total_len > pre_len)
{
if (!native)
{
auto head_len = (ST_ASIO_HEAD_TYPE) total_len;
if (total_len != head_len)
{
unified_out::error_out("pack msg error: length exceeded the header's range!");
return msg;
}
msg.raw_buffer(pool->ask_memory(total_len));
head_len = ST_ASIO_HEAD_H2N(head_len);
memcpy(msg.raw_buffer()->data(), (const char*) &head_len, ST_ASIO_HEAD_LEN);
}
else
msg.raw_buffer(pool->ask_memory(total_len));
total_len = pre_len;
for (size_t i = 0; i < num; ++i)
if (nullptr != pstr[i])
{
memcpy(std::next(msg.raw_buffer()->data(), total_len), pstr[i], len[i]);
total_len += len[i];
}
} //if (total_len > pre_len)
return msg;
}
virtual char* raw_data(msg_type& msg) const {return std::next(msg.raw_buffer()->data(), ST_ASIO_HEAD_LEN);}
virtual const char* raw_data(msg_ctype& msg) const {return std::next(msg.data(), ST_ASIO_HEAD_LEN);}
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;}
protected:
memory_pool* pool;
};
//protocol: length + body
class replaceable_packer : public i_packer<replaceable_buffer>
{
public:
......@@ -116,6 +175,7 @@ public:
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;}
};
//protocol: [prefix] + body + suffix
class prefix_suffix_packer : public i_packer<std::string>
{
public:
......@@ -155,9 +215,15 @@ private:
std::string _prefix, _suffix;
};
class pooled_stream_packer : public i_packer<shared_buffer<most_primitive_buffer>>
//protocol: stream (non-protocol)
//use memory pool
class pooled_stream_packer : public i_packer<shared_buffer<memory_pool::raw_object_type>>
{
public:
pooled_stream_packer() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
memory_pool& mem_pool() {return *pool;}
using i_packer<msg_type>::pack_msg;
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false) //native will not take effect
{
......@@ -167,7 +233,7 @@ public:
return msg;
else if (total_len > 0)
{
msg.raw_buffer(msg_pool.ask_memory(total_len));
msg.raw_buffer(pool->ask_memory(total_len));
total_len = 0;
for (size_t i = 0; i < num; ++i)
......@@ -186,7 +252,7 @@ public:
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size();}
protected:
memory_pool msg_pool;
memory_pool* pool;
};
}} //namespace
......
......@@ -26,6 +26,7 @@
namespace st_asio_wrapper { namespace ext {
//protocol: length + body
class unpacker : public i_unpacker<std::string>
{
public:
......@@ -127,6 +128,48 @@ protected:
size_t remain_len; //half-baked msg
};
//protocol: length + body
//use memory pool
class pooled_unpacker : public i_unpacker<shared_buffer<most_primitive_buffer>>, public unpacker
{
public:
using i_unpacker<shared_buffer<most_primitive_buffer>>::msg_type;
using i_unpacker<shared_buffer<most_primitive_buffer>>::msg_ctype;
using i_unpacker<shared_buffer<most_primitive_buffer>>::container_type;
pooled_unpacker() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
memory_pool& mem_pool() {return *pool;}
virtual void reset_state() {unpacker::reset_state();}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
boost::container::list<std::pair<const char*, size_t>> msg_pos_can;
auto unpack_ok = unpacker::parse_msg(bytes_transferred, msg_pos_can);
do_something_to_all(msg_pos_can, [this, &msg_can](decltype(*std::begin(msg_pos_can))& item) {
auto buff = pool->ask_memory(item.second);
memcpy(buff->data(), item.first, item.second);
msg_can.push_back(shared_buffer<most_primitive_buffer>(buff));
});
if (unpack_ok && remain_len > 0)
{
auto pnext = std::next(msg_pos_can.back().first, msg_pos_can.back().second);
memcpy(std::begin(raw_buff), pnext, remain_len); //left behind unparsed data
}
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
return unpack_ok;
}
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return unpacker::completion_condition(ec, bytes_transferred);}
virtual boost::asio::mutable_buffers_1 prepare_next_recv() {return unpacker::prepare_next_recv();}
protected:
memory_pool* pool;
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
class udp_unpacker : public i_udp_unpacker<std::string>
{
public:
......@@ -137,6 +180,7 @@ protected:
boost::array<char, ST_ASIO_MSG_BUFFER_SIZE> raw_buff;
};
//protocol: length + body
class replaceable_unpacker : public i_unpacker<replaceable_buffer>
{
public:
......@@ -146,10 +190,10 @@ public:
unpacker::container_type tmp_can;
auto unpack_ok = unpacker_.parse_msg(bytes_transferred, tmp_can);
do_something_to_all(tmp_can, [&msg_can](decltype(*std::begin(tmp_can))& item) {
auto com = boost::make_shared<string_buffer>();
com->swap(item);
auto raw_buffer = boost::make_shared<string_buffer>();
raw_buffer->swap(item);
msg_can.resize(msg_can.size() + 1);
msg_can.back().raw_buffer(com);
msg_can.back().raw_buffer(raw_buffer);
});
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
......@@ -163,6 +207,7 @@ protected:
unpacker unpacker_;
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
class replaceable_udp_unpacker : public i_udp_unpacker<replaceable_buffer>
{
public:
......@@ -179,6 +224,7 @@ protected:
boost::array<char, ST_ASIO_MSG_BUFFER_SIZE> raw_buff;
};
//protocol: length + body
//this unpacker demonstrate how to forbid memory copying while parsing msgs (let asio write msg directly).
class buffer_free_unpacker : public i_unpacker<most_primitive_buffer>
{
......@@ -260,6 +306,7 @@ private:
int step; //-1-error format, 0-want the head, 1-want the body
};
//protocol: fixed lenght
class fixed_length_unpacker : public i_unpacker<std::string>
{
public:
......@@ -321,6 +368,7 @@ private:
size_t remain_len; //half-baked msg
};
//protocol: [prefix] + body + suffix
class prefix_suffix_unpacker : public i_unpacker<std::string>
{
public:
......@@ -435,6 +483,7 @@ private:
size_t remain_len; //half-baked msg
};
//protocol: stream (non-protocol)
class stream_unpacker : public i_unpacker<std::string>
{
public:
......@@ -458,9 +507,15 @@ protected:
boost::array<char, ST_ASIO_MSG_BUFFER_SIZE> raw_buff;
};
//protocol: stream (non-protocol)
//use memory pool
class pooled_stream_unpacker : public i_unpacker<shared_buffer<most_primitive_buffer>>
{
public:
pooled_stream_unpacker() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
memory_pool& mem_pool() {return *pool;}
virtual void reset_state() {}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
......@@ -469,22 +524,21 @@ public:
assert(bytes_transferred <= ST_ASIO_MSG_BUFFER_SIZE);
msg_can.resize(msg_can.size() + 1);
buff->size(bytes_transferred);
msg_can.back().raw_buffer(buff);
msg_can.push_back(shared_buffer<most_primitive_buffer>(buff));
return true;
}
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred > 0 ? 0 : boost::asio::detail::default_max_transfer_size;}
virtual boost::asio::mutable_buffers_1 prepare_next_recv()
{
buff = msg_pool.ask_memory(ST_ASIO_MSG_BUFFER_SIZE);
return boost::asio::buffer(buff->data(), buff->size());
buff = pool->ask_memory(ST_ASIO_MSG_BUFFER_SIZE);
return boost::asio::buffer(buff->data(), buff->buffer_size());
}
protected:
msg_type::buffer_type buff;
memory_pool msg_pool;
msg_type::buffer_type buff; //equal to memory_pool::object_type
memory_pool* pool;
};
}} //namespace
......
......@@ -7,11 +7,20 @@
#define ST_ASIO_SERVER_PORT 9527
#define ST_ASIO_REUSE_OBJECT //use objects pool
//#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
//#define ST_ASIO_WANT_MSG_SEND_NOTIFY
//#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
#define ST_ASIO_WANT_MSG_SEND_NOTIFY
#define ST_ASIO_MSG_BUFFER_SIZE 65536
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 2
//1-stream unpacker (non-protocol)
//2-pooled_stream_packer and pooled_stream_unpacker (non-protocol)
#if 1 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
#elif 2 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_PACKER pooled_stream_packer
#define ST_ASIO_DEFAULT_UNPACKER pooled_stream_unpacker
#define ST_ASIO_MSG_BUFFER_SIZE 65536
#endif
//configuration
#include "../include/ext/st_asio_wrapper_net.h"
......@@ -31,11 +40,20 @@ boost::atomic_ushort completed_session_num;
#else
st_atomic<unsigned short> completed_session_num;
#endif
#if 2 == PACKER_UNPACKER_TYPE
memory_pool pool;
#endif
class echo_socket : public st_connector
{
public:
echo_socket(boost::asio::io_service& io_service_) : st_connector(io_service_) {}
echo_socket(boost::asio::io_service& io_service_) : st_connector(io_service_)
{
#if 2 == PACKER_UNPACKER_TYPE
dynamic_cast<ST_ASIO_DEFAULT_PACKER*>(&*inner_packer())->mem_pool(pool);
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->mem_pool(pool);
#endif
}
void begin(size_t msg_num, const char* msg, size_t msg_len)
{
......@@ -150,6 +168,10 @@ int main(int argc, const char* argv[])
else if (LIST_STATUS == str)
{
printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client.size(), client.valid_size(), client.invalid_object_size());
#if 2 == PACKER_UNPACKER_TYPE
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size());
#endif
puts("");
puts(client.get_statistic().to_string().data());
}
else if (!str.empty())
......
......@@ -5,10 +5,19 @@
#define ST_ASIO_SERVER_PORT 9527
#define ST_ASIO_REUSE_OBJECT //use objects pool
//#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
//#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
#define ST_ASIO_MSG_BUFFER_SIZE 65536
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 2
//1-stream unpacker (non-protocol)
//2-pooled_stream_packer and pooled_stream_unpacker (non-protocol)
#if 1 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
#elif 2 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_PACKER pooled_stream_packer
#define ST_ASIO_DEFAULT_UNPACKER pooled_stream_unpacker
#define ST_ASIO_MSG_BUFFER_SIZE 65536
#endif
//configuration
#include "../include/ext/st_asio_wrapper_net.h"
......@@ -22,10 +31,20 @@ using namespace st_asio_wrapper::ext;
#define QUIT_COMMAND "quit"
#define LIST_STATUS "status"
#if 2 == PACKER_UNPACKER_TYPE
memory_pool pool;
#endif
class echo_socket : public st_server_socket
{
public:
echo_socket(i_server& server_) : st_server_socket(server_) {}
echo_socket(i_server& server_) : st_server_socket(server_)
{
#if 2 == PACKER_UNPACKER_TYPE
dynamic_cast<ST_ASIO_DEFAULT_PACKER*>(&*inner_packer())->mem_pool(pool);
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->mem_pool(pool);
#endif
}
protected:
#ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
......@@ -78,6 +97,10 @@ int main(int argc, const char* argv[])
else if (LIST_STATUS == str)
{
printf("link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", echo_server_.size(), echo_server_.invalid_object_size());
#if 2 == PACKER_UNPACKER_TYPE
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size());
#endif
puts("");
puts(echo_server_.get_statistic().to_string().data());
}
}
......
......@@ -13,10 +13,11 @@
//configuration
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 1
#define PACKER_UNPACKER_TYPE 4
//1-default packer and unpacker, head(length) + body
//2-fixed length unpacker
//3-prefix and suffix packer and unpacker
//4-pooled packer and unpacker, head(length) + body
#if 1 == PACKER_UNPACKER_TYPE
//#define ST_ASIO_DEFAULT_PACKER replaceable_packer
......@@ -26,6 +27,9 @@
#elif 3 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_PACKER prefix_suffix_packer
#define ST_ASIO_DEFAULT_UNPACKER prefix_suffix_unpacker
#elif 4 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_PACKER pooled_packer
#define ST_ASIO_DEFAULT_UNPACKER pooled_unpacker
#endif
#include "../include/ext/st_asio_wrapper_net.h"
......@@ -44,7 +48,9 @@ using namespace st_asio_wrapper::ext;
#define RESUME_COMMAND "resume"
static bool check_msg;
#if 4 == PACKER_UNPACKER_TYPE
memory_pool pool;
#endif
///////////////////////////////////////////////////
//msg sending interface
#define TCP_RANDOM_SEND_MSG(FUNNAME, SEND_FUNNAME) \
......@@ -63,10 +69,13 @@ public:
test_socket(boost::asio::io_service& io_service_) : st_connector(io_service_), recv_bytes(0), recv_index(0)
{
#if 2 == PACKER_UNPACKER_TYPE
dynamic_cast<fixed_length_unpacker*>(&*inner_unpacker())->fixed_length(1024);
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->fixed_length(1024);
#elif 3 == PACKER_UNPACKER_TYPE
dynamic_cast<prefix_suffix_packer*>(&*inner_packer())->prefix_suffix("begin", "end");
dynamic_cast<prefix_suffix_unpacker*>(&*inner_unpacker())->prefix_suffix("begin", "end");
dynamic_cast<ST_ASIO_DEFAULT_PACKER*>(&*inner_packer())->prefix_suffix("begin", "end");
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->prefix_suffix("begin", "end");
#elif 4 == PACKER_UNPACKER_TYPE
dynamic_cast<ST_ASIO_DEFAULT_PACKER*>(&*inner_packer())->mem_pool(pool);
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->mem_pool(pool);
#endif
}
......@@ -83,11 +92,13 @@ public:
memset(buff, msg_fill, msg_len);
memcpy(buff, &recv_index, sizeof(size_t)); //seq
#if 2 == PACKER_UNPACKER_TYPE //there's no fixed_length_packer
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
send_native_msg(buff, msg_len);
#else
send_msg(buff, msg_len);
#endif
delete[] buff;
}
......@@ -113,14 +124,19 @@ protected:
#else
auto pstr = inner_packer()->raw_data(msg);
auto msg_len = inner_packer()->raw_data_len(msg);
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
std::advance(pstr, -ST_ASIO_HEAD_LEN);
msg_len += ST_ASIO_HEAD_LEN;
#endif
size_t send_index;
memcpy(&send_index, pstr, sizeof(size_t));
++send_index;
memcpy(pstr, &send_index, sizeof(size_t));
memcpy(pstr, &send_index, sizeof(size_t)); //seq
#if 2 == PACKER_UNPACKER_TYPE //there's no fixed_length_packer
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
send_native_msg(pstr, msg_len);
#else
send_msg(pstr, msg_len);
......@@ -267,6 +283,10 @@ int main(int argc, const char* argv[])
else if (LIST_STATUS == str)
{
printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client.size(), client.valid_size(), client.invalid_object_size());
#if 4 == PACKER_UNPACKER_TYPE
printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size());
#endif
puts("");
puts(client.get_statistic().to_string().data());
}
//the following two commands demonstrate how to suspend msg dispatching, no matter recv buffer been used or not
......@@ -312,14 +332,12 @@ int main(int argc, const char* argv[])
auto iter = std::begin(tok);
if (iter != std::end(tok)) msg_num = std::max((size_t) atoll(iter++->data()), (size_t) 1);
auto native = false;
#if 1 == PACKER_UNPACKER_TYPE
if (iter != std::end(tok)) msg_len = std::min(packer::get_max_msg_size(),
std::max((size_t) atoi(iter++->data()), sizeof(size_t))); //include seq
#elif 2 == PACKER_UNPACKER_TYPE
if (iter != std::end(tok)) ++iter;
msg_len = 1024; //we hard code this because we fixedly initialized the length of fixed_length_unpacker to 1024
native = true; //we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
#elif 3 == PACKER_UNPACKER_TYPE
if (iter != std::end(tok)) msg_len = std::min((size_t) ST_ASIO_MSG_BUFFER_SIZE,
std::max((size_t) atoi(iter++->data()), sizeof(size_t)));
......@@ -387,11 +405,21 @@ int main(int argc, const char* argv[])
switch (model)
{
case 0:
native ? client.safe_broadcast_native_msg(buff, msg_len) : client.safe_broadcast_msg(buff, msg_len);
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
client.safe_broadcast_native_msg(buff, msg_len);
#else
client.safe_broadcast_msg(buff, msg_len);
#endif
send_bytes += link_num * msg_len;
break;
case 1:
native ? client.safe_random_send_native_msg(buff, msg_len) : client.safe_random_send_msg(buff, msg_len);
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
client.safe_random_send_native_msg(buff, msg_len);
#else
client.safe_random_send_msg(buff, msg_len);
#endif
send_bytes += msg_len;
break;
default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册