提交 96ab15d3 编写于 作者: I Igor Canadi

GetOptionsFromString + fixes to block_based_table_options

Summary:
In mongo, we currently have a single column family and I'd like to support setting rocksdb::Options from string. This diff provides an option to GetOptionsFromString()

There's one more problem. Currently GetColumnFamilyOptionsFromString() overwrites block_based options. In mongo I set default values for block_cache and some other values of BlockBasedTableOptions and I don't want them reset to default with GetOptionsFromString().

Test Plan: added unit test

Reviewers: yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D33729
上级 73711f95
...@@ -53,6 +53,9 @@ Status GetBlockBasedTableOptionsFromString( ...@@ -53,6 +53,9 @@ Status GetBlockBasedTableOptionsFromString(
const std::string& opts_str, const std::string& opts_str,
BlockBasedTableOptions* new_table_options); BlockBasedTableOptions* new_table_options);
Status GetOptionsFromString(const Options& base_options,
const std::string& opts_str, Options* new_options);
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/utilities/convenience.h" #include "rocksdb/utilities/convenience.h"
#include "table/block_based_table_factory.h"
#include "util/options_helper.h" #include "util/options_helper.h"
namespace rocksdb { namespace rocksdb {
...@@ -347,6 +348,195 @@ Status StringToMap(const std::string& opts_str, ...@@ -347,6 +348,195 @@ Status StringToMap(const std::string& opts_str,
return Status::OK(); return Status::OK();
} }
bool ParseColumnFamilyOption(const std::string& name, const std::string& value,
ColumnFamilyOptions* new_options) {
try {
if (ParseMemtableOptions(name, value, new_options)) {
} else if (ParseCompactionOptions(name, value, new_options)) {
} else if (ParseMiscOptions(name, value, new_options)) {
} else if (name == "block_based_table_factory") {
// Nested options
BlockBasedTableOptions table_opt, base_table_options;
auto block_based_table_factory = dynamic_cast<BlockBasedTableFactory*>(
new_options->table_factory.get());
if (block_based_table_factory != nullptr) {
base_table_options = block_based_table_factory->GetTableOptions();
}
Status table_opt_s = GetBlockBasedTableOptionsFromString(
base_table_options, value, &table_opt);
if (!table_opt_s.ok()) {
return false;
}
new_options->table_factory.reset(NewBlockBasedTableFactory(table_opt));
} else if (name == "min_write_buffer_number_to_merge") {
new_options->min_write_buffer_number_to_merge = ParseInt(value);
} else if (name == "compression") {
new_options->compression = ParseCompressionType(value);
} else if (name == "compression_per_level") {
new_options->compression_per_level.clear();
size_t start = 0;
while (true) {
size_t end = value.find(':', start);
if (end == std::string::npos) {
new_options->compression_per_level.push_back(
ParseCompressionType(value.substr(start)));
break;
} else {
new_options->compression_per_level.push_back(
ParseCompressionType(value.substr(start, end - start)));
start = end + 1;
}
}
} else if (name == "compression_opts") {
size_t start = 0;
size_t end = value.find(':');
if (end == std::string::npos) {
return false;
}
new_options->compression_opts.window_bits =
ParseInt(value.substr(start, end - start));
start = end + 1;
end = value.find(':', start);
if (end == std::string::npos) {
return false;
}
new_options->compression_opts.level =
ParseInt(value.substr(start, end - start));
start = end + 1;
if (start >= value.size()) {
return false;
}
new_options->compression_opts.strategy =
ParseInt(value.substr(start, value.size() - start));
} else if (name == "num_levels") {
new_options->num_levels = ParseInt(value);
} else if (name == "purge_redundant_kvs_while_flush") {
new_options->purge_redundant_kvs_while_flush =
ParseBoolean(name, value);
} else if (name == "compaction_style") {
new_options->compaction_style = ParseCompactionStyle(value);
} else if (name == "compaction_options_universal") {
// TODO(ljin): add support
return false;
} else if (name == "compaction_options_fifo") {
new_options->compaction_options_fifo.max_table_files_size =
ParseUint64(value);
} else if (name == "bloom_locality") {
new_options->bloom_locality = ParseUint32(value);
} else if (name == "min_partial_merge_operands") {
new_options->min_partial_merge_operands = ParseUint32(value);
} else if (name == "inplace_update_support") {
new_options->inplace_update_support = ParseBoolean(name, value);
} else if (name == "prefix_extractor") {
const std::string kFixedPrefixName = "fixed:";
const std::string kCappedPrefixName = "capped:";
auto& pe_value = value;
if (pe_value.size() > kFixedPrefixName.size() &&
pe_value.compare(0, kFixedPrefixName.size(), kFixedPrefixName) == 0) {
int prefix_length =
ParseInt(trim(value.substr(kFixedPrefixName.size())));
new_options->prefix_extractor.reset(
NewFixedPrefixTransform(prefix_length));
} else if (pe_value.size() > kCappedPrefixName.size() &&
pe_value.compare(0, kCappedPrefixName.size(),
kCappedPrefixName) == 0) {
int prefix_length =
ParseInt(trim(pe_value.substr(kCappedPrefixName.size())));
new_options->prefix_extractor.reset(
NewCappedPrefixTransform(prefix_length));
} else {
return false;
}
} else {
return false;
}
}
catch (std::exception& e) {
return false;
}
return true;
}
bool ParseDBOption(const std::string& name, const std::string& value,
DBOptions* new_options) {
try {
if (name == "create_if_missing") {
new_options->create_if_missing = ParseBoolean(name, value);
} else if (name == "create_missing_column_families") {
new_options->create_missing_column_families =
ParseBoolean(name, value);
} else if (name == "error_if_exists") {
new_options->error_if_exists = ParseBoolean(name, value);
} else if (name == "paranoid_checks") {
new_options->paranoid_checks = ParseBoolean(name, value);
} else if (name == "max_open_files") {
new_options->max_open_files = ParseInt(value);
} else if (name == "max_total_wal_size") {
new_options->max_total_wal_size = ParseUint64(value);
} else if (name == "disable_data_sync") {
new_options->disableDataSync = ParseBoolean(name, value);
} else if (name == "use_fsync") {
new_options->use_fsync = ParseBoolean(name, value);
} else if (name == "db_paths") {
// TODO(ljin): add support
return false;
} else if (name == "db_log_dir") {
new_options->db_log_dir = value;
} else if (name == "wal_dir") {
new_options->wal_dir = value;
} else if (name == "delete_obsolete_files_period_micros") {
new_options->delete_obsolete_files_period_micros = ParseUint64(value);
} else if (name == "max_background_compactions") {
new_options->max_background_compactions = ParseInt(value);
} else if (name == "max_background_flushes") {
new_options->max_background_flushes = ParseInt(value);
} else if (name == "max_log_file_size") {
new_options->max_log_file_size = ParseSizeT(value);
} else if (name == "log_file_time_to_roll") {
new_options->log_file_time_to_roll = ParseSizeT(value);
} else if (name == "keep_log_file_num") {
new_options->keep_log_file_num = ParseSizeT(value);
} else if (name == "max_manifest_file_size") {
new_options->max_manifest_file_size = ParseUint64(value);
} else if (name == "table_cache_numshardbits") {
new_options->table_cache_numshardbits = ParseInt(value);
} else if (name == "table_cache_remove_scan_count_limit") {
new_options->table_cache_remove_scan_count_limit = ParseInt(value);
} else if (name == "WAL_ttl_seconds") {
new_options->WAL_ttl_seconds = ParseUint64(value);
} else if (name == "WAL_size_limit_MB") {
new_options->WAL_size_limit_MB = ParseUint64(value);
} else if (name == "manifest_preallocation_size") {
new_options->manifest_preallocation_size = ParseSizeT(value);
} else if (name == "allow_os_buffer") {
new_options->allow_os_buffer = ParseBoolean(name, value);
} else if (name == "allow_mmap_reads") {
new_options->allow_mmap_reads = ParseBoolean(name, value);
} else if (name == "allow_mmap_writes") {
new_options->allow_mmap_writes = ParseBoolean(name, value);
} else if (name == "is_fd_close_on_exec") {
new_options->is_fd_close_on_exec = ParseBoolean(name, value);
} else if (name == "skip_log_error_on_recovery") {
new_options->skip_log_error_on_recovery = ParseBoolean(name, value);
} else if (name == "stats_dump_period_sec") {
new_options->stats_dump_period_sec = ParseUint32(value);
} else if (name == "advise_random_on_open") {
new_options->advise_random_on_open = ParseBoolean(name, value);
} else if (name == "db_write_buffer_size") {
new_options->db_write_buffer_size = ParseUint64(value);
} else if (name == "use_adaptive_mutex") {
new_options->use_adaptive_mutex = ParseBoolean(name, value);
} else if (name == "bytes_per_sync") {
new_options->bytes_per_sync = ParseUint64(value);
} else {
return false;
}
}
catch (std::exception& e) {
return false;
}
return true;
}
Status GetBlockBasedTableOptionsFromMap( Status GetBlockBasedTableOptionsFromMap(
const BlockBasedTableOptions& table_options, const BlockBasedTableOptions& table_options,
...@@ -434,109 +624,8 @@ Status GetColumnFamilyOptionsFromMap( ...@@ -434,109 +624,8 @@ Status GetColumnFamilyOptionsFromMap(
assert(new_options); assert(new_options);
*new_options = base_options; *new_options = base_options;
for (const auto& o : opts_map) { for (const auto& o : opts_map) {
try { if (!ParseColumnFamilyOption(o.first, o.second, new_options)) {
if (ParseMemtableOptions(o.first, o.second, new_options)) { return Status::InvalidArgument("Can't parse option " + o.first);
} else if (ParseCompactionOptions(o.first, o.second, new_options)) {
} else if (ParseMiscOptions(o.first, o.second, new_options)) {
} else if (o.first == "block_based_table_factory") {
// Nested options
BlockBasedTableOptions table_opt;
Status table_opt_s = GetBlockBasedTableOptionsFromString(
BlockBasedTableOptions(), o.second, &table_opt);
if (!table_opt_s.ok()) {
return table_opt_s;
}
new_options->table_factory.reset(NewBlockBasedTableFactory(table_opt));
} else if (o.first == "min_write_buffer_number_to_merge") {
new_options->min_write_buffer_number_to_merge = ParseInt(o.second);
} else if (o.first == "compression") {
new_options->compression = ParseCompressionType(o.second);
} else if (o.first == "compression_per_level") {
new_options->compression_per_level.clear();
size_t start = 0;
while (true) {
size_t end = o.second.find(':', start);
if (end == std::string::npos) {
new_options->compression_per_level.push_back(
ParseCompressionType(o.second.substr(start)));
break;
} else {
new_options->compression_per_level.push_back(
ParseCompressionType(o.second.substr(start, end - start)));
start = end + 1;
}
}
} else if (o.first == "compression_opts") {
size_t start = 0;
size_t end = o.second.find(':');
if (end == std::string::npos) {
return Status::InvalidArgument("invalid config value for: "
+ o.first);
}
new_options->compression_opts.window_bits =
ParseInt(o.second.substr(start, end - start));
start = end + 1;
end = o.second.find(':', start);
if (end == std::string::npos) {
return Status::InvalidArgument("invalid config value for: "
+ o.first);
}
new_options->compression_opts.level =
ParseInt(o.second.substr(start, end - start));
start = end + 1;
if (start >= o.second.size()) {
return Status::InvalidArgument("invalid config value for: "
+ o.first);
}
new_options->compression_opts.strategy =
ParseInt(o.second.substr(start, o.second.size() - start));
} else if (o.first == "num_levels") {
new_options->num_levels = ParseInt(o.second);
} else if (o.first == "purge_redundant_kvs_while_flush") {
new_options->purge_redundant_kvs_while_flush =
ParseBoolean(o.first, o.second);
} else if (o.first == "compaction_style") {
new_options->compaction_style = ParseCompactionStyle(o.second);
} else if (o.first == "compaction_options_universal") {
// TODO(ljin): add support
return Status::NotSupported("Not supported: " + o.first);
} else if (o.first == "compaction_options_fifo") {
new_options->compaction_options_fifo.max_table_files_size
= ParseUint64(o.second);
} else if (o.first == "bloom_locality") {
new_options->bloom_locality = ParseUint32(o.second);
} else if (o.first == "min_partial_merge_operands") {
new_options->min_partial_merge_operands = ParseUint32(o.second);
} else if (o.first == "inplace_update_support") {
new_options->inplace_update_support = ParseBoolean(o.first, o.second);
} else if (o.first == "prefix_extractor") {
const std::string kFixedPrefixName = "fixed:";
const std::string kCappedPrefixName = "capped:";
auto& pe_value = o.second;
if (pe_value.size() > kFixedPrefixName.size() &&
pe_value.compare(0, kFixedPrefixName.size(), kFixedPrefixName) ==
0) {
int prefix_length =
ParseInt(trim(o.second.substr(kFixedPrefixName.size())));
new_options->prefix_extractor.reset(
NewFixedPrefixTransform(prefix_length));
} else if (pe_value.size() > kCappedPrefixName.size() &&
pe_value.compare(0, kCappedPrefixName.size(),
kCappedPrefixName) == 0) {
int prefix_length =
ParseInt(trim(pe_value.substr(kCappedPrefixName.size())));
new_options->prefix_extractor.reset(
NewCappedPrefixTransform(prefix_length));
} else {
return Status::InvalidArgument("Invalid Prefix Extractor type: " +
pe_value);
}
} else {
return Status::InvalidArgument("Unrecognized option: " + o.first);
}
} catch (std::exception& e) {
return Status::InvalidArgument("error parsing " + o.first + ":" +
std::string(e.what()));
} }
} }
return Status::OK(); return Status::OK();
...@@ -561,83 +650,8 @@ Status GetDBOptionsFromMap( ...@@ -561,83 +650,8 @@ Status GetDBOptionsFromMap(
assert(new_options); assert(new_options);
*new_options = base_options; *new_options = base_options;
for (const auto& o : opts_map) { for (const auto& o : opts_map) {
try { if (!ParseDBOption(o.first, o.second, new_options)) {
if (o.first == "create_if_missing") { return Status::InvalidArgument("Can't parse option " + o.first);
new_options->create_if_missing = ParseBoolean(o.first, o.second);
} else if (o.first == "create_missing_column_families") {
new_options->create_missing_column_families =
ParseBoolean(o.first, o.second);
} else if (o.first == "error_if_exists") {
new_options->error_if_exists = ParseBoolean(o.first, o.second);
} else if (o.first == "paranoid_checks") {
new_options->paranoid_checks = ParseBoolean(o.first, o.second);
} else if (o.first == "max_open_files") {
new_options->max_open_files = ParseInt(o.second);
} else if (o.first == "max_total_wal_size") {
new_options->max_total_wal_size = ParseUint64(o.second);
} else if (o.first == "disable_data_sync") {
new_options->disableDataSync = ParseBoolean(o.first, o.second);
} else if (o.first == "use_fsync") {
new_options->use_fsync = ParseBoolean(o.first, o.second);
} else if (o.first == "db_paths") {
// TODO(ljin): add support
return Status::NotSupported("Not supported: " + o.first);
} else if (o.first == "db_log_dir") {
new_options->db_log_dir = o.second;
} else if (o.first == "wal_dir") {
new_options->wal_dir = o.second;
} else if (o.first == "delete_obsolete_files_period_micros") {
new_options->delete_obsolete_files_period_micros =
ParseUint64(o.second);
} else if (o.first == "max_background_compactions") {
new_options->max_background_compactions = ParseInt(o.second);
} else if (o.first == "max_background_flushes") {
new_options->max_background_flushes = ParseInt(o.second);
} else if (o.first == "max_log_file_size") {
new_options->max_log_file_size = ParseSizeT(o.second);
} else if (o.first == "log_file_time_to_roll") {
new_options->log_file_time_to_roll = ParseSizeT(o.second);
} else if (o.first == "keep_log_file_num") {
new_options->keep_log_file_num = ParseSizeT(o.second);
} else if (o.first == "max_manifest_file_size") {
new_options->max_manifest_file_size = ParseUint64(o.second);
} else if (o.first == "table_cache_numshardbits") {
new_options->table_cache_numshardbits = ParseInt(o.second);
} else if (o.first == "table_cache_remove_scan_count_limit") {
new_options->table_cache_remove_scan_count_limit = ParseInt(o.second);
} else if (o.first == "WAL_ttl_seconds") {
new_options->WAL_ttl_seconds = ParseUint64(o.second);
} else if (o.first == "WAL_size_limit_MB") {
new_options->WAL_size_limit_MB = ParseUint64(o.second);
} else if (o.first == "manifest_preallocation_size") {
new_options->manifest_preallocation_size = ParseSizeT(o.second);
} else if (o.first == "allow_os_buffer") {
new_options->allow_os_buffer = ParseBoolean(o.first, o.second);
} else if (o.first == "allow_mmap_reads") {
new_options->allow_mmap_reads = ParseBoolean(o.first, o.second);
} else if (o.first == "allow_mmap_writes") {
new_options->allow_mmap_writes = ParseBoolean(o.first, o.second);
} else if (o.first == "is_fd_close_on_exec") {
new_options->is_fd_close_on_exec = ParseBoolean(o.first, o.second);
} else if (o.first == "skip_log_error_on_recovery") {
new_options->skip_log_error_on_recovery =
ParseBoolean(o.first, o.second);
} else if (o.first == "stats_dump_period_sec") {
new_options->stats_dump_period_sec = ParseUint32(o.second);
} else if (o.first == "advise_random_on_open") {
new_options->advise_random_on_open = ParseBoolean(o.first, o.second);
} else if (o.first == "db_write_buffer_size") {
new_options->db_write_buffer_size = ParseUint64(o.second);
} else if (o.first == "use_adaptive_mutex") {
new_options->use_adaptive_mutex = ParseBoolean(o.first, o.second);
} else if (o.first == "bytes_per_sync") {
new_options->bytes_per_sync = ParseUint64(o.second);
} else {
return Status::InvalidArgument("Unrecognized option: " + o.first);
}
} catch (std::exception& e) {
return Status::InvalidArgument("error parsing " + o.first + ":" +
std::string(e.what()));
} }
} }
return Status::OK(); return Status::OK();
...@@ -655,5 +669,25 @@ Status GetDBOptionsFromString( ...@@ -655,5 +669,25 @@ Status GetDBOptionsFromString(
return GetDBOptionsFromMap(base_options, opts_map, new_options); return GetDBOptionsFromMap(base_options, opts_map, new_options);
} }
Status GetOptionsFromString(const Options& base_options,
const std::string& opts_str, Options* new_options) {
std::unordered_map<std::string, std::string> opts_map;
Status s = StringToMap(opts_str, &opts_map);
if (!s.ok()) {
return s;
}
DBOptions new_db_options(base_options);
ColumnFamilyOptions new_cf_options(base_options);
for (const auto& o : opts_map) {
if (ParseDBOption(o.first, o.second, &new_db_options)) {
} else if (ParseColumnFamilyOption(o.first, o.second, &new_cf_options)) {
} else {
return Status::InvalidArgument("Can't parse option " + o.first);
}
}
*new_options = Options(new_db_options, new_cf_options);
return Status::OK();
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb
...@@ -277,8 +277,9 @@ TEST(OptionsTest, GetOptionsFromMapTest) { ...@@ -277,8 +277,9 @@ TEST(OptionsTest, GetOptionsFromMapTest) {
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
#ifndef ROCKSDB_LITE // GetOptionsFromString is not supported in ROCKSDB_LITE #ifndef ROCKSDB_LITE // GetColumnFamilyOptionsFromString is not supported in
TEST(OptionsTest, GetOptionsFromStringTest) { // ROCKSDB_LITE
TEST(OptionsTest, GetColumnFamilyOptionsFromStringTest) {
ColumnFamilyOptions base_cf_opt; ColumnFamilyOptions base_cf_opt;
ColumnFamilyOptions new_cf_opt; ColumnFamilyOptions new_cf_opt;
base_cf_opt.table_factory.reset(); base_cf_opt.table_factory.reset();
...@@ -452,6 +453,38 @@ TEST(OptionsTest, GetBlockBasedTableOptionsFromString) { ...@@ -452,6 +453,38 @@ TEST(OptionsTest, GetBlockBasedTableOptionsFromString) {
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
#ifndef ROCKSDB_LITE // GetOptionsFromString is not supported in RocksDB Lite
TEST(OptionsTest, GetOptionsFromStringTest) {
Options base_options, new_options;
base_options.write_buffer_size = 20;
base_options.min_write_buffer_number_to_merge = 15;
BlockBasedTableOptions block_based_table_options;
block_based_table_options.cache_index_and_filter_blocks = true;
base_options.table_factory.reset(
NewBlockBasedTableFactory(block_based_table_options));
ASSERT_OK(GetOptionsFromString(
base_options,
"write_buffer_size=10;max_write_buffer_number=16;"
"block_based_table_factory={block_cache=1M;block_size=4;};"
"create_if_missing=true;max_open_files=1",
&new_options));
ASSERT_EQ(new_options.write_buffer_size, 10U);
ASSERT_EQ(new_options.max_write_buffer_number, 16U);
BlockBasedTableOptions new_block_based_table_options =
dynamic_cast<BlockBasedTableFactory*>(new_options.table_factory.get())
->GetTableOptions();
ASSERT_EQ(new_block_based_table_options.block_cache->GetCapacity(), 1U << 20);
ASSERT_EQ(new_block_based_table_options.block_size, 4);
// don't overwrite block based table options
ASSERT_TRUE(new_block_based_table_options.cache_index_and_filter_blocks);
ASSERT_EQ(new_options.create_if_missing, true);
ASSERT_EQ(new_options.max_open_files, 1);
}
#endif // !ROCKSDB_LITE
Status StringToMap( Status StringToMap(
const std::string& opts_str, const std::string& opts_str,
std::unordered_map<std::string, std::string>* opts_map); std::unordered_map<std::string, std::string>* opts_map);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册