未验证 提交 ef31d133 编写于 作者: Y Yingchun Lai 提交者: GitHub

feat(rocksdb): Use original CreateCheckpoint and read meta data from meta CF (#535)

上级 b983b15e
Subproject commit 065c1edb3066d5ccfe88f19740caa4d8244c233e
Subproject commit bdc6aa0096b79d64ed9fbe96c3d37ea6dcb0ead9
......@@ -12,7 +12,7 @@ namespace server {
DSN_DEFINE_string("pegasus.server",
get_meta_store_type,
"manifest",
"metacf",
"Where to get meta data, now support 'manifest' and 'metacf'");
DSN_DEFINE_validator(get_meta_store_type, [](const char *type) {
return strcmp(type, "manifest") == 0 || strcmp(type, "metacf") == 0;
......@@ -84,9 +84,27 @@ uint64_t meta_store::get_last_manual_compact_finish_time() const
}
}
// Fetches the value stored under LAST_FLUSHED_DECREE from the meta column family
// of a caller-supplied RocksDB instance instead of this store's own `_db`.
//
// @param db       database to query.
// @param meta_cf  column family handle used for the lookup.
// @return the decoded decree. The lookup is asserted (dcheck_eq_replica) to
//         return ERR_OK, so a failed read does not return normally.
uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
                                                 rocksdb::ColumnFamilyHandle *meta_cf) const
{
    // Passed as `read_flushed_data`: ask the static helper for flushed data only.
    const bool only_flushed = true;

    uint64_t decree = 0;
    auto err = get_value_from_meta_cf(db, meta_cf, only_flushed, LAST_FLUSHED_DECREE, &decree);
    dcheck_eq_replica(::dsn::ERR_OK, err);
    return decree;
}
// Member overload: looks `key` up in this store's own database (`_db`) and meta
// column family (`_meta_cf`) by delegating to the static overload.
//
// @param read_flushed_data  forwarded unchanged to the static overload.
// @param key                meta key to read.
// @param value              out-parameter forwarded to the static overload.
// @return the error code produced by the static overload.
::dsn::error_code meta_store::get_value_from_meta_cf(bool read_flushed_data,
                                                     const std::string &key,
                                                     uint64_t *value) const
{
    auto err = get_value_from_meta_cf(_db, _meta_cf, read_flushed_data, key, value);
    return err;
}
::dsn::error_code meta_store::get_value_from_meta_cf(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *cf,
bool read_flushed_data,
const std::string &key,
uint64_t *value)
{
std::string data;
rocksdb::ReadOptions rd_opts;
......@@ -94,14 +112,13 @@ uint64_t meta_store::get_last_manual_compact_finish_time() const
// only read 'flushed' data, mainly to read 'last_flushed_decree'
rd_opts.read_tier = rocksdb::kPersistedTier;
}
auto status = _db->Get(rd_opts, _meta_cf, key, &data);
auto status = db->Get(rd_opts, cf, key, &data);
if (status.ok()) {
bool ok = dsn::buf2uint64(data, *value);
dassert_replica(ok,
"rocksdb {} get {} from meta column family got error value {}",
_db->GetName(),
key,
data);
dassert(dsn::buf2uint64(data, *value),
"rocksdb {} get {} from meta column family got error value {}",
db->GetName(),
key,
data);
return ::dsn::ERR_OK;
}
......
......@@ -33,6 +33,8 @@ public:
meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);
uint64_t get_last_flushed_decree() const;
uint64_t get_decree_from_readonly_db(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *meta_cf) const;
uint32_t get_data_version() const;
uint64_t get_last_manual_compact_finish_time() const;
......@@ -45,6 +47,12 @@ private:
get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const;
::dsn::error_code set_value_to_meta_cf(const std::string &key, uint64_t value) const;
static ::dsn::error_code get_value_from_meta_cf(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *cf,
bool read_flushed_data,
const std::string &key,
uint64_t *value);
friend class pegasus_write_service;
// Keys of meta data written into the meta column family.
......
......@@ -1326,16 +1326,17 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());
bool need_create_meta_cf = true;
// Check meta CF only when db exist.
if (db_exist && check_meta_cf(path, &need_create_meta_cf) != ::dsn::ERR_OK) {
derror_replica("check meta column family failed");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
if (need_create_meta_cf) {
// If upgrade from an old Pegasus version which has just one column family (the default
// column family), or create new db, we have to create a new column family to store meta
// data (meta column family).
if (db_exist) {
// When DB exist, meta CF must be present.
bool missing_meta_cf = true;
if (check_meta_cf(path, &missing_meta_cf) != ::dsn::ERR_OK) {
derror_replica("check meta column family failed");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
} else {
// When creating a new DB, we have to create a new column family to store meta data (the meta
// column family).
_db_opts.create_missing_column_families = true;
}
......@@ -1356,29 +1357,31 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
// Create _meta_store which provide Pegasus meta data read and write.
_meta_store = dsn::make_unique<meta_store>(this, _db, _meta_cf);
_last_committed_decree = _meta_store->get_last_flushed_decree();
_pegasus_data_version = _meta_store->get_data_version();
uint64_t last_manual_compact_finish_time = _meta_store->get_last_manual_compact_finish_time();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
release_db();
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
if (db_exist) {
_last_committed_decree = _meta_store->get_last_flushed_decree();
_pegasus_data_version = _meta_store->get_data_version();
uint64_t last_manual_compact_finish_time =
_meta_store->get_last_manual_compact_finish_time();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
release_db();
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
if (need_create_meta_cf) {
// Write meta data to meta CF according to manifest.
_meta_store->set_data_version(_pegasus_data_version);
_meta_store->set_last_flushed_decree(_last_committed_decree);
_meta_store->set_last_manual_compact_finish_time(last_manual_compact_finish_time);
// update last manual compact finish timestamp
_manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
} else {
// Write initial meta data to meta CF and flush when create new DB.
_meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
_meta_store->set_last_flushed_decree(0);
_meta_store->set_last_manual_compact_finish_time(0);
flush_all_family_columns(true);
}
// only enable filter after correct pegasus_data_version set
_key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version);
_key_ttl_compaction_filter_factory->EnableFilter();
// update LastManualCompactFinishTime
_manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
parse_checkpoints();
// checkpoint if necessary to make last_durable_decree() fresh.
......@@ -1739,8 +1742,8 @@ private:
}
}
uint64_t ci = 0;
status = chkpt->CreateCheckpointQuick(checkpoint_dir, &ci);
// CreateCheckpoint() will not flush memtable when log_size_for_flush = max
status = chkpt->CreateCheckpoint(checkpoint_dir, std::numeric_limits<uint64_t>::max());
if (!status.ok()) {
derror_replica("CreateCheckpoint failed, error = {}", status.ToString());
if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
......@@ -1748,10 +1751,39 @@ private:
}
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
ddebug_replica("copy checkpoint to dir({}) succeed, last_decree = {}", checkpoint_dir, ci);
ddebug_replica("copy checkpoint to dir({}) succeed", checkpoint_dir);
if (checkpoint_decree != nullptr) {
*checkpoint_decree = static_cast<int64_t>(ci);
rocksdb::DB *snapshot_db = nullptr;
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
auto cleanup = [&](bool remove_checkpoint) {
if (remove_checkpoint && !::dsn::utils::filesystem::remove_path(checkpoint_dir)) {
derror_replica("remove checkpoint directory {} failed", checkpoint_dir);
}
release_db(snapshot_db, handles_opened);
};
// Because of RocksDB's restriction, we have to open the default column family even though
// we do not use it
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()},
{META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}});
status = rocksdb::DB::OpenForReadOnly(
rocksdb::DBOptions(), checkpoint_dir, column_families, &handles_opened, &snapshot_db);
if (!status.ok()) {
derror_replica(
"OpenForReadOnly from {} failed, error = {}", checkpoint_dir, status.ToString());
snapshot_db = nullptr;
cleanup(true);
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
dcheck_eq_replica(handles_opened.size(), 2);
dcheck_eq_replica(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
uint64_t last_flushed_decree =
_meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]);
*checkpoint_decree = last_flushed_decree;
cleanup(false);
}
return ::dsn::ERR_OK;
......@@ -2575,10 +2607,9 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
// TODO(heyuchen): set filter _partition_version in further pr
}
::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path,
bool *need_create_meta_cf)
::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path, bool *missing_meta_cf)
{
*need_create_meta_cf = true;
*missing_meta_cf = true;
std::vector<std::string> column_families;
auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families);
if (!s.ok()) {
......@@ -2588,7 +2619,7 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
for (const auto &column_family : column_families) {
if (column_family == META_COLUMN_FAMILY_NAME) {
*need_create_meta_cf = false;
*missing_meta_cf = false;
break;
}
}
......@@ -2607,14 +2638,20 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
return ::dsn::ERR_OK;
}
void pegasus_server_impl::release_db()
void pegasus_server_impl::release_db() { release_db(_db, {_data_cf, _meta_cf}); }
void pegasus_server_impl::release_db(rocksdb::DB *db,
const std::vector<rocksdb::ColumnFamilyHandle *> &handles)
{
_db->DestroyColumnFamilyHandle(_data_cf);
_data_cf = nullptr;
_db->DestroyColumnFamilyHandle(_meta_cf);
_meta_cf = nullptr;
delete _db;
_db = nullptr;
if (db) {
for (auto handle : handles) {
dassert_replica(handle != nullptr, "");
db->DestroyColumnFamilyHandle(handle);
handle = nullptr;
}
delete db;
db = nullptr;
}
}
std::string pegasus_server_impl::dump_write_request(dsn::message_ex *request)
......
......@@ -305,9 +305,10 @@ private:
return false;
}
::dsn::error_code check_meta_cf(const std::string &path, bool *need_create_meta_cf);
::dsn::error_code check_meta_cf(const std::string &path, bool *missing_meta_cf);
void release_db();
void release_db(rocksdb::DB *db, const std::vector<rocksdb::ColumnFamilyHandle *> &handles);
::dsn::error_code flush_all_family_columns(bool wait);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册