From 4446b3c19fd2ca67ad335c26695deebca33f6062 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 3 Oct 2017 13:41:00 -0500 Subject: [PATCH] Transactions now pushed to mongodb #172 --- plugins/db_plugin/db_plugin.cpp | 99 +++++++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 24 deletions(-) diff --git a/plugins/db_plugin/db_plugin.cpp b/plugins/db_plugin/db_plugin.cpp index 7ee5c1d9a..bec31f0cb 100644 --- a/plugins/db_plugin/db_plugin.cpp +++ b/plugins/db_plugin/db_plugin.cpp @@ -33,10 +33,7 @@ using chain::block_id_type; using chain::PermissionName; using chain::ProcessedTransaction; using chain::signed_block; -using boost::multi_index_container; using chain::transaction_id_type; -using namespace boost::multi_index; -using namespace std; class db_plugin_impl { public: @@ -84,7 +81,8 @@ public: static const PermissionName RECOVERY; static const std::string db_name; - static const std::string block_col; + static const std::string blocks_col; + static const std::string trans_col; }; const AccountName db_plugin_impl::NEW_ACCOUNT = "newaccount"; @@ -95,7 +93,8 @@ const PermissionName db_plugin_impl::ACTIVE = "active"; const PermissionName db_plugin_impl::RECOVERY = "recovery"; const std::string db_plugin_impl::db_name = "EOS"; -const std::string db_plugin_impl::block_col = "block"; +const std::string db_plugin_impl::blocks_col = "Blocks"; +const std::string db_plugin_impl::trans_col = "Transactions"; void db_plugin_impl::applied_irreversible_block(const signed_block& block) { @@ -124,31 +123,74 @@ void db_plugin_impl::consum_blocks() { ilog("db_plugin consum thread shutdown gracefully"); } +// +// Blocks +// block_num int +// block_id sha256 +// prev_block_id sha256 +// timestamp sec_since_epoch +// transaction_merkle_root sha256 +// producer_account_id string void db_plugin_impl::process_irreversible_block(const signed_block& block) { - auto blocks = mongo_conn[db_name][block_col]; - bsoncxx::builder::stream::document doc{}; + using namespace bsoncxx::types; + using namespace bsoncxx::builder; + + bool transactions_in_block = false; + mongocxx::bulk_write bulk; + + auto blocks = mongo_conn[db_name][blocks_col]; + auto trans = mongo_conn[db_name][trans_col]; + stream::document doc{}; const auto block_id = block.id(); - ilog("block ${bid}", ("bid", block_id)); -// auto& db = chain_plug->chain().get_mutable_database(); - const bool check_relevance = filter_on.size(); - for (const auto& cycle : block.cycles) - { - for (const auto& thread : cycle) - { - for (const auto& trx : thread.user_input) - { + const auto block_id_str = block_id.str(); + doc << "block_num" << b_int32{static_cast(block.block_num())} + << "block_id" << block_id_str + << "prev_block_id" << block.previous.str() + << "timestamp" << b_date{std::chrono::milliseconds{std::chrono::seconds{block.timestamp.sec_since_epoch()}}} + << "transaction_merkle_root" << block.transaction_merkle_root.str() + << "producer_account_id" << block.producer.toString(); + if (!blocks.insert_one(doc.view())) { + elog("Failed to insert block ${bid}", ("bid", block_id)); + } + + const bool check_relevance = filter_on.size() > 0; + for (const auto& cycle : block.cycles) { + for (const auto& thread : cycle) { + for (const auto& trx : thread.user_input) { if (check_relevance && !is_scope_relevant(trx.scope)) continue; - ilog("block ${bid} : ${tid}", ("bid", block_id)("tid", trx.id())); - - doc.clear(); - doc << "block_id" << block_id.str() - << "txn_id" << trx.id().str(); - blocks.insert_one(doc.view()); + auto trx_doc = doc + << "transaction_id" << trx.id().str() + << "block_id" << block_id_str + << "ref_block_num" << b_int32{static_cast(trx.refBlockNum)} + << "ref_block_prefix" << trx.refBlockPrefix.str() + << "scope" << stream::open_array; + for (const auto& account_name : trx.scope) + trx_doc = trx_doc << account_name.toString(); + trx_doc = trx_doc + << stream::close_array + << "read_scope" << stream::open_array; + for (const auto& account_name : trx.readscope) + trx_doc = trx_doc << account_name.toString(); + trx_doc = trx_doc + << stream::close_array + << "expiration" << b_date{std::chrono::milliseconds{std::chrono::seconds{trx.expiration.sec_since_epoch()}}} + << "signatures" << stream::open_array; + for (const auto& sig : trx.signatures) { + trx_doc = trx_doc << fc::json::to_string(sig); + } + auto complete_doc = trx_doc + << stream::close_array + << "messages" << stream::open_array + << stream::close_array << stream::finalize; + + mongocxx::model::insert_one insert_op{complete_doc.view()}; + transactions_in_block = true; + bulk.append(insert_op); // db.create([&block_id,&trx](transaction_history_object& transaction_history) { // transaction_history.block_id = block_id; @@ -199,6 +241,15 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block) } } } + + + if (transactions_in_block) { + auto result = trans.bulk_write(bulk); + if (!result) { + elog("Bulk transaction insert failed for block: ${bid}", ("bid", block_id)); + } + } + } void db_plugin_impl::add(chainbase::database& db, const vector& keys, const AccountName& account_name, const PermissionName& permission) @@ -270,7 +321,7 @@ db_plugin::~db_plugin() void db_plugin::set_program_options(options_description& cli, options_description& cfg) { cfg.add_options() - ("filter_on_accounts,f", bpo::value>()->composing(), + ("filter_on_accounts,f", bpo::value>()->composing(), "Track only transactions whose scopes involve the listed accounts. Default is to track all transactions.") ("queue_size,q", bpo::value()->default_value(1024), "The block queue size") @@ -294,7 +345,7 @@ void db_plugin::plugin_initialize(const variables_map& options) { ilog("initializing db plugin"); if(options.count("filter_on_accounts")) { - auto foa = options.at("filter_on_accounts").as>(); + auto foa = options.at("filter_on_accounts").as>(); for(auto filter_account : foa) my->filter_on.emplace(filter_account); } else if (options.count("queue_size")) { -- GitLab