提交 4b9e7a13 编写于 作者: K Kevin Heifner

Added boost lockfree queue and consum thread #172

上级 a727f57d
......@@ -15,6 +15,8 @@
#include <boost/range/adaptors.hpp>
#include <boost/range/algorithm.hpp>
#include <boost/range/algorithm_ext.hpp>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/spsc_queue.hpp>
namespace fc { class variant; }
......@@ -32,19 +34,15 @@ using namespace boost::multi_index;
class db_plugin_impl {
public:
void applied_irreversible_block(const signed_block&);
void process_irreversible_block(const signed_block&);
chain_plugin* chain_plug;
std::set<AccountName> filter_on;
std::unique_ptr<boost::lockfree::spsc_queue<signed_block>> queue;
boost::thread consum_thread;
boost::atomic<bool> startup{true};
boost::atomic<bool> done{false};
private:
struct block_comp
{
bool operator()(const block_id_type& a, const block_id_type& b) const
{
return chain::block_header::num_from_id(a) > chain::block_header::num_from_id(b);
}
};
typedef std::multimap<block_id_type, transaction_id_type, block_comp> block_transaction_id_map;
void consum_blocks();
bool is_scope_relevant(const eos::types::Vector<AccountName>& scope);
static void add(chainbase::database& db, const vector<types::KeyPermissionWeight>& keys, const AccountName& account_name, const PermissionName& permission);
......@@ -77,7 +75,32 @@ const PermissionName db_plugin_impl::OWNER = "owner";
const PermissionName db_plugin_impl::ACTIVE = "active";
const PermissionName db_plugin_impl::RECOVERY = "recovery";
void db_plugin_impl::applied_irreversible_block(const signed_block& block)
void db_plugin_impl::applied_irreversible_block(const signed_block& block) {
if (startup) {
// on startup we don't want to queue, insteas push back on caller
process_irreversible_block(block);
} else {
if (!queue->push(block)) {
// TODO what to do if full
elog("queue is full!!!!!");
}
}
}
void db_plugin_impl::consum_blocks() {
signed_block block;
while (!done) {
while (queue->pop(block)) {
process_irreversible_block(block);
}
}
while (queue->pop(block)) {
process_irreversible_block(block);
}
ilog("db_plugin consum thread shutdown gracefully");
}
void db_plugin_impl::process_irreversible_block(const signed_block& block)
{
const auto block_id = block.id();
ilog("block ${bid}", ("bid", block_id));
......@@ -193,6 +216,8 @@ void db_plugin::set_program_options(options_description& cli, options_descriptio
cfg.add_options()
("filter_on_accounts,f", bpo::value<vector<string>>()->composing(),
"Track only transactions whose scopes involve the listed accounts. Default is to track all transactions.")
("queue_size,q", bpo::value<uint>()->default_value(1024),
"The block queue size")
;
}
......@@ -215,6 +240,9 @@ void db_plugin::plugin_initialize(const variables_map& options)
auto foa = options.at("filter_on_accounts").as<vector<string>>();
for(auto filter_account : foa)
my->filter_on.emplace(filter_account);
} else if (options.count("queue_size")) {
auto size = options.at("queue_size").as<uint>();
my->queue = std::make_unique<boost::lockfree::spsc_queue<signed_block>>(size);
}
}
......@@ -223,17 +251,23 @@ void db_plugin::plugin_startup()
ilog("starting db plugin");
// TODO: during chain startup we want to pushback on apply so that chainbase has to wait for db.
// TODO: assert that last irreversible in db is one less than received (on startup only?, every so often?)
my->chain_plug = app().find_plugin<chain_plugin>();
// my->chain_plug = app().find_plugin<chain_plugin>();
// auto& db = my->chain_plug->chain().get_mutable_database();
// db.add_index<account_control_history_multi_index>();
// db.add_index<account_transaction_history_multi_index>();
// db.add_index<public_key_history_multi_index>();
// db.add_index<transaction_history_multi_index>();
my->consum_thread = boost::thread([this]{ my->consum_blocks(); });
// chain_controller is created and has resynced or replayed if needed
my->startup = false;
}
void db_plugin::plugin_shutdown()
{
my->done = true;
my->consum_thread.join();
}
} // namespace eos
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册