未验证 提交 78615689 编写于 作者: K Kevin Heifner 提交者: GitHub

Merge pull request #2727 from wanderingbort/feature/route-all-pushes-through-producer

route all pushes through producer
......@@ -348,6 +348,7 @@ struct controller_impl {
if( add_to_fork_db ) {
pending->_pending_block_state->validated = true;
auto new_bsp = fork_db.add( pending->_pending_block_state );
emit( self.accepted_block_header, pending->_pending_block_state );
head = fork_db.head();
FC_ASSERT( new_bsp == head, "committed block did not become the new head in fork database" );
}
......
......@@ -57,7 +57,7 @@ namespace eosio { namespace chain {
vector<transaction_receipt> transactions; /// new or generated transactions
};
typedef std::shared_ptr<signed_block> signed_block_ptr;
using signed_block_ptr = std::shared_ptr<signed_block>;
struct producer_confirmation {
block_id_type block_id;
......
......@@ -27,7 +27,7 @@ namespace eosio { namespace chain {
vector<transaction_metadata_ptr> trxs;
};
typedef std::shared_ptr<block_state> block_state_ptr;
using block_state_ptr = std::shared_ptr<block_state>;
} } /// namespace eosio::chain
......
......@@ -30,6 +30,17 @@ namespace eosio { namespace chain {
static block_timestamp maximum() { return block_timestamp( 0xffff ); }
static block_timestamp min() { return block_timestamp(0); }
block_timestamp next() const {
FC_ASSERT( std::numeric_limits<uint32_t>::max() - slot >= 1, "block timestamp overflow" );
auto result = block_timestamp(*this);
result.slot += 1;
return result;
}
fc::time_point to_time_point() const {
return (fc::time_point)(*this);
}
operator fc::time_point() const {
int64_t msec = slot * (int64_t)IntervalMs;
msec += EpochMs;
......
......@@ -37,7 +37,6 @@ namespace eosio { namespace chain {
struct runtime_limits {
fc::microseconds max_push_block_us = fc::microseconds(100000);
fc::microseconds max_push_transaction_us = fc::microseconds(1000'000);
fc::microseconds max_deferred_transactions_us = fc::microseconds(100000);
};
path block_log_dir = chain::config::default_block_log_dir;
......@@ -196,7 +195,7 @@ namespace eosio { namespace chain {
} } /// eosio::chain
FC_REFLECT( eosio::chain::controller::config::runtime_limits, (max_push_block_us)(max_push_transaction_us)(max_deferred_transactions_us) )
FC_REFLECT( eosio::chain::controller::config::runtime_limits, (max_push_block_us)(max_push_transaction_us) )
FC_REFLECT( eosio::chain::controller::config,
(block_log_dir)
(shared_memory_dir)(shared_memory_size)(read_only)
......
......@@ -31,7 +31,7 @@ namespace eosio { namespace chain {
};
struct transaction_trace;
typedef std::shared_ptr<transaction_trace> transaction_trace_ptr;
using transaction_trace_ptr = std::shared_ptr<transaction_trace>;
struct transaction_trace {
transaction_id_type id;
......@@ -55,7 +55,7 @@ namespace eosio { namespace chain {
uint64_t cpu_usage;
vector<transaction_trace_ptr> trx_traces;
};
typedef std::shared_ptr<block_trace> block_trace_ptr;
using block_trace_ptr = std::shared_ptr<block_trace>;
} } /// namespace eosio::chain
......
......@@ -137,6 +137,7 @@ namespace eosio { namespace chain {
void set_transaction(const transaction& t, const vector<bytes>& cfd, compression_type _compression = none);
};
using packed_transaction_ptr = std::shared_ptr<packed_transaction>;
/**
* When a transaction is generated it can be scheduled to occur
......
......@@ -48,6 +48,6 @@ class transaction_metadata {
uint32_t total_actions()const { return trx.context_free_actions.size() + trx.actions.size(); }
};
typedef std::shared_ptr<transaction_metadata> transaction_metadata_ptr;
using transaction_metadata_ptr = std::shared_ptr<transaction_metadata>;
} } // eosio::chain
......@@ -444,6 +444,23 @@ namespace fc
wdump( __VA_ARGS__ ); \
}
#define FC_LOG_AND_DROP( ... ) \
catch( fc::exception& er ) { \
wlog( "${details}", ("details",er.to_detail_string()) ); \
} catch( const std::exception& e ) { \
fc::exception fce( \
FC_LOG_MESSAGE( warn, "rethrow ${what}: ",FC_FORMAT_ARG_PARAMS( __VA_ARGS__ )("what",e.what()) ), \
fc::std_exception_code,\
BOOST_CORE_TYPEID(e).name(), \
e.what() ) ; \
wlog( "${details}", ("details",fce.to_detail_string()) ); \
} catch( ... ) { \
fc::unhandled_exception e( \
FC_LOG_MESSAGE( warn, "rethrow", FC_FORMAT_ARG_PARAMS( __VA_ARGS__) ), \
std::current_exception() ); \
wlog( "${details}", ("details",e.to_detail_string()) ); \
}
/**
* @def FC_RETHROW_EXCEPTIONS(LOG_LEVEL,FORMAT,...)
......
......@@ -25,11 +25,14 @@ namespace eosio { namespace chain { namespace plugin_interface {
using accepted_transaction = channel_decl<struct accepted_transaction_tag, transaction_metadata_ptr>;
using applied_transaction = channel_decl<struct applied_transaction_tag, transaction_trace_ptr>;
using accepted_confirmation = channel_decl<struct accepted_confirmation_tag, header_confirmation>;
using incoming_block = channel_decl<struct incoming_blocks_tag, signed_block_ptr>;
using incoming_transaction = channel_decl<struct incoming_transactions_tag, packed_transaction_ptr>;
}
namespace methods {
using get_block_by_number = method_decl<chain_plugin_interface, const signed_block_ptr&(uint32_t block_num)>;
using get_block_by_id = method_decl<chain_plugin_interface, const signed_block_ptr&(const block_id_type& block_id)>;
using get_block_by_number = method_decl<chain_plugin_interface, signed_block_ptr(uint32_t block_num)>;
using get_block_by_id = method_decl<chain_plugin_interface, signed_block_ptr(const block_id_type& block_id)>;
using get_head_block_id = method_decl<chain_plugin_interface, block_id_type ()>;
using get_last_irreversible_block_number = method_decl<chain_plugin_interface, uint32_t ()>;
......
......@@ -45,6 +45,7 @@ public:
,accepted_transaction_channel(app().get_channel<channels::accepted_transaction>())
,applied_transaction_channel(app().get_channel<channels::applied_transaction>())
,accepted_confirmation_channel(app().get_channel<channels::accepted_confirmation>())
,incoming_block_channel(app().get_channel<channels::incoming_block>())
{}
bfs::path block_log_dir;
......@@ -61,7 +62,6 @@ public:
chain_id_type chain_id;
int32_t max_reversible_block_time_ms;
int32_t max_pending_transaction_time_ms;
int32_t max_deferred_transaction_time_ms;
//txn_msg_rate_limits rate_limits;
fc::optional<vm_type> wasm_runtime;
......@@ -72,6 +72,7 @@ public:
channels::accepted_transaction::channel_type& accepted_transaction_channel;
channels::applied_transaction::channel_type& applied_transaction_channel;
channels::accepted_confirmation::channel_type& accepted_confirmation_channel;
channels::incoming_block::channel_type& incoming_block_channel;
// method provider handles
methods::get_block_by_number::method_type::handle get_block_by_number_provider;
......@@ -79,6 +80,8 @@ public:
methods::get_head_block_id::method_type::handle get_head_block_id_provider;
methods::get_last_irreversible_block_number::method_type::handle get_last_irreversible_block_number_provider;
// things we subscribe to
channels::incoming_transaction::channel_type::handle incoming_transaction_subscription;
};
chain_plugin::chain_plugin()
......@@ -99,8 +102,6 @@ void chain_plugin::set_program_options(options_description& cli, options_descrip
"Limits the maximum time (in milliseconds) that a reversible block is allowed to run before being considered invalid")
("max-pending-transaction-time", bpo::value<int32_t>()->default_value(30),
"Limits the maximum time (in milliseconds) that is allowed a pushed transaction's code to execute before being considered invalid")
("max-deferred-transaction-time", bpo::value<int32_t>()->default_value(20),
"Limits the maximum time (in milliseconds) that is allowed a to push deferred transactions at the start of a block")
("wasm-runtime", bpo::value<eosio::chain::wasm_interface::vm_type>()->value_name("wavm/binaryen"), "Override default WASM runtime")
("shared-memory-size-mb", bpo::value<uint64_t>()->default_value(config::default_shared_memory_size / (1024 * 1024)), "Maximum size MB of database shared memory file")
......@@ -182,7 +183,6 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
my->max_reversible_block_time_ms = options.at("max-reversible-block-time").as<int32_t>();
my->max_pending_transaction_time_ms = options.at("max-pending-transaction-time").as<int32_t>();
my->max_deferred_transaction_time_ms = options.at("max-deferred-transaction-time").as<int32_t>();
if(options.count("wasm-runtime"))
my->wasm_runtime = options.at("wasm-runtime").as<vm_type>();
......@@ -209,21 +209,17 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
my->chain_config->limits.max_push_transaction_us = fc::milliseconds(my->max_pending_transaction_time_ms);
}
if (my->max_deferred_transaction_time_ms > 0 ) {
my->chain_config->limits.max_deferred_transactions_us = fc::milliseconds(my->max_deferred_transaction_time_ms);
}
if(my->wasm_runtime)
my->chain_config->wasm_runtime = *my->wasm_runtime;
my->chain.emplace(*my->chain_config);
// set up method providers
my->get_block_by_number_provider = app().get_method<methods::get_block_by_number>().register_provider([this](uint32_t block_num) -> const signed_block_ptr& {
my->get_block_by_number_provider = app().get_method<methods::get_block_by_number>().register_provider([this](uint32_t block_num) -> signed_block_ptr {
return my->chain->fetch_block_by_number(block_num);
});
my->get_block_by_id_provider = app().get_method<methods::get_block_by_id>().register_provider([this](block_id_type id) -> const signed_block_ptr& {
my->get_block_by_id_provider = app().get_method<methods::get_block_by_id>().register_provider([this](block_id_type id) -> signed_block_ptr {
return my->chain->fetch_block_by_id(id);
});
......@@ -259,6 +255,12 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
my->chain->accepted_confirmation.connect([this](const header_confirmation& conf){
my->accepted_confirmation_channel.publish(conf);
});
my->incoming_transaction_subscription = app().get_channel<channels::incoming_transaction>().subscribe([this](const packed_transaction_ptr& ptrx){
try {
my->chain->push_transaction(std::make_shared<transaction_metadata>(*ptrx), get_transaction_deadline());
} FC_LOG_AND_DROP();
});
}
void chain_plugin::plugin_startup()
......@@ -286,7 +288,7 @@ chain_apis::read_write chain_plugin::get_read_write_api() {
}
void chain_plugin::accept_block(const signed_block_ptr& block ) {
chain().push_block( block );
my->incoming_block_channel.publish(block);
}
void chain_plugin::accept_transaction(const packed_transaction& trx) {
......
......@@ -239,8 +239,6 @@ struct faucet_testnet_plugin_impl {
trx.sign(_create_account_private_key, chainid);
try {
if( !cc.pending_block_state() )
cc.start_block();
cc.push_transaction( std::make_shared<transaction_metadata>(trx) );
} catch (const account_name_exists_exception& ) {
// another transaction ended up adding the account, so look for alternates
......
......@@ -7,4 +7,4 @@ add_library( producer_plugin
target_link_libraries( producer_plugin chain_plugin appbase eosio_chain eos_utilities )
target_include_directories( producer_plugin
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" )
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_SOURCE_DIR}/../chain_interface/include" )
......@@ -4,6 +4,7 @@
*/
#include <eosio/producer_plugin/producer_plugin.hpp>
#include <eosio/chain/producer_object.hpp>
#include <eosio/chain/plugin_interface.hpp>
#include <fc/io/json.hpp>
#include <fc/smart_ref_impl.hpp>
......@@ -24,11 +25,13 @@ namespace eosio {
static appbase::abstract_plugin& _producer_plugin = app().register_plugin<producer_plugin>();
using namespace eosio::chain;
using namespace eosio::chain::plugin_interface;
class producer_plugin_impl {
public:
producer_plugin_impl(boost::asio::io_service& io)
: _timer(io) {}
:_timer(io)
{}
void schedule_production_loop();
block_production_condition::block_production_condition_enum block_production_loop();
......@@ -43,6 +46,8 @@ class producer_plugin_impl {
std::set<chain::account_name> _producers;
boost::asio::deadline_timer _timer;
int32_t _max_deferred_transaction_time_ms;
block_production_condition::block_production_condition_enum _prev_result = block_production_condition::produced;
uint32_t _prev_result_count = 0;
......@@ -52,6 +57,8 @@ class producer_plugin_impl {
producer_plugin* _self = nullptr;
channels::incoming_block::channel_type::handle _incoming_block_subscription;
void on_block( const block_state_ptr& bsp ) {
if( bsp->header.timestamp <= _last_signed_block_time ) return;
if( bsp->header.timestamp <= _start_time ) return;
......@@ -83,6 +90,20 @@ class producer_plugin_impl {
}
} ) );
}
void on_incoming_block(const signed_block_ptr& block) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
// abort the pending block
chain.abort_block();
try {
// push the new block
chain.push_block(block);
} FC_LOG_AND_DROP();
// restart our production loop
schedule_production_loop();
}
};
void new_chain_banner(const eosio::chain::controller& db)
......@@ -97,7 +118,7 @@ void new_chain_banner(const eosio::chain::controller& db)
"*******************************\n"
"\n";
if( db.head_block_state()->get_slot_at_time(fc::time_point::now()) > 200 )
if( db.head_block_state()->header.timestamp.to_time_point() < (fc::time_point::now() - fc::milliseconds(200 * config::block_interval_ms)))
{
std::cerr << "Your genesis seems to have an old timestamp\n"
"Please consider using the --genesis-timestamp option to give your genesis a recent timestamp\n"
......@@ -125,6 +146,8 @@ void producer_plugin::set_program_options(
producer_options.add_options()
("enable-stale-production,e", boost::program_options::bool_switch()->notifier([this](bool e){my->_production_enabled = e;}), "Enable block production, even if the chain is stale.")
("max-deferred-transaction-time", bpo::value<int32_t>()->default_value(20),
"Limits the maximum time (in milliseconds) that is allowed a to push deferred transactions at the start of a block")
("required-participation", boost::program_options::value<uint32_t>()
->default_value(uint32_t(config::required_producer_participation/config::percent_1))
->notifier([this](uint32_t e) {
......@@ -186,6 +209,13 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->_private_keys[key_id_to_wif_pair.first] = key_id_to_wif_pair.second;
}
}
my->_max_deferred_transaction_time_ms = options.at("max-deferred-transaction-time").as<int32_t>();
my->_incoming_block_subscription = app().get_channel<channels::incoming_block>().subscribe([this](const signed_block_ptr& block){
my->on_incoming_block(block);
});
} FC_LOG_AND_RETHROW() }
void producer_plugin::plugin_startup()
......@@ -206,7 +236,7 @@ void producer_plugin::plugin_startup()
my->schedule_production_loop();
} else
elog("No producers configured! Please add producer IDs and private keys to configuration.");
ilog("producer plugin: plugin_startup() end");
ilog("producer plugin: plugin_startup() end");
} FC_CAPTURE_AND_RETHROW() }
void producer_plugin::plugin_shutdown() {
......@@ -218,6 +248,8 @@ void producer_plugin::plugin_shutdown() {
}
void producer_plugin_impl::schedule_production_loop() {
_timer.cancel();
//Schedule for the next second's tick regardless of chain state
// If we would wait less than 50ms (1/10 of block_interval), wait for the whole block interval.
fc::time_point now = fc::time_point::now();
......@@ -229,14 +261,42 @@ void producer_plugin_impl::schedule_production_loop() {
time_to_next_block_time += config::block_interval_us;
}
fc::time_point block_time = now + fc::microseconds(time_to_next_block_time);
static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
_timer.expires_at( epoch + boost::posix_time::microseconds(block_time.time_since_epoch().count()));
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
try {
chain.abort_block();
chain.start_block(block_time);
} FC_LOG_AND_DROP();
if (chain.pending_block_state()) {
// TODO: BIG BAD WARNING, THIS WILL HAPPILY BLOW PAST DEADLINES BUT CONTROLLER IS NOT YET SAFE FOR DEADLINE USAGE
try {
while (chain.push_next_unapplied_transaction(fc::time_point::maximum()));
} FC_LOG_AND_DROP();
chain.abort_block();
chain.start_block( now + fc::microseconds(time_to_next_block_time) );
try {
while (chain.push_next_scheduled_transaction(fc::time_point::maximum()));
} FC_LOG_AND_DROP();
_timer.expires_from_now( boost::posix_time::microseconds(time_to_next_block_time) );
//_timer.async_wait(boost::bind(&producer_plugin_impl::block_production_loop, this));
_timer.async_wait( [&](const boost::system::error_code&){ block_production_loop(); } );
//_timer.async_wait(boost::bind(&producer_plugin_impl::block_production_loop, this));
_timer.async_wait([&](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted) {
block_production_loop();
}
});
} else {
elog("Failed to start a pending block, will try again later");
// we failed to start a block, so try again later?
_timer.async_wait([&](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted) {
schedule_production_loop();
}
});
}
}
block_production_condition::block_production_condition_enum producer_plugin_impl::block_production_loop() {
......@@ -320,19 +380,13 @@ block_production_condition::block_production_condition_enum producer_plugin_impl
// If the next block production opportunity is in the present or future, we're synced.
if( !_production_enabled )
{
if( hbs.get_slot_time(1) >= now )
if( hbs.header.timestamp.next().to_time_point() >= now )
_production_enabled = true;
else
return block_production_condition::not_synced;
}
// is anyone scheduled to produce now or one second in the future?
uint32_t slot = hbs.get_slot_at_time( now );
if( slot == 0 )
{
capture("next_time", hbs.get_slot_time(1));
return block_production_condition::not_time_yet;
}
auto pending_block_timestamp = chain.pending_block_state()->header.timestamp;
//
// this assert should not fail, because now <= db.head_block_time()
......@@ -344,7 +398,7 @@ block_production_condition::block_production_condition_enum producer_plugin_impl
//
assert( now > chain.head_block_time() );
const auto& scheduled_producer = hbs.get_scheduled_producer( slot );
const auto& scheduled_producer = hbs.get_scheduled_producer( pending_block_timestamp );
// we must control the producer scheduled to produce the next block.
if( _producers.find( scheduled_producer.producer_name ) == _producers.end() )
{
......@@ -352,7 +406,6 @@ block_production_condition::block_production_condition_enum producer_plugin_impl
return block_production_condition::not_my_turn;
}
auto scheduled_time = hbs.get_slot_time( slot );
auto private_key_itr = _private_keys.find( scheduled_producer.block_signing_key );
if( private_key_itr == _private_keys.end() )
......@@ -361,16 +414,16 @@ block_production_condition::block_production_condition_enum producer_plugin_impl
return block_production_condition::no_private_key;
}
uint32_t prate = hbs.producer_participation_rate();
/*uint32_t prate = hbs.producer_participation_rate();
if( prate < _required_producer_participation )
{
capture("pct", uint32_t(prate / config::percent_1));
return block_production_condition::low_participation;
}
}*/
if( llabs(( time_point(scheduled_time) - now).count()) > fc::milliseconds( config::block_interval_ms ).count() )
if( llabs(( pending_block_timestamp.to_time_point() - now).count()) > fc::milliseconds( config::block_interval_ms ).count() )
{
capture("scheduled_time", scheduled_time)("now", now);
capture("scheduled_time", pending_block_timestamp)("now", now);
return block_production_condition::lag;
}
......
......@@ -73,8 +73,6 @@ using namespace eosio::chain;
struct txn_test_gen_plugin_impl {
transaction_trace_ptr push_transaction( signed_transaction& trx ) { try {
controller& cc = app().get_plugin<chain_plugin>().chain();
if( !cc.pending_block_state() )
cc.start_block();
return cc.push_transaction( std::make_shared<transaction_metadata>(trx) );
} FC_CAPTURE_AND_RETHROW( (transaction_header(trx)) ) }
......
......@@ -544,7 +544,7 @@ BOOST_AUTO_TEST_CASE(checktime_fail_tests) { try {
// 1) compilation of the smart contract should probably not count towards the CPU time of a transaction that first uses it;
// 2) checktime should eventually switch to a deterministic metric which should hopefully fix the inconsistencies
// of this test succeeding/failing on different machines (for example, succeeding on our local dev machines but failing on Jenkins).
TESTER t( {fc::milliseconds(5000), fc::milliseconds(5000), fc::milliseconds(-1)} );
TESTER t( {fc::milliseconds(5000), fc::milliseconds(5000)} );
t.produce_blocks(2);
t.create_account( N(testapi) );
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册