提交 839826d2 编写于 作者: Z Zhang Yifan 提交者: heyuchen

fix: Load options from file when open an exist DB (#587)

上级 eafd1254
Subproject commit 40d86f7f7aa51ba874d9cc91cb7e267f18b2bd11
Subproject commit 2158aeb5022a9881c18bbad3addd02c68eb93323
......@@ -58,6 +58,19 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
return last_flushed_decree;
}
std::string meta_store::get_usage_scenario() const
{
// If couldn't find rocksdb usage scenario in meta column family, return normal in default.
std::string usage_scenario = ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
auto ec = get_string_value_from_meta_cf(false, ROCKSDB_ENV_USAGE_SCENARIO_KEY, &usage_scenario);
dassert_replica(ec == ::dsn::ERR_OK || ec == ::dsn::ERR_OBJECT_NOT_FOUND,
"rocksdb {} get {} from meta column family failed: {}",
_db->GetName(),
ROCKSDB_ENV_USAGE_SCENARIO_KEY,
ec.to_string());
return usage_scenario;
}
::dsn::error_code meta_store::get_value_from_meta_cf(bool read_flushed_data,
const std::string &key,
uint64_t *value) const
......@@ -72,18 +85,37 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
uint64_t *value)
{
std::string data;
auto ec = get_string_value_from_meta_cf(db, cf, read_flushed_data, key, &data);
if (ec != ::dsn::ERR_OK) {
return ec;
}
dassert_f(dsn::buf2uint64(data, *value),
"rocksdb {} get \"{}\" from meta column family failed to parse into uint64",
db->GetName(),
data);
return ::dsn::ERR_OK;
}
::dsn::error_code meta_store::get_string_value_from_meta_cf(bool read_flushed_data,
const std::string &key,
std::string *value) const
{
return get_string_value_from_meta_cf(_db, _meta_cf, read_flushed_data, key, value);
}
::dsn::error_code meta_store::get_string_value_from_meta_cf(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *cf,
bool read_flushed_data,
const std::string &key,
std::string *value)
{
rocksdb::ReadOptions rd_opts;
if (read_flushed_data) {
// only read 'flushed' data, mainly to read 'last_flushed_decree'
rd_opts.read_tier = rocksdb::kPersistedTier;
}
auto status = db->Get(rd_opts, cf, key, &data);
auto status = db->Get(rd_opts, cf, key, value);
if (status.ok()) {
dassert_f(dsn::buf2uint64(data, *value),
"rocksdb {} get {} from meta column family got error value {}",
db->GetName(),
key,
data);
return ::dsn::ERR_OK;
}
......@@ -97,7 +129,13 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
::dsn::error_code meta_store::set_value_to_meta_cf(const std::string &key, uint64_t value) const
{
auto status = _db->Put(_wt_opts, _meta_cf, key, std::to_string(value));
return set_string_value_to_meta_cf(key, std::to_string(value));
}
::dsn::error_code meta_store::set_string_value_to_meta_cf(const std::string &key,
const std::string &value) const
{
auto status = _db->Put(_wt_opts, _meta_cf, key, value);
if (!status.ok()) {
derror_replica(
"Put {}={} to meta column family failed, status {}", key, value, status.ToString());
......@@ -124,5 +162,11 @@ void meta_store::set_last_manual_compact_finish_time(uint64_t last_manual_compac
set_value_to_meta_cf(LAST_MANUAL_COMPACT_FINISH_TIME, last_manual_compact_finish_time));
}
void meta_store::set_usage_scenario(const std::string &usage_scenario) const
{
dcheck_eq_replica(::dsn::ERR_OK,
set_string_value_to_meta_cf(ROCKSDB_ENV_USAGE_SCENARIO_KEY, usage_scenario));
}
} // namespace server
} // namespace pegasus
......@@ -30,21 +30,33 @@ public:
rocksdb::ColumnFamilyHandle *meta_cf) const;
uint32_t get_data_version() const;
uint64_t get_last_manual_compact_finish_time() const;
std::string get_usage_scenario() const;
void set_last_flushed_decree(uint64_t decree) const;
void set_data_version(uint32_t version) const;
void set_last_manual_compact_finish_time(uint64_t last_manual_compact_finish_time) const;
void set_usage_scenario(const std::string &usage_scenario) const;
private:
::dsn::error_code
get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const;
::dsn::error_code get_string_value_from_meta_cf(bool read_flushed_data,
const std::string &key,
std::string *value) const;
::dsn::error_code set_value_to_meta_cf(const std::string &key, uint64_t value) const;
::dsn::error_code set_string_value_to_meta_cf(const std::string &key,
const std::string &value) const;
static ::dsn::error_code get_value_from_meta_cf(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *cf,
bool read_flushed_data,
const std::string &key,
uint64_t *value);
static ::dsn::error_code get_string_value_from_meta_cf(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *cf,
bool read_flushed_data,
const std::string &key,
std::string *value);
friend class pegasus_write_service;
......
......@@ -8,6 +8,7 @@
#include <boost/lexical_cast.hpp>
#include <rocksdb/convenience.h>
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/utilities/options_util.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/filesystem.h>
......@@ -1326,14 +1327,45 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());
// Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// will be used elsewhere.
rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts;
if (db_exist) {
// When DB exist, meta CF must be present.
bool missing_meta_cf = true;
if (check_meta_cf(path, &missing_meta_cf) != ::dsn::ERR_OK) {
derror_replica("check meta column family failed");
bool missing_data_cf = true;
// Load latest options from option file stored in the db directory.
rocksdb::DBOptions loaded_db_opt;
std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
// Set `ignore_unknown_options` true for forward compatibility.
auto status = rocksdb::LoadLatestOptions(path,
rocksdb::Env::Default(),
&loaded_db_opt,
&loaded_cf_descs,
/*ignore_unknown_options=*/true);
if (!status.ok()) {
derror_replica("load latest option file failed.");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
for (int i = 0; i < loaded_cf_descs.size(); ++i) {
if (loaded_cf_descs[i].name == META_COLUMN_FAMILY_NAME) {
missing_meta_cf = false;
} else if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
missing_data_cf = false;
loaded_data_cf_opts = loaded_cf_descs[i].options;
} else {
derror_replica("unknown column family name.");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
}
// When DB exists, meta CF and data CF must be present.
dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
dassert_replica(!missing_data_cf, "Missing data column family");
// Reset usage scenario related options according to loaded_data_cf_opts.
// We don't use `loaded_data_cf_opts` directly because pointer-typed options will only be
// initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
} else {
// When create new DB, we have to create a new column family to store meta data (meta column
// family).
......@@ -1341,7 +1373,13 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
}
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
{{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
auto s = rocksdb::CheckOptionsCompatibility(
path, rocksdb::Env::Default(), _db_opts, column_families, /*ignore_unknown_options=*/true);
if (!s.ok() && !s.IsNotFound()) {
derror_replica("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db);
if (!status.ok()) {
......@@ -1360,6 +1398,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
if (db_exist) {
_last_committed_decree = _meta_store->get_last_flushed_decree();
_pegasus_data_version = _meta_store->get_data_version();
_usage_scenario = _meta_store->get_usage_scenario();
uint64_t last_manual_compact_finish_time =
_meta_store->get_last_manual_compact_finish_time();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
......@@ -1407,8 +1446,10 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
_is_open = true;
// set default usage scenario after db opened.
set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
if (!db_exist) {
// When create a new db, update usage scenario according to app envs.
update_usage_scenario(envs);
}
dinfo_replica("start the update rocksdb statistics timer task");
_update_replica_rdb_stat =
......@@ -2472,6 +2513,7 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
return false;
}
if (set_options(new_options)) {
_meta_store->set_usage_scenario(usage_scenario);
_usage_scenario = usage_scenario;
ddebug_replica(
"set usage scenario from \"{}\" to \"{}\" succeed", old_usage_scenario, usage_scenario);
......@@ -2483,6 +2525,23 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
}
}
void pegasus_server_impl::reset_usage_scenario_options(
const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts)
{
// reset usage scenario related options, refer to options set in 'set_usage_scenario' function.
target_opts->level0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger;
target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger;
target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger;
target_opts->soft_pending_compaction_bytes_limit =
base_opts.soft_pending_compaction_bytes_limit;
target_opts->hard_pending_compaction_bytes_limit =
base_opts.hard_pending_compaction_bytes_limit;
target_opts->disable_auto_compactions = base_opts.disable_auto_compactions;
target_opts->max_compaction_bytes = base_opts.max_compaction_bytes;
target_opts->write_buffer_size = base_opts.write_buffer_size;
target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
}
bool pegasus_server_impl::set_options(
const std::unordered_map<std::string, std::string> &new_options)
{
......@@ -2607,25 +2666,6 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version)
// TODO(heyuchen): set filter _partition_version in further pr
}
::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path, bool *missing_meta_cf)
{
*missing_meta_cf = true;
std::vector<std::string> column_families;
auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families);
if (!s.ok()) {
derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}", s.ToString());
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
for (const auto &column_family : column_families) {
if (column_family == META_COLUMN_FAMILY_NAME) {
*missing_meta_cf = false;
break;
}
}
return ::dsn::ERR_OK;
}
::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
{
rocksdb::FlushOptions options;
......
......@@ -166,6 +166,8 @@ private:
friend class pegasus_compression_options_test;
friend class pegasus_server_impl_test;
FRIEND_TEST(pegasus_server_impl_test, default_data_version);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
friend class pegasus_manual_compact_service;
friend class pegasus_write_service;
......@@ -260,6 +262,9 @@ private:
// return true if successfully changed
bool set_usage_scenario(const std::string &usage_scenario);
void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);
// return true if successfully set
bool set_options(const std::unordered_map<std::string, std::string> &new_options);
......@@ -305,8 +310,6 @@ private:
return false;
}
::dsn::error_code check_meta_cf(const std::string &path, bool *missing_meta_cf);
void release_db();
void release_db(rocksdb::DB *db, const std::vector<rocksdb::ColumnFamilyHandle *> &handles);
......
......@@ -11,7 +11,7 @@ namespace server {
class pegasus_server_impl_test : public pegasus_server_test_base
{
public:
pegasus_server_impl_test() : pegasus_server_test_base() { start(); }
pegasus_server_impl_test() : pegasus_server_test_base() {}
void test_table_level_slow_query()
{
......@@ -59,12 +59,45 @@ public:
}
};
TEST_F(pegasus_server_impl_test, test_table_level_slow_query) { test_table_level_slow_query(); }
TEST_F(pegasus_server_impl_test, test_table_level_slow_query)
{
start();
test_table_level_slow_query();
}
TEST_F(pegasus_server_impl_test, default_data_version)
{
start();
ASSERT_EQ(_server->_pegasus_data_version, 1);
}
TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options)
{
// open a new db with no app env.
start();
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario);
// set bulk_load scenario for the db.
ASSERT_TRUE(_server->set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
rocksdb::Options opts = _server->_db->GetOptions();
ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger);
ASSERT_EQ(true, opts.disable_auto_compactions);
// reopen the db.
_server->stop(false);
start();
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
ASSERT_EQ(opts.level0_file_num_compaction_trigger,
_server->_db->GetOptions().level0_file_num_compaction_trigger);
ASSERT_EQ(opts.disable_auto_compactions, _server->_db->GetOptions().disable_auto_compactions);
}
TEST_F(pegasus_server_impl_test, test_open_db_with_app_envs)
{
std::map<std::string, std::string> envs;
envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
start(envs);
ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
}
} // namespace server
} // namespace pegasus
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册