From 4ee3b98384a4e2cc4a898ec91594db64c39225d8 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Wed, 22 Nov 2017 17:36:40 -0500 Subject: [PATCH] start implementing some scheduling of cycles --- libraries/chain/apply_context.cpp | 15 ++- libraries/chain/chain_controller.cpp | 119 ++++-------------- .../include/eosio/chain/chain_controller.hpp | 3 + .../chain/generated_transaction_object.hpp | 6 +- .../eosio/chain/pending_cycle_state.hpp | 69 ++++++++++ .../chain/include/eosio/chain/transaction.hpp | 4 +- 6 files changed, 117 insertions(+), 99 deletions(-) create mode 100644 libraries/chain/include/eosio/chain/pending_cycle_state.hpp diff --git a/libraries/chain/apply_context.cpp b/libraries/chain/apply_context.cpp index 09d344a92..200458085 100644 --- a/libraries/chain/apply_context.cpp +++ b/libraries/chain/apply_context.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace eosio { namespace chain { void apply_context::exec() @@ -109,7 +110,7 @@ void apply_context::deferred_transaction_start( uint32_t id, trx.expiration = execute_before; trx.execute_after = execute_after; trx.sender = receiver; ///< sender is the receiver of the current action - trx.id = id; + trx.sender_id = id; controller.validate_scope( trx ); @@ -134,6 +135,18 @@ void apply_context::deferred_transaction_send( uint32_t id ) { FC_ASSERT( dt.actions.size(), "transaction must contain at least one action" ); controller.check_authorization( dt, flat_set(), false, {receiver} ); auto itr = _pending_deferred_transactions.find( id ); + + mutable_db.create( [&]( auto& obj ) { + obj.trx_id = dt.id(); + obj.sender = receiver; + obj.sender_id = dt.sender_id; + obj.expiration = dt.expiration; + obj.delay_until = dt.execute_after; + obj.packed_trx.resize( fc::raw::pack_size( dt ) ); + fc::datastream ds( obj.packed_trx.data(), obj.packed_trx.size() ); + fc::raw::pack( ds, dt ); + }); + _pending_deferred_transactions.erase(itr); } diff --git a/libraries/chain/chain_controller.cpp b/libraries/chain/chain_controller.cpp index 7d941895d..02ccb8de2 100644 --- a/libraries/chain/chain_controller.cpp +++ b/libraries/chain/chain_controller.cpp @@ -265,9 +265,19 @@ void chain_controller::_push_transaction(const signed_transaction& trx) { validate_referenced_accounts(trx); check_transaction_authorization(trx); + auto shardnum = _pending_cycle.schedule( trx ); + if( shardnum == -1 ) { /// schedule conflict start new cycle + _start_cycle(); + } + + auto& bcycle = _pending_block->cycles_summary.back(); + if( shardnum >= bcycle.size() ) { + bcycle.resize( bcycle.size()+1 ); + bcycle.back().emplace_back( tid ); + } + _apply_transaction(trx); /** for now we will just shove everything into the first shard */ - _pending_block->cycles_summary[0][0].emplace_back( tid ); _pending_block->input_transactions.push_back(trx); // The transaction applied successfully. Merge its changes into the pending block session. @@ -277,6 +287,18 @@ void chain_controller::_push_transaction(const signed_transaction& trx) { on_pending_transaction(trx); } + +/** + * Wraps up all work for current shards, starts a new cycle, and + * executes any pending transactions + */ +void chain_controller::_start_cycle() { + _pending_block->cycles_summary.resize( _pending_block->cycles_summary.size() + 1 ); + _pending_cycle = pending_cycle_state(); + + /// TODO: check for deferred transactions and schedule them +} // _start_cycle + signed_block chain_controller::generate_block( block_timestamp_type when, account_name producer, @@ -614,86 +636,6 @@ void chain_controller::validate_expiration( const transaction& trx ) const } FC_CAPTURE_AND_RETHROW((trx)) } -/* -void chain_controller::process_message( const transaction& trx, account_name code, - const action& message, actionOutput& output, apply_context* parent_context) { - apply_context apply_ctx(*this, _db, trx, message, code); - apply_message(apply_ctx); - - output.notify.reserve( apply_ctx.notified.size() ); - - for( uint32_t i = 0; i < apply_ctx.notified.size(); ++i ) { - try { - auto notify_code = apply_ctx.notified[i]; - output.notify.push_back( {notify_code} ); - process_message(trx, notify_code, message, output.notify.back().output, &apply_ctx); - } FC_CAPTURE_AND_RETHROW((apply_ctx.notified[i])) - } - - // combine inline messages and process - if (apply_ctx.inline_messages.size() > 0) { - output.inline_transaction = Inlinetransaction(trx); - (*output.inline_transaction).messages = std::move(apply_ctx.inline_messages); - } - - for( auto& asynctrx : apply_ctx.deferred_transactions ) { - digest_type::encoder enc; fc::raw::pack( enc, trx ); fc::raw::pack( enc, asynctrx ); - auto id = enc.result(); - auto gtrx = Generatedtransaction(id, asynctrx); - - _db.create([&](generated_transaction_object& transaction) { - transaction.trx = gtrx; - transaction.status = generated_transaction_object::PENDING; - }); - - output.deferred_transactions.emplace_back( gtrx ); - } - - // propagate used_authorizations up the context chain - if (parent_context != nullptr) - for (int i = 0; i < apply_ctx.used_authorizations.size(); ++i) - if (apply_ctx.used_authorizations[i]) - parent_context->used_authorizations[i] = true; - - // process_message recurses for each notified account, but we only want to run this check at the top level - if (parent_context == nullptr && (_skip_flags & skip_authority_check) == false) - EOS_ASSERT(apply_ctx.all_authorizations_used(), tx_irrelevant_auth, - "action declared authorities it did not need: ${unused}", - ("unused", apply_ctx.unused_authorizations())("message", message)); -} -*/ - -/* -void chain_controller::apply_message(apply_context& context) -{ try { - /// context.code => the execution namespace - /// message.code / message.type => Event - const auto& m = context.msg; - auto contract_handlers_itr = apply_handlers.find(context.code); - if (contract_handlers_itr != apply_handlers.end()) { - auto message_handler_itr = contract_handlers_itr->second.find({m.code, m.type}); - if (message_handler_itr != contract_handlers_itr->second.end()) { - message_handler_itr->second(context); - return; - } - } - const auto& recipient = _db.get(context.code); - if (recipient.code.size()) { - //idump((context.code)(context.msg.type)); - const uint32_t execution_time = - _skip_flags | received_block - ? _rcvd_block_txn_execution_time - : _skip_flags | created_block - ? _create_block_txn_execution_time - : _txn_execution_time; - const bool is_received_block = _skip_flags & received_block; - wasm_interface::get().apply(context, execution_time, is_received_block); - } - -} FC_CAPTURE_AND_RETHROW((context.msg)) } -*/ - - void chain_controller::require_scope( const scope_name& scope )const { switch( uint64_t(scope) ) { case config::eosio_all_scope: @@ -802,26 +744,17 @@ void chain_controller::update_global_properties(const signed_block& b) { try { }); - /* - auto schedule = _admin->get_next_round(_db); - auto config = _admin->get_blockchain_configuration(_db, schedule); - _db.modify(gpo, [schedule = std::move(schedule), config = std::move(config)] (global_property_object& gpo) { - gpo.active_producers = std::move(schedule); - gpo.configuration = std::move(config); - }); - - auto active_producers_authority = types::Authority(config::ProducersAuthorityThreshold, {}, {}); + auto active_producers_authority = authority(config::producers_authority_threshold, {}, {}); for(auto& name : gpo.active_producers) { - active_producers_authority.accounts.push_back({{name, config::ActiveName}, 1}); + active_producers_authority.accounts.push_back({{name.producer_name, config::active_name}, 1}); } auto& po = _db.get( boost::make_tuple(config::producers_account_name, - config::active_level_name ) ); + config::active_name ) ); _db.modify(po,[active_producers_authority] (permission_object& po) { po.auth = active_producers_authority; }); - */ } } FC_CAPTURE_AND_RETHROW() } diff --git a/libraries/chain/include/eosio/chain/chain_controller.hpp b/libraries/chain/include/eosio/chain/chain_controller.hpp index a9993b92d..7aed27212 100644 --- a/libraries/chain/include/eosio/chain/chain_controller.hpp +++ b/libraries/chain/include/eosio/chain/chain_controller.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include @@ -392,6 +393,7 @@ namespace eosio { namespace chain { void _spinup_db(); void _spinup_fork_db(); + void _start_cycle(); // producer_schedule_type calculate_next_round( const signed_block& next_block ); @@ -402,6 +404,7 @@ namespace eosio { namespace chain { optional _pending_block_session; optional _pending_block; uint32_t _pending_transaction_count = 0; + pending_cycle_state _pending_cycle; bool _currently_applying_block = false; bool _currently_replaying_blocks = false; diff --git a/libraries/chain/include/eosio/chain/generated_transaction_object.hpp b/libraries/chain/include/eosio/chain/generated_transaction_object.hpp index cf2c0f9b3..269620a82 100644 --- a/libraries/chain/include/eosio/chain/generated_transaction_object.hpp +++ b/libraries/chain/include/eosio/chain/generated_transaction_object.hpp @@ -29,7 +29,7 @@ namespace eosio { namespace chain { id_type id; transaction_id_type trx_id; account_name sender; - uint64_t sender_id = 0; /// ID given this transaction by the sender + uint32_t sender_id = 0; /// ID given this transaction by the sender time_point delay_until; /// this generated transaction will not be applied until the specified time time_point expiration; /// this generated transaction will not be applied after this time shared_vector packed_trx; @@ -53,7 +53,7 @@ namespace eosio { namespace chain { >, ordered_unique< tag, composite_key< generated_transaction_object, - BOOST_MULTI_INDEX_MEMBER( generated_transaction_object, time_point, expiration), + BOOST_MULTI_INDEX_MEMBER( generated_transaction_object, time_point, delay_until), BOOST_MULTI_INDEX_MEMBER( generated_transaction_object, generated_transaction_object::id_type, id) > > @@ -61,7 +61,7 @@ namespace eosio { namespace chain { >; typedef chainbase::generic_index generated_transaction_index; -} } +} } // eosio::chain CHAINBASE_SET_INDEX_TYPE(eosio::chain::generated_transaction_object, eosio::chain::generated_transaction_multi_index) diff --git a/libraries/chain/include/eosio/chain/pending_cycle_state.hpp b/libraries/chain/include/eosio/chain/pending_cycle_state.hpp new file mode 100644 index 000000000..292448314 --- /dev/null +++ b/libraries/chain/include/eosio/chain/pending_cycle_state.hpp @@ -0,0 +1,69 @@ +#pragma once +#include + +namespace eosio { namespace chain { + + struct pending_cycle_state { + set read_scopes; + map write_scope_to_shard; + + /** + * @return the shard number this transation goes in or (-1) if it + * cannot go on any shard due to conflict + */ + uint32_t schedule( const transaction& trx ) { + uint32_t current_shard = -1; + + for( const auto& ws : trx.write_scope ) { + if( read_scopes.find(ws) != read_scopes.end() ) + return -1; + + auto itr = write_scope_to_shard.find(ws); + if( itr != write_scope_to_shard.end() ) { + if( current_shard == -1 ) { + current_shard = itr->second; + continue; + } + if( current_shard != itr->second ) + return -1; /// conflict detected + } + } + + for( const auto& rs : trx.read_scope ) + { + auto itr = write_scope_to_shard.find(rs); + if( itr != write_scope_to_shard.end() ) { + if( current_shard == -1 ) { + current_shard = itr->second; + continue; + } + if( current_shard != itr->second ) + return -1; /// schedule conflict + current_shard = itr->second; + } + } + + if( current_shard == -1 ) { + shards.resize( shards.size()+1 ); + current_shard = shards.size() - 1; + + for( auto ws : trx.write_scope ) + { + shards.back().write_scopes.insert( ws ); + write_scope_to_shard[ws] = current_shard; + } + for( auto rs : trx.read_scope ) + read_scopes.insert(rs); + } + return current_shard; + } /// schedule + + struct pending_shard { + set write_scopes; + }; + + vector shards; + }; + + +} } /// eosio::chain diff --git a/libraries/chain/include/eosio/chain/transaction.hpp b/libraries/chain/include/eosio/chain/transaction.hpp index e10fb065d..621b1723e 100644 --- a/libraries/chain/include/eosio/chain/transaction.hpp +++ b/libraries/chain/include/eosio/chain/transaction.hpp @@ -147,7 +147,7 @@ namespace eosio { namespace chain { */ struct deferred_transaction : public transaction { - uint32_t id; /// ID assigned by sender of generated, accessible via WASM api when executing normal or error + uint32_t sender_id; /// ID assigned by sender of generated, accessible via WASM api when executing normal or error account_name sender; /// receives error handler callback time_point_sec execute_after; /// delayed exeuction }; @@ -159,6 +159,6 @@ FC_REFLECT( eosio::chain::action, (scope)(name)(authorization)(data) ) FC_REFLECT( eosio::chain::transaction_header, (expiration)(region)(ref_block_num)(ref_block_prefix) ) FC_REFLECT_DERIVED( eosio::chain::transaction, (eosio::chain::transaction_header), (read_scope)(write_scope)(actions) ) FC_REFLECT_DERIVED( eosio::chain::signed_transaction, (eosio::chain::transaction), (signatures) ) -FC_REFLECT_DERIVED( eosio::chain::deferred_transaction, (eosio::chain::transaction), (id)(sender)(execute_after) ) +FC_REFLECT_DERIVED( eosio::chain::deferred_transaction, (eosio::chain::transaction), (sender_id)(sender)(execute_after) ) -- GitLab