From 11da8bc5dfacd966dc474aa493526cd728a99bf7 Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Wed, 12 Mar 2014 16:40:14 -0700 Subject: [PATCH] A heuristic way to check if a memtable is full Summary: This is based on https://reviews.facebook.net/D15027. It's not finished, but I would like to give a prototype that avoids arena over-allocation while making better use of the already allocated memory blocks. Instead of checking the approximate memtable size, we take a deeper look at the arena, which incorporates the essential idea that @sdong suggests: flush when the arena has allocated its last block and that block is "almost full". Test Plan: N/A Reviewers: haobo, sdong Reviewed By: sdong CC: leveldb, sdong Differential Revision: https://reviews.facebook.net/D15051 --- db/db_impl.cc | 6 +-- db/db_test.cc | 105 +++++++++++++++++++++++++++++++------------------ db/memtable.cc | 68 +++++++++++++++++++++++++++++++- db/memtable.h | 14 +++++++ util/arena.cc | 1 + util/arena.h | 7 ++++ 6 files changed, 157 insertions(+), 44 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index b36067e1d..52c538277 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1094,8 +1094,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, *max_sequence = last_seq; } - if (!read_only && - mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { + if (!read_only && mem_->ShouldFlush()) { status = WriteLevel0TableForRecovery(mem_, &edit); // we still want to clear memtable, even if the recovery failed delete mem_->Unref(); @@ -3533,8 +3532,7 @@ Status DBImpl::MakeRoomForWrite(bool force, allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); delayed_writes_++; - } else if (!force && - (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { + } else if (!force && !mem_->ShouldFlush()) { // There is room in current memtable if (allow_delay) { DelayLoggingAndReset(); diff --git a/db/db_test.cc b/db/db_test.cc index 2a8cee4e7..c508c666d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include +#include #include #include #include @@ -23,20 +24,20 @@ #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/perf_context.h" -#include "table/plain_table_factory.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "table/block_based_table_factory.h" +#include "table/plain_table_factory.h" #include "util/hash.h" #include "util/hash_linklist_rep.h" +#include "utilities/merge_operators.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/statistics.h" #include "util/testharness.h" #include "util/testutil.h" -#include "utilities/merge_operators.h" namespace rocksdb { @@ -850,6 +851,28 @@ void VerifyTableProperties(DB* db, uint64_t expected_entries_size) { ASSERT_EQ(expected_entries_size, sum); } +std::unordered_map GetMemoryUsage(MemTable* memtable) { + const auto& arena = memtable->TEST_GetArena(); + return {{"memtable.approximate.usage", memtable->ApproximateMemoryUsage()}, + {"arena.approximate.usage", arena.ApproximateMemoryUsage()}, + {"arena.allocated.memory", arena.MemoryAllocatedBytes()}, + {"arena.unused.bytes", arena.AllocatedAndUnused()}, + {"irregular.blocks", arena.IrregularBlockNum()}}; +} + +void PrintMemoryUsage(const std::unordered_map& usage) { + for (const auto& item : usage) { + std::cout << "\t" << item.first << ": " << item.second << std::endl; + } +} + +void AddRandomKV(MemTable* memtable, Random* rnd, size_t arena_block_size) { + memtable->Add(0, kTypeValue, RandomString(rnd, 20) /* key */, + // make sure we will be able to generate some oversized entries + RandomString(rnd, rnd->Uniform(arena_block_size / 4) * 1.15 + + 10) /* value */); +} + TEST(DBTest, Empty) { do { Options options = CurrentOptions(); @@ -1922,7 +1945,7 @@ TEST(DBTest, NumImmutableMemTable) { options.write_buffer_size = 1000000; Reopen(&options); - std::string big_value(1000000, 'x'); + std::string big_value(1000000 * 2, 'x'); std::string num; SetPerfLevel(kEnableTime);; @@ -2205,6 +2228,10 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1), 1); } +// TODO(kailiu) The tests on UniversalCompaction have some issues: +// 1. A lot of magic numbers ("11" or "12"). +// 2. They make assumptions about the memtable flush conditions, which may +// change from time to time. TEST(DBTest, UniversalCompactionTrigger) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; @@ -2222,8 +2249,8 @@ TEST(DBTest, UniversalCompactionTrigger) { for (int num = 0; num < options.level0_file_num_compaction_trigger-1; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2233,7 +2260,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Generate one more file at level-0, which should trigger level-0 // compaction.
- for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2257,8 +2284,8 @@ TEST(DBTest, UniversalCompactionTrigger) { for (int num = 0; num < options.level0_file_num_compaction_trigger-3; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2268,7 +2295,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Generate one more file at level-0, which should trigger level-0 // compaction. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2286,8 +2313,8 @@ TEST(DBTest, UniversalCompactionTrigger) { for (int num = 0; num < options.level0_file_num_compaction_trigger-3; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2312,7 +2339,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Stage 4: // Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a // new file of size 1. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2326,7 +2353,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Stage 5: // Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate // a new file of size 1. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2356,8 +2383,8 @@ TEST(DBTest, UniversalCompactionSizeAmplification) { for (int num = 0; num < options.level0_file_num_compaction_trigger-1; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2392,8 +2419,8 @@ TEST(DBTest, UniversalCompactionOptions) { for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2431,8 +2458,8 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { for (int num = 0; num < options.level0_file_num_compaction_trigger-1; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2442,7 +2469,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { // Generate one more file at level-0, which should trigger level-0 // compaction. 
- for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2463,8 +2490,8 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { for (int num = 0; num < options.level0_file_num_compaction_trigger-3; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2474,7 +2501,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { // Generate one more file at level-0, which should trigger level-0 // compaction. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2485,7 +2512,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { // Stage 3: // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one // more file at level-0, which should trigger level-0 compaction. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2593,54 +2620,54 @@ TEST(DBTest, UniversalCompactionCompressRatio1) { // The first compaction (2) is compressed. for (int num = 0; num < 2; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 2 * 0.9); + ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 2 * 0.9); // The second compaction (4) is compressed for (int num = 0; num < 2; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 4 * 0.9); + ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 4 * 0.9); // The third compaction (2 4) is compressed since this time it is // (1 1 3.2) and 3.2/5.2 doesn't reach ratio. for (int num = 0; num < 2; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 6 * 0.9); + ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 6 * 0.9); // When we start for the compaction up to (2 4 8), the latest // compressed is not compressed. 
for (int num = 0; num < 8; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_GT((int) dbfull()->TEST_GetLevel0TotalSize(), - 120000 * 12 * 0.8 + 110000 * 2); + ASSERT_GT((int)dbfull()->TEST_GetLevel0TotalSize(), + 110000 * 11 * 0.8 + 110000 * 2); } TEST(DBTest, UniversalCompactionCompressRatio2) { @@ -2666,8 +2693,8 @@ TEST(DBTest, UniversalCompactionCompressRatio2) { dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), - 120000 * 12 * 0.8 + 110000 * 2); + ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), + 120000 * 12 * 0.8 + 120000 * 2); } #endif diff --git a/db/memtable.cc b/db/memtable.cc index f834d11e8..775b802d1 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -10,6 +10,7 @@ #include "db/memtable.h" #include +#include #include "db/dbformat.h" #include "db/merge_context.h" @@ -31,6 +32,8 @@ namespace rocksdb { MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) : comparator_(cmp), refs_(0), + kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)), + kWriteBufferSize(options.write_buffer_size), arena_(options.arena_block_size), table_(options.memtable_factory->CreateMemTableRep( comparator_, &arena_, options.prefix_extractor.get())), @@ -42,7 +45,11 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) mem_logfile_number_(0), locks_(options.inplace_update_support ? options.inplace_update_num_locks : 0), - prefix_extractor_(options.prefix_extractor.get()) { + prefix_extractor_(options.prefix_extractor.get()), + should_flush_(ShouldFlushNow()) { + // if should_flush_ == true without an entry inserted, something must have + // gone wrong already. + assert(!should_flush_); if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits, options.memtable_prefix_bloom_probes)); @@ -57,6 +64,60 @@ size_t MemTable::ApproximateMemoryUsage() { return arena_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage(); } +bool MemTable::ShouldFlushNow() const { + // Most of the time, we cannot allocate arena blocks that exactly match the + // buffer size. Thus we have to decide if we should over-allocate or + // under-allocate. + // This constant can be interpreted as: if we still have more than + // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to + // over-allocate one more block. + const double kAllowOverAllocationRatio = 0.6; + + // If the arena still has room for a new block allocation, we can safely say + // it shouldn't flush. + auto allocated_memory = + table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes(); + + if (allocated_memory + kArenaBlockSize * kAllowOverAllocationRatio < + kWriteBufferSize) { + return false; + } + + // If the user keeps adding entries that exceed kWriteBufferSize, we need to + // flush earlier even though we still have much available memory left.
+ if (allocated_memory > kWriteBufferSize * (1 + kAllowOverAllocationRatio)) { + return true; + } + + // In this code path, the arena has already allocated its "last block", which + // means the total allocated memory size is either: + // (1) "moderately" over-allocated (by no more than `0.4 * arena block + // size`). Or, + // (2) less than the write buffer size, but we'll stop here anyway, since + // allocating a new arena block would over-allocate too much more (half of + // the arena block size) memory. + // + // In either case, to avoid over-allocation, the last block stops accepting + // new entries once its usage reaches a certain ratio. We carefully choose + // "0.75 full" as the stop condition because it addresses the following issue + // with great simplicity: what if the next inserted entry's size is bigger + // than AllocatedAndUnused()? + // + // The answer is: if the entry size is also bigger than 0.25 * + // kArenaBlockSize, a dedicated block will be allocated for it; otherwise the + // arena will skip the AllocatedAndUnused() space anyway and allocate a new, + // empty, regular block. In either case, we would *grossly* over-allocate. + // + // Therefore, setting the last block to be at most "0.75 full" avoids both + // cases. + // + // NOTE: the average percentage of wasted space with this approach can be + // estimated as "arena block size * 0.25 / write buffer size". Users who + // specify a small write buffer size and/or a big arena block size may + // suffer. + return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; +} + int MemTable::KeyComparator::operator()(const char* prefix_len_key1, const char* prefix_len_key2) const { // Internal keys are encoded as length-prefixed strings. @@ -198,6 +259,8 @@ void MemTable::Add(SequenceNumber s, ValueType type, if (first_seqno_ == 0) { first_seqno_ = s; } + + should_flush_ = ShouldFlushNow(); } // Callback from MemTable::Get() @@ -460,13 +523,16 @@ bool MemTable::UpdateCallback(SequenceNumber seq, } } RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED); + should_flush_ = ShouldFlushNow(); return true; } else if (status == UpdateStatus::UPDATED) { Add(seq, kTypeValue, key, Slice(str_value)); RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN); + should_flush_ = ShouldFlushNow(); return true; } else if (status == UpdateStatus::UPDATE_FAILED) { // No action required. Return. + should_flush_ = ShouldFlushNow(); return true; } } diff --git a/db/memtable.h b/db/memtable.h index 414d4ac95..451def38f 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -64,6 +64,10 @@ class MemTable { // operations on the same MemTable. size_t ApproximateMemoryUsage(); + // This method heuristically determines whether the memtable should be + // flushed rather than continue to host more data. + bool ShouldFlush() const { return should_flush_; } + // Return an iterator that yields the contents of the memtable. // // The caller must ensure that the underlying MemTable remains live @@ -161,13 +165,20 @@ class MemTable { return comparator_.comparator; } + const Arena& TEST_GetArena() const { return arena_; } + private: + // Dynamically check if we can add more incoming entries.
+ bool ShouldFlushNow() const; + friend class MemTableIterator; friend class MemTableBackwardIterator; friend class MemTableList; KeyComparator comparator_; int refs_; + const size_t kArenaBlockSize; + const size_t kWriteBufferSize; Arena arena_; unique_ptr table_; @@ -199,6 +210,9 @@ class MemTable { const SliceTransform* const prefix_extractor_; std::unique_ptr prefix_bloom_; + + // a flag indicating whether the memtable has met the criteria to flush + bool should_flush_; }; extern const char* EncodeKey(std::string* scratch, const Slice& target); diff --git a/util/arena.cc b/util/arena.cc index dffc8b88e..9b2cb82d1 100644 --- a/util/arena.cc +++ b/util/arena.cc @@ -42,6 +42,7 @@ Arena::~Arena() { char* Arena::AllocateFallback(size_t bytes, bool aligned) { if (bytes > kBlockSize / 4) { + ++irregular_block_num; // Object is more than a quarter of our block size. Allocate it separately // to avoid wasting too much space in leftover bytes. return AllocateNewBlock(bytes); diff --git a/util/arena.h b/util/arena.h index bfa7fe4d8..e6963355b 100644 --- a/util/arena.h +++ b/util/arena.h @@ -46,12 +46,19 @@ class Arena { size_t MemoryAllocatedBytes() const { return blocks_memory_; } + size_t AllocatedAndUnused() const { return alloc_bytes_remaining_; } + + // If an allocation is too big, we'll allocate an irregular block with the + // same size as that allocation. + virtual size_t IrregularBlockNum() const { return irregular_block_num; } + private: // Number of bytes allocated in one block const size_t kBlockSize; // Array of new[] allocated memory blocks typedef std::vector Blocks; Blocks blocks_; + size_t irregular_block_num = 0; // Stats for current active block. // For each block, we allocate aligned memory chucks from one end and -- GitLab
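
For readers who want to trace the heuristic outside the patch context, below is a standalone sketch of the decision logic that MemTable::ShouldFlushNow() implements above. The thresholds (the 0.6 over-allocation ratio and the "last block at most 0.75 full" rule) and the three-step structure follow the patch; the ArenaStats struct and the main() driver are hypothetical stand-ins for the real accessors (MemTableRep::ApproximateMemoryUsage(), Arena::MemoryAllocatedBytes(), Arena::AllocatedAndUnused()), not part of the RocksDB API.

#include <cstddef>
#include <iostream>

// Hypothetical snapshot of the numbers the real code reads from the
// memtable rep and the arena.
struct ArenaStats {
  size_t table_approximate_usage;     // MemTableRep::ApproximateMemoryUsage()
  size_t arena_allocated_bytes;       // Arena::MemoryAllocatedBytes()
  size_t arena_allocated_and_unused;  // Arena::AllocatedAndUnused()
};

// Mirrors the ShouldFlushNow() heuristic from the patch:
// 1. If one more block would still keep us comfortably under the write
//    buffer size, don't flush.
// 2. If we already exceed the write buffer size by more than the allowed
//    over-allocation ratio (e.g. due to oversized entries), flush.
// 3. Otherwise we are on the arena's last block: flush once it is more than
//    3/4 full, i.e. less than a quarter of a block remains unused.
bool ShouldFlushNow(const ArenaStats& s, size_t arena_block_size,
                    size_t write_buffer_size) {
  const double kAllowOverAllocationRatio = 0.6;
  const size_t allocated_memory =
      s.table_approximate_usage + s.arena_allocated_bytes;

  if (allocated_memory + arena_block_size * kAllowOverAllocationRatio <
      write_buffer_size) {
    return false;  // plenty of room for at least one more block
  }
  if (allocated_memory >
      write_buffer_size * (1 + kAllowOverAllocationRatio)) {
    return true;   // grossly over budget already
  }
  return s.arena_allocated_and_unused < arena_block_size / 4;
}

int main() {
  // Example: 4MB write buffer fully allocated in 64KB blocks, with only
  // 10KB left unused in the last block -> the "0.75 full" rule fires.
  ArenaStats stats{0, 4 << 20, 10 << 10};
  std::cout << std::boolalpha
            << ShouldFlushNow(stats, 64 << 10, 4 << 20) << std::endl;
  return 0;
}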
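The arena-side change is small but worth seeing in isolation: any allocation larger than a quarter of the block size gets its own "irregular" block, and the patch merely adds a counter for those. The following is a minimal sketch of that bookkeeping under a simplified, hypothetical MiniArena class; the real util/arena.h additionally handles aligned allocation, hugepages are not involved here, and the method names only loosely echo the real ones.

#include <cstddef>
#include <vector>

// Simplified stand-in for the arena bookkeeping the patch touches:
// blocks_, irregular_block_num, and the quarter-block rule.
class MiniArena {
 public:
  explicit MiniArena(size_t block_size) : kBlockSize(block_size) {}
  ~MiniArena() {
    for (char* block : blocks_) delete[] block;
  }

  char* Allocate(size_t bytes) {
    if (bytes > kBlockSize / 4) {
      // Oversized object: give it a dedicated, irregularly sized block so we
      // don't waste the tail of a regular block (mirrors AllocateFallback).
      ++irregular_block_num_;
      return AllocateNewBlock(bytes);
    }
    if (bytes > alloc_bytes_remaining_) {
      // Start a fresh regular block; the tail of the old one is wasted.
      alloc_ptr_ = AllocateNewBlock(kBlockSize);
      alloc_bytes_remaining_ = kBlockSize;
    }
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }

  size_t AllocatedAndUnused() const { return alloc_bytes_remaining_; }
  size_t IrregularBlockNum() const { return irregular_block_num_; }

 private:
  char* AllocateNewBlock(size_t bytes) {
    char* block = new char[bytes];
    blocks_.push_back(block);
    return block;
  }

  const size_t kBlockSize;
  std::vector<char*> blocks_;
  char* alloc_ptr_ = nullptr;
  size_t alloc_bytes_remaining_ = 0;
  size_t irregular_block_num_ = 0;
};

int main() {
  MiniArena arena(4096);
  arena.Allocate(100);   // fits in a regular block
  arena.Allocate(2000);  // > 4096 / 4, gets its own irregular block
  return arena.IrregularBlockNum() == 1 ? 0 : 1;
}

With this bookkeeping in view, the "at most 0.75 full" rule in ShouldFlushNow() follows directly: an entry larger than a quarter block never competes for the tail of the last regular block, so leaving a quarter-block of headroom there is always enough.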