提交 25060080 编写于 作者: B Bart Wyatt

- added compat signal which mimics the on_pending_transaction signal from master

  - this signal is fired whenever an incoming signal is accepted or rejected and indicates the result
- moved check for stale_production to the block reception code since that now flows through producer_plugin
上级 d67370a9
......@@ -26,8 +26,6 @@ namespace eosio { namespace chain { namespace plugin_interface {
using applied_transaction = channel_decl<struct applied_transaction_tag, transaction_trace_ptr>;
using accepted_confirmation = channel_decl<struct accepted_confirmation_tag, header_confirmation>;
using incoming_block = channel_decl<struct incoming_blocks_tag, signed_block_ptr>;
using incoming_transaction = channel_decl<struct incoming_transactions_tag, packed_transaction_ptr>;
}
namespace methods {
......@@ -36,10 +34,25 @@ namespace eosio { namespace chain { namespace plugin_interface {
using get_head_block_id = method_decl<chain_plugin_interface, block_id_type ()>;
using get_last_irreversible_block_number = method_decl<chain_plugin_interface, uint32_t ()>;
}
namespace incoming {
namespace channels {
using block = channel_decl<struct block_tag, signed_block_ptr>;
using transaction = channel_decl<struct transaction_tag, packed_transaction_ptr>;
}
namespace methods {
// synchronously push a block/trx to a single provider
using block_sync = method_decl<chain_plugin_interface, void(const signed_block_ptr&), first_provider_policy>;
using transaction_sync = method_decl<chain_plugin_interface, transaction_trace_ptr(const packed_transaction_ptr&), first_provider_policy>;
}
}
// synchronously push a block/trx to a single provider
using incoming_block_sync = method_decl<chain_plugin_interface, void(const signed_block_ptr&), first_provider_policy>;
using incoming_transaction_sync = method_decl<chain_plugin_interface, transaction_trace_ptr(const packed_transaction_ptr&), first_provider_policy>;
namespace compat {
namespace channels {
using transaction_ack = channel_decl<struct accepted_transaction_tag, std::pair<fc::exception_ptr, packed_transaction_ptr>>;
}
}
} } }
\ No newline at end of file
......@@ -45,9 +45,9 @@ public:
,accepted_transaction_channel(app().get_channel<channels::accepted_transaction>())
,applied_transaction_channel(app().get_channel<channels::applied_transaction>())
,accepted_confirmation_channel(app().get_channel<channels::accepted_confirmation>())
,incoming_block_channel(app().get_channel<channels::incoming_block>())
,incoming_block_sync_method(app().get_method<methods::incoming_block_sync>())
,incoming_transaction_sync_method(app().get_method<methods::incoming_transaction_sync>())
,incoming_block_channel(app().get_channel<incoming::channels::block>())
,incoming_block_sync_method(app().get_method<incoming::methods::block_sync>())
,incoming_transaction_sync_method(app().get_method<incoming::methods::transaction_sync>())
{}
bfs::path block_log_dir;
......@@ -72,11 +72,11 @@ public:
channels::accepted_transaction::channel_type& accepted_transaction_channel;
channels::applied_transaction::channel_type& applied_transaction_channel;
channels::accepted_confirmation::channel_type& accepted_confirmation_channel;
channels::incoming_block::channel_type& incoming_block_channel;
incoming::channels::block::channel_type& incoming_block_channel;
// retained references to methods for easy calling
methods::incoming_block_sync::method_type& incoming_block_sync_method;
methods::incoming_transaction_sync::method_type& incoming_transaction_sync_method;
incoming::methods::block_sync::method_type& incoming_block_sync_method;
incoming::methods::transaction_sync::method_type& incoming_transaction_sync_method;
// method provider handles
methods::get_block_by_number::method_type::handle get_block_by_number_provider;
......@@ -454,7 +454,7 @@ read_write::push_transaction_results read_write::push_transaction(const read_wri
abi_serializer::from_variant(params, *pretty_input, resolver);
} EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception, "Invalid packed transaction")
auto trx_trace_ptr = app().get_method<methods::incoming_transaction_sync>()(pretty_input);
auto trx_trace_ptr = app().get_method<incoming::methods::transaction_sync>()(pretty_input);
fc::variant pretty_output = db.to_variant_with_abi( *trx_trace_ptr );;
//abi_serializer::to_variant(*trx_trace_ptr, pretty_output, resolver);
......
file(GLOB HEADERS "include/eosio/net_plugin/*.hpp")
file(GLOB HEADERS "include/eosio/net_plugin/*.hpp" )
add_library( net_plugin
net_plugin.cpp
${HEADERS} )
target_link_libraries( net_plugin chain_plugin producer_plugin appbase fc )
target_include_directories( net_plugin PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include )
target_include_directories( net_plugin PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_SOURCE_DIR}/../chain_interface/include )
......@@ -9,6 +9,7 @@
#include <eosio/chain/controller.hpp>
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/block.hpp>
#include <eosio/chain/plugin_interface.hpp>
#include <eosio/producer_plugin/producer_plugin.hpp>
#include <eosio/utilities/key_conversion.hpp>
#include <eosio/chain/contract_types.hpp>
......@@ -28,6 +29,8 @@
#include <boost/asio/steady_timer.hpp>
#include <boost/intrusive/set.hpp>
using namespace eosio::chain::plugin_interface::compat;
namespace fc {
extern std::unordered_map<std::string,logger>& get_logger_map();
}
......@@ -189,6 +192,8 @@ namespace eosio {
shared_ptr<tcp::resolver> resolver;
channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;
void connect( connection_ptr c );
void connect( connection_ptr c, tcp::resolver::iterator endpoint_itr );
void start_session( connection_ptr c );
......@@ -208,6 +213,8 @@ namespace eosio {
void applied_transaction(const transaction_trace_ptr&);
void accepted_confirmation(const header_confirmation&);
void transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>&);
bool is_valid( const handshake_message &msg);
void handle_message( connection_ptr c, const handshake_message &msg);
......@@ -2597,7 +2604,7 @@ namespace eosio {
void net_plugin_impl::accepted_transaction(const transaction_metadata_ptr& md) {
fc_ilog(logger,"signaled, id = ${id}",("id", md->id));
dispatcher->bcast_transaction(md->packed_trx);
// dispatcher->bcast_transaction(md->packed_trx);
}
void net_plugin_impl::applied_transaction(const transaction_trace_ptr& txn) {
......@@ -2608,6 +2615,16 @@ namespace eosio {
fc_ilog(logger,"signaled, id = ${id}",("id", head.block_id));
}
void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>& results) {
transaction_id_type id = results.second->id();
if (results.first) {
fc_ilog(logger,"signaled NACK, trx-id = ${id} : ${why}",("id", id)("why", results.first->to_detail_string()));
} else {
fc_ilog(logger,"signaled ACK, trx-id = ${id}",("id", id));
dispatcher->bcast_transaction(*results.second);
}
}
bool net_plugin_impl::authenticate_peer(const handshake_message& msg) const {
if(allowed_connections == None)
return false;
......@@ -2887,6 +2904,9 @@ namespace eosio {
cc.applied_transaction.connect( boost::bind(&net_plugin_impl::applied_transaction, my.get(), _1));
cc.accepted_confirmation.connect( boost::bind(&net_plugin_impl::accepted_confirmation, my.get(), _1));
}
my->incoming_transaction_ack_subscription = app().get_channel<channels::transaction_ack>().subscribe(boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1));
my->start_monitors();
for( auto seed_node : my->supplied_peers ) {
......
......@@ -32,6 +32,7 @@ class producer_plugin_impl {
public:
producer_plugin_impl(boost::asio::io_service& io)
:_timer(io)
,_transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>())
{}
void schedule_production_loop();
......@@ -60,11 +61,13 @@ class producer_plugin_impl {
producer_plugin* _self = nullptr;
channels::incoming_block::channel_type::handle _incoming_block_subscription;
channels::incoming_transaction::channel_type::handle _incoming_transaction_subscription;
incoming::channels::block::channel_type::handle _incoming_block_subscription;
incoming::channels::transaction::channel_type::handle _incoming_transaction_subscription;
methods::incoming_block_sync::method_type::handle _incoming_block_sync_provider;
methods::incoming_transaction_sync::method_type::handle _incoming_transaction_sync_provider;
compat::channels::transaction_ack::channel_type& _transaction_ack_channel;
incoming::methods::block_sync::method_type::handle _incoming_block_sync_provider;
incoming::methods::transaction_sync::method_type::handle _incoming_transaction_sync_provider;
void on_block( const block_state_ptr& bsp ) {
if( bsp->header.timestamp <= _last_signed_block_time ) return;
......@@ -102,6 +105,40 @@ class producer_plugin_impl {
} ) );
}
template<typename Type, typename Channel, typename F>
auto publish_results_of(const Type &data, Channel& channel, F f) {
auto publish_success = fc::make_scoped_exit([&, this](){
channel.publish(std::pair<fc::exception_ptr, Type>(nullptr, data));
});
try {
return f();
} catch (const fc::exception& e) {
publish_success.cancel();
channel.publish(std::pair<fc::exception_ptr, Type>(e.dynamic_copy_exception(), data));
throw e;
} catch( const std::exception& e ) {
publish_success.cancel();
auto fce = fc::exception(
FC_LOG_MESSAGE( info, "Caught std::exception: ${what}", ("what",e.what())),
fc::std_exception_code,
BOOST_CORE_TYPEID(e).name(),
e.what()
);
channel.publish(std::pair<fc::exception_ptr, Type>(fce.dynamic_copy_exception(),data));
throw fce;
} catch( ... ) {
publish_success.cancel();
auto fce = fc::unhandled_exception(
FC_LOG_MESSAGE( info, "Caught unknown exception"),
std::current_exception()
);
channel.publish(std::pair<fc::exception_ptr, Type>(fce.dynamic_copy_exception(), data));
throw fce;
}
};
void on_incoming_block(const signed_block_ptr& block) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
// abort the pending block
......@@ -112,9 +149,13 @@ class producer_plugin_impl {
// restart our production loop
schedule_production_loop();
});
// push the new block
chain.push_block(block);
if( chain.head_block_state()->header.timestamp.next().to_time_point() >= fc::time_point::now() )
_production_enabled = true;
ilog("Received block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]",
("p",block->producer)("id",fc::variant(block->id()).as_string().substr(0,16))
("n",block_header::num_from_id(block->id()))("t",block->timestamp)
......@@ -123,8 +164,10 @@ class producer_plugin_impl {
}
transaction_trace_ptr on_incoming_transaction(const packed_transaction_ptr& trx) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
return chain.sync_push(std::make_shared<transaction_metadata>(*trx), fc::time_point::now() + fc::milliseconds(_max_pending_transaction_time_ms));
return publish_results_of(trx, _transaction_ack_channel, [&]{
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
return chain.sync_push(std::make_shared<transaction_metadata>(*trx), fc::time_point::now() + fc::milliseconds(_max_pending_transaction_time_ms));
});
}
bool start_block();
......@@ -240,23 +283,23 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->_max_pending_transaction_time_ms = options.at("max-pending-transaction-time").as<int32_t>();
my->_incoming_block_subscription = app().get_channel<channels::incoming_block>().subscribe([this](const signed_block_ptr& block){
my->_incoming_block_subscription = app().get_channel<incoming::channels::block>().subscribe([this](const signed_block_ptr& block){
try {
my->on_incoming_block(block);
} FC_LOG_AND_DROP();
});
my->_incoming_transaction_subscription = app().get_channel<channels::incoming_transaction>().subscribe([this](const packed_transaction_ptr& trx){
my->_incoming_transaction_subscription = app().get_channel<incoming::channels::transaction>().subscribe([this](const packed_transaction_ptr& trx){
try {
my->on_incoming_transaction(trx);
} FC_LOG_AND_DROP();
});
my->_incoming_block_sync_provider = app().get_method<methods::incoming_block_sync>().register_provider([this](const signed_block_ptr& block){
my->_incoming_block_sync_provider = app().get_method<incoming::methods::block_sync>().register_provider([this](const signed_block_ptr& block){
my->on_incoming_block(block);
});
my->_incoming_transaction_sync_provider = app().get_method<methods::incoming_transaction_sync>().register_provider([this](const packed_transaction_ptr& trx) -> transaction_trace_ptr {
my->_incoming_transaction_sync_provider = app().get_method<incoming::methods::transaction_sync>().register_provider([this](const packed_transaction_ptr& trx) -> transaction_trace_ptr {
return my->on_incoming_transaction(trx);
});
......@@ -457,10 +500,7 @@ block_production_condition::block_production_condition_enum producer_plugin_impl
// If the next block production opportunity is in the present or future, we're synced.
if( !_production_enabled )
{
if( hbs->header.timestamp.next().to_time_point() >= now )
_production_enabled = true;
else
return block_production_condition::not_synced;
return block_production_condition::not_synced;
}
auto pending_block_timestamp = chain.pending_block_state()->header.timestamp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册