提交 2c80d30f 编写于 作者: Y youngwolf

Add append concept to basic_buffer (the same to std::string).

Amend basic_buffer's reserve concept (to be the same as std::string).
Enhance the default packer and unpacker to support basic_buffer as their output message type.
Enhance packer2 and unpacker2 to support unique_buffer<basic_buffer> and shared_buffer<basic_buffer> as their output message type.
To use the original default packer and unpacker, now you must write them as packer<> and unpacker<>.
上级 3bb656b8
......@@ -14,7 +14,7 @@
//#define ASCS_DEFAULT_UNPACKER stream_unpacker
//the following two macros demonstrate how to support huge msg(exceed 65535 - 2).
//#define ASCS_HUGE_MSG
#define ASCS_HUGE_MSG
//#define ASCS_MSG_BUFFER_SIZE (1024 * 1024) //should not bigger than ASCS_MAX_SEND_BUF and ASCS_MAX_RECV_BUF, please note
#define ASCS_HEARTBEAT_INTERVAL 5 //if use stream_unpacker, heartbeat messages will be treated as normal messages,
//because stream_unpacker doesn't support heartbeat
......
......@@ -524,7 +524,7 @@ int main(int argc, const char* argv[])
if (iter != std::end(parameters)) msg_num = std::max((size_t) atoll(iter++->data()), (size_t) 1);
#if 0 == PACKER_UNPACKER_TYPE || 1 == PACKER_UNPACKER_TYPE
if (iter != std::end(parameters)) msg_len = std::min(packer::get_max_msg_size(),
if (iter != std::end(parameters)) msg_len = std::min(packer<>::get_max_msg_size(),
std::max((size_t) atoi(iter++->data()), sizeof(size_t))); //include seq
#elif 2 == PACKER_UNPACKER_TYPE
if (iter != std::end(parameters)) ++iter;
......
......@@ -34,7 +34,7 @@
//this unpacker only pre-allocated a buffer of 4000 bytes, but it can parse messages up to ST_ASIO_MSG_BUFFER_SIZE (here is 1000000) bytes,
//it works as the default unpacker for messages <= 4000, otherwise, it works as non_copy_unpacker
#elif 1 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER packer2<unique_buffer<std::string>, std::string>
#define ASCS_DEFAULT_PACKER packer2<unique_buffer<basic_buffer>, basic_buffer, packer<basic_buffer>>
#define ASCS_DEFAULT_UNPACKER unpacker2<unique_buffer, basic_buffer, flexible_unpacker<basic_buffer>>
#elif 2 == PACKER_UNPACKER_TYPE
#undef ASCS_HEARTBEAT_INTERVAL
......@@ -167,10 +167,10 @@ protected:
typedef server_socket_base<ext::packer, ext::unpacker> normal_socket;
#else
//demonstrate how to open heartbeat function without defining macro ASCS_HEARTBEAT_INTERVAL
class normal_socket : public server_socket_base<ext::packer, ext::unpacker>
class normal_socket : public server_socket_base<ext::packer<>, ext::unpacker<>>
{
public:
normal_socket(i_server& server_) : server_socket_base<ext::packer, ext::unpacker>(server_) {}
normal_socket(i_server& server_) : server_socket_base<ext::packer<>, ext::unpacker<>>(server_) {}
protected:
//demo client needs heartbeat (macro ASCS_HEARTBEAT_INTERVAL been defined), please note that the interval (here is 5) must be equal to
......@@ -190,10 +190,10 @@ protected:
virtual bool on_accept(object_ctype& socket_ptr) {stop_listen(); return true;}
};
class short_connection : public server_socket_base<ext::packer, ext::unpacker>
class short_connection : public server_socket_base<ext::packer<>, ext::unpacker<>>
{
private:
typedef server_socket_base<ext::packer, ext::unpacker> super;
typedef server_socket_base<ext::packer<>, ext::unpacker<>> super;
public:
short_connection(i_server& server_) : super(server_) {}
......
......@@ -64,16 +64,29 @@ class basic_buffer
public:
basic_buffer() {do_detach();}
basic_buffer(size_t len) {do_assign(len);}
basic_buffer(const char* _buff, size_t len) {do_assign(len); memcpy(buff, _buff, len);}
basic_buffer(const char* buff, size_t len) {do_detach(); append(buff, len);}
basic_buffer(basic_buffer&& other) {do_attach(other.buff, other.len, other.cap); other.do_detach();}
virtual ~basic_buffer() {clear();}
basic_buffer& operator=(basic_buffer&& other) {clear(); swap(other); return *this;}
basic_buffer& append(const char* _buff, size_t _len)
{
if (nullptr != _buff && _len > 0)
{
if (len + _len > cap) //no optimization for memory re-allocation, please reserve enough memory before appending data
reserve(len + _len);
memcpy(std::next(buff, len), _buff, _len);
len += (unsigned) _len;
}
return *this;
}
void resize(size_t _len) //won't fill the extended buffer
{
if (_len <= cap)
len = _len;
len = (unsigned) _len;
else
{
auto old_buff = buff;
......@@ -87,8 +100,19 @@ public:
}
}
}
void reserve(size_t len) {if (len > cap) resize(len);}
void assign(size_t len) {resize(len);}
void assign(const char* buff, size_t len) {resize(0); append(buff, len);}
void reserve(size_t _len)
{
if (_len > cap)
{
auto old_len = len;
resize(_len);
len = old_len;
}
}
size_t max_size() const {return (unsigned) -1;}
size_t capacity() const {return cap;}
......
......@@ -56,15 +56,20 @@ public:
};
//protocol: length + body
class packer : public i_packer<std::string>
//T can be std::string or basic_buffer
template<typename T = std::string>
class packer : public i_packer<T>
{
private:
typedef i_packer<T> super;
public:
static size_t get_max_msg_size() {return ASCS_MSG_BUFFER_SIZE - ASCS_HEAD_LEN;}
using i_packer<msg_type>::pack_msg;
virtual msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
using i_packer<typename super::msg_type>::pack_msg;
virtual typename super::msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
{
msg_type msg;
typename super::msg_type msg;
auto pre_len = native ? 0 : ASCS_HEAD_LEN;
auto total_len = packer_helper::msg_size_check(pre_len, pstr, len, num);
if ((size_t) -1 != total_len && total_len > pre_len)
......@@ -92,7 +97,7 @@ public:
return msg;
}
virtual bool pack_msg(msg_type&& msg, container_type& msg_can)
virtual bool pack_msg(typename super::msg_type&& msg, typename super::container_type& msg_can)
{
auto len = msg.size();
if (len > get_max_msg_size())
......@@ -104,7 +109,7 @@ public:
return true;
}
virtual bool pack_msg(msg_type&& msg1, msg_type&& msg2, container_type& msg_can)
virtual bool pack_msg(typename super::msg_type&& msg1, typename super::msg_type&& msg2, typename super::container_type& msg_can)
{
auto len = msg1.size() + msg2.size();
if (len > get_max_msg_size()) //not considered overflow
......@@ -117,7 +122,7 @@ public:
return true;
}
virtual bool pack_msg(container_type& in, container_type& out)
virtual bool pack_msg(typename super::container_type& in, typename super::container_type& out)
{
auto len = ascs::get_size_in_byte(in);
if (len > get_max_msg_size()) //not considered overflow
......@@ -129,30 +134,32 @@ public:
return true;
}
virtual msg_type pack_heartbeat() {auto head_len = packer_helper::pack_header(0); return msg_type((const char*) &head_len, ASCS_HEAD_LEN);}
virtual typename super::msg_type pack_heartbeat() {auto head_len = packer_helper::pack_header(0); return typename super::msg_type((const char*) &head_len, ASCS_HEAD_LEN);}
//msg must has been packed by this packer with native == false
virtual char* raw_data(msg_type& msg) const {return const_cast<char*>(std::next(msg.data(), ASCS_HEAD_LEN));}
virtual const char* raw_data(msg_ctype& msg) const {return std::next(msg.data(), ASCS_HEAD_LEN);}
virtual size_t raw_data_len(msg_ctype& msg) const {return msg.size() - ASCS_HEAD_LEN;}
virtual char* raw_data(typename super::msg_type& msg) const {return const_cast<char*>(std::next(msg.data(), ASCS_HEAD_LEN));}
virtual const char* raw_data(typename super::msg_ctype& msg) const {return std::next(msg.data(), ASCS_HEAD_LEN);}
virtual size_t raw_data_len(typename super::msg_ctype& msg) const {return msg.size() - ASCS_HEAD_LEN;}
};
//protocol: length + body
//T can be unique_buffer or shared_buffer, the latter makes output messages seemingly copyable.
template<typename T = unique_buffer<i_buffer>, typename C = string_buffer>
//T can be unique_buffer<XXXX> or shared_buffer<XXXX>, the latter makes output messages seemingly copyable.
//C is XXXX or a class that inherit from XXXX (because XXXX can be a virtual interface).
//Packer is the real packer who packs messages, which means packer2 is just a wrapper.
template<typename T = unique_buffer<i_buffer>, typename C = string_buffer, typename Packer = packer<>>
class packer2 : public i_packer<T>
{
private:
typedef i_packer<T> super;
public:
static size_t get_max_msg_size() {return packer::get_max_msg_size();}
static size_t get_max_msg_size() {return Packer::get_max_msg_size();}
using super::pack_msg;
virtual typename super::msg_type pack_msg(const char* const pstr[], const size_t len[], size_t num, bool native = false)
{
auto raw_msg = new C();
auto str = packer().pack_msg(pstr, len, num, native);
auto str = Packer().pack_msg(pstr, len, num, native);
raw_msg->swap(str);
return typename super::msg_type(raw_msg);
}
......@@ -202,7 +209,7 @@ public:
virtual typename super::msg_type pack_heartbeat()
{
auto raw_msg = new C();
auto str = packer().pack_heartbeat();
auto str = Packer().pack_heartbeat();
raw_msg->swap(str);
return typename super::msg_type(raw_msg);
}
......@@ -214,7 +221,7 @@ public:
};
//protocol: fixed length
class fixed_length_packer : public packer
class fixed_length_packer : public packer<>
{
public:
using packer::pack_msg;
......
......@@ -19,11 +19,11 @@
#include "../single_service_pump.h"
#ifndef ASCS_DEFAULT_PACKER
#define ASCS_DEFAULT_PACKER ascs::ext::packer
#define ASCS_DEFAULT_PACKER ascs::ext::packer<>
#endif
#ifndef ASCS_DEFAULT_UNPACKER
#define ASCS_DEFAULT_UNPACKER ascs::ext::unpacker
#define ASCS_DEFAULT_UNPACKER ascs::ext::unpacker<>
#endif
namespace ascs { namespace ext { namespace ssl {
......
......@@ -23,11 +23,11 @@
#include "../single_service_pump.h"
#ifndef ASCS_DEFAULT_PACKER
#define ASCS_DEFAULT_PACKER ascs::ext::packer
#define ASCS_DEFAULT_PACKER ascs::ext::packer<>
#endif
#ifndef ASCS_DEFAULT_UNPACKER
#define ASCS_DEFAULT_UNPACKER ascs::ext::unpacker
#define ASCS_DEFAULT_UNPACKER ascs::ext::unpacker<>
#endif
namespace ascs { namespace ext { namespace tcp {
......
......@@ -20,7 +20,7 @@
#include "../single_service_pump.h"
#ifndef ASCS_DEFAULT_PACKER
#define ASCS_DEFAULT_PACKER ascs::ext::packer
#define ASCS_DEFAULT_PACKER ascs::ext::packer<>
#endif
#ifndef ASCS_DEFAULT_UDP_UNPACKER
......
......@@ -49,8 +49,13 @@ public:
};
//protocol: length + body
class unpacker : public i_unpacker<std::string>
//T can be std::string or basic_buffer
template<typename T = std::string>
class unpacker : public i_unpacker<T>
{
private:
typedef i_unpacker<T> super;
public:
unpacker() {reset();}
size_t current_msg_length() const {return cur_msg_len;} //current msg's total length, -1 means not available
......@@ -96,7 +101,7 @@ public:
public:
virtual void reset() {cur_msg_len = -1; remain_len = 0;}
virtual void dump_left_data() const {unpacker_helper::dump_left_data(raw_buff.data(), cur_msg_len, remain_len);}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
virtual bool parse_msg(size_t bytes_transferred, typename super::container_type& msg_can)
{
//length + msg
remain_len += bytes_transferred;
......@@ -107,7 +112,7 @@ public:
do_something_to_all(msg_pos_can, [this, &msg_can](decltype(msg_pos_can.front()) item) {
if (item.second > ASCS_HEAD_LEN) //ignore heartbeat
{
if (stripped())
if (this->stripped())
msg_can.emplace_back(std::next(item.first, ASCS_HEAD_LEN), item.second - ASCS_HEAD_LEN);
else
msg_can.emplace_back(item.first, item.second);
......@@ -151,17 +156,17 @@ public:
#ifdef ASCS_SCATTERED_RECV_BUFFER
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return buffer_type(1, asio::buffer(raw_buff) + remain_len);}
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return typename super::buffer_type(1, asio::buffer(raw_buff) + remain_len);}
#elif ASIO_VERSION < 101100
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(asio::buffer(raw_buff) + remain_len);}
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(asio::buffer(raw_buff) + remain_len);}
#else
virtual buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(raw_buff) + remain_len;}
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ASCS_MSG_BUFFER_SIZE); return asio::buffer(raw_buff) + remain_len;}
#endif
//msg must has been unpacked by this unpacker
virtual char* raw_data(msg_type& msg) const {return const_cast<char*>(stripped() ? msg.data() : std::next(msg.data(), ASCS_HEAD_LEN));}
virtual const char* raw_data(msg_ctype& msg) const {return stripped() ? msg.data() : std::next(msg.data(), ASCS_HEAD_LEN);}
virtual size_t raw_data_len(msg_ctype& msg) const {return stripped() ? msg.size() : msg.size() - ASCS_HEAD_LEN;}
virtual char* raw_data(typename super::msg_type& msg) const {return const_cast<char*>(this->stripped() ? msg.data() : std::next(msg.data(), ASCS_HEAD_LEN));}
virtual const char* raw_data(typename super::msg_ctype& msg) const {return this->stripped() ? msg.data() : std::next(msg.data(), ASCS_HEAD_LEN);}
virtual size_t raw_data_len(typename super::msg_ctype& msg) const {return this->stripped() ? msg.size() : msg.size() - ASCS_HEAD_LEN;}
protected:
std::array<char, ASCS_MSG_BUFFER_SIZE> raw_buff;
......@@ -384,8 +389,9 @@ protected:
//protocol: length + body
//Buffer can be unique_buffer or shared_buffer, the latter makes output messages seemingly copyable.
//T can be std::string or basic_buffer, Unpacker can be the default unpacker or flexible_unpacker.
template<template<typename> class Buffer = shared_buffer, typename T = std::string, typename Unpacker = unpacker>
//T can be std::string or basic_buffer, and the output message type will be Buffer<T>.
//Unpacker can be the default unpacker or flexible_unpacker, which means unpacker2 is just a wrapper.
template<template<typename> class Buffer = shared_buffer, typename T = std::string, typename Unpacker = unpacker<>>
class unpacker2 : public i_unpacker<Buffer<T>>
{
private:
......@@ -548,9 +554,9 @@ private:
};
//protocol: fixed length
//non-copy, let asio write msg directly (no temporary memory needed), actually, this unpacker has poor performance, because it needs one read for one message, other unpackers
//are able to get many messages from just one read, so this unpacker just demonstrates a way to avoid memory replications and temporary memory utilization, it can provide better
// performance for huge messages.
//non-copy, let asio write msg directly (no temporary memory needed), actually, this unpacker has poor performance, because it needs one read for one message,
// other unpackers are able to get many messages from just one read, so this unpacker just demonstrates a way to avoid memory replications and temporary memory
// utilization, it can provide better performance for huge messages.
//this unpacker doesn't support heartbeat, please note.
class fixed_length_unpacker : public i_unpacker<basic_buffer>
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册