From dc50a1a59321ee69c5091467e4d0e0e489fcd2a4 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Thu, 16 Oct 2014 16:57:59 -0700 Subject: [PATCH] make max_write_buffer_number dynamic Summary: as title Test Plan: unit test Reviewers: sdong, yhchiang, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D24729 --- db/column_family.cc | 7 +- db/db_impl.cc | 142 ++++++++++++++++++++++---------------- db/db_impl.h | 4 ++ db/db_test.cc | 81 +++++++++++++++++++--- util/mutable_cf_options.h | 3 + util/options_helper.cc | 4 +- 6 files changed, 168 insertions(+), 73 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 0beb23c91..37699af21 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -326,13 +326,14 @@ void ColumnFamilyData::RecalculateWriteStallConditions( auto write_controller = column_family_set_->write_controller_; - if (imm()->size() == options_.max_write_buffer_number) { + if (imm()->size() >= mutable_cf_options.max_write_buffer_number) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1); Log(ioptions_.info_log, "[%s] Stopping writes because we have %d immutable memtables " - "(waiting for flush)", - name_.c_str(), imm()->size()); + "(waiting for flush), max_write_buffer_number is set to %d", + name_.c_str(), imm()->size(), + mutable_cf_options.max_write_buffer_number); } else if (current_->NumLevelFiles(0) >= mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); diff --git a/db/db_impl.cc b/db/db_impl.cc index 259247785..592634600 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1236,9 +1236,12 @@ Status DBImpl::Recover( SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence()); } + // Initial value + max_total_in_memory_state_ = 0; for (auto cfd : *versions_->GetColumnFamilySet()) { - max_total_in_memory_state_ += cfd->options()->write_buffer_size * - cfd->options()->max_write_buffer_number; + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * + mutable_cf_options->max_write_buffer_number; } return s; @@ -1803,8 +1806,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, db_directory_.get()); - superversion_to_free = cfd->InstallSuperVersion( - new_superversion, &mutex_, mutable_cf_options); + superversion_to_free = InstallSuperVersion( + cfd, new_superversion, mutable_cf_options); new_superversion = nullptr; Log(db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(), @@ -1840,10 +1843,10 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { return cfh->cfd()->options()->level0_stop_writes_trigger; } -Status DBImpl::Flush(const FlushOptions& options, +Status DBImpl::Flush(const FlushOptions& flush_options, ColumnFamilyHandle* column_family) { auto cfh = reinterpret_cast(column_family); - return FlushMemTable(cfh->cfd(), options); + return FlushMemTable(cfh->cfd(), flush_options); } SequenceNumber DBImpl::GetLatestSequenceNumber() const { @@ -1933,7 +1936,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, } Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, - const FlushOptions& options) { + const FlushOptions& flush_options) { Status s; { WriteContext context; @@ -1957,7 +1960,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, write_thread_.ExitWriteThread(&w, &w, s); } - if (s.ok() && options.wait) { + if (s.ok() && flush_options.wait) { // Wait until the compaction completes s = WaitForFlushMemTable(cfd); } @@ -3441,7 +3444,7 @@ static void CleanupIteratorState(void* arg1, void* arg2) { } } // namespace -Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, +Iterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* super_version, Arena* arena) { @@ -3451,11 +3454,11 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena); // Collect iterator for mutable mem merge_iter_builder.AddIterator( - super_version->mem->NewIterator(options, arena)); + super_version->mem->NewIterator(read_options, arena)); // Collect all needed child iterators for immutable memtables - super_version->imm->AddIterators(options, &merge_iter_builder); + super_version->imm->AddIterators(read_options, &merge_iter_builder); // Collect iterators for files in L0 - Ln - super_version->current->AddIterators(options, env_options_, + super_version->current->AddIterators(read_options, env_options_, &merge_iter_builder); internal_iter = merge_iter_builder.Finish(); IterState* cleanup = new IterState(this, &mutex_, super_version); @@ -3468,10 +3471,10 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { return default_cf_handle_; } -Status DBImpl::Get(const ReadOptions& options, +Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { - return GetImpl(options, column_family, key, value); + return GetImpl(read_options, column_family, key, value); } // DeletionState gets created and destructed outside of the lock -- we @@ -3488,17 +3491,39 @@ void DBImpl::InstallSuperVersion( ColumnFamilyData* cfd, DeletionState& deletion_state, const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); - // if new_superversion == nullptr, it means somebody already used it - SuperVersion* new_superversion = - (deletion_state.new_superversion != nullptr) ? - deletion_state.new_superversion : new SuperVersion(); SuperVersion* old_superversion = - cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options); + InstallSuperVersion(cfd, deletion_state.new_superversion, + mutable_cf_options); deletion_state.new_superversion = nullptr; deletion_state.superversions_to_free.push_back(old_superversion); } -Status DBImpl::GetImpl(const ReadOptions& options, +SuperVersion* DBImpl::InstallSuperVersion( + ColumnFamilyData* cfd, SuperVersion* new_sv, + const MutableCFOptions& mutable_cf_options) { + mutex_.AssertHeld(); + auto* old = cfd->InstallSuperVersion( + new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); + + // We want to schedule potential flush or compactions since new options may + // have been picked up in this new version. New options may cause flush + // compaction trigger condition to change. + MaybeScheduleFlushOrCompaction(); + + // Update max_total_in_memory_state_ + auto old_memtable_size = 0; + if (old) { + old_memtable_size = old->mutable_cf_options.write_buffer_size * + old->mutable_cf_options.max_write_buffer_number; + } + max_total_in_memory_state_ = + max_total_in_memory_state_ - old_memtable_size + + mutable_cf_options.write_buffer_size * + mutable_cf_options.max_write_buffer_number; + return old; +} + +Status DBImpl::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { StopWatch sw(env_, stats_, DB_GET); @@ -3508,8 +3533,9 @@ Status DBImpl::GetImpl(const ReadOptions& options, auto cfd = cfh->cfd(); SequenceNumber snapshot; - if (options.snapshot != nullptr) { - snapshot = reinterpret_cast(options.snapshot)->number_; + if (read_options.snapshot != nullptr) { + snapshot = reinterpret_cast( + read_options.snapshot)->number_; } else { snapshot = versions_->LastSequence(); } @@ -3535,7 +3561,8 @@ Status DBImpl::GetImpl(const ReadOptions& options, RecordTick(stats_, MEMTABLE_HIT); } else { PERF_TIMER_GUARD(get_from_output_files_time); - sv->current->Get(options, lkey, value, &s, &merge_context, value_found); + sv->current->Get(read_options, lkey, value, &s, &merge_context, + value_found); RecordTick(stats_, MEMTABLE_MISS); } @@ -3551,7 +3578,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, } std::vector DBImpl::MultiGet( - const ReadOptions& options, + const ReadOptions& read_options, const std::vector& column_family, const std::vector& keys, std::vector* values) { @@ -3577,8 +3604,9 @@ std::vector DBImpl::MultiGet( } mutex_.Lock(); - if (options.snapshot != nullptr) { - snapshot = reinterpret_cast(options.snapshot)->number_; + if (read_options.snapshot != nullptr) { + snapshot = reinterpret_cast( + read_options.snapshot)->number_; } else { snapshot = versions_->LastSequence(); } @@ -3621,7 +3649,8 @@ std::vector DBImpl::MultiGet( // Done } else { PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(options, lkey, value, &s, &merge_context); + super_version->current->Get(read_options, lkey, value, &s, + &merge_context); } if (s.ok()) { @@ -3659,7 +3688,7 @@ std::vector DBImpl::MultiGet( return stat_list; } -Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, +Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { *handle = nullptr; @@ -3674,26 +3703,23 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); edit.SetColumnFamily(new_id); edit.SetLogNumber(logfile_number_); - edit.SetComparatorName(options.comparator->Name()); + edit.SetComparatorName(cf_options.comparator->Name()); // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object - Options opt(db_options_, options); + Options opt(db_options_, cf_options); Status s = versions_->LogAndApply(nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), - &edit, &mutex_, db_directory_.get(), false, &options); + &edit, &mutex_, db_directory_.get(), false, &cf_options); if (s.ok()) { single_column_family_mode_ = false; auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_, - *cfd->GetLatestMutableCFOptions()); + delete InstallSuperVersion(cfd, nullptr, *cfd->GetLatestMutableCFOptions()); *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); Log(db_options_.info_log, "Created column family [%s] (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); - max_total_in_memory_state_ += cfd->options()->write_buffer_size * - cfd->options()->max_write_buffer_number; } else { Log(db_options_.info_log, "Creating column family [%s] FAILED -- %s", column_family_name.c_str(), s.ToString().c_str()); @@ -3712,7 +3738,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { edit.DropColumnFamily(); edit.SetColumnFamily(cfd->GetID()); - Status s; { MutexLock l(&mutex_); @@ -3732,8 +3757,9 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { if (s.ok()) { assert(cfd->IsDropped()); - max_total_in_memory_state_ -= cfd->options()->write_buffer_size * - cfd->options()->max_write_buffer_number; + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size * + mutable_cf_options->max_write_buffer_number; Log(db_options_.info_log, "Dropped column family with id %u\n", cfd->GetID()); } else { @@ -3745,14 +3771,14 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { return s; } -bool DBImpl::KeyMayExist(const ReadOptions& options, +bool DBImpl::KeyMayExist(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { if (value_found != nullptr) { // falsify later if key-may-exist but can't fetch value *value_found = true; } - ReadOptions roptions = options; + ReadOptions roptions = read_options; roptions.read_tier = kBlockCacheTier; // read from block cache only auto s = GetImpl(roptions, column_family, key, value, value_found); @@ -3941,23 +3967,23 @@ Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, } } -Status DBImpl::Delete(const WriteOptions& options, +Status DBImpl::Delete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key) { - return DB::Delete(options, column_family, key); + return DB::Delete(write_options, column_family, key); } -Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { +Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(&mutex_); w.batch = my_batch; - w.sync = options.sync; - w.disableWAL = options.disableWAL; + w.sync = write_options.sync; + w.disableWAL = write_options.disableWAL; w.in_batch_group = false; w.done = false; - w.timeout_hint_us = options.timeout_hint_us; + w.timeout_hint_us = write_options.timeout_hint_us; uint64_t expiration_time = 0; bool has_timeout = false; @@ -3968,7 +3994,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { has_timeout = true; } - if (!options.disableWAL) { + if (!write_options.disableWAL) { RecordTick(stats_, WRITE_WITH_WAL); default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1); } @@ -4074,13 +4100,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // Record statistics RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count); RecordTick(stats_, BYTES_WRITTEN, WriteBatchInternal::ByteSize(updates)); - if (options.disableWAL) { + if (write_options.disableWAL) { flush_on_destroy_ = true; } PERF_TIMER_STOP(write_pre_and_post_process_time); uint64_t log_size = 0; - if (!options.disableWAL) { + if (!write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); Slice log_entry = WriteBatchInternal::Contents(updates); status = log_->AddRecord(log_entry); @@ -4089,7 +4115,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { log_empty_ = false; log_size = log_entry.size(); RecordTick(stats_, WAL_FILE_BYTES, log_size); - if (status.ok() && options.sync) { + if (status.ok() && write_options.sync) { RecordTick(stats_, WAL_FILE_SYNCED); StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); if (db_options_.use_fsync) { @@ -4104,7 +4130,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { status = WriteBatchInternal::InsertInto( updates, column_family_memtables_.get(), - options.ignore_missing_column_families, 0, this, false); + write_options.ignore_missing_column_families, 0, this, false); // A non-OK status here indicates iteration failure (either in-memory // writebatch corruption (very bad), or the client specified invalid // column family). This will later on trigger bg_error_. @@ -4123,7 +4149,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // internal stats default_cf_internal_stats_->AddDBStats( InternalStats::BYTES_WRITTEN, batch_size); - if (!options.disableWAL) { + if (!write_options.disableWAL) { default_cf_internal_stats_->AddDBStats( InternalStats::WAL_FILE_SYNCED, 1); default_cf_internal_stats_->AddDBStats( @@ -4221,8 +4247,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, if (s.ok()) { // Our final size should be less than write_buffer_size // (compression, etc) but err on the side of caution. - lfile->SetPreallocationBlockSize(1.1 * - cfd->options()->write_buffer_size); + lfile->SetPreallocationBlockSize( + 1.1 * mutable_cf_options.write_buffer_size); new_log = new log::Writer(std::move(lfile)); } } @@ -4270,7 +4296,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, new_mem->Ref(); cfd->SetMemtable(new_mem); context->superversions_to_free_.push_back( - cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options)); + InstallSuperVersion(cfd, new_superversion, mutable_cf_options)); return s; } @@ -4616,7 +4642,7 @@ Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, } // Default implementation -- returns not supported status -Status DB::CreateColumnFamily(const ColumnFamilyOptions& options, +Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { return Status::NotSupported(""); @@ -4739,8 +4765,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_, - *cfd->GetLatestMutableCFOptions()); + delete impl->InstallSuperVersion( + cfd, nullptr, *cfd->GetLatestMutableCFOptions()); } impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); diff --git a/db/db_impl.h b/db/db_impl.h index 149958315..2d5cfe6c2 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -630,6 +630,10 @@ class DBImpl : public DB { DeletionState& deletion_state, const MutableCFOptions& mutable_cf_options); + SuperVersion* InstallSuperVersion( + ColumnFamilyData* cfd, SuperVersion* new_sv, + const MutableCFOptions& mutable_cf_options); + // Find Super version and reference it. Based on options, it might return // the thread local cached one. inline SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd); diff --git a/db/db_test.cc b/db/db_test.cc index f516a488f..862eecda6 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8137,7 +8137,7 @@ TEST(DBTest, SimpleWriteTimeoutTest) { options.max_background_flushes = 0; options.max_write_buffer_number = 2; options.max_total_wal_size = std::numeric_limits::max(); - WriteOptions write_opt = WriteOptions(); + WriteOptions write_opt; write_opt.timeout_hint_us = 0; DestroyAndReopen(&options); // fill the two write buffers @@ -8173,7 +8173,7 @@ static void RandomTimeoutWriter(void* arg) { DB* db = state->db; Random rnd(1000 + thread_id); - WriteOptions write_opt = WriteOptions(); + WriteOptions write_opt; write_opt.timeout_hint_us = 500; int timeout_count = 0; int num_keys = kNumKeys * 5; @@ -8558,14 +8558,13 @@ TEST(DBTest, DynamicMemtableOptions) { auto gen_l0_kb = [this](int size) { Random rnd(301); - std::vector values; for (int i = 0; i < size; i++) { - values.push_back(RandomString(&rnd, 1024)); - ASSERT_OK(Put(Key(i), values[i])); + ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); } dbfull()->TEST_WaitForFlushMemTable(); }; + // Test write_buffer_size gen_l0_kb(64); ASSERT_EQ(NumTableFilesAtLevel(0), 1); ASSERT_TRUE(SizeAtLevel(0) < k64KB + k5KB); @@ -8587,6 +8586,68 @@ TEST(DBTest, DynamicMemtableOptions) { ASSERT_EQ(NumTableFilesAtLevel(0), 2); ASSERT_TRUE(SizeAtLevel(0) < k128KB + k64KB + 2 * k5KB); ASSERT_TRUE(SizeAtLevel(0) > k128KB + k64KB - 2 * k5KB); + + // Test max_write_buffer_number + // Block compaction thread, which will also block the flushes because + // max_background_flushes == 0, so flushes are getting executed by the + // compaction thread + env_->SetBackgroundThreads(1, Env::LOW); + SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + // Start from scratch and disable compaction/flush. Flush can only happen + // during compaction but trigger is pretty high + options.max_background_flushes = 0; + options.disable_auto_compactions = true; + DestroyAndReopen(&options); + + // Put until timeout, bounded by 256 puts. We should see timeout at ~128KB + int count = 0; + Random rnd(301); + WriteOptions wo; + wo.timeout_hint_us = 1000; + + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) { + count++; + } + ASSERT_TRUE(count > (128 * 0.9) && count < (128 * 1.1)); + + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); + + // Increase + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_write_buffer_number", "8"}, + })); + // Clean up memtable and L0 + dbfull()->CompactRange(nullptr, nullptr); + + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + count = 0; + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { + count++; + } + ASSERT_TRUE(count > (512 * 0.9) && count < (512 * 1.1)); + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); + + // Decrease + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_write_buffer_number", "4"}, + })); + // Clean up memtable and L0 + dbfull()->CompactRange(nullptr, nullptr); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + + count = 0; + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { + count++; + } + ASSERT_TRUE(count > (256 * 0.9) && count < (256 * 1.1)); + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); } TEST(DBTest, DynamicCompactionOptions) { @@ -8617,10 +8678,8 @@ TEST(DBTest, DynamicCompactionOptions) { auto gen_l0_kb = [this](int start, int size, int stride) { Random rnd(301); - std::vector values; for (int i = 0; i < size; i++) { - values.push_back(RandomString(&rnd, 1024)); - ASSERT_OK(Put(Key(start + stride * i), values[i])); + ASSERT_OK(Put(Key(start + stride * i), RandomString(&rnd, 1024))); } dbfull()->TEST_WaitForFlushMemTable(); }; @@ -8666,8 +8725,10 @@ TEST(DBTest, DynamicCompactionOptions) { gen_l0_kb(i, 128, 56); } dbfull()->TEST_WaitForCompact(); - ASSERT_TRUE(SizeAtLevel(1) < 1048576 * 1.1); - ASSERT_TRUE(SizeAtLevel(2) < 4 * 1048576 * 1.1); + ASSERT_TRUE(SizeAtLevel(1) > 1048576 * 0.9 && + SizeAtLevel(1) < 1048576 * 1.1); + ASSERT_TRUE(SizeAtLevel(2) > 4 * 1048576 * 0.9 && + SizeAtLevel(2) < 4 * 1048576 * 1.1); // Change multiplier to 2 with smaller base ASSERT_TRUE(dbfull()->SetOptions({ diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 02f63fed4..4336baa12 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -14,6 +14,7 @@ namespace rocksdb { struct MutableCFOptions { MutableCFOptions(const Options& options, const ImmutableCFOptions& ioptions) : write_buffer_size(options.write_buffer_size), + max_write_buffer_number(options.max_write_buffer_number), arena_block_size(options.arena_block_size), memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits), memtable_prefix_bloom_probes(options.memtable_prefix_bloom_probes), @@ -39,6 +40,7 @@ struct MutableCFOptions { } MutableCFOptions() : write_buffer_size(0), + max_write_buffer_number(0), arena_block_size(0), memtable_prefix_bloom_bits(0), memtable_prefix_bloom_probes(0), @@ -72,6 +74,7 @@ struct MutableCFOptions { // Memtable related options size_t write_buffer_size; + int max_write_buffer_number; size_t arena_block_size; uint32_t memtable_prefix_bloom_bits; uint32_t memtable_prefix_bloom_probes; diff --git a/util/options_helper.cc b/util/options_helper.cc index 67726dc8f..a58c7c596 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -92,6 +92,8 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value, new_options->max_successive_merges = ParseInt64(value); } else if (name == "filter_deletes") { new_options->filter_deletes = ParseBoolean(name, value); + } else if (name == "max_write_buffer_number") { + new_options->max_write_buffer_number = ParseInt(value); } else { return false; } @@ -220,8 +222,6 @@ bool GetColumnFamilyOptionsFromMap( try { if (ParseMemtableOptions(o.first, o.second, new_options)) { } else if (ParseCompactionOptions(o.first, o.second, new_options)) { - } else if (o.first == "max_write_buffer_number") { - new_options->max_write_buffer_number = ParseInt(o.second); } else if (o.first == "min_write_buffer_number_to_merge") { new_options->min_write_buffer_number_to_merge = ParseInt(o.second); } else if (o.first == "compression") { -- GitLab