提交 5187d896 编写于 作者: L Lei Jin

unfriend Compaction and CompactionPicker from VersionSet

Summary: as title

Test Plan: running make all check

Reviewers: sdong, yhchiang, rven, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D27549
上级 75d7e2c3
...@@ -41,7 +41,7 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level, ...@@ -41,7 +41,7 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level,
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes), max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(input_version), input_version_(input_version),
number_levels_(input_version_->NumberLevels()), number_levels_(input_version_->NumberLevels()),
cfd_(input_version_->cfd_), cfd_(input_version_->cfd()),
output_path_id_(output_path_id), output_path_id_(output_path_id),
output_compression_(output_compression), output_compression_(output_compression),
seek_compaction_(seek_compaction), seek_compaction_(seek_compaction),
...@@ -119,7 +119,7 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { ...@@ -119,7 +119,7 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
// Maybe use binary search to find right entry instead of linear search? // Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = cfd_->user_comparator(); const Comparator* user_cmp = cfd_->user_comparator();
for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) { for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->files_[lvl]; const std::vector<FileMetaData*>& files = input_version_->LevelFiles(lvl);
for (; level_ptrs_[lvl] < files.size(); ) { for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]]; FileMetaData* f = files[level_ptrs_[lvl]];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
...@@ -217,7 +217,7 @@ void Compaction::ReleaseCompactionFiles(Status status) { ...@@ -217,7 +217,7 @@ void Compaction::ReleaseCompactionFiles(Status status) {
} }
void Compaction::ResetNextCompactionIndex() { void Compaction::ResetNextCompactionIndex() {
input_version_->ResetNextCompactionIndex(start_level_); input_version_->SetNextCompactionIndex(start_level_, 0);
} }
namespace { namespace {
......
...@@ -331,7 +331,7 @@ Compaction* CompactionPicker::CompactRange( ...@@ -331,7 +331,7 @@ Compaction* CompactionPicker::CompactRange(
delete c; delete c;
Log(ioptions_.info_log, Log(ioptions_.info_log,
"[%s] Could not compact due to expansion failure.\n", "[%s] Could not compact due to expansion failure.\n",
version->cfd_->GetName().c_str()); version->cfd()->GetName().c_str());
return nullptr; return nullptr;
} }
...@@ -455,22 +455,21 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( ...@@ -455,22 +455,21 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
// Pick the largest file in this level that is not already // Pick the largest file in this level that is not already
// being compacted // being compacted
std::vector<int>& file_size = c->input_version_->files_by_size_[level]; std::vector<int>& file_size = version->files_by_size_[level];
// record the first file that is not yet compacted // record the first file that is not yet compacted
int nextIndex = -1; int nextIndex = -1;
for (unsigned int i = c->input_version_->next_file_to_compact_by_size_[level]; for (unsigned int i = version->NextCompactionIndex(level);
i < file_size.size(); i++) { i < file_size.size(); i++) {
int index = file_size[i]; int index = file_size[i];
FileMetaData* f = c->input_version_->files_[level][index]; FileMetaData* f = version->files_[level][index];
// Check to verify files are arranged in descending compensated size. // Check to verify files are arranged in descending compensated size.
assert((i == file_size.size() - 1) || assert((i == file_size.size() - 1) ||
(i >= Version::number_of_files_to_sort_ - 1) || (i >= Version::number_of_files_to_sort_ - 1) ||
(f->compensated_file_size >= (f->compensated_file_size >=
c->input_version_->files_[level][file_size[i + 1]]-> version->files_[level][file_size[i + 1]]->compensated_file_size));
compensated_file_size));
// do not pick a file to compact if it is being compacted // do not pick a file to compact if it is being compacted
// from n-1 level. // from n-1 level.
...@@ -486,7 +485,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( ...@@ -486,7 +485,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
// Do not pick this file if its parents at level+1 are being compacted. // Do not pick this file if its parents at level+1 are being compacted.
// Maybe we can avoid redoing this work in SetupOtherInputs // Maybe we can avoid redoing this work in SetupOtherInputs
int parent_index = -1; int parent_index = -1;
if (ParentRangeInCompaction(c->input_version_, &f->smallest, &f->largest, if (ParentRangeInCompaction(version, &f->smallest, &f->largest,
level, &parent_index)) { level, &parent_index)) {
continue; continue;
} }
...@@ -502,7 +501,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( ...@@ -502,7 +501,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
} }
// store where to start the iteration in the next call to PickCompaction // store where to start the iteration in the next call to PickCompaction
version->next_file_to_compact_by_size_[level] = nextIndex; version->SetNextCompactionIndex(level, nextIndex);
return c; return c;
} }
......
...@@ -615,6 +615,8 @@ uint64_t Version::GetEstimatedActiveKeys() { ...@@ -615,6 +615,8 @@ uint64_t Version::GetEstimatedActiveKeys() {
void Version::AddIterators(const ReadOptions& read_options, void Version::AddIterators(const ReadOptions& read_options,
const EnvOptions& soptions, const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder) { MergeIteratorBuilder* merge_iter_builder) {
assert(finalized_);
// Merge all level zero files together since they may overlap // Merge all level zero files together since they may overlap
for (size_t i = 0; i < file_levels_[0].num_files; i++) { for (size_t i = 0; i < file_levels_[0].num_files; i++) {
const auto& file = file_levels_[0].files[i]; const auto& file = file_levels_[0].files[i];
...@@ -675,7 +677,8 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, ...@@ -675,7 +677,8 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
accumulated_raw_value_size_(0), accumulated_raw_value_size_(0),
accumulated_num_non_deletions_(0), accumulated_num_non_deletions_(0),
accumulated_num_deletions_(0), accumulated_num_deletions_(0),
num_samples_(0) { num_samples_(0),
finalized_(false) {
if (cfd != nullptr && cfd->current() != nullptr) { if (cfd != nullptr && cfd->current() != nullptr) {
accumulated_file_size_ = cfd->current()->accumulated_file_size_; accumulated_file_size_ = cfd->current()->accumulated_file_size_;
accumulated_raw_key_size_ = cfd->current()->accumulated_raw_key_size_; accumulated_raw_key_size_ = cfd->current()->accumulated_raw_key_size_;
...@@ -942,13 +945,20 @@ void Version::ComputeCompactionScore( ...@@ -942,13 +945,20 @@ void Version::ComputeCompactionScore(
} }
namespace { namespace {
// used to sort files by size
struct Fsize {
int index;
FileMetaData* file;
};
// Compator that is used to sort files based on their size // Compator that is used to sort files based on their size
// In normal mode: descending size // In normal mode: descending size
bool CompareCompensatedSizeDescending(const Version::Fsize& first, bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
const Version::Fsize& second) {
return (first.file->compensated_file_size > return (first.file->compensated_file_size >
second.file->compensated_file_size); second.file->compensated_file_size);
} }
} // anonymous namespace } // anonymous namespace
void Version::UpdateNumNonEmptyLevels() { void Version::UpdateNumNonEmptyLevels() {
...@@ -1683,6 +1693,9 @@ VersionSet::~VersionSet() { ...@@ -1683,6 +1693,9 @@ VersionSet::~VersionSet() {
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Version* v) { Version* v) {
// Mark v finalized
v->finalized_ = true;
// Make "v" current // Make "v" current
assert(v->refs_ == 0); assert(v->refs_ == 0);
Version* current = column_family_data->current(); Version* current = column_family_data->current();
......
...@@ -41,7 +41,6 @@ namespace rocksdb { ...@@ -41,7 +41,6 @@ namespace rocksdb {
namespace log { class Writer; } namespace log { class Writer; }
class Compaction; class Compaction;
class CompactionPicker;
class Iterator; class Iterator;
class LogBuffer; class LogBuffer;
class LookupKey; class LookupKey;
...@@ -87,7 +86,6 @@ class Version { ...@@ -87,7 +86,6 @@ class Version {
// Append to *iters a sequence of iterators that will // Append to *iters a sequence of iterators that will
// yield the contents of this Version when merged together. // yield the contents of this Version when merged together.
// REQUIRES: This version has been saved (see VersionSet::SaveTo) // REQUIRES: This version has been saved (see VersionSet::SaveTo)
void AddIterators(const ReadOptions&, const EnvOptions& soptions, void AddIterators(const ReadOptions&, const EnvOptions& soptions,
MergeIteratorBuilder* merger_iter_builder); MergeIteratorBuilder* merger_iter_builder);
...@@ -179,8 +177,11 @@ class Version { ...@@ -179,8 +177,11 @@ class Version {
int NumberLevels() const { return num_levels_; } int NumberLevels() const { return num_levels_; }
// REQUIRES: lock is held // REQUIRES: This version has been saved (see VersionSet::SaveTo)
int NumLevelFiles(int level) const { return files_[level].size(); } int NumLevelFiles(int level) const {
assert(finalized_);
return files_[level].size();
}
// Return the combined file size of all files at the specified level. // Return the combined file size of all files at the specified level.
uint64_t NumLevelBytes(int level) const; uint64_t NumLevelBytes(int level) const;
...@@ -242,19 +243,31 @@ class Version { ...@@ -242,19 +243,31 @@ class Version {
size_t GetMemoryUsageByTableReaders(); size_t GetMemoryUsageByTableReaders();
// used to sort files by size ColumnFamilyData* cfd() const { return cfd_; }
struct Fsize {
int index; // REQUIRES: This version has been saved (see VersionSet::SaveTo)
FileMetaData* file; const std::vector<FileMetaData*>& LevelFiles(int level) const {
}; assert(finalized_);
return files_[level];
}
// REQUIRES: lock is held
// Set the index that is used to offset into files_by_size_ to find
// the next compaction candidate file.
void SetNextCompactionIndex(int level, int index) {
next_file_to_compact_by_size_[level] = index;
}
// REQUIRES: lock is held
int NextCompactionIndex(int level) const {
return next_file_to_compact_by_size_[level];
}
private: private:
friend class Compaction;
friend class VersionSet; friend class VersionSet;
friend class DBImpl; friend class DBImpl;
friend class CompactedDBImpl; friend class CompactedDBImpl;
friend class ColumnFamilyData; friend class ColumnFamilyData;
friend class CompactionPicker;
friend class LevelCompactionPicker; friend class LevelCompactionPicker;
friend class UniversalCompactionPicker; friend class UniversalCompactionPicker;
friend class FIFOCompactionPicker; friend class FIFOCompactionPicker;
...@@ -356,13 +369,11 @@ class Version { ...@@ -356,13 +369,11 @@ class Version {
// the number of samples // the number of samples
uint64_t num_samples_; uint64_t num_samples_;
~Version(); // Used to assert APIs that are only safe to use after the version
// is finalized
bool finalized_;
// re-initializes the index that is used to offset into files_by_size_ ~Version();
// to find the next compaction candidate file.
void ResetNextCompactionIndex(int level) {
next_file_to_compact_by_size_[level] = 0;
}
// No copying allowed // No copying allowed
Version(const Version&); Version(const Version&);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册