提交 c0f8fca3 编写于 作者: Y youngwolf 提交者: youngowlf

Continue last committing.

上级 4bdb6f22
......@@ -35,12 +35,6 @@
using namespace st_asio_wrapper;
using namespace st_asio_wrapper::ext;
#if defined(_MSC_VER) || defined(__i386__)
#define uint64_format "%llu"
#else // defined(__GNUC__) && defined(__x86_64__)
#define uint64_format "%lu"
#endif
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define LIST_ALL_CLIENT "list_all_client"
......
......@@ -5,7 +5,7 @@
#define ST_ASIO_SERVER_PORT 9528
#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER //force to use the msg recv buffer
#define ST_ASIO_CUSTOM_LOG
#define ST_ASIO_DEFAULT_UNPACKER buffer_free_unpacker
#define ST_ASIO_DEFAULT_UNPACKER non_copy_unpacker
//#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
//the following three macros demonstrate how to support huge msg(exceed 65535 - 2).
......
......@@ -10,14 +10,15 @@
#define ST_ASIO_FULL_STATISTIC //full statistic will slightly impact efficiency.
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 1
//1-default packer and unpacker, head(length) + body
#define PACKER_UNPACKER_TYPE 0
//0-default packer and unpacker, head(length) + body
//1-default replaceable_packer and replaceable_unpacker, head(length) + body
//2-fixed length unpacker
//3-prefix and suffix packer and unpacker
#if 1 == PACKER_UNPACKER_TYPE
//#define ST_ASIO_DEFAULT_PACKER replaceable_packer
//#define ST_ASIO_DEFAULT_UNPACKER replaceable_unpacker
#define ST_ASIO_DEFAULT_PACKER replaceable_packer
#define ST_ASIO_DEFAULT_UNPACKER replaceable_unpacker
#elif 2 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_UNPACKER fixed_length_unpacker
#elif 3 == PACKER_UNPACKER_TYPE
......@@ -41,11 +42,7 @@ using namespace st_asio_wrapper::ext;
//under the default behavior, each st_tcp_socket has their own packer, and cause memory waste
//at here, we make each echo_socket use the same global packer for memory saving
//notice: do not do this for unpacker, because unpacker has member variables and can't share each other
#if 1 == PACKER_UNPACKER_TYPE || 2 == PACKER_UNPACKER_TYPE
BOOST_AUTO(global_packer, boost::make_shared<ST_ASIO_DEFAULT_PACKER>());
#elif 3 == PACKER_UNPACKER_TYPE
BOOST_AUTO(global_packer, boost::make_shared<prefix_suffix_packer>());
#endif
//demonstrate how to control the type of st_server_socket_base::server from template parameter
class i_echo_server : public i_server
......@@ -63,9 +60,9 @@ public:
inner_packer(global_packer);
#if 2 == PACKER_UNPACKER_TYPE
dynamic_cast<fixed_length_unpacker*>(&*inner_unpacker())->fixed_length(1024);
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->fixed_length(1024);
#elif 3 == PACKER_UNPACKER_TYPE
dynamic_cast<prefix_suffix_unpacker*>(&*inner_unpacker())->prefix_suffix("begin", "end");
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->prefix_suffix("begin", "end");
#endif
}
......@@ -165,6 +162,11 @@ int main(int argc, const char* argv[])
int thread_num = 1;
if (argc > 1)
thread_num = std::min(16, std::max(thread_num, atoi(argv[1])));
#if 3 == PACKER_UNPACKER_TYPE
global_packer->prefix_suffix("begin", "end");
#endif
service_pump.start_service(thread_num);
while(service_pump.is_running())
{
......@@ -181,6 +183,7 @@ int main(int argc, const char* argv[])
{
printf("normal server, link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", server_.size(), server_.invalid_object_size());
printf("echo server, link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", echo_server_.size(), echo_server_.invalid_object_size());
puts("");
puts(echo_server_.get_statistic().to_string().data());
}
//the following two commands demonstrate how to suspend msg dispatching, no matter recv buffer been used or not
......
......@@ -25,7 +25,7 @@ bool file_socket::on_msg_handle(out_msg_type& msg, bool link_down) {handle_msg(m
#ifdef ST_ASIO_WANT_MSG_SEND_NOTIFY
void file_socket::on_msg_send(in_msg_type& msg)
{
BOOST_AUTO(buffer, boost::dynamic_pointer_cast<file_buffer>(msg.raw_buffer()));
BOOST_AUTO(buffer, dynamic_cast<file_buffer*>(msg.raw_buffer()));
if (NULL != buffer)
{
buffer->read();
......@@ -93,7 +93,7 @@ void file_socket::handle_msg(out_msg_ctype& msg)
{
state = TRANS_BUSY;
fseeko(file, offset, SEEK_SET);
direct_send_msg(replaceable_buffer(boost::make_shared<file_buffer>(file, length)), true);
direct_send_msg(replaceable_buffer(new file_buffer(file, length)), true);
}
}
break;
......
/*
* st_asio_wrapper_ext.h
*
* Created on: 2016-7-30
* Author: youngwolf
* email: mail2tao@163.com
* QQ: 676218192
* Community on QQ: 198941541
*
* extensional, replaceable and indispensable components.
*/
#ifndef ST_ASIO_WRAPPER_EXT_H_
#define ST_ASIO_WRAPPER_EXT_H_
#include <string>
#include <sstream>
#include <boost/array.hpp>
#include <boost/container/set.hpp>
#include "../st_asio_wrapper_base.h"
namespace st_asio_wrapper { namespace ext {
//buffers who implemented i_buffer interface can be wrapped by replaceable_buffer
class string_buffer : public std::string, public i_buffer
{
public:
virtual bool empty() const {return std::string::empty();}
virtual size_t size() const {return std::string::size();}
virtual const char* data() const {return std::string::data();}
};
class basic_buffer/* : public boost::noncopyable*/ //TBD
{
public:
basic_buffer() {do_detach();}
basic_buffer(size_t len) {do_detach(); assign(len);}
~basic_buffer() {free();}
void assign(size_t len) {free(); do_attach(new char[len], len, len);}
//the following five functions are needed by st_asio_wrapper
bool empty() const {return 0 == len || NULL == buff;}
size_t size() const {return NULL == buff ? 0 : len;}
const char* data() const {return buff;}
void swap(basic_buffer& other) {std::swap(buff, other.buff); std::swap(len, other.len); std::swap(buff_len, other.buff_len);}
void clear() {free();}
//functions needed by packer and unpacker
char* data() {return buff;}
bool size(size_t _len) {assert(_len <= buff_len); return (_len <= buff_len) ? (len = _len, true) : false;}
size_t buffer_size() const {return NULL == buff ? 0 : buff_len;}
protected:
void do_attach(char* _buff, size_t _len, size_t _buff_len) {buff = _buff; len = _len; buff_len = _buff_len;}
void do_detach() {buff = NULL; len = buff_len = 0;}
void free() {delete[] buff; do_detach();}
protected:
char* buff;
size_t len, buff_len;
};
}} //namespace
#endif /* ST_ASIO_WRAPPER_EXT_H_ */
......@@ -13,7 +13,7 @@
#ifndef ST_ASIO_WRAPPER_PACKER_H_
#define ST_ASIO_WRAPPER_PACKER_H_
#include "../st_asio_wrapper_base.h"
#include "st_asio_wrapper_ext.h"
#ifdef ST_ASIO_HUGE_MSG
#define ST_ASIO_HEAD_TYPE boost::uint32_t
......@@ -53,6 +53,7 @@ public:
}
};
//protocol: length + body
class packer : public i_packer<std::string>
{
public:
......@@ -97,17 +98,9 @@ public:
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;}
};
//protocol: length + body
class replaceable_packer : public i_packer<replaceable_buffer>
{
public:
class buffer : public std::string, public i_buffer
{
public:
virtual bool empty() const {return std::string::empty();}
virtual size_t size() const {return std::string::size();}
virtual const char* data() const {return std::string::data();}
};
public:
using i_packer<msg_type>::pack_msg;
virtual bool pack_msg(msg_type& msg, const char* const pstr[], const size_t len[], size_t num, bool native = false)
......@@ -117,9 +110,9 @@ public:
packer::msg_type str;
if (p.pack_msg(str, pstr, len, num, native))
{
BOOST_AUTO(com, boost::make_shared<buffer>());
com->swap(str);
msg.raw_buffer(com);
BOOST_AUTO(raw_msg, new string_buffer());
raw_msg->swap(str);
msg.raw_buffer(raw_msg);
return true;
}
......@@ -132,6 +125,7 @@ public:
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;}
};
//protocol: [prefix] + body + suffix
class prefix_suffix_packer : public i_packer<std::string>
{
public:
......
......@@ -13,9 +13,7 @@
#ifndef ST_ASIO_WRAPPER_UNPACKER_H_
#define ST_ASIO_WRAPPER_UNPACKER_H_
#include <boost/array.hpp>
#include "../st_asio_wrapper_base.h"
#include "st_asio_wrapper_ext.h"
#ifdef ST_ASIO_HUGE_MSG
#define ST_ASIO_HEAD_TYPE boost::uint32_t
......@@ -28,11 +26,12 @@
namespace st_asio_wrapper { namespace ext {
//protocol: length + body
class unpacker : public i_unpacker<std::string>
{
public:
unpacker() {reset_state();}
size_t current_msg_length() const {return cur_msg_len;} //current msg's total length, -1 means don't know
size_t current_msg_length() const {return cur_msg_len;} //current msg's total length, -1 means not available
bool parse_msg(size_t bytes_transferred, boost::container::list<std::pair<const char*, size_t> >& msg_can)
{
......@@ -126,10 +125,11 @@ public:
protected:
boost::array<char, ST_ASIO_MSG_BUFFER_SIZE> raw_buff;
size_t cur_msg_len; //-1 means head has not received, so doesn't know the whole msg length.
size_t cur_msg_len; //-1 means head not received, so msg length is not available.
size_t remain_len; //half-baked msg
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
class udp_unpacker : public i_udp_unpacker<std::string>
{
public:
......@@ -140,17 +140,9 @@ protected:
boost::array<char, ST_ASIO_MSG_BUFFER_SIZE> raw_buff;
};
//protocol: length + body
class replaceable_unpacker : public i_unpacker<replaceable_buffer>
{
public:
class buffer : public std::string, public i_buffer
{
public:
virtual bool empty() const {return std::string::empty();}
virtual size_t size() const {return std::string::size();}
virtual const char* data() const {return std::string::data();}
};
public:
virtual void reset_state() {unpacker_.reset_state();}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
......@@ -159,10 +151,10 @@ public:
bool unpack_ok = unpacker_.parse_msg(bytes_transferred, tmp_can);
for (BOOST_AUTO(iter, tmp_can.begin()); iter != tmp_can.end(); ++iter)
{
BOOST_AUTO(com, boost::make_shared<buffer>());
com->swap(*iter);
BOOST_AUTO(raw_buffer, new string_buffer());
raw_buffer->swap(*iter);
msg_can.resize(msg_can.size() + 1);
msg_can.back().raw_buffer(com);
msg_can.back().raw_buffer(raw_buffer);
}
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
......@@ -176,24 +168,16 @@ protected:
unpacker unpacker_;
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
class replaceable_udp_unpacker : public i_udp_unpacker<replaceable_buffer>
{
public:
class buffer : public std::string, public i_buffer
{
public:
virtual bool empty() const {return std::string::empty();}
virtual size_t size() const {return std::string::size();}
virtual const char* data() const {return std::string::data();}
};
public:
virtual void parse_msg(msg_type& msg, size_t bytes_transferred)
{
assert(bytes_transferred <= ST_ASIO_MSG_BUFFER_SIZE);
BOOST_AUTO(com, boost::make_shared<buffer>());
com->assign(raw_buff.data(), bytes_transferred);
msg.raw_buffer(com);
BOOST_AUTO(raw_msg, new string_buffer());
raw_msg->assign(raw_buff.data(), bytes_transferred);
msg.raw_buffer(raw_msg);
}
virtual boost::asio::mutable_buffers_1 prepare_next_recv() {return boost::asio::buffer(raw_buff);}
......@@ -201,39 +185,13 @@ protected:
boost::array<char, ST_ASIO_MSG_BUFFER_SIZE> raw_buff;
};
class most_primitive_buffer
{
public:
most_primitive_buffer() {do_detach();}
~most_primitive_buffer() {free();}
void assign(size_t _len) {free(); do_attach(new char[_len], _len);}
void free() {delete buff; do_detach();}
//the following five functions (char* data() is used by unpackers, not counted in) are needed by st_asio_wrapper,
//for other functions, depends on the implementation of your packer and unpacker.
bool empty() const {return 0 == len || NULL == buff;}
size_t size() const {return len;}
const char* data() const {return buff;}
char* data() {return buff;}
void swap(most_primitive_buffer& other) {std::swap(buff, other.buff); std::swap(len, other.len);}
void clear() {free();}
protected:
void do_attach(char* _buff, size_t _len) {buff = _buff; len = _len;}
void do_detach() {buff = NULL; len = 0;}
protected:
char* buff;
size_t len;
};
//protocol: length + body
//this unpacker demonstrate how to forbid memory copying while parsing msgs (let asio write msg directly).
class buffer_free_unpacker : public i_unpacker<most_primitive_buffer>
class non_copy_unpacker : public i_unpacker<basic_buffer>
{
public:
buffer_free_unpacker() {reset_state();}
size_t current_msg_length() const {return raw_buff.size();} //current msg's total length(not include the head), 0 means don't know
non_copy_unpacker() {reset_state();}
size_t current_msg_length() const {return raw_buff.size();} //current msg's total length(not include the head), 0 means not available
public:
virtual void reset_state() {raw_buff.clear(); step = 0;}
......@@ -309,6 +267,7 @@ private:
int step; //-1-error format, 0-want the head, 1-want the body
};
//protocol: fixed lenght
class fixed_length_unpacker : public i_unpacker<std::string>
{
public:
......@@ -370,6 +329,7 @@ private:
size_t remain_len; //half-baked msg
};
//protocol: [prefix] + body + suffix
class prefix_suffix_unpacker : public i_unpacker<std::string>
{
public:
......@@ -484,6 +444,7 @@ private:
size_t remain_len; //half-baked msg
};
//protocol: stream (non-protocol)
class stream_unpacker : public i_unpacker<std::string>
{
public:
......@@ -500,7 +461,7 @@ public:
return true;
}
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred > 0 ? 0 : ST_ASIO_MSG_BUFFER_SIZE;}
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred > 0 ? 0 : boost::asio::detail::default_max_transfer_size;}
virtual boost::asio::mutable_buffers_1 prepare_next_recv() {return boost::asio::buffer(raw_buff);}
protected:
......
......@@ -14,11 +14,11 @@
#define ST_ASIO_WRAPPER_BASE_H_
#include <time.h>
#include <string>
#include <stdio.h>
#include <string.h>
#include <stdarg.h>
#include <assert.h>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
......@@ -58,37 +58,41 @@ namespace st_asio_wrapper
{
class i_buffer
{
protected:
public:
virtual ~i_buffer() {}
public:
virtual bool empty() const = 0;
virtual size_t size() const = 0;
virtual const char* data() const = 0;
};
//if you want to replace packer or unpacker at runtime, please use this as the msg type
class replaceable_buffer
//convert '->' operation to '.' operation
//user need to allocate object for this class, and this class will free it
template<typename T>
class proxy_buffer/* : public boost::noncopyable*/ //TBD
{
public:
replaceable_buffer() {}
replaceable_buffer(const boost::shared_ptr<i_buffer>& _buffer) : buffer(_buffer) {}
replaceable_buffer(const replaceable_buffer& other) : buffer(other.buffer) {}
typedef T* buffer_type;
typedef const T* buffer_ctype;
proxy_buffer() : buffer(NULL) {}
proxy_buffer(buffer_type _buffer) : buffer(_buffer) {}
~proxy_buffer() {clear();}
boost::shared_ptr<i_buffer> raw_buffer() {return buffer;}
boost::shared_ptr<const i_buffer> raw_buffer() const {return buffer;}
void raw_buffer(const boost::shared_ptr<i_buffer>& _buffer) {buffer = _buffer;}
buffer_type raw_buffer() {return buffer;}
void raw_buffer(buffer_type _buffer) {buffer = _buffer;}
//the following five functions are needed by st_asio_wrapper, for other functions, depends on the implementation of your packer and unpacker
bool empty() const {return !buffer || buffer->empty();}
size_t size() const {return buffer ? buffer->size() : 0;}
const char* data() const {return buffer ? buffer->data() : NULL;}
void swap(replaceable_buffer& other) {buffer.swap(other.buffer);}
void clear() {buffer.reset();}
//the following five functions are needed by st_asio_wrapper
bool empty() const {return NULL == buffer || buffer->empty();}
size_t size() const {return NULL == buffer ? 0 : buffer->size();}
const char* data() const {return NULL == buffer ? NULL : buffer->data();}
void swap(proxy_buffer& other) {std::swap(buffer, other.buffer);}
void clear() {delete buffer; buffer = NULL;}
protected:
boost::shared_ptr<i_buffer> buffer;
buffer_type buffer;
};
typedef proxy_buffer<i_buffer> replaceable_buffer; //if you want to replace packer or unpacker at runtime, please use this as the msg type
//packer concept
template<typename MsgType>
......
......@@ -8,8 +8,10 @@
#define ST_ASIO_REUSE_OBJECT //use objects pool
//#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
//#define ST_ASIO_WANT_MSG_SEND_NOTIFY
#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
#define ST_ASIO_MSG_BUFFER_SIZE 65536
//stream unpacker (non-protocol)
#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
//configuration
#include "../include/ext/st_asio_wrapper_net.h"
......@@ -61,13 +63,16 @@ protected:
#endif
private:
#ifdef ST_ASIO_WANT_MSG_SEND_NOTIFY
void handle_msg(out_msg_ctype& msg)
{
#ifdef ST_ASIO_WANT_MSG_SEND_NOTIFY
recv_bytes += msg.size();
if (recv_bytes >= total_bytes && 0 == --completed_session_num)
begin_time.stop();
}
#else
void handle_msg(out_msg_type& msg)
{
if (0 == total_bytes)
return;
......@@ -80,8 +85,8 @@ private:
}
else
direct_send_msg(msg);
#endif
}
#endif
private:
boost::uint64_t total_bytes, send_bytes, recv_bytes;
......@@ -150,6 +155,7 @@ int main(int argc, const char* argv[])
else if (LIST_STATUS == str)
{
printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client.size(), client.valid_size(), client.invalid_object_size());
puts("");
puts(client.get_statistic().to_string().data());
}
else if (!str.empty())
......@@ -194,6 +200,7 @@ int main(int argc, const char* argv[])
#undef ST_ASIO_REUSE_OBJECT
#undef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
#undef ST_ASIO_WANT_MSG_SEND_NOTIFY
#undef ST_ASIO_DEFAULT_PACKER
#undef ST_ASIO_DEFAULT_UNPACKER
#undef ST_ASIO_MSG_BUFFER_SIZE
//restore configuration
......@@ -5,8 +5,10 @@
#define ST_ASIO_SERVER_PORT 9527
#define ST_ASIO_REUSE_OBJECT //use objects pool
//#define ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
#define ST_ASIO_MSG_BUFFER_SIZE 65536
//stream unpacker (non-protocol)
#define ST_ASIO_DEFAULT_UNPACKER stream_unpacker
//configuration
#include "../include/ext/st_asio_wrapper_net.h"
......@@ -78,6 +80,7 @@ int main(int argc, const char* argv[])
else if (LIST_STATUS == str)
{
printf("link #: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", echo_server_.size(), echo_server_.invalid_object_size());
puts("");
puts(echo_server_.get_statistic().to_string().data());
}
}
......@@ -89,6 +92,7 @@ int main(int argc, const char* argv[])
#undef ST_ASIO_SERVER_PORT
#undef ST_ASIO_REUSE_OBJECT
#undef ST_ASIO_FREE_OBJECT_INTERVAL
#undef ST_ASIO_DEFAULT_PACKER
#undef ST_ASIO_DEFAULT_UNPACKER
#undef ST_ASIO_MSG_BUFFER_SIZE
//restore configuration
......@@ -15,14 +15,15 @@
//configuration
//use the following macro to control the type of packer and unpacker
#define PACKER_UNPACKER_TYPE 1
//1-default packer and unpacker, head(length) + body
#define PACKER_UNPACKER_TYPE 0
//0-default packer and unpacker, head(length) + body
//1-default replaceable_packer and replaceable_unpacker, head(length) + body
//2-fixed length unpacker
//3-prefix and suffix packer and unpacker
#if 1 == PACKER_UNPACKER_TYPE
//#define ST_ASIO_DEFAULT_PACKER replaceable_packer
//#define ST_ASIO_DEFAULT_UNPACKER replaceable_unpacker
#define ST_ASIO_DEFAULT_PACKER replaceable_packer
#define ST_ASIO_DEFAULT_UNPACKER replaceable_unpacker
#elif 2 == PACKER_UNPACKER_TYPE
#define ST_ASIO_DEFAULT_UNPACKER fixed_length_unpacker
#elif 3 == PACKER_UNPACKER_TYPE
......@@ -61,10 +62,10 @@ public:
test_socket(boost::asio::io_service& io_service_) : st_connector(io_service_), recv_bytes(0), recv_index(0)
{
#if 2 == PACKER_UNPACKER_TYPE
dynamic_cast<fixed_length_unpacker*>(&*inner_unpacker())->fixed_length(1024);
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->fixed_length(1024);
#elif 3 == PACKER_UNPACKER_TYPE
dynamic_cast<prefix_suffix_packer*>(&*inner_packer())->prefix_suffix("begin", "end");
dynamic_cast<prefix_suffix_unpacker*>(&*inner_unpacker())->prefix_suffix("begin", "end");
dynamic_cast<ST_ASIO_DEFAULT_PACKER*>(&*inner_packer())->prefix_suffix("begin", "end");
dynamic_cast<ST_ASIO_DEFAULT_UNPACKER*>(&*inner_unpacker())->prefix_suffix("begin", "end");
#endif
}
......@@ -81,11 +82,13 @@ public:
memset(buff, msg_fill, msg_len);
memcpy(buff, &recv_index, sizeof(size_t)); //seq
#if 2 == PACKER_UNPACKER_TYPE //there's no fixed_length_packer
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
send_native_msg(buff, msg_len);
#else
send_msg(buff, msg_len);
#endif
delete[] buff;
}
......@@ -111,14 +114,19 @@ protected:
#else
char* pstr = inner_packer()->raw_data(msg);
size_t msg_len = inner_packer()->raw_data_len(msg);
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
std::advance(pstr, -ST_ASIO_HEAD_LEN);
msg_len += ST_ASIO_HEAD_LEN;
#endif
size_t send_index;
memcpy(&send_index, pstr, sizeof(size_t));
++send_index;
memcpy(pstr, &send_index, sizeof(size_t));
memcpy(pstr, &send_index, sizeof(size_t)); //seq
#if 2 == PACKER_UNPACKER_TYPE //there's no fixed_length_packer
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
send_native_msg(pstr, msg_len);
#else
send_msg(pstr, msg_len);
......@@ -275,6 +283,7 @@ int main(int argc, const char* argv[])
else if (LIST_STATUS == str)
{
printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client.size(), client.valid_size(), client.invalid_object_size());
puts("");
puts(client.get_statistic().to_string().data());
}
//the following two commands demonstrate how to suspend msg dispatching, no matter recv buffer been used or not
......@@ -320,14 +329,12 @@ int main(int argc, const char* argv[])
BOOST_AUTO(iter, tok.begin());
if (iter != tok.end()) msg_num = std::max((size_t) atoi(iter++->data()), (size_t) 1);
bool native = false;
#if 1 == PACKER_UNPACKER_TYPE
if (iter != tok.end()) msg_len = std::min(packer::get_max_msg_size(),
std::max((size_t) atoi(iter++->data()), sizeof(size_t))); //include seq
#elif 2 == PACKER_UNPACKER_TYPE
if (iter != tok.end()) ++iter;
msg_len = 1024; //we hard code this because we fixedly initialized the length of fixed_length_unpacker to 1024
native = true; //we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
#elif 3 == PACKER_UNPACKER_TYPE
if (iter != tok.end()) msg_len = std::min((size_t) ST_ASIO_MSG_BUFFER_SIZE,
std::max((size_t) atoi(iter++->data()), sizeof(size_t)));
......@@ -395,11 +402,21 @@ int main(int argc, const char* argv[])
switch (model)
{
case 0:
native ? client.safe_broadcast_native_msg(buff, msg_len) : client.safe_broadcast_msg(buff, msg_len);
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
client.safe_broadcast_native_msg(buff, msg_len);
#else
client.safe_broadcast_msg(buff, msg_len);
#endif
send_bytes += link_num * msg_len;
break;
case 1:
native ? client.safe_random_send_native_msg(buff, msg_len) : client.safe_random_send_msg(buff, msg_len);
#if 2 == PACKER_UNPACKER_TYPE
//we don't have fixed_length_packer, so use packer instead, but need to pack msgs with native manner.
client.safe_random_send_native_msg(buff, msg_len);
#else
client.safe_random_send_msg(buff, msg_len);
#endif
send_bytes += msg_len;
break;
default:
......
......@@ -37,7 +37,7 @@ public:
basic_buffer() {do_detach();}
basic_buffer(size_t len) {do_detach(); assign(len);}
basic_buffer(basic_buffer&& other) {do_attach(other.buff, other.len, other.buff_len); other.do_detach();}
virtual ~basic_buffer() {free();}
~basic_buffer() {free();}
void assign(size_t len) {free(); do_attach(new char[len], len, len);}
......
......@@ -98,63 +98,6 @@ public:
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;}
};
//protocol: length + body
//use memory pool
class pooled_packer : public i_packer<memory_pool::buffer_type>
{
public:
static size_t get_max_msg_size() {return ST_ASIO_MSG_BUFFER_SIZE - ST_ASIO_HEAD_LEN;}
pooled_packer() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
memory_pool* mem_pool() const {return pool;}
using i_packer<msg_type>::pack_msg;
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
{
msg_type msg;
auto pre_len = native ? 0 : ST_ASIO_HEAD_LEN;
auto total_len = packer_helper::msg_size_check(pre_len, pstr, len, num);
if ((size_t) -1 == total_len)
return msg;
else if (total_len > pre_len)
{
if (!native)
{
auto head_len = (ST_ASIO_HEAD_TYPE) total_len;
if (total_len != head_len)
{
unified_out::error_out("pack msg error: length exceeded the header's range!");
return msg;
}
msg.swap(pool->checkout(total_len));
head_len = ST_ASIO_HEAD_H2N(head_len);
memcpy(msg.data(), (const char*) &head_len, ST_ASIO_HEAD_LEN);
}
else
msg.swap(pool->checkout(total_len));
total_len = pre_len;
for (size_t i = 0; i < num; ++i)
if (nullptr != pstr[i])
{
memcpy(std::next(msg.data(), total_len), pstr[i], len[i]);
total_len += len[i];
}
} //if (total_len > pre_len)
return msg;
}
virtual char* raw_data(msg_type& msg) const {return std::next(msg.data(), ST_ASIO_HEAD_LEN);}
virtual const char* raw_data(msg_ctype& msg) const {return std::next(msg.data(), ST_ASIO_HEAD_LEN);}
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;}
protected:
memory_pool* pool;
};
//protocol: length + body
class replaceable_packer : public i_packer<replaceable_buffer>
{
......@@ -213,6 +156,63 @@ private:
std::string _prefix, _suffix;
};
//protocol: length + body
//use memory pool
class pooled_packer : public i_packer<memory_pool::buffer_type>
{
public:
static size_t get_max_msg_size() {return ST_ASIO_MSG_BUFFER_SIZE - ST_ASIO_HEAD_LEN;}
pooled_packer() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
memory_pool* mem_pool() const {return pool;}
using i_packer<msg_type>::pack_msg;
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
{
msg_type msg;
auto pre_len = native ? 0 : ST_ASIO_HEAD_LEN;
auto total_len = packer_helper::msg_size_check(pre_len, pstr, len, num);
if ((size_t) -1 == total_len)
return msg;
else if (total_len > pre_len)
{
if (!native)
{
auto head_len = (ST_ASIO_HEAD_TYPE) total_len;
if (total_len != head_len)
{
unified_out::error_out("pack msg error: length exceeded the header's range!");
return msg;
}
msg.swap(pool->checkout(total_len));
head_len = ST_ASIO_HEAD_H2N(head_len);
memcpy(msg.data(), (const char*) &head_len, ST_ASIO_HEAD_LEN);
}
else
msg.swap(pool->checkout(total_len));
total_len = pre_len;
for (size_t i = 0; i < num; ++i)
if (nullptr != pstr[i])
{
memcpy(std::next(msg.data(), total_len), pstr[i], len[i]);
total_len += len[i];
}
} //if (total_len > pre_len)
return msg;
}
virtual char* raw_data(msg_type& msg) const {return std::next(msg.data(), ST_ASIO_HEAD_LEN);}
virtual const char* raw_data(msg_ctype& msg) const {return std::next(msg.data(), ST_ASIO_HEAD_LEN);}
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ST_ASIO_HEAD_LEN;}
protected:
memory_pool* pool;
};
//protocol: stream (non-protocol)
//use memory pool
class pooled_stream_packer : public i_packer<memory_pool::buffer_type>
......
......@@ -128,47 +128,6 @@ protected:
size_t remain_len; //half-baked msg
};
//protocol: length + body
//use memory pool
class pooled_unpacker : public i_unpacker<memory_pool::buffer_type>, public unpacker
{
public:
using i_unpacker<memory_pool::buffer_type>::msg_type;
using i_unpacker<memory_pool::buffer_type>::msg_ctype;
using i_unpacker<memory_pool::buffer_type>::container_type;
pooled_unpacker() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
memory_pool* mem_pool() const {return pool;}
virtual void reset_state() {unpacker::reset_state();}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
boost::container::list<std::pair<const char*, size_t>> msg_pos_can;
auto unpack_ok = unpacker::parse_msg(bytes_transferred, msg_pos_can);
do_something_to_all(msg_pos_can, [this, &msg_can](decltype(*std::begin(msg_pos_can))& item) {
auto buff = pool->checkout(item.second);
memcpy(buff.data(), item.first, item.second);
msg_can.push_back(std::move(buff));
});
if (unpack_ok && remain_len > 0)
{
auto pnext = std::next(msg_pos_can.back().first, msg_pos_can.back().second);
memcpy(std::begin(raw_buff), pnext, remain_len); //left behind unparsed data
}
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
return unpack_ok;
}
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return unpacker::completion_condition(ec, bytes_transferred);}
virtual boost::asio::mutable_buffers_1 prepare_next_recv() {return unpacker::prepare_next_recv();}
protected:
memory_pool* pool;
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
class udp_unpacker : public i_udp_unpacker<std::string>
{
......@@ -508,6 +467,47 @@ protected:
boost::array<char, ST_ASIO_MSG_BUFFER_SIZE> raw_buff;
};
//protocol: length + body
//use memory pool
class pooled_unpacker : public i_unpacker<memory_pool::buffer_type>, public unpacker
{
public:
using i_unpacker<memory_pool::buffer_type>::msg_type;
using i_unpacker<memory_pool::buffer_type>::msg_ctype;
using i_unpacker<memory_pool::buffer_type>::container_type;
pooled_unpacker() : pool(nullptr) {}
void mem_pool(memory_pool& _pool) {pool = &_pool;}
memory_pool* mem_pool() const {return pool;}
virtual void reset_state() {unpacker::reset_state();}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
boost::container::list<std::pair<const char*, size_t>> msg_pos_can;
auto unpack_ok = unpacker::parse_msg(bytes_transferred, msg_pos_can);
do_something_to_all(msg_pos_can, [this, &msg_can](decltype(*std::begin(msg_pos_can))& item) {
auto buff = pool->checkout(item.second);
memcpy(buff.data(), item.first, item.second);
msg_can.push_back(std::move(buff));
});
if (unpack_ok && remain_len > 0)
{
auto pnext = std::next(msg_pos_can.back().first, msg_pos_can.back().second);
memcpy(std::begin(raw_buff), pnext, remain_len); //left behind unparsed data
}
//if unpacking failed, successfully parsed msgs will still returned via msg_can(stick package), please note.
return unpack_ok;
}
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return unpacker::completion_condition(ec, bytes_transferred);}
virtual boost::asio::mutable_buffers_1 prepare_next_recv() {return unpacker::prepare_next_recv();}
protected:
memory_pool* pool;
};
//protocol: stream (non-protocol)
//use memory pool
class pooled_stream_unpacker : public i_unpacker<memory_pool::buffer_type>
......
......@@ -31,12 +31,6 @@ using namespace st_asio_wrapper::ext;
#define atoll _atoi64
#endif
#if defined(_MSC_VER) || defined(__i386__)
#define uint64_format "%llu"
#else // defined(__GNUC__) && defined(__x86_64__)
#define uint64_format "%lu"
#endif
#define QUIT_COMMAND "quit"
#define LIST_STATUS "status"
......@@ -46,6 +40,7 @@ boost::atomic_ushort completed_session_num;
#else
st_atomic<unsigned short> completed_session_num;
#endif
#if 2 == PACKER_UNPACKER_TYPE
memory_pool pool;
#endif
......
......@@ -28,12 +28,6 @@ using namespace st_asio_wrapper::ext;
#define atoll _atoi64
#endif
#if defined(_MSC_VER) || defined(__i386__)
#define uint64_format "%llu"
#else // defined(__GNUC__) && defined(__x86_64__)
#define uint64_format "%lu"
#endif
#define QUIT_COMMAND "quit"
#define LIST_STATUS "status"
......
......@@ -41,12 +41,6 @@ using namespace st_asio_wrapper::ext;
#define atoll _atoi64
#endif
#if defined(_MSC_VER) || defined(__i386__)
#define uint64_format "%llu"
#else // defined(__GNUC__) && defined(__x86_64__)
#define uint64_format "%lu"
#endif
#define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart"
#define LIST_ALL_CLIENT "list_all_client"
......@@ -58,6 +52,7 @@ static bool check_msg;
#if 4 == PACKER_UNPACKER_TYPE
memory_pool pool;
#endif
///////////////////////////////////////////////////
//msg sending interface
#define TCP_RANDOM_SEND_MSG(FUNNAME, SEND_FUNNAME) \
......@@ -231,11 +226,6 @@ public:
int main(int argc, const char* argv[])
{
printf("usage: test_client [<service thread number=1> [<port=%d> [<ip=%s> [link num=16]]]]\n", ST_ASIO_SERVER_PORT, ST_ASIO_SERVER_IP);
/* memory_pool pool;
pooled_buffer a(&pool, 100);
pool.checkin(a);
if (a.empty()) puts("a is empty");
printf("pool block amount: " ST_ASIO_SF ", pool total size: " uint64_format "\n", pool.available_size(), pool.available_buffer_size());*/
if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
return 0;
else
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册