提交 f86de996 编写于 作者: B Bart Wyatt

interim checkin to switch gears

上级 7e171f3d
...@@ -23,8 +23,8 @@ void apply_context::exec_one() ...@@ -23,8 +23,8 @@ void apply_context::exec_one()
} }
// create a receipt for this // create a receipt for this
results.applied_actions.emplace_back(action_result {receiver, act, move(_pending_console_output)}); results.applied_actions.emplace_back(action_result {receiver, act, move(_pending_console_output.str())});
_pending_console_output.clear(); _pending_console_output = std::ostringstream();
} }
void apply_context::exec() void apply_context::exec()
...@@ -146,9 +146,4 @@ void apply_context::deferred_transaction_send( uint32_t id ) { ...@@ -146,9 +146,4 @@ void apply_context::deferred_transaction_send( uint32_t id ) {
// _pending_deferred_transactions.erase(itr); // _pending_deferred_transactions.erase(itr);
} }
void apply_context::append_console(const string&& output) {
_pending_console_output.emplace_back(move(output));
}
} } /// eosio::chain } } /// eosio::chain
...@@ -261,22 +261,33 @@ transaction_result chain_controller::_push_transaction(const signed_transaction& ...@@ -261,22 +261,33 @@ transaction_result chain_controller::_push_transaction(const signed_transaction&
auto temp_session = _db.start_undo_session(true); auto temp_session = _db.start_undo_session(true);
auto tid = trx.id(); // for now apply the transaction serially but schedule it according to those invariants
validate_referenced_accounts(trx); validate_referenced_accounts(trx);
check_transaction_authorization(trx); check_transaction_authorization(trx);
auto result = _apply_transaction(trx);
auto tid = trx.id();
auto shardnum = _pending_cycle.schedule( trx ); auto shardnum = _pending_cycle.schedule( trx );
if( shardnum == -1 ) { /// schedule conflict start new cycle if( shardnum == -1 ) { /// schedule conflict start new cycle
_finalize_cycle();
_start_cycle(); _start_cycle();
shardnum = _pending_cycle.schedule( trx );
} }
auto& bcycle = _pending_block->cycles_summary.back(); auto& bcycle = _pending_block->cycles_summary.back();
if( shardnum >= bcycle.size() ) { if( shardnum >= bcycle.size() ) {
bcycle.resize( bcycle.size()+1 ); bcycle.resize( bcycle.size()+1 );
bcycle.back().emplace_back( tid );
} }
auto result = _apply_transaction(trx); bcycle.at(shardnum).emplace_back( tid );
if (shardnum >= _pending_shard_results.size()) {
_pending_shard_results.resize(_pending_shard_results.size() + 1);
}
_pending_shard_results.at(shardnum).append(result);
/** for now we will just shove everything into the first shard */ /** for now we will just shove everything into the first shard */
_pending_block->input_transactions.push_back(trx); _pending_block->input_transactions.push_back(trx);
...@@ -297,10 +308,49 @@ transaction_result chain_controller::_push_transaction(const signed_transaction& ...@@ -297,10 +308,49 @@ transaction_result chain_controller::_push_transaction(const signed_transaction&
void chain_controller::_start_cycle() { void chain_controller::_start_cycle() {
_pending_block->cycles_summary.resize( _pending_block->cycles_summary.size() + 1 ); _pending_block->cycles_summary.resize( _pending_block->cycles_summary.size() + 1 );
_pending_cycle = pending_cycle_state(); _pending_cycle = pending_cycle_state();
_pending_shard_results.clear();
/// TODO: check for deferred transactions and schedule them /// TODO: check for deferred transactions and schedule them
} // _start_cycle } // _start_cycle
void chain_controller::_finalize_cycle()
{
for (const auto& res: _pending_shard_results) {
_apply_shard_results(res);
}
}
void chain_controller::_apply_shard_results( const shard_result& res )
{
for (const auto& tr: res.transaction_results) {
for (const auto &dt: tr.deferred_transactions) {
_db.create<generated_transaction_object>([&](auto &obj) {
obj.trx_id = dt.id();
obj.sender = dt.sender;
obj.sender_id = dt.sender_id;
obj.expiration = dt.expiration;
obj.delay_until = dt.execute_after;
obj.packed_trx.resize(fc::raw::pack_size(dt));
fc::datastream<char *> ds(obj.packed_trx.data(), obj.packed_trx.size());
fc::raw::pack(ds, dt);
});
}
///TODO: hook this up as a signal handler in a de-coupled "logger" that may just silently drop them
for (const auto &ar : tr.action_results) {
auto prefix = fc::format_string(
"[(${s},${a})->${r}]",
fc::mutable_variant_object()
("s", ar.act.scope)
("a", ar.act.name)
("r", ar.receiver));
std::cerr << prefix << ": CONSOLE OUTPUT BEGIN =====================" << std::endl;
std::cerr << ar.console
std::cerr << prefix << ": CONSOLE OUTPUT END =====================" << std::endl;
}
}
}
signed_block chain_controller::generate_block( signed_block chain_controller::generate_block(
block_timestamp_type when, block_timestamp_type when,
account_name producer, account_name producer,
...@@ -438,33 +488,17 @@ void chain_controller::__apply_block(const signed_block& next_block) ...@@ -438,33 +488,17 @@ void chain_controller::__apply_block(const signed_block& next_block)
} }
for (const auto& cycle : next_block.cycles_summary) { for (const auto& cycle : next_block.cycles_summary) {
vector<shard_result> results;
results.reserve(cycle.size());
for (const auto& shard: cycle) { for (const auto& shard: cycle) {
results.emplace_back();
auto& result = results.back();
for (const auto& receipt : shard) { for (const auto& receipt : shard) {
if( receipt.status == transaction_receipt::executed ) { if( receipt.status == transaction_receipt::executed ) {
auto itr = trx_index.find(receipt.id); auto itr = trx_index.find(receipt.id);
if( itr != trx_index.end() ) { if( itr != trx_index.end() ) {
auto result = _apply_transaction( *itr->second ); result.append(move(_apply_transaction( *itr->second )));
///TODO: make more parallel accumulating shard output and reducing at cycle level
for (const auto& dt: result.deferred_transactions) {
_db.create<generated_transaction_object>([&](auto &obj) {
obj.trx_id = dt.id();
obj.sender = dt.sender;
obj.sender_id = dt.sender_id;
obj.expiration = dt.expiration;
obj.delay_until = dt.execute_after;
obj.packed_trx.resize(fc::raw::pack_size(dt));
fc::datastream<char *> ds(obj.packed_trx.data(), obj.packed_trx.size());
fc::raw::pack(ds, dt);
});
}
///TODO: hook this up as a signal handler in a de-coupled "logger" that may just silently drop them
for (const auto& ar : result.action_results) {
auto prefix = fc::format_string("[(${s},${a})->${r}](console): ", fc::mutable_variant_object()("s",ar.act.scope)("a", ar.act.name)("r", ar.receiver));
for (const auto& msg: ar.console) {
std::cerr << prefix << msg << std::endl;
}
}
} }
else else
{ {
...@@ -477,6 +511,10 @@ void chain_controller::__apply_block(const signed_block& next_block) ...@@ -477,6 +511,10 @@ void chain_controller::__apply_block(const signed_block& next_block)
// check_transaction_authorization(trx, true); // check_transaction_authorization(trx, true);
} /// for each transaction id } /// for each transaction id
} /// for each shard } /// for each shard
for (const auto res: results) {
_apply_shard_results(res);
}
} /// for each cycle } /// for each cycle
_finalize_block( next_block ); _finalize_block( next_block );
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <eosio/chain/transaction.hpp> #include <eosio/chain/transaction.hpp>
#include <eosio/chain/record_functions.hpp> #include <eosio/chain/record_functions.hpp>
#include <fc/utility.hpp> #include <fc/utility.hpp>
#include <sstream>
namespace chainbase { class database; } namespace chainbase { class database; }
...@@ -102,12 +103,6 @@ class apply_context { ...@@ -102,12 +103,6 @@ class apply_context {
*/ */
bool has_recipient(account_name account)const; bool has_recipient(account_name account)const;
/**
* Append a statement printed from the contract to the output log
*/
void append_console(const string&& output);
bool all_authorizations_used()const; bool all_authorizations_used()const;
vector<permission_level> unused_authorizations()const; vector<permission_level> unused_authorizations()const;
...@@ -189,7 +184,7 @@ class apply_context { ...@@ -189,7 +184,7 @@ class apply_context {
vector<account_name> _notified; ///< keeps track of new accounts to be notifed of current message vector<account_name> _notified; ///< keeps track of new accounts to be notifed of current message
vector<action> _inline_actions; ///< queued inline messages vector<action> _inline_actions; ///< queued inline messages
map<uint32_t,deferred_transaction> _pending_deferred_transactions; ///< deferred txs /// TODO specify when map<uint32_t,deferred_transaction> _pending_deferred_transactions; ///< deferred txs /// TODO specify when
vector<string> _pending_console_output; std::ostringstream _pending_console_output;
}; };
using apply_handler = std::function<void(apply_context&)>; using apply_handler = std::function<void(apply_context&)>;
......
...@@ -286,7 +286,19 @@ namespace eosio { namespace chain { ...@@ -286,7 +286,19 @@ namespace eosio { namespace chain {
flat_set<account_name> provided_accounts = flat_set<account_name>() flat_set<account_name> provided_accounts = flat_set<account_name>()
)const; )const;
private: private:
struct shard_result {
vector<transaction_result> transaction_results;
void append(transaction_result &&res) {
transaction_results.emplace_back(move(res));
}
void append(const transaction_result &res) {
transaction_results.emplace_back(res);
}
};
const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const; const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
...@@ -308,6 +320,7 @@ namespace eosio { namespace chain { ...@@ -308,6 +320,7 @@ namespace eosio { namespace chain {
transaction_result _apply_transaction( const transaction& trx ); transaction_result _apply_transaction( const transaction& trx );
void _finalize_block( const signed_block& b ); void _finalize_block( const signed_block& b );
void _apply_shard_results( const shard_result& res );
/// Reset the object graph in-memory /// Reset the object graph in-memory
void _initialize_indexes(); void _initialize_indexes();
...@@ -395,6 +408,8 @@ namespace eosio { namespace chain { ...@@ -395,6 +408,8 @@ namespace eosio { namespace chain {
void _spinup_db(); void _spinup_db();
void _spinup_fork_db(); void _spinup_fork_db();
void _start_cycle(); void _start_cycle();
void _append_shard_results(int shard_num, const transaction_result& result);
void _finalize_cycle();
// producer_schedule_type calculate_next_round( const signed_block& next_block ); // producer_schedule_type calculate_next_round( const signed_block& next_block );
...@@ -406,6 +421,7 @@ namespace eosio { namespace chain { ...@@ -406,6 +421,7 @@ namespace eosio { namespace chain {
optional<signed_block> _pending_block; optional<signed_block> _pending_block;
uint32_t _pending_transaction_count = 0; uint32_t _pending_transaction_count = 0;
pending_cycle_state _pending_cycle; pending_cycle_state _pending_cycle;
vector<shard_result> _pending_shard_results;
bool _currently_applying_block = false; bool _currently_applying_block = false;
bool _currently_replaying_blocks = false; bool _currently_replaying_blocks = false;
......
...@@ -155,7 +155,7 @@ namespace eosio { namespace chain { ...@@ -155,7 +155,7 @@ namespace eosio { namespace chain {
struct action_result { struct action_result {
account_name receiver; account_name receiver;
action act; action act;
vector<string> console; string console;
}; };
struct transaction_result : transaction_receipt { struct transaction_result : transaction_receipt {
......
...@@ -663,32 +663,32 @@ class console_api : public context_aware_api { ...@@ -663,32 +663,32 @@ class console_api : public context_aware_api {
using context_aware_api::context_aware_api; using context_aware_api::context_aware_api;
void prints(const char *str) { void prints(const char *str) {
context.append_console(string(str)); context._pending_console_output << string(str) << std::endl;
} }
void prints_l(array_ptr<const char> str, size_t str_len ) { void prints_l(array_ptr<const char> str, size_t str_len ) {
context.append_console(string(str, str_len)); context._pending_console_output << string(str, str_len) << std::endl;
} }
void printi(uint64_t val) { void printi(uint64_t val) {
context.append_console(to_string(val)); context._pending_console_output << to_string(val) << std::endl;
} }
void printi128(const unsigned __int128& val) { void printi128(const unsigned __int128& val) {
fc::uint128_t v(val>>64, uint64_t(val) ); fc::uint128_t v(val>>64, uint64_t(val) );
context.append_console(string(v)); context._pending_console_output << string(v) << std::endl;
} }
void printd( wasm_double val ) { void printd( wasm_double val ) {
context.append_console(string(val.str())); context._pending_console_output << string(val.str()) << std::endl;
} }
void printn(const name& value) { void printn(const name& value) {
context.append_console(value.to_string()); context._pending_console_output << value.to_string() << std::endl;
} }
void printhex(array_ptr<const char> data, size_t data_len ) { void printhex(array_ptr<const char> data, size_t data_len ) {
context.append_console(fc::to_hex(data, data_len)); context._pending_console_output << fc::to_hex(data, data_len) << std::endl;
} }
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册