提交 4446b3c1 编写于 作者: K Kevin Heifner

Transactions now pushed to mongodb #172

上级 a5050279
......@@ -33,10 +33,7 @@ using chain::block_id_type;
using chain::PermissionName;
using chain::ProcessedTransaction;
using chain::signed_block;
using boost::multi_index_container;
using chain::transaction_id_type;
using namespace boost::multi_index;
using namespace std;
class db_plugin_impl {
public:
......@@ -84,7 +81,8 @@ public:
static const PermissionName RECOVERY;
static const std::string db_name;
static const std::string block_col;
static const std::string blocks_col;
static const std::string trans_col;
};
const AccountName db_plugin_impl::NEW_ACCOUNT = "newaccount";
......@@ -95,7 +93,8 @@ const PermissionName db_plugin_impl::ACTIVE = "active";
const PermissionName db_plugin_impl::RECOVERY = "recovery";
const std::string db_plugin_impl::db_name = "EOS";
const std::string db_plugin_impl::block_col = "block";
const std::string db_plugin_impl::blocks_col = "Blocks";
const std::string db_plugin_impl::trans_col = "Transactions";
void db_plugin_impl::applied_irreversible_block(const signed_block& block) {
......@@ -124,31 +123,74 @@ void db_plugin_impl::consum_blocks() {
ilog("db_plugin consum thread shutdown gracefully");
}
//
// Blocks
// block_num int
// block_id sha256
// prev_block_id sha256
// timestamp sec_since_epoch
// transaction_merkle_root sha256
// producer_account_id string
void db_plugin_impl::process_irreversible_block(const signed_block& block)
{
auto blocks = mongo_conn[db_name][block_col];
bsoncxx::builder::stream::document doc{};
using namespace bsoncxx::types;
using namespace bsoncxx::builder;
bool transactions_in_block = false;
mongocxx::bulk_write bulk;
auto blocks = mongo_conn[db_name][blocks_col];
auto trans = mongo_conn[db_name][trans_col];
stream::document doc{};
const auto block_id = block.id();
ilog("block ${bid}", ("bid", block_id));
// auto& db = chain_plug->chain().get_mutable_database();
const bool check_relevance = filter_on.size();
for (const auto& cycle : block.cycles)
{
for (const auto& thread : cycle)
{
for (const auto& trx : thread.user_input)
{
const auto block_id_str = block_id.str();
doc << "block_num" << b_int32{static_cast<int32_t>(block.block_num())}
<< "block_id" << block_id_str
<< "prev_block_id" << block.previous.str()
<< "timestamp" << b_date{std::chrono::milliseconds{std::chrono::seconds{block.timestamp.sec_since_epoch()}}}
<< "transaction_merkle_root" << block.transaction_merkle_root.str()
<< "producer_account_id" << block.producer.toString();
if (!blocks.insert_one(doc.view())) {
elog("Failed to insert block ${bid}", ("bid", block_id));
}
const bool check_relevance = filter_on.size() > 0;
for (const auto& cycle : block.cycles) {
for (const auto& thread : cycle) {
for (const auto& trx : thread.user_input) {
if (check_relevance && !is_scope_relevant(trx.scope))
continue;
ilog("block ${bid} : ${tid}", ("bid", block_id)("tid", trx.id()));
doc.clear();
doc << "block_id" << block_id.str()
<< "txn_id" << trx.id().str();
blocks.insert_one(doc.view());
auto trx_doc = doc
<< "transaction_id" << trx.id().str()
<< "block_id" << block_id_str
<< "ref_block_num" << b_int32{static_cast<int32_t >(trx.refBlockNum)}
<< "ref_block_prefix" << trx.refBlockPrefix.str()
<< "scope" << stream::open_array;
for (const auto& account_name : trx.scope)
trx_doc = trx_doc << account_name.toString();
trx_doc = trx_doc
<< stream::close_array
<< "read_scope" << stream::open_array;
for (const auto& account_name : trx.readscope)
trx_doc = trx_doc << account_name.toString();
trx_doc = trx_doc
<< stream::close_array
<< "expiration" << b_date{std::chrono::milliseconds{std::chrono::seconds{trx.expiration.sec_since_epoch()}}}
<< "signatures" << stream::open_array;
for (const auto& sig : trx.signatures) {
trx_doc = trx_doc << fc::json::to_string(sig);
}
auto complete_doc = trx_doc
<< stream::close_array
<< "messages" << stream::open_array
<< stream::close_array << stream::finalize;
mongocxx::model::insert_one insert_op{complete_doc.view()};
transactions_in_block = true;
bulk.append(insert_op);
// db.create<transaction_history_object>([&block_id,&trx](transaction_history_object& transaction_history) {
// transaction_history.block_id = block_id;
......@@ -199,6 +241,15 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
}
}
}
if (transactions_in_block) {
auto result = trans.bulk_write(bulk);
if (!result) {
elog("Bulk transaction insert failed for block: ${bid}", ("bid", block_id));
}
}
}
void db_plugin_impl::add(chainbase::database& db, const vector<types::KeyPermissionWeight>& keys, const AccountName& account_name, const PermissionName& permission)
......@@ -270,7 +321,7 @@ db_plugin::~db_plugin()
void db_plugin::set_program_options(options_description& cli, options_description& cfg)
{
cfg.add_options()
("filter_on_accounts,f", bpo::value<vector<string>>()->composing(),
("filter_on_accounts,f", bpo::value<std::vector<std::string>>()->composing(),
"Track only transactions whose scopes involve the listed accounts. Default is to track all transactions.")
("queue_size,q", bpo::value<uint>()->default_value(1024),
"The block queue size")
......@@ -294,7 +345,7 @@ void db_plugin::plugin_initialize(const variables_map& options)
{
ilog("initializing db plugin");
if(options.count("filter_on_accounts")) {
auto foa = options.at("filter_on_accounts").as<vector<string>>();
auto foa = options.at("filter_on_accounts").as<std::vector<std::string>>();
for(auto filter_account : foa)
my->filter_on.emplace(filter_account);
} else if (options.count("queue_size")) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册