提交 46387eba 编写于 作者: K Kevin Heifner

Fix for resync startup #172

上级 5a7553b4
......@@ -110,43 +110,59 @@ const std::string db_plugin_impl::accounts_col = "Accounts";
void db_plugin_impl::applied_irreversible_block(const signed_block& block) {
if (startup) {
// on startup we don't want to queue, instead push back on caller
process_irreversible_block(block);
} else {
boost::mutex::scoped_lock lock(mtx);
queue.push(block);
lock.unlock();
condtion.notify_one();
try {
if (startup) {
// on startup we don't want to queue, instead push back on caller
process_irreversible_block(block);
} else {
boost::mutex::scoped_lock lock(mtx);
queue.push(block);
lock.unlock();
condtion.notify_one();
}
} catch (fc::exception& e) {
elog("FC Exception while applied_irreversible_block ${e}", ("e", e.to_string()));
} catch (std::exception& e) {
elog("STD Exception while applied_irreversible_block ${e}", ("e", e.what()));
} catch (...) {
elog("Unknown exception while applied_irreversible_block");
}
}
void db_plugin_impl::consum_blocks() {
signed_block block;
size_t size = 0;
while (true) {
boost::mutex::scoped_lock lock(mtx);
while (queue.empty() && !done) {
condtion.wait(lock);
}
size = queue.size();
if (size > 0) {
block = queue.front();
queue.pop();
lock.unlock();
// warn if queue size greater than 75%
if (size > (queue_size * 0.75)) {
wlog("queue size: ${q}", ("q", size + 1));
try {
signed_block block;
size_t size = 0;
while (true) {
boost::mutex::scoped_lock lock(mtx);
while (queue.empty() && !done) {
condtion.wait(lock);
}
size = queue.size();
if (size > 0) {
block = queue.front();
queue.pop();
lock.unlock();
// warn if queue size greater than 75%
if (size > (queue_size * 0.75)) {
wlog("queue size: ${q}", ("q", size + 1));
} else if (done) {
ilog("draining queue, size: ${q}", ("q", size + 1));
}
process_irreversible_block(block);
continue;
} else if (done) {
ilog("draining queue, size: ${q}", ("q", size + 1));
break;
}
process_irreversible_block(block);
continue;
} else if (done) {
break;
}
ilog("db_plugin consum thread shutdown gracefully");
} catch (fc::exception& e) {
elog("FC Exception while consuming block ${e}", ("e", e.to_string()));
} catch (std::exception& e) {
elog("STD Exception while consuming block ${e}", ("e", e.what()));
} catch (...) {
elog("Unknown exception while consuming block");
}
ilog("db_plugin consum thread shutdown gracefully");
}
namespace {
......@@ -575,7 +591,7 @@ void db_plugin_impl::init() {
////////////
db_plugin::db_plugin()
:my(nullptr)
:my(new db_plugin_impl)
{
}
......@@ -600,21 +616,17 @@ void db_plugin::set_program_options(options_description& cli, options_descriptio
void db_plugin::wipe_database() {
#ifdef MONGODB
if (my) {
if (!my->startup) {
elog("ERROR: db_plugin::wipe_database() called before configuration or after startup. Ignoring.");
} else {
my->wipe_database_on_startup = true;
}
if (!my->startup) {
elog("ERROR: db_plugin::wipe_database() called before configuration or after startup. Ignoring.");
} else {
my->wipe_database_on_startup = true;
}
#endif
}
void db_plugin::applied_irreversible_block(const signed_block& block) {
#ifdef MONGODB
if (my) {
my->applied_irreversible_block(block);
}
my->applied_irreversible_block(block);
#endif
}
......@@ -623,8 +635,8 @@ void db_plugin::plugin_initialize(const variables_map& options)
{
#ifdef MONGODB
if (options.count("mongodb-uri")) {
my.reset(new db_plugin_impl);
ilog("initializing db plugin");
my->configured = true;
if (options.count("filter-on-accounts")) {
auto foa = options.at("filter-on-accounts").as<std::vector<std::string>>();
for (auto filter_account : foa)
......@@ -647,8 +659,8 @@ void db_plugin::plugin_initialize(const variables_map& options)
}
my->init();
} else {
ilog("eos::db_plugin configured, but no --mongodb-uri specified.");
ilog("db_plugin disabled.");
wlog("eos::db_plugin configured, but no --mongodb-uri specified.");
wlog("db_plugin disabled.");
}
#endif
}
......@@ -656,7 +668,7 @@ void db_plugin::plugin_initialize(const variables_map& options)
void db_plugin::plugin_startup()
{
#ifdef MONGODB
if (my) {
if (my->configured) {
ilog("starting db plugin");
my->consum_thread = boost::thread([this] { my->consum_blocks(); });
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册