提交 368cf3ec 编写于 作者: B Brian Johnson

Added timer for waiting to not waste time during slots that it cannot produce. GH #4971

上级 57da8db6
...@@ -111,14 +111,13 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin ...@@ -111,14 +111,13 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
{ {
} }
optional<fc::time_point> calculate_next_block_time(const account_name& producer_name) const; optional<fc::time_point> calculate_next_block_time(const account_name& producer_name, const block_timestamp_type& current_block_time) const;
void schedule_production_loop(); void schedule_production_loop();
void produce_block(); void produce_block();
bool maybe_produce_block(); bool maybe_produce_block();
boost::program_options::variables_map _options; boost::program_options::variables_map _options;
bool _production_enabled = false; bool _production_enabled = false;
bool _stale_production_window_open = false;
bool _pause_production = false; bool _pause_production = false;
uint32_t _production_skip_flags = 0; //eosio::chain::skip_nothing; uint32_t _production_skip_flags = 0; //eosio::chain::skip_nothing;
...@@ -417,6 +416,9 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin ...@@ -417,6 +416,9 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}; };
start_block_result start_block(bool &last_block); start_block_result start_block(bool &last_block);
fc::time_point calculate_pending_block_time() const;
void schedule_timer(const std::weak_ptr<producer_plugin_impl>& weak_this, const block_timestamp_type& current_block_time);
}; };
void new_chain_banner(const eosio::chain::controller& db) void new_chain_banner(const eosio::chain::controller& db)
...@@ -662,7 +664,6 @@ void producer_plugin::plugin_startup() ...@@ -662,7 +664,6 @@ void producer_plugin::plugin_startup()
my->_irreversible_block_time = fc::time_point::maximum(); my->_irreversible_block_time = fc::time_point::maximum();
} }
my->_stale_production_window_open = my->_production_enabled;
if (!my->_producers.empty()) { if (!my->_producers.empty()) {
ilog("Launching block production for ${n} producers at ${time}.", ("n", my->_producers.size())("time",fc::time_point::now())); ilog("Launching block production for ${n} producers at ${time}.", ("n", my->_producers.size())("time",fc::time_point::now()));
...@@ -803,14 +804,11 @@ void producer_plugin::set_whitelist_blacklist(const producer_plugin::whitelist_b ...@@ -803,14 +804,11 @@ void producer_plugin::set_whitelist_blacklist(const producer_plugin::whitelist_b
} }
optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const account_name& producer_name) const { optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const account_name& producer_name, const block_timestamp_type& current_block_time) const {
chain::controller& chain = app().get_plugin<chain_plugin>().chain(); chain::controller& chain = app().get_plugin<chain_plugin>().chain();
const auto& hbs = chain.head_block_state(); const auto& hbs = chain.head_block_state();
const auto& active_schedule = hbs->active_schedule.producers; const auto& active_schedule = hbs->active_schedule.producers;
const auto& pbs = chain.pending_block_state();
const auto& pbt = pbs->header.timestamp;
// determine if this producer is in the active schedule and if so, where // determine if this producer is in the active schedule and if so, where
auto itr = std::find_if(active_schedule.begin(), active_schedule.end(), [&](const auto& asp){ return asp.producer_name == producer_name; }); auto itr = std::find_if(active_schedule.begin(), active_schedule.end(), [&](const auto& asp){ return asp.producer_name == producer_name; });
if (itr == active_schedule.end()) { if (itr == active_schedule.end()) {
...@@ -829,6 +827,7 @@ optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const a ...@@ -829,6 +827,7 @@ optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const a
auto current_watermark_itr = _producer_watermarks.find(producer_name); auto current_watermark_itr = _producer_watermarks.find(producer_name);
if (current_watermark_itr != _producer_watermarks.end()) { if (current_watermark_itr != _producer_watermarks.end()) {
auto watermark = current_watermark_itr->second; auto watermark = current_watermark_itr->second;
const auto& pbs = chain.pending_block_state();
if (watermark > pbs->block_num) { if (watermark > pbs->block_num) {
// if I have a watermark then I need to wait until after that watermark // if I have a watermark then I need to wait until after that watermark
minimum_offset = watermark - pbs->block_num + 1; minimum_offset = watermark - pbs->block_num + 1;
...@@ -836,7 +835,7 @@ optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const a ...@@ -836,7 +835,7 @@ optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const a
} }
// this producers next opportuity to produce is the next time its slot arrives after or at the calculated minimum // this producers next opportuity to produce is the next time its slot arrives after or at the calculated minimum
uint32_t minimum_slot = pbt.slot + minimum_offset; uint32_t minimum_slot = current_block_time.slot + minimum_offset;
size_t minimum_slot_producer_index = (minimum_slot % (active_schedule.size() * config::producer_repetitions)) / config::producer_repetitions; size_t minimum_slot_producer_index = (minimum_slot % (active_schedule.size() * config::producer_repetitions)) / config::producer_repetitions;
if ( producer_index == minimum_slot_producer_index ) { if ( producer_index == minimum_slot_producer_index ) {
// this is the producer for the minimum slot, go with that // this is the producer for the minimum slot, go with that
...@@ -858,23 +857,28 @@ optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const a ...@@ -858,23 +857,28 @@ optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const a
} }
} }
producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool &last_block) { fc::time_point producer_plugin_impl::calculate_pending_block_time() const {
chain::controller& chain = app().get_plugin<chain_plugin>().chain(); const chain::controller& chain = app().get_plugin<chain_plugin>().chain();
const auto& hbs = chain.head_block_state(); const fc::time_point now = fc::time_point::now();
const fc::time_point base = std::max<fc::time_point>(now, chain.head_block_time());
//Schedule for the next second's tick regardless of chain state const int64_t min_time_to_next_block = (config::block_interval_us) - (base.time_since_epoch().count() % (config::block_interval_us) );
// If we would wait less than 50ms (1/10 of block_interval), wait for the whole block interval.
fc::time_point now = fc::time_point::now();
fc::time_point base = std::max<fc::time_point>(now, chain.head_block_time());
int64_t min_time_to_next_block = (config::block_interval_us) - (base.time_since_epoch().count() % (config::block_interval_us) );
fc::time_point block_time = base + fc::microseconds(min_time_to_next_block); fc::time_point block_time = base + fc::microseconds(min_time_to_next_block);
if((block_time - now) < fc::microseconds(config::block_interval_us/10) ) { // we must sleep for at least 50ms if((block_time - now) < fc::microseconds(config::block_interval_us/10) ) { // we must sleep for at least 50ms
// ilog("Less than ${t}us to next block time, time_to_next_block_time ${bt}",
// ("t", config::block_interval_us/10)("bt", block_time));
block_time += fc::microseconds(config::block_interval_us); block_time += fc::microseconds(config::block_interval_us);
} }
return block_time;
}
producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool &last_block) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
const auto& hbs = chain.head_block_state();
//Schedule for the next second's tick regardless of chain state
// If we would wait less than 50ms (1/10 of block_interval), wait for the whole block interval.
const fc::time_point now = fc::time_point::now();
const fc::time_point block_time = calculate_pending_block_time();
_pending_block_mode = pending_block_mode::producing; _pending_block_mode = pending_block_mode::producing;
...@@ -916,13 +920,8 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool ...@@ -916,13 +920,8 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
if (_pending_block_mode == pending_block_mode::speculating) { if (_pending_block_mode == pending_block_mode::speculating) {
auto head_block_age = now - chain.head_block_time(); auto head_block_age = now - chain.head_block_time();
if (head_block_age > fc::seconds(5)) { if (head_block_age > fc::seconds(5))
if (!_stale_production_window_open) return start_block_result::waiting;
return start_block_result::waiting;
} else {
// once we are not behind, then the stale production window is no longer needed
_stale_production_window_open = false;
}
} }
try { try {
...@@ -1133,9 +1132,15 @@ void producer_plugin_impl::schedule_production_loop() { ...@@ -1133,9 +1132,15 @@ void producer_plugin_impl::schedule_production_loop() {
self->schedule_production_loop(); self->schedule_production_loop();
} }
}); });
} else if (result == start_block_result::waiting) { } else if ((_pending_block_mode == pending_block_mode::speculating || result == start_block_result::waiting) && !_producers.empty() && !production_disabled_by_policy()){
// nothing to do until more blocks arrive block_timestamp_type pending_block_time;
if (result == start_block_result::waiting) {
pending_block_time = calculate_pending_block_time();
} else {
const auto& pbs = chain.pending_block_state();
pending_block_time = pbs->header.timestamp;
}
schedule_timer(weak_this, pending_block_time);
} else if (_pending_block_mode == pending_block_mode::producing) { } else if (_pending_block_mode == pending_block_mode::producing) {
// we succeeded but block may be exhausted // we succeeded but block may be exhausted
...@@ -1162,40 +1167,49 @@ void producer_plugin_impl::schedule_production_loop() { ...@@ -1162,40 +1167,49 @@ void producer_plugin_impl::schedule_production_loop() {
fc_dlog(_log, "Producing Block #${num} returned: ${res}", ("num", chain.pending_block_state()->block_num)("res", res) ); fc_dlog(_log, "Producing Block #${num} returned: ${res}", ("num", chain.pending_block_state()->block_num)("res", res) );
} }
}); });
} else if (_pending_block_mode == pending_block_mode::speculating && !_producers.empty() && !production_disabled_by_policy()){ } else {
// if we have any producers then we should at least set a timer for our next available slot fc_dlog(_log, "Speculative Block Created");
optional<fc::time_point> wake_up_time; }
for (const auto&p: _producers) { }
auto next_producer_block_time = calculate_next_block_time(p);
if (next_producer_block_time) { void producer_plugin_impl::schedule_timer(const std::weak_ptr<producer_plugin_impl>& weak_this, const block_timestamp_type& current_block_time) {
auto producer_wake_up_time = *next_producer_block_time - fc::microseconds(config::block_interval_us); // if we have any producers then we should at least set a timer for our next available slot
if (wake_up_time) { optional<fc::time_point> wake_up_time;
// wake up with a full block interval to the deadline for (const auto&p: _producers) {
wake_up_time = std::min<fc::time_point>(*wake_up_time, producer_wake_up_time); auto next_producer_block_time = calculate_next_block_time(p, current_block_time);
} else { if (next_producer_block_time) {
wake_up_time = producer_wake_up_time; auto producer_wake_up_time = *next_producer_block_time - fc::microseconds(config::block_interval_us);
} if (wake_up_time) {
// wake up with a full block interval to the deadline
wake_up_time = std::min<fc::time_point>(*wake_up_time, producer_wake_up_time);
} else {
wake_up_time = producer_wake_up_time;
} }
} }
}
if (wake_up_time) { if (wake_up_time) {
if (_pending_block_mode == pending_block_mode::speculating)
fc_dlog(_log, "Specualtive Block Created; Scheduling Speculative/Production Change at ${time}", ("time", wake_up_time)); fc_dlog(_log, "Specualtive Block Created; Scheduling Speculative/Production Change at ${time}", ("time", wake_up_time));
static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1)); else
_timer.expires_at(epoch + boost::posix_time::microseconds(wake_up_time->time_since_epoch().count())); fc_dlog(_log, "Waiting; Scheduling Speculative/Production Change at ${time}", ("time", wake_up_time));
_timer.async_wait([weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) { static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
auto self = weak_this.lock(); _timer.expires_at(epoch + boost::posix_time::microseconds(wake_up_time->time_since_epoch().count()));
if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) { _timer.async_wait([weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
self->schedule_production_loop(); auto self = weak_this.lock();
} if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
}); self->schedule_production_loop();
} else { }
fc_dlog(_log, "Speculative Block Created; Not Scheduling Speculative/Production, no local producers had valid wake up times"); });
}
} else { } else {
fc_dlog(_log, "Speculative Block Created"); if (_pending_block_mode == pending_block_mode::speculating)
fc_dlog(_log, "Speculative Block Created; Not Scheduling Speculative/Production, no local producers had valid wake up times");
else
fc_dlog(_log, "Waiting; Not Scheduling Speculative/Production, no local producers had valid wake up times");
} }
} }
bool producer_plugin_impl::maybe_produce_block() { bool producer_plugin_impl::maybe_produce_block() {
auto reschedule = fc::make_scoped_exit([this]{ auto reschedule = fc::make_scoped_exit([this]{
schedule_production_loop(); schedule_production_loop();
......
...@@ -272,7 +272,7 @@ try: ...@@ -272,7 +272,7 @@ try:
currentMinimumMaxRAM=maxRAMValue currentMinimumMaxRAM=maxRAMValue
addOrSwapFlags[maxRAMFlag]=str(maxRAMValue) addOrSwapFlags[maxRAMFlag]=str(maxRAMValue)
if not nodes[len(nodes)-1].relaunch(nodeIndex, "", newChain=False, addOrSwapFlags=addOrSwapFlags): if not nodes[len(nodes)-1].relaunch(nodeIndex, "", newChain=False, addOrSwapFlags=addOrSwapFlags):
Utils.cmdError("Failed to restart node i with new capacity %s" % (numNodes-1, maxRAMValue)) Utils.cmdError("Failed to restart node %d with new capacity %s" % (numNodes-1, maxRAMValue))
errorExit("Failure - Node should have restarted") errorExit("Failure - Node should have restarted")
addOrSwapFlags={} addOrSwapFlags={}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册