未验证 提交 f0f4fb2d 编写于 作者: D Daniel Larimer 提交者: GitHub

Merge pull request #1959 from brianjohnson5972/398-delayed-signed-txn-p2p

Fix Delayed Transactions for p2p
......@@ -33,8 +33,6 @@
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/sort.hpp>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/algorithm/remove_if.hpp>
#include <boost/range/algorithm/equal.hpp>
#include <fstream>
......@@ -276,46 +274,50 @@ transaction_trace chain_controller::_push_transaction(const packed_transaction&
if (!delay.sec_since_epoch()) {
result = _push_transaction(std::move(mtrx));
// notify anyone listening to pending transactions
on_pending_transaction(_pending_transaction_metas.back(), packed_trx);
_pending_block->input_transactions.emplace_back(packed_trx);
} else {
result.status = transaction_trace::delayed;
const auto trx = mtrx.trx();
FC_ASSERT( !trx.actions.empty(), "transaction must have at least one action");
FC_ASSERT( trx.expiration > (head_block_time() + fc::milliseconds(2*config::block_interval_ms)),
"transaction is expired when created" );
// add in the system account authorization
action for_deferred = trx.actions[0];
bool found = false;
for (const auto& auth : for_deferred.authorization) {
if (auth.actor == config::system_account_name &&
auth.permission == config::active_name) {
found = true;
break;
auto delayed_transaction_processing = [&](transaction_metadata& meta) {
result.status = transaction_trace::delayed;
const auto trx = mtrx.trx();
FC_ASSERT( !trx.actions.empty(), "transaction must have at least one action");
FC_ASSERT( trx.expiration > (head_block_time() + fc::milliseconds(2*config::block_interval_ms)),
"transaction is expired when created" );
// add in the system account authorization
action for_deferred = trx.actions[0];
bool found = false;
for (const auto& auth : for_deferred.authorization) {
if (auth.actor == config::system_account_name &&
auth.permission == config::active_name) {
found = true;
break;
}
}
}
if (!found)
for_deferred.authorization.push_back(permission_level{config::system_account_name, config::active_name});
if (!found)
for_deferred.authorization.push_back(permission_level{config::system_account_name, config::active_name});
apply_context context(*this, _db, for_deferred, mtrx);
apply_context context(*this, _db, for_deferred, mtrx);
time_point_sec execute_after = head_block_time();
execute_after += time_point_sec(delay);
deferred_transaction dtrx(context.get_next_sender_id(), config::system_account_name, execute_after, trx);
FC_ASSERT( dtrx.execute_after < dtrx.expiration, "transaction expires before it can execute" );
time_point_sec execute_after = head_block_time();
execute_after += time_point_sec(delay);
deferred_transaction dtrx(context.get_next_sender_id(), config::system_account_name, execute_after, trx);
FC_ASSERT( dtrx.execute_after < dtrx.expiration, "transaction expires before it can execute" );
result.deferred_transaction_requests.push_back(std::move(dtrx));
result.deferred_transaction_requests.push_back(std::move(dtrx));
// notify anyone listening to pending transactions
on_pending_transaction(std::move(mtrx), packed_trx);
store_deferred_transaction(result.deferred_transaction_requests[0].get<deferred_transaction>());
store_deferred_transaction(result.deferred_transaction_requests[0].get<deferred_transaction>());
return result;
};
result = wrap_transaction_processing(std::forward<transaction_metadata>(mtrx), delayed_transaction_processing);
}
// notify anyone listening to pending transactions
on_pending_transaction(_pending_transaction_metas.back(), packed_trx);
_pending_block->input_transactions.emplace_back(packed_trx);
return result;
} FC_CAPTURE_AND_RETHROW() }
......@@ -334,55 +336,19 @@ static void record_locks_for_data_access(const vector<action_trace>& action_trac
transaction_trace chain_controller::_push_transaction( transaction_metadata&& data )
{ try {
FC_ASSERT( _pending_block, " block not started" );
if (_limits.max_push_transaction_us.count() > 0) {
auto newval = fc::time_point::now() + _limits.max_push_transaction_us;
if ( !data.processing_deadline || newval < *data.processing_deadline ) {
data.processing_deadline = newval;
}
}
const transaction& trx = data.trx();
auto temp_session = _db.start_undo_session(true);
// for now apply the transaction serially but schedule it according to those invariants
validate_referenced_accounts(trx);
auto cyclenum = _pending_block->regions.back().cycles_summary.size() - 1;
/// TODO: move _pending_cycle into db so that it can be undone if transation fails, for now we will apply
/// the transaction first so that there is nothing to undo... this only works because things are currently
/// single threaded
// set cycle, shard, region etc
data.region_id = 0;
data.cycle_index = cyclenum;
data.shard_index = 0;
auto result = _apply_transaction( data );
auto& bcycle = _pending_block->regions.back().cycles_summary.back();
auto& bshard = bcycle.front();
record_locks_for_data_access(result.action_traces, bshard.read_locks, bshard.write_locks);
fc::deduplicate(bshard.read_locks);
fc::deduplicate(bshard.write_locks);
auto newend = boost::remove_if( bshard.read_locks,
[&]( const auto& l ){
return boost::find( bshard.write_locks, l ) != bshard.write_locks.end();
});
bshard.read_locks.erase( newend, bshard.read_locks.end() );
bshard.transactions.emplace_back( result );
_pending_cycle_trace->shard_traces.at(0).append(result);
// The transaction applied successfully. Merge its changes into the pending block session.
temp_session.squash();
_pending_transaction_metas.emplace_back(std::forward<transaction_metadata>(data));
return result;
auto process_apply_transaction = [&](transaction_metadata& meta) {
auto cyclenum = _pending_block->regions.back().cycles_summary.size() - 1;
/// TODO: move _pending_cycle into db so that it can be undone if transation fails, for now we will apply
/// the transaction first so that there is nothing to undo... this only works because things are currently
/// single threaded
// set cycle, shard, region etc
data.region_id = 0;
data.cycle_index = cyclenum;
data.shard_index = 0;
return _apply_transaction( data );
};
return wrap_transaction_processing( std::forward<transaction_metadata>(data), process_apply_transaction );
} FC_CAPTURE_AND_RETHROW() }
......
......@@ -12,6 +12,8 @@
#include <chainbase/chainbase.hpp>
#include <fc/scoped_exit.hpp>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/algorithm/remove_if.hpp>
#include <boost/signals2/signal.hpp>
#include <eosio/chain/protocol.hpp>
......@@ -317,6 +319,52 @@ namespace eosio { namespace chain {
transaction_trace _apply_error( transaction_metadata& data );
vector<transaction_trace> _push_deferred_transactions( bool flush = false );
template<typename TransactionProcessing>
transaction_trace wrap_transaction_processing( transaction_metadata&& data, TransactionProcessing trx_processing )
{ try {
FC_ASSERT( _pending_block, " block not started" );
if (_limits.max_push_transaction_us.count() > 0) {
auto newval = fc::time_point::now() + _limits.max_push_transaction_us;
if ( !data.processing_deadline || newval < *data.processing_deadline ) {
data.processing_deadline = newval;
}
}
const transaction& trx = data.trx();
auto temp_session = _db.start_undo_session(true);
// for now apply the transaction serially but schedule it according to those invariants
validate_referenced_accounts(trx);
auto result = trx_processing(data);
auto& bcycle = _pending_block->regions.back().cycles_summary.back();
auto& bshard = bcycle.front();
record_locks_for_data_access(result.action_traces, bshard.read_locks, bshard.write_locks);
fc::deduplicate(bshard.read_locks);
fc::deduplicate(bshard.write_locks);
auto newend = boost::remove_if( bshard.read_locks,
[&]( const auto& l ){
return boost::find( bshard.write_locks, l ) != bshard.write_locks.end();
});
bshard.read_locks.erase( newend, bshard.read_locks.end() );
bshard.transactions.emplace_back( result );
_pending_cycle_trace->shard_traces.at(0).append(result);
// The transaction applied successfully. Merge its changes into the pending block session.
temp_session.squash();
_pending_transaction_metas.emplace_back(std::forward<transaction_metadata>(data));
return result;
} FC_CAPTURE_AND_RETHROW() }
/// Reset the object graph in-memory
void _initialize_indexes();
void _initialize_chain(contracts::chain_initializer& starter);
......
......@@ -2233,6 +2233,7 @@ namespace eosio {
for (const auto &recpt : shard.transactions) {
auto ltx = local_txns.get<by_id>().find(recpt.id);
switch (recpt.status) {
case transaction_receipt::delayed:
case transaction_receipt::executed: {
if( ltx != local_txns.end()) {
sb.input_transactions.push_back(ltx->packed_txn);
......@@ -2252,9 +2253,6 @@ namespace eosio {
}
break;
}
case transaction_receipt::delayed:
#warning TODO: Not sure what should happen here
break;
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册