提交 73fd5bff 编写于 作者: T Todd Fleming

state history

上级 48f93a0d
......@@ -521,118 +521,155 @@ auto catch_and_log(F f) {
}
/*
* irreversible_state.log:
* *.log:
* +---------+----------------+-----------+------------------+-----+---------+----------------+
* | Entry i | Pos of Entry i | Entry i+1 | Pos of Entry i+1 | ... | Entry z | Pos of Entry z |
* +---------+----------------+-----------+------------------+-----+---------+----------------+
*
* irreversible_state.index:
* *.index:
* +----------------+------------------+-----+----------------+
* | Pos of Entry i | Pos of Entry i+1 | ... | Pos of Entry z |
* +----------------+------------------+-----+----------------+
*
* Each entry:
* each entry:
* uint32_t block_num
* uint64_t size of remainder
* uint64_t size of payload
* uint8_t version
* payload
*
* irreversible_state payload:
* uint32_t size of deltas
* char[] deltas
*/
struct irrev_state_log_header {
uint32_t block_num;
uint64_t size;
struct history_log_header {
uint32_t block_num = 0;
uint64_t payload_size = 0;
uint16_t version = 0;
};
struct state_history_plugin_impl : std::enable_shared_from_this<state_history_plugin_impl> {
chain_plugin* chain_plug = nullptr;
std::unique_ptr<chainbase::database> db;
std::fstream irreversible_log;
std::fstream irreversible_index;
uint32_t irreversible_begin_block = 0;
uint32_t irreversible_end_block = 0;
fc::optional<scoped_connection> accepted_block_connection;
fc::optional<scoped_connection> irreversible_block_connection;
string endpoint_address = "0.0.0.0";
uint16_t endpoint_port = 4321;
std::unique_ptr<tcp::acceptor> acceptor;
void open_irreversible(const boost::filesystem::path& path) {
open_irreversible_log((path / "irreversible_state.log").string());
open_irreversible_index((path / "irreversible_state.index").string());
}
void open_irreversible_log(const std::string& filename) {
irreversible_log.open(filename,
std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
uint64_t size = irreversible_log.tellg();
if (size >= sizeof(irrev_state_log_header)) {
irrev_state_log_header header;
irreversible_log.seekg(0);
irreversible_log.read((char*)&header, sizeof(header));
EOS_ASSERT(sizeof(header) + header.size + sizeof(uint64_t) <= size, plugin_exception,
"corrupt irreversible_state.log (1)");
irreversible_begin_block = header.block_num;
struct history_log {
const char* const name = "";
std::fstream log;
std::fstream index;
uint32_t begin_block = 0;
uint32_t end_block = 0;
void open_log(const std::string& filename) {
log.open(filename, std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
uint64_t size = log.tellg();
if (size >= sizeof(history_log_header)) {
history_log_header header;
log.seekg(0);
log.read((char*)&header, sizeof(header));
EOS_ASSERT(header.version == 0 && sizeof(header) + header.payload_size + sizeof(uint64_t) <= size,
plugin_exception, "corrupt ${name}.log (1)", ("name", name));
begin_block = header.block_num;
uint64_t end_pos;
irreversible_log.seekg(size - sizeof(end_pos));
irreversible_log.read((char*)&end_pos, sizeof(end_pos));
EOS_ASSERT(end_pos <= size && end_pos + sizeof(header) <= size, plugin_exception,
"corrupt irreversible_state.log (2)");
irreversible_log.seekg(end_pos);
irreversible_log.read((char*)&header, sizeof(header));
EOS_ASSERT(end_pos + sizeof(header) + header.size + sizeof(uint64_t) == size, plugin_exception,
"corrupt irreversible_state.log (3)");
irreversible_end_block = header.block_num + 1;
EOS_ASSERT(irreversible_begin_block < irreversible_end_block, plugin_exception,
"corrupt irreversible_state.log (4)");
ilog("irreversible_state.log has blocks ${b}-${e}",
("b", irreversible_begin_block)("e", irreversible_end_block - 1));
log.seekg(size - sizeof(end_pos));
log.read((char*)&end_pos, sizeof(end_pos));
EOS_ASSERT(end_pos <= size && end_pos + sizeof(header) <= size, plugin_exception, "corrupt ${name}.log (2)",
("name", name));
log.seekg(end_pos);
log.read((char*)&header, sizeof(header));
EOS_ASSERT(end_pos + sizeof(header) + header.payload_size + sizeof(uint64_t) == size, plugin_exception,
"corrupt ${name}.log (3)", ("name", name));
end_block = header.block_num + 1;
EOS_ASSERT(begin_block < end_block, plugin_exception, "corrupt ${name}.log (4)", ("name", name));
ilog("${name}.log has blocks ${b}-${e}", ("name", name)("b", begin_block)("e", end_block - 1));
} else {
EOS_ASSERT(!size, plugin_exception, "corrupt irreversible_state.log (5)");
ilog("irreversible_state.log is empty");
EOS_ASSERT(!size, plugin_exception, "corrupt ${name}.log (5)", ("name", name));
ilog("${name}.log is empty", ("name", name));
}
}
void open_irreversible_index(const std::string& filename) {
irreversible_index.open(filename,
std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
if (irreversible_index.tellg() == (irreversible_end_block - irreversible_begin_block) * sizeof(uint64_t))
void open_index(const std::string& filename) {
index.open(filename, std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
if (index.tellg() == (end_block - begin_block) * sizeof(uint64_t))
return;
ilog("Regenerate irreversible_state.index");
irreversible_index.close();
irreversible_index.open(filename,
std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::trunc);
ilog("Regenerate ${name}.index", ("name", name));
index.close();
index.open(filename, std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::trunc);
irreversible_log.seekg(0, std::ios_base::end);
uint64_t size = irreversible_log.tellg();
log.seekg(0, std::ios_base::end);
uint64_t size = log.tellg();
uint64_t pos = 0;
while (pos < size) {
irreversible_index.write((char*)&pos, sizeof(pos));
irrev_state_log_header header;
EOS_ASSERT(pos + sizeof(header) <= size, plugin_exception, "corrupt irreversible_state.log (6)");
irreversible_log.seekg(pos);
irreversible_log.read((char*)&header, sizeof(header));
uint64_t suffix_pos = pos + sizeof(header) + header.size;
index.write((char*)&pos, sizeof(pos));
history_log_header header;
EOS_ASSERT(pos + sizeof(header) <= size, plugin_exception, "corrupt ${name}.log (6)", ("name", name));
log.seekg(pos);
log.read((char*)&header, sizeof(header));
uint64_t suffix_pos = pos + sizeof(header) + header.payload_size;
uint64_t suffix;
EOS_ASSERT(suffix_pos + sizeof(suffix) <= size, plugin_exception, "corrupt irreversible_state.log (7)");
irreversible_log.seekg(suffix_pos);
irreversible_log.read((char*)&suffix, sizeof(suffix));
EOS_ASSERT(suffix_pos + sizeof(suffix) <= size, plugin_exception, "corrupt ${name}.log (7)", ("name", name));
log.seekg(suffix_pos);
log.read((char*)&suffix, sizeof(suffix));
// ilog("block ${b} at ${pos}-${end} suffix=${suffix} file_size=${fs}",
// ("b", header.block_num)("pos", pos)("end", suffix_pos + sizeof(suffix))("suffix", suffix)("fs", size));
EOS_ASSERT(suffix == pos, plugin_exception, "corrupt irreversible_state.log (8)");
EOS_ASSERT(suffix == pos, plugin_exception, "corrupt ${name}.log (8)", ("name", name));
pos = suffix_pos + sizeof(suffix);
}
}
void get_irreversible(uint32_t block_num, irrev_state_log_header& header, bytes& deltas) {
template <typename F>
void write_entry(history_log_header header, F write_payload) {
EOS_ASSERT(begin_block == end_block || header.block_num == end_block, plugin_exception,
"writing unexpected block_num to ${name}.log", ("name", name));
log.seekg(0, std::ios_base::end);
uint64_t pos = log.tellg();
log.write((char*)&header, sizeof(header));
write_payload(log);
uint64_t end = log.tellg();
EOS_ASSERT(end == pos + sizeof(header) + header.payload_size, plugin_exception,
"wrote payload with incorrect size to ${name}.log", ("name", name));
log.write((char*)&pos, sizeof(pos));
index.seekg(0, std::ios_base::end);
index.write((char*)&pos, sizeof(pos));
if (begin_block == end_block)
begin_block = header.block_num;
end_block = header.block_num + 1;
}
// returns stream positioned at payload
std::fstream& get_entry(uint32_t block_num, history_log_header& header) {
EOS_ASSERT(block_num >= begin_block && block_num < end_block, plugin_exception,
"read non-existing block in ${name}.log", ("name", name));
uint64_t pos;
irreversible_index.seekg((block_num - irreversible_begin_block) * sizeof(pos));
irreversible_index.read((char*)&pos, sizeof(pos));
irreversible_log.seekg(pos);
irreversible_log.read((char*)&header, sizeof(header));
deltas.resize(header.size);
if (header.size)
irreversible_log.read(deltas.data(), header.size);
index.seekg((block_num - begin_block) * sizeof(pos));
index.read((char*)&pos, sizeof(pos));
log.seekg(pos);
log.read((char*)&header, sizeof(header));
return log;
}
}; // history_log
struct state_history_plugin_impl : std::enable_shared_from_this<state_history_plugin_impl> {
chain_plugin* chain_plug = nullptr;
std::unique_ptr<chainbase::database> db;
history_log state_log{"irreversible_state"};
fc::optional<scoped_connection> accepted_block_connection;
fc::optional<scoped_connection> irreversible_block_connection;
string endpoint_address = "0.0.0.0";
uint16_t endpoint_port = 4321;
std::unique_ptr<tcp::acceptor> acceptor;
void open_irreversible(const boost::filesystem::path& path) {
state_log.open_log((path / "irreversible_state.log").string());
state_log.open_index((path / "irreversible_state.index").string());
}
void get_irreversible_state(uint32_t block_num, bytes& deltas) {
history_log_header header;
auto& stream = state_log.get_entry(block_num, header);
uint32_t s;
stream.read((char*)&s, sizeof(s));
deltas.resize(s);
if (s)
stream.read(deltas.data(), s);
}
struct session : std::enable_shared_from_this<session> {
......@@ -726,10 +763,8 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
result.found = true;
result.deltas.assign(it->deltas.begin(), it->deltas.end());
// dlog(" bytes: ${b}", ("b", result.deltas));
} else if (req.block_num >= plugin->irreversible_begin_block &&
req.block_num < plugin->irreversible_end_block) {
irrev_state_log_header header;
plugin->get_irreversible(req.block_num, header, result.deltas);
} else if (req.block_num >= plugin->state_log.begin_block && req.block_num < plugin->state_log.end_block) {
plugin->get_irreversible_state(req.block_num, result.deltas);
result.found = true;
}
send(std::move(result));
......@@ -889,25 +924,17 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
auto& obj = *idx.begin();
if (obj.id._id > block_state->block->block_num())
break;
if (obj.id._id == irreversible_end_block || irreversible_begin_block == irreversible_end_block) {
if (obj.id._id == state_log.end_block || state_log.begin_block == state_log.end_block) {
// ilog("write irreversible ${n}", ("n", obj.id._id));
irreversible_log.seekg(0, std::ios_base::end);
uint64_t pos = irreversible_log.tellg();
irrev_state_log_header header{.block_num = (uint32_t)obj.id._id, .size = obj.deltas.size()};
irreversible_log.write((char*)&header, sizeof(header));
if (!obj.deltas.empty())
irreversible_log.write(obj.deltas.data(), obj.deltas.size());
irreversible_log.write((char*)&pos, sizeof(pos));
// ilog("block ${b} at ${pos}-${end}",
// ("b", header.block_num)("pos", pos)("end", pos + sizeof(header) + obj.deltas.size() + sizeof(pos)));
irreversible_index.seekg(0, std::ios_base::end);
irreversible_index.write((char*)&pos, sizeof(pos));
if (irreversible_begin_block == irreversible_end_block)
irreversible_begin_block = obj.id._id;
irreversible_end_block = obj.id._id + 1;
EOS_ASSERT(obj.deltas.size() == (uint32_t)obj.deltas.size(), plugin_exception, "deltas is too big");
history_log_header header{.block_num = (uint32_t)obj.id._id,
.payload_size = sizeof(uint32_t) + obj.deltas.size()};
state_log.write_entry(header, [&](auto& stream) {
uint32_t s = (uint32_t)obj.deltas.size();
stream.write((char*)&s, sizeof(s));
if (!obj.deltas.empty())
stream.write(obj.deltas.data(), obj.deltas.size());
});
}
if (idx.size() >= 2 && (++idx.begin())->id._id <= block_state->block->block_num()) {
// ilog("drop irreversible ${n}", ("n", obj.id._id));
......@@ -972,11 +999,11 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {
auto& ind = my->db->get_index<state_history_index>().indices();
ilog("in-memory state history has ${n} blocks", ("n", ind.size()));
if (my->irreversible_end_block != my->irreversible_begin_block) {
if (my->state_log.end_block != my->state_log.begin_block) {
// todo: load if ind.empty()
EOS_ASSERT(!ind.empty(), plugin_exception, "irreversible_state.log and shared_memory don't match");
auto& x = *ind.begin();
EOS_ASSERT(x.id._id == my->irreversible_end_block - 1, plugin_exception,
EOS_ASSERT(x.id._id == my->state_log.end_block - 1, plugin_exception,
"irreversible_state.log and shared_memory don't match");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册