diff --git a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp index 44587aff1e68120fe18875d8fb479d8fad6aa3d2..c414bc8d3a7cc7527319471fc643c3125813a6d3 100644 --- a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp @@ -21,17 +21,15 @@ namespace eos { struct notice_message { vector known_trx; - vector known_blocks; }; struct request_message { vector req_trx; - vector req_blocks; }; struct block_summary_message { - signed_block block; + block_id_type block; vector trx_ids; }; @@ -59,8 +57,8 @@ FC_REFLECT( eos::handshake_message, (os)(agent) ) FC_REFLECT( eos::block_summary_message, (block)(trx_ids) ) -FC_REFLECT( eos::notice_message, (known_trx)(known_blocks) ) -FC_REFLECT( eos::request_message, (req_trx)(req_blocks) ) +FC_REFLECT( eos::notice_message, (known_trx) ) +FC_REFLECT( eos::request_message, (req_trx) ) FC_REFLECT( eos::sync_request_message, (start_block)(end_block) ) /** diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 371300f9c0c8bb66366230f74c5dbd96706a82c9..e1180b541c5c8f854ede0e96e5972efee6d29df9 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -101,26 +101,44 @@ namespace eos { class connection : public std::enable_shared_from_this { public: - connection( socket_ptr s, bool try_recon ) + connection( string endpoint ) : block_state(), trx_state(), in_sync_state(), out_sync_state(), - socket(s), + socket( std::make_shared( std::ref( app().get_io_service() ))), pending_message_size(), pending_message_buffer(), remote_node_id(), last_handshake(), out_queue(), - try_reconnect (try_recon) + connecting (false), + peer_addr (endpoint) { - wlog( "created connection" ); + wlog( "created connection to ${n}", ("n", endpoint) ); pending_message_buffer.resize( 1024*1024*4 ); auto *rnd = remote_node_id.data(); rnd[0] = 0; + } - - + connection( socket_ptr s ) + : block_state(), + trx_state(), + in_sync_state(), + out_sync_state(), + socket( s ), + pending_message_size(), + pending_message_buffer(), + remote_node_id(), + last_handshake(), + out_queue(), + connecting (false), + peer_addr () + { + wlog( "created connection from client" ); + pending_message_buffer.resize( 1024*1024*4 ); + auto *rnd = remote_node_id.data(); + rnd[0] = 0; } ~connection() { @@ -135,14 +153,27 @@ namespace eos { uint32_t pending_message_size; vector pending_message_buffer; - vector raw_recv; - vector raw_send; fc::sha256 remote_node_id; handshake_message last_handshake; std::deque out_queue; uint32_t mtu; - bool try_reconnect; + bool connecting; + string peer_addr; + + void reset () { + in_sync_state.clear(); + out_sync_state.clear(); + block_state.clear(); + trx_state.clear(); + } + + void close () { + out_queue.clear(); + if (socket) { + socket->close(); + } + } void send_handshake ( ) { handshake_message hello; @@ -255,12 +286,13 @@ namespace eos { std::set resolved_nodes; std::set learned_nodes; - std::set pending_sockets; std::set< connection_ptr > connections; bool done = false; - unique_ptr connector_check; - unique_ptr transaction_check; + unique_ptr connector_check; + unique_ptr transaction_check; + boost::asio::steady_timer::duration connector_period; + boost::asio::steady_timer::duration txn_exp_period; int16_t network_version; chain_id_type chain_id; @@ -269,74 +301,53 @@ namespace eos { string user_agent_name; chain_plugin* chain_plug; int32_t just_send_it_max; + bool send_whole_blocks; node_transaction_index local_txns; vector pending_notify; - void connect( const string& peer_addr ) { - auto host = peer_addr.substr( 0, peer_addr.find(':') ); - auto port = peer_addr.substr( host.size()+1, host.size() ); + shared_ptr resolver; + + + void connect( connection_ptr c ) { + c->connecting = true; + auto host = c->peer_addr.substr( 0, c->peer_addr.find(':') ); + auto port = c->peer_addr.substr( host.size()+1, host.size() ); idump((host)(port)); - auto resolver = std::make_shared( std::ref( app().get_io_service() ) ); tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() ); // Note: need to add support for IPv6 too resolver->async_resolve( query, - [resolver,peer_addr,this]( const boost::system::error_code& err, tcp::resolver::iterator endpoint_itr ){ + [c, this]( const boost::system::error_code& err, tcp::resolver::iterator endpoint_itr ){ if( !err ) { - connect( resolver, endpoint_itr ); + connect( c, endpoint_itr ); } else { - elog( "Unable to resolve ${peer_addr}: ${error}", ( "peer_addr", peer_addr )("error", err.message() ) ); + elog( "Unable to resolve ${peer_addr}: ${error}", ( "peer_addr", c->peer_addr )("error", err.message() ) ); } }); } - void connect( std::shared_ptr resolver, tcp::resolver::iterator endpoint_itr ) { - auto sock = std::make_shared( std::ref( app().get_io_service() ) ); - pending_sockets.insert( sock ); - + void connect( connection_ptr c, tcp::resolver::iterator endpoint_itr ) { auto current_endpoint = *endpoint_itr; ++endpoint_itr; - sock->async_connect( current_endpoint, - [sock,resolver,endpoint_itr, this] + c->socket->async_connect( current_endpoint, + [c,endpoint_itr, this] ( const boost::system::error_code& err ) { - pending_sockets.erase( sock ); if( !err ) { - start_session( std::make_shared(sock, true)); + start_session( c ); } else { if( endpoint_itr != tcp::resolver::iterator() ) { - connect( resolver, endpoint_itr ); + connect( c, endpoint_itr ); + } + else { + c->connecting = false; } } } ); } -#if 0 - /** - * This thread performs high level coordination among multiple connections and - * ensures connections are cleaned up, reconnected, etc. - */ - void network_loop() { - try { - ilog( "starting network loop" ); - while( !done ) { - for( auto itr = connections.begin(); itr != connections.end(); ) { - auto con = *itr; - if( !con->socket->is_open() ) { - close(con); - itr = connections.begin(); - continue; - } - ++itr; - } - } - ilog("network loop done"); - } FC_CAPTURE_AND_RETHROW() } -#endif - - void start_session(connection_ptr con ) { - connections.insert (con); + con->connecting = false; uint32_t mtu = 1300; // need a way to query this if (mtu < just_send_it_max) { just_send_it_max = mtu; @@ -355,7 +366,7 @@ namespace eos { auto socket = std::make_shared( std::ref( app().get_io_service() ) ); acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) { if( !ec ) { - start_session( std::make_shared( socket, false ) ); + start_session( std::make_shared( socket ) ); start_listen_loop(); } else { elog( "Error accepting connection: ${m}", ("m", ec.message() ) ); @@ -409,18 +420,7 @@ namespace eos { } } - void send_all (const block_summary_message &msg) { - for (auto &c : connections) { - const auto& bs = c->block_state.find(msg.block.id()); - if (bs == c->block_state.end()) { - c->block_state.insert ((block_state){msg.block.id(),true,true,fc::time_point()}); - if (c->out_sync_state.size() == 0) - c->send(msg); - } - } - } - - void shared_fetch (uint32_t low, uint32_t high) { + void shared_fetch (uint32_t low, uint32_t high) { uint32_t delta = high - low; uint32_t count = connections.size(); @@ -451,7 +451,7 @@ namespace eos { return; } if (msg.network_version != network_version) { - elog ("Peer network virsion does not match "); + elog ("Peer network version does not match "); close (c); return; } @@ -460,12 +460,13 @@ namespace eos { uint32_t lib_num = cc.last_irreversible_block_num (); uint32_t peer_lib = msg.last_irreversible_block_num; bool on_fork = false; - if (peer_lib <= lib_num) { + if (peer_lib <= lib_num && peer_lib > 0) { try { block_id_type peer_lib_id = cc.get_block_id_for_num (peer_lib); on_fork = (msg.last_irreversible_block_id != peer_lib_id); } catch (...) { + wlog ("caught an exception getting block id for ${pl}",("pl",peer_lib)); on_fork = true; } if (on_fork) { @@ -481,7 +482,8 @@ namespace eos { } if ( c->remote_node_id != msg.node_id) { - if (c->try_reconnect) { + c->reset(); + if (c->peer_addr.length() > 0) { auto old_id = resolved_nodes.find (c->remote_node_id); if (old_id != resolved_nodes.end()) { resolved_nodes.erase(old_id); @@ -502,16 +504,10 @@ namespace eos { } void handle_message (connection_ptr c, const notice_message &msg) { + //peer tells us about one or more blocks or txns. We need to forward only those + //we don't already know about. and for each peer note that it knows notice_message fwd; request_message req; - for (const auto& b : msg.known_blocks) { - const auto &bs = c->block_state.find(b); - if (bs == c->block_state.end()) { - c->block_state.insert((block_state){b,true,true,fc::time_point()}); - fwd.known_blocks.push_back(b); - req.req_blocks.push_back(b); - } - } for (const auto& t : msg.known_trx) { const auto &tx = c->trx_state.find(t); @@ -522,14 +518,13 @@ namespace eos { req.req_trx.push_back(t); } } - if (fwd.known_blocks.size() > 0 || fwd.known_trx.size() > 0) { + if (fwd.known_trx.size() > 0) { send_all (fwd, [c,fwd](connection_ptr cptr) -> bool { -#warning ("TODO: add more forwarding logic here"); return cptr != c; }); c->send(req); } - } + } void handle_message (connection_ptr c, const request_message &msg) { // collect a list of transactions that were found. @@ -544,11 +539,12 @@ namespace eos { send_now.push_back(txn->transaction); } else { - int cycle_count = 4; + int cycle_count = 2; auto loop_start = conn_ndx++; while (conn_ndx != loop_start) { if (conn_ndx == connections.end()) { if (--cycle_count == 0) { + elog("loop cycled twice, something is wrong"); break; } conn_ndx = connections.begin(); @@ -560,7 +556,8 @@ namespace eos { } auto txn = conn_ndx->get()->trx_state.get().find(t); if (txn != conn_ndx->get()->trx_state.end()) { - // add to forward_to list + + //forward_to[conn_ndx]->push_back(t); break; } ++conn_ndx; @@ -569,6 +566,9 @@ namespace eos { } if (!send_now.empty()) { + for (auto &t : send_now) { + c->send (t); + } } } @@ -579,13 +579,11 @@ namespace eos { } void handle_message (connection_ptr c, const block_summary_message &msg) { -#warning ("TODO: reconstruct actual block from cached transactions") const auto& itr = c->block_state.get(); - auto bs = itr.find(msg.block.id()); + auto bs = itr.find(msg.block); if (bs == c->block_state.end()) { - c->block_state.insert (block_state({msg.block.id(),true,true,fc::time_point()})); + c->block_state.insert (block_state({msg.block,true,true,fc::time_point()})); send_all (msg, [c](connection_ptr cptr) -> bool { -#warning ("TODO: add more forwarding logic here"); return cptr != c; }); } else { @@ -594,21 +592,22 @@ namespace eos { value.is_known= true; c->block_state.insert (std::move(value)); send_all (msg, [c](connection_ptr cptr) -> bool { - #warning ("TODO: add more forwarding logic here"); return cptr != c; }); } } +#warning ("TODO: reconstruct actual block from cached transactions") + signed_block sb; chain_controller &cc = chain_plug->chain(); - if (!cc.is_known_block(msg.block.id()) ) { + if (!cc.is_known_block(msg.block) ) { try { - chain_plug->accept_block(msg.block, false); + chain_plug->accept_block(sb, false); } catch (const unlinkable_block_exception &ex) { - elog (" caught unlinkable block exception #${n}",("n",msg.block.block_num())); + elog (" caught unlinkable block exception #${n}",("n",sb.block_num())); close (c); } catch (const assert_exception &ex) { // received a block due to out of sequence - elog (" caught assertion #${n}",("n",msg.block.block_num())); + elog (" caught assertion #${n}",("n",sb.block_num())); close (c); } } @@ -626,28 +625,29 @@ namespace eos { } void handle_message (connection_ptr c, const signed_block &msg) { - // uint32_t bn = msg.block_num(); chain_controller &cc = chain_plug->chain(); if (cc.is_known_block(msg.id())) { return; } uint32_t num = 0; - - for( auto ss = c->in_sync_state.begin(); ss != c->in_sync_state.end(); ss++ ) { - if (msg.block_num() == ss->last + 1 && msg.block_num() <= ss->end_block) { - num = msg.block_num(); - ss.get_node()->value().last = num; - break; + bool syncing = c->in_sync_state.size() > 0; + if (syncing) { + for( auto ss = c->in_sync_state.begin(); ss != c->in_sync_state.end(); ss++ ) { + if (msg.block_num() == ss->last + 1 && msg.block_num() <= ss->end_block) { + num = msg.block_num(); + ss.get_node()->value().last = num; + break; + } + } + if (num == 0) { + elog ("Got out-of-order block ${n}",("n",msg.block_num())); + close (c); + return; } - } - if (num == 0) { - elog ("Got out-of-order block ${n}",("n",msg.block_num())); - close (c); - return; } try { - chain_plug->accept_block(msg, true); + chain_plug->accept_block(msg, syncing); } catch (const unlinkable_block_exception &ex) { elog ("unable to accpt block #${n}",("n",num)); close (c); @@ -694,6 +694,7 @@ namespace eos { } void start_conn_timer () { + connector_check->expires_from_now (connector_period); connector_check->async_wait ([&](boost::system::error_code ec) { if (!ec) { connection_monitor (); @@ -706,6 +707,7 @@ namespace eos { } void start_txn_timer () { + transaction_check->expires_from_now (txn_exp_period); transaction_check->async_wait ([&](boost::system::error_code ec) { if (!ec) { expire_txns (); @@ -718,20 +720,24 @@ namespace eos { } void start_monitors () { + connector_check.reset(new boost::asio::steady_timer (app().get_io_service())); + transaction_check.reset(new boost::asio::steady_timer (app().get_io_service())); start_conn_timer(); start_txn_timer(); } void expire_txns () { + start_txn_timer (); #warning ("TODO: Add by-expiry purging code"); } void connection_monitor () { + start_conn_timer(); vector discards; for (auto &c : connections ) { - if (!c->socket->is_open()) { - if (c->try_reconnect) { -#warning ("TODO: Add reconnect code"); + if (!c->socket->is_open() && !c->connecting) { + if (c->peer_addr.length() > 0) { + connect (c); } else { discards.push_back (c); @@ -747,8 +753,7 @@ namespace eos { } void close( connection_ptr c ) { - if( c->socket ) - c->socket->close(); + c->close(); } void send_all_txn (const SignedTransaction& txn) { @@ -769,7 +774,7 @@ namespace eos { } else { pending_notify.push_back (txn.id()); - notice_message nm = {vector(), pending_notify}; + notice_message nm = {pending_notify}; send_all (nm, [txn](connection_ptr c) -> bool { const auto& bs = c->trx_state.find(txn.id()); bool unknown = bs == c->trx_state.end(); @@ -778,6 +783,7 @@ namespace eos { fc::time_point(),fc::time_point() })); return unknown; }); + pending_notify.clear(); } } @@ -850,7 +856,7 @@ namespace eos { void net_plugin::plugin_initialize( const variables_map& options ) { ilog("Initialize net plugin"); - auto resolver = std::make_shared( std::ref( app().get_io_service() ) ); + my->resolver = std::make_shared( std::ref( app().get_io_service() ) ); if( options.count( "listen-endpoint" ) ) { my->p2p_address = options.at("listen-endpoint").as< string >(); auto host = my->p2p_address.substr( 0, my->p2p_address.find(':') ); @@ -859,7 +865,7 @@ namespace eos { tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() ); // Note: need to add support for IPv6 too? - my->listen_endpoint = *resolver->resolve( query); + my->listen_endpoint = *my->resolver->resolve( query); my->acceptor.reset( new tcp::acceptor( app().get_io_service() ) ); } @@ -881,6 +887,8 @@ namespace eos { } } + my->send_whole_blocks = true; + if( options.count( "remote-endpoint" ) ) { my->supplied_peers = options.at( "remote-endpoint" ).as< vector >(); } @@ -891,12 +899,9 @@ namespace eos { my->chain_plug->get_chain_id(my->chain_id); fc::rand_pseudo_bytes(my->node_id.data(), my->node_id.data_size()); - boost::asio::steady_timer::duration period(30); //seconds from now - my->connector_check.reset(new boost::asio::steady_timer (app().get_io_service(), - period)); - period = boost::asio::steady_timer::duration (3); - my->transaction_check.reset(new boost::asio::steady_timer (app().get_io_service(), - period)); + my->connector_period = std::chrono::seconds (30); + my->txn_exp_period = std::chrono::seconds (3); + my->just_send_it_max = 1300; } @@ -913,7 +918,9 @@ namespace eos { my->start_monitors(); for( auto seed_node : my->supplied_peers ) { - my->connect( seed_node ); + connection_ptr c = std::make_shared(seed_node); + my->connections.insert (c); + my->connect( c ); } boost::asio::signal_set signals (app().get_io_service(), SIGINT, SIGTERM); signals.async_wait ([this](const boost::system::error_code &ec, int signum) { @@ -942,6 +949,10 @@ namespace eos { } FC_CAPTURE_AND_RETHROW() } void net_plugin::broadcast_block (const chain::signed_block &sb) { + if (my->send_whole_blocks) { + my->send_all (sb,[](connection_ptr c) -> bool { return true; }); + return; + } vector trxs; if (!sb.cycles.empty()) { for (const auto& cyc : sb.cycles) { @@ -953,18 +964,16 @@ namespace eos { } } - vector blks; - blks.push_back (sb.id()); - notice_message nm = {blks, my->pending_notify}; - my->send_all (nm,[](connection_ptr c) -> bool { - return false; - }); - - block_summary_message bsm = {sb, trxs}; - my->send_all (bsm,[](connection_ptr c) -> bool { + block_summary_message bsm = {sb.id(), trxs}; + my->send_all (bsm,[sb](connection_ptr c) -> bool { return true; + const auto& bs = c->block_state.find(sb.id()); + if (bs == c->block_state.end()) { + c->block_state.insert ((block_state){sb.id(),true,true,fc::time_point()}); + return true; + } + return false; }); - my->pending_notify.clear(); } }