提交 9548350c 编写于 作者: P Phil Mesnier

more revisions to clean up syncing, lots of debugging #344

上级 18331b2a
#include <eos/chain/types.hpp> #include <eos/chain/types.hpp>
#include <eos/net_plugin/net_plugin.hpp> #include <eos/net_plugin/net_plugin.hpp>
#include <eos/net_plugin/protocol.hpp> #include <eos/net_plugin/protocol.hpp>
...@@ -84,7 +84,7 @@ namespace eos { ...@@ -84,7 +84,7 @@ namespace eos {
uint32_t end_block = 0; uint32_t end_block = 0;
uint32_t last = 0; ///< last sent or received uint32_t last = 0; ///< last sent or received
time_point start_time; ///< time request made or received time_point start_time; ///< time request made or received
vector<signed_block> block_cache; vector< vector<char> > block_cache;
}; };
#if 0 #if 0
struct by_start_block; struct by_start_block;
...@@ -105,8 +105,8 @@ namespace eos { ...@@ -105,8 +105,8 @@ namespace eos {
connection( string endpoint ) connection( string endpoint )
: block_state(), : block_state(),
trx_state(), trx_state(),
in_sync_state(), sync_received(),
out_sync_state(), sync_requested(),
socket( std::make_shared<tcp::socket>( std::ref( app().get_io_service() ))), socket( std::make_shared<tcp::socket>( std::ref( app().get_io_service() ))),
pending_message_size(), pending_message_size(),
pending_message_buffer(), pending_message_buffer(),
...@@ -124,8 +124,8 @@ namespace eos { ...@@ -124,8 +124,8 @@ namespace eos {
connection( socket_ptr s ) connection( socket_ptr s )
: block_state(), : block_state(),
trx_state(), trx_state(),
in_sync_state(), sync_received(),
out_sync_state(), sync_requested(),
socket( s ), socket( s ),
pending_message_size(), pending_message_size(),
pending_message_buffer(), pending_message_buffer(),
...@@ -157,12 +157,14 @@ namespace eos { ...@@ -157,12 +157,14 @@ namespace eos {
block_state_index block_state; block_state_index block_state;
transaction_state_index trx_state; transaction_state_index trx_state;
vector<sync_state> in_sync_state; // we are requesting info from this peer vector<sync_state> sync_received; // we are requesting info from this peer
vector<sync_state> out_sync_state; // this peer is requesting info from us vector<sync_state> sync_requested; // this peer is requesting info from us
socket_ptr socket; socket_ptr socket;
uint32_t pending_message_size; uint32_t pending_message_size;
vector<char> pending_message_buffer; vector<char> pending_message_buffer;
vector<char> blk_buffer;
size_t message_size;
fc::sha256 remote_node_id; fc::sha256 remote_node_id;
handshake_message last_handshake; handshake_message last_handshake;
...@@ -177,8 +179,9 @@ namespace eos { ...@@ -177,8 +179,9 @@ namespace eos {
} }
void reset () { void reset () {
in_sync_state.clear(); dlog ("reset clearing sync_received");
out_sync_state.clear(); sync_received.clear();
sync_requested.clear();
block_state.clear(); block_state.clear();
trx_state.clear(); trx_state.clear();
} }
...@@ -207,7 +210,7 @@ namespace eos { ...@@ -207,7 +210,7 @@ namespace eos {
void send_next_message() { void send_next_message() {
if( !out_queue.size() ) { if( !out_queue.size() ) {
if (out_sync_state.size() > 0) { if (sync_requested.size() > 0) {
write_block_backlog(); write_block_backlog();
} }
return; return;
...@@ -220,7 +223,6 @@ namespace eos { ...@@ -220,7 +223,6 @@ namespace eos {
fc::datastream<char*> ds( buffer.data(), buffer.size() ); fc::datastream<char*> ds( buffer.data(), buffer.size() );
ds.write( (char*)&size, sizeof(size) ); ds.write( (char*)&size, sizeof(size) );
fc::raw::pack( ds, m ); fc::raw::pack( ds, m );
boost::asio::buffer abuff( buffer.data(), buffer.size());
boost::asio::async_write( *socket, boost::asio::buffer( buffer.data(), buffer.size() ), boost::asio::async_write( *socket, boost::asio::buffer( buffer.data(), buffer.size() ),
[this,buf=std::move(buffer)]( boost::system::error_code ec, std::size_t bytes_transferred ) { [this,buf=std::move(buffer)]( boost::system::error_code ec, std::size_t bytes_transferred ) {
if( ec ) { if( ec ) {
...@@ -238,12 +240,12 @@ namespace eos { ...@@ -238,12 +240,12 @@ namespace eos {
void write_block_backlog ( ) { void write_block_backlog ( ) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain(); chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
auto ss = out_sync_state.begin(); auto ss = sync_requested.begin();
uint32_t num = ++ss->last; //get_node()->value().last; uint32_t num = ++ss->last; //get_node()->value().last;
ilog ("num = ${num} end = ${end}",("num",num)("end",ss->end_block)); ilog ("num = ${num} end = ${end}",("num",num)("end",ss->end_block));
if (num >= ss->end_block) { if (num >= ss->end_block) {
out_sync_state.erase(ss); sync_requested.erase(ss);
ilog ("out sync size = ${s}",("s",out_sync_state.size())); ilog ("out sync size = ${s}",("s",sync_requested.size()));
} }
try { try {
fc::optional<signed_block> sb = cc.fetch_block_by_number(num); fc::optional<signed_block> sb = cc.fetch_block_by_number(num);
...@@ -462,48 +464,71 @@ namespace eos { ...@@ -462,48 +464,71 @@ namespace eos {
} }
uint32_t low = sync_req_head + 1; uint32_t low = sync_req_head + 1;
uint32_t high = low + sync_req_span < sync_head ? low + sync_req_span : sync_head; uint32_t high = low + sync_req_span < sync_head ? low + sync_req_span : sync_head;
sync_state req = {low, high, sync_req_head, time_point::now(), vector<signed_block>() }; sync_state req = {low, high, sync_req_head, time_point::now(), vector< vector<char> >() };
c->in_sync_state.push_back (req); c->sync_received.push_back (req);
dlog ("pushing new cache from ${l} to ${h} on ${c} size = ${s}",
("l",low)("h",high)("c",c->peer_addr)("s",c->sync_received.size()));
sync_request_message srm = {req.start_block, req.end_block }; sync_request_message srm = {req.start_block, req.end_block };
c->send (srm); c->send (srm);
sync_req_head = high; sync_req_head = high;
return (sync_req_head == sync_head); return (sync_req_head == sync_head);
} }
void set_sync_head (uint32_t target) { void set_sync_head (connection_ptr c, uint32_t target) {
if (sync_head == sync_req_head) { bool stable = (sync_head == sync_req_head);
if (stable) {
sync_req_head = chain_plug->chain().head_block_num(); sync_req_head = chain_plug->chain().head_block_num();
} }
ilog ("Catching up with chain, our head is ${cc}, theirs is ${t}",
sync_head = target; ("cc",sync_req_head)("t",target));
for (auto &c : connections) { if (target > sync_head) {
if (c->ready()) { sync_head = target;
if (get_sync_req (c)) { if (stable) {
break; for (auto &ci : connections) {
if (ci->ready()) {
if (get_sync_req (ci)) {
break;
}
}
} }
} else {
get_sync_req (c);
} }
} }
} }
void apply_cached_blocks (connection_ptr conn) { void apply_cached_blocks (connection_ptr conn) {
uint32_t start = 1 + chain_plug->chain().head_block_num();
bool keep_going = true; bool keep_going = true;
while (keep_going) { while (keep_going) {
keep_going = false; keep_going = false;
for (auto &c : connections) { for (auto &c : connections) {
if (c == conn || !c->ready()) { if (c == conn || !c->ready()) { // || c->sync_received.size() == 0) {
continue; continue;
} }
try { try {
auto ss = c->in_sync_state.begin(); auto ss = c->sync_received.begin();
if (ss == c->sync_received.end()) {
dlog ("conn ${c} has empty sync_received");
continue;
}
uint32_t start = 1 + chain_plug->chain().head_block_num();
if (start == ss->start_block) { if (start == ss->start_block) {
if (ss->last < start) {
dlog ("found the set but the cache is empty");
return;
}
for (auto & blk : ss->block_cache) { for (auto & blk : ss->block_cache) {
chain_plug->accept_block (blk,true); auto block = fc::raw::unpack<signed_block>( blk );
chain_plug->accept_block (block,true);
++start;
ss->start_block++; ss->start_block++;
} }
ss->block_cache.clear(); ss->block_cache.clear();
if (ss->start_block == ss->end_block) { dlog ("cleared a cache, start = ${s} and end = ${e} head = ${h}",
c->in_sync_state.erase(ss); ("s",start)("e",ss->end_block)("h",chain_plug->chain().head_block_num()));
if (start >= ss->end_block) {
c->sync_received.erase(ss);
keep_going = true; keep_going = true;
} }
} }
...@@ -555,13 +580,13 @@ namespace eos { ...@@ -555,13 +580,13 @@ namespace eos {
} }
if ( c->remote_node_id != msg.node_id) { if ( c->remote_node_id != msg.node_id) {
c->reset(); // c->reset();
c->remote_node_id = msg.node_id; c->remote_node_id = msg.node_id;
} }
uint32_t head = cc.head_block_num (); uint32_t head = cc.head_block_num ();
if ( msg.head_num > head) { if ( msg.head_num > head) {
set_sync_head(msg.head_num); set_sync_head(c, msg.head_num);
} }
else { else {
c->syncing = head != msg.head_num; c->syncing = head != msg.head_num;
...@@ -646,7 +671,7 @@ namespace eos { ...@@ -646,7 +671,7 @@ namespace eos {
void handle_message (connection_ptr c, const sync_request_message &msg) { void handle_message (connection_ptr c, const sync_request_message &msg) {
sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()}; sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()};
c->out_sync_state.push_back (req); c->sync_requested.push_back (req);
c->write_block_backlog (); c->write_block_backlog ();
} }
...@@ -706,46 +731,56 @@ namespace eos { ...@@ -706,46 +731,56 @@ namespace eos {
chain_controller &cc = chain_plug->chain(); chain_controller &cc = chain_plug->chain();
try { try {
if (cc.is_known_block(msg.id())) { if (cc.is_known_block(msg.id())) {
dlog ("skipping known block");
return; return;
} }
} catch (...) { } catch (...) {
} }
uint32_t num = msg.block_num(); uint32_t num = msg.block_num();
dlog ("handling block #${n} conn = ${c}",("n", num)("c",c->peer_addr));
bool syncing = sync_head > cc.head_block_num(); bool syncing = sync_head > cc.head_block_num();
if (syncing) { if (syncing) {
for( auto &ss : c->in_sync_state) { dlog ("sync_rec size ${s}",("s", c->sync_received.size()));
if (num <= ss.end_block) { for( auto &ss : c->sync_received) {
ss.last = num; if (num > ss.end_block) {
if (num == ss.end_block) {
get_sync_req (c);
}
if (num == cc.head_block_num()+1) { continue;
try { }
chain_plug->accept_block(msg, true); ss.last = num;
auto s0 = c->in_sync_state.begin(); if (num == ss.end_block) {
if (s0->start_block == s0->end_block) { get_sync_req (c);
c->in_sync_state.erase(s0); }
apply_cached_blocks(c);
}
else {
s0->start_block++;
}
} catch (const unlinkable_block_exception &ex) {
elog ("unable to accept block #${n} syncing",("n",num));
//close (c);
} catch (const assert_exception &ex) {
elog ("unable to accept block on assert exception #${n}",("n",num));
//close (c);
}
} else { if (num == cc.head_block_num()+1) {
ss.block_cache.push_back(msg); try {
dlog ("sync accepting block #${n}",("n", num));
chain_plug->accept_block(msg, true);
auto s0 = c->sync_received.begin();
if (s0->start_block == s0->end_block) {
c->sync_received.erase(s0);
apply_cached_blocks(c);
}
else {
s0->start_block++;
}
} catch (const unlinkable_block_exception &ex) {
elog ("unable to accept block #${n} syncing",("n",num));
//close (c);
} catch (const assert_exception &ex) {
elog ("unable to accept block on assert exception #${n}",("n",num));
//close (c);
} }
break;
} else {
dlog ("caching block #${n} size = ${s}", ("n", num)("s", c->blk_buffer.size()));
// ss.block_cache.emplace_back(::move(c->blk_buffer));
ss.block_cache.push_back(c->blk_buffer);
} }
break;
} }
if ( chain_plug->chain().head_block_num() == sync_head) { if ( chain_plug->chain().head_block_num() == sync_head) {
dlog ("caught up with previous target ${t}",("t",sync_head));
handshake_message hello; handshake_message hello;
handshake_initializer::populate(hello); handshake_initializer::populate(hello);
send_all (hello, [c](connection_ptr conn) -> bool { send_all (hello, [c](connection_ptr conn) -> bool {
...@@ -772,6 +807,25 @@ namespace eos { ...@@ -772,6 +807,25 @@ namespace eos {
} }
} }
struct precache : public fc::visitor<void> {
connection_ptr c;
precache (connection_ptr conn) : c(conn) {}
void operator()(const signed_block &msg) const
{
c->blk_buffer.resize(c->message_size);
memcpy(c->blk_buffer.data(),c->pending_message_buffer.data(), c->message_size);
}
template <typename T>
void operator()(const T &msg) const
{
//no-op
}
};
struct msgHandler : public fc::visitor<void> { struct msgHandler : public fc::visitor<void> {
net_plugin_impl &impl; net_plugin_impl &impl;
...@@ -791,7 +845,10 @@ namespace eos { ...@@ -791,7 +845,10 @@ namespace eos {
[this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) { [this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) {
if( !ec ) { if( !ec ) {
try { try {
c->message_size = bytes_transferred;
auto msg = fc::raw::unpack<net_message>( c->pending_message_buffer ); auto msg = fc::raw::unpack<net_message>( c->pending_message_buffer );
precache pc( c );
msg.visit (pc);
start_read_message( c ); start_read_message( c );
msgHandler m(*this, c); msgHandler m(*this, c);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册