diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index c17f4e19ef50ac618e8df3172c34763f5a8b64bc..0a3ecde0b6c7a28da5f5abd0ea8b6ad382d5dd63 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -1,4 +1,4 @@ - #include +#include #include #include @@ -84,7 +84,7 @@ namespace eos { uint32_t end_block = 0; uint32_t last = 0; ///< last sent or received time_point start_time; ///< time request made or received - vector block_cache; + vector< vector > block_cache; }; #if 0 struct by_start_block; @@ -105,8 +105,8 @@ namespace eos { connection( string endpoint ) : block_state(), trx_state(), - in_sync_state(), - out_sync_state(), + sync_received(), + sync_requested(), socket( std::make_shared( std::ref( app().get_io_service() ))), pending_message_size(), pending_message_buffer(), @@ -124,8 +124,8 @@ namespace eos { connection( socket_ptr s ) : block_state(), trx_state(), - in_sync_state(), - out_sync_state(), + sync_received(), + sync_requested(), socket( s ), pending_message_size(), pending_message_buffer(), @@ -157,12 +157,14 @@ namespace eos { block_state_index block_state; transaction_state_index trx_state; - vector in_sync_state; // we are requesting info from this peer - vector out_sync_state; // this peer is requesting info from us + vector sync_received; // we are requesting info from this peer + vector sync_requested; // this peer is requesting info from us socket_ptr socket; uint32_t pending_message_size; vector pending_message_buffer; + vector blk_buffer; + size_t message_size; fc::sha256 remote_node_id; handshake_message last_handshake; @@ -177,8 +179,9 @@ namespace eos { } void reset () { - in_sync_state.clear(); - out_sync_state.clear(); + dlog ("reset clearing sync_received"); + sync_received.clear(); + sync_requested.clear(); block_state.clear(); trx_state.clear(); } @@ -207,7 +210,7 @@ namespace eos { void send_next_message() { if( !out_queue.size() ) { - if (out_sync_state.size() > 0) { + if (sync_requested.size() > 0) { write_block_backlog(); } return; @@ -220,7 +223,6 @@ namespace eos { fc::datastream ds( buffer.data(), buffer.size() ); ds.write( (char*)&size, sizeof(size) ); fc::raw::pack( ds, m ); - boost::asio::buffer abuff( buffer.data(), buffer.size()); boost::asio::async_write( *socket, boost::asio::buffer( buffer.data(), buffer.size() ), [this,buf=std::move(buffer)]( boost::system::error_code ec, std::size_t bytes_transferred ) { if( ec ) { @@ -238,12 +240,12 @@ namespace eos { void write_block_backlog ( ) { chain_controller& cc = app().find_plugin()->chain(); - auto ss = out_sync_state.begin(); + auto ss = sync_requested.begin(); uint32_t num = ++ss->last; //get_node()->value().last; ilog ("num = ${num} end = ${end}",("num",num)("end",ss->end_block)); if (num >= ss->end_block) { - out_sync_state.erase(ss); - ilog ("out sync size = ${s}",("s",out_sync_state.size())); + sync_requested.erase(ss); + ilog ("out sync size = ${s}",("s",sync_requested.size())); } try { fc::optional sb = cc.fetch_block_by_number(num); @@ -462,48 +464,71 @@ namespace eos { } uint32_t low = sync_req_head + 1; uint32_t high = low + sync_req_span < sync_head ? low + sync_req_span : sync_head; - sync_state req = {low, high, sync_req_head, time_point::now(), vector() }; - c->in_sync_state.push_back (req); + sync_state req = {low, high, sync_req_head, time_point::now(), vector< vector >() }; + c->sync_received.push_back (req); + dlog ("pushing new cache from ${l} to ${h} on ${c} size = ${s}", + ("l",low)("h",high)("c",c->peer_addr)("s",c->sync_received.size())); + sync_request_message srm = {req.start_block, req.end_block }; c->send (srm); sync_req_head = high; return (sync_req_head == sync_head); } - void set_sync_head (uint32_t target) { - if (sync_head == sync_req_head) { + void set_sync_head (connection_ptr c, uint32_t target) { + bool stable = (sync_head == sync_req_head); + if (stable) { sync_req_head = chain_plug->chain().head_block_num(); } - - sync_head = target; - for (auto &c : connections) { - if (c->ready()) { - if (get_sync_req (c)) { - break; + ilog ("Catching up with chain, our head is ${cc}, theirs is ${t}", + ("cc",sync_req_head)("t",target)); + if (target > sync_head) { + sync_head = target; + if (stable) { + for (auto &ci : connections) { + if (ci->ready()) { + if (get_sync_req (ci)) { + break; + } + } } + } else { + get_sync_req (c); } } } void apply_cached_blocks (connection_ptr conn) { - uint32_t start = 1 + chain_plug->chain().head_block_num(); bool keep_going = true; while (keep_going) { keep_going = false; for (auto &c : connections) { - if (c == conn || !c->ready()) { + if (c == conn || !c->ready()) { // || c->sync_received.size() == 0) { continue; } try { - auto ss = c->in_sync_state.begin(); + auto ss = c->sync_received.begin(); + if (ss == c->sync_received.end()) { + dlog ("conn ${c} has empty sync_received"); + continue; + } + uint32_t start = 1 + chain_plug->chain().head_block_num(); if (start == ss->start_block) { + if (ss->last < start) { + dlog ("found the set but the cache is empty"); + return; + } for (auto & blk : ss->block_cache) { - chain_plug->accept_block (blk,true); + auto block = fc::raw::unpack( blk ); + chain_plug->accept_block (block,true); + ++start; ss->start_block++; } ss->block_cache.clear(); - if (ss->start_block == ss->end_block) { - c->in_sync_state.erase(ss); + dlog ("cleared a cache, start = ${s} and end = ${e} head = ${h}", + ("s",start)("e",ss->end_block)("h",chain_plug->chain().head_block_num())); + if (start >= ss->end_block) { + c->sync_received.erase(ss); keep_going = true; } } @@ -555,13 +580,13 @@ namespace eos { } if ( c->remote_node_id != msg.node_id) { - c->reset(); + // c->reset(); c->remote_node_id = msg.node_id; } uint32_t head = cc.head_block_num (); if ( msg.head_num > head) { - set_sync_head(msg.head_num); + set_sync_head(c, msg.head_num); } else { c->syncing = head != msg.head_num; @@ -646,7 +671,7 @@ namespace eos { void handle_message (connection_ptr c, const sync_request_message &msg) { sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()}; - c->out_sync_state.push_back (req); + c->sync_requested.push_back (req); c->write_block_backlog (); } @@ -706,46 +731,56 @@ namespace eos { chain_controller &cc = chain_plug->chain(); try { if (cc.is_known_block(msg.id())) { + dlog ("skipping known block"); return; } } catch (...) { } uint32_t num = msg.block_num(); + dlog ("handling block #${n} conn = ${c}",("n", num)("c",c->peer_addr)); bool syncing = sync_head > cc.head_block_num(); if (syncing) { - for( auto &ss : c->in_sync_state) { - if (num <= ss.end_block) { - ss.last = num; - if (num == ss.end_block) { - get_sync_req (c); - } + dlog ("sync_rec size ${s}",("s", c->sync_received.size())); + for( auto &ss : c->sync_received) { + if (num > ss.end_block) { - if (num == cc.head_block_num()+1) { - try { - chain_plug->accept_block(msg, true); - auto s0 = c->in_sync_state.begin(); - if (s0->start_block == s0->end_block) { - c->in_sync_state.erase(s0); - apply_cached_blocks(c); - } - else { - s0->start_block++; - } - } catch (const unlinkable_block_exception &ex) { - elog ("unable to accept block #${n} syncing",("n",num)); - //close (c); - } catch (const assert_exception &ex) { - elog ("unable to accept block on assert exception #${n}",("n",num)); - //close (c); - } + continue; + } + ss.last = num; + if (num == ss.end_block) { + get_sync_req (c); + } - } else { - ss.block_cache.push_back(msg); + if (num == cc.head_block_num()+1) { + try { + dlog ("sync accepting block #${n}",("n", num)); + chain_plug->accept_block(msg, true); + auto s0 = c->sync_received.begin(); + if (s0->start_block == s0->end_block) { + c->sync_received.erase(s0); + apply_cached_blocks(c); + } + else { + s0->start_block++; + } + } catch (const unlinkable_block_exception &ex) { + elog ("unable to accept block #${n} syncing",("n",num)); + //close (c); + } catch (const assert_exception &ex) { + elog ("unable to accept block on assert exception #${n}",("n",num)); + //close (c); } - break; + + } else { + dlog ("caching block #${n} size = ${s}", ("n", num)("s", c->blk_buffer.size())); + // ss.block_cache.emplace_back(::move(c->blk_buffer)); + + ss.block_cache.push_back(c->blk_buffer); } + break; } if ( chain_plug->chain().head_block_num() == sync_head) { + dlog ("caught up with previous target ${t}",("t",sync_head)); handshake_message hello; handshake_initializer::populate(hello); send_all (hello, [c](connection_ptr conn) -> bool { @@ -772,6 +807,25 @@ namespace eos { } } + struct precache : public fc::visitor { + connection_ptr c; + precache (connection_ptr conn) : c(conn) {} + + void operator()(const signed_block &msg) const + { + c->blk_buffer.resize(c->message_size); + memcpy(c->blk_buffer.data(),c->pending_message_buffer.data(), c->message_size); + } + + template + void operator()(const T &msg) const + { + //no-op + } + + }; + + struct msgHandler : public fc::visitor { net_plugin_impl &impl; @@ -791,7 +845,10 @@ namespace eos { [this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) { if( !ec ) { try { + c->message_size = bytes_transferred; auto msg = fc::raw::unpack( c->pending_message_buffer ); + precache pc( c ); + msg.visit (pc); start_read_message( c ); msgHandler m(*this, c);