提交 3a133880 编写于 作者: 赵明

More blob config

上级 ac462b8e
......@@ -2560,6 +2560,16 @@ void rocksdb_options_set_blob_size(rocksdb_options_t* opt, size_t blob_size) {
opt->rep.blob_size = blob_size;
}
void rocksdb_options_set_blob_large_key_size(rocksdb_options_t* opt,
size_t large_key_size) {
opt->rep.blob_large_key_size = large_key_size;
}
void rocksdb_options_set_blob_large_key_ratio(rocksdb_options_t* opt,
size_t large_key_ratio) {
opt->rep.blob_large_key_ratio = large_key_ratio;
}
void rocksdb_options_set_blob_gc_ratio(rocksdb_options_t* opt, int ratio) {
opt->rep.blob_gc_ratio = ratio;
}
......
......@@ -336,8 +336,17 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
result.max_subcompactions = 1;
}
if (result.blob_gc_ratio > 1) {
result.blob_gc_ratio = 1;
if (result.blob_size < 8) {
result.blob_size = 8;
}
if (result.blob_large_key_ratio > 1) {
result.blob_large_key_ratio = 1;
}
if (result.blob_large_key_ratio < 0) {
result.blob_large_key_ratio = 0;
}
if (result.blob_gc_ratio > 0.5) {
result.blob_gc_ratio = 0.5;
}
if (result.blob_gc_ratio < 0) {
result.blob_gc_ratio = 0;
......
......@@ -139,7 +139,7 @@ struct CompactionWorkerContext {
/// For both filter and filter_factory according to which is not null
EncodedString compaction_filter_data;
uint64_t blob_size;
BlobConfig blob_config;
std::string table_factory;
std::string table_factory_options;
uint32_t bloom_locality;
......
......@@ -221,9 +221,11 @@ AJSON(CompactionFilterContext, is_full_compaction, is_manual_compaction,
using NameParam = CompactionWorkerContext::NameParam;
AJSON(NameParam, name, param);
AJSON(BlobConfig, blob_size, large_key_size, large_key_ratio);
AJSON(CompactionWorkerContext, user_comparator, merge_operator,
merge_operator_data, compaction_filter, compaction_filter_factory,
compaction_filter_context, compaction_filter_data, blob_size,
compaction_filter_context, compaction_filter_data, blob_config,
table_factory, table_factory_options, bloom_locality, cf_paths,
prefix_extractor, prefix_extractor_options, has_start, has_end, start,
end, last_sequence, earliest_write_conflict_snapshot,
......@@ -414,7 +416,6 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(Slice data) {
return make_error(Status::Corruption("Missing CompactionFilterFactory!"));
}
}
cf_options.blob_size = context.blob_size;
if (context.table_factory.empty()) {
return make_error(Status::Corruption("Bad table_factory name !"));
} else {
......@@ -672,7 +673,7 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(Slice data) {
nullptr, env, false, false, &range_del_agg,
std::unique_ptr<CompactionIterator::CompactionProxy>(
new RemoteCompactionProxy(&context)),
mutable_cf_options.blob_size, compaction_filter, nullptr,
context.blob_config, compaction_filter, nullptr,
context.preserve_deletes_seqnum));
if (start != nullptr) {
......@@ -741,7 +742,7 @@ std::string RemoteCompactionDispatcher::Worker::DoCompaction(Slice data) {
second_pass_iter_storage.input.get(), &separate_helper, end, ucmp,
merge_ptr, context.last_sequence, &context.existing_snapshots,
context.earliest_write_conflict_snapshot, nullptr, env, false, false,
range_del_agg_ptr, std::move(compaction), mutable_cf_options.blob_size,
range_del_agg_ptr, std::move(compaction), context.blob_config,
second_pass_iter_storage.compaction_filter, nullptr,
context.preserve_deletes_seqnum);
};
......
......@@ -113,7 +113,7 @@ CompactionIterator::CompactionIterator(
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
size_t blob_size, const CompactionFilter* compaction_filter,
BlobConfig blob_config, const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum)
: CompactionIterator(
......@@ -122,7 +122,7 @@ CompactionIterator::CompactionIterator(
report_detailed_time, expect_valid_internal_key, range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
blob_size, compaction_filter, shutting_down,
blob_config, compaction_filter, shutting_down,
preserve_deletes_seqnum) {}
CompactionIterator::CompactionIterator(
......@@ -133,7 +133,7 @@ CompactionIterator::CompactionIterator(
const SnapshotChecker* snapshot_checker, Env* env,
bool /*report_detailed_time*/, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction, size_t blob_size,
std::unique_ptr<CompactionProxy> compaction, BlobConfig blob_config,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum)
......@@ -149,7 +149,8 @@ CompactionIterator::CompactionIterator(
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),
compaction_(std::move(compaction)),
blob_size_(blob_size),
blob_config_(blob_config),
blob_large_key_ratio_lsh16_(blob_config.large_key_ratio * 65536),
compaction_filter_(compaction_filter),
shutting_down_(shutting_down),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
......@@ -730,11 +731,18 @@ void CompactionIterator::PrepareOutput() {
//
// Can we do the same for levels above bottom level as long as
// KeyNotExistsBeyondOutputLevel() return true?
if (blob_size_ > 0 && value_.file_number() != uint64_t(-1) &&
if (blob_config_.blob_size < size_t(-1) &&
current_user_key_.size() <= blob_config_.large_key_size &&
value_.file_number() != uint64_t(-1) &&
(ikey_.type == kTypeValue || ikey_.type == kTypeMerge)) {
auto s = value_.fetch();
if (s.ok()) {
if (value_.size() >= blob_size_) {
assert(value_.size() < (1ull << 49));
assert(blob_large_key_ratio_lsh16_ < (1ull << 17));
// key.size << 16 <= value.size * large_key_ratio_lsh16
if (value_.size() >= blob_config_.blob_size &&
(current_user_key_.size() << 16) <=
value_.size() * blob_large_key_ratio_lsh16_) {
ikey_.type =
ikey_.type == kTypeValue ? kTypeValueIndex : kTypeMergeIndex;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
......
......@@ -68,7 +68,8 @@ class CompactionIterator {
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr, size_t blob_size = uint64_t(-1),
const Compaction* compaction = nullptr,
BlobConfig blob_config = BlobConfig{size_t(-1), size_t(0), 0.0},
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0);
......@@ -82,7 +83,7 @@ class CompactionIterator {
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction, size_t blob_size,
std::unique_ptr<CompactionProxy> compaction, BlobConfig blob_config,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0);
......@@ -149,7 +150,8 @@ class CompactionIterator {
bool expect_valid_internal_key_;
CompactionRangeDelAggregator* range_del_agg_;
std::unique_ptr<CompactionProxy> compaction_;
const size_t blob_size_;
const BlobConfig blob_config_;
const uint64_t blob_large_key_ratio_lsh16_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
const SequenceNumber preserve_deletes_seqnum_;
......
......@@ -699,7 +699,7 @@ Status CompactionJob::Run() {
}
context.compaction_filter_factory = factory->Name();
}
context.blob_size = c->mutable_cf_options()->blob_size;
context.blob_config = c->mutable_cf_options()->get_blob_config();
context.table_factory = iopt->table_factory->Name();
s = iopt->table_factory->GetOptionString(&context.table_factory_options,
"\n");
......@@ -1234,8 +1234,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
&range_del_agg, sub_compact->compaction, mutable_cf_options->blob_size,
compaction_filter, shutting_down_, preserve_deletes_seqnum_));
&range_del_agg, sub_compact->compaction,
mutable_cf_options->get_blob_config(), compaction_filter, shutting_down_,
preserve_deletes_seqnum_));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
......@@ -1288,7 +1289,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
merge_ptr, versions_->LastSequence(), &existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_, env_, false,
false, range_del_agg_ptr, sub_compact->compaction,
mutable_cf_options->blob_size,
mutable_cf_options->get_blob_config(),
second_pass_iter_storage.compaction_filter, shutting_down_,
preserve_deletes_seqnum_);
};
......
......@@ -211,7 +211,7 @@ struct AdvancedColumnFamilyOptions {
int max_write_buffer_number_to_maintain = 0;
// LazyCompaction
bool enable_lazy_compaction = false;
bool enable_lazy_compaction = true;
// Allows thread-safe inplace updates. If this is true, there is no way to
// achieve point-in-time consistency using snapshot or iterator (assuming
......
......@@ -1059,7 +1059,7 @@ class DB {
const TransactionLogIterator::ReadOptions& read_options =
TransactionLogIterator::ReadOptions()) = 0;
virtual void SetGuardSeqno(SequenceNumber guard_seqno) {}
virtual void SetGuardSeqno(SequenceNumber /*guard_seqno*/) {}
// Windows API macro interference
#undef DeleteFile
......
......@@ -288,12 +288,24 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// concurrently perform a compaction job by breaking it into multiple,
// smaller ones that are run simultaneously.
// Default: 0 (init from DBOptions::max_subcompactions.)
uint32_t max_subcompactions = 0;
uint32_t max_subcompactions = 8;
// Don't separate Value if value.size < blob_size
// Set size_t(-1) to disable Key Value separation
// valid [8 , size_t(-1)]
size_t blob_size = 512;
// Don't separate Value if key.size > blob_large_ley_size
// valid [0 , size_t(-1)]
size_t blob_large_key_size = 128;
// Key Value separate blob value size
size_t blob_size = size_t(-1);
// Don't separate Value if key.size > value.size * blob_large_ley_ratio
// valid [0 , 1]
double blob_large_key_ratio = 0.25;
// Key Value separate gc ratio
// Key Value separation gc ratio
// Startup GC when garbage ratio larger than blob_gc_ratio
// valid [0 , 0.5]
double blob_gc_ratio = 0.05;
// This is a factory that provides TableFactory objects.
......
......@@ -528,9 +528,6 @@ class TableFactory
// Return is delete range supported
virtual bool IsDeleteRangeSupported() const { return false; }
// Return is row cache supported
virtual bool IsRowCacheSupported() const { return false; }
};
#ifndef ROCKSDB_LITE
......
......@@ -150,6 +150,10 @@ void MutableCFOptions::Dump(Logger* log) const {
max_subcompactions);
ROCKS_LOG_INFO(log, " blob_size: %zd",
blob_size);
ROCKS_LOG_INFO(log, " blob_large_key_size: %zd",
blob_large_key_size);
ROCKS_LOG_INFO(log, " blob_large_key_ratio: %f",
blob_large_key_ratio);
ROCKS_LOG_INFO(log, " blob_gc_ratio: %f",
blob_gc_ratio);
ROCKS_LOG_INFO(log, " soft_pending_compaction_bytes_limit: %" PRIu64,
......
......@@ -126,6 +126,12 @@ struct ImmutableCFOptions {
std::vector<DbPath> cf_paths;
};
struct BlobConfig {
size_t blob_size;
size_t large_key_size;
double large_key_ratio;
};
struct MutableCFOptions {
explicit MutableCFOptions(const ColumnFamilyOptions& options)
: write_buffer_size(options.write_buffer_size),
......@@ -141,6 +147,8 @@ struct MutableCFOptions {
disable_auto_compactions(options.disable_auto_compactions),
max_subcompactions(options.max_subcompactions),
blob_size(options.blob_size),
blob_large_key_size(options.blob_large_key_size),
blob_large_key_ratio(options.blob_large_key_ratio),
blob_gc_ratio(options.blob_gc_ratio),
soft_pending_compaction_bytes_limit(
options.soft_pending_compaction_bytes_limit),
......@@ -180,6 +188,8 @@ struct MutableCFOptions {
disable_auto_compactions(false),
max_subcompactions(0),
blob_size(0),
blob_large_key_size(0),
blob_large_key_ratio(0),
blob_gc_ratio(0),
soft_pending_compaction_bytes_limit(0),
hard_pending_compaction_bytes_limit(0),
......@@ -200,6 +210,10 @@ struct MutableCFOptions {
explicit MutableCFOptions(const Options& options);
BlobConfig get_blob_config() const {
return BlobConfig{ blob_size, blob_large_key_size, blob_large_key_ratio };
}
// Must be called after any change to MutableCFOptions
void RefreshDerivedOptions(int num_levels);
......@@ -232,6 +246,8 @@ struct MutableCFOptions {
bool disable_auto_compactions;
uint32_t max_subcompactions;
size_t blob_size;
size_t blob_large_key_size;
double blob_large_key_ratio;
double blob_gc_ratio;
uint64_t soft_pending_compaction_bytes_limit;
uint64_t hard_pending_compaction_bytes_limit;
......
......@@ -247,6 +247,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
max_subcompactions);
ROCKS_LOG_HEADER(log, " Options.blob_size: %zd",
blob_size);
ROCKS_LOG_HEADER(log, " Options.blob_large_key_size: %zd",
blob_large_key_size);
ROCKS_LOG_HEADER(log, " Options.blob_large_key_ratio: %f",
blob_large_key_ratio);
ROCKS_LOG_HEADER(log, " Options.blob_gc_ratio: %f",
blob_gc_ratio);
......
......@@ -156,6 +156,8 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
cf_opts.disable_auto_compactions =
mutable_cf_options.disable_auto_compactions;
cf_opts.blob_size = mutable_cf_options.blob_size;
cf_opts.blob_large_key_size = mutable_cf_options.blob_large_key_size;
cf_opts.blob_large_key_ratio = mutable_cf_options.blob_large_key_ratio;
cf_opts.blob_gc_ratio = mutable_cf_options.blob_gc_ratio;
cf_opts.soft_pending_compaction_bytes_limit =
mutable_cf_options.soft_pending_compaction_bytes_limit;
......@@ -1701,6 +1703,14 @@ std::unordered_map<std::string, OptionTypeInfo>
{offset_of(&ColumnFamilyOptions::blob_size), OptionType::kSizeT,
OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, blob_size)}},
{"blob_large_key_size",
{offset_of(&ColumnFamilyOptions::blob_large_key_size),
OptionType::kSizeT, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, blob_large_key_size)}},
{"blob_large_key_ratio",
{offset_of(&ColumnFamilyOptions::blob_large_key_ratio),
OptionType::kDouble, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, blob_large_key_ratio)}},
{"blob_gc_ratio",
{offset_of(&ColumnFamilyOptions::blob_gc_ratio), OptionType::kDouble,
OptionVerificationType::kNormal, true,
......
......@@ -451,6 +451,9 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"hard_pending_compaction_bytes_limit=0;"
"disable_auto_compactions=false;"
"enable_lazy_compaction=false;"
"blob_size=1028;"
"blob_large_key_size=1024;"
"blob_large_key_ratio=0.5;"
"blob_size=1024;"
"blob_gc_ratio=0.05;"
"report_bg_io_stats=true;"
......
......@@ -352,9 +352,17 @@ bool TerarkZipCFOptionsFromEnv(ColumnFamilyOptions& cfo,
tzo.singleIndexMaxSize =
std::min<size_t>(tzo.singleIndexMaxSize, 0x1E0000000);
cfo.table_factory.reset(NewTerarkZipTableFactory(
tzo, std::shared_ptr<TableFactory>(
NewAdaptiveTableFactory(cfo.table_factory))));
bool use_terark_zip_table = true;
if (const char* env = getenv("TerarkZipTable_table_factory")) {
if (strcasecmp(env, "default") == 0) {
use_terark_zip_table = false;
}
}
if (use_terark_zip_table) {
cfo.table_factory.reset(NewTerarkZipTableFactory(
tzo, std::shared_ptr<TableFactory>(
NewAdaptiveTableFactory(cfo.table_factory))));
}
if (const char* env = getenv("TerarkZipTable_compaction_style")) {
if (strcasecmp(env, "Level") == 0) {
cfo.compaction_style = kCompactionStyleLevel;
......@@ -401,6 +409,8 @@ bool TerarkZipCFOptionsFromEnv(ColumnFamilyOptions& cfo,
MyOverrideInt(cfo, max_subcompactions);
MyOverrideXiB(cfo, blob_size);
MyOverrideXiB(cfo, blob_large_key_size);
MyOverrideDouble(cfo, blob_large_key_ratio);
MyOverrideDouble(cfo, blob_gc_ratio);
if (tzo.debugLevel) {
......
......@@ -94,7 +94,7 @@ struct TerarkZipTableOptions {
uint64_t smallTaskMemory = 1200 << 20; // 1.2G
// use dictZip for value when average value length >= minDictZipValueSize
// otherwise do not use dictZip
uint32_t minDictZipValueSize = 15;
uint32_t minDictZipValueSize = 32;
uint32_t keyPrefixLen = 0; // for IndexID
uint64_t singleIndexMinSize = 8ULL << 20; // 8M
......
terark-core @ c9f8a573
Subproject commit d18423558ab68060f3dc75fd5859653ceb69cae9
Subproject commit c9f8a573b1017f2eed5e82f0e8d3d90dfdcd9f4f
......@@ -784,7 +784,7 @@ int main(int argc, const char* argv[], const char* env[])
options.target_file_size_multiplier = 1;
//options.level0_stop_writes_trigger = 40;
options.enable_lazy_compaction = true;
options.blob_size = 1;
options.blob_size = 8;
options.blob_gc_ratio = 0.1;
#if TEST_TERARK
//options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));
......@@ -803,7 +803,7 @@ int main(int argc, const char* argv[], const char* env[])
options.max_bytes_for_level_base = options.target_file_size_base * 4;
options.target_file_size_multiplier = 1;
//options.level0_stop_writes_trigger = 40;
options.blob_size = 1;
options.blob_size = 8;
options.blob_gc_ratio = 0.1;
#if WORKER_TEST
options.compaction_dispatcher.reset();
......
......@@ -925,7 +925,12 @@ DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");
DEFINE_bool(enable_lazy_compaction, true, "Enable map or link compaction");
DEFINE_uint64(blob_size, 0, "Key Value Separate blob size");
DEFINE_uint64(blob_size, size_t(-1), "Key Value Separate blob size");
DEFINE_uint64(blob_large_key_size, size_t(-1),
"Key Value Separate large key size");
DEFINE_double(blob_large_key_ratio, 1, "Key Value Separate large key ratio");
DEFINE_double(blob_gc_ratio, 0.2, "Blob SST gc ratio");
......@@ -3425,6 +3430,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
options.disable_auto_compactions = FLAGS_disable_auto_compactions;
options.enable_lazy_compaction = FLAGS_enable_lazy_compaction;
options.blob_size = FLAGS_blob_size;
options.blob_large_key_size = FLAGS_blob_large_key_size;
options.blob_large_key_ratio = FLAGS_large_key_ratio;
options.blob_gc_ratio = FLAGS_blob_gc_ratio;
options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册