提交 90862d5d 编写于 作者: D Daniel Larimer 提交者: GitHub

Merge pull request #377 from EOSIO/p2p-sync-improvment

P2p sync improvement
......@@ -80,21 +80,17 @@ namespace eos {
* Index by start_block
*/
struct sync_state {
uint32_t start_block = 0;
uint32_t end_block = 0;
uint32_t last = 0; ///< last sent or received
sync_state(uint32_t start = 0, uint32_t end = 0, uint32_t last_acted = 0)
:start_block ( start ), end_block( end ), last( last_acted ),
start_time (time_point::now()), block_cache()
{}
uint32_t start_block;
uint32_t end_block;
uint32_t last; ///< last sent or received
time_point start_time; ///< time request made or received
vector< vector<char> > block_cache;
};
struct by_start_block;
typedef multi_index_container<
sync_state,
indexed_by<
ordered_unique< tag<by_start_block>, member<sync_state, uint32_t, &sync_state::start_block > >
>
> sync_request_index;
struct handshake_initializer {
static void populate (handshake_message &hello);
};
......@@ -104,8 +100,8 @@ namespace eos {
connection( string endpoint )
: block_state(),
trx_state(),
in_sync_state(),
out_sync_state(),
sync_received(),
sync_requested(),
socket( std::make_shared<tcp::socket>( std::ref( app().get_io_service() ))),
pending_message_size(),
pending_message_buffer(),
......@@ -113,19 +109,18 @@ namespace eos {
last_handshake(),
out_queue(),
connecting (false),
syncing (false),
peer_addr (endpoint)
{
wlog( "created connection to ${n}", ("n", endpoint) );
pending_message_buffer.resize( 1024*1024*4 );
auto *rnd = remote_node_id.data();
rnd[0] = 0;
initialize();
}
connection( socket_ptr s )
: block_state(),
trx_state(),
in_sync_state(),
out_sync_state(),
sync_received(),
sync_requested(),
socket( s ),
pending_message_size(),
pending_message_buffer(),
......@@ -133,12 +128,13 @@ namespace eos {
last_handshake(),
out_queue(),
connecting (false),
syncing (false),
peer_addr ()
{
wlog( "created connection from client" );
pending_message_buffer.resize( 1024*1024*4 );
auto *rnd = remote_node_id.data();
rnd[0] = 0;
boost::asio::ip::tcp::no_delay option(true);
socket->set_option(option);
initialize ();
}
~connection() {
......@@ -148,31 +144,49 @@ namespace eos {
wlog( "released connection to server at ${addr}", ("addr", peer_addr) );
}
void initialize () {
pending_message_buffer.resize( 1024*1024*4 );
auto *rnd = remote_node_id.data();
rnd[0] = 0;
}
block_state_index block_state;
transaction_state_index trx_state;
sync_request_index in_sync_state; // we are requesting info from this peer
sync_request_index out_sync_state; // this peer is requesting info from us
vector<sync_state> sync_received; // we are requesting info from this peer
vector<sync_state> sync_requested; // this peer is requesting info from us
socket_ptr socket;
uint32_t pending_message_size;
vector<char> pending_message_buffer;
vector<char> blk_buffer;
size_t message_size;
fc::sha256 remote_node_id;
handshake_message last_handshake;
std::deque<net_message> out_queue;
uint32_t mtu;
bool connecting;
bool syncing;
string peer_addr;
bool ready () {
return (socket->is_open() && !connecting);
}
bool ready_and_willing () {
return (ready() && !syncing);
}
void reset () {
in_sync_state.clear();
out_sync_state.clear();
sync_received.clear();
sync_requested.clear();
block_state.clear();
trx_state.clear();
}
void close () {
connecting = false;
syncing = false;
out_queue.clear();
if (socket) {
socket->close();
......@@ -194,7 +208,7 @@ namespace eos {
void send_next_message() {
if( !out_queue.size() ) {
if (out_sync_state.size() > 0) {
if (sync_requested.size() > 0) {
write_block_backlog();
}
return;
......@@ -207,7 +221,6 @@ namespace eos {
fc::datastream<char*> ds( buffer.data(), buffer.size() );
ds.write( (char*)&size, sizeof(size) );
fc::raw::pack( ds, m );
boost::asio::async_write( *socket, boost::asio::buffer( buffer.data(), buffer.size() ),
[this,buf=std::move(buffer)]( boost::system::error_code ec, std::size_t bytes_transferred ) {
if( ec ) {
......@@ -225,25 +238,21 @@ namespace eos {
void write_block_backlog ( ) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
auto ss = out_sync_state.begin();
uint32_t num = ++ss.get_node()->value().last;
auto ss = sync_requested.begin();
uint32_t num = ++ss->last; //get_node()->value().last;
ilog ("num = ${num} end = ${end}",("num",num)("end",ss->end_block));
if (num >= ss->end_block) {
out_sync_state.erase(ss);
ilog ("out sync size = ${s}",("s",out_sync_state.size()));
sync_requested.erase(ss);
ilog ("out sync size = ${s}",("s",sync_requested.size()));
}
try {
fc::optional<signed_block> sb = cc.fetch_block_by_number(num);
if (sb) {
// dlog("write backlog, block #${num}",("num",num));
send( *sb );
}
} catch ( ... ) {
wlog( "write loop exception" );
}
if (out_sync_state.size() == 0) {
send_handshake ( );
}
}
......@@ -304,16 +313,17 @@ namespace eos {
string p2p_address;
vector<string> supplied_peers;
std::set<fc::sha256> resolved_nodes;
std::set<fc::sha256> learned_nodes;
std::set< connection_ptr > connections;
bool done = false;
uint32_t sync_head;
uint32_t sync_req_head;
uint32_t sync_req_span;
unique_ptr<boost::asio::steady_timer> connector_check;
unique_ptr<boost::asio::steady_timer> transaction_check;
boost::asio::steady_timer::duration connector_period;
boost::asio::steady_timer::duration txn_exp_period;
unique_ptr<boost::asio::steady_timer> connector_check;
unique_ptr<boost::asio::steady_timer> transaction_check;
boost::asio::steady_timer::duration connector_period;
boost::asio::steady_timer::duration txn_exp_period;
int16_t network_version;
chain_id_type chain_id;
......@@ -331,7 +341,6 @@ namespace eos {
void connect( connection_ptr c ) {
c->connecting = true;
auto host = c->peer_addr.substr( 0, c->peer_addr.find(':') );
auto port = c->peer_addr.substr( host.size()+1, host.size() );
idump((host)(port));
......@@ -351,6 +360,7 @@ namespace eos {
void connect( connection_ptr c, tcp::resolver::iterator endpoint_itr ) {
auto current_endpoint = *endpoint_itr;
++endpoint_itr;
c->connecting = true;
c->socket->async_connect( current_endpoint,
[c, endpoint_itr, this]
( const boost::system::error_code& err ) {
......@@ -358,10 +368,13 @@ namespace eos {
start_session( c );
} else {
if( endpoint_itr != tcp::resolver::iterator() ) {
c->close();
connect( c, endpoint_itr );
}
else {
elog ("connection failed to ${peer}: ${error}", ("peer", c->peer_addr)("error",err.message()));
c->connecting = false;
c->close();
}
}
} );
......@@ -374,7 +387,6 @@ namespace eos {
just_send_it_max = mtu;
}
start_read_message( con );
con->send_handshake( );
// for now, we can just use the application main loop.
......@@ -436,29 +448,108 @@ namespace eos {
template<typename VerifierFunc>
void send_all (const net_message &msg, VerifierFunc verify) {
for (auto &c : connections) {
if (c->out_sync_state.size() == 0 &&
verify (c)) {
if (c->ready_and_willing() && verify (c)) {
c->send(msg);
}
}
}
void shared_fetch (uint32_t low, uint32_t high) {
bool get_sync_req (connection_ptr c) {
if (sync_req_head == sync_head) {
return true;
}
uint32_t first = sync_req_head + 1;
uint32_t last = (first + sync_req_span -1);
if (last < sync_head)
last = sync_head;
c->sync_received.emplace_back (first, last, sync_req_head);
sync_request_message srm = {first, last };
c->send (srm);
sync_req_head = last;
return (sync_req_head == sync_head);
}
void set_sync_head (connection_ptr c, uint32_t target) {
bool stable = (sync_head == sync_req_head);
if (stable) {
sync_req_head = chain_plug->chain().head_block_num();
}
ilog ("Catching up with chain, our head is ${cc}, theirs is ${t}",
("cc",sync_req_head)("t",target));
if (target > sync_head) {
sync_head = target;
if (stable) {
for (auto &ci : connections) {
if (ci->ready()) {
if (get_sync_req (ci)) {
break;
}
}
}
} else {
get_sync_req (c);
}
}
}
struct postcache : public fc::visitor<void> {
connection_ptr c;
chain_plugin * chain_plug;
postcache (connection_ptr conn, chain_plugin *cp) : c(conn), chain_plug (cp) {}
void operator()(const signed_block &block) const
{
chain_plug->accept_block (block,true);
}
template <typename T>
void operator()(const T &msg) const
{
//no-op
}
};
void apply_cached_blocks (connection_ptr conn) {
bool keep_going = true;
while (keep_going) {
keep_going = false;
for (auto &c : connections) {
try {
auto ss = c->sync_received.begin();
if (ss == c->sync_received.end()) {
get_sync_req(c);
continue;
}
uint32_t start = 1 + chain_plug->chain().head_block_num();
if (start == ss->start_block) {
if (ss->last < start || ss->block_cache.empty()) {
return;
}
uint32_t delta = high - low;
uint32_t count = connections.size();
FC_ASSERT (count > 0);
uint32_t span = delta / count;
uint32_t lastSpan = delta - (span * (count-1));
for (auto &cx: connections) {
if (--count == 0) {
span = lastSpan;
for (auto & blk : ss->block_cache) {
auto block = fc::raw::unpack<net_message>( blk );
postcache pc(c,chain_plug);
block.visit (pc);
++start;
ss->start_block++;
}
ss->block_cache.clear();
if (start > ss->end_block) {
c->sync_received.erase(ss);
if (c->sync_received.empty()) {
get_sync_req(c);
}
keep_going = true;
}
}
} catch (...) {
elog ("caught something trying to accept blocks");
// not a problem. We found the list but no blocks were cached
return;
}
}
sync_state req = {low+1, low+span, low, time_point::now() };
cx->in_sync_state.insert (req);
sync_request_message srm = {req.start_block, req.end_block };
cx->send (srm);
low += span;
}
}
......@@ -474,7 +565,8 @@ namespace eos {
return;
}
if (msg.network_version != network_version) {
elog ("Peer network version does not match ");
elog ("Peer network version does not match expected ${nv} but got ${mnv}",
("nv", network_version)("mnv",msg.network_version));
close (c);
return;
}
......@@ -499,29 +591,17 @@ namespace eos {
}
}
if ( c->remote_node_id != msg.node_id) {
// c->reset();
c->remote_node_id = msg.node_id;
}
uint32_t head = cc.head_block_num ();
if ( msg.head_num > head) {
shared_fetch (head, msg.head_num);
set_sync_head(c, msg.head_num);
}
if ( c->remote_node_id != msg.node_id) {
c->reset();
if (c->peer_addr.length() > 0) {
auto old_id = resolved_nodes.find (c->remote_node_id);
if (old_id != resolved_nodes.end()) {
resolved_nodes.erase(old_id);
}
resolved_nodes.insert (msg.node_id);
}
else {
auto old_id = learned_nodes.find (c->remote_node_id);
if (old_id != learned_nodes.end()) {
learned_nodes.erase(old_id);
}
learned_nodes.insert (msg.node_id);
}
c->remote_node_id = msg.node_id;
else {
c->syncing = head != msg.head_num;
}
c->last_handshake = msg;
}
......@@ -553,6 +633,7 @@ namespace eos {
// collect a list of transactions that were found.
// collect a second list of transaction ids that were not found but are otherwise known by some peers
// finally, what remains are future(?) transactions
vector< SignedTransaction > send_now;
map <connection_ptr, vector < transaction_id_type > > forward_to;
auto conn_ndx = connections.begin();
......@@ -601,8 +682,7 @@ namespace eos {
}
void handle_message (connection_ptr c, const sync_request_message &msg) {
sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()};
c->out_sync_state.insert (req);
c->sync_requested.emplace_back (msg.start_block,msg.end_block,msg.start_block-1);
c->write_block_backlog ();
}
......@@ -660,28 +740,59 @@ namespace eos {
void handle_message (connection_ptr c, const signed_block &msg) {
chain_controller &cc = chain_plug->chain();
if (cc.is_known_block(msg.id())) {
return;
try {
if (cc.is_known_block(msg.id())) {
return;
}
} catch (...) {
}
uint32_t num = 0;
bool syncing = c->in_sync_state.size() > 0;
uint32_t num = msg.block_num();
bool syncing = sync_head > cc.head_block_num();
bool get_more = false;
if (syncing) {
for( auto ss = c->in_sync_state.begin(); ss != c->in_sync_state.end(); ss++ ) {
if (msg.block_num() == ss->last + 1 && msg.block_num() <= ss->end_block) {
num = msg.block_num();
ss.get_node()->value().last = num;
break;
for( auto &ss : c->sync_received) {
if (num > ss.end_block) {
continue;
}
ss.last = num;
get_more = num == ss.end_block;
if (num == cc.head_block_num()+1) {
try {
chain_plug->accept_block(msg, true);
auto s0 = c->sync_received.begin();
if (s0->start_block == s0->end_block) {
c->sync_received.erase(s0);
apply_cached_blocks(c);
}
else {
s0->start_block++;
}
} catch (const unlinkable_block_exception &ex) {
elog ("unable to accept block #${n} syncing",("n",num));
//close (c);
} catch (const assert_exception &ex) {
elog ("unable to accept block on assert exception #${n}",("n",num));
//close (c);
}
} else {
ss.block_cache.emplace_back(std::move(c->blk_buffer));
}
break;
}
if (num == 0) {
elog ("syncing, got out-of-order block ${n}",("n",msg.block_num()));
//close (c);
return;
if ( chain_plug->chain().head_block_num() == sync_head) {
handshake_message hello;
handshake_initializer::populate(hello);
send_all (hello, [c](connection_ptr conn) -> bool {
return true;
});
} else if (get_more) {
get_sync_req( c );
}
return;
}
else {
send_all (msg, [c](connection_ptr conn) -> bool {
send_all (msg, [c](connection_ptr conn) -> bool {
return (c != conn);
});
......@@ -697,6 +808,25 @@ namespace eos {
}
}
struct precache : public fc::visitor<void> {
connection_ptr c;
precache (connection_ptr conn) : c(conn) {}
void operator()(const signed_block &msg) const
{
c->blk_buffer.resize(c->message_size);
memcpy(c->blk_buffer.data(),c->pending_message_buffer.data(), c->message_size);
}
template <typename T>
void operator()(const T &msg) const
{
//no-op
}
};
struct msgHandler : public fc::visitor<void> {
net_plugin_impl &impl;
......@@ -710,14 +840,16 @@ namespace eos {
}
};
void start_reading_pending_buffer( connection_ptr c ) {
boost::asio::async_read( *c->socket,
boost::asio::buffer(c->pending_message_buffer.data(), c->pending_message_size ),
[this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) {
if( !ec ) {
try {
c->message_size = bytes_transferred;
auto msg = fc::raw::unpack<net_message>( c->pending_message_buffer );
precache pc( c );
msg.visit (pc);
start_read_message( c );
msgHandler m(*this, c);
......@@ -899,7 +1031,6 @@ namespace eos {
}
}
}; // class net_plugin_impl
void
......@@ -993,7 +1124,9 @@ namespace eos {
}
my->send_whole_blocks = true;
my->sync_req_span = 10;
my->sync_head = 0;
my->sync_req_head = 0;
if( options.count( "remote-endpoint" ) ) {
my->supplied_peers = options.at( "remote-endpoint" ).as< vector<string> >();
}
......@@ -1027,10 +1160,6 @@ namespace eos {
my->connections.insert (c);
my->connect( c );
}
boost::asio::signal_set signals (app().get_io_service(), SIGINT, SIGTERM);
signals.async_wait ([this](const boost::system::error_code &ec, int signum) {
dlog ("caught signal ${sn}", ("sn", signum) ) ;
});
}
void net_plugin::plugin_shutdown() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册