提交 4ff259a3 编写于 作者: Y youngwolf

Add flexible_unpacker--an unpacker which support huge messages but with a...

Add flexible_unpacker--an unpacker which support huge messages but with a limited pre-allocated buffer.
Make class basic_buffer to be more alike to std::string.
Make function void i_unpacker::stripped(bool) to be virtual.
unpacker2 support basic_buffer too.
Optimize unpackers.
上级 7af05b62
Subproject commit be7badc31abcc395cf868de6a1e240c2350bdbf2
Subproject commit 77ed33821d6473d6904ef3171e318181c015003a
......@@ -25,9 +25,17 @@
//2-fixed length packer and unpacker
//3-prefix and/or suffix packer and unpacker
#if 1 == PACKER_UNPACKER_TYPE
#if 0 == PACKER_UNPACKER_TYPE
#define ASCS_HUGE_MSG
#define ASCS_MSG_BUFFER_SIZE 1000000
#define ASCS_MAX_SEND_BUF (10 * ASCS_MSG_BUFFER_SIZE)
#define ASCS_MAX_RECV_BUF (10 * ASCS_MSG_BUFFER_SIZE)
#define ASCS_DEFAULT_UNPACKER flexible_unpacker<>
//this unpacker only pre-allocated a buffer of 4000 bytes, but it can parse messages up to ST_ASIO_MSG_BUFFER_SIZE (here is 1000000) bytes,
//it works as the default unpacker for messages <= 4000, otherwise, it works as non_copy_unpacker
#elif 1 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER packer2<unique_buffer<std::string>, std::string>
#define ASCS_DEFAULT_UNPACKER unpacker2<unique_buffer<std::string>>
#define ASCS_DEFAULT_UNPACKER unpacker2<unique_buffer, std::string, flexible_unpacker<>>
#elif 2 == PACKER_UNPACKER_TYPE
#undef ASCS_HEARTBEAT_INTERVAL
#define ASCS_HEARTBEAT_INTERVAL 0 //not support heartbeat
......@@ -357,7 +365,7 @@ void send_msg_concurrently(echo_client& client, size_t send_thread_num, size_t m
fflush(stdout);
}
} while (percent < 100);
do_something_to_all(threads, [](std::thread& t) {t.join();});
do_something_to_all(threads, [](std::thread& t) {if (t.joinable()) t.join();});
begin_time.stop();
printf(" finished in %f seconds, TPS: %f(*2), speed: %f(*2) MBps.\n",
......
......@@ -25,9 +25,17 @@
//2-fixed length packer and unpacker
//3-prefix and/or suffix packer and unpacker
#if 1 == PACKER_UNPACKER_TYPE
#if 0 == PACKER_UNPACKER_TYPE
#define ASCS_HUGE_MSG
#define ASCS_MSG_BUFFER_SIZE 1000000
#define ASCS_MAX_SEND_BUF (10 * ASCS_MSG_BUFFER_SIZE)
#define ASCS_MAX_RECV_BUF (10 * ASCS_MSG_BUFFER_SIZE)
#define ASCS_DEFAULT_UNPACKER flexible_unpacker<basic_buffer>
//this unpacker only pre-allocated a buffer of 4000 bytes, but it can parse messages up to ST_ASIO_MSG_BUFFER_SIZE (here is 1000000) bytes,
//it works as the default unpacker for messages <= 4000, otherwise, it works as non_copy_unpacker
#elif 1 == PACKER_UNPACKER_TYPE
#define ASCS_DEFAULT_PACKER packer2<unique_buffer<std::string>, std::string>
#define ASCS_DEFAULT_UNPACKER unpacker2<unique_buffer<std::string>>
#define ASCS_DEFAULT_UNPACKER unpacker2<unique_buffer, basic_buffer, flexible_unpacker<basic_buffer>>
#elif 2 == PACKER_UNPACKER_TYPE
#undef ASCS_HEARTBEAT_INTERVAL
#define ASCS_HEARTBEAT_INTERVAL 0 //not support heartbeat
......
......@@ -200,14 +200,14 @@ public:
typedef std::list<msg_type> container_type;
typedef ASCS_RECV_BUFFER_TYPE buffer_type;
bool stripped() const {return _stripped;}
void stripped(bool stripped_) {_stripped = stripped_;}
protected:
i_unpacker() : _stripped(true) {}
virtual ~i_unpacker() {}
public:
bool stripped() const {return _stripped;}
virtual void stripped(bool stripped_) {_stripped = stripped_;}
virtual void reset() = 0;
virtual void dump_left_data() const {}
//heartbeat must not be included in msg_can, otherwise you must handle heartbeat at where you handle normal messages.
......@@ -442,7 +442,7 @@ class auto_duration
{
public:
auto_duration(statistic::stat_duration& duration_) : started(true), begin_time(statistic::now()), duration(duration_) {}
~auto_duration() {end();}
virtual ~auto_duration() {end();}
void end() {if (started) duration += statistic::now() - begin_time; started = false;}
......
......@@ -813,13 +813,13 @@ static_assert(ASCS_SERVER_PORT > 0, "server port must be bigger than zero.");
//send buffer's maximum size (bytes), it will be expanded dynamically (not fixed) within this range.
#ifndef ASCS_MAX_SEND_BUF
#define ASCS_MAX_SEND_BUF 1048576 //1M
#define ASCS_MAX_SEND_BUF (1024 * 1024) //1M, 1048576
#endif
static_assert(ASCS_MAX_SEND_BUF > 0, "send buffer capacity must be bigger than zero.");
//recv buffer's maximum size (bytes), it will be expanded dynamically (not fixed) within this range.
#ifndef ASCS_MAX_RECV_BUF
#define ASCS_MAX_RECV_BUF 1048576 //1M
#define ASCS_MAX_RECV_BUF (1024 * 1024) //1M, 1048576
#endif
static_assert(ASCS_MAX_RECV_BUF > 0, "recv buffer capacity must be bigger than zero.");
......
......@@ -23,7 +23,6 @@
#ifndef ASCS_MSG_BUFFER_SIZE
#define ASCS_MSG_BUFFER_SIZE 4000
#endif
static_assert(ASCS_MSG_BUFFER_SIZE > 0, "message buffer size must be bigger than zero.");
//#define ASCS_SCATTERED_RECV_BUFFER
//define this macro will introduce scatter-gather buffers when doing async read, it's very useful under certain situations (for example, ring buffer).
......@@ -38,7 +37,9 @@ static_assert(ASCS_MSG_BUFFER_SIZE > 0, "message buffer size must be bigger than
#define ASCS_HEAD_H2N htons
#define ASCS_HEAD_N2H ntohs
#endif
#define ASCS_HEAD_LEN (sizeof(ASCS_HEAD_TYPE))
static_assert(100 * 1024 * 1024 >= ASCS_MSG_BUFFER_SIZE && ASCS_MSG_BUFFER_SIZE >= ASCS_HEAD_LEN, "invalid message buffer size.");
namespace ascs { namespace ext {
......@@ -51,6 +52,10 @@ public:
virtual const char* data() const {return std::string::data();}
};
//a substitute of std::string (just for unpacking scenario, many features are missing according to std::string), because std::string
// has a small defect which is terrible for unpacking scenario, it cannot change its size without fill its buffer.
//please note that basic_buffer won't append '\0' to the end of the string (std::string will do), you cannot treat it as a string and
// print it with "%s" format even all characters in it are printable (because no '\0' appended to them).
class basic_buffer
#if defined(_MSC_VER) && _MSC_VER <= 1800
: public asio::noncopyable
......@@ -59,35 +64,53 @@ class basic_buffer
public:
basic_buffer() {do_detach();}
basic_buffer(size_t len) {do_assign(len);}
basic_buffer(char* buff, size_t len) {do_attach(buff, len, len);}
basic_buffer(basic_buffer&& other) {do_attach(other.buff, other.len, other.buff_len); other.do_detach();}
~basic_buffer() {clear();}
basic_buffer(const char* _buff, size_t len) {do_assign(len); memcpy(buff, _buff, len);}
basic_buffer(basic_buffer&& other) {do_attach(other.buff, other.len, other.cap); other.do_detach();}
virtual ~basic_buffer() {clear();}
basic_buffer& operator=(basic_buffer&& other) {clear(); swap(other); return *this;}
void assign(size_t len) {clear(); do_assign(len);}
void attach(char* buff, size_t len) {clear(); do_attach(buff, len, len);}
void resize(size_t _len) //won't fill the extended buffer
{
if (_len <= cap)
len = _len;
else
{
auto old_buff = buff;
auto old_len = len;
do_assign(_len);
if (nullptr != old_buff)
{
memcpy(buff, old_buff, old_len);
delete[] old_buff;
}
}
}
void reserve(size_t len) {if (len > cap) resize(len);}
void assign(size_t len) {resize(len);}
size_t max_size() const {return (unsigned) -1;}
size_t capacity() const {return cap;}
//the following five functions are needed by ascs
bool empty() const {return 0 == len || nullptr == buff;}
size_t size() const {return nullptr == buff ? 0 : len;}
const char* data() const {return buff;}
void swap(basic_buffer& other) {std::swap(buff, other.buff); std::swap(len, other.len); std::swap(buff_len, other.buff_len);}
void swap(basic_buffer& other) {std::swap(buff, other.buff); std::swap(len, other.len); std::swap(cap, other.cap);}
void clear() {delete[] buff; do_detach();}
//functions needed by packer and unpacker
char* data() {return buff;}
bool shrink_size(size_t _len) {assert(_len <= buff_len); return (_len <= buff_len) ? (len = _len, true) : false;}
size_t buffer_size() const {return nullptr == buff ? 0 : buff_len;}
protected:
void do_assign(size_t len) {do_attach(new char[len], len, len);}
void do_attach(char* _buff, size_t _len, size_t _buff_len) {buff = _buff; len = _len; buff_len = _buff_len;}
void do_detach() {buff = nullptr; len = buff_len = 0;}
void do_attach(char* _buff, size_t _len, size_t capacity) {buff = _buff; len = (unsigned) _len; cap = (unsigned) capacity;}
void do_detach() {buff = nullptr; len = cap = 0;}
protected:
char* buff;
size_t len, buff_len;
unsigned len, cap;
};
class cpu_timer //a substitute of boost::timer::cpu_timer
......
......@@ -55,12 +55,8 @@ public:
unpacker() {reset();}
size_t current_msg_length() const {return cur_msg_len;} //current msg's total length, -1 means not available
bool parse_msg(size_t bytes_transferred, std::list<std::pair<const char*, size_t>>& msg_can)
bool parse_msg(std::list<std::pair<const char*, size_t>>& msg_can)
{
//length + msg
remain_len += bytes_transferred;
assert(remain_len <= ASCS_MSG_BUFFER_SIZE);
auto pnext = &*std::begin(raw_buff);
auto unpack_ok = true;
while (unpack_ok) //considering sticky package problem, we need a loop
......@@ -102,12 +98,16 @@ public:
virtual void dump_left_data() const {unpacker_helper::dump_left_data(raw_buff.data(), cur_msg_len, remain_len);}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
//length + msg
remain_len += bytes_transferred;
assert(remain_len <= ASCS_MSG_BUFFER_SIZE);
std::list<std::pair<const char*, size_t>> msg_pos_can;
auto unpack_ok = parse_msg(bytes_transferred, msg_pos_can);
auto unpack_ok = parse_msg(msg_pos_can);
do_something_to_all(msg_pos_can, [this, &msg_can](decltype(msg_pos_can.front()) item) {
if (item.second > ASCS_HEAD_LEN) //ignore heartbeat
{
if (this->stripped())
if (stripped())
msg_can.emplace_back(std::next(item.first, ASCS_HEAD_LEN), item.second - ASCS_HEAD_LEN);
else
msg_can.emplace_back(item.first, item.second);
......@@ -144,7 +144,7 @@ public:
return 0;
}
return data_len >= cur_msg_len ? 0 : asio::detail::default_max_transfer_size;
return data_len >= cur_msg_len ? 0 : ASCS_MSG_BUFFER_SIZE;
//read as many as possible except that we have already got an entire msg
}
......@@ -169,6 +169,198 @@ protected:
size_t remain_len; //half-baked msg
};
//protocol: length + body
//this unpacker has a fixed buffer (4000 bytes), if messages can be held in it, then this unpacker works just as the default unpacker,
// otherwise, a dynamic std::string will be created to hold big messages, then this unpacker works just as the non_copy_unpacker.
//T can be std::string or basic_buffer, the latter will not fill its buffer in resize invocation, so is more efficient.
template<typename T = std::string>
class flexible_unpacker : public i_unpacker<T>
{
private:
typedef i_unpacker<T> super;
public:
flexible_unpacker() {reset();}
size_t current_msg_length() const {return cur_msg_len;} //current msg's total length, -1 means not available
bool parse_msg(std::list<std::pair<const char*, size_t>>& msg_can)
{
auto pnext = &*std::begin(raw_buff);
auto unpack_ok = true;
while (unpack_ok) //considering sticky package problem, we need a loop
if ((size_t) -1 != cur_msg_len)
{
if (cur_msg_len > ASCS_MSG_BUFFER_SIZE || cur_msg_len < ASCS_HEAD_LEN)
unpack_ok = false;
else if (remain_len >= cur_msg_len) //one msg received
{
msg_can.emplace_back(pnext, cur_msg_len);
remain_len -= cur_msg_len;
std::advance(pnext, cur_msg_len);
cur_msg_len = -1;
}
else
break;
}
else if (remain_len >= ASCS_HEAD_LEN) //the msg's head been received, sticky package found
{
ASCS_HEAD_TYPE head;
memcpy(&head, pnext, ASCS_HEAD_LEN);
cur_msg_len = ASCS_HEAD_N2H(head);
#ifdef ASCS_HUGE_MSG
if ((size_t) -1 == cur_msg_len) //avoid dead loop on 32bit system with macro ASCS_HUGE_MSG
unpack_ok = false;
#endif
}
else
break;
if (pnext == &*std::begin(raw_buff)) //we should have at least got one msg.
unpack_ok = false;
return unpack_ok;
}
public:
virtual void reset() {big_msg.clear(); cur_msg_len = -1; remain_len = 0;}
virtual void dump_left_data() const {unpacker_helper::dump_left_data(big_msg.empty() ? raw_buff.data() : big_msg.data(), cur_msg_len, remain_len);}
virtual bool parse_msg(size_t bytes_transferred, typename super::container_type& msg_can)
{
//length + msg
remain_len += bytes_transferred;
assert(remain_len <= std::max(raw_buff.size(), big_msg.size()));
if (!big_msg.empty())
{
if (remain_len != big_msg.size())
return false;
msg_can.emplace_back(std::move(big_msg));
reset();
return true;
}
if ((size_t) -1 == cur_msg_len && remain_len >= ASCS_HEAD_LEN) //the msg's head been received
{
ASCS_HEAD_TYPE head;
memcpy(&head, &*std::begin(raw_buff), ASCS_HEAD_LEN);
cur_msg_len = ASCS_HEAD_N2H(head);
}
if (cur_msg_len <= ASCS_MSG_BUFFER_SIZE && cur_msg_len > raw_buff.size()) //big message
{
extern_buffer();
return true;
}
std::list<std::pair<const char*, size_t>> msg_pos_can;
auto unpack_ok = parse_msg(msg_pos_can);
do_something_to_all(msg_pos_can, [this, &msg_can](decltype(msg_pos_can.front()) item) {
if (item.second > ASCS_HEAD_LEN) //ignore heartbeat
{
if (this->stripped())
msg_can.emplace_back(std::next(item.first, ASCS_HEAD_LEN), item.second - ASCS_HEAD_LEN);
else
msg_can.emplace_back(item.first, item.second);
}
});
if (remain_len > 0 && !msg_pos_can.empty())
{
auto pnext = std::next(msg_pos_can.back().first, msg_pos_can.back().second);
memmove(&*std::begin(raw_buff), pnext, remain_len); //left behind unparsed data
}
if (unpack_ok && (size_t) -1 != cur_msg_len && cur_msg_len > raw_buff.size()) //big message
extern_buffer();
//if unpacking failed, successfully parsed msgs will still returned via msg_can(sticky package), please note.
return unpack_ok;
}
//a return value of 0 indicates that the read operation is complete. a non-zero value indicates the maximum number
//of bytes to be read on the next call to the stream's async_read_some function. ---asio::async_read
//read as many as possible to reduce asynchronous call-back, and don't forget to handle sticky package carefully in parse_msg function.
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred)
{
if (ec)
return 0;
auto data_len = remain_len + bytes_transferred;
assert(data_len <= cur_msg_len);
if ((size_t) -1 == cur_msg_len && data_len >= ASCS_HEAD_LEN) //the msg's head been received
{
ASCS_HEAD_TYPE head;
memcpy(&head, &*std::begin(raw_buff), ASCS_HEAD_LEN);
cur_msg_len = ASCS_HEAD_N2H(head);
if (cur_msg_len > ASCS_MSG_BUFFER_SIZE || cur_msg_len < ASCS_HEAD_LEN || //invalid msg, stop reading
cur_msg_len > raw_buff.size()) //big message
return 0;
}
return data_len >= cur_msg_len ? 0 : (big_msg.empty() ? raw_buff.size() : big_msg.size());
//read as many as possible except that we have already got an entire msg
}
#ifdef ASCS_SCATTERED_RECV_BUFFER
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
virtual typename super::buffer_type prepare_next_recv()
{
assert(remain_len < cur_msg_len);
if (big_msg.empty())
return typename super::buffer_type(1, asio::buffer(raw_buff) + remain_len);
return typename super::buffer_type(1, asio::buffer(const_cast<char*>(big_msg.data()), big_msg.size()) + remain_len);
}
#elif ASIO_VERSION < 101100
virtual typename super::buffer_type prepare_next_recv()
{
assert(remain_len < cur_msg_len);
if (big_msg.empty())
return asio::buffer(asio::buffer(raw_buff) + remain_len);
return asio::buffer(asio::buffer(const_cast<char*>(big_msg.data()), big_msg.size()) + remain_len);
}
#else
virtual typename super::buffer_type prepare_next_recv()
{
assert(remain_len < cur_msg_len);
if (big_msg.empty())
return asio::buffer(raw_buff) + remain_len;
return asio::buffer(const_cast<char*>(big_msg.data()), big_msg.size()) + remain_len;
}
#endif
//msg must has been unpacked by this unpacker
virtual char* raw_data(typename super::msg_type& msg) const {return const_cast<char*>(this->stripped() ? msg.data() : std::next(msg.data(), ASCS_HEAD_LEN));}
virtual const char* raw_data(typename super::msg_ctype& msg) const {return this->stripped() ? msg.data() : std::next(msg.data(), ASCS_HEAD_LEN);}
virtual size_t raw_data_len(typename super::msg_ctype& msg) const {return this->stripped() ? msg.size() : msg.size() - ASCS_HEAD_LEN;}
private:
void extern_buffer()
{
auto step = 0;
if (this->stripped())
{
cur_msg_len -= ASCS_HEAD_LEN;
remain_len -= ASCS_HEAD_LEN;
step = ASCS_HEAD_LEN;
}
big_msg.resize(cur_msg_len); //std::string will fill big_msg, which is totally not necessary for this scenario, but basic_buffer won't.
memcpy(const_cast<char*>(big_msg.data()), std::next(raw_buff.data(), step), remain_len);
}
protected:
std::array<char, 4000> raw_buff;
typename super::msg_type big_msg;
size_t cur_msg_len; //-1 means head not received, so msg length is not available.
size_t remain_len; //half-baked msg
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
//this unpacker doesn't support heartbeat, please note.
class udp_unpacker : public i_unpacker<std::string>
......@@ -191,22 +383,24 @@ protected:
};
//protocol: length + body
//T can be unique_buffer<std::string> or shared_buffer<std::string>, the latter makes output messages seemingly copyable.
template<typename T = shared_buffer<std::string>>
class unpacker2 : public ascs::i_unpacker<T>
//Buffer can be unique_buffer or shared_buffer, the latter makes output messages seemingly copyable.
//T can be std::string or basic_buffer, Unpacker can be the default unpacker or flexible_unpacker.
template<template<typename> class Buffer = shared_buffer, typename T = std::string, typename Unpacker = unpacker>
class unpacker2 : public i_unpacker<Buffer<T>>
{
private:
typedef ascs::i_unpacker<T> super;
typedef i_unpacker<Buffer<T>> super;
public:
virtual void stripped(bool stripped_) {super::stripped(stripped_); unpacker_.stripped(stripped_);}
virtual void reset() {unpacker_.reset();}
virtual void dump_left_data() const {unpacker_.dump_left_data();}
virtual bool parse_msg(size_t bytes_transferred, typename super::container_type& msg_can)
{
unpacker::container_type tmp_can;
unpacker_.stripped(this->stripped());
typename Unpacker::container_type tmp_can;
auto unpack_ok = unpacker_.parse_msg(bytes_transferred, tmp_can);
do_something_to_all(tmp_can, [&msg_can](unpacker::msg_type& item) {msg_can.emplace_back(new std::string(std::move(item)));});
do_something_to_all(tmp_can, [&msg_can](typename Unpacker::msg_type& item) {msg_can.emplace_back(new T(std::move(item)));});
//if unpacking failed, successfully parsed msgs will still returned via msg_can(sticky package), please note.
return unpack_ok;
......@@ -216,21 +410,21 @@ public:
virtual typename super::buffer_type prepare_next_recv() {return unpacker_.prepare_next_recv();}
//msg must has been unpacked by this unpacker
virtual char* raw_data(typename super::msg_type& msg) const {return const_cast<char*>(this->stripped() ? msg.data() : std::next(msg.data(), ASCS_HEAD_LEN));}
virtual const char* raw_data(typename super::msg_ctype& msg) const {return this->stripped() ? msg.data() : std::next(msg.data(), ASCS_HEAD_LEN);}
virtual size_t raw_data_len(typename super::msg_ctype& msg) const {return this->stripped() ? msg.size() : msg.size() - ASCS_HEAD_LEN;}
virtual char* raw_data(typename super::msg_type& msg) const {return unpacker_.raw_data(*msg.raw_buffer());}
virtual const char* raw_data(typename super::msg_ctype& msg) const {return unpacker_.raw_data(*msg.raw_buffer());}
virtual size_t raw_data_len(typename super::msg_ctype& msg) const {return unpacker_.raw_data_len(*msg.raw_buffer());}
protected:
unpacker unpacker_;
Unpacker unpacker_;
};
//protocol: UDP has message boundary, so we don't need a specific protocol to unpack it.
//T can be unique_buffer<std::string> or shared_buffer<std::string>, the latter makes output messages seemingly copyable.
template<typename T = shared_buffer<std::string>>
class udp_unpacker2 : public ascs::i_unpacker<T>
class udp_unpacker2 : public i_unpacker<T>
{
private:
typedef ascs::i_unpacker<T> super;
typedef i_unpacker<T> super;
public:
virtual void reset() {}
......@@ -258,14 +452,19 @@ protected:
//let asio write msg directly (no temporary memory needed), not support unstripped messages, please note (you can fix this defect if you like).
//actually, this unpacker has the worst performance, because it needs 2 read for one message, other unpackers are able to get many messages from just one read.
//so this unpacker just demonstrates a way to avoid memory replications and temporary memory utilization, it can provide better performance for huge messages.
//this unpacker only output stripped messages, please note.
class non_copy_unpacker : public i_unpacker<basic_buffer>
{
private:
typedef i_unpacker<basic_buffer> super;
public:
non_copy_unpacker() {reset();}
size_t current_msg_length() const {return raw_buff.size();} //current msg's total length(not include the head), 0 means not available
public:
virtual void stripped(bool stripped_)
{if (!stripped_) unified_out::error_out("non_copy_unpacker doesn't support unstripped messages"); else super::stripped(stripped_);}
virtual void reset() {raw_buff.clear(); step = 0;}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{
......@@ -316,12 +515,12 @@ public:
if (0 == step) //want the head
{
assert(raw_buff.empty());
return asio::detail::default_max_transfer_size;
return ASCS_HEAD_LEN;
}
else if (1 == step) //want the body
{
assert(!raw_buff.empty());
return asio::detail::default_max_transfer_size;
return raw_buff.size();
}
else
assert(false);
......@@ -377,8 +576,7 @@ public:
//a return value of 0 indicates that the read operation is complete. a non-zero value indicates the maximum number
//of bytes to be read on the next call to the stream's async_read_some function. ---asio::async_read
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred)
{return ec || bytes_transferred == raw_buff.size() ? 0 : asio::detail::default_max_transfer_size;}
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred == raw_buff.size() ? 0 : _fixed_length;}
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
......@@ -433,7 +631,7 @@ public:
return 0; //invalid msg, stop reading
}
return asio::detail::default_max_transfer_size; //read as many as possible
return ASCS_MSG_BUFFER_SIZE; //read as many as possible
}
//like strstr, except support \0 in the middle of mem and sub_mem
......@@ -537,7 +735,7 @@ public:
return true;
}
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred > 0 ? 0 : asio::detail::default_max_transfer_size;}
virtual size_t completion_condition(const asio::error_code& ec, size_t bytes_transferred) {return ec || bytes_transferred > 0 ? 0 : ASCS_MSG_BUFFER_SIZE;}
//this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
......
/*
* socks4.h
* socks.h
*
* Created on: 2020-8-25
* Author: youngwolf
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册