提交 05dc1ef0 编写于 作者: P Phil Mesnier

#2211, introduce block number as a criteria for purging old peer transaction and block state data.

上级 5b12cab8
......@@ -121,7 +121,6 @@ namespace eosio {
fc::time_point_sec,
&node_transaction_state::expires >
>,
ordered_non_unique<
tag<by_block_num>,
member< node_transaction_state,
......@@ -285,7 +284,7 @@ namespace eosio {
constexpr auto def_max_clients = 25; // 0 for unlimited clients
constexpr auto def_conn_retry_wait = 30;
constexpr auto def_txn_expire_wait = std::chrono::seconds(3);
constexpr auto def_resp_expected_wait = std::chrono::seconds(1);
constexpr auto def_resp_expected_wait = std::chrono::seconds(5);
constexpr auto def_sync_fetch_span = 100;
constexpr auto def_max_just_send = 1500; // "mtu" * 1
constexpr auto def_send_whole_blocks = true;
......@@ -326,8 +325,14 @@ namespace eosio {
typedef multi_index_container<
transaction_state,
indexed_by<
ordered_unique< tag<by_id>, member<transaction_state, transaction_id_type, &transaction_state::id > >
ordered_unique< tag<by_id>, member<transaction_state, transaction_id_type, &transaction_state::id > >,
ordered_non_unique<
tag<by_block_num>,
member< transaction_state,
uint32_t,
&transaction_state::block_num > >
>
> transaction_state_index;
/**
......@@ -335,6 +340,7 @@ namespace eosio {
*/
struct block_state {
block_id_type id;
uint32_t block_num;
bool is_known;
bool is_noticed;
time_point requested_time;
......@@ -352,7 +358,8 @@ namespace eosio {
typedef multi_index_container<
block_state,
indexed_by<
ordered_unique< tag<by_id>, member<block_state, block_id_type, &block_state::id > >
ordered_unique< tag<by_id>, member<block_state, block_id_type, &block_state::id > >,
ordered_unique< tag<by_block_num>, member<block_state, uint32_t, &block_state::block_num > >
>
> block_state_index;
......@@ -576,7 +583,7 @@ namespace eosio {
void bcast_transaction (const transaction_id_type& id,
time_point_sec expiration,
const packed_transaction& msg);
void bcast_rejected_transaction (const packed_transaction& msg, uint64_t code);
void rejected_transaction (const packed_transaction& msg);
void recv_block (connection_ptr conn, const signed_block_summary& msg);
void recv_transaction(connection_ptr c);
void recv_notice (connection_ptr conn, const notice_message& msg);
......@@ -1397,6 +1404,7 @@ namespace eosio {
if (blk_num != sync_next_expected_num) {
fc_ilog (logger, "expected block ${ne} but got ${bn}",("ne",sync_next_expected_num)("bn",blk_num));
c->close();
return;
}
sync_next_expected_num = blk_num + 1;
}
......@@ -1443,18 +1451,19 @@ namespace eosio {
uint32_t msgsiz = packsiz + sizeof(packsiz);
notice_message pending_notify;
block_id_type bid = bsum.id();
uint32_t bnum = bsum.block_num();
pending_notify.known_blocks.mode = normal;
pending_notify.known_blocks.ids.push_back( bid );
pending_notify.known_trx.mode = none;
if (msgsiz > just_send_it_max) {
fc_ilog(logger, "block size is ${ms}, sending notify",("ms", msgsiz));
my_impl->send_all(pending_notify, [skip, bid](connection_ptr c) -> bool {
my_impl->send_all(pending_notify, [skip, bid, bnum](connection_ptr c) -> bool {
if (c == skip || !c->current())
return false;
const auto& bs = c->blk_state.find(bid);
bool unknown = bs == c->blk_state.end();
if (unknown) {
c->blk_state.insert((block_state){bid,false,true,time_point()});
c->blk_state.insert((block_state){bid,bnum,false,true,time_point()});
}
else {
elog("${p} already has knowledge of block ${b}", ("p",c->peer_name())("b",bid));
......@@ -1469,25 +1478,22 @@ namespace eosio {
}
const auto& prev = cp->blk_state.find (bsum.previous);
if (prev != cp->blk_state.end() && !prev->is_known) {
cp->blk_state.insert((block_state){bid,false,true,time_point()});
cp->blk_state.insert((block_state){bid,bnum,false,true,time_point()});
cp->enqueue( pending_notify );
}
else {
cp->blk_state.insert((block_state){bid,true,true,time_point()});
cp->blk_state.insert((block_state){bid,bnum,true,true,time_point()});
cp->enqueue( bsum );
}
}
}
}
void big_msg_manager::bcast_rejected_transaction (const packed_transaction& txn, uint64_t code) {
if (code == 0 || code == eosio::chain::tx_duplicate::code_value) {
//do not forward duplicates or those with unknown exception types
return;
}
void big_msg_manager::rejected_transaction (const packed_transaction& txn) {
transaction_id_type tid = txn.get_transaction().id();
fc_wlog(logger,"sending rejected transaction ${tid}",("tid",tid));
bcast_transaction (tid, time_point_sec(), txn);
fc_dlog(logger,"not sending rejected transaction ${tid}",("tid",tid));
pending_txn_source.reset();
}
void big_msg_manager::bcast_transaction (const transaction_id_type & txnid,
......@@ -1507,36 +1513,32 @@ namespace eosio {
fc_dlog(logger, "found txnid in local_txns" );
return;
}
bool remember = expiration != time_point_sec();
uint32_t packsiz = 0;
uint32_t bufsiz = 0;
if (remember) {
net_message msg(txn);
packsiz = fc::raw::pack_size(msg);
bufsiz = packsiz + sizeof(packsiz);
vector<char> buff(bufsiz);
fc::datastream<char*> ds( buff.data(), bufsiz);
ds.write( reinterpret_cast<char*>(&packsiz), sizeof(packsiz) );
fc::raw::pack( ds, msg );
node_transaction_state nts = {txnid,
expiration,
txn,
std::move(buff),
0, 0, 0};
my_impl->local_txns.insert(std::move(nts));
}
net_message msg(txn);
packsiz = fc::raw::pack_size(msg);
bufsiz = packsiz + sizeof(packsiz);
vector<char> buff(bufsiz);
fc::datastream<char*> ds( buff.data(), bufsiz);
ds.write( reinterpret_cast<char*>(&packsiz), sizeof(packsiz) );
fc::raw::pack( ds, msg );
node_transaction_state nts = {txnid,
expiration,
txn,
std::move(buff),
0, 0, 0};
my_impl->local_txns.insert(std::move(nts));
if(bufsiz <= just_send_it_max) {
my_impl->send_all( txn, [skip, remember,txnid](connection_ptr c) -> bool {
my_impl->send_all( txn, [skip, txnid](connection_ptr c) -> bool {
if(c == skip || c->syncing ) {
return false;
}
const auto& bs = c->trx_state.find(txnid);
bool unknown = bs == c->trx_state.end();
if( unknown) {
if (remember) {
c->trx_state.insert(transaction_state({txnid,true,true,0,time_point() }));
}
c->trx_state.insert(transaction_state({txnid,true,true,0,time_point() }));
fc_dlog(logger, "sending whole txn to ${n}", ("n",c->peer_name() ) );
}
return unknown;
......@@ -1547,7 +1549,7 @@ namespace eosio {
pending_notify.known_trx.mode = normal;
pending_notify.known_trx.ids.push_back( txnid );
pending_notify.known_blocks.mode = none;
my_impl->send_all(pending_notify, [skip, remember, txnid](connection_ptr c) -> bool {
my_impl->send_all(pending_notify, [skip, txnid](connection_ptr c) -> bool {
if (c == skip || c->syncing) {
return false;
}
......@@ -1555,9 +1557,7 @@ namespace eosio {
bool unknown = bs == c->trx_state.end();
if( unknown) {
fc_dlog(logger, "sending notice to ${n}", ("n",c->peer_name() ) );
if (remember) {
c->trx_state.insert(transaction_state({txnid,false,true,0, time_point() }));
}
c->trx_state.insert(transaction_state({txnid,false,true,0, time_point() }));
}
return unknown;
});
......@@ -1576,11 +1576,11 @@ namespace eosio {
note.known_blocks.mode = normal;
note.known_blocks.ids.push_back( blk_id );
note.known_trx.mode = none;
my_impl->send_all(note, [blk_id](connection_ptr conn) -> bool {
my_impl->send_all(note, [blk_id, num](connection_ptr conn) -> bool {
const auto& bs = conn->blk_state.find(blk_id);
bool unknown = bs == conn->blk_state.end();
if (unknown) {
conn->blk_state.insert(block_state({blk_id,false,true,fc::time_point() }));
conn->blk_state.insert(block_state({blk_id,num,false,true,fc::time_point() }));
}
return unknown;
});
......@@ -1598,7 +1598,7 @@ namespace eosio {
if( c != conn && !conn->syncing ) {
auto b = conn->blk_state.get<by_id>().find(blk_id);
if(b == conn->blk_state.end()) {
conn->blk_state.insert( (block_state){blk_id,true,true,fc::time_point()});
conn->blk_state.insert( (block_state){blk_id,num,true,true,fc::time_point()});
sendit = true;
} else if (!b->is_known) {
conn->blk_state.modify(b,set_is_known);
......@@ -1617,11 +1617,11 @@ namespace eosio {
note.known_blocks.mode = normal;
note.known_blocks.ids.push_back( blk_id );
note.known_trx.mode = none;
my_impl->send_all(note, [blk_id](connection_ptr conn) -> bool {
my_impl->send_all(note, [blk_id, num](connection_ptr conn) -> bool {
const auto& bs = conn->blk_state.find(blk_id);
bool unknown = bs == conn->blk_state.end();
if (unknown) {
conn->blk_state.insert(block_state({blk_id,false,true,fc::time_point() }));
conn->blk_state.insert(block_state({blk_id,num,false,true,fc::time_point() }));
}
return unknown;
});
......@@ -1681,7 +1681,6 @@ namespace eosio {
elog( "failed to retrieve block for id");
}
if (!b) {
c->blk_state.insert((block_state){blkid,true,true,time_point::now()});
send_req = true;
req.req_blocks.ids.push_back( blkid );
req_blks.push_back( blkid );
......@@ -2200,7 +2199,7 @@ namespace eosio {
elog( " caught something attempting to accept transaction");
}
big_msg_master->bcast_rejected_transaction(msg, code);
big_msg_master->rejected_transaction(msg);
}
void net_plugin_impl::handle_message( connection_ptr c, const signed_transaction &msg) {
......@@ -2406,9 +2405,13 @@ namespace eosio {
auto &stale = local_txns.get<by_block_num>();
chain_controller &cc = chain_plug->chain();
uint32_t bn = cc.last_irreversible_block_num();
auto bn_up = stale.upper_bound(bn);
auto bn_lo = stale.lower_bound(1);
stale.erase( bn_lo, bn_up);
stale.erase( stale.lower_bound(1), stale.upper_bound(bn) );
for ( auto &c : connections ) {
auto &stale_txn = c->trx_state.get<by_block_num>();
stale_txn.erase( stale_txn.lower_bound(1), stale_txn.upper_bound(bn) );
auto &stale_blk = c->blk_state.get<by_block_num>();
stale_blk.erase( stale_blk.lower_bound(1), stale_blk.upper_bound(bn) );
}
}
void net_plugin_impl::connection_monitor( ) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册