From 130870f9ac5f0b22741cbc63e8dad377b3dcdd8d Mon Sep 17 00:00:00 2001 From: Bart Wyatt Date: Mon, 28 May 2018 17:28:40 -0400 Subject: [PATCH] provide an async implementation for incoming::transaction_async , buffer up the retry-able transactions. EOSIO/eos#3359 --- libraries/chain/controller.cpp | 4 + .../chain/include/eosio/chain/controller.hpp | 2 + plugins/chain_api_plugin/chain_api_plugin.cpp | 2 +- .../include/eosio/chain/plugin_interface.hpp | 5 +- plugins/chain_plugin/chain_plugin.cpp | 39 +++--- plugins/http_plugin/http_plugin.cpp | 4 + plugins/net_plugin/net_plugin.cpp | 7 +- plugins/producer_plugin/producer_plugin.cpp | 123 +++++++++++------- .../txn_test_gen_plugin.cpp | 2 +- 9 files changed, 119 insertions(+), 69 deletions(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index bf8cb76da..ac1e447a4 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -1387,5 +1387,9 @@ void controller::validate_tapos( const transaction& trx )const { try { ("tapos_summary", tapos_block_summary)); } FC_CAPTURE_AND_RETHROW() } +bool controller::is_known_unexpired_transaction( const transaction_id_type& id) const { + return db().find(id); +} + } } /// eosio::chain diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index 0572a7338..d1371c669 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -148,6 +148,8 @@ namespace eosio { namespace chain { void validate_expiration( const transaction& t )const; void validate_tapos( const transaction& t )const; + bool is_known_unexpired_transaction( const transaction_id_type& id) const; + bool set_proposed_producers( vector producers ); bool skip_auth_check()const; diff --git a/plugins/chain_api_plugin/chain_api_plugin.cpp b/plugins/chain_api_plugin/chain_api_plugin.cpp index 8478b76fb..3b9bd4f21 100644 --- a/plugins/chain_api_plugin/chain_api_plugin.cpp +++ b/plugins/chain_api_plugin/chain_api_plugin.cpp @@ -52,7 +52,7 @@ struct async_result_visitor : public fc::visitor { [this, api_handle](string, string body, url_response_callback cb) mutable { \ if (body.empty()) body = "{}"; \ api_handle.call_name(fc::json::from_string(body).as(),\ - [cb, body](fc::static_variant result){\ + [cb, body](const fc::static_variant& result){\ if (result.contains()) {\ try {\ result.get()->dynamic_rethrow_exception();\ diff --git a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp index 4dacdb2f7..5656f2dc8 100644 --- a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp +++ b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp @@ -17,7 +17,7 @@ namespace eosio { namespace chain { namespace plugin_interface { using namespace appbase; template - using next_function = std::function)>; + using next_function = std::function&)>; struct chain_plugin_interface; @@ -48,8 +48,7 @@ namespace eosio { namespace chain { namespace plugin_interface { namespace methods { // synchronously push a block/trx to a single provider using block_sync = method_decl; - using transaction_sync = method_decl; - using transaction_async = method_decl)>; + using transaction_async = method_decl), first_provider_policy>; } } diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index d4d083717..cd8ae844a 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -68,7 +68,7 @@ public: ,accepted_confirmation_channel(app().get_channel()) ,incoming_block_channel(app().get_channel()) ,incoming_block_sync_method(app().get_method()) - ,incoming_transaction_sync_method(app().get_method()) + ,incoming_transaction_async_method(app().get_method()) {} bfs::path blocks_dir; @@ -95,8 +95,8 @@ public: incoming::channels::block::channel_type& incoming_block_channel; // retained references to methods for easy calling - incoming::methods::block_sync::method_type& incoming_block_sync_method; - incoming::methods::transaction_sync::method_type& incoming_transaction_sync_method; + incoming::methods::block_sync::method_type& incoming_block_sync_method; + incoming::methods::transaction_async::method_type& incoming_transaction_async_method; // method provider handles methods::get_block_by_number::method_type::handle get_block_by_number_provider; @@ -353,10 +353,7 @@ void chain_plugin::accept_block(const signed_block_ptr& block ) { } void chain_plugin::accept_transaction(const chain::packed_transaction& trx, next_function next) { - try { - auto trace = my->incoming_transaction_sync_method(std::make_shared(trx), false); - next(trace); - } CATCH_AND_CALL(next); + my->incoming_transaction_async_method(std::make_shared(trx), false, std::forward(next)); } bool chain_plugin::block_is_on_preferred_chain(const block_id_type& block_id) { @@ -672,7 +669,7 @@ fc::variant read_only::get_block(const read_only::get_block_params& params) cons void read_write::push_block(const read_write::push_block_params& params, next_function next) { try { - db.push_block( std::make_shared(params) ); + app().get_method()(std::make_shared(params)); next(read_write::push_block_results{}); } catch ( boost::interprocess::bad_alloc& ) { raise(SIGUSR1); @@ -680,8 +677,7 @@ void read_write::push_block(const read_write::push_block_params& params, next_fu } void read_write::push_transaction(const read_write::push_transaction_params& params, next_function next) { - chain::transaction_id_type id; - fc::variant pretty_output; + try { auto pretty_input = std::make_shared(); auto resolver = make_resolver(this); @@ -689,20 +685,31 @@ void read_write::push_transaction(const read_write::push_transaction_params& par abi_serializer::from_variant(params, *pretty_input, resolver); } EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction") - auto trx_trace_ptr = app().get_method()(pretty_input, true); + app().get_method()(pretty_input, true, [this, next](const fc::static_variant& result) -> void{ + if (result.contains()) { + next(result.get()); + } else { + auto trx_trace_ptr = result.get(); + + try { + fc::variant pretty_output; + pretty_output = db.to_variant_with_abi(*trx_trace_ptr); + //abi_serializer::to_variant(*trx_trace_ptr, pretty_output, resolver); + + chain::transaction_id_type id = trx_trace_ptr->id; + next(read_write::push_transaction_results{id, pretty_output}); + } CATCH_AND_CALL(next); + } + }); - pretty_output = db.to_variant_with_abi( *trx_trace_ptr );; - //abi_serializer::to_variant(*trx_trace_ptr, pretty_output, resolver); - id = trx_trace_ptr->id; - next(read_write::push_transaction_results{ id, pretty_output }); } catch ( boost::interprocess::bad_alloc& ) { raise(SIGUSR1); } CATCH_AND_CALL(next); } static void push_recurse(read_write* rw, int index, const std::shared_ptr& params, const std::shared_ptr& results, const next_function& next) { - auto wrapped_next = [=](fc::static_variant result) { + auto wrapped_next = [=](const fc::static_variant& result) { if (result.contains()) { const auto& e = result.get(); results->emplace_back( read_write::push_transaction_results{ transaction_id_type(), fc::mutable_variant_object( "error", e->to_detail_string() ) } ); diff --git a/plugins/http_plugin/http_plugin.cpp b/plugins/http_plugin/http_plugin.cpp index 6eb378a9e..a094309f0 100644 --- a/plugins/http_plugin/http_plugin.cpp +++ b/plugins/http_plugin/http_plugin.cpp @@ -148,6 +148,7 @@ namespace eosio { elog( "${e}", ("e", err)); error_results results{websocketpp::http::status_code::internal_server_error, "Internal Service Error", fc::exception( FC_LOG_MESSAGE( error, e.what()))}; + con->set_body( fc::json::to_string( results )); } catch (...) { err += "Unknown Exception"; error_results results{websocketpp::http::status_code::internal_server_error, @@ -178,10 +179,13 @@ namespace eosio { auto resource = con->get_uri()->get_resource(); auto handler_itr = url_handlers.find( resource ); if( handler_itr != url_handlers.end()) { + con->defer_http_response(); handler_itr->second( resource, body, [con]( auto code, auto&& body ) { con->set_body( std::move( body )); con->set_status( websocketpp::http::status_code::value( code )); + con->send_http_response(); } ); + } else { wlog( "404 - not found: ${ep}", ("ep", resource)); error_results results{websocketpp::http::status_code::not_found, diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 3c999488b..808ad7152 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -2469,9 +2469,11 @@ namespace eosio { } dispatcher->recv_transaction(c, tid); uint64_t code = 0; - chain_plug->accept_transaction(msg, [=](static_variant result) { + chain_plug->accept_transaction(msg, [=](const static_variant& result) { if (result.contains()) { - elog("accept txn threw ${m}",("m",result.get()->to_detail_string())); + auto e_ptr = result.get(); + if (e_ptr->code() != tx_duplicate::code_value && e_ptr->code() != expired_tx_exception::code_value) + elog("accept txn threw ${m}",("m",result.get()->to_detail_string())); } else { auto trace = result.get(); if (!trace->except) { @@ -2683,6 +2685,7 @@ namespace eosio { transaction_id_type id = results.second->id(); if (results.first) { fc_ilog(logger,"signaled NACK, trx-id = ${id} : ${why}",("id", id)("why", results.first->to_detail_string())); + dispatcher->rejected_transaction(id); } else { fc_ilog(logger,"signaled ACK, trx-id = ${id}",("id", id)); dispatcher->bcast_transaction(*results.second); diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 6612515a9..8abe94ebd 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -85,6 +85,22 @@ enum class pending_block_mode { producing, speculating }; +#define CATCH_AND_CALL(NEXT)\ + catch ( const fc::exception& err ) {\ + NEXT(err.dynamic_copy_exception());\ + } catch ( const std::exception& e ) {\ + fc::exception fce( \ + FC_LOG_MESSAGE( warn, "rethrow ${what}: ", ("what",e.what())),\ + fc::std_exception_code,\ + BOOST_CORE_TYPEID(e).name(),\ + e.what() ) ;\ + NEXT(fce.dynamic_copy_exception());\ + } catch( ... ) {\ + fc::unhandled_exception e(\ + FC_LOG_MESSAGE(warn, "rethrow"),\ + std::current_exception());\ + NEXT(e.dynamic_copy_exception());\ + } class producer_plugin_impl : public std::enable_shared_from_this { public: @@ -127,8 +143,8 @@ class producer_plugin_impl : public std::enable_shared_from_thisid())); - return publish_results_of(trx, _transaction_ack_channel, [&]() -> transaction_trace_ptr { - while (true) { - chain::controller& chain = app().get_plugin().chain(); - auto block_time = chain.pending_block_state()->header.timestamp.to_time_point(); + std::vector>> _pending_incoming_transactions; - auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); - bool deadline_is_subjective = false; - if (_pending_block_mode == pending_block_mode::producing && block_time < deadline) { - deadline_is_subjective = true; - deadline = block_time; - } + void on_incoming_transaction_async(const packed_transaction_ptr& trx, bool persist_until_expired, next_function next) { + chain::controller& chain = app().get_plugin().chain(); + auto block_time = chain.pending_block_state()->header.timestamp.to_time_point(); - auto trace = chain.push_transaction(std::make_shared(*trx), deadline); + auto send_response = [this, &trx, &next](const fc::static_variant& response) { + next(response); + if (response.contains()) { + _transaction_ack_channel.publish(std::pair(response.get(), trx)); + } else { + _transaction_ack_channel.publish(std::pair(nullptr, trx)); + } + }; - if (trace->except) { - if (failure_is_subjective(*trace->except, deadline_is_subjective) ) { - // if we failed because the block was exhausted push the block out and try again if it succeeds - if (_pending_block_mode == pending_block_mode::producing ) { - fc_dlog(_log, "flushing block under production"); - - if (maybe_produce_block()) { - continue; - } - } else if (_pending_block_mode == pending_block_mode::speculating) { - fc_dlog(_log, "dropping block under speculation"); - - chain.abort_block(); - schedule_production_loop(); - continue; - } - - // if we failed to produce a block that was not speculative (for some reason). we are going to - // return the trace with an exception set to the caller. if they don't support any retry mechanics - // this may result in a lost transaction - } else { - trace->except->dynamic_rethrow_exception(); - } - } else if (persist_until_expired) { + auto id = trx->id(); + if( fc::time_point(trx->expiration()) < block_time ) { + send_response(std::static_pointer_cast(std::make_shared(FC_LOG_MESSAGE(error, "expired transaction ${id}", ("id", id)) ))); + return; + } + + if( chain.is_known_unexpired_transaction(id) ) { + send_response(std::static_pointer_cast(std::make_shared(FC_LOG_MESSAGE(error, "duplicate transaction ${id}", ("id", id)) ))); + return; + } + + auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); + bool deadline_is_subjective = false; + if (_pending_block_mode == pending_block_mode::producing && block_time < deadline) { + deadline_is_subjective = true; + deadline = block_time; + } + + try { + auto trace = chain.push_transaction(std::make_shared(*trx), deadline); + if (trace->except) { + if (failure_is_subjective(*trace->except, deadline_is_subjective)) { + _pending_incoming_transactions.emplace_back(trx, persist_until_expired, next); + } else { + auto e_ptr = trace->except->dynamic_copy_exception(); + send_response(e_ptr); + } + } else { + if (persist_until_expired) { // if this trx didnt fail/soft-fail and the persist flag is set, store its ID so that we can // ensure its applied to all future speculative blocks as well. _persistent_transactions.insert(transaction_id_with_expiry{trx->id(), trx->expiration()}); } - - return trace; + send_response(trace); } - }); + + } catch ( boost::interprocess::bad_alloc& ) { + raise(SIGUSR1); + } CATCH_AND_CALL(send_response); } + fc::microseconds get_irreversible_block_age() { auto now = fc::time_point::now(); if (now < _irreversible_block_time) { @@ -450,7 +473,7 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ my->_incoming_transaction_subscription = app().get_channel().subscribe([this](const packed_transaction_ptr& trx){ try { - my->on_incoming_transaction(trx); + my->on_incoming_transaction_async(trx, false, [](const auto&){}); } FC_LOG_AND_DROP(); }); @@ -458,8 +481,8 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ my->on_incoming_block(block); }); - my->_incoming_transaction_sync_provider = app().get_method().register_provider([this](const packed_transaction_ptr& trx, bool persist_until_expired) -> transaction_trace_ptr { - return my->on_incoming_transaction(trx, persist_until_expired); + my->_incoming_transaction_async_provider = app().get_method().register_provider([this](const packed_transaction_ptr& trx, bool persist_until_expired, next_function next) -> void { + return my->on_incoming_transaction_async(trx, persist_until_expired, next ); }); } FC_LOG_AND_RETHROW() } @@ -770,12 +793,20 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { } } FC_LOG_AND_DROP(); } - } if (exhausted) { return start_block_result::exhausted; } else { + // attempt to apply any pending incoming transactions + if (!_pending_incoming_transactions.empty()) { + auto old_pending = std::move(_pending_incoming_transactions); + _pending_incoming_transactions.clear(); + for (auto& e: old_pending) { + on_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); + } + } + return start_block_result::succeeded; } } diff --git a/plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp b/plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp index 8671ab0a2..780e51aac 100644 --- a/plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp +++ b/plugins/txn_test_gen_plugin/txn_test_gen_plugin.cpp @@ -90,7 +90,7 @@ using namespace eosio::chain; struct txn_test_gen_plugin_impl { static void push_next_transaction(const std::shared_ptr>& trxs, size_t index, const std::function& next ) { chain_plugin& cp = app().get_plugin(); - cp.accept_transaction( packed_transaction(trxs->at(index)), [=](fc::static_variant result){ + cp.accept_transaction( packed_transaction(trxs->at(index)), [=](const fc::static_variant& result){ if (result.contains()) { next(result.get()); } else { -- GitLab