未验证 提交 854f8d80 编写于 作者: D Daniel Larimer 提交者: GitHub

Merge pull request #2842 from asiniscalchi/dev_db_SQL_plugin

[in progress] sql_db_plugin
###############################################################################
# CMake module to search for SOCI library
#
# WARNING: This module is experimental work in progress.
#
# This module defines:
# SOCI_INCLUDE_DIRS = include dirs to be used when using the soci library
# SOCI_LIBRARY = full path to the soci library
# SOCI_VERSION = the soci version found (not yet. soci does not provide that info.)
# SOCI_FOUND = true if soci was found
#
# This module respects:
# LIB_SUFFIX = (64|32|"") Specifies the suffix for the lib directory
#
# For each component you specify in find_package(), the following variables are set.
#
# SOCI_${COMPONENT}_PLUGIN = full path to the soci plugin
# SOCI_${COMPONENT}_FOUND
#
# Copyright (c) 2011 Michael Jansen <info@michael-jansen.biz>
#
# Redistribution and use is allowed according to the terms of the BSD license.
# For details see the accompanying COPYING-CMAKE-SCRIPTS file.
#
###############################################################################
#
### Global Configuration Section
#
SET(_SOCI_ALL_PLUGINS mysql odbc postgresql sqlite3)
SET(_SOCI_REQUIRED_VARS SOCI_INCLUDE_DIR SOCI_LIBRARY)
#
### FIRST STEP: Find the soci headers.
#
FIND_PATH(
SOCI_INCLUDE_DIR soci.h
PATH "/usr/local"
PATH_SUFFIXES "" "soci"
DOC "Soci (http://soci.sourceforge.net) include directory")
MARK_AS_ADVANCED(SOCI_INCLUDE_DIR)
SET(SOCI_INCLUDE_DIRS ${SOCI_INCLUDE_DIR})
#
### SECOND STEP: Find the soci core library. Respect LIB_SUFFIX
#
FIND_LIBRARY(
SOCI_LIBRARY
NAMES soci_core
HINTS ${SOCI_INCLUDE_DIR}/..
PATH_SUFFIXES lib${LIB_SUFFIX})
MARK_AS_ADVANCED(SOCI_LIBRARY)
GET_FILENAME_COMPONENT(SOCI_LIBRARY_DIR ${SOCI_LIBRARY} PATH)
MARK_AS_ADVANCED(SOCI_LIBRARY_DIR)
#
### THIRD STEP: Find all installed plugins if the library was found
#
IF(SOCI_INCLUDE_DIR AND SOCI_LIBRARY)
MESSAGE(STATUS "Soci found: Looking for plugins")
FOREACH(plugin IN LISTS _SOCI_ALL_PLUGINS)
FIND_LIBRARY(
SOCI_${plugin}_PLUGIN
NAMES soci_${plugin}
HINTS ${SOCI_INCLUDE_DIR}/..
PATH_SUFFIXES lib${LIB_SUFFIX})
MARK_AS_ADVANCED(SOCI_${plugin}_PLUGIN)
IF(SOCI_${plugin}_PLUGIN)
MESSAGE(STATUS " * Plugin ${plugin} found ${SOCI_${plugin}_PLUGIN}.")
SET(SOCI_${plugin}_FOUND True)
ELSE()
MESSAGE(STATUS " * Plugin ${plugin} not found.")
SET(SOCI_${plugin}_FOUND False)
ENDIF()
ENDFOREACH()
#
### FOURTH CHECK: Check if the required components were all found
#
FOREACH(component ${Soci_FIND_COMPONENTS})
IF(NOT SOCI_${component}_FOUND)
MESSAGE(SEND_ERROR "Required component ${component} not found.")
ENDIF()
ENDFOREACH()
ENDIF()
#
### ADHERE TO STANDARDS
#
include(FindPackageHandleStandardArgs)
FIND_PACKAGE_HANDLE_STANDARD_ARGS(Soci DEFAULT_MSG ${_SOCI_REQUIRED_VARS})
......@@ -14,6 +14,7 @@ add_subdirectory(txn_test_gen_plugin)
add_subdirectory(db_size_api_plugin)
#add_subdirectory(faucet_testnet_plugin)
#add_subdirectory(mongo_db_plugin)
add_subdirectory(sql_db_plugin)
# Forward variables to top level so packaging picks them up
set(CPACK_DEBIAN_PACKAGE_DEPENDS ${CPACK_DEBIAN_PACKAGE_DEPENDS} PARENT_SCOPE)
find_package(Soci)
if(NOT SOCI_FOUND)
message(STATUS "Database SQL plugin: disabled")
return()
endif()
message(STATUS "Database SQL plugin: enabled")
include_directories(${CMAKE_CURRENT_SOURCE_DIR} include db)
add_library(sql_db_plugin
db/database.cpp
db/accounts_table.cpp
db/transactions_table.cpp
db/blocks_table.cpp
db/actions_table.cpp
sql_db_plugin.cpp
irreversible_block_storage.cpp
block_storage.cpp
)
target_link_libraries(sql_db_plugin
chain_plugin
${SOCI_LIBRARY}
)
add_subdirectory(test)
#include "block_storage.h"
namespace eosio {
void block_storage::consume(const std::vector<chain::block_state_ptr> &blocks)
{
for (const auto& block : blocks)
ilog(block->id.str());
}
}
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once
#include "consumer_core.h"
#include <eosio/chain/block_state.hpp>
namespace eosio {
class block_storage : public consumer_core<chain::block_state_ptr>
{
public:
void consume(const std::vector<chain::block_state_ptr>& blocks) override;
};
} // namespace
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once
#include <thread>
#include <atomic>
#include <vector>
#include <boost/noncopyable.hpp>
#include <fc/log/logger.hpp>
#include "consumer_core.h"
#include "fifo.h"
namespace eosio {
template<typename T>
class consumer final : public boost::noncopyable
{
public:
consumer(std::unique_ptr<consumer_core<T>> core);
~consumer();
void push(const T& element);
private:
void run();
fifo<T> m_fifo;
std::unique_ptr<consumer_core<T>> m_core;
std::atomic<bool> m_exit;
std::unique_ptr<std::thread> m_thread;
};
template<typename T>
consumer<T>::consumer(std::unique_ptr<consumer_core<T> > core):
m_fifo(fifo<T>::behavior::blocking),
m_core(std::move(core)),
m_exit(false),
m_thread(std::make_unique<std::thread>([&]{this->run();}))
{
}
template<typename T>
consumer<T>::~consumer()
{
m_fifo.set_behavior(fifo<T>::behavior::not_blocking);
m_exit = true;
m_thread->join();
}
template<typename T>
void consumer<T>::push(const T& element)
{
m_fifo.push(element);
}
template<typename T>
void consumer<T>::run()
{
dlog("Consumer thread Start");
while (!m_exit)
{
auto elements = m_fifo.pop_all();
m_core->consume(elements);
}
dlog("Consumer thread End");
}
} // namespace
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once
#include <vector>
namespace eosio {
template<typename T>
class consumer_core {
public:
virtual ~consumer_core() {}
virtual void consume(const std::vector<T>& elements) = 0;
};
} // namespace
#include "accounts_table.h"
#include <fc/log/logger.hpp>
namespace eosio {
accounts_table::accounts_table(std::shared_ptr<soci::session> session):
m_session(session)
{
}
void accounts_table::drop()
{
try {
*m_session << "drop table accounts";
}
catch(std::exception& e){
wlog(e.what());
}
}
void accounts_table::create()
{
*m_session << "create table accounts("
"name TEXT,"
"eos_balance REAL,"
"staked_balance REAL,"
"unstaking_balance REAL,"
"abi TEXT,"
"created_at DATETIME,"
"updated_at DATETIME)";
}
} // namespace
#ifndef ACCOUNTS_TABLE_H
#define ACCOUNTS_TABLE_H
#include <memory>
#include <soci/soci.h>
namespace eosio {
class accounts_table
{
public:
accounts_table(std::shared_ptr<soci::session> session);
void drop();
void create();
private:
std::shared_ptr<soci::session> m_session;
};
} // namespace
#endif // ACCOUNTS_TABLE_H
#include "actions_table.h"
namespace eosio {
actions_table::actions_table()
{
}
} // namespace
#ifndef ACTIONS_TABLE_H
#define ACTIONS_TABLE_H
namespace eosio {
class actions_table
{
public:
actions_table();
};
} // namespace
#endif // ACTIONS_TABLE_H
#include "blocks_table.h"
namespace eosio {
blocks_table::blocks_table(std::shared_ptr<soci::session> session):
m_session(session)
{
}
} // namespace
#ifndef BLOCKS_TABLE_H
#define BLOCKS_TABLE_H
#include <memory>
#include <soci/soci.h>
namespace eosio {
class blocks_table
{
public:
blocks_table(std::shared_ptr<soci::session> session);
private:
std::shared_ptr<soci::session> m_session;
};
} // namespace
#endif // BLOCKS_TABLE_H
#include "database.h"
namespace eosio {
database::database(const std::string &uri)
{
m_session = std::make_shared<soci::session>(uri);
m_accounts_table = std::make_unique<accounts_table>(m_session);
m_transactions_table = std::make_unique<transactions_table>();
m_actions_table = std::make_unique<actions_table>();
m_blocks_table = std::make_unique<blocks_table>(m_session);
}
void database::wipe()
{
std::unique_lock<std::mutex> lock(m_mux);
m_accounts_table->drop();
m_accounts_table->create();
}
} // namespace
#ifndef DATABASE_H
#define DATABASE_H
#include <memory>
#include <mutex>
#include <soci/soci.h>
#include "accounts_table.h"
#include "transactions_table.h"
#include "blocks_table.h"
#include "actions_table.h"
namespace eosio {
class database
{
public:
database(const std::string& uri);
void wipe();
private:
mutable std::mutex m_mux;
std::shared_ptr<soci::session> m_session;
std::unique_ptr<accounts_table> m_accounts_table;
std::unique_ptr<actions_table> m_actions_table;
std::unique_ptr<blocks_table> m_blocks_table;
std::unique_ptr<transactions_table> m_transactions_table;
};
} // namespace
#endif // DATABASE_H
#include "transactions_table.h"
namespace eosio {
transactions_table::transactions_table()
{
}
} // namespace
#ifndef TRANSACTIONS_TABLE_H
#define TRANSACTIONS_TABLE_H
namespace eosio {
class transactions_table
{
public:
transactions_table();
};
} // namespace
#endif // TRANSACTIONS_TABLE_H
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <deque>
#include <vector>
#include <boost/noncopyable.hpp>
namespace eosio {
template<typename T>
class fifo : public boost::noncopyable
{
public:
enum class behavior {blocking, not_blocking};
fifo(behavior value);
void push(const T& element);
std::vector<T> pop_all();
void set_behavior(behavior value);
private:
std::mutex m_mux;
std::condition_variable m_cond;
std::atomic<behavior> m_behavior;
std::deque<T> m_deque;
};
template<typename T>
fifo<T>::fifo(behavior value)
{
m_behavior = value;
}
template<typename T>
void fifo<T>::push(const T& element)
{
std::lock_guard<std::mutex> lock(m_mux);
m_deque.push_back(element);
m_cond.notify_one();
}
template<typename T>
std::vector<T> fifo<T>::pop_all()
{
std::unique_lock<std::mutex> lock(m_mux);
m_cond.wait(lock, [&]{return m_behavior == behavior::not_blocking || !m_deque.empty();});
std::vector<T> result;
while(!m_deque.empty())
{
result.push_back(std::move(m_deque.front()));
m_deque.pop_front();
}
return result;
}
template<typename T>
void fifo<T>::set_behavior(behavior value)
{
m_behavior = value;
m_cond.notify_all();
}
} // namespace
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once
#include <eosio/chain_plugin/chain_plugin.hpp>
#include <appbase/application.hpp>
#include <memory>
#include "consumer.h"
namespace eosio {
/**
* @author Alessandro Siniscalchi <asiniscalchi@gmail.com>
*
* Provides persistence to SQL DB for:
* Blocks
* Transactions
* Actions
* Accounts
*
* See data dictionary (DB Schema Definition - EOS API) for description of SQL DB schema.
*
* The goal ultimately is for all chainbase data to be mirrored in SQL DB via a delayed node processing
* irreversible blocks. Currently, only Blocks, Transactions, Messages, and Account balance it mirrored.
* Chainbase is being rewritten to be multi-threaded. Once chainbase is stable, integration directly with
* a mirror database approach can be followed removing the need for the direct processing of Blocks employed
* with this implementation.
*
* If SQL DB env not available (#ifndef SQL DB) this plugin is a no-op.
*/
class sql_db_plugin final : public plugin<sql_db_plugin> {
public:
APPBASE_PLUGIN_REQUIRES((chain_plugin))
sql_db_plugin();
virtual void set_program_options(options_description& cli, options_description& cfg) override;
void plugin_initialize(const variables_map& options);
void plugin_startup();
void plugin_shutdown();
private:
std::unique_ptr<consumer<chain::block_state_ptr>> m_irreversible_block_consumer;
consumer<chain::block_state_ptr> m_block_consumer;
};
}
#include "irreversible_block_storage.h"
namespace eosio {
irreversible_block_storage::irreversible_block_storage(std::shared_ptr<database> db):
m_db(db)
{
}
void irreversible_block_storage::consume(const std::vector<chain::block_state_ptr>& blocks)
{
for (const auto& block : blocks)
{
ilog(block->id.str());
// TODO parse the block and ..
// TODO m_db->act
}
}
} // namespace
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once
#include "consumer_core.h"
#include <memory>
#include <eosio/chain/block_state.hpp>
#include "database.h"
namespace eosio {
class irreversible_block_storage : public consumer_core<chain::block_state_ptr>
{
public:
irreversible_block_storage(std::shared_ptr<database> db);
void consume(const std::vector<chain::block_state_ptr>& blocks) override;
private:
std::shared_ptr<database> m_db;
};
} // namespace
/**
* @file
* @copyright defined in eos/LICENSE.txt
* @author Alessandro Siniscalchi <asiniscalchi@gmail.com>
*/
#include <eosio/sql_db_plugin/sql_db_plugin.hpp>
#include "database.h"
#include "consumer_core.h"
#include "irreversible_block_storage.h"
#include "block_storage.h"
namespace {
const char* BUFFER_SIZE_OPTION = "sql_db-queue-size";
const char* SQL_DB_URI_OPTION = "sql_db-uri";
const char* RESYNC_OPTION = "resync-blockchain";
const char* REPLAY_OPTION = "replay-blockchain";
}
namespace fc { class variant; }
namespace eosio {
static appbase::abstract_plugin& _sql_db_plugin = app().register_plugin<sql_db_plugin>();
sql_db_plugin::sql_db_plugin():
m_block_consumer(std::make_unique<block_storage>())
{
}
void sql_db_plugin::set_program_options(options_description& cli, options_description& cfg)
{
dlog("set_program_options");
cfg.add_options()
(BUFFER_SIZE_OPTION, bpo::value<uint>()->default_value(256),
"The queue size between nodeos and SQL DB plugin thread.")
(SQL_DB_URI_OPTION, bpo::value<std::string>(),
"Sql DB URI connection string"
" If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI.")
;
}
void sql_db_plugin::plugin_initialize(const variables_map& options)
{
ilog("initialize");
std::string uri_str = options.at(SQL_DB_URI_OPTION).as<std::string>();
if (uri_str.empty())
{
wlog("db URI not specified => eosio::sql_db_plugin disabled.");
return;
}
ilog("connecting to ${u}", ("u", uri_str));
auto db = std::make_shared<database>(uri_str);
if (options.at(RESYNC_OPTION).as<bool>() ||
options.at(REPLAY_OPTION).as<bool>())
{
ilog("Resync requested: wiping database");
db->wipe();
}
chain_plugin* chain_plug = app().find_plugin<chain_plugin>();
FC_ASSERT(chain_plug);
auto& chain = chain_plug->chain();
m_irreversible_block_consumer = std::make_unique<consumer<chain::block_state_ptr>>(std::make_unique<irreversible_block_storage>(db));
// chain.accepted_block.connect([=](const chain::block_state_ptr& b) {m_block_consumer.push(b);});
chain.irreversible_block.connect([=](const chain::block_state_ptr& b) {m_irreversible_block_consumer->push(b);});
}
void sql_db_plugin::plugin_startup()
{
ilog("startup");
}
void sql_db_plugin::plugin_shutdown()
{
ilog("shutdown");
}
} // namespace eosio
add_executable(sql_db_plugin_test
test.cpp
fifo_test.cpp
consumer_test.cpp
)
target_link_libraries(sql_db_plugin_test
sql_db_plugin
${Boost_LIBRARIES}
)
#add_test(sql_db_plugin_test sql_db_plugin_test)
#include <boost/test/unit_test.hpp>
#include "consumer.h"
using namespace eosio;
BOOST_AUTO_TEST_SUITE(consumer_test)
BOOST_AUTO_TEST_CASE(instantiate)
{
struct foo : public consumer_core<int>
{
public:
void consume(const std::vector<int> &blocks) override
{
for (int i : blocks)
std::cout << i << std::endl;
}
};
consumer<int> c(std::make_unique<foo>());
c.push(1);
c.push(10);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
BOOST_AUTO_TEST_SUITE_END()
#include <boost/test/unit_test.hpp>
#include "fifo.h"
using namespace eosio;
BOOST_AUTO_TEST_SUITE(fifo_test)
BOOST_AUTO_TEST_CASE(pop_empty_fifo_not_blocking)
{
fifo<int> f(fifo<int>::behavior::not_blocking);
auto v = f.pop_all();
BOOST_TEST(v.size() == 0);
}
BOOST_AUTO_TEST_CASE(change_to_not_blocking)
{
fifo<int> f(fifo<int>::behavior::blocking);
f.push(1);
f.push(2);
f.push(3);
auto v = f.pop_all();
BOOST_TEST(v.size() == 3);
f.set_behavior(fifo<int>::behavior::not_blocking);
v = f.pop_all();
BOOST_TEST(v.size() == 0);
}
BOOST_AUTO_TEST_CASE(pushing_2_int_pop_2_int)
{
fifo<int> f(fifo<int>::behavior::not_blocking);
f.push(1);
f.push(2);
auto v = f.pop_all();
BOOST_TEST(1 == v.at(0));
BOOST_TEST(2 == v.at(1));
}
BOOST_AUTO_TEST_SUITE_END()
#define BOOST_TEST_MODULE "sql_db_plugin"
#include <boost/test/unit_test.hpp>
......@@ -57,6 +57,10 @@ target_link_libraries( nodeos
PRIVATE chain_plugin http_plugin producer_plugin
PRIVATE eosio_chain fc ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} )
if(TARGET sql_db_plugin)
target_link_libraries( nodeos PRIVATE -Wl,${whole_archive_flag} sql_db_plugin -Wl,${no_whole_archive_flag} )
endif()
#if(BUILD_MONGO_DB_PLUGIN)
# target_link_libraries( nodeos
# PRIVATE -Wl,${whole_archive_flag} mongo_db_plugin -Wl,${no_whole_archive_flag} )
......@@ -88,4 +92,4 @@ install(DIRECTORY DESTINATION ${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/lib/eosio
GROUP_EXECUTE
WORLD_READ
WORLD_EXECUTE
)
)
......@@ -55,7 +55,7 @@
DEP_ARRAY=(clang-4.0 lldb-4.0 libclang-4.0-dev cmake make automake libbz2-dev libssl-dev \
libgmp3-dev autotools-dev build-essential libicu-dev python2.7-dev python3-dev \
autoconf libtool curl zlib1g-dev doxygen graphviz)
autoconf libtool curl zlib1g-dev doxygen graphviz)
COUNT=1
DISPLAY=""
DEP=""
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册