提交 3f435246 编写于 作者: K Kevin Heifner 提交者: Matt Witherspoon

Merge pull request #923 from EOSIO/p2p-stat-274-gh-916

P2p stat 274 gh 916

(PARTIALLY reapplied on noon -- did not take the genesis file)
上级 4ed15e7b
......@@ -10,7 +10,6 @@
#include <array>
namespace eosio {
template <uint32_t buffer_len>
class mb_datastream;
......@@ -33,7 +32,7 @@ namespace eosio {
*/
typedef std::pair<uint32_t, uint32_t> index_t;
message_buffer() : buffers{pool().malloc()}, read_ind{0,0}, write_ind{0,0} { }
message_buffer() : buffers{pool().malloc()}, read_ind{0,0}, write_ind{0,0}, sanity_check (1) { }
~message_buffer() {
while (buffers.size() > 0) {
......@@ -75,6 +74,7 @@ namespace eosio {
* Does not affect the read or write pointer.
*/
void add_buffer_to_chain() {
sanity_check++;
buffers.push_back(pool().malloc());
}
......@@ -86,6 +86,7 @@ namespace eosio {
void add_space(uint32_t bytes) {
int buffers_to_add = bytes / buffer_len + 1;
for (int i = 0; i < buffers_to_add; i++) {
sanity_check++;
buffers.push_back(pool().malloc());
}
}
......@@ -95,12 +96,23 @@ namespace eosio {
* discarded.
*/
void reset() {
read_ind = { 0, 0 };
write_ind = { 0, 0 };
// some condition exists that can send *both* buffers.size() and sanity_check to well over 10^6.
// this seems to be related to some sort of memory overrun possibly. By forcing an exit here, an
// external watchdog can be used to restart the process and avoid hanging.
if( buffers.size() != sanity_check || buffers.size() > 1000000) {
elog ("read_ind = ${r1}, ${r2} write_ind = ${w1}, ${w2}, buff.size = ${bs}, sanity = ${s}",
("r1",read_ind.first)("r2",read_ind.second)("w1",write_ind.first)("w2",write_ind.second)("bs",buffers.size())("s",sanity_check));
elog("Buffer manager overwrite detected. Terminating to allow external restart");
exit(1);
}
while (buffers.size() > 1) {
sanity_check--;
pool().destroy(buffers.back());
buffers.pop_back();
}
read_ind = { 0, 0 };
write_ind = { 0, 0 };
}
/*
......@@ -144,6 +156,7 @@ namespace eosio {
while (read_ind.first > 0) {
pool().destroy(buffers.front());
buffers.pop_front();
sanity_check--;
read_ind.first--;
write_ind.first--;
}
......@@ -157,6 +170,7 @@ namespace eosio {
void advance_write_ptr(uint32_t bytes) {
advance_index(write_ind, bytes);
while (write_ind.first >= buffers.size()) {
sanity_check++;
buffers.push_back(pool().malloc());
}
}
......@@ -249,9 +263,9 @@ namespace eosio {
std::deque<std::array<char, buffer_len>* > buffers;
index_t read_ind;
index_t write_ind;
};
size_t sanity_check;
};
/*
* @brief datastream adapter that adapts message_buffer for use with fc unpack
......@@ -286,4 +300,3 @@ namespace eosio {
}
} // namespace eosio
......@@ -41,7 +41,10 @@ namespace eosio {
wrong_version, ///< the peer's network version doesn't match
forked, ///< the peer's irreversible blocks are different
unlinkable, ///< the peer sent a block we couldn't use
bad_transaction ///< the peer sent a transaction that failed verification
bad_transaction, ///< the peer sent a transaction that failed verification
validation, ///< the peer sent a block that failed validation
benign_other, ///< reasons such as a timeout. not fatal but warrant resetting
fatal_other ///< a catch-all for errors we don't have discriminated
};
constexpr auto reason_str( go_away_reason rsn ) {
......@@ -54,6 +57,9 @@ namespace eosio {
case forked : return "chain is forked";
case unlinkable : return "unlinkable block received";
case bad_transaction : return "bad transaction";
case validation : return "invalid block";
case fatal_other : return "some other failure";
case benign_other : return "some other non-fatal condition";
default : return "some crazy reason";
}
}
......
......@@ -376,7 +376,7 @@ namespace eosio {
sync_state_ptr sync_requested; // this peer is requesting info from us
socket_ptr socket;
message_buffer<4096> pending_message_buffer;
message_buffer<1024*1024> pending_message_buffer;
vector<char> send_buffer;
vector<char> blk_buffer;
......@@ -395,6 +395,7 @@ namespace eosio {
deque<net_message> out_queue;
bool connecting;
bool syncing;
int write_depth;
string peer_addr;
unique_ptr<boost::asio::steady_timer> response_expected;
optional<request_message> pending_fetch;
......@@ -469,8 +470,9 @@ namespace eosio {
void stop_send();
void enqueue( const net_message &msg );
void cancel_sync(const string &reason);
void cancel_sync(go_away_reason);
void cancel_fetch();
void flush_queues();
bool enqueue_sync_block();
void send_next_message();
void send_next_txn();
......@@ -525,6 +527,7 @@ namespace eosio {
public:
sync_manager(uint32_t span);
void reset_lib_num();
bool syncing();
void request_next_chunk(connection_ptr conn);
void take_chunk(connection_ptr c);
......@@ -532,7 +535,7 @@ namespace eosio {
void set_blocks_to_fetch(vector<block_id_type>);
void assign_fectch(connection_ptr c);
void reassign_fetch();
void reassign_fetch(connection_ptr c, go_away_reason reason);
static const fc::string logger_name;
static fc::logger logger;
......@@ -557,6 +560,7 @@ namespace eosio {
out_queue(),
connecting(false),
syncing(false),
write_depth(0),
peer_addr(endpoint),
response_expected(),
pending_fetch(),
......@@ -580,6 +584,7 @@ namespace eosio {
out_queue(),
connecting(false),
syncing(false),
write_depth(0),
peer_addr(),
response_expected(),
pending_fetch(),
......@@ -623,16 +628,42 @@ namespace eosio {
trx_state.clear();
}
void connection::flush_queues() {
if (write_depth > 1) {
while (out_queue.size() > 1) {
out_queue.pop_back();
}
} else {
out_queue.clear();
}
if (write_depth > 0) {
while (write_queue.size() > 1) {
write_queue.pop_back();
}
} else {
write_queue.clear();
}
}
void connection::close() {
if(socket) {
socket->close();
}
else {
wlog("no socket to close!");
}
flush_queues();
connecting = false;
syncing = false;
out_queue.clear();
sync_receiving.reset();
reset();
sent_handshake_count = 0;
last_handshake = handshake_message();
my_impl->sync_master->reset_lib_num();
if(response_expected) {
response_expected->cancel();
}
ilog("calling reset");
pending_message_buffer.reset();
}
......@@ -812,35 +843,55 @@ namespace eosio {
}
void connection::do_queue_write() {
if(write_queue.empty())
return;
if(write_queue.empty())
return;
write_depth++;
connection_wptr c(shared_from_this());
boost::asio::async_write(*socket, boost::asio::buffer(*write_queue.front().buff), [c](boost::system::error_code ec, std::size_t w) {
try {
auto conn = c.lock();
if(!conn)
return;
connection_wptr c(shared_from_this());
boost::asio::async_write(*socket, boost::asio::buffer(*write_queue.front().buff), [c](boost::system::error_code ec, std::size_t w) {
try {
auto conn = c.lock();
if(!conn)
return;
if(ec) {
elog("Error sending to connection: ${i}", ("i", ec.message()));
my_impl->close(conn);
conn->write_queue.clear();
return;
}
if (conn->write_queue.size() ) {
conn->write_queue.front().cb(ec, w);
conn->write_queue.pop_front();
conn->do_queue_write();
}
catch(...) {
elog("Exception in do_queue_write");
}
}
conn->write_depth--;
if(ec) {
elog("Error sending to connection: ${i}", ("i", ec.message()));
my_impl->close(conn);
return;
}
conn->write_queue.pop_front();
conn->do_queue_write();
}
catch(const std::exception &ex) {
elog("Exception in do_queue_write ${s}", ("s",ex.what()));
}
catch(const fc::exception &ex) {
elog("Exception in do_queue_write ${s}", ("s",ex.to_string()));
}
catch(...) {
elog("Exception in do_queue_write");
}
});
}
void connection::cancel_sync(const string &reason) {
elog("cancel sync reason = ${m} ", ("m",reason));
enqueue( ( sync_request_message ) {0,0} );
my_impl->sync_master->reassign_fetch();
void connection::cancel_sync(go_away_reason reason) {
elog("cancel sync reason = ${m}, out queue size ${o} ", ("m",reason_str(reason)) ("o", out_queue.size()));
sync_receiving.reset();
flush_queues();
switch (reason) {
case validation :
case fatal_other : {
no_retry = reason;
enqueue( go_away_message( reason ));
break;
}
default:
enqueue( ( sync_request_message ) {0,0} );
}
}
void connection::cancel_fetch() {
......@@ -892,17 +943,20 @@ namespace eosio {
ds.write( header, header_size );
fc::raw::pack( ds, m );
write_depth++;
queue_write(std::make_shared<vector<char>>(send_buffer.begin(), send_buffer.begin()+buffer_size),
[this](boost::system::error_code ec, std::size_t ) {
if(out_queue.size()) {
if(out_queue.front().contains<go_away_message>()) {
my_impl->close(shared_from_this());
return;
}
out_queue.pop_front();
}
send_next_message();
});
write_depth--;
if(out_queue.size()) {
if(out_queue.front().contains<go_away_message>()) {
elog ("sent a go away message, closing connection to ${p}",("p",peer_name()));
my_impl->close(shared_from_this());
return;
}
out_queue.pop_front();
}
send_next_message();
});
}
void connection::send_next_txn() {
......@@ -970,8 +1024,7 @@ namespace eosio {
void connection::sync_timeout( boost::system::error_code ec ) {
if( !ec ) {
if( sync_receiving && sync_receiving->last < sync_receiving->end_block) {
cancel_sync("sync timeout");
// my_impl->sync_master->take_chunk(shared_from_this());
my_impl->sync_master->reassign_fetch (shared_from_this(),benign_other);
}
}
else if( ec == boost::asio::error::operation_aborted) {
......@@ -985,12 +1038,12 @@ namespace eosio {
}
const string connection::peer_name() {
if( !peer_addr.empty() ) {
return peer_addr;
}
if( !last_handshake.p2p_address.empty() ) {
return last_handshake.p2p_address;
}
if( !peer_addr.empty() ) {
return peer_addr;
}
return "connecting client";
}
......@@ -1054,6 +1107,19 @@ namespace eosio {
chain_plug = app( ).find_plugin<chain_plugin>( );
}
void sync_manager::reset_lib_num() {
sync_known_lib_num = chain_plug->chain().last_irreversible_block_num();
sync_last_requested_num = chain_plug->chain().head_block_num();
for (auto& c : my_impl->connections) {
if( c->last_handshake.last_irreversible_block_num > sync_known_lib_num) {
sync_known_lib_num =c->last_handshake.last_irreversible_block_num;
}
if( c->sync_receiving && c->sync_receiving->end_block > sync_last_requested_num) {
sync_last_requested_num = c->sync_receiving->end_block;
}
}
}
bool sync_manager::syncing( ) {
fc_dlog(logger, "ours = ${ours} known = ${known} head = ${head}",("ours",sync_last_requested_num)("known",sync_known_lib_num)("head",chain_plug->chain( ).head_block_num( )));
return( sync_last_requested_num != sync_known_lib_num ||
......@@ -1062,6 +1128,12 @@ namespace eosio {
void sync_manager::request_next_chunk( connection_ptr conn = connection_ptr() ) {
uint32_t head_block = chain_plug->chain().head_block_num();
if (head_block < sync_last_requested_num) {
fc_dlog (logger, "ignoring request, head is ${h} last req = ${r}",("h",head_block)("r",sync_last_requested_num));
return;
}
if (conn) {
source = conn;
}
......@@ -1126,7 +1198,7 @@ namespace eosio {
}
if(source->sync_receiving && source->sync_receiving->start_block == head_block + 1) {
source->enqueue( (sync_request_message){source->sync_receiving->start_block,
source->enqueue( (sync_request_message){source->sync_receiving->start_block,
source->sync_receiving->end_block});
sync_last_requested_num = source->sync_receiving->end_block;
}
......@@ -1171,17 +1243,20 @@ namespace eosio {
}
}
}
ilog( "Catching up with chain, our last req is ${cc}, theirs is ${t}", ( "cc",sync_last_requested_num)("t",target));
ilog( "Catching up with chain, our last req is ${cc}, theirs is ${t} peer ${p}", ( "cc",sync_last_requested_num)("t",target)("p",c->peer_name()));
if( target > sync_known_lib_num) {
sync_known_lib_num = target;
}
if( c->sync_receiving && c->sync_receiving->end_block > 0) {
ilog("connection already has end block ${eb}",("eb",c->sync_receiving->end_block));
return;
}
request_next_chunk(c);
}
void sync_manager::reassign_fetch() {
void sync_manager::reassign_fetch(connection_ptr c, go_away_reason reason) {
sync_last_requested_num = chain_plug->chain().head_block_num();
c->cancel_sync (reason);
request_next_chunk();
}
......@@ -1275,15 +1350,22 @@ namespace eosio {
void net_plugin_impl::start_read_message( connection_ptr conn ) {
try {
if(!conn->socket) {
return;
}
conn->socket->async_read_some(
conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(),
[this,conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {
try {
if( !ec ) {
if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
elog("async_read_some callback: bytes_transfered = ${bt}, buffer.bytes_to_write = ${btw}",("bt",bytes_transferred)("btw",conn->pending_message_buffer.bytes_to_write()));
}
FC_ASSERT(bytes_transferred <= conn->pending_message_buffer.bytes_to_write());
conn->pending_message_buffer.advance_write_ptr(bytes_transferred);
while (conn->pending_message_buffer.bytes_to_read() > 0) {
uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();
if (bytes_in_buffer < message_header_size) {
break;
} else {
......@@ -1311,7 +1393,16 @@ namespace eosio {
elog( "Error reading message from connection: ${m}",( "m", ec.message() ) );
close( conn );
}
} catch (...) {
}
catch(const std::exception &ex) {
elog("Exception in handling read data ${s}", ("s",ex.what()));
close( conn );
}
catch(const fc::exception &ex) {
elog("Exception in handling read data ${s}", ("s",ex.to_string()));
close( conn );
}
catch (...) {
elog( "Undefined exception hanlding the read data from connection: ${m}",( "m", ec.message() ) );
close( conn );
}
......@@ -1344,7 +1435,7 @@ namespace eosio {
}
void net_plugin_impl::handle_message( connection_ptr c, const handshake_message &msg) {
fc_dlog(logger, "got a handshake_message from ${p}", ("p",c->peer_name()));
ilog("got a handshake_message from ${p} ${h}", ("p",c->peer_addr)("h",msg.p2p_address));
if( c->connecting ) {
c->connecting = false;
......@@ -1424,7 +1515,8 @@ namespace eosio {
}
}
c->last_handshake = msg;
sync_master->reset_lib_num();
c->syncing = false;
uint32_t head = cc.head_block_num( );
......@@ -1454,7 +1546,6 @@ namespace eosio {
c->syncing = true;
}
}
c->last_handshake = msg;
}
void net_plugin_impl::handle_message( connection_ptr c, const go_away_message &msg ) {
......@@ -1463,6 +1554,8 @@ namespace eosio {
if(msg.reason == go_away_reason::duplicate ) {
c->node_id = msg.node_id;
}
c->flush_queues();
close (c);
}
void net_plugin_impl::handle_message(connection_ptr c, const time_message &msg) {
......@@ -1560,7 +1653,9 @@ namespace eosio {
break;
}
case id_list_modes::last_irr_catch_up : {
sync_master->start_sync(c, msg.known_blocks.pending);
if (!c->sync_receiving) {
sync_master->start_sync(c, msg.known_blocks.pending);
}
break;
}
case id_list_modes::catch_up : {
......@@ -1673,9 +1768,9 @@ namespace eosio {
}
void net_plugin_impl::handle_message( connection_ptr c, const sync_request_message &msg) {
fc_dlog(logger, "got a sync_request_message { ${s} - ${e} } from ${p}",("s",msg.start_block)("e",msg.end_block) ("p",c->peer_name()));
if( msg.end_block == 0) {
c->sync_requested.reset();
c->flush_queues();
} else {
c->sync_requested.reset(new sync_state( msg.start_block,msg.end_block,msg.start_block-1));
c->enqueue_sync_block();
......@@ -1883,22 +1978,24 @@ namespace eosio {
else {
if( c->sync_receiving->last + 1 != num) {
wlog( "expected block ${next} but got ${num} from ${p}",("next",c->sync_receiving->last+1)("num",num)("p",c->peer_name()));
c->cancel_sync("blk out of order");
sync_master->reassign_fetch();
sync_master->reassign_fetch(c,benign_other);
return;
}
c->sync_receiving->last = num;
}
}
bool accepted = false;
go_away_reason reason = fatal_other;
fc_dlog(logger, "last irreversible block = ${lib}", ("lib", cc.last_irreversible_block_num()));
if( !syncing || num == cc.head_block_num()+1 ) {
try {
chain_plug->accept_block(msg, syncing);
accepted = true;
reason = no_reason;
} catch( const unlinkable_block_exception &ex) {
elog( "handle signed block: unlinkable_block_exception accept block #${n} syncing from ${p}",("n",num)("p",c->peer_name()));
c->enqueue( go_away_message( go_away_reason::unlinkable ));
elog( "unlinkable_block_exception accept block #${n} syncing from ${p}",("n",num)("p",c->peer_name()));
reason = unlinkable;
} catch( const block_validate_exception &ex) {
elog( "block_validate_exception accept block #${n} syncing from ${p}",("n",num)("p",c->peer_name()));
reason = validation;
} catch( const assert_exception &ex) {
elog( "unable to accept block on assert exception ${n} from ${p}",("n",ex.to_string())("p",c->peer_name()));
} catch( const fc::exception &ex) {
......@@ -1909,10 +2006,9 @@ namespace eosio {
}
if( has_chunk) {
if( !accepted) {
wlog("block ${num}, ${bid} not accepted from ${p}",("num",num)("p",c->peer_name()));
c->cancel_sync("blk not acceptd");
sync_master->reassign_fetch();
if( reason != no_reason ) {
wlog("block ${num}, ${bid} not accepted from ${p}",("num",num)("p",c->peer_name())("bid",blk_id));
sync_master->reassign_fetch(c,reason);
}
if( num == c->sync_receiving->end_block) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册