提交 a5050279 编写于 作者: K Kevin Heifner

Setup to write to mongodb #172

上级 4b9e7a13
......@@ -3,8 +3,36 @@ add_library( db_plugin
db_plugin.cpp
${HEADERS} )
target_link_libraries( db_plugin chain_plugin eos_chain appbase )
target_include_directories( db_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" )
find_package(libmongoc-1.0 1.8 REQUIRED)
message ("-- mongoc found version \"${MONGOC_VERSION}\"")
message ("-- mongoc include path \"${MONGOC_INCLUDE_DIRS}\"")
message ("-- mongoc libraries \"${MONGOC_LIBRARIES}\"")
find_package(PkgConfig QUIET)
# NOTE: For this to work, the PKG_CONFIG_PATH variable (man pkg-config) must be set to point to the
# 'lib/pkgconfig' subdirectory of the directory used as the argument to CMAKE_INSTALL_PREFIX when
# building libmongocxx and libbsoncxx.
pkg_search_module(BSONCXX REQUIRED libbsoncxx)
pkg_search_module(MONGOCXX REQUIRED libmongocxx)
link_directories(
${MONGOCXX_LIBRARY_DIRS}
${BSONCXX_LIBRARY_DIRS}
)
target_include_directories( db_plugin
PRIVATE ${MONGOCXX_INCLUDE_DIRS}
PRIVATE ${BSONCXX_INCLUDE_DIRS}
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include"
)
target_link_libraries( db_plugin
PUBLIC chain_plugin eos_chain appbase
PRIVATE ${MONGOCXX_LIBRARIES} ${BSONCXX_LIBRARIES}
)
install( TARGETS
db_plugin
......
......@@ -18,6 +18,12 @@
#include <boost/thread/thread.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <bsoncxx/builder/stream/document.hpp>
#include <bsoncxx/json.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
namespace fc { class variant; }
namespace eos {
......@@ -30,12 +36,21 @@ using chain::signed_block;
using boost::multi_index_container;
using chain::transaction_id_type;
using namespace boost::multi_index;
using namespace std;
class db_plugin_impl {
public:
db_plugin_impl();
~db_plugin_impl();
void applied_irreversible_block(const signed_block&);
void process_irreversible_block(const signed_block&);
void wipe_database();
bool wipe_database_on_startup{false};
mongocxx::instance mongo_inst;
mongocxx::client mongo_conn;
std::set<AccountName> filter_on;
std::unique_ptr<boost::lockfree::spsc_queue<signed_block>> queue;
boost::thread consum_thread;
......@@ -47,18 +62,18 @@ public:
bool is_scope_relevant(const eos::types::Vector<AccountName>& scope);
static void add(chainbase::database& db, const vector<types::KeyPermissionWeight>& keys, const AccountName& account_name, const PermissionName& permission);
template<typename MultiIndex, typename LookupType>
static void remove(chainbase::database& db, const AccountName& account_name, const PermissionName& permission)
{
const auto& idx = db.get_index<MultiIndex, LookupType>();
auto& mutatable_idx = db.get_mutable_index<MultiIndex>();
auto range = idx.equal_range( boost::make_tuple( account_name, permission ) );
for (auto acct_perm = range.first; acct_perm != range.second; ++acct_perm)
{
mutatable_idx.remove(*acct_perm);
}
}
// template<typename MultiIndex, typename LookupType>
// static void remove(chainbase::database& db, const AccountName& account_name, const PermissionName& permission)
// {
// const auto& idx = db.get_index<MultiIndex, LookupType>();
// auto& mutatable_idx = db.get_mutable_index<MultiIndex>();
// auto range = idx.equal_range( boost::make_tuple( account_name, permission ) );
//
// for (auto acct_perm = range.first; acct_perm != range.second; ++acct_perm)
// {
// mutatable_idx.remove(*acct_perm);
// }
// }
static void add(chainbase::database& db, const vector<types::AccountPermissionWeight>& controlling_accounts, const AccountName& account_name, const PermissionName& permission);
static const AccountName NEW_ACCOUNT;
......@@ -67,7 +82,11 @@ public:
static const PermissionName OWNER;
static const PermissionName ACTIVE;
static const PermissionName RECOVERY;
static const std::string db_name;
static const std::string block_col;
};
const AccountName db_plugin_impl::NEW_ACCOUNT = "newaccount";
const AccountName db_plugin_impl::UPDATE_AUTH = "updateauth";
const AccountName db_plugin_impl::DELETE_AUTH = "deleteauth";
......@@ -75,14 +94,19 @@ const PermissionName db_plugin_impl::OWNER = "owner";
const PermissionName db_plugin_impl::ACTIVE = "active";
const PermissionName db_plugin_impl::RECOVERY = "recovery";
const std::string db_plugin_impl::db_name = "EOS";
const std::string db_plugin_impl::block_col = "block";
void db_plugin_impl::applied_irreversible_block(const signed_block& block) {
if (startup) {
// on startup we don't want to queue, insteas push back on caller
// on startup we don't want to queue, instead push back on caller
process_irreversible_block(block);
} else {
if (!queue->push(block)) {
// TODO what to do if full
elog("queue is full!!!!!");
FC_ASSERT(false, "queue is full");
}
}
}
......@@ -102,6 +126,9 @@ void db_plugin_impl::consum_blocks() {
void db_plugin_impl::process_irreversible_block(const signed_block& block)
{
auto blocks = mongo_conn[db_name][block_col];
bsoncxx::builder::stream::document doc{};
const auto block_id = block.id();
ilog("block ${bid}", ("bid", block_id));
// auto& db = chain_plug->chain().get_mutable_database();
......@@ -117,6 +144,12 @@ void db_plugin_impl::process_irreversible_block(const signed_block& block)
ilog("block ${bid} : ${tid}", ("bid", block_id)("tid", trx.id()));
doc.clear();
doc << "block_id" << block_id.str()
<< "txn_id" << trx.id().str();
blocks.insert_one(doc.view());
// db.create<transaction_history_object>([&block_id,&trx](transaction_history_object& transaction_history) {
// transaction_history.block_id = block_id;
// transaction_history.transaction_id = trx.id();
......@@ -201,9 +234,32 @@ bool db_plugin_impl::is_scope_relevant(const eos::types::Vector<AccountName>& sc
return false;
}
db_plugin_impl::db_plugin_impl()
: mongo_inst{}
, mongo_conn{}
{
}
db_plugin_impl::~db_plugin_impl() {
try {
done = true;
consum_thread.join();
} catch (std::exception& e) {
elog("Exception on db_plugin shutdown of consum thread: ${e}", ("e", e.what()));
}
}
void db_plugin_impl::wipe_database() {
ilog("db wipe_database");
mongo_conn[db_name].drop();
}
//////////
// db_plugin
//////////
db_plugin::db_plugin()
:my(new db_plugin_impl())
:my(new db_plugin_impl)
{
}
......@@ -222,10 +278,11 @@ void db_plugin::set_program_options(options_description& cli, options_descriptio
}
void db_plugin::wipe_database() {
if (true)
elog("TODO: db wipe_database");
else
if (!my->startup) {
elog("ERROR: db_plugin::wipe_database() called before configuration or after startup. Ignoring.");
} else {
my->wipe_database_on_startup = true;
}
}
void db_plugin::applied_irreversible_block(const signed_block& block) {
......@@ -244,6 +301,13 @@ void db_plugin::plugin_initialize(const variables_map& options)
auto size = options.at("queue_size").as<uint>();
my->queue = std::make_unique<boost::lockfree::spsc_queue<signed_block>>(size);
}
// TODO: add command line for uri
my->mongo_conn = mongocxx::client{mongocxx::uri{}};
if (my->wipe_database_on_startup) {
my->wipe_database();
}
}
void db_plugin::plugin_startup()
......@@ -266,8 +330,7 @@ void db_plugin::plugin_startup()
void db_plugin::plugin_shutdown()
{
my->done = true;
my->consum_thread.join();
my.reset();
}
} // namespace eos
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册