提交 3962475a 编写于 作者: K Kevin Heifner

-Add ABI for user types via setcode #172

-Add MongodB URI command line option
-Add exception handling of process_irreversible_block
-Add verification of last block on startup
上级 bb873659
#include <eos/db_plugin/db_plugin.hpp>
#include <eos/chain/chain_controller.hpp>
#include <eos/chain/config.hpp>
#include <eos/chain/exceptions.hpp>
#include <eos/chain/transaction.hpp>
#include <eos/chain/types.hpp>
#include <eos/native_contract/native_contract_chain_initializer.hpp>
#include <fc/crypto/sha256.hpp>
#include <fc/io/json.hpp>
#include <fc/variant.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <eos/chain/multi_index_includes.hpp>
#include <boost/range/adaptors.hpp>
#include <boost/range/algorithm.hpp>
#include <boost/range/algorithm_ext.hpp>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/stream/document.hpp>
#include <bsoncxx/json.hpp>
......@@ -45,18 +38,22 @@ public:
void applied_irreversible_block(const signed_block&);
void process_irreversible_block(const signed_block&);
void _process_irreversible_block(const signed_block&);
void init();
void wipe_database();
std::set<AccountName> filter_on;
types::Abi eos_abi; // cached for common use
static types::Abi eos_abi; // cached for common use
bool configured{false};
bool wipe_database_on_startup{false};
mongocxx::instance mongo_inst;
mongocxx::client mongo_conn;
mongocxx::collection accounts;
size_t queue_size = 0;
size_t processed = 0;
std::unique_ptr<boost::lockfree::spsc_queue<signed_block>> queue;
boost::thread consum_thread;
boost::atomic<bool> startup{true};
......@@ -72,6 +69,7 @@ public:
static const FuncName lock;
static const FuncName unlock;
static const FuncName claim;
static const FuncName setcode;
static const std::string db_name;
static const std::string blocks_col;
......@@ -80,11 +78,14 @@ public:
static const std::string accounts_col;
};
types::Abi db_plugin_impl::eos_abi;
const FuncName db_plugin_impl::newaccount = "newaccount";
const FuncName db_plugin_impl::transfer = "transfer";
const FuncName db_plugin_impl::lock = "lock";
const FuncName db_plugin_impl::unlock = "unlock";
const FuncName db_plugin_impl::claim = "claim";
const FuncName db_plugin_impl::setcode = "setcode";
const std::string db_plugin_impl::db_name = "EOS";
const std::string db_plugin_impl::blocks_col = "Blocks";
......@@ -110,7 +111,11 @@ void db_plugin_impl::consum_blocks() {
signed_block block;
while (!done) {
while (queue->pop(block)) {
ilog("queue size: ${q}", ("q", queue->read_available()+1));
auto available = queue->read_available();
// warn if queue size greater than 75%
if (available > (queue_size * 0.75)) {
wlog("queue size: ${q}", ("q", available + 1));
}
process_irreversible_block(block);
}
}
......@@ -120,18 +125,82 @@ void db_plugin_impl::consum_blocks() {
ilog("db_plugin consum thread shutdown gracefully");
}
//
// Blocks
// block_num int32
// block_id sha256
// prev_block_id sha256
// timestamp sec_since_epoch
// transaction_merkle_root sha256
// producer_account_id string
void db_plugin_impl::process_irreversible_block(const signed_block& block)
namespace {
auto find_account(mongocxx::collection& accounts, const AccountName& name) {
using bsoncxx::builder::stream::document;
document find_acc{};
find_acc << "name" << name.toString();
auto account = accounts.find_one(find_acc.view());
if (!account) {
FC_THROW("Unable to find account ${n}", ("n", name));
}
return *account;
}
void add_data(bsoncxx::builder::basic::document& msg_doc,
mongocxx::collection& accounts,
const chain::Message& msg)
{
using bsoncxx::builder::basic::kvp;
types::AbiSerializer abis;
if (msg.code == config::EosContractName) {
abis.setAbi(db_plugin_impl::eos_abi);
} else {
auto from_account = find_account(accounts, msg.code);
auto abi = fc::json::from_string(from_account.view()["abi"].get_utf8().value.to_string()).as<types::Abi>();
abis.setAbi(abi);
}
try {
auto v = abis.binaryToVariant(abis.getActionType(msg.type), msg.data);
auto json = fc::json::to_string(v);
try {
const auto& value = bsoncxx::from_json(json);
msg_doc.append(kvp("data", value));
return;
} catch (std::exception& e) {
elog("Unable to convert EOS JSON to MongoDB JSON: ${e}", ("e", e.what()));
elog(" EOS JSON: ${j}", ("j", json));
}
} catch (fc::exception& e) {
elog("Unable to convert Message.data to ABI type: ${t}, what: ${e}", ("t", msg.type)("e", e.to_string()));
}
// if anything went wrong just store raw hex_data
msg_doc.append(kvp("hex_data", fc::variant(msg.data).as_string()));
}
void verify_last_block(mongocxx::collection& blocks, const std::string& prev_block_id) {
mongocxx::options::find opts;
opts.sort(bsoncxx::from_json(R"xxx({ "_id" : -1 })xxx"));
auto last_block = blocks.find_one({}, opts);
if (!last_block) {
FC_THROW("No blocks found in database");
}
const auto id = last_block->view()["block_id"].get_utf8().value.to_string();
if (id != prev_block_id) {
FC_THROW("Did not find expected block ${pid}, instead found ${id}", ("pid", prev_block_id)("id", id));
}
}
}
void db_plugin_impl::process_irreversible_block(const signed_block& block) {
try {
_process_irreversible_block(block);
} catch (fc::exception& e) {
elog("FC Exception while processing block ${e}", ("e", e.to_string()));
} catch (std::exception& e) {
elog("STD Exception while processing block ${e}", ("e", e.what()));
} catch (...) {
elog("Unknown exception while processing block");
}
}
void db_plugin_impl::_process_irreversible_block(const signed_block& block)
{
using namespace bsoncxx::types;
using namespace bsoncxx::builder;
using bsoncxx::builder::basic::kvp;
bool transactions_in_block = false;
mongocxx::options::bulk_write bulk_opts;
......@@ -145,10 +214,15 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
stream::document block_doc{};
const auto block_id = block.id();
const auto block_id_str = block_id.str();
const auto prev_block_id_str = block.previous.str();
// verify on restart we have previous block
if (processed == 0 && !startup)
verify_last_block(blocks, prev_block_id_str);
block_doc << "block_num" << b_int32{static_cast<int32_t>(block.block_num())}
<< "block_id" << block_id_str
<< "prev_block_id" << block.previous.str()
<< "prev_block_id" << prev_block_id_str
<< "timestamp" << b_date{std::chrono::milliseconds{std::chrono::seconds{block.timestamp.sec_since_epoch()}}}
<< "transaction_merkle_root" << block.transaction_merkle_root.str()
<< "producer_account_id" << block.producer.toString();
......@@ -186,7 +260,7 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
<< "expiration" << b_date{std::chrono::milliseconds{std::chrono::seconds{trx.expiration.sec_since_epoch()}}}
<< "signatures" << stream::open_array;
for (const auto& sig : trx.signatures) {
trx_doc = trx_doc << fc::json::to_string(sig);
trx_doc = trx_doc << fc::variant(sig).as_string();
}
trx_doc = trx_doc
<< stream::close_array
......@@ -197,44 +271,23 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
for (const chain::Message& msg : trx.messages) {
auto msg_oid = bsoncxx::oid{};
trx_doc = trx_doc << msg_oid; // add to transaction.messages array
stream::document msg_builder{};
auto msg_doc = msg_builder
<< "_id" << b_oid{msg_oid}
<< "message_id" << b_int32{i}
<< "transaction_id" << trans_id_str
<< "authorization" << stream::open_array;
for (const auto& auth : msg.authorization) {
msg_doc = msg_doc << stream::open_document
<< "account" << auth.account.toString()
<< "permission" << auth.permission.toString()
<< stream::close_document;
}
auto msg_next_doc = msg_doc
<< stream::close_array
<< "type" << msg.type.toString();
bool data_added = false;
if (msg.code == config::EosContractName) {
types::AbiSerializer abis;
abis.setAbi(eos_abi);
try {
auto v = abis.binaryToVariant(abis.getActionType(msg.type), msg.data);
auto json = fc::json::to_string(v);
try {
const auto& value = bsoncxx::from_json(json);
msg_next_doc = msg_next_doc << "data" << value;
data_added = true;
} catch(std::exception& e) {
elog("Unable to convert EOS JSON to MongoDB JSON: ${e}", ("e", e.what()));
elog(" EOS JSON: ${j}", ("j", json));
}
} catch(fc::exception& e) {
elog("Unable to convert Message.data to ABI type: ${t}, what: ${e}", ("t", msg.type)("e", e.what()));
auto msg_doc = bsoncxx::builder::basic::document{};
msg_doc.append(kvp("_id", b_oid{msg_oid}),
kvp("message_id", b_int32{i}),
kvp("transaction_id", trans_id_str));
msg_doc.append(kvp("authorization", [&msg](bsoncxx::builder::basic::sub_array subarr) {
for (const auto& auth : msg.authorization) {
subarr.append([&auth](bsoncxx::builder::basic::sub_document subdoc) {
subdoc.append(kvp("account", auth.account.toString()),
kvp("permission", auth.permission.toString()));
});
}
}
if (!data_added)
msg_next_doc = msg_next_doc << "hex_data" << fc::variant(msg.data).as_string();
auto msg_complete = msg_next_doc << stream::finalize;
mongocxx::model::insert_one insert_msg{msg_complete.view()};
}));
msg_doc.append(kvp("handler_account_name", msg.code.toString()));
msg_doc.append(kvp("type", msg.type.toString()));
add_data(msg_doc, accounts, msg);
mongocxx::model::insert_one insert_msg{msg_doc.view()};
bulk_msgs.append(insert_msg);
// eos account update
......@@ -242,7 +295,7 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
try {
update_account(msg);
} catch(fc::exception& e) {
elog("Unable to update account ${e}", ("e", e.what()));
elog("Unable to update account ${e}", ("e", e.to_string()));
}
}
......@@ -264,8 +317,7 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
}
}
}
if (transactions_in_block) {
auto result = trans.bulk_write(bulk_trans);
if (!result) {
......@@ -273,22 +325,9 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
}
}
++processed;
}
namespace {
auto find_account(mongocxx::collection& accounts, const AccountName& name) {
using bsoncxx::builder::stream::document;
document find_acc{};
find_acc << "name" << name.toString();
auto account = accounts.find_one(find_acc.view());
if (!account) {
FC_THROW("Unable to find account ${n}", ("n", name));
}
return *account;
}
}
void db_plugin_impl::update_account(const chain::Message& msg) {
using bsoncxx::builder::basic::kvp;
......@@ -305,7 +344,6 @@ void db_plugin_impl::update_account(const chain::Message& msg) {
auto transfer = msg.as<types::transfer>();
auto from_name = transfer.from.toString();
auto to_name = transfer.to.toString();
auto accounts = mongo_conn[db_name][accounts_col]; // Accounts
auto from_account = find_account(accounts, transfer.from);
auto to_account = find_account(accounts, transfer.to);
......@@ -324,7 +362,6 @@ void db_plugin_impl::update_account(const chain::Message& msg) {
} else if (msg.type == newaccount) {
auto newaccount = msg.as<types::newaccount>();
auto accounts = mongo_conn[db_name][accounts_col]; // Accounts
// find creator to update its balance
auto from_name = newaccount.creator.toString();
......@@ -355,7 +392,6 @@ void db_plugin_impl::update_account(const chain::Message& msg) {
} else if (msg.type == lock) {
auto lock = msg.as<types::lock>();
auto accounts = mongo_conn[db_name][accounts_col]; // Accounts
auto from_account = find_account(accounts, lock.from);
auto to_account = find_account(accounts, lock.to);
......@@ -374,7 +410,6 @@ void db_plugin_impl::update_account(const chain::Message& msg) {
} else if (msg.type == unlock) {
auto unlock = msg.as<types::unlock>();
auto accounts = mongo_conn[db_name][accounts_col]; // Accounts
auto from_account = find_account(accounts, unlock.account);
Asset unstack_balance = Asset::fromString(from_account.view()["unstacking_balance"].get_utf8().value.to_string());
......@@ -394,7 +429,6 @@ void db_plugin_impl::update_account(const chain::Message& msg) {
} else if (msg.type == claim) {
auto claim = msg.as<types::claim>();
auto accounts = mongo_conn[db_name][accounts_col]; // Accounts
auto from_account = find_account(accounts, claim.account);
Asset balance = Asset::fromString(from_account.view()["eos_balance"].get_utf8().value.to_string());
......@@ -409,6 +443,17 @@ void db_plugin_impl::update_account(const chain::Message& msg) {
<< close_document;
accounts.update_one(document{} << "_id" << from_account.view()["_id"].get_oid() << finalize, update_from.view());
} else if (msg.type == setcode) {
auto setcode = msg.as<types::setcode>();
auto from_account = find_account(accounts, setcode.account);
document update_from{};
update_from << "$set" << open_document
<< "abi" << fc::json::to_string(setcode.abi)
<< close_document;
accounts.update_one(document{} << "_id" << from_account.view()["_id"].get_oid() << finalize, update_from.view());
}
}
......@@ -447,7 +492,7 @@ void db_plugin_impl::init() {
eos_abi = native_contract::native_contract_chain_initializer::eos_contract_abi();
auto accounts = mongo_conn[db_name][accounts_col]; // Accounts
accounts = mongo_conn[db_name][accounts_col]; // Accounts
bsoncxx::builder::stream::document doc{};
if (accounts.count(doc.view()) == 0) {
doc << "name" << config::EosContractName.toString()
......@@ -478,9 +523,9 @@ void db_plugin_impl::init() {
}
}
//////////
////////////
// db_plugin
//////////
////////////
db_plugin::db_plugin()
:my(new db_plugin_impl)
......@@ -494,10 +539,12 @@ db_plugin::~db_plugin()
void db_plugin::set_program_options(options_description& cli, options_description& cfg)
{
cfg.add_options()
("filter_on_accounts,f", bpo::value<std::vector<std::string>>()->composing(),
("filter-on-accounts,f", bpo::value<std::vector<std::string>>()->composing(),
"Track only transactions whose scopes involve the listed accounts. Default is to track all transactions.")
("queue_size,q", bpo::value<uint>()->default_value(1024),
"The block queue size")
("queue-size,q", bpo::value<uint>()->default_value(256),
"The queue size between EOSd and MongoDB process thread.")
("mongodb-uri,m", bpo::value<std::string>()->default_value("mongodb://localhost:27017"),
"MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/")
;
}
......@@ -517,17 +564,19 @@ void db_plugin::applied_irreversible_block(const signed_block& block) {
void db_plugin::plugin_initialize(const variables_map& options)
{
ilog("initializing db plugin");
if(options.count("filter_on_accounts")) {
auto foa = options.at("filter_on_accounts").as<std::vector<std::string>>();
if(options.count("filter-on-accounts")) {
auto foa = options.at("filter-on-accounts").as<std::vector<std::string>>();
for(auto filter_account : foa)
my->filter_on.emplace(filter_account);
} else if (options.count("queue_size")) {
auto size = options.at("queue_size").as<uint>();
} else if (options.count("queue-size")) {
auto size = options.at("queue-size").as<uint>();
my->queue_size = size;
my->queue = std::make_unique<boost::lockfree::spsc_queue<signed_block>>(size);
}
// TODO: add command line for uri
my->mongo_conn = mongocxx::client{mongocxx::uri{}};
std::string uri = options.at("mongodb-uri").as<std::string>();
ilog("connecting to ${u}", ("u", uri));
my->mongo_conn = mongocxx::client{mongocxx::uri{uri}};
if (my->wipe_database_on_startup) {
my->wipe_database();
......
......@@ -9,7 +9,7 @@ namespace fc { class variant; }
namespace eos {
typedef std::shared_ptr<class db_plugin_impl> db_plugin_impl_ptr;
using db_plugin_impl_ptr = std::shared_ptr<class db_plugin_impl>;
class db_plugin : public plugin<db_plugin> {
public:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册