From 250600803c6c2e3b8ff449ab614281c93a7d3f96 Mon Sep 17 00:00:00 2001 From: Bart Wyatt Date: Tue, 8 May 2018 14:24:53 -0400 Subject: [PATCH] - added compat signal which mimics the on_pending_transaction signal from master - this signal is fired whenever an incoming signal is accepted or rejected and indicates the result - moved check for stale_production to the block reception code since that now flows through producer_plugin --- .../include/eosio/chain/plugin_interface.hpp | 23 +++++-- plugins/chain_plugin/chain_plugin.cpp | 14 ++-- plugins/net_plugin/CMakeLists.txt | 4 +- plugins/net_plugin/net_plugin.cpp | 22 +++++- plugins/producer_plugin/producer_plugin.cpp | 68 +++++++++++++++---- 5 files changed, 102 insertions(+), 29 deletions(-) diff --git a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp index 088a74856..858b92f6d 100644 --- a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp +++ b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp @@ -26,8 +26,6 @@ namespace eosio { namespace chain { namespace plugin_interface { using applied_transaction = channel_decl; using accepted_confirmation = channel_decl; - using incoming_block = channel_decl; - using incoming_transaction = channel_decl; } namespace methods { @@ -36,10 +34,25 @@ namespace eosio { namespace chain { namespace plugin_interface { using get_head_block_id = method_decl; using get_last_irreversible_block_number = method_decl; + } + + namespace incoming { + namespace channels { + using block = channel_decl; + using transaction = channel_decl; + } + + namespace methods { + // synchronously push a block/trx to a single provider + using block_sync = method_decl; + using transaction_sync = method_decl; + } + } - // synchronously push a block/trx to a single provider - using incoming_block_sync = method_decl; - using incoming_transaction_sync = method_decl; + namespace compat { + namespace channels { + using transaction_ack = channel_decl>; + } } } } } \ No newline at end of file diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index 17aa19ec5..5ea59174f 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -45,9 +45,9 @@ public: ,accepted_transaction_channel(app().get_channel()) ,applied_transaction_channel(app().get_channel()) ,accepted_confirmation_channel(app().get_channel()) - ,incoming_block_channel(app().get_channel()) - ,incoming_block_sync_method(app().get_method()) - ,incoming_transaction_sync_method(app().get_method()) + ,incoming_block_channel(app().get_channel()) + ,incoming_block_sync_method(app().get_method()) + ,incoming_transaction_sync_method(app().get_method()) {} bfs::path block_log_dir; @@ -72,11 +72,11 @@ public: channels::accepted_transaction::channel_type& accepted_transaction_channel; channels::applied_transaction::channel_type& applied_transaction_channel; channels::accepted_confirmation::channel_type& accepted_confirmation_channel; - channels::incoming_block::channel_type& incoming_block_channel; + incoming::channels::block::channel_type& incoming_block_channel; // retained references to methods for easy calling - methods::incoming_block_sync::method_type& incoming_block_sync_method; - methods::incoming_transaction_sync::method_type& incoming_transaction_sync_method; + incoming::methods::block_sync::method_type& incoming_block_sync_method; + incoming::methods::transaction_sync::method_type& incoming_transaction_sync_method; // method provider handles methods::get_block_by_number::method_type::handle get_block_by_number_provider; @@ -454,7 +454,7 @@ read_write::push_transaction_results read_write::push_transaction(const read_wri abi_serializer::from_variant(params, *pretty_input, resolver); } EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction") - auto trx_trace_ptr = app().get_method()(pretty_input); + auto trx_trace_ptr = app().get_method()(pretty_input); fc::variant pretty_output = db.to_variant_with_abi( *trx_trace_ptr );; //abi_serializer::to_variant(*trx_trace_ptr, pretty_output, resolver); diff --git a/plugins/net_plugin/CMakeLists.txt b/plugins/net_plugin/CMakeLists.txt index 5485d8344..5d6515319 100644 --- a/plugins/net_plugin/CMakeLists.txt +++ b/plugins/net_plugin/CMakeLists.txt @@ -1,7 +1,7 @@ -file(GLOB HEADERS "include/eosio/net_plugin/*.hpp") +file(GLOB HEADERS "include/eosio/net_plugin/*.hpp" ) add_library( net_plugin net_plugin.cpp ${HEADERS} ) target_link_libraries( net_plugin chain_plugin producer_plugin appbase fc ) -target_include_directories( net_plugin PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include ) +target_include_directories( net_plugin PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_SOURCE_DIR}/../chain_interface/include ) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 23b45fc22..613adc40c 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -28,6 +29,8 @@ #include #include +using namespace eosio::chain::plugin_interface::compat; + namespace fc { extern std::unordered_map& get_logger_map(); } @@ -189,6 +192,8 @@ namespace eosio { shared_ptr resolver; + channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription; + void connect( connection_ptr c ); void connect( connection_ptr c, tcp::resolver::iterator endpoint_itr ); void start_session( connection_ptr c ); @@ -208,6 +213,8 @@ namespace eosio { void applied_transaction(const transaction_trace_ptr&); void accepted_confirmation(const header_confirmation&); + void transaction_ack(const std::pair&); + bool is_valid( const handshake_message &msg); void handle_message( connection_ptr c, const handshake_message &msg); @@ -2597,7 +2604,7 @@ namespace eosio { void net_plugin_impl::accepted_transaction(const transaction_metadata_ptr& md) { fc_ilog(logger,"signaled, id = ${id}",("id", md->id)); - dispatcher->bcast_transaction(md->packed_trx); +// dispatcher->bcast_transaction(md->packed_trx); } void net_plugin_impl::applied_transaction(const transaction_trace_ptr& txn) { @@ -2608,6 +2615,16 @@ namespace eosio { fc_ilog(logger,"signaled, id = ${id}",("id", head.block_id)); } + void net_plugin_impl::transaction_ack(const std::pair& results) { + transaction_id_type id = results.second->id(); + if (results.first) { + fc_ilog(logger,"signaled NACK, trx-id = ${id} : ${why}",("id", id)("why", results.first->to_detail_string())); + } else { + fc_ilog(logger,"signaled ACK, trx-id = ${id}",("id", id)); + dispatcher->bcast_transaction(*results.second); + } + } + bool net_plugin_impl::authenticate_peer(const handshake_message& msg) const { if(allowed_connections == None) return false; @@ -2887,6 +2904,9 @@ namespace eosio { cc.applied_transaction.connect( boost::bind(&net_plugin_impl::applied_transaction, my.get(), _1)); cc.accepted_confirmation.connect( boost::bind(&net_plugin_impl::accepted_confirmation, my.get(), _1)); } + + my->incoming_transaction_ack_subscription = app().get_channel().subscribe(boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1)); + my->start_monitors(); for( auto seed_node : my->supplied_peers ) { diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 3fe7f712c..23e95a4cb 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -32,6 +32,7 @@ class producer_plugin_impl { public: producer_plugin_impl(boost::asio::io_service& io) :_timer(io) + ,_transaction_ack_channel(app().get_channel()) {} void schedule_production_loop(); @@ -60,11 +61,13 @@ class producer_plugin_impl { producer_plugin* _self = nullptr; - channels::incoming_block::channel_type::handle _incoming_block_subscription; - channels::incoming_transaction::channel_type::handle _incoming_transaction_subscription; + incoming::channels::block::channel_type::handle _incoming_block_subscription; + incoming::channels::transaction::channel_type::handle _incoming_transaction_subscription; - methods::incoming_block_sync::method_type::handle _incoming_block_sync_provider; - methods::incoming_transaction_sync::method_type::handle _incoming_transaction_sync_provider; + compat::channels::transaction_ack::channel_type& _transaction_ack_channel; + + incoming::methods::block_sync::method_type::handle _incoming_block_sync_provider; + incoming::methods::transaction_sync::method_type::handle _incoming_transaction_sync_provider; void on_block( const block_state_ptr& bsp ) { if( bsp->header.timestamp <= _last_signed_block_time ) return; @@ -102,6 +105,40 @@ class producer_plugin_impl { } ) ); } + template + auto publish_results_of(const Type &data, Channel& channel, F f) { + auto publish_success = fc::make_scoped_exit([&, this](){ + channel.publish(std::pair(nullptr, data)); + }); + + try { + return f(); + } catch (const fc::exception& e) { + publish_success.cancel(); + channel.publish(std::pair(e.dynamic_copy_exception(), data)); + throw e; + } catch( const std::exception& e ) { + publish_success.cancel(); + auto fce = fc::exception( + FC_LOG_MESSAGE( info, "Caught std::exception: ${what}", ("what",e.what())), + fc::std_exception_code, + BOOST_CORE_TYPEID(e).name(), + e.what() + ); + channel.publish(std::pair(fce.dynamic_copy_exception(),data)); + throw fce; + } catch( ... ) { + publish_success.cancel(); + auto fce = fc::unhandled_exception( + FC_LOG_MESSAGE( info, "Caught unknown exception"), + std::current_exception() + ); + + channel.publish(std::pair(fce.dynamic_copy_exception(), data)); + throw fce; + } + }; + void on_incoming_block(const signed_block_ptr& block) { chain::controller& chain = app().get_plugin().chain(); // abort the pending block @@ -112,9 +149,13 @@ class producer_plugin_impl { // restart our production loop schedule_production_loop(); }); + // push the new block chain.push_block(block); + if( chain.head_block_state()->header.timestamp.next().to_time_point() >= fc::time_point::now() ) + _production_enabled = true; + ilog("Received block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]", ("p",block->producer)("id",fc::variant(block->id()).as_string().substr(0,16)) ("n",block_header::num_from_id(block->id()))("t",block->timestamp) @@ -123,8 +164,10 @@ class producer_plugin_impl { } transaction_trace_ptr on_incoming_transaction(const packed_transaction_ptr& trx) { - chain::controller& chain = app().get_plugin().chain(); - return chain.sync_push(std::make_shared(*trx), fc::time_point::now() + fc::milliseconds(_max_pending_transaction_time_ms)); + return publish_results_of(trx, _transaction_ack_channel, [&]{ + chain::controller& chain = app().get_plugin().chain(); + return chain.sync_push(std::make_shared(*trx), fc::time_point::now() + fc::milliseconds(_max_pending_transaction_time_ms)); + }); } bool start_block(); @@ -240,23 +283,23 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ my->_max_pending_transaction_time_ms = options.at("max-pending-transaction-time").as(); - my->_incoming_block_subscription = app().get_channel().subscribe([this](const signed_block_ptr& block){ + my->_incoming_block_subscription = app().get_channel().subscribe([this](const signed_block_ptr& block){ try { my->on_incoming_block(block); } FC_LOG_AND_DROP(); }); - my->_incoming_transaction_subscription = app().get_channel().subscribe([this](const packed_transaction_ptr& trx){ + my->_incoming_transaction_subscription = app().get_channel().subscribe([this](const packed_transaction_ptr& trx){ try { my->on_incoming_transaction(trx); } FC_LOG_AND_DROP(); }); - my->_incoming_block_sync_provider = app().get_method().register_provider([this](const signed_block_ptr& block){ + my->_incoming_block_sync_provider = app().get_method().register_provider([this](const signed_block_ptr& block){ my->on_incoming_block(block); }); - my->_incoming_transaction_sync_provider = app().get_method().register_provider([this](const packed_transaction_ptr& trx) -> transaction_trace_ptr { + my->_incoming_transaction_sync_provider = app().get_method().register_provider([this](const packed_transaction_ptr& trx) -> transaction_trace_ptr { return my->on_incoming_transaction(trx); }); @@ -457,10 +500,7 @@ block_production_condition::block_production_condition_enum producer_plugin_impl // If the next block production opportunity is in the present or future, we're synced. if( !_production_enabled ) { - if( hbs->header.timestamp.next().to_time_point() >= now ) - _production_enabled = true; - else - return block_production_condition::not_synced; + return block_production_condition::not_synced; } auto pending_block_timestamp = chain.pending_block_state()->header.timestamp; -- GitLab