From a6f7ef6a6e3f84d8dee627a16ff79344e1750a0b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 4 Oct 2017 11:31:02 -0500 Subject: [PATCH] Messages now pushed to mongodb #172 --- plugins/db_plugin/db_plugin.cpp | 78 ++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/plugins/db_plugin/db_plugin.cpp b/plugins/db_plugin/db_plugin.cpp index bec31f0cb..dcddd34ad 100644 --- a/plugins/db_plugin/db_plugin.cpp +++ b/plugins/db_plugin/db_plugin.cpp @@ -83,6 +83,7 @@ public: static const std::string db_name; static const std::string blocks_col; static const std::string trans_col; + static const std::string msgs_col; }; const AccountName db_plugin_impl::NEW_ACCOUNT = "newaccount"; @@ -95,6 +96,7 @@ const PermissionName db_plugin_impl::RECOVERY = "recovery"; const std::string db_plugin_impl::db_name = "EOS"; const std::string db_plugin_impl::blocks_col = "Blocks"; const std::string db_plugin_impl::trans_col = "Transactions"; +const std::string db_plugin_impl::msgs_col = "Messages"; void db_plugin_impl::applied_irreversible_block(const signed_block& block) { @@ -114,6 +116,7 @@ void db_plugin_impl::consum_blocks() { signed_block block; while (!done) { while (queue->pop(block)) { + ilog("queue size: ${q}", ("q", queue->read_available()+1)); process_irreversible_block(block); } } @@ -137,34 +140,39 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block) using namespace bsoncxx::builder; bool transactions_in_block = false; - mongocxx::bulk_write bulk; + mongocxx::options::bulk_write bulk_opts; + bulk_opts.ordered(false); + mongocxx::bulk_write bulk_trans{bulk_opts}; - auto blocks = mongo_conn[db_name][blocks_col]; - auto trans = mongo_conn[db_name][trans_col]; - stream::document doc{}; + auto blocks = mongo_conn[db_name][blocks_col]; // Blocks + auto trans = mongo_conn[db_name][trans_col]; // Transactions + auto msgs = mongo_conn[db_name][msgs_col]; // Messages + stream::document block_doc{}; const auto block_id = block.id(); const auto block_id_str = block_id.str(); - doc << "block_num" << b_int32{static_cast(block.block_num())} + + block_doc << "block_num" << b_int32{static_cast(block.block_num())} << "block_id" << block_id_str << "prev_block_id" << block.previous.str() << "timestamp" << b_date{std::chrono::milliseconds{std::chrono::seconds{block.timestamp.sec_since_epoch()}}} << "transaction_merkle_root" << block.transaction_merkle_root.str() << "producer_account_id" << block.producer.toString(); - if (!blocks.insert_one(doc.view())) { + if (!blocks.insert_one(block_doc.view())) { elog("Failed to insert block ${bid}", ("bid", block_id)); } - const bool check_relevance = filter_on.size() > 0; + const bool check_relevance = !filter_on.empty(); for (const auto& cycle : block.cycles) { for (const auto& thread : cycle) { for (const auto& trx : thread.user_input) { if (check_relevance && !is_scope_relevant(trx.scope)) continue; - doc.clear(); + stream::document doc{}; + const auto trans_id_str = trx.id().str(); auto trx_doc = doc - << "transaction_id" << trx.id().str() + << "transaction_id" << trans_id_str << "block_id" << block_id_str << "ref_block_num" << b_int32{static_cast(trx.refBlockNum)} << "ref_block_prefix" << trx.refBlockPrefix.str() @@ -183,27 +191,45 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block) for (const auto& sig : trx.signatures) { trx_doc = trx_doc << fc::json::to_string(sig); } - auto complete_doc = trx_doc + trx_doc = trx_doc << stream::close_array - << "messages" << stream::open_array - << stream::close_array << stream::finalize; + << "messages" << stream::open_array; + + mongocxx::bulk_write bulk_msgs{bulk_opts}; + int32_t i = 0; + for (const chain::Message& msg : trx.messages) { + auto msg_oid = bsoncxx::oid{}; + trx_doc = trx_doc << msg_oid; // add to transaction.messages array + stream::document msg_builder{}; + auto msg_doc = msg_builder + << "_id" << b_oid{msg_oid} + << "message_id" << b_int32{i} + << "transaction_id" << trans_id_str + << "authorization" << stream::open_array; + for (const auto& auth : msg.authorization) { + msg_doc = msg_doc << stream::open_document + << "account" << auth.account.toString() + << "permission" << auth.permission.toString() + << stream::close_document; + } + auto msg_complete = msg_doc << stream::close_array << stream::finalize; + mongocxx::model::insert_one insert_msg{msg_complete.view()}; + bulk_msgs.append(insert_msg); + ++i; + } + if (!trx.messages.empty()) { + auto result = msgs.bulk_write(bulk_msgs); + if (!result) { + elog("Bulk message insert failed for block: ${bid}, transaction: ${trx}", ("bid", block_id)("trx", trx.id())); + } + } + + auto complete_doc = trx_doc << stream::close_array << stream::finalize; mongocxx::model::insert_one insert_op{complete_doc.view()}; transactions_in_block = true; - bulk.append(insert_op); - -// db.create([&block_id,&trx](transaction_history_object& transaction_history) { -// transaction_history.block_id = block_id; -// transaction_history.transaction_id = trx.id(); -// }); + bulk_trans.append(insert_op); - for (const auto& account_name : trx.scope) - { -// db.create([&trx,&account_name](account_transaction_history_object& account_transaction_history) { -// account_transaction_history.account_name = account_name; -// account_transaction_history.transaction_id = trx.id(); -// }); - } for (const chain::Message& msg : trx.messages) { @@ -244,7 +270,7 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block) if (transactions_in_block) { - auto result = trans.bulk_write(bulk); + auto result = trans.bulk_write(bulk_trans); if (!result) { elog("Bulk transaction insert failed for block: ${bid}", ("bid", block_id)); } -- GitLab