提交 a6f7ef6a 编写于 作者: K Kevin Heifner

Messages now pushed to mongodb #172

上级 4446b3c1
......@@ -83,6 +83,7 @@ public:
static const std::string db_name;
static const std::string blocks_col;
static const std::string trans_col;
static const std::string msgs_col;
};
const AccountName db_plugin_impl::NEW_ACCOUNT = "newaccount";
......@@ -95,6 +96,7 @@ const PermissionName db_plugin_impl::RECOVERY = "recovery";
const std::string db_plugin_impl::db_name = "EOS";
const std::string db_plugin_impl::blocks_col = "Blocks";
const std::string db_plugin_impl::trans_col = "Transactions";
const std::string db_plugin_impl::msgs_col = "Messages";
void db_plugin_impl::applied_irreversible_block(const signed_block& block) {
......@@ -114,6 +116,7 @@ void db_plugin_impl::consum_blocks() {
signed_block block;
while (!done) {
while (queue->pop(block)) {
ilog("queue size: ${q}", ("q", queue->read_available()+1));
process_irreversible_block(block);
}
}
......@@ -137,34 +140,39 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
using namespace bsoncxx::builder;
bool transactions_in_block = false;
mongocxx::bulk_write bulk;
mongocxx::options::bulk_write bulk_opts;
bulk_opts.ordered(false);
mongocxx::bulk_write bulk_trans{bulk_opts};
auto blocks = mongo_conn[db_name][blocks_col];
auto trans = mongo_conn[db_name][trans_col];
stream::document doc{};
auto blocks = mongo_conn[db_name][blocks_col]; // Blocks
auto trans = mongo_conn[db_name][trans_col]; // Transactions
auto msgs = mongo_conn[db_name][msgs_col]; // Messages
stream::document block_doc{};
const auto block_id = block.id();
const auto block_id_str = block_id.str();
doc << "block_num" << b_int32{static_cast<int32_t>(block.block_num())}
block_doc << "block_num" << b_int32{static_cast<int32_t>(block.block_num())}
<< "block_id" << block_id_str
<< "prev_block_id" << block.previous.str()
<< "timestamp" << b_date{std::chrono::milliseconds{std::chrono::seconds{block.timestamp.sec_since_epoch()}}}
<< "transaction_merkle_root" << block.transaction_merkle_root.str()
<< "producer_account_id" << block.producer.toString();
if (!blocks.insert_one(doc.view())) {
if (!blocks.insert_one(block_doc.view())) {
elog("Failed to insert block ${bid}", ("bid", block_id));
}
const bool check_relevance = filter_on.size() > 0;
const bool check_relevance = !filter_on.empty();
for (const auto& cycle : block.cycles) {
for (const auto& thread : cycle) {
for (const auto& trx : thread.user_input) {
if (check_relevance && !is_scope_relevant(trx.scope))
continue;
doc.clear();
stream::document doc{};
const auto trans_id_str = trx.id().str();
auto trx_doc = doc
<< "transaction_id" << trx.id().str()
<< "transaction_id" << trans_id_str
<< "block_id" << block_id_str
<< "ref_block_num" << b_int32{static_cast<int32_t >(trx.refBlockNum)}
<< "ref_block_prefix" << trx.refBlockPrefix.str()
......@@ -183,27 +191,45 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
for (const auto& sig : trx.signatures) {
trx_doc = trx_doc << fc::json::to_string(sig);
}
auto complete_doc = trx_doc
trx_doc = trx_doc
<< stream::close_array
<< "messages" << stream::open_array
<< stream::close_array << stream::finalize;
<< "messages" << stream::open_array;
mongocxx::bulk_write bulk_msgs{bulk_opts};
int32_t i = 0;
for (const chain::Message& msg : trx.messages) {
auto msg_oid = bsoncxx::oid{};
trx_doc = trx_doc << msg_oid; // add to transaction.messages array
stream::document msg_builder{};
auto msg_doc = msg_builder
<< "_id" << b_oid{msg_oid}
<< "message_id" << b_int32{i}
<< "transaction_id" << trans_id_str
<< "authorization" << stream::open_array;
for (const auto& auth : msg.authorization) {
msg_doc = msg_doc << stream::open_document
<< "account" << auth.account.toString()
<< "permission" << auth.permission.toString()
<< stream::close_document;
}
auto msg_complete = msg_doc << stream::close_array << stream::finalize;
mongocxx::model::insert_one insert_msg{msg_complete.view()};
bulk_msgs.append(insert_msg);
++i;
}
if (!trx.messages.empty()) {
auto result = msgs.bulk_write(bulk_msgs);
if (!result) {
elog("Bulk message insert failed for block: ${bid}, transaction: ${trx}", ("bid", block_id)("trx", trx.id()));
}
}
auto complete_doc = trx_doc << stream::close_array << stream::finalize;
mongocxx::model::insert_one insert_op{complete_doc.view()};
transactions_in_block = true;
bulk.append(insert_op);
// db.create<transaction_history_object>([&block_id,&trx](transaction_history_object& transaction_history) {
// transaction_history.block_id = block_id;
// transaction_history.transaction_id = trx.id();
// });
bulk_trans.append(insert_op);
for (const auto& account_name : trx.scope)
{
// db.create<account_transaction_history_object>([&trx,&account_name](account_transaction_history_object& account_transaction_history) {
// account_transaction_history.account_name = account_name;
// account_transaction_history.transaction_id = trx.id();
// });
}
for (const chain::Message& msg : trx.messages)
{
......@@ -244,7 +270,7 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
if (transactions_in_block) {
auto result = trans.bulk_write(bulk);
auto result = trans.bulk_write(bulk_trans);
if (!result) {
elog("Bulk transaction insert failed for block: ${bid}", ("bid", block_id));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册