From ef31d133301e1158f7e665cb6e5c792de72551e9 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Tue, 30 Jun 2020 20:52:20 +0800 Subject: [PATCH] feat(rocksdb): Use original CreateCheckpoint and read meta data from meta CF (#535) --- rdsn | 2 +- src/server/meta_store.cpp | 33 ++++++-- src/server/meta_store.h | 8 ++ src/server/pegasus_server_impl.cpp | 119 +++++++++++++++++++---------- src/server/pegasus_server_impl.h | 3 +- 5 files changed, 114 insertions(+), 51 deletions(-) diff --git a/rdsn b/rdsn index 065c1ed..bdc6aa0 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 065c1edb3066d5ccfe88f19740caa4d8244c233e +Subproject commit bdc6aa0096b79d64ed9fbe96c3d37ea6dcb0ead9 diff --git a/src/server/meta_store.cpp b/src/server/meta_store.cpp index a25b07a..0b4d4c8 100644 --- a/src/server/meta_store.cpp +++ b/src/server/meta_store.cpp @@ -12,7 +12,7 @@ namespace server { DSN_DEFINE_string("pegasus.server", get_meta_store_type, - "manifest", + "metacf", "Where to get meta data, now support 'manifest' and 'metacf'"); DSN_DEFINE_validator(get_meta_store_type, [](const char *type) { return strcmp(type, "manifest") == 0 || strcmp(type, "metacf") == 0; @@ -84,9 +84,27 @@ uint64_t meta_store::get_last_manual_compact_finish_time() const } } +uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db, + rocksdb::ColumnFamilyHandle *meta_cf) const +{ + uint64_t last_flushed_decree = 0; + auto ec = get_value_from_meta_cf(db, meta_cf, true, LAST_FLUSHED_DECREE, &last_flushed_decree); + dcheck_eq_replica(::dsn::ERR_OK, ec); + return last_flushed_decree; +} + ::dsn::error_code meta_store::get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const +{ + return get_value_from_meta_cf(_db, _meta_cf, read_flushed_data, key, value); +} + +::dsn::error_code meta_store::get_value_from_meta_cf(rocksdb::DB *db, + rocksdb::ColumnFamilyHandle *cf, + bool read_flushed_data, + const std::string &key, + uint64_t *value) { std::string data; rocksdb::ReadOptions rd_opts; @@ -94,14 +112,13 @@ uint64_t meta_store::get_last_manual_compact_finish_time() const // only read 'flushed' data, mainly to read 'last_flushed_decree' rd_opts.read_tier = rocksdb::kPersistedTier; } - auto status = _db->Get(rd_opts, _meta_cf, key, &data); + auto status = db->Get(rd_opts, cf, key, &data); if (status.ok()) { - bool ok = dsn::buf2uint64(data, *value); - dassert_replica(ok, - "rocksdb {} get {} from meta column family got error value {}", - _db->GetName(), - key, - data); + dassert(dsn::buf2uint64(data, *value), + "rocksdb {} get {} from meta column family got error value {}", + db->GetName(), + key, + data); return ::dsn::ERR_OK; } diff --git a/src/server/meta_store.h b/src/server/meta_store.h index ea818fb..bde7d21 100644 --- a/src/server/meta_store.h +++ b/src/server/meta_store.h @@ -33,6 +33,8 @@ public: meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf); uint64_t get_last_flushed_decree() const; + uint64_t get_decree_from_readonly_db(rocksdb::DB *db, + rocksdb::ColumnFamilyHandle *meta_cf) const; uint32_t get_data_version() const; uint64_t get_last_manual_compact_finish_time() const; @@ -45,6 +47,12 @@ private: get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const; ::dsn::error_code set_value_to_meta_cf(const std::string &key, uint64_t value) const; + static ::dsn::error_code get_value_from_meta_cf(rocksdb::DB *db, + rocksdb::ColumnFamilyHandle *cf, + bool read_flushed_data, + const std::string &key, + uint64_t *value); + friend class pegasus_write_service; // Keys of meta data wrote into meta column family. diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 67112d3..b181c44 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1326,16 +1326,17 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str()); - bool need_create_meta_cf = true; - // Check meta CF only when db exist. - if (db_exist && check_meta_cf(path, &need_create_meta_cf) != ::dsn::ERR_OK) { - derror_replica("check meta column family failed"); - return ::dsn::ERR_LOCAL_APP_FAILURE; - } - if (need_create_meta_cf) { - // If upgrade from an old Pegasus version which has just one column family (the default - // column family), or create new db, we have to create a new column family to store meta - // data (meta column family). + if (db_exist) { + // When DB exist, meta CF must be present. + bool missing_meta_cf = true; + if (check_meta_cf(path, &missing_meta_cf) != ::dsn::ERR_OK) { + derror_replica("check meta column family failed"); + return ::dsn::ERR_LOCAL_APP_FAILURE; + } + dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0"); + } else { + // When create new DB, we have to create a new column family to store meta data (meta column + // family). _db_opts.create_missing_column_families = true; } @@ -1356,29 +1357,31 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache // Create _meta_store which provide Pegasus meta data read and write. _meta_store = dsn::make_unique(this, _db, _meta_cf); - _last_committed_decree = _meta_store->get_last_flushed_decree(); - _pegasus_data_version = _meta_store->get_data_version(); - uint64_t last_manual_compact_finish_time = _meta_store->get_last_manual_compact_finish_time(); - if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) { - derror_replica("open app failed, unsupported data version {}", _pegasus_data_version); - release_db(); - return ::dsn::ERR_LOCAL_APP_FAILURE; - } + if (db_exist) { + _last_committed_decree = _meta_store->get_last_flushed_decree(); + _pegasus_data_version = _meta_store->get_data_version(); + uint64_t last_manual_compact_finish_time = + _meta_store->get_last_manual_compact_finish_time(); + if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) { + derror_replica("open app failed, unsupported data version {}", _pegasus_data_version); + release_db(); + return ::dsn::ERR_LOCAL_APP_FAILURE; + } - if (need_create_meta_cf) { - // Write meta data to meta CF according to manifest. - _meta_store->set_data_version(_pegasus_data_version); - _meta_store->set_last_flushed_decree(_last_committed_decree); - _meta_store->set_last_manual_compact_finish_time(last_manual_compact_finish_time); + // update last manual compact finish timestamp + _manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time); + } else { + // Write initial meta data to meta CF and flush when create new DB. + _meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX); + _meta_store->set_last_flushed_decree(0); + _meta_store->set_last_manual_compact_finish_time(0); + flush_all_family_columns(true); } // only enable filter after correct pegasus_data_version set _key_ttl_compaction_filter_factory->SetPegasusDataVersion(_pegasus_data_version); _key_ttl_compaction_filter_factory->EnableFilter(); - // update LastManualCompactFinishTime - _manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time); - parse_checkpoints(); // checkpoint if necessary to make last_durable_decree() fresh. @@ -1739,8 +1742,8 @@ private: } } - uint64_t ci = 0; - status = chkpt->CreateCheckpointQuick(checkpoint_dir, &ci); + // CreateCheckpoint() will not flush memtable when log_size_for_flush = max + status = chkpt->CreateCheckpoint(checkpoint_dir, std::numeric_limits::max()); if (!status.ok()) { derror_replica("CreateCheckpoint failed, error = {}", status.ToString()); if (!::dsn::utils::filesystem::remove_path(checkpoint_dir)) { @@ -1748,10 +1751,39 @@ private: } return ::dsn::ERR_LOCAL_APP_FAILURE; } - ddebug_replica("copy checkpoint to dir({}) succeed, last_decree = {}", checkpoint_dir, ci); + ddebug_replica("copy checkpoint to dir({}) succeed", checkpoint_dir); if (checkpoint_decree != nullptr) { - *checkpoint_decree = static_cast(ci); + rocksdb::DB *snapshot_db = nullptr; + std::vector handles_opened; + auto cleanup = [&](bool remove_checkpoint) { + if (remove_checkpoint && !::dsn::utils::filesystem::remove_path(checkpoint_dir)) { + derror_replica("remove checkpoint directory {} failed", checkpoint_dir); + } + release_db(snapshot_db, handles_opened); + }; + + // Because of RocksDB's restriction, we have to to open default column family even though + // not use it + std::vector column_families( + {{DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}, + {META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}}); + status = rocksdb::DB::OpenForReadOnly( + rocksdb::DBOptions(), checkpoint_dir, column_families, &handles_opened, &snapshot_db); + if (!status.ok()) { + derror_replica( + "OpenForReadOnly from {} failed, error = {}", checkpoint_dir, status.ToString()); + snapshot_db = nullptr; + cleanup(true); + return ::dsn::ERR_LOCAL_APP_FAILURE; + } + dcheck_eq_replica(handles_opened.size(), 2); + dcheck_eq_replica(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME); + uint64_t last_flushed_decree = + _meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]); + *checkpoint_decree = last_flushed_decree; + + cleanup(false); } return ::dsn::ERR_OK; @@ -2575,10 +2607,9 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version) // TODO(heyuchen): set filter _partition_version in further pr } -::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path, - bool *need_create_meta_cf) +::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path, bool *missing_meta_cf) { - *need_create_meta_cf = true; + *missing_meta_cf = true; std::vector column_families; auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families); if (!s.ok()) { @@ -2588,7 +2619,7 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version) for (const auto &column_family : column_families) { if (column_family == META_COLUMN_FAMILY_NAME) { - *need_create_meta_cf = false; + *missing_meta_cf = false; break; } } @@ -2607,14 +2638,20 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version) return ::dsn::ERR_OK; } -void pegasus_server_impl::release_db() +void pegasus_server_impl::release_db() { release_db(_db, {_data_cf, _meta_cf}); } + +void pegasus_server_impl::release_db(rocksdb::DB *db, + const std::vector &handles) { - _db->DestroyColumnFamilyHandle(_data_cf); - _data_cf = nullptr; - _db->DestroyColumnFamilyHandle(_meta_cf); - _meta_cf = nullptr; - delete _db; - _db = nullptr; + if (db) { + for (auto handle : handles) { + dassert_replica(handle != nullptr, ""); + db->DestroyColumnFamilyHandle(handle); + handle = nullptr; + } + delete db; + db = nullptr; + } } std::string pegasus_server_impl::dump_write_request(dsn::message_ex *request) diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 61f6f26..2ce6015 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -305,9 +305,10 @@ private: return false; } - ::dsn::error_code check_meta_cf(const std::string &path, bool *need_create_meta_cf); + ::dsn::error_code check_meta_cf(const std::string &path, bool *missing_meta_cf); void release_db(); + void release_db(rocksdb::DB *db, const std::vector &handles); ::dsn::error_code flush_all_family_columns(bool wait); -- GitLab