提交 d559fe66 编写于 作者: K Kevin Heifner

Switched to blocking queue to avoid busy wait #172

上级 72cecfcc
......@@ -6,7 +6,6 @@ add_library( db_plugin
find_package(libmongoc-1.0 1.8)
if (libmongoc-1.0_FOUND)
message("-- mongoc found version \"${MONGOC_VERSION}\"")
message("-- mongoc include path \"${MONGOC_INCLUDE_DIRS}\"")
message("-- mongoc libraries \"${MONGOC_LIBRARIES}\"")
......@@ -24,6 +23,24 @@ if (libmongoc-1.0_FOUND)
${BSONCXX_LIBRARY_DIRS}
)
add_definitions(-DMONGODB)
else()
message("Could NOT find MongoDB. db_plugin with MongoDB support will not be included.")
# sudo apt-get install pkg-config libssl-dev libsasl2-dev
# wget https://github.com/mongodb/mongo-c-driver/releases/download/1.8.0/mongo-c-driver-1.8.0.tar.gz
# tar xzf mongo-c-driver-1.8.0.tar.gz
# cd mongo-c-driver-1.8.0
# ./configure --disable-automatic-init-and-cleanup --enable-static
# make
# sudo make install
#
# git clone https://github.com/mongodb/mongo-cxx-driver.git --branch releases/stable --depth 1
# cd mongo-cxx-driver/build
# cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr/local ..
# sudo make EP_mnmlstc_core
# make
# sudo make install
#
# sudo apt-get install mongodb
endif()
target_include_directories(db_plugin
......
......@@ -9,7 +9,10 @@
#include <fc/variant.hpp>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <queue>
#ifdef MONGODB
#include <bsoncxx/builder/basic/kvp.hpp>
......@@ -64,10 +67,12 @@ public:
size_t queue_size = 0;
size_t processed = 0;
std::unique_ptr<boost::lockfree::spsc_queue<signed_block>> queue;
std::queue<signed_block> queue;
boost::mutex mtx;
boost::condition_variable condtion;
boost::thread consum_thread;
boost::atomic<bool> startup{true};
boost::atomic<bool> done{false};
boost::atomic<bool> startup{true};
void consum_blocks();
......@@ -109,29 +114,36 @@ void db_plugin_impl::applied_irreversible_block(const signed_block& block) {
// on startup we don't want to queue, instead push back on caller
process_irreversible_block(block);
} else {
if (!queue->push(block)) {
// TODO what to do if full
elog("queue is full!!!!!");
FC_ASSERT(false, "queue is full");
}
boost::mutex::scoped_lock lock(mtx);
queue.push(block);
lock.unlock();
condtion.notify_one();
}
}
void db_plugin_impl::consum_blocks() {
signed_block block;
while (!done) {
while (queue->pop(block)) {
auto available = queue->read_available();
size_t size = 0;
while (true) {
boost::mutex::scoped_lock lock(mtx);
while (queue.empty() && !done) {
condtion.wait(lock);
}
size = queue.size();
if (size > 0) {
block = queue.front();
queue.pop();
lock.unlock();
// warn if queue size greater than 75%
if (available > (queue_size * 0.75)) {
wlog("queue size: ${q}", ("q", available + 1));
if (size > (queue_size * 0.75)) {
wlog("queue size: ${q}", ("q", size + 1));
}
process_irreversible_block(block);
continue;
} else if (done) {
break;
}
}
while (queue->pop(block)) {
process_irreversible_block(block);
}
ilog("db_plugin consum thread shutdown gracefully");
}
......@@ -485,6 +497,8 @@ db_plugin_impl::db_plugin_impl()
db_plugin_impl::~db_plugin_impl() {
try {
done = true;
condtion.notify_one();
consum_thread.join();
} catch (std::exception& e) {
elog("Exception on db_plugin shutdown of consum thread: ${e}", ("e", e.what()));
......@@ -589,7 +603,6 @@ void db_plugin::plugin_initialize(const variables_map& options)
} else if (options.count("queue-size")) {
auto size = options.at("queue-size").as<uint>();
my->queue_size = size;
my->queue = std::make_unique<boost::lockfree::spsc_queue<signed_block>>(size);
}
std::string uri = options.at("mongodb-uri").as<std::string>();
......
#pragma once
#include <eos/chain_plugin/chain_plugin.hpp>
#include <eos/chain/block.hpp>
#include <appbase/application.hpp>
#include <memory>
namespace fc { class variant; }
namespace eos {
using db_plugin_impl_ptr = std::shared_ptr<class db_plugin_impl>;
/**
* Provides persistence to MongoDB for:
* Blocks
* Transactions
* Messages
* Accounts
*
* See data dictionary (DB Schema Definition - EOS API) for description of MongoDB schema.
*/
class db_plugin : public plugin<db_plugin> {
public:
APPBASE_PLUGIN_REQUIRES((chain_plugin))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册