提交 ee7e0347 编写于 作者: T Todd Fleming

state history: irreversible_state.log

上级 df12ea2d
......@@ -520,14 +520,110 @@ auto catch_and_log(F f) {
}
}
/*
* irreversible_state.log:
* +---------+----------------+-----------+------------------+-----+---------+----------------+
* | Entry i | Pos of Entry i | Entry i+1 | Pos of Entry i+1 | ... | Entry z | Pos of Entry z |
* +---------+----------------+-----------+------------------+-----+---------+----------------+
*
* irreversible_state.index:
* +----------------+------------------+-----+----------------+
* | Pos of Entry i | Pos of Entry i+1 | ... | Pos of Entry z |
* +----------------+------------------+-----+----------------+
*
* Each entry:
* uint32_t block_num
* uint64_t size of remainder
* char[] deltas
*/
struct irrev_state_log_header {
uint32_t block_num;
uint64_t size;
};
struct state_history_plugin_impl : std::enable_shared_from_this<state_history_plugin_impl> {
chain_plugin* chain_plug = nullptr;
std::unique_ptr<chainbase::database> db;
std::fstream irreversible_log;
std::fstream irreversible_index;
uint32_t irreversible_begin_block = 0;
uint32_t irreversible_end_block = 0;
fc::optional<scoped_connection> accepted_block_connection;
fc::optional<scoped_connection> irreversible_block_connection;
string endpoint_address = "0.0.0.0";
uint16_t endpoint_port = 4321;
std::unique_ptr<tcp::acceptor> acceptor;
void open_irreversible(const boost::filesystem::path& path) {
open_irreversible_log((path / "irreversible_state.log").string());
open_irreversible_index((path / "irreversible_state.index").string());
}
void open_irreversible_log(const std::string& filename) {
irreversible_log.open(filename,
std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
uint64_t size = irreversible_log.tellg();
if (size >= sizeof(irrev_state_log_header)) {
irrev_state_log_header header;
irreversible_log.seekg(0);
irreversible_log.read((char*)&header, sizeof(header));
EOS_ASSERT(sizeof(header) + header.size + sizeof(uint64_t) <= size, plugin_exception,
"corrupt irreversible_state.log (1)");
irreversible_begin_block = header.block_num;
uint64_t end_pos;
irreversible_log.seekg(size - sizeof(end_pos));
irreversible_log.read((char*)&end_pos, sizeof(end_pos));
EOS_ASSERT(end_pos <= size && end_pos + sizeof(header) <= size, plugin_exception,
"corrupt irreversible_state.log (2)");
irreversible_log.seekg(end_pos);
irreversible_log.read((char*)&header, sizeof(header));
EOS_ASSERT(end_pos + sizeof(header) + header.size + sizeof(uint64_t) == size, plugin_exception,
"corrupt irreversible_state.log (3)");
irreversible_end_block = header.block_num + 1;
EOS_ASSERT(irreversible_begin_block < irreversible_end_block, plugin_exception,
"corrupt irreversible_state.log (4)");
ilog("irreversible_state.log has blocks ${b}-${e}",
("b", irreversible_begin_block)("e", irreversible_end_block - 1));
} else {
EOS_ASSERT(!size, plugin_exception, "corrupt irreversible_state.log (5)");
ilog("irreversible_state.log is empty");
}
}
void open_irreversible_index(const std::string& filename) {
irreversible_index.open(filename,
std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
if (irreversible_index.tellg() == (irreversible_end_block - irreversible_begin_block) * sizeof(uint64_t))
return;
ilog("Regenerate irreversible_state.index");
irreversible_index.close();
irreversible_index.open(filename,
std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::trunc);
irreversible_log.seekg(0, std::ios_base::end);
uint64_t size = irreversible_log.tellg();
uint64_t pos = 0;
while (pos < size) {
irreversible_index.write((char*)&pos, sizeof(pos));
irrev_state_log_header header;
EOS_ASSERT(pos + sizeof(header) <= size, plugin_exception, "corrupt irreversible_state.log (6)");
irreversible_log.seekg(pos);
irreversible_log.read((char*)&header, sizeof(header));
uint64_t suffix_pos = pos + sizeof(header) + header.size;
uint64_t suffix;
EOS_ASSERT(suffix_pos + sizeof(suffix) <= size, plugin_exception, "corrupt irreversible_state.log (7)");
irreversible_log.seekg(suffix_pos);
irreversible_log.read((char*)&suffix, sizeof(suffix));
// ilog("block ${b} at ${pos}-${end} suffix=${suffix} file_size=${fs}",
// ("b", header.block_num)("pos", pos)("end", suffix_pos + sizeof(suffix))("suffix", suffix)("fs", size));
EOS_ASSERT(suffix == pos, plugin_exception, "corrupt irreversible_state.log (8)");
pos = suffix_pos + sizeof(suffix);
}
}
struct session : std::enable_shared_from_this<session> {
std::shared_ptr<state_history_plugin_impl> plugin;
std::unique_ptr<ws::stream<tcp::socket>> stream;
......@@ -707,6 +803,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
void on_accept(boost::system::error_code ec) {}
void on_accepted_block(const block_state_ptr& block_state) {
// ilog("block ${n}", ("n", block_state->block->block_num()));
auto& chain = chain_plug->chain();
auto& idx = db->get_index<state_history_index>();
......@@ -722,7 +819,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
bool fresh = idx.indices().empty();
if (fresh)
ilog("start fresh at block ${n}", ("n", block_state->block->block_num()));
ilog("Placing initial state in block ${n}", ("n", block_state->block->block_num()));
db->create<state_history_object>([&](state_history_object& hist) {
hist.id = block_state->block->block_num();
......@@ -765,8 +862,43 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
// dlog(" ${s} bytes", ("s", bin.size()));
hist.deltas.insert(hist.deltas.end(), bin.begin(), bin.end());
});
}
};
} // on_accepted_block
void on_irreversible_block(const block_state_ptr& block_state) {
// ilog("irreversible ${n}", ("n", block_state->block->block_num()));
auto& idx = db->get_index<state_history_index>().indices();
while (!idx.empty()) {
auto& obj = *idx.begin();
if (obj.id._id > block_state->block->block_num())
break;
if (obj.id._id == irreversible_end_block || irreversible_begin_block == irreversible_end_block) {
// ilog("write irreversible ${n}", ("n", obj.id._id));
irreversible_log.seekg(0, std::ios_base::end);
uint64_t pos = irreversible_log.tellg();
irrev_state_log_header header{.block_num = (uint32_t)obj.id._id, .size = obj.deltas.size()};
irreversible_log.write((char*)&header, sizeof(header));
if (!obj.deltas.empty())
irreversible_log.write(obj.deltas.data(), obj.deltas.size());
irreversible_log.write((char*)&pos, sizeof(pos));
// ilog("block ${b} at ${pos}-${end}",
// ("b", header.block_num)("pos", pos)("end", pos + sizeof(header) + obj.deltas.size() + sizeof(pos)));
irreversible_index.seekg(0, std::ios_base::end);
irreversible_index.write((char*)&pos, sizeof(pos));
if (irreversible_begin_block == irreversible_end_block)
irreversible_begin_block = obj.id._id;
irreversible_end_block = obj.id._id + 1;
}
if (idx.size() >= 2 && (++idx.begin())->id._id <= block_state->block->block_num()) {
// ilog("drop irreversible ${n}", ("n", obj.id._id));
db->remove(obj);
} else
break;
}
} // on_irreversible_block
}; // state_history_plugin_impl
state_history_plugin::state_history_plugin()
: my(std::make_shared<state_history_plugin_impl>()) {}
......@@ -793,6 +925,8 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {
auto& chain = my->chain_plug->chain();
my->accepted_block_connection.emplace(
chain.accepted_block.connect([&](const block_state_ptr& p) { my->on_accepted_block(p); }));
my->irreversible_block_connection.emplace(
chain.irreversible_block.connect([&](const block_state_ptr& p) { my->on_irreversible_block(p); }));
auto dir_option = options.at("state-history-dir").as<bfs::path>();
boost::filesystem::path state_history_dir;
......@@ -816,8 +950,18 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {
my->db = std::make_unique<chainbase::database>(state_history_dir, database::read_write, state_history_size);
my->db->add_index<state_history_index>();
my->open_irreversible(state_history_dir);
auto& ind = my->db->get_index<state_history_index>().indices();
ilog("in-memory state history has ${n} blocks", ("n", ind.size()));
if (my->irreversible_end_block != my->irreversible_begin_block) {
// todo: load if ind.empty()
EOS_ASSERT(!ind.empty(), plugin_exception, "irreversible_state.log and shared_memory don't match");
auto& x = *ind.begin();
EOS_ASSERT(x.id._id == my->irreversible_end_block - 1, plugin_exception,
"irreversible_state.log and shared_memory don't match");
}
// auto& ind = my->db->get_index<state_history_index>().indices();
// size_t total = 0, max_size = 0, max_block = 0, num = 0;
// for (auto& x : ind) {
// if (!(x.id._id % 10000))
......@@ -834,12 +978,13 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {
// dlog("num: ${s}", ("s", num));
}
FC_LOG_AND_RETHROW()
}
} // namespace eosio
void state_history_plugin::plugin_startup() { my->listen(); }
void state_history_plugin::plugin_shutdown() {
my->accepted_block_connection.reset();
my->irreversible_block_connection.reset();
while (!my->sessions.empty())
my->sessions.begin()->second->close();
my->db.reset();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册