未验证 提交 9a35620f 编写于 作者: D Daniel Larimer 提交者: GitHub

Merge pull request #613 from PaulCalabrese/STAT-90-Use-"Greedy-read"-for-reading-from-connections

STAT-90 Switch to using greedy reads
......@@ -145,7 +145,6 @@ namespace eos {
void start_session( connection_ptr c );
void start_listen_loop( );
void start_read_message( connection_ptr c);
void start_reading_pending_buffer( connection_ptr c);
void close( connection_ptr c );
......@@ -215,6 +214,8 @@ namespace eos {
constexpr auto def_max_just_send = 1300 * 3; // "mtu" * 3
constexpr auto def_send_whole_blocks = true;
constexpr auto message_header_size = 4;
/**
* Index by id
* Index by is_known, block_num, validated_time, this is the order we will broadcast
......@@ -311,11 +312,11 @@ namespace eos {
// still would be to have a mempool class that has a series of fixed size blocks that can
// be used scatter-gather style.
uint32_t expected_size;
vector<char> pending_message_buffer;
uint32_t pending_message_write_index;
uint32_t pending_message_read_index;
vector<char> send_buffer;
vector<char> blk_buffer;
size_t actual_size;
deque< vector<char> > txn_queue;
size_t txn_in_flight;
......@@ -400,8 +401,60 @@ namespace eos {
void fetch_wait ();
void sync_timeout (boost::system::error_code ec);
void fetch_timeout (boost::system::error_code ec);
/** \brief Adjust the pending message buffer
*
* This method is called to adjust the size of the
* pending_message_buffer when there is a partial message
* in the buffer of message_length. There may be
* additional messages earlier in the buffer that
* can be removed
*/
void adjust_buffer_size(uint32_t message_length);
/** \brief Process the next message from the pending message buffer
*
* Process the next message from the pending_message_buffer.
* message_length is the already determined length of the data
* part of the message and impl in the net plugin implementation
* that will handle the message.
* Returns true is successful. Returns false if an error was
* encountered unpacking or processing the message.
*/
bool process_next_message(net_plugin_impl& impl, uint32_t message_length);
};
struct precache : public fc::visitor<void> {
connection_ptr c;
size_t message_size;
precache( connection_ptr conn, size_t msg_size) : c(conn), message_size(msg_size) {}
void operator()(const signed_block &msg) const
{
c->blk_buffer.resize(message_size);
memcpy(c->blk_buffer.data(),
&c->pending_message_buffer[c->pending_message_read_index+message_header_size],
message_size);
}
template <typename T>
void operator()(const T &msg) const
{
//no-op
}
};
struct msgHandler : public fc::visitor<void> {
net_plugin_impl &impl;
connection_ptr c;
msgHandler( net_plugin_impl &imp, connection_ptr conn) : impl(imp), c(conn) {}
template <typename T>
void operator()(const T &msg) const
{
impl.handle_message( c, msg);
}
};
class sync_manager {
uint32_t sync_known_lib_num;
......@@ -436,8 +489,9 @@ namespace eos {
sync_receiving(),
sync_requested(),
socket( std::make_shared<tcp::socket>( std::ref( app().get_io_service() ))),
expected_size(0),
pending_message_buffer(recv_buf_size),
pending_message_write_index(0),
pending_message_read_index(0),
send_buffer(send_buf_size),
node_id(),
last_handshake(),
......@@ -462,8 +516,9 @@ namespace eos {
sync_receiving(),
sync_requested(),
socket( s ),
expected_size(0),
pending_message_buffer(recv_buf_size),
pending_message_write_index(0),
pending_message_read_index(0),
send_buffer(send_buf_size),
node_id(),
last_handshake(),
......@@ -860,6 +915,54 @@ namespace eos {
elog( "setting timer for fetch request got error ${ec}", ("ec", ec.message( ) ) );
}
}
void connection::adjust_buffer_size(uint32_t message_length) {
uint32_t current_buffer_size = pending_message_buffer.size();
if (current_buffer_size - pending_message_read_index + 1 < message_length)
// Not enough room in the buffer, grow the buffer, first move remaining
// unprocessed data to the beginning of the buffer
if (pending_message_read_index != 0) {
memmove(&pending_message_buffer[0],
&pending_message_buffer[pending_message_read_index],
pending_message_write_index - pending_message_read_index);
pending_message_write_index -= pending_message_read_index;
pending_message_read_index = 0;
}
// See if we need to grow the buffer or if there is enough space now.
uint32_t bytes_needed = message_length - current_buffer_size - pending_message_read_index + 1;
if (bytes_needed > 0) {
// Grow the buffer by some multiplier of the current size.
// Note that the buffer size will never shrink in the current implementation.
// The eventual solution will be to use a chain of buffers using scatter/gather
uint32_t multiplier = (bytes_needed / current_buffer_size) + 2;
uint32_t new_size = current_buffer_size * multiplier;
pending_message_buffer.resize(new_size);
}
}
bool connection::process_next_message(net_plugin_impl& impl, uint32_t message_length) {
try {
fc::datastream<const char*> ds(&pending_message_buffer[pending_message_read_index + message_header_size],
message_length);
net_message msg;
fc::raw::unpack(ds, msg);
precache pc( shared_from_this(), message_length );
msg.visit( pc);
msgHandler m(impl, shared_from_this() );
msg.visit(m);
} catch( const fc::exception& e ) {
edump((e.to_detail_string() ));
impl.close( shared_from_this() );
return false;
}
pending_message_read_index += message_header_size + message_length;
return true;
}
//-----------------------------------------------------------
sync_manager::sync_manager( uint32_t span )
......@@ -1106,29 +1209,47 @@ namespace eos {
}
void net_plugin_impl::start_read_message( connection_ptr conn ) {
conn->expected_size = 0;
connection_wptr c( conn);
uint32_t *buff = &(conn.get()->expected_size);
boost::asio::async_read( *conn->socket,
boost::asio::buffer((char *)buff, sizeof(conn->expected_size)),
[this,c]( boost::system::error_code ec, std::size_t read_size ) {
//ilog( "read size handler..." );
if( !ec ) {
connection_ptr conn = c.lock();
if( conn->expected_size <= conn->pending_message_buffer.size() ) {
start_reading_pending_buffer( conn );
return;
} else {
elog( "expected_size = ${es}, pmb_size = ${pmb}, readsize = ${rs}",
("es",conn->expected_size)("pmb",conn->pending_message_buffer.size())("rs", read_size));
}
} else {
elog( "Error reading message from connection: ${m}",( "m", ec.message() ) );
}
close( c.lock() );
}
);
conn->socket->async_read_some(
boost::asio::buffer(&conn->pending_message_buffer[conn->pending_message_write_index],
conn->pending_message_buffer.size() - conn->pending_message_write_index),
[this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) {
if( !ec ) {
connection_ptr conn = c.lock();
if (!conn) {
return;
}
conn->pending_message_write_index += bytes_transferred;
while (conn->pending_message_read_index < conn->pending_message_write_index) {
uint32_t bytes_in_buffer = conn->pending_message_write_index - conn->pending_message_read_index;
if (bytes_in_buffer < message_header_size) {
break;
} else {
// Ignore byte-ordering concerns
uint32_t message_length;
memcpy(&message_length, &conn->pending_message_buffer[conn->pending_message_read_index], sizeof(message_length));
if (bytes_in_buffer >= message_header_size + message_length) {
if (!conn->process_next_message(*this, message_length)) {
return;
}
} else {
conn->adjust_buffer_size(message_length);
break;
}
}
}
if (conn->pending_message_read_index == conn->pending_message_write_index) {
// Buffer is empty, reset indices
conn->pending_message_read_index = 0;
conn->pending_message_write_index = 0;
}
start_read_message(conn);
} else {
elog( "Error reading message from connection: ${m}",( "m", ec.message() ) );
close( c.lock() );
}
}
);
}
template<typename VerifierFunc>
......@@ -1644,61 +1765,6 @@ namespace eos {
}
}
struct precache : public fc::visitor<void> {
connection_ptr c;
precache( connection_ptr conn) : c(conn) {}
void operator()(const signed_block &msg) const
{
c->blk_buffer.resize(c->actual_size);
memcpy(c->blk_buffer.data(),c->pending_message_buffer.data(), c->actual_size);
}
template <typename T>
void operator()(const T &msg) const
{
//no-op
}
};
struct msgHandler : public fc::visitor<void> {
net_plugin_impl &impl;
connection_ptr c;
msgHandler( net_plugin_impl &imp, connection_ptr conn) : impl(imp), c(conn) {}
template <typename T>
void operator()(const T &msg) const
{
impl.handle_message( c, msg);
}
};
void net_plugin_impl::start_reading_pending_buffer( connection_ptr c ) {
boost::asio::async_read( *c->socket,
boost::asio::buffer(c->pending_message_buffer.data(), c->expected_size ),
[this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) {
if( !ec ) {
try {
c->actual_size = bytes_transferred;
auto msg = fc::raw::unpack<net_message>( c->pending_message_buffer );
precache pc( c );
msg.visit( pc );
start_read_message( c );
msgHandler m( *this, c );
msg.visit( m );
return;
} catch( const fc::exception& e ) {
edump((e.to_detail_string() ));
}
} else {
elog( "Error reading message from connection: ${m}",( "m", ec.message() ) ); }
close( c );
});
}
void net_plugin_impl::start_conn_timer( ) {
connector_check->expires_from_now( connector_period);
connector_check->async_wait( [&](boost::system::error_code ec) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册