未验证 提交 1590b618 编写于 作者: D Daniel Larimer 提交者: GitHub

Merge pull request #1549 from EOSIO/dawn-654-syncstall

#1542 ensure sync keeps going the best it can.
......@@ -286,7 +286,7 @@ namespace eosio {
// Compile-time default values.
// NOTE(review): presumably tuning defaults for the net plugin, judging by the def_ prefix — confirm at the use sites.
constexpr auto def_txn_expire_wait = std::chrono::seconds(3);
constexpr auto def_resp_expected_wait = std::chrono::seconds(1);
constexpr auto def_sync_fetch_span = 100;
// NOTE(review): def_max_just_send is defined twice in this view (8x and 3x the 1500-byte "mtu");
// this looks like a before/after pair, and only one definition can remain in a compilable file.
constexpr auto def_max_just_send = 1500 * 8; // "mtu" * 8
constexpr auto def_max_just_send = 1500 * 3; // "mtu" * 3
constexpr auto def_send_whole_blocks = true;
constexpr auto message_header_size = 4;
......@@ -371,7 +371,7 @@ namespace eosio {
struct sync_state {
sync_state(uint32_t start = 0, uint32_t end = 0, uint32_t last_acted = 0)
:start_block( start ), end_block( end ), last( last_acted ),
start_time(time_point::now())//, block_cache()
start_time(time_point::now())
{}
uint32_t start_block;
uint32_t end_block;
......@@ -379,9 +379,7 @@ namespace eosio {
time_point start_time; ///< time request made or received
};
using sync_state_ptr = shared_ptr< sync_state >;
// Declares a static populate() that receives a handshake_message by non-const reference;
// the definition of populate() is not part of this view.
// NOTE(review): the opening line appears twice here (likely a whitespace-only before/after pair) — keep one.
struct handshake_initializer {
struct handshake_initializer {
static void populate(handshake_message &hello);
};
......@@ -395,7 +393,7 @@ namespace eosio {
block_state_index blk_state;
transaction_state_index trx_state;
sync_state_ptr sync_requested; // this peer is requesting info from us
optional<sync_state> peer_requested; // this peer is requesting info from us
socket_ptr socket;
message_buffer<1024*1024> pending_message_buffer;
......@@ -494,6 +492,7 @@ namespace eosio {
void cancel_fetch();
void flush_queues();
bool enqueue_sync_block();
void request_sync_blocks (uint32_t start, uint32_t end);
void cancel_wait();
void sync_wait();
......@@ -552,15 +551,16 @@ namespace eosio {
public:
sync_manager(uint32_t span);
void set_state(stages s);
bool is_active(connection_ptr conn);
void reset_lib_num();
void reset_lib_num(connection_ptr conn);
bool sync_required();
void request_next_chunk(connection_ptr conn = connection_ptr() );
void start_sync(connection_ptr c, uint32_t target);
void send_handshakes();
void reassign_fetch(connection_ptr c, go_away_reason reason);
void verify_catchup(connection_ptr c, uint32_t num, block_id_type id);
void recv_block(connection_ptr c, const signed_block_summary &blk, bool accepted);
void recv_block(connection_ptr c, const block_id_type &blk_id, uint32_t blk_num, bool accepted);
void recv_handshake(connection_ptr c, const handshake_message& msg);
void recv_notice(connection_ptr c, const notice_message& msg);
......@@ -602,7 +602,7 @@ namespace eosio {
connection::connection( string endpoint )
: blk_state(),
trx_state(),
sync_requested(),
peer_requested(),
socket( std::make_shared<tcp::socket>( std::ref( app().get_io_service() ))),
node_id(),
last_handshake_recv(),
......@@ -625,7 +625,7 @@ namespace eosio {
connection::connection( socket_ptr s )
: blk_state(),
trx_state(),
sync_requested(),
peer_requested(),
socket( s ),
node_id(),
last_handshake_recv(),
......@@ -667,7 +667,7 @@ namespace eosio {
}
// Clears the per-connection request state and empties the block and transaction
// state indexes (blk_state, trx_state).
// NOTE(review): both sync_requested and peer_requested are reset in this view; these look like the
// before/after names of the same member — confirm which one the connection class still declares.
void connection::reset() {
sync_requested.reset();
peer_requested.reset();
blk_state.clear();
trx_state.clear();
}
......@@ -696,7 +696,7 @@ namespace eosio {
sent_handshake_count = 0;
last_handshake_recv = handshake_message();
last_handshake_sent = handshake_message();
my_impl->sync_master->reset_lib_num();
my_impl->sync_master->reset_lib_num(shared_from_this());
cancel_wait();
pending_message_buffer.reset();
}
......@@ -945,12 +945,12 @@ namespace eosio {
bool connection::enqueue_sync_block() {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
if (!sync_requested)
if (!peer_requested)
return false;
uint32_t num = ++sync_requested->last;
bool trigger_send = num == sync_requested->start_block;
if(num == sync_requested->end_block) {
sync_requested.reset();
uint32_t num = ++peer_requested->last;
bool trigger_send = num == peer_requested->start_block;
if(num == peer_requested->end_block) {
peer_requested.reset();
}
try {
fc::optional<signed_block> sb = cc.fetch_block_by_number(num);
......@@ -1069,9 +1069,13 @@ namespace eosio {
}
}
// Ask this peer for a range of blocks.
// Wraps [start, end] in a sync_request_message and places it on this
// connection's outgoing queue via enqueue().
void connection::request_sync_blocks (uint32_t start, uint32_t end) {
   enqueue( net_message( sync_request_message{ start, end } ) );
}
bool connection::process_next_message(net_plugin_impl& impl, uint32_t message_length) {
try {
cancel_wait();
// If it is a signed_block, then save the raw message for the cache
// This must be done before we unpack the message.
// This code is copied from fc::io::unpack(..., unsigned_int)
......@@ -1115,6 +1119,13 @@ namespace eosio {
chain_plug = app( ).find_plugin<chain_plugin>( );
}
// Switch the sync manager to `newstate` and emit a debug log line naming
// the previous and the new stage.
void sync_manager::set_state(stages newstate) {
   // Text label for a stage; any value other than in_sync / lib_catchup
   // is reported as "head catchup".
   auto label = [](stages s) -> string {
      if (s == in_sync) {
         return "in sync";
      }
      if (s == lib_catchup) {
         return "lib catchup";
      }
      return "head catchup";
   };

   string os = label(state);   // captured before the transition
   state = newstate;
   string ns = label(state);   // captured after the transition
   fc_dlog(logger, "old state ${os} becoming ${ns}",("os",os)("ns",ns));
}
bool sync_manager::is_active(connection_ptr c) {
if (state == head_catchup && c) {
bool fhset = c->fork_head != block_id_type();
......@@ -1125,13 +1136,14 @@ namespace eosio {
return state != in_sync;
}
// Updates sync bookkeeping for connection c (the one-argument form below):
//  - if c->current() is true, sync_known_lib_num is raised to the last irreversible block
//    number from c's most recent received handshake when that value is larger;
//  - otherwise, if c is the current sync source, sync_last_requested_num is set to the local
//    head block number and request_next_chunk() is called.
// NOTE(review): this span contains two signatures (no-argument and connection_ptr) plus an
// opening `for` over my_impl->connections with unbalanced braces; it looks like before/after
// lines shown together — confirm which lines belong to the current implementation.
void sync_manager::reset_lib_num() {
sync_known_lib_num = chain_plug->chain().last_irreversible_block_num();
sync_last_requested_num = chain_plug->chain().head_block_num();
for (auto& c : my_impl->connections) {
void sync_manager::reset_lib_num(connection_ptr c) {
if( c->current() ) {
if( c->last_handshake_recv.last_irreversible_block_num > sync_known_lib_num) {
sync_known_lib_num =c->last_handshake_recv.last_irreversible_block_num;
}
} else if( c == source ) {
sync_last_requested_num = chain_plug->chain().head_block_num();
request_next_chunk();
}
}
......@@ -1146,10 +1158,18 @@ namespace eosio {
uint32_t head_block = chain_plug->chain().head_block_num();
if (head_block < sync_last_requested_num) {
ilog ("ignoring request, head is ${h} last req = ${r}",("h",head_block)("r",sync_last_requested_num));
fc_ilog (logger, "ignoring request, head is ${h} last req = ${r}",
("h",head_block)("r",sync_last_requested_num));
return;
}
/* ----------
* next chunk provider selection criteria
* 1. a provider is supplied, use it.
* 2. we only have 1 peer so use that.
* 3. we have multiple peers, select the next available from the list
*/
if (conn) {
source = conn;
}
......@@ -1189,10 +1209,13 @@ namespace eosio {
}
}
}
if (!source) {
   // No peer was selected to serve the next chunk: report it, reset the sync
   // counters to the local chain's head / last irreversible block, and stop.
   elog("Unable to continue syncing at this time");
   sync_last_requested_num = chain_plug->chain().head_block_num();
   sync_known_lib_num = chain_plug->chain().last_irreversible_block_num();
   // `source` is null on this path, so it must not be dereferenced to obtain
   // a peer name; a fixed placeholder is logged instead.
   fc_ilog(logger, "resetting request, our last req is ${cc}, peer ${p}",
           ( "cc",sync_last_requested_num)("p","<none>"));
   return;
}
......@@ -1207,8 +1230,7 @@ namespace eosio {
if( end > 0 && end >= start ) {
fc_dlog(logger, "conn ${n} requesting range ${s} to ${e}",
("n",source->peer_name())("s",start)("e",end));
sync_request_message srm = {start,end};
source->enqueue( net_message(srm));
source->request_sync_blocks(start, end);
sync_last_requested_num = end;
}
}
......@@ -1236,7 +1258,7 @@ namespace eosio {
return;
}
state = lib_catchup;
set_state(lib_catchup);
fc_ilog(logger, "Catching up with chain, our last req is ${cc}, theirs is ${t} peer ${p}",
( "cc",sync_last_requested_num)("t",target)("p",c->peer_name()));
......@@ -1246,6 +1268,10 @@ namespace eosio {
// Abandon the outstanding fetch on connection `c` and look for another provider.
// The last-requested marker is first rewound to the local head block number,
// then the sync on `c` is cancelled with `reason` and the next chunk is requested.
void sync_manager::reassign_fetch(connection_ptr c, go_away_reason reason) {
   chain_controller& chain = chain_plug->chain();
   sync_last_requested_num = chain.head_block_num();

   fc_ilog( logger,
            "reassign_fetch, our last req is ${cc}, peer ${p}",
            ("cc", sync_last_requested_num)
            ("p",  c->peer_name()) );

   c->cancel_sync( reason );
   request_next_chunk();
}
......@@ -1254,9 +1280,10 @@ namespace eosio {
chain_controller& cc = chain_plug->chain();
uint32_t lib_num = cc.last_irreversible_block_num( );
uint32_t peer_lib = msg.last_irreversible_block_num;
reset_lib_num();
reset_lib_num(c);
c->syncing = false;
state = in_sync;
set_state (in_sync);
//--------------------------------
// sync need checks; (lib == last irreversible block)
......@@ -1337,7 +1364,7 @@ namespace eosio {
if( req.req_blocks.mode == catch_up ) {
c->fork_head = id;
c->fork_head_num = num;
state = head_catchup;
set_state(head_catchup);
}
else {
c->fork_head = block_id_type();
......@@ -1359,12 +1386,12 @@ namespace eosio {
}
else {
c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
reset_lib_num ();
reset_lib_num (c);
start_sync(c, msg.known_blocks.pending);
}
}
void sync_manager::recv_block (connection_ptr c, const signed_block_summary &blk, bool accepted) {
void sync_manager::recv_block (connection_ptr c, const block_id_type &blk_id, uint32_t blk_num, bool accepted) {
if (!accepted) {
uint32_t head_num = chain_plug->chain().head_block_num();
if (head_num != last_repeated) {
......@@ -1382,29 +1409,27 @@ namespace eosio {
}
last_repeated = 0;
uint32_t blk_num = blk.block_num();
if (state == head_catchup) {
fc_dlog (logger, "sync_manager in head_catchup state");
state = in_sync;
set_state(in_sync);
block_id_type null_id;
for (auto cp : my_impl->connections) {
if (cp->fork_head == null_id) {
continue;
}
if (cp->fork_head == blk.id() || cp->fork_head_num < blk_num) {
if (cp->fork_head == blk_id || cp->fork_head_num < blk_num) {
c->fork_head = null_id;
c->fork_head_num = 0;
}
else {
state = head_catchup;
set_state(head_catchup);
}
}
}
else if (state == lib_catchup) {
if( blk_num == sync_known_lib_num ) {
fc_dlog( logger, "All caught up with last known last irreversible block resending handshake");
c->cancel_wait();
state = in_sync;
set_state(in_sync);
send_handshakes();
}
else if (blk_num == sync_last_requested_num) {
......@@ -1610,6 +1635,7 @@ namespace eosio {
// Records connection c as the pending transaction source and cancels the
// wait currently armed on that connection.
void big_msg_manager::recv_transaction (connection_ptr c) {
pending_txn_source = c;
c->cancel_wait();
}
void big_msg_manager::recv_notice (connection_ptr c, const notice_message& msg) {
......@@ -2134,10 +2160,10 @@ namespace eosio {
// Handles a sync_request_message received from peer c.
//  - end_block == 0: the stored request for this peer is dropped and the connection's queues are flushed.
//  - otherwise: a sync_state covering [start_block, end_block] is stored with its "last" field set to
//    start_block-1, and enqueue_sync_block() is called on the connection.
// NOTE(review): both the sync_requested (shared_ptr-style reset) and peer_requested (value assignment)
// forms appear in this view; they look like before/after lines for the same member — confirm which applies.
void net_plugin_impl::handle_message( connection_ptr c, const sync_request_message &msg) {
if( msg.end_block == 0) {
c->sync_requested.reset();
c->peer_requested.reset();
c->flush_queues();
} else {
c->sync_requested.reset(new sync_state( msg.start_block,msg.end_block,msg.start_block-1));
c->peer_requested = sync_state( msg.start_block,msg.end_block,msg.start_block-1);
c->enqueue_sync_block();
}
}
......@@ -2149,6 +2175,7 @@ namespace eosio {
return;
}
c->cancel_wait();
big_msg_master->recv_transaction(c);
try {
......@@ -2167,15 +2194,18 @@ namespace eosio {
}
// Handler for a signed_transaction message: the message content is not inspected here;
// the only action is to cancel the wait armed on the sending connection.
void net_plugin_impl::handle_message( connection_ptr c, const signed_transaction &msg) {
c->cancel_wait();
}
void net_plugin_impl::handle_message( connection_ptr c, const signed_block_summary &msg) {
chain_controller &cc = chain_plug->chain();
block_id_type blk_id = msg.id();
uint32_t blk_num = msg.block_num();
c->cancel_wait();
try {
if( cc.is_known_block(blk_id)) {
sync_master->recv_block(c, blk_id, blk_num, true);
return;
}
} catch( ...) {
......@@ -2240,8 +2270,8 @@ namespace eosio {
} catch( ...) {
elog( "handle sync block caught something else from ${p}",("num",blk_num)("p",c->peer_name()));
}
big_msg_master->recv_block (c, msg);
sync_master->recv_block (c, msg, reason == no_reason);
big_msg_master->recv_block(c, msg);
sync_master->recv_block(c, blk_id, blk_num, reason == no_reason);
}
void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) {
......@@ -2249,9 +2279,11 @@ namespace eosio {
chain_controller &cc = chain_plug->chain();
block_id_type blk_id = msg.id();
uint32_t blk_num = msg.block_num();
c->cancel_wait();
try {
if( cc.is_known_block(blk_id)) {
sync_master->recv_block(c, blk_id, blk_num, true);
return;
}
} catch( ...) {
......@@ -2300,9 +2332,8 @@ namespace eosio {
}
}
}
big_msg_master->recv_block (c, msg);
}
sync_master->recv_block (c, msg, reason == no_reason);
sync_master->recv_block(c, blk_id, blk_num, reason == no_reason);
}
void net_plugin_impl::start_conn_timer( ) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册