提交 23748ae0 编写于 作者: P Phil Mesnier

Implement the purger of entries in the local transaction cache. Ref #291 and #318

上级 a198cf3f
......@@ -255,24 +255,41 @@ namespace eos {
fc::time_point received;
fc::time_point_sec expires;
// vector<char> packed_transaction; //just for the moment
SignedTransaction transaction;
uint32_t block_num = -1; /// block transaction was included in
bool validated = false; /// whether or not our node has validated it
};
struct update_block_num {
UInt16 new_bnum;
update_block_num (UInt16 bnum) : new_bnum(bnum) {}
void operator() (node_transaction_state& nts) {
nts.block_num = static_cast<uint32_t>(new_bnum);
}
};
struct by_expiry;
struct by_block_num;
typedef multi_index_container<
node_transaction_state,
indexed_by<
ordered_unique<
tag<by_id>, member < node_transaction_state,
transaction_id_type,
&node_transaction_state::id > >,
tag< by_id >,
member < node_transaction_state,
transaction_id_type,
&node_transaction_state::id > >,
ordered_non_unique<
tag< by_expiry >,
member< node_transaction_state,
fc::time_point_sec,
&node_transaction_state::expires >
>,
ordered_non_unique<
tag<by_expiry>, member< node_transaction_state,
fc::time_point_sec,
&node_transaction_state::expires > >
tag<by_block_num>,
member< node_transaction_state,
uint32_t,
&node_transaction_state::block_num > >
>
>
node_transaction_index;
......@@ -335,7 +352,7 @@ namespace eos {
auto current_endpoint = *endpoint_itr;
++endpoint_itr;
c->socket->async_connect( current_endpoint,
[c,endpoint_itr, this]
[c, endpoint_itr, this]
( const boost::system::error_code& err ) {
if( !err ) {
start_session( c );
......@@ -542,7 +559,12 @@ namespace eos {
for (auto t: msg.req_trx) {
auto txn = local_txns.get<by_id>().find(t);
if (txn != local_txns.end()) {
send_now.push_back(txn->transaction);
chain_controller &cc = chain_plug->chain();
try {
send_now.push_back(cc.get_recent_transaction(t));
} catch (...) {
elog( "failed to retieve transaction");
}
}
else {
int cycle_count = 2;
......@@ -550,7 +572,7 @@ namespace eos {
while (conn_ndx != loop_start) {
if (conn_ndx == connections.end()) {
if (--cycle_count == 0) {
elog("loop cycled twice, something is wrong");
elog("loop cycled twice, something is wrong");
break;
}
conn_ndx = connections.begin();
......@@ -563,7 +585,7 @@ namespace eos {
auto txn = conn_ndx->get()->trx_state.get<by_id>().find(t);
if (txn != conn_ndx->get()->trx_state.end()) {
//forward_to[conn_ndx]->push_back(t);
//forward_to[conn_ndx]->push_back(t);
break;
}
++conn_ndx;
......@@ -620,10 +642,16 @@ namespace eos {
}
void handle_message (connection_ptr c, const SignedTransaction &msg) {
auto txn = local_txns.get<by_id>().find(msg.id());
if (txn != local_txns.end()) {
if (handle_transaction (msg, true)) {
return;
}
if (sizeof(msg) <= just_send_it_max) {
send_all (msg, [c](connection_ptr conn) -> bool {
return (c != conn);
});
}
chain_controller &cc = chain_plug->chain();
if (!cc.is_known_transaction(msg.id())) {
chain_plug->accept_transaction (msg);
......@@ -647,19 +675,25 @@ namespace eos {
}
}
if (num == 0) {
elog ("Got out-of-order block ${n}",("n",msg.block_num()));
close (c);
elog ("syncing, got out-of-order block ${n}",("n",msg.block_num()));
//close (c);
return;
}
}
else {
send_all (msg, [c](connection_ptr conn) -> bool {
return (c != conn);
});
}
try {
chain_plug->accept_block(msg, syncing);
} catch (const unlinkable_block_exception &ex) {
elog ("unable to accpt block #${n}",("n",num));
close (c);
elog ("unable to accept block #${n}",("n",num));
//close (c);
} catch (const assert_exception &ex) {
elog ("unable to accept block cuz my asserts! #${n}",("n",num));
close (c);
elog ("unable to accept block on assert exception #${n}",("n",num));
//close (c);
}
}
......@@ -734,7 +768,16 @@ namespace eos {
void expire_txns () {
start_txn_timer ();
#warning ("TODO: Add by-expiry purging code");
auto &old = local_txns.get<by_expiry>();
auto ex_up = old.upper_bound (time_point::now());
auto ex_lo = old.lower_bound (fc::time_point_sec (0));
old.erase (ex_lo, ex_up);
auto &stale = local_txns.get<by_block_num>();
chain_controller &cc = chain_plug->chain();
uint32_t bn = cc.last_irreversible_block_num();
auto bn_up = stale.upper_bound(bn);
auto bn_lo = stale.lower_bound(0);
stale.erase (bn_lo, bn_up);
}
void connection_monitor () {
......@@ -762,12 +805,29 @@ namespace eos {
c->close();
}
void send_all_txn (const SignedTransaction& txn) {
uint16_t bn = static_cast<uint16_t>(txn.refBlockNum);
node_transaction_state nts = {txn.id(),time_point::now(),txn.expiration,
txn,bn, true};
local_txns.insert(nts);
bool handle_transaction (const SignedTransaction& txn, bool check_expire) {
if (check_expire) {
//expire_txns ();
}
auto t = local_txns.get<by_id>().find(txn.id());
bool isKnown = t != local_txns.end();
if (!isKnown) {
uint16_t bn = static_cast<uint16_t>(txn.refBlockNum);
node_transaction_state nts = {txn.id(),time_point::now(),txn.expiration,
bn, true};
local_txns.insert(nts);
}
return isKnown;
}
void send_all_txn (const SignedTransaction& txn) {
if (handle_transaction (txn, true)) {
return;
}
if (sizeof(txn) <= just_send_it_max) {
send_all (txn, [txn](connection_ptr c) -> bool {
const auto& bs = c->trx_state.find(txn.id());
......@@ -800,6 +860,45 @@ namespace eos {
my_impl->send_all_txn (txn);
}
void broadcast_block_impl (const chain::signed_block &sb) {
if (send_whole_blocks) {
send_all (sb,[](connection_ptr c) -> bool { return true; });
}
chain_controller &cc = chain_plug->chain();
vector<transaction_id_type> trxs;
if (!sb.cycles.empty()) {
for (const auto& cyc : sb.cycles) {
for (const auto& thr : cyc) {
for (auto ui : thr.user_input) {
auto &txn = cc.get_recent_transaction (ui.id());
auto &id_iter = local_txns.get<by_id>();
auto lt = id_iter.find(ui.id());
if (lt != local_txns.end()) {
id_iter.modify (lt, update_block_num(txn.refBlockNum));
} else {
handle_transaction (txn, false);
}
trxs.push_back (ui.id());
}
}
}
}
if (!send_whole_blocks) {
block_summary_message bsm = {sb.id(), trxs};
send_all (bsm,[sb](connection_ptr c) -> bool {
return true;
const auto& bs = c->block_state.find(sb.id());
if (bs == c->block_state.end()) {
c->block_state.insert ((block_state){sb.id(),true,true,fc::time_point()});
return true;
}
return false;
});
}
}
}; // class net_plugin_impl
......@@ -955,31 +1054,6 @@ namespace eos {
} FC_CAPTURE_AND_RETHROW() }
void net_plugin::broadcast_block (const chain::signed_block &sb) {
if (my->send_whole_blocks) {
my->send_all (sb,[](connection_ptr c) -> bool { return true; });
return;
}
vector<transaction_id_type> trxs;
if (!sb.cycles.empty()) {
for (const auto& cyc : sb.cycles) {
for (const auto& thr : cyc) {
for (auto ui : thr.user_input) {
trxs.push_back (ui.id());
}
}
}
}
block_summary_message bsm = {sb.id(), trxs};
my->send_all (bsm,[sb](connection_ptr c) -> bool {
return true;
const auto& bs = c->block_state.find(sb.id());
if (bs == c->block_state.end()) {
c->block_state.insert ((block_state){sb.id(),true,true,fc::time_point()});
return true;
}
return false;
});
my->broadcast_block_impl (sb);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册