From 7d0730d9aa77a91e31e55e3a1193b33c87cea2da Mon Sep 17 00:00:00 2001 From: youngwolf Date: Tue, 2 Aug 2016 16:11:54 +0800 Subject: [PATCH] Make memory_pool thread safe. --- asio_server/asio_server.cpp | 28 +++++++--- include/ext/st_asio_wrapper_ext.h | 14 ++++- include/ext/st_asio_wrapper_packer.h | 72 ++++++++++++++++++++++++-- include/ext/st_asio_wrapper_unpacker.h | 72 ++++++++++++++++++++++---- pingpong_client/client.cpp | 30 +++++++++-- pingpong_server/server.cpp | 29 +++++++++-- test_client/test_client.cpp | 52 ++++++++++++++----- 7 files changed, 258 insertions(+), 39 deletions(-) diff --git a/asio_server/asio_server.cpp b/asio_server/asio_server.cpp index 4a1de69..fdbb016 100644 --- a/asio_server/asio_server.cpp +++ b/asio_server/asio_server.cpp @@ -10,10 +10,11 @@ //#define ST_ASIO_FULL_STATISTIC //full statistic will slightly impact efficiency. //use the following macro to control the type of packer and unpacker -#define PACKER_UNPACKER_TYPE 1 +#define PACKER_UNPACKER_TYPE 4 //1-default packer and unpacker, head(length) + body //2-fixed length unpacker //3-prefix and suffix packer and unpacker +//4-pooled packer and unpacker, head(length) + body #if 1 == PACKER_UNPACKER_TYPE //#define ST_ASIO_DEFAULT_PACKER replaceable_packer @@ -23,6 +24,9 @@ #elif 3 == PACKER_UNPACKER_TYPE #define ST_ASIO_DEFAULT_PACKER prefix_suffix_packer #define ST_ASIO_DEFAULT_UNPACKER prefix_suffix_unpacker +#elif 4 == PACKER_UNPACKER_TYPE +#define ST_ASIO_DEFAULT_PACKER pooled_packer +#define ST_ASIO_DEFAULT_UNPACKER pooled_unpacker #endif //configuration @@ -41,10 +45,9 @@ using namespace st_asio_wrapper::ext; //under the default behavior, each st_tcp_socket has their own packer, and cause memory waste //at here, we make each echo_socket use the same global packer for memory saving //notice: do not do this for unpacker, because unpacker has member variables and can't share each other -#if 1 == PACKER_UNPACKER_TYPE || 2 == PACKER_UNPACKER_TYPE auto global_packer(boost::make_shared()); -#elif 3 == PACKER_UNPACKER_TYPE -auto global_packer(boost::make_shared()); +#if 4 == PACKER_UNPACKER_TYPE +memory_pool pool; #endif //demonstrate how to control the type of st_server_socket_base::server from template parameter @@ -62,9 +65,11 @@ public: inner_packer(global_packer); #if 2 == PACKER_UNPACKER_TYPE - dynamic_cast(&*inner_unpacker())->fixed_length(1024); + dynamic_cast(&*inner_unpacker())->fixed_length(1024); #elif 3 == PACKER_UNPACKER_TYPE - dynamic_cast(&*inner_unpacker())->prefix_suffix("begin", "end"); + dynamic_cast(&*inner_unpacker())->prefix_suffix("begin", "end"); +#elif 4 == PACKER_UNPACKER_TYPE + dynamic_cast(&*inner_unpacker())->mem_pool(pool); #endif } @@ -161,6 +166,13 @@ int main(int argc, const char* argv[]) auto thread_num = 1; if (argc > 1) thread_num = std::min(16, std::max(thread_num, atoi(argv[1]))); + +#if 3 == PACKER_UNPACKER_TYPE + global_packer->prefix_suffix("begin", "end"); +#elif 4 == PACKER_UNPACKER_TYPE + global_packer->mem_pool(pool); +#endif + service_pump.start_service(thread_num); while(service_pump.is_running()) { @@ -177,6 +189,10 @@ int main(int argc, const char* argv[]) { printf("normal server, link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", server_.size(), server_.invalid_object_size()); printf("echo server, link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", echo_server_.size(), echo_server_.invalid_object_size()); +#if 4 == PACKER_UNPACKER_TYPE + printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size()); +#endif + puts(""); puts(echo_server_.get_statistic().to_string().data()); } //the following two commands demonstrate how to suspend msg dispatching, no matter recv buffer been used or not diff --git a/include/ext/st_asio_wrapper_ext.h b/include/ext/st_asio_wrapper_ext.h index 4a615ac..bb40d93 100644 --- a/include/ext/st_asio_wrapper_ext.h +++ b/include/ext/st_asio_wrapper_ext.h @@ -65,14 +65,23 @@ protected: class memory_pool { public: + typedef most_primitive_buffer raw_object_type; + typedef const raw_object_type raw_object_ctype; typedef boost::shared_ptr object_type; + typedef const object_type object_ctype; memory_pool() {} - memory_pool(size_t block_count, size_t block_size) {for (size_t i = 0; i < block_count; ++i) pool.push_back(boost::make_shared(block_size));} + memory_pool(size_t block_count, size_t block_size) {init_pool(block_count, block_size);} + //not thread safe + void init_pool(size_t block_count, size_t block_size) {for (size_t i = 0; i < block_count; ++i) pool.push_back(boost::make_shared(block_size));} + + size_t size() {boost::shared_lock lock(mutex); return pool.size();} //memory block amount + uint_fast64_t buffer_size() {uint_fast64_t size = 0; do_something_to_all(pool, mutex, [&](object_ctype& item) {size += item->buffer_size();}); return size;} - size_t size() const {return pool.size();} object_type ask_memory(size_t block_size, bool best_fit = false) { + boost::unique_lock lock(mutex); + object_type re; if (best_fit) { @@ -109,6 +118,7 @@ public: protected: boost::container::list pool; + boost::shared_mutex mutex; }; }} //namespace diff --git a/include/ext/st_asio_wrapper_packer.h b/include/ext/st_asio_wrapper_packer.h index 4384519..52effd1 100644 --- a/include/ext/st_asio_wrapper_packer.h +++ b/include/ext/st_asio_wrapper_packer.h @@ -53,6 +53,7 @@ public: } }; +//protocol: length + body class packer : public i_packer { public: @@ -97,6 +98,64 @@ public: virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;} }; +//protocol: length + body +//use memory pool +class pooled_packer : public i_packer> +{ +public: + static size_t get_max_msg_size() {return ST_ASIO_MSG_BUFFER_SIZE - ST_ASIO_HEAD_LEN;} + + pooled_packer() : pool(nullptr) {} + void mem_pool(memory_pool& _pool) {pool = &_pool;} + memory_pool& mem_pool() {return *pool;} + + using i_packer::pack_msg; + virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false) + { + msg_type msg; + auto pre_len = native ? 0 : ST_ASIO_HEAD_LEN; + auto total_len = packer_helper::msg_size_check(pre_len, pstr, len, num); + if ((size_t) -1 == total_len) + return msg; + else if (total_len > pre_len) + { + if (!native) + { + auto head_len = (ST_ASIO_HEAD_TYPE) total_len; + if (total_len != head_len) + { + unified_out::error_out("pack msg error: length exceeded the header's range!"); + return msg; + } + + msg.raw_buffer(pool->ask_memory(total_len)); + head_len = ST_ASIO_HEAD_H2N(head_len); + memcpy(msg.raw_buffer()->data(), (const char*) &head_len, ST_ASIO_HEAD_LEN); + } + else + msg.raw_buffer(pool->ask_memory(total_len)); + + total_len = pre_len; + for (size_t i = 0; i < num; ++i) + if (nullptr != pstr[i]) + { + memcpy(std::next(msg.raw_buffer()->data(), total_len), pstr[i], len[i]); + total_len += len[i]; + } + } //if (total_len > pre_len) + + return msg; + } + + virtual char* raw_data(msg_type& msg) const {return std::next(msg.raw_buffer()->data(), ST_ASIO_HEAD_LEN);} + virtual const char* raw_data(msg_ctype& msg) const {return std::next(msg.data(), ST_ASIO_HEAD_LEN);} + virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;} + +protected: + memory_pool* pool; +}; + +//protocol: length + body class replaceable_packer : public i_packer { public: @@ -116,6 +175,7 @@ public: virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;} }; +//protocol: [prefix] + body + suffix class prefix_suffix_packer : public i_packer { public: @@ -155,9 +215,15 @@ private: std::string _prefix, _suffix; }; -class pooled_stream_packer : public i_packer> +//protocol: stream (non-protocol) +//use memory pool +class pooled_stream_packer : public i_packer> { public: + pooled_stream_packer() : pool(nullptr) {} + void mem_pool(memory_pool& _pool) {pool = &_pool;} + memory_pool& mem_pool() {return *pool;} + using i_packer::pack_msg; virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false) //native will not take effect { @@ -167,7 +233,7 @@ public: return msg; else if (total_len > 0) { - msg.raw_buffer(msg_pool.ask_memory(total_len)); + msg.raw_buffer(pool->ask_memory(total_len)); total_len = 0; for (size_t i = 0; i < num; ++i) @@ -186,7 +252,7 @@ public: virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size();} protected: - memory_pool msg_pool; + memory_pool* pool; }; }} //namespace diff --git a/include/ext/st_asio_wrapper_unpacker.h b/include/ext/st_asio_wrapper_unpacker.h index c1e12c5..a0a937c 100644 --- a/include/ext/st_asio_wrapper_unpacker.h +++ b/include/ext/st_asio_wrapper_unpacker.h @@ -26,6 +26,7 @@ namespace st_asio_wrapper { namespace ext { +//protocol: length + body class unpacker : public i_unpacker { public: @@ -127,6 +128,48 @@ protected: size_t remain_len; //half-baked msg }; +//protocol: length + body +//use memory pool +class pooled_unpacker : public i_unpacker>, public unpacker +{ +public: + using i_unpacker>::msg_type; + using i_unpacker>::msg_ctype; + using i_unpacker>::container_type; + + pooled_unpacker() : pool(nullptr) {} + void mem_pool(memory_pool& _pool) {pool = &_pool;} + memory_pool& mem_pool() {return *pool;} + + virtual void reset_state() {unpacker::reset_state();} + virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) + { + boost::container::list> msg_pos_can; + auto unpack_ok = unpacker::parse_msg(bytes_transferred, msg_pos_can); + do_something_to_all(msg_pos_can, [this, &msg_can](decltype(*std::begin(msg_pos_can))& item) { + auto buff = pool->ask_memory(item.second); + memcpy(buff->data(), item.first, item.second); + msg_can.push_back(shared_buffer(buff)); + }); + + if (unpack_ok && remain_len > 0) + { + auto pnext = std::next(msg_pos_can.back().first, msg_pos_can.back().second); + memcpy(std::begin(raw_buff), pnext, remain_len); //left behind unparsed data + } + + //if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note. + return unpack_ok; + } + + virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return unpacker::completion_condition(ec, bytes_transferred);} + virtual boost::asio::mutable_buffers_1 prepare_next_recv() {return unpacker::prepare_next_recv();} + +protected: + memory_pool* pool; +}; + +//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it. class udp_unpacker : public i_udp_unpacker { public: @@ -137,6 +180,7 @@ protected: boost::array raw_buff; }; +//protocol: length + body class replaceable_unpacker : public i_unpacker { public: @@ -146,10 +190,10 @@ public: unpacker::container_type tmp_can; auto unpack_ok = unpacker_.parse_msg(bytes_transferred, tmp_can); do_something_to_all(tmp_can, [&msg_can](decltype(*std::begin(tmp_can))& item) { - auto com = boost::make_shared(); - com->swap(item); + auto raw_buffer = boost::make_shared(); + raw_buffer->swap(item); msg_can.resize(msg_can.size() + 1); - msg_can.back().raw_buffer(com); + msg_can.back().raw_buffer(raw_buffer); }); //if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note. @@ -163,6 +207,7 @@ protected: unpacker unpacker_; }; +//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it. class replaceable_udp_unpacker : public i_udp_unpacker { public: @@ -179,6 +224,7 @@ protected: boost::array raw_buff; }; +//protocol: length + body //this unpacker demonstrate how to forbid memory copying while parsing msgs (let asio write msg directly). class buffer_free_unpacker : public i_unpacker { @@ -260,6 +306,7 @@ private: int step; //-1-error format, 0-want the head, 1-want the body }; +//protocol: fixed lenght class fixed_length_unpacker : public i_unpacker { public: @@ -321,6 +368,7 @@ private: size_t remain_len; //half-baked msg }; +//protocol: [prefix] + body + suffix class prefix_suffix_unpacker : public i_unpacker { public: @@ -435,6 +483,7 @@ private: size_t remain_len; //half-baked msg }; +//protocol: stream (non-protocol) class stream_unpacker : public i_unpacker { public: @@ -458,9 +507,15 @@ protected: boost::array raw_buff; }; +//protocol: stream (non-protocol) +//use memory pool class pooled_stream_unpacker : public i_unpacker> { public: + pooled_stream_unpacker() : pool(nullptr) {} + void mem_pool(memory_pool& _pool) {pool = &_pool;} + memory_pool& mem_pool() {return *pool;} + virtual void reset_state() {} virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) { @@ -469,22 +524,21 @@ public: assert(bytes_transferred <= ST_ASIO_MSG_BUFFER_SIZE); - msg_can.resize(msg_can.size() + 1); buff->size(bytes_transferred); - msg_can.back().raw_buffer(buff); + msg_can.push_back(shared_buffer(buff)); return true; } virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred > 0 ? 0 : boost::asio::detail::default_max_transfer_size;} virtual boost::asio::mutable_buffers_1 prepare_next_recv() { - buff = msg_pool.ask_memory(ST_ASIO_MSG_BUFFER_SIZE); - return boost::asio::buffer(buff->data(), buff->size()); + buff = pool->ask_memory(ST_ASIO_MSG_BUFFER_SIZE); + return boost::asio::buffer(buff->data(), buff->buffer_size()); } protected: - msg_type::buffer_type buff; - memory_pool msg_pool; + msg_type::buffer_type buff; //equal to memory_pool::object_type + memory_pool* pool; }; }} //namespace diff --git a/pingpong_client/client.cpp b/pingpong_client/client.cpp index 019caac..55cafcd 100644 --- a/pingpong_client/client.cpp +++ b/pingpong_client/client.cpp @@ -7,11 +7,20 @@ #define ST_ASIO_SERVER_PORT 9527 #define ST_ASIO_REUSE_OBJECT //use objects pool //#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER -//#define ST_ASIO_WANT_MSG_SEND_NOTIFY -//#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker +#define ST_ASIO_WANT_MSG_SEND_NOTIFY +#define ST_ASIO_MSG_BUFFER_SIZE 65536 + +//use the following macro to control the type of packer and unpacker +#define PACKER_UNPACKER_TYPE 2 +//1-stream unpacker (non-protocol) +//2-pooled_stream_packer and pooled_stream_unpacker (non-protocol) + +#if 1 == PACKER_UNPACKER_TYPE +#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker +#elif 2 == PACKER_UNPACKER_TYPE #define ST_ASIO_DEFAULT_PACKER pooled_stream_packer #define ST_ASIO_DEFAULT_UNPACKER pooled_stream_unpacker -#define ST_ASIO_MSG_BUFFER_SIZE 65536 +#endif //configuration #include "../include/ext/st_asio_wrapper_net.h" @@ -31,11 +40,20 @@ boost::atomic_ushort completed_session_num; #else st_atomic completed_session_num; #endif +#if 2 == PACKER_UNPACKER_TYPE +memory_pool pool; +#endif class echo_socket : public st_connector { public: - echo_socket(boost::asio::io_service& io_service_) : st_connector(io_service_) {} + echo_socket(boost::asio::io_service& io_service_) : st_connector(io_service_) + { +#if 2 == PACKER_UNPACKER_TYPE + dynamic_cast(&*inner_packer())->mem_pool(pool); + dynamic_cast(&*inner_unpacker())->mem_pool(pool); +#endif + } void begin(size_t msg_num, const char* msg, size_t msg_len) { @@ -150,6 +168,10 @@ int main(int argc, const char* argv[]) else if (LIST_STATUS == str) { printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client.size(), client.valid_size(), client.invalid_object_size()); +#if 2 == PACKER_UNPACKER_TYPE + printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size()); +#endif + puts(""); puts(client.get_statistic().to_string().data()); } else if (!str.empty()) diff --git a/pingpong_server/server.cpp b/pingpong_server/server.cpp index aaf9aa7..c5d7893 100644 --- a/pingpong_server/server.cpp +++ b/pingpong_server/server.cpp @@ -5,10 +5,19 @@ #define ST_ASIO_SERVER_PORT 9527 #define ST_ASIO_REUSE_OBJECT //use objects pool //#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER -//#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker +#define ST_ASIO_MSG_BUFFER_SIZE 65536 + +//use the following macro to control the type of packer and unpacker +#define PACKER_UNPACKER_TYPE 2 +//1-stream unpacker (non-protocol) +//2-pooled_stream_packer and pooled_stream_unpacker (non-protocol) + +#if 1 == PACKER_UNPACKER_TYPE +#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker +#elif 2 == PACKER_UNPACKER_TYPE #define ST_ASIO_DEFAULT_PACKER pooled_stream_packer #define ST_ASIO_DEFAULT_UNPACKER pooled_stream_unpacker -#define ST_ASIO_MSG_BUFFER_SIZE 65536 +#endif //configuration #include "../include/ext/st_asio_wrapper_net.h" @@ -22,10 +31,20 @@ using namespace st_asio_wrapper::ext; #define QUIT_COMMAND "quit" #define LIST_STATUS "status" +#if 2 == PACKER_UNPACKER_TYPE +memory_pool pool; +#endif + class echo_socket : public st_server_socket { public: - echo_socket(i_server& server_) : st_server_socket(server_) {} + echo_socket(i_server& server_) : st_server_socket(server_) + { +#if 2 == PACKER_UNPACKER_TYPE + dynamic_cast(&*inner_packer())->mem_pool(pool); + dynamic_cast(&*inner_unpacker())->mem_pool(pool); +#endif + } protected: #ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER @@ -78,6 +97,10 @@ int main(int argc, const char* argv[]) else if (LIST_STATUS == str) { printf("link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", echo_server_.size(), echo_server_.invalid_object_size()); +#if 2 == PACKER_UNPACKER_TYPE + printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size()); +#endif + puts(""); puts(echo_server_.get_statistic().to_string().data()); } } diff --git a/test_client/test_client.cpp b/test_client/test_client.cpp index e870ccf..f9dcad1 100644 --- a/test_client/test_client.cpp +++ b/test_client/test_client.cpp @@ -13,10 +13,11 @@ //configuration //use the following macro to control the type of packer and unpacker -#define PACKER_UNPACKER_TYPE 1 +#define PACKER_UNPACKER_TYPE 4 //1-default packer and unpacker, head(length) + body //2-fixed length unpacker //3-prefix and suffix packer and unpacker +//4-pooled packer and unpacker, head(length) + body #if 1 == PACKER_UNPACKER_TYPE //#define ST_ASIO_DEFAULT_PACKER replaceable_packer @@ -26,6 +27,9 @@ #elif 3 == PACKER_UNPACKER_TYPE #define ST_ASIO_DEFAULT_PACKER prefix_suffix_packer #define ST_ASIO_DEFAULT_UNPACKER prefix_suffix_unpacker +#elif 4 == PACKER_UNPACKER_TYPE +#define ST_ASIO_DEFAULT_PACKER pooled_packer +#define ST_ASIO_DEFAULT_UNPACKER pooled_unpacker #endif #include "../include/ext/st_asio_wrapper_net.h" @@ -44,7 +48,9 @@ using namespace st_asio_wrapper::ext; #define RESUME_COMMAND "resume" static bool check_msg; - +#if 4 == PACKER_UNPACKER_TYPE +memory_pool pool; +#endif /////////////////////////////////////////////////// //msg sending interface #define TCP_RANDOM_SEND_MSG(FUNNAME, SEND_FUNNAME) \ @@ -63,10 +69,13 @@ public: test_socket(boost::asio::io_service& io_service_) : st_connector(io_service_), recv_bytes(0), recv_index(0) { #if 2 == PACKER_UNPACKER_TYPE - dynamic_cast(&*inner_unpacker())->fixed_length(1024); + dynamic_cast(&*inner_unpacker())->fixed_length(1024); #elif 3 == PACKER_UNPACKER_TYPE - dynamic_cast(&*inner_packer())->prefix_suffix("begin", "end"); - dynamic_cast(&*inner_unpacker())->prefix_suffix("begin", "end"); + dynamic_cast(&*inner_packer())->prefix_suffix("begin", "end"); + dynamic_cast(&*inner_unpacker())->prefix_suffix("begin", "end"); +#elif 4 == PACKER_UNPACKER_TYPE + dynamic_cast(&*inner_packer())->mem_pool(pool); + dynamic_cast(&*inner_unpacker())->mem_pool(pool); #endif } @@ -83,11 +92,13 @@ public: memset(buff, msg_fill, msg_len); memcpy(buff, &recv_index, sizeof(size_t)); //seq -#if 2 == PACKER_UNPACKER_TYPE //there's no fixed_length_packer +#if 2 == PACKER_UNPACKER_TYPE + //we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner. send_native_msg(buff, msg_len); #else send_msg(buff, msg_len); #endif + delete[] buff; } @@ -113,14 +124,19 @@ protected: #else auto pstr = inner_packer()->raw_data(msg); auto msg_len = inner_packer()->raw_data_len(msg); +#if 2 == PACKER_UNPACKER_TYPE + //we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner. + std::advance(pstr, -ST_ASIO_HEAD_LEN); + msg_len += ST_ASIO_HEAD_LEN; #endif size_t send_index; memcpy(&send_index, pstr, sizeof(size_t)); ++send_index; - memcpy(pstr, &send_index, sizeof(size_t)); + memcpy(pstr, &send_index, sizeof(size_t)); //seq -#if 2 == PACKER_UNPACKER_TYPE //there's no fixed_length_packer +#if 2 == PACKER_UNPACKER_TYPE + //we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner. send_native_msg(pstr, msg_len); #else send_msg(pstr, msg_len); @@ -267,6 +283,10 @@ int main(int argc, const char* argv[]) else if (LIST_STATUS == str) { printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client.size(), client.valid_size(), client.invalid_object_size()); +#if 4 == PACKER_UNPACKER_TYPE + printf("pool block amount: " ST_ASIO_SF ", pool total size: %llu\n", pool.size(), pool.buffer_size()); +#endif + puts(""); puts(client.get_statistic().to_string().data()); } //the following two commands demonstrate how to suspend msg dispatching, no matter recv buffer been used or not @@ -312,14 +332,12 @@ int main(int argc, const char* argv[]) auto iter = std::begin(tok); if (iter != std::end(tok)) msg_num = std::max((size_t) atoll(iter++->data()), (size_t) 1); - auto native = false; #if 1 == PACKER_UNPACKER_TYPE if (iter != std::end(tok)) msg_len = std::min(packer::get_max_msg_size(), std::max((size_t) atoi(iter++->data()), sizeof(size_t))); //include seq #elif 2 == PACKER_UNPACKER_TYPE if (iter != std::end(tok)) ++iter; msg_len = 1024; //we hard code this because we fixedly initialized the length of fixed_length_unpacker to 1024 - native = true; //we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner. #elif 3 == PACKER_UNPACKER_TYPE if (iter != std::end(tok)) msg_len = std::min((size_t) ST_ASIO_MSG_BUFFER_SIZE, std::max((size_t) atoi(iter++->data()), sizeof(size_t))); @@ -387,11 +405,21 @@ int main(int argc, const char* argv[]) switch (model) { case 0: - native ? client.safe_broadcast_native_msg(buff, msg_len) : client.safe_broadcast_msg(buff, msg_len); +#if 2 == PACKER_UNPACKER_TYPE + //we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner. + client.safe_broadcast_native_msg(buff, msg_len); +#else + client.safe_broadcast_msg(buff, msg_len); +#endif send_bytes += link_num * msg_len; break; case 1: - native ? client.safe_random_send_native_msg(buff, msg_len) : client.safe_random_send_msg(buff, msg_len); +#if 2 == PACKER_UNPACKER_TYPE + //we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner. + client.safe_random_send_native_msg(buff, msg_len); +#else + client.safe_random_send_msg(buff, msg_len); +#endif send_bytes += msg_len; break; default: -- GitLab