提交 130870f9 编写于 作者: B Bart Wyatt

provide an async implementation for incoming::transaction_async , buffer up...

provide an async implementation for incoming::transaction_async , buffer up the retry-able transactions.  EOSIO/eos#3359
上级 0f46f2b2
......@@ -1387,5 +1387,9 @@ void controller::validate_tapos( const transaction& trx )const { try {
("tapos_summary", tapos_block_summary));
} FC_CAPTURE_AND_RETHROW() }
bool controller::is_known_unexpired_transaction( const transaction_id_type& id) const {
return db().find<transaction_object, by_trx_id>(id);
}
} } /// eosio::chain
......@@ -148,6 +148,8 @@ namespace eosio { namespace chain {
void validate_expiration( const transaction& t )const;
void validate_tapos( const transaction& t )const;
bool is_known_unexpired_transaction( const transaction_id_type& id) const;
bool set_proposed_producers( vector<producer_key> producers );
bool skip_auth_check()const;
......
......@@ -52,7 +52,7 @@ struct async_result_visitor : public fc::visitor<std::string> {
[this, api_handle](string, string body, url_response_callback cb) mutable { \
if (body.empty()) body = "{}"; \
api_handle.call_name(fc::json::from_string(body).as<api_namespace::call_name ## _params>(),\
[cb, body](fc::static_variant<fc::exception_ptr, call_result> result){\
[cb, body](const fc::static_variant<fc::exception_ptr, call_result>& result){\
if (result.contains<fc::exception_ptr>()) {\
try {\
result.get<fc::exception_ptr>()->dynamic_rethrow_exception();\
......
......@@ -17,7 +17,7 @@ namespace eosio { namespace chain { namespace plugin_interface {
using namespace appbase;
template<typename T>
using next_function = std::function<void(fc::static_variant<fc::exception_ptr, T>)>;
using next_function = std::function<void(const fc::static_variant<fc::exception_ptr, T>&)>;
struct chain_plugin_interface;
......@@ -48,8 +48,7 @@ namespace eosio { namespace chain { namespace plugin_interface {
namespace methods {
// synchronously push a block/trx to a single provider
using block_sync = method_decl<chain_plugin_interface, void(const signed_block_ptr&), first_provider_policy>;
using transaction_sync = method_decl<chain_plugin_interface, transaction_trace_ptr(const packed_transaction_ptr&, bool), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, next_function<transaction_trace_ptr>)>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
......
......@@ -68,7 +68,7 @@ public:
,accepted_confirmation_channel(app().get_channel<channels::accepted_confirmation>())
,incoming_block_channel(app().get_channel<incoming::channels::block>())
,incoming_block_sync_method(app().get_method<incoming::methods::block_sync>())
,incoming_transaction_sync_method(app().get_method<incoming::methods::transaction_sync>())
,incoming_transaction_async_method(app().get_method<incoming::methods::transaction_async>())
{}
bfs::path blocks_dir;
......@@ -96,7 +96,7 @@ public:
// retained references to methods for easy calling
incoming::methods::block_sync::method_type& incoming_block_sync_method;
incoming::methods::transaction_sync::method_type& incoming_transaction_sync_method;
incoming::methods::transaction_async::method_type& incoming_transaction_async_method;
// method provider handles
methods::get_block_by_number::method_type::handle get_block_by_number_provider;
......@@ -353,10 +353,7 @@ void chain_plugin::accept_block(const signed_block_ptr& block ) {
}
void chain_plugin::accept_transaction(const chain::packed_transaction& trx, next_function<chain::transaction_trace_ptr> next) {
try {
auto trace = my->incoming_transaction_sync_method(std::make_shared<packed_transaction>(trx), false);
next(trace);
} CATCH_AND_CALL(next);
my->incoming_transaction_async_method(std::make_shared<packed_transaction>(trx), false, std::forward<decltype(next)>(next));
}
bool chain_plugin::block_is_on_preferred_chain(const block_id_type& block_id) {
......@@ -672,7 +669,7 @@ fc::variant read_only::get_block(const read_only::get_block_params& params) cons
void read_write::push_block(const read_write::push_block_params& params, next_function<read_write::push_block_results> next) {
try {
db.push_block( std::make_shared<signed_block>(params) );
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>(params));
next(read_write::push_block_results{});
} catch ( boost::interprocess::bad_alloc& ) {
raise(SIGUSR1);
......@@ -680,8 +677,7 @@ void read_write::push_block(const read_write::push_block_params& params, next_fu
}
void read_write::push_transaction(const read_write::push_transaction_params& params, next_function<read_write::push_transaction_results> next) {
chain::transaction_id_type id;
fc::variant pretty_output;
try {
auto pretty_input = std::make_shared<packed_transaction>();
auto resolver = make_resolver(this);
......@@ -689,20 +685,31 @@ void read_write::push_transaction(const read_write::push_transaction_params& par
abi_serializer::from_variant(params, *pretty_input, resolver);
} EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction")
auto trx_trace_ptr = app().get_method<incoming::methods::transaction_sync>()(pretty_input, true);
app().get_method<incoming::methods::transaction_async>()(pretty_input, true, [this, next](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& result) -> void{
if (result.contains<fc::exception_ptr>()) {
next(result.get<fc::exception_ptr>());
} else {
auto trx_trace_ptr = result.get<transaction_trace_ptr>();
pretty_output = db.to_variant_with_abi( *trx_trace_ptr );;
try {
fc::variant pretty_output;
pretty_output = db.to_variant_with_abi(*trx_trace_ptr);
//abi_serializer::to_variant(*trx_trace_ptr, pretty_output, resolver);
id = trx_trace_ptr->id;
next(read_write::push_transaction_results{ id, pretty_output });
chain::transaction_id_type id = trx_trace_ptr->id;
next(read_write::push_transaction_results{id, pretty_output});
} CATCH_AND_CALL(next);
}
});
} catch ( boost::interprocess::bad_alloc& ) {
raise(SIGUSR1);
} CATCH_AND_CALL(next);
}
static void push_recurse(read_write* rw, int index, const std::shared_ptr<read_write::push_transactions_params>& params, const std::shared_ptr<read_write::push_transactions_results>& results, const next_function<read_write::push_transactions_results>& next) {
auto wrapped_next = [=](fc::static_variant<fc::exception_ptr, read_write::push_transaction_results> result) {
auto wrapped_next = [=](const fc::static_variant<fc::exception_ptr, read_write::push_transaction_results>& result) {
if (result.contains<fc::exception_ptr>()) {
const auto& e = result.get<fc::exception_ptr>();
results->emplace_back( read_write::push_transaction_results{ transaction_id_type(), fc::mutable_variant_object( "error", e->to_detail_string() ) } );
......
......@@ -148,6 +148,7 @@ namespace eosio {
elog( "${e}", ("e", err));
error_results results{websocketpp::http::status_code::internal_server_error,
"Internal Service Error", fc::exception( FC_LOG_MESSAGE( error, e.what()))};
con->set_body( fc::json::to_string( results ));
} catch (...) {
err += "Unknown Exception";
error_results results{websocketpp::http::status_code::internal_server_error,
......@@ -178,10 +179,13 @@ namespace eosio {
auto resource = con->get_uri()->get_resource();
auto handler_itr = url_handlers.find( resource );
if( handler_itr != url_handlers.end()) {
con->defer_http_response();
handler_itr->second( resource, body, [con]( auto code, auto&& body ) {
con->set_body( std::move( body ));
con->set_status( websocketpp::http::status_code::value( code ));
con->send_http_response();
} );
} else {
wlog( "404 - not found: ${ep}", ("ep", resource));
error_results results{websocketpp::http::status_code::not_found,
......
......@@ -2469,8 +2469,10 @@ namespace eosio {
}
dispatcher->recv_transaction(c, tid);
uint64_t code = 0;
chain_plug->accept_transaction(msg, [=](static_variant<fc::exception_ptr, transaction_trace_ptr> result) {
chain_plug->accept_transaction(msg, [=](const static_variant<fc::exception_ptr, transaction_trace_ptr>& result) {
if (result.contains<fc::exception_ptr>()) {
auto e_ptr = result.get<fc::exception_ptr>();
if (e_ptr->code() != tx_duplicate::code_value && e_ptr->code() != expired_tx_exception::code_value)
elog("accept txn threw ${m}",("m",result.get<fc::exception_ptr>()->to_detail_string()));
} else {
auto trace = result.get<transaction_trace_ptr>();
......@@ -2683,6 +2685,7 @@ namespace eosio {
transaction_id_type id = results.second->id();
if (results.first) {
fc_ilog(logger,"signaled NACK, trx-id = ${id} : ${why}",("id", id)("why", results.first->to_detail_string()));
dispatcher->rejected_transaction(id);
} else {
fc_ilog(logger,"signaled ACK, trx-id = ${id}",("id", id));
dispatcher->bcast_transaction(*results.second);
......
......@@ -85,6 +85,22 @@ enum class pending_block_mode {
producing,
speculating
};
#define CATCH_AND_CALL(NEXT)\
catch ( const fc::exception& err ) {\
NEXT(err.dynamic_copy_exception());\
} catch ( const std::exception& e ) {\
fc::exception fce( \
FC_LOG_MESSAGE( warn, "rethrow ${what}: ", ("what",e.what())),\
fc::std_exception_code,\
BOOST_CORE_TYPEID(e).name(),\
e.what() ) ;\
NEXT(fce.dynamic_copy_exception());\
} catch( ... ) {\
fc::unhandled_exception e(\
FC_LOG_MESSAGE(warn, "rethrow"),\
std::current_exception());\
NEXT(e.dynamic_copy_exception());\
}
class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl> {
public:
......@@ -128,7 +144,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
compat::channels::transaction_ack::channel_type& _transaction_ack_channel;
incoming::methods::block_sync::method_type::handle _incoming_block_sync_provider;
incoming::methods::transaction_sync::method_type::handle _incoming_transaction_sync_provider;
incoming::methods::transaction_async::method_type::handle _incoming_transaction_async_provider;
transaction_id_with_expiry_index _blacklisted_transactions;
......@@ -259,13 +275,32 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
transaction_trace_ptr on_incoming_transaction(const packed_transaction_ptr& trx, bool persist_until_expired = false) {
fc_dlog(_log, "received incoming transaction ${id}", ("id", trx->id()));
return publish_results_of(trx, _transaction_ack_channel, [&]() -> transaction_trace_ptr {
while (true) {
std::vector<std::tuple<packed_transaction_ptr, bool, next_function<transaction_trace_ptr>>> _pending_incoming_transactions;
void on_incoming_transaction_async(const packed_transaction_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
auto block_time = chain.pending_block_state()->header.timestamp.to_time_point();
auto send_response = [this, &trx, &next](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& response) {
next(response);
if (response.contains<fc::exception_ptr>()) {
_transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(response.get<fc::exception_ptr>(), trx));
} else {
_transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(nullptr, trx));
}
};
auto id = trx->id();
if( fc::time_point(trx->expiration()) < block_time ) {
send_response(std::static_pointer_cast<fc::exception>(std::make_shared<expired_tx_exception>(FC_LOG_MESSAGE(error, "expired transaction ${id}", ("id", id)) )));
return;
}
if( chain.is_known_unexpired_transaction(id) ) {
send_response(std::static_pointer_cast<fc::exception>(std::make_shared<tx_duplicate>(FC_LOG_MESSAGE(error, "duplicate transaction ${id}", ("id", id)) )));
return;
}
auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms);
bool deadline_is_subjective = false;
if (_pending_block_mode == pending_block_mode::producing && block_time < deadline) {
......@@ -273,42 +308,30 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
deadline = block_time;
}
try {
auto trace = chain.push_transaction(std::make_shared<transaction_metadata>(*trx), deadline);
if (trace->except) {
if (failure_is_subjective(*trace->except, deadline_is_subjective) ) {
// if we failed because the block was exhausted push the block out and try again if it succeeds
if (_pending_block_mode == pending_block_mode::producing ) {
fc_dlog(_log, "flushing block under production");
if (maybe_produce_block()) {
continue;
}
} else if (_pending_block_mode == pending_block_mode::speculating) {
fc_dlog(_log, "dropping block under speculation");
chain.abort_block();
schedule_production_loop();
continue;
}
// if we failed to produce a block that was not speculative (for some reason). we are going to
// return the trace with an exception set to the caller. if they don't support any retry mechanics
// this may result in a lost transaction
if (failure_is_subjective(*trace->except, deadline_is_subjective)) {
_pending_incoming_transactions.emplace_back(trx, persist_until_expired, next);
} else {
trace->except->dynamic_rethrow_exception();
auto e_ptr = trace->except->dynamic_copy_exception();
send_response(e_ptr);
}
} else if (persist_until_expired) {
} else {
if (persist_until_expired) {
// if this trx didnt fail/soft-fail and the persist flag is set, store its ID so that we can
// ensure its applied to all future speculative blocks as well.
_persistent_transactions.insert(transaction_id_with_expiry{trx->id(), trx->expiration()});
}
return trace;
send_response(trace);
}
});
} catch ( boost::interprocess::bad_alloc& ) {
raise(SIGUSR1);
} CATCH_AND_CALL(send_response);
}
fc::microseconds get_irreversible_block_age() {
auto now = fc::time_point::now();
if (now < _irreversible_block_time) {
......@@ -450,7 +473,7 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->_incoming_transaction_subscription = app().get_channel<incoming::channels::transaction>().subscribe([this](const packed_transaction_ptr& trx){
try {
my->on_incoming_transaction(trx);
my->on_incoming_transaction_async(trx, false, [](const auto&){});
} FC_LOG_AND_DROP();
});
......@@ -458,8 +481,8 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->on_incoming_block(block);
});
my->_incoming_transaction_sync_provider = app().get_method<incoming::methods::transaction_sync>().register_provider([this](const packed_transaction_ptr& trx, bool persist_until_expired) -> transaction_trace_ptr {
return my->on_incoming_transaction(trx, persist_until_expired);
my->_incoming_transaction_async_provider = app().get_method<incoming::methods::transaction_async>().register_provider([this](const packed_transaction_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) -> void {
return my->on_incoming_transaction_async(trx, persist_until_expired, next );
});
} FC_LOG_AND_RETHROW() }
......@@ -770,12 +793,20 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
}
} FC_LOG_AND_DROP();
}
}
if (exhausted) {
return start_block_result::exhausted;
} else {
// attempt to apply any pending incoming transactions
if (!_pending_incoming_transactions.empty()) {
auto old_pending = std::move(_pending_incoming_transactions);
_pending_incoming_transactions.clear();
for (auto& e: old_pending) {
on_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e));
}
}
return start_block_result::succeeded;
}
}
......
......@@ -90,7 +90,7 @@ using namespace eosio::chain;
struct txn_test_gen_plugin_impl {
static void push_next_transaction(const std::shared_ptr<std::vector<signed_transaction>>& trxs, size_t index, const std::function<void(const fc::exception_ptr&)>& next ) {
chain_plugin& cp = app().get_plugin<chain_plugin>();
cp.accept_transaction( packed_transaction(trxs->at(index)), [=](fc::static_variant<fc::exception_ptr, transaction_trace_ptr> result){
cp.accept_transaction( packed_transaction(trxs->at(index)), [=](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& result){
if (result.contains<fc::exception_ptr>()) {
next(result.get<fc::exception_ptr>());
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册