提交 3e7f5ce8 编写于 作者: P Phil Mesnier

fix for multi-node syncing #344

上级 b1804199
......@@ -84,8 +84,9 @@ namespace eos {
uint32_t end_block = 0;
uint32_t last = 0; ///< last sent or received
time_point start_time; ///< time request made or received
vector<signed_block> block_cache;
};
#if 0
struct by_start_block;
typedef multi_index_container<
sync_state,
......@@ -93,7 +94,7 @@ namespace eos {
ordered_unique< tag<by_start_block>, member<sync_state, uint32_t, &sync_state::start_block > >
>
> sync_request_index;
#endif
struct handshake_initializer {
static void populate (handshake_message &hello);
......@@ -116,9 +117,7 @@ namespace eos {
peer_addr (endpoint)
{
wlog( "created connection to ${n}", ("n", endpoint) );
pending_message_buffer.resize( 1024*1024*4 );
auto *rnd = remote_node_id.data();
rnd[0] = 0;
initialize();
}
connection( socket_ptr s )
......@@ -136,9 +135,7 @@ namespace eos {
peer_addr ()
{
wlog( "created connection from client" );
pending_message_buffer.resize( 1024*1024*4 );
auto *rnd = remote_node_id.data();
rnd[0] = 0;
initialize ();
}
~connection() {
......@@ -148,11 +145,18 @@ namespace eos {
wlog( "released connection to server at ${addr}", ("addr", peer_addr) );
}
void initialize () {
boost::asio::ip::tcp::no_delay option(true);
// socket->set_option(option);
pending_message_buffer.resize( 1024*1024*4 );
auto *rnd = remote_node_id.data();
rnd[0] = 0;
}
block_state_index block_state;
transaction_state_index trx_state;
sync_request_index in_sync_state; // we are requesting info from this peer
sync_request_index out_sync_state; // this peer is requesting info from us
vector<sync_state> in_sync_state; // we are requesting info from this peer
vector<sync_state> out_sync_state; // this peer is requesting info from us
socket_ptr socket;
uint32_t pending_message_size;
......@@ -173,6 +177,7 @@ namespace eos {
}
void close () {
connecting = false;
out_queue.clear();
if (socket) {
socket->close();
......@@ -226,7 +231,7 @@ namespace eos {
void write_block_backlog ( ) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
auto ss = out_sync_state.begin();
uint32_t num = ++ss.get_node()->value().last;
uint32_t num = ++ss->last; //get_node()->value().last;
ilog ("num = ${num} end = ${end}",("num",num)("end",ss->end_block));
if (num >= ss->end_block) {
out_sync_state.erase(ss);
......@@ -235,15 +240,11 @@ namespace eos {
try {
fc::optional<signed_block> sb = cc.fetch_block_by_number(num);
if (sb) {
// dlog("write backlog, block #${num}",("num",num));
send( *sb );
}
} catch ( ... ) {
wlog( "write loop exception" );
}
if (out_sync_state.size() == 0) {
send_handshake ( );
}
}
......@@ -309,11 +310,14 @@ namespace eos {
std::set< connection_ptr > connections;
bool done = false;
uint32_t sync_head;
uint32_t sync_req_head;
uint32_t sync_req_span;
unique_ptr<boost::asio::steady_timer> connector_check;
unique_ptr<boost::asio::steady_timer> transaction_check;
boost::asio::steady_timer::duration connector_period;
boost::asio::steady_timer::duration txn_exp_period;
unique_ptr<boost::asio::steady_timer> connector_check;
unique_ptr<boost::asio::steady_timer> transaction_check;
boost::asio::steady_timer::duration connector_period;
boost::asio::steady_timer::duration txn_exp_period;
int16_t network_version;
chain_id_type chain_id;
......@@ -331,7 +335,6 @@ namespace eos {
void connect( connection_ptr c ) {
c->connecting = true;
auto host = c->peer_addr.substr( 0, c->peer_addr.find(':') );
auto port = c->peer_addr.substr( host.size()+1, host.size() );
idump((host)(port));
......@@ -351,17 +354,22 @@ namespace eos {
void connect( connection_ptr c, tcp::resolver::iterator endpoint_itr ) {
auto current_endpoint = *endpoint_itr;
++endpoint_itr;
c->connecting = true;
c->socket->async_connect( current_endpoint,
[c, endpoint_itr, this]
( const boost::system::error_code& err ) {
if( !err ) {
dlog("new connnection to ${peer}",("peer",c->peer_addr));
start_session( c );
} else {
if( endpoint_itr != tcp::resolver::iterator() ) {
c->close();
connect( c, endpoint_itr );
}
else {
elog ("connection failed to ${peer}: ${error}", ("peer", c->peer_addr)("error",err.message()));
c->connecting = false;
c->close();
}
}
} );
......@@ -374,7 +382,7 @@ namespace eos {
just_send_it_max = mtu;
}
start_read_message( con );
dlog ("calling send_handshake");
con->send_handshake( );
// for now, we can just use the application main loop.
......@@ -443,26 +451,72 @@ namespace eos {
}
}
void shared_fetch (uint32_t low, uint32_t high) {
bool get_sync_req (connection_ptr c) {
uint32_t delta = high - low;
uint32_t count = connections.size();
FC_ASSERT (count > 0);
uint32_t span = delta / count;
uint32_t lastSpan = delta - (span * (count-1));
for (auto &cx: connections) {
if (--count == 0) {
span = lastSpan;
dlog ("requesting a new sync requ= message, sync head = ${sh}, sync_req_head = ${srh}",
("sh",sync_head)("srh",sync_req_head));
if (sync_req_head == sync_head) {
return true;
}
uint32_t low = sync_req_head + 1;
uint32_t high = low + sync_req_span < sync_head ? low + sync_req_span : sync_head;
sync_state req = {low, high, sync_req_head, time_point::now(), vector<signed_block>() };
c->in_sync_state.push_back (req);
sync_request_message srm = {req.start_block, req.end_block };
c->send (srm);
sync_req_head = high;
return (sync_req_head == sync_head);
}
void set_sync_head (uint32_t target) {
uint32_t cchead = chain_plug->chain().head_block_num();
dlog ("target = ${t}, cc head = ${cc}",("t", target)("cc",cchead));
if (sync_head == sync_req_head) {
sync_req_head = chain_plug->chain().head_block_num();
}
sync_head = target;
for (auto &c : connections) {
if (c->out_sync_state.empty() && c->socket->is_open() && ! c->connecting) {
if (get_sync_req (c)) {
break;
}
}
}
}
void apply_cached_blocks () {
uint32_t start = 1 + chain_plug->chain().head_block_num();
bool keep_going = true;
while (keep_going) {
keep_going = false;
for (auto &c : connections) {
try {
auto ss = c->in_sync_state.begin();
if (start == ss->start_block) {
for (auto & blk : ss->block_cache) {
chain_plug->accept_block (blk,true);
ss->start_block++;
}
ss->block_cache.clear();
if (ss->start_block == ss->end_block) {
c->in_sync_state.erase(ss);
keep_going = true;
}
}
} catch (...) {
// not a problem. We found the list but no blocks were cached
return;
}
}
sync_state req = {low+1, low+span, low, time_point::now() };
cx->in_sync_state.insert (req);
sync_request_message srm = {req.start_block, req.end_block };
cx->send (srm);
low += span;
}
}
void handle_message (connection_ptr c, const handshake_message &msg) {
dlog ("got a handshake from ${p}",("p",c->peer_addr));
if (msg.node_id == node_id) {
elog ("Self connection detected. Closing connection");
close(c);
......@@ -501,7 +555,7 @@ namespace eos {
uint32_t head = cc.head_block_num ();
if ( msg.head_num > head) {
shared_fetch (head, msg.head_num);
set_sync_head(msg.head_num);
}
if ( c->remote_node_id != msg.node_id) {
......@@ -602,7 +656,7 @@ namespace eos {
void handle_message (connection_ptr c, const sync_request_message &msg) {
sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()};
c->out_sync_state.insert (req);
c->out_sync_state.push_back (req);
c->write_block_backlog ();
}
......@@ -664,21 +718,47 @@ namespace eos {
if (cc.is_known_block(msg.id())) {
return;
}
uint32_t num = 0;
bool syncing = c->in_sync_state.size() > 0;
uint32_t num = msg.block_num();
bool syncing = sync_head > cc.head_block_num();
if (syncing) {
for( auto ss = c->in_sync_state.begin(); ss != c->in_sync_state.end(); ss++ ) {
if (msg.block_num() == ss->last + 1 && msg.block_num() <= ss->end_block) {
num = msg.block_num();
ss.get_node()->value().last = num;
for( auto &ss : c->in_sync_state) {
if (msg.block_num() <= ss.end_block) {
ss.last = num;
if (num == cc.head_block_num()+1) {
try {
chain_plug->accept_block(msg, true);
auto s0 = c->in_sync_state.begin();
s0->start_block++;
if (s0->start_block == s0->end_block) {
c->in_sync_state.erase(s0);
apply_cached_blocks();
}
} catch (const unlinkable_block_exception &ex) {
elog ("unable to accept block #${n} syncing",("n",num));
//close (c);
} catch (const assert_exception &ex) {
elog ("unable to accept block on assert exception #${n}",("n",num));
//close (c);
}
} else {
ss.block_cache.push_back(msg);
}
if (num == ss.end_block) {
get_sync_req (c);
}
break;
}
}
if (num == 0) {
elog ("syncing, got out-of-order block ${n}",("n",msg.block_num()));
//close (c);
return;
if ( chain_plug->chain().head_block_num() == sync_head) {
dlog ("calling send_handshake");
c->send_handshake ( );
}
return;
}
else {
send_all (msg, [c](connection_ptr conn) -> bool {
......@@ -710,7 +790,6 @@ namespace eos {
}
};
void start_reading_pending_buffer( connection_ptr c ) {
boost::asio::async_read( *c->socket,
boost::asio::buffer(c->pending_message_buffer.data(), c->pending_message_size ),
......@@ -784,6 +863,7 @@ namespace eos {
start_conn_timer();
vector <connection_ptr> discards;
for (auto &c : connections ) {
dlog ("socket open = ${so} connecting - ${c}",("so", c->socket->is_open())("c",c->connecting));
if (!c->socket->is_open() && !c->connecting) {
if (c->peer_addr.length() > 0) {
connect (c);
......@@ -899,7 +979,6 @@ namespace eos {
}
}
}; // class net_plugin_impl
void
......@@ -993,7 +1072,9 @@ namespace eos {
}
my->send_whole_blocks = true;
my->sync_req_span = 10;
my->sync_head = 0;
my->sync_req_head = 0;
if( options.count( "remote-endpoint" ) ) {
my->supplied_peers = options.at( "remote-endpoint" ).as< vector<string> >();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册