提交 f15a7441 编写于 作者: B Bart Wyatt

Fix for unchecked load in received_transactions/blocks

The two vectors (received_transactions and received_blocks) in the net_plugin were being improperly pruned when multiple connections managed to create entries leading to unchecked memory and processing growth.  Additionally, lookups were always done by ID so, refactoring to a multimap instead of a vector of named "pair" types reduces the load of scanning for those transactions/blocks
上级 e1d07401
...@@ -676,18 +676,8 @@ namespace eosio { ...@@ -676,18 +676,8 @@ namespace eosio {
vector<transaction_id_type> req_trx; vector<transaction_id_type> req_trx;
struct block_origin { std::multimap<block_id_type, connection_ptr> received_blocks;
block_id_type id; std::multimap<transaction_id_type, connection_ptr> received_transactions;
connection_ptr origin;
};
struct transaction_origin {
transaction_id_type id;
connection_ptr origin;
};
vector<block_origin> received_blocks;
vector<transaction_origin> received_transactions;
void bcast_transaction (const packed_transaction& msg); void bcast_transaction (const packed_transaction& msg);
void rejected_transaction (const transaction_id_type& msg); void rejected_transaction (const transaction_id_type& msg);
...@@ -1610,14 +1600,13 @@ namespace eosio { ...@@ -1610,14 +1600,13 @@ namespace eosio {
//------------------------------------------------------------------------ //------------------------------------------------------------------------
void dispatch_manager::bcast_block (const signed_block &bsum) { void dispatch_manager::bcast_block (const signed_block &bsum) {
connection_ptr skip; std::set<connection_ptr> skips;
for (auto org = received_blocks.begin(); org != received_blocks.end(); org++) { auto range = received_blocks.equal_range(bsum.id());
if (org->id == bsum.id()) { for (auto org = range.first; org != range.second; ++org) {
skip = org->origin; skips.insert(org->second);
received_blocks.erase(org);
break;
}
} }
received_blocks.erase(range.first, range.second);
net_message msg(bsum); net_message msg(bsum);
uint32_t packsiz = fc::raw::pack_size(msg); uint32_t packsiz = fc::raw::pack_size(msg);
uint32_t msgsiz = packsiz + sizeof(packsiz); uint32_t msgsiz = packsiz + sizeof(packsiz);
...@@ -1630,12 +1619,12 @@ namespace eosio { ...@@ -1630,12 +1619,12 @@ namespace eosio {
peer_block_state pbstate = {bid, bnum, false,true,time_point()}; peer_block_state pbstate = {bid, bnum, false,true,time_point()};
// skip will be empty if our producer emitted this block so just send it // skip will be empty if our producer emitted this block so just send it
if (( large_msg_notify && msgsiz > just_send_it_max) && skip) { if (( large_msg_notify && msgsiz > just_send_it_max) && !skips.empty()) {
fc_ilog(logger, "block size is ${ms}, sending notify",("ms", msgsiz)); fc_ilog(logger, "block size is ${ms}, sending notify",("ms", msgsiz));
connection_wptr weak_skip = skip; my_impl->send_all(pending_notify, [&skips, pbstate](connection_ptr c) -> bool {
my_impl->send_all(pending_notify, [weak_skip, pbstate](connection_ptr c) -> bool { if (skips.find(c) != skips.end() || !c->current())
if (c == weak_skip.lock() || !c->current())
return false; return false;
bool unknown = c->add_peer_block(pbstate); bool unknown = c->add_peer_block(pbstate);
if (!unknown) { if (!unknown) {
elog("${p} already has knowledge of block ${b}", ("p",c->peer_name())("b",pbstate.block_num)); elog("${p} already has knowledge of block ${b}", ("p",c->peer_name())("b",pbstate.block_num));
...@@ -1646,7 +1635,7 @@ namespace eosio { ...@@ -1646,7 +1635,7 @@ namespace eosio {
else { else {
pbstate.is_known = true; pbstate.is_known = true;
for (auto cp : my_impl->connections) { for (auto cp : my_impl->connections) {
if (cp == skip || !cp->current()) { if (skips.find(cp) != skips.end() || !cp->current()) {
continue; continue;
} }
cp->add_peer_block(pbstate); cp->add_peer_block(pbstate);
...@@ -1656,7 +1645,7 @@ namespace eosio { ...@@ -1656,7 +1645,7 @@ namespace eosio {
} }
void dispatch_manager::recv_block (connection_ptr c, const block_id_type& id, uint32_t bnum) { void dispatch_manager::recv_block (connection_ptr c, const block_id_type& id, uint32_t bnum) {
received_blocks.emplace_back((block_origin){id, c}); received_blocks.insert(std::make_pair(id, c));
if (c && if (c &&
c->last_req && c->last_req &&
c->last_req->req_blocks.mode != none && c->last_req->req_blocks.mode != none &&
...@@ -1672,25 +1661,19 @@ namespace eosio { ...@@ -1672,25 +1661,19 @@ namespace eosio {
void dispatch_manager::rejected_block (const block_id_type& id) { void dispatch_manager::rejected_block (const block_id_type& id) {
fc_dlog(logger,"not sending rejected transaction ${tid}",("tid",id)); fc_dlog(logger,"not sending rejected transaction ${tid}",("tid",id));
for (auto org = received_blocks.begin(); org != received_blocks.end(); org++) { auto range = received_blocks.equal_range(id);
if (org->id == id) { received_blocks.erase(range.first, range.second);
received_blocks.erase(org);
break;
}
}
} }
void dispatch_manager::bcast_transaction (const packed_transaction& trx) { void dispatch_manager::bcast_transaction (const packed_transaction& trx) {
connection_ptr skip; std::set<connection_ptr> skips;
transaction_id_type id = trx.id(); transaction_id_type id = trx.id();
for (auto org = received_transactions.begin(); org != received_transactions.end(); org++) { auto range = received_transactions.equal_range(id);
if (org->id == id) { for (auto org = range.first; org != range.second; ++org) {
skip = org->origin; skips.insert(org->second);
received_transactions.erase(org);
break;
}
} }
received_transactions.erase(range.first, range.second);
for (auto ref = req_trx.begin(); ref != req_trx.end(); ++ref) { for (auto ref = req_trx.begin(); ref != req_trx.end(); ++ref) {
if (*ref == id) { if (*ref == id) {
...@@ -1723,9 +1706,8 @@ namespace eosio { ...@@ -1723,9 +1706,8 @@ namespace eosio {
my_impl->local_txns.insert(std::move(nts)); my_impl->local_txns.insert(std::move(nts));
if( !large_msg_notify || bufsiz <= just_send_it_max) { if( !large_msg_notify || bufsiz <= just_send_it_max) {
connection_wptr weak_skip = skip; my_impl->send_all( trx, [id, &skips, trx_expiration](connection_ptr c) -> bool {
my_impl->send_all( trx, [weak_skip, id, trx_expiration](connection_ptr c) -> bool { if( skips.find(c) != skips.end() || c->syncing ) {
if(c == weak_skip.lock() || c->syncing ) {
return false; return false;
} }
const auto& bs = c->trx_state.find(id); const auto& bs = c->trx_state.find(id);
...@@ -1745,9 +1727,8 @@ namespace eosio { ...@@ -1745,9 +1727,8 @@ namespace eosio {
pending_notify.known_trx.mode = normal; pending_notify.known_trx.mode = normal;
pending_notify.known_trx.ids.push_back( id ); pending_notify.known_trx.ids.push_back( id );
pending_notify.known_blocks.mode = none; pending_notify.known_blocks.mode = none;
connection_wptr weak_skip = skip; my_impl->send_all(pending_notify, [id, &skips, trx_expiration](connection_ptr c) -> bool {
my_impl->send_all(pending_notify, [weak_skip, id, trx_expiration](connection_ptr c) -> bool { if (skips.find(c) != skips.end() || c->syncing) {
if (c == weak_skip.lock() || c->syncing) {
return false; return false;
} }
const auto& bs = c->trx_state.find(id); const auto& bs = c->trx_state.find(id);
...@@ -1766,7 +1747,7 @@ namespace eosio { ...@@ -1766,7 +1747,7 @@ namespace eosio {
} }
void dispatch_manager::recv_transaction (connection_ptr c, const transaction_id_type& id) { void dispatch_manager::recv_transaction (connection_ptr c, const transaction_id_type& id) {
received_transactions.emplace_back((transaction_origin){id, c}); received_transactions.insert(std::make_pair(id, c));
if (c && if (c &&
c->last_req && c->last_req &&
c->last_req->req_trx.mode != none && c->last_req->req_trx.mode != none &&
...@@ -1781,12 +1762,8 @@ namespace eosio { ...@@ -1781,12 +1762,8 @@ namespace eosio {
void dispatch_manager::rejected_transaction (const transaction_id_type& id) { void dispatch_manager::rejected_transaction (const transaction_id_type& id) {
fc_dlog(logger,"not sending rejected transaction ${tid}",("tid",id)); fc_dlog(logger,"not sending rejected transaction ${tid}",("tid",id));
for (auto org = received_transactions.begin(); org != received_transactions.end(); org++) { auto range = received_transactions.equal_range(id);
if (org->id == id) { received_transactions.erase(range.first, range.second);
received_transactions.erase(org);
break;
}
}
} }
void dispatch_manager::recv_notice (connection_ptr c, const notice_message& msg, bool generated) { void dispatch_manager::recv_notice (connection_ptr c, const notice_message& msg, bool generated) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册