diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index b6e407e636e6121b0f5ed5963845a7d6c078cca7..67ba9cd4a6f403280e9ea131fabd03f39ce7f17e 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -19,10 +19,10 @@ namespace rocksdb { namespace { -uint64_t TotalFileSize(const std::vector& files) { +uint64_t TotalCompensatedFileSize(const std::vector& files) { uint64_t sum = 0; for (size_t i = 0; i < files.size() && files[i]; i++) { - sum += files[i]->fd.GetFileSize(); + sum += files[i]->compensated_file_size; } return sum; } @@ -80,7 +80,7 @@ void CompactionPicker::SizeBeingCompacted(std::vector& sizes) { for (auto c : compactions_in_progress_[level]) { assert(c->level() == level); for (int i = 0; i < c->num_input_files(0); i++) { - total += c->input(0, i)->fd.GetFileSize(); + total += c->input(0, i)->compensated_file_size; } } sizes[level] = total; @@ -261,9 +261,9 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { std::vector expanded0; c->input_version_->GetOverlappingInputs( level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr); - const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]); - const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]); - const uint64_t expanded0_size = TotalFileSize(expanded0); + const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0]); + const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1]); + const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); uint64_t limit = ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && @@ -335,7 +335,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, MaxFileSizeForLevel(input_level) * options_->source_compaction_factor; uint64_t total = 0; for (size_t i = 0; i + 1 < inputs.size(); ++i) { - uint64_t s = inputs[i]->fd.GetFileSize(); + uint64_t s = inputs[i]->compensated_file_size; total += s; if (total >= limit) { **compaction_end = inputs[i + 1]->smallest; @@ -483,11 +483,11 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, FileMetaData* f = c->input_version_->files_[level][index]; // check to verify files are arranged in descending size - assert( - (i == file_size.size() - 1) || - (i >= Version::number_of_files_to_sort_ - 1) || - (f->fd.GetFileSize() >= - c->input_version_->files_[level][file_size[i + 1]]->fd.GetFileSize())); + assert((i == file_size.size() - 1) || + (i >= Version::number_of_files_to_sort_ - 1) || + (f->compensated_file_size >= + c->input_version_->files_[level][file_size[i + 1]]-> + compensated_file_size)); // do not pick a file to compact if it is being compacted // from n-1 level. @@ -665,7 +665,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // This file is not being compacted. Consider it as the // first candidate to be compacted. - uint64_t candidate_size = f != nullptr ? f->fd.GetFileSize() : 0; + uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0; if (f != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: Possible candidate file %lu[%d].", @@ -703,9 +703,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // by the last-resort read amp strategy which disregards size ratios. break; } - candidate_size = f->fd.GetFileSize(); + candidate_size = f->compensated_file_size; } else { // default kCompactionStopStyleTotalSize - candidate_size += f->fd.GetFileSize(); + candidate_size += f->compensated_file_size; } candidate_count++; } @@ -721,10 +721,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( int index = file_by_time[i]; FileMetaData* f = version->files_[level][index]; LogToBuffer(log_buffer, - "[%s] Universal: Skipping file %lu[%d] with size %lu %d\n", - version->cfd_->GetName().c_str(), - (unsigned long)f->fd.GetNumber(), i, - (unsigned long)f->fd.GetFileSize(), f->being_compacted); + "[%s] Universal: Skipping file %" PRIu64 "[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), + i, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted); } } } @@ -759,10 +759,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( int index = file_by_time[i]; FileMetaData* f = c->input_version_->files_[level][index]; c->inputs_[0].push_back(f); - LogToBuffer( - log_buffer, "[%s] Universal: Picking file %lu[%d] with size %lu\n", - version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(), i, - (unsigned long)f->fd.GetFileSize()); + LogToBuffer(log_buffer, + "[%s] Universal: Picking file %" PRIu64 "[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")\n", + version->cfd_->GetName().c_str(), + f->fd.GetNumber(), i, + f->fd.GetFileSize(), f->compensated_file_size); } return c; } @@ -826,7 +828,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( " is already being compacted. No size amp reduction possible.\n"); return nullptr; } - candidate_size += f->fd.GetFileSize(); + candidate_size += f->compensated_file_size; candidate_count++; } if (candidate_count == 0) { @@ -866,10 +868,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( f = c->input_version_->files_[level][index]; c->inputs_[0].push_back(f); LogToBuffer(log_buffer, - "[%s] Universal: size amp picking file %lu[%d] with size %lu", - version->cfd_->GetName().c_str(), - (unsigned long)f->fd.GetNumber(), index, - (unsigned long)f->fd.GetFileSize()); + "[%s] Universal: size amp picking file %" PRIu64 "[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")", + version->cfd_->GetName().c_str(), + f->fd.GetNumber(), index, + f->fd.GetFileSize(), f->compensated_file_size); } return c; } @@ -879,7 +882,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, assert(version->NumberLevels() == 1); uint64_t total_size = 0; for (const auto& file : version->files_[0]) { - total_size += file->fd.GetFileSize(); + total_size += file->compensated_file_size; } if (total_size <= options_->compaction_options_fifo.max_table_files_size || @@ -907,7 +910,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, for (auto ritr = version->files_[0].rbegin(); ritr != version->files_[0].rend(); ++ritr) { auto f = *ritr; - total_size -= f->fd.GetFileSize(); + total_size -= f->compensated_file_size; c->inputs_[0].push_back(f); char tmp_fsize[16]; AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); diff --git a/db/db_test.cc b/db/db_test.cc index ab559b53a228c1190018300cc74499b2e04b8364..6344722edf7702a90d76f98448c239696b02fe31 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2726,6 +2726,119 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1); } +namespace { +static const int kCDTValueSize = 1000; +static const int kCDTKeysPerBuffer = 4; +static const int kCDTNumLevels = 8; +Options DeletionTriggerOptions() { + Options options; + options.compression = kNoCompression; + options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24); + options.min_write_buffer_number_to_merge = 1; + options.num_levels = kCDTNumLevels; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 1; + options.target_file_size_base = options.write_buffer_size * 2; + options.target_file_size_multiplier = 2; + options.max_bytes_for_level_base = + options.target_file_size_base * options.target_file_size_multiplier; + options.max_bytes_for_level_multiplier = 2; + options.disable_auto_compactions = false; + return options; +} +} // anonymous namespace + +TEST(DBTest, CompactionDeletionTrigger) { + Options options = DeletionTriggerOptions(); + options.create_if_missing = true; + + for (int tid = 0; tid < 2; ++tid) { + uint64_t db_size[2]; + + DestroyAndReopen(&options); + Random rnd(301); + + const int kTestSize = kCDTKeysPerBuffer * 512; + std::vector values; + for (int k = 0; k < kTestSize; ++k) { + values.push_back(RandomString(&rnd, kCDTValueSize)); + ASSERT_OK(Put(Key(k), values[k])); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + db_size[0] = Size(Key(0), Key(kTestSize - 1)); + + for (int k = 0; k < kTestSize; ++k) { + ASSERT_OK(Delete(Key(k))); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + db_size[1] = Size(Key(0), Key(kTestSize - 1)); + + // must have much smaller db size. + ASSERT_GT(db_size[0] / 3, db_size[1]); + + // repeat the test with universal compaction + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 1; + } +} + +TEST(DBTest, CompactionDeletionTriggerReopen) { + for (int tid = 0; tid < 2; ++tid) { + uint64_t db_size[3]; + Options options = DeletionTriggerOptions(); + options.create_if_missing = true; + + DestroyAndReopen(&options); + Random rnd(301); + + // round 1 --- insert key/value pairs. + const int kTestSize = kCDTKeysPerBuffer * 512; + std::vector values; + for (int k = 0; k < kTestSize; ++k) { + values.push_back(RandomString(&rnd, kCDTValueSize)); + ASSERT_OK(Put(Key(k), values[k])); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + db_size[0] = Size(Key(0), Key(kTestSize - 1)); + Close(); + + // round 2 --- disable auto-compactions and issue deletions. + options.create_if_missing = false; + options.disable_auto_compactions = true; + Reopen(&options); + + for (int k = 0; k < kTestSize; ++k) { + ASSERT_OK(Delete(Key(k))); + } + db_size[1] = Size(Key(0), Key(kTestSize - 1)); + Close(); + // as auto_compaction is off, we shouldn't see too much reduce + // in db size. + ASSERT_LT(db_size[0] / 3, db_size[1]); + + // round 3 --- reopen db with auto_compaction on and see if + // deletion compensation still work. + options.disable_auto_compactions = false; + Reopen(&options); + // insert relatively small amount of data to trigger auto compaction. + for (int k = 0; k < kTestSize / 10; ++k) { + ASSERT_OK(Put(Key(k), values[k])); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + db_size[2] = Size(Key(0), Key(kTestSize - 1)); + // this time we're expecting significant drop in size. + ASSERT_GT(db_size[0] / 3, db_size[2]); + + // repeat the test with universal compaction + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 1; + } +} + // This is a static filter used for filtering // kvs during the compaction process. static int cfilter_count; diff --git a/db/version_edit.h b/db/version_edit.h index 1d214a1490a4a7889be8dd78ad0560f965ba0942..df1cc7827256952b3d09cf48dd437edee361f5db 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -40,6 +40,11 @@ struct FileDescriptor { struct FileMetaData { int refs; FileDescriptor fd; + uint64_t compensated_file_size; // File size compensated by deletion entry. + uint64_t num_entries; // the number of entries. + uint64_t num_deletions; // the number of deletion entries. + uint64_t raw_key_size; // total uncompressed key size. + uint64_t raw_value_size; // total uncompressed value size. InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table bool being_compacted; // Is this file undergoing compaction? @@ -52,6 +57,11 @@ struct FileMetaData { FileMetaData() : refs(0), fd(0, 0), + compensated_file_size(0), + num_entries(0), + num_deletions(0), + raw_key_size(0), + raw_value_size(0), being_compacted(false), table_reader_handle(nullptr) {} }; @@ -149,6 +159,7 @@ class VersionEdit { private: friend class VersionSet; + friend class Version; typedef std::set< std::pair> DeletedFileSet; diff --git a/db/version_set.cc b/db/version_set.cc index 10b25533c6ba143527b3feeb154bad9eba8e4559..29611f0a0c7e4a89d5f5ba5886e5c3a2d8f0b240 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -47,6 +47,15 @@ static uint64_t TotalFileSize(const std::vector& files) { return sum; } +static uint64_t TotalCompensatedFileSize( + const std::vector& files) { + uint64_t sum = 0; + for (size_t i = 0; i < files.size() && files[i]; i++) { + sum += files[i]->compensated_file_size; + } + return sum; +} + Version::~Version() { assert(refs_ == 0); @@ -241,53 +250,69 @@ class Version::LevelFileIteratorState : public TwoLevelIteratorState { bool for_compaction_; }; -Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { +Status Version::GetTableProperties(std::shared_ptr* tp, + const FileMetaData* file_meta, + const std::string* fname) { auto table_cache = cfd_->table_cache(); auto options = cfd_->options(); + Status s = table_cache->GetTableProperties( + vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd, + tp, true /* no io */); + if (s.ok()) { + return s; + } + + // We only ignore error type `Incomplete` since it's by design that we + // disallow table when it's not in table cache. + if (!s.IsIncomplete()) { + return s; + } + + // 2. Table is not present in table cache, we'll read the table properties + // directly from the properties block in the file. + std::unique_ptr file; + if (fname != nullptr) { + s = options->env->NewRandomAccessFile( + *fname, &file, vset_->storage_options_); + } else { + s = options->env->NewRandomAccessFile( + TableFileName(vset_->dbname_, file_meta->fd.GetNumber()), + &file, vset_->storage_options_); + } + if (!s.ok()) { + return s; + } + + TableProperties* raw_table_properties; + // By setting the magic number to kInvalidTableMagicNumber, we can by + // pass the magic number check in the footer. + s = ReadTableProperties( + file.get(), file_meta->fd.GetFileSize(), + Footer::kInvalidTableMagicNumber /* table's magic number */, + vset_->env_, options->info_log.get(), &raw_table_properties); + if (!s.ok()) { + return s; + } + RecordTick(options->statistics.get(), + NUMBER_DIRECT_LOAD_TABLE_PROPERTIES); + + *tp = std::shared_ptr(raw_table_properties); + return s; +} + +Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { for (int level = 0; level < num_levels_; level++) { for (const auto& file_meta : files_[level]) { auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber()); // 1. If the table is already present in table cache, load table // properties from there. std::shared_ptr table_properties; - Status s = table_cache->GetTableProperties( - vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd, - &table_properties, true /* no io */); + Status s = GetTableProperties(&table_properties, file_meta, &fname); if (s.ok()) { props->insert({fname, table_properties}); - continue; - } - - // We only ignore error type `Incomplete` since it's by design that we - // disallow table when it's not in table cache. - if (!s.IsIncomplete()) { - return s; - } - - // 2. Table is not present in table cache, we'll read the table properties - // directly from the properties block in the file. - std::unique_ptr file; - s = options->env->NewRandomAccessFile(fname, &file, - vset_->storage_options_); - if (!s.ok()) { - return s; - } - - TableProperties* raw_table_properties; - // By setting the magic number to kInvalidTableMagicNumber, we can by - // pass the magic number check in the footer. - s = ReadTableProperties( - file.get(), file_meta->fd.GetFileSize(), - Footer::kInvalidTableMagicNumber /* table's magic number */, - vset_->env_, options->info_log.get(), &raw_table_properties); - if (!s.ok()) { + } else { return s; } - RecordTick(options->statistics.get(), - NUMBER_DIRECT_LOAD_TABLE_PROPERTIES); - - props->insert({fname, std::shared_ptr( - raw_table_properties)}); } } @@ -492,7 +517,11 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, compaction_level_(num_levels_), version_number_(version_number), file_indexer_(num_levels_, cfd == nullptr ? nullptr - : cfd->internal_comparator().user_comparator()) { + : cfd->internal_comparator().user_comparator()), + total_file_size_(0), + total_raw_key_size_(0), + total_raw_value_size_(0), + num_non_deletions_(0) { } void Version::Get(const ReadOptions& options, @@ -699,6 +728,58 @@ void Version::PrepareApply(std::vector& size_being_compacted) { UpdateNumNonEmptyLevels(); } +bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { + if (file_meta->num_entries > 0) { + return false; + } + std::shared_ptr tp; + Status s = GetTableProperties(&tp, file_meta); + if (!s.ok()) { + return false; + } + if (tp.get() == nullptr) return false; + file_meta->num_entries = tp->num_entries; + file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties); + file_meta->raw_value_size = tp->raw_value_size; + file_meta->raw_key_size = tp->raw_key_size; + + return true; +} + +void Version::UpdateTemporaryStats(const VersionEdit* edit) { + static const int kDeletionWeightOnCompaction = 2; + + // incrementally update the average value size by + // including newly added files into the global stats + int init_count = 0; + int total_count = 0; + for (int level = 0; level < num_levels_; level++) { + for (auto* file_meta : files_[level]) { + if (MaybeInitializeFileMetaData(file_meta)) { + // each FileMeta will be initialized only once. + total_file_size_ += file_meta->fd.GetFileSize(); + total_raw_key_size_ += file_meta->raw_key_size; + total_raw_value_size_ += file_meta->raw_value_size; + num_non_deletions_ += + file_meta->num_entries - file_meta->num_deletions; + init_count++; + } + total_count++; + } + } + + uint64_t average_value_size = GetAverageValueSize(); + + // compute the compensated size + for (int level = 0; level < num_levels_; level++) { + for (auto* file_meta : files_[level]) { + file_meta->compensated_file_size = file_meta->fd.GetFileSize() + + file_meta->num_deletions * average_value_size * + kDeletionWeightOnCompaction; + } + } +} + void Version::ComputeCompactionScore( std::vector& size_being_compacted) { double max_score = 0; @@ -728,7 +809,7 @@ void Version::ComputeCompactionScore( uint64_t total_size = 0; for (unsigned int i = 0; i < files_[level].size(); i++) { if (!files_[level][i]->being_compacted) { - total_size += files_[level][i]->fd.GetFileSize(); + total_size += files_[level][i]->compensated_file_size; numfiles++; } } @@ -747,7 +828,7 @@ void Version::ComputeCompactionScore( } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = - TotalFileSize(files_[level]) - size_being_compacted[level]; + TotalCompensatedFileSize(files_[level]) - size_being_compacted[level]; score = static_cast(level_bytes) / cfd_->compaction_picker()->MaxBytesForLevel(level); if (max_score < score) { @@ -783,9 +864,10 @@ namespace { // Compator that is used to sort files based on their size // In normal mode: descending size -bool CompareSizeDescending(const Version::Fsize& first, - const Version::Fsize& second) { - return (first.file->fd.GetFileSize() > second.file->fd.GetFileSize()); +bool CompareCompensatedSizeDescending(const Version::Fsize& first, + const Version::Fsize& second) { + return (first.file->compensated_file_size > + second.file->compensated_file_size); } // A static compator used to sort files based on their seqno // In universal style : descending seqno @@ -846,7 +928,7 @@ void Version::UpdateFilesBySize() { num = temp.size(); } std::partial_sort(temp.begin(), temp.begin() + num, temp.end(), - CompareSizeDescending); + CompareCompensatedSizeDescending); } assert(temp.size() == files.size()); @@ -1674,6 +1756,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!edit->IsColumnFamilyManipulation()) { // This is cpu-heavy operations, which should be called outside mutex. + v->UpdateTemporaryStats(edit); v->PrepareApply(size_being_compacted); } diff --git a/db/version_set.h b/db/version_set.h index cf526c2bd1e882d46602fd5c3181cb13cfb0f34e..542db74660c509da94d16528094f7bbaf0c52f32 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -196,6 +196,25 @@ class Version { // Returns the version nuber of this version uint64_t GetVersionNumber() const { return version_number_; } + uint64_t GetAverageValueSize() const { + if (num_non_deletions_ == 0) { + return 0; + } + assert(total_raw_key_size_ + total_raw_value_size_ > 0); + assert(total_file_size_ > 0); + return total_raw_value_size_ / num_non_deletions_ * total_file_size_ / + (total_raw_key_size_ + total_raw_value_size_); + } + + // REQUIRES: lock is held + // On success, "tp" will contains the table properties of the file + // specified in "file_meta". If the file name of "file_meta" is + // known ahread, passing it by a non-null "fname" can save a + // file-name conversion. + Status GetTableProperties(std::shared_ptr* tp, + const FileMetaData* file_meta, + const std::string* fname = nullptr); + // REQUIRES: lock is held // On success, *props will be populated with all SSTables' table properties. // The keys of `props` are the sst file name, the values of `props` are the @@ -228,6 +247,15 @@ class Version { // Update num_non_empty_levels_. void UpdateNumNonEmptyLevels(); + // The helper function of UpdateTemporaryStats, which may fill the missing + // fields of file_mata from its associated TableProperties. + // Returns true if it does initialize FileMetaData. + bool MaybeInitializeFileMetaData(FileMetaData* file_meta); + + // Update the temporary stats associated with the current version. + // This temporary stats will be used in compaction. + void UpdateTemporaryStats(const VersionEdit* edit); + // Sort all files for this version based on their file size and // record results in files_by_size_. The largest files are listed first. void UpdateFilesBySize(); @@ -285,6 +313,16 @@ class Version { Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0); FileIndexer file_indexer_; + // total file size + uint64_t total_file_size_; + // the total size of all raw keys. + uint64_t total_raw_key_size_; + // the total size of all raw values. + uint64_t total_raw_value_size_; + // total number of non-deletion entries + uint64_t num_non_deletions_; + + ~Version(); // re-initializes the index that is used to offset into files_by_size_