提交 76d1c28e 编写于 作者: S sdong

Make CompactionPicker more easily tested

Summary:
Make compaction picker easier to test.
The basic idea is to separate a minimum subcomponent of Version to VersionStorageInfo, which just responsible to LSM tree. A stub VersionStorageInfo can then be easily created and passed into compaction picker so that we can check the outputs.

It now passes most tests. Still two things need to be done:
(1) deal with the FIFO compaction's file size.
(2) write an example test to make sure the interface can do the job.

Add a compaction_picker_test to make sure compaction picker codes can be easily unit tested.

Test Plan:
Pass all unit tests and compaction_picker_test

Reviewers: yhchiang, rven, igor, ljin

Reviewed By: ljin

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D27639
上级 01e6f850
......@@ -131,6 +131,7 @@ TESTS = \
spatial_db_test \
version_edit_test \
version_set_test \
compaction_picker_test \
file_indexer_test \
write_batch_test \
write_controller_test\
......@@ -452,6 +453,9 @@ version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
compaction_picker_test: db/compaction_picker_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/compaction_picker_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
file_indexer_test : db/file_indexer_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/file_indexer_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
......
......@@ -324,8 +324,9 @@ ColumnFamilyData::~ColumnFamilyData() {
void ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
if (current_ != nullptr) {
const double score = current_->MaxCompactionScore();
const int max_level = current_->MaxCompactionScoreLevel();
auto* vstorage = current_->GetStorageInfo();
const double score = vstorage->MaxCompactionScore();
const int max_level = vstorage->MaxCompactionScoreLevel();
auto write_controller = column_family_set_->write_controller_;
......@@ -337,26 +338,26 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"(waiting for flush), max_write_buffer_number is set to %d",
name_.c_str(), imm()->size(),
mutable_cf_options.max_write_buffer_number);
} else if (current_->NumLevelFiles(0) >=
} else if (vstorage->NumLevelFiles(0) >=
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), current_->NumLevelFiles(0));
name_.c_str(), vstorage->NumLevelFiles(0));
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
current_->NumLevelFiles(0) >=
vstorage->NumLevelFiles(0) >=
mutable_cf_options.level0_slowdown_writes_trigger) {
uint64_t slowdown = SlowdownAmount(
current_->NumLevelFiles(0),
mutable_cf_options.level0_slowdown_writes_trigger,
mutable_cf_options.level0_stop_writes_trigger);
uint64_t slowdown =
SlowdownAmount(vstorage->NumLevelFiles(0),
mutable_cf_options.level0_slowdown_writes_trigger,
mutable_cf_options.level0_stop_writes_trigger);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
"us)",
name_.c_str(), current_->NumLevelFiles(0), slowdown);
name_.c_str(), vstorage->NumLevelFiles(0), slowdown);
} else if (mutable_cf_options.hard_rate_limit > 1.0 &&
score > mutable_cf_options.hard_rate_limit) {
uint64_t kHardLimitSlowdown = 1000;
......@@ -403,8 +404,11 @@ void ColumnFamilyData::CreateNewMemtable(
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(
mutable_options, current_, log_buffer);
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, current_->GetStorageInfo(), log_buffer);
if (result != nullptr) {
result->SetInputVersion(current_);
}
return result;
}
......@@ -413,9 +417,13 @@ Compaction* ColumnFamilyData::CompactRange(
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(
mutable_cf_options, current_, input_level, output_level,
output_path_id, begin, end, compaction_end);
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, current_->GetStorageInfo(), input_level,
output_level, output_path_id, begin, end, compaction_end);
if (result != nullptr) {
result->SetInputVersion(current_);
}
return result;
}
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
......
......@@ -29,7 +29,17 @@ uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum;
}
Compaction::Compaction(Version* input_version, int start_level, int out_level,
void Compaction::SetInputVersion(Version* input_version) {
input_version_ = input_version;
cfd_ = input_version_->cfd();
cfd_->Ref();
input_version_->Ref();
edit_ = new VersionEdit();
edit_->SetColumnFamily(cfd_->GetID());
}
Compaction::Compaction(int number_levels, int start_level, int out_level,
uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes,
uint32_t output_path_id,
......@@ -39,9 +49,10 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level,
output_level_(out_level),
max_output_file_size_(target_file_size),
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(input_version),
number_levels_(input_version_->NumberLevels()),
cfd_(input_version_->cfd()),
input_version_(nullptr),
edit_(nullptr),
number_levels_(number_levels),
cfd_(nullptr),
output_path_id_(output_path_id),
output_compression_(output_compression),
seek_compaction_(seek_compaction),
......@@ -56,10 +67,6 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level,
is_full_compaction_(false),
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
cfd_->Ref();
input_version_->Ref();
edit_ = new VersionEdit();
edit_->SetColumnFamily(cfd_->GetID());
for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0;
}
......@@ -113,6 +120,7 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
}
bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
assert(input_version_ != nullptr);
assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO);
if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_;
......@@ -120,7 +128,8 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = cfd_->user_comparator();
for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->LevelFiles(lvl);
const std::vector<FileMetaData*>& files =
input_version_->GetStorageInfo()->LevelFiles(lvl);
for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
......@@ -176,9 +185,9 @@ void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
}
// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool is_manual) {
assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO);
if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
void Compaction::SetupBottomMostLevel(VersionStorageInfo* vstorage,
bool is_manual, bool level0_only) {
if (level0_only) {
// If universal compaction style is used and manual
// compaction is occuring, then we are guaranteed that
// all files will be picked in a single compaction
......@@ -193,7 +202,7 @@ void Compaction::SetupBottomMostLevel(bool is_manual) {
bottommost_level_ = true;
// checks whether there are files living beyond the output_level.
for (int i = output_level_ + 1; i < number_levels_; i++) {
if (input_version_->NumLevelFiles(i) > 0) {
if (vstorage->NumLevelFiles(i) > 0) {
bottommost_level_ = false;
break;
}
......@@ -218,7 +227,8 @@ void Compaction::ReleaseCompactionFiles(Status status) {
}
void Compaction::ResetNextCompactionIndex() {
input_version_->SetNextCompactionIndex(start_level_, 0);
assert(input_version_ != nullptr);
input_version_->GetStorageInfo()->ResetNextCompactionIndex(start_level_);
}
namespace {
......
......@@ -28,6 +28,7 @@ struct CompactionInputFiles {
class Version;
class ColumnFamilyData;
class VersionStorageInfo;
// A Compaction encapsulates information about a compaction.
class Compaction {
......@@ -161,13 +162,15 @@ class Compaction {
// is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options);
void SetInputVersion(Version* input_version);
private:
friend class CompactionPicker;
friend class UniversalCompactionPicker;
friend class FIFOCompactionPicker;
friend class LevelCompactionPicker;
Compaction(Version* input_version, int start_level, int out_level,
Compaction(int num_levels, int start_level, int out_level,
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
uint32_t output_path_id, CompressionType output_compression,
bool seek_compaction = false, bool deletion_compaction = false);
......@@ -230,7 +233,8 @@ class Compaction {
// bottommost level.
//
// @see BottomMostLevel()
void SetupBottomMostLevel(bool is_manual);
void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
bool level0_only);
// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
......
此差异已折叠。
......@@ -18,12 +18,13 @@
#include <vector>
#include <memory>
#include <set>
#include <string>
namespace rocksdb {
class LogBuffer;
class Compaction;
class Version;
class VersionStorageInfo;
class CompactionPicker {
public:
......@@ -35,9 +36,10 @@ class CompactionPicker {
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) = 0;
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) = 0;
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
......@@ -51,9 +53,9 @@ class CompactionPicker {
// Client is responsible for compaction_end storage -- when called,
// *compaction_end should point to valid InternalKey!
virtual Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options, Version* version,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
// Given the current number of levels, returns the lowest allowed level
......@@ -93,18 +95,21 @@ class CompactionPicker {
// populated.
//
// Will return false if it is impossible to apply this compaction.
bool ExpandWhileOverlapping(Compaction* c);
bool ExpandWhileOverlapping(const std::string& cf_name,
VersionStorageInfo* vstorage, Compaction* c);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(std::vector<FileMetaData*>& files);
// Returns true if any one of the parent files are being compacted
bool ParentRangeInCompaction(Version* version, const InternalKey* smallest,
bool ParentRangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest,
const InternalKey* largest, int level,
int* index);
void SetupOtherInputs(const MutableCFOptions& mutable_cf_options,
Compaction* c);
void SetupOtherInputs(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, Compaction* c);
const ImmutableCFOptions& ioptions_;
......@@ -121,9 +126,10 @@ class UniversalCompactionPicker : public CompactionPicker {
UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
// The maxinum allowed input level. Always return 0.
virtual int MaxInputLevel(int current_num_levels) const override {
......@@ -133,14 +139,14 @@ class UniversalCompactionPicker : public CompactionPicker {
private:
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(
const MutableCFOptions& mutable_cf_options,
Version* version, double score, unsigned int ratio,
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, unsigned int ratio,
unsigned int num_files, LogBuffer* log_buffer);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(
const MutableCFOptions& mutable_cf_options,
Version* version, double score, LogBuffer* log_buffer);
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, double score, LogBuffer* log_buffer);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
......@@ -153,9 +159,10 @@ class LevelCompactionPicker : public CompactionPicker {
LevelCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
// Returns current_num_levels - 2, meaning the last level cannot be
// compaction input level.
......@@ -169,7 +176,8 @@ class LevelCompactionPicker : public CompactionPicker {
// If level is 0 and there is already a compaction on that level, this
// function will return nullptr.
Compaction* PickCompactionBySize(const MutableCFOptions& mutable_cf_options,
Version* version, int level, double score);
VersionStorageInfo* vstorage, int level,
double score);
};
class FIFOCompactionPicker : public CompactionPicker {
......@@ -178,14 +186,15 @@ class FIFOCompactionPicker : public CompactionPicker {
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version,
LogBuffer* log_buffer) override;
virtual Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options, Version* version,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) override;
// The maxinum allowed input level. Always return 0.
......
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/compaction_picker.h"
#include <string>
#include "util/logging.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
class CountingLogger : public Logger {
public:
virtual void Logv(const char* format, va_list ap) override { log_count++; }
size_t log_count;
};
class CompactionPickerTest {
public:
const Comparator* ucmp;
InternalKeyComparator icmp;
Options options;
ImmutableCFOptions ioptions;
MutableCFOptions mutable_cf_options;
LevelCompactionPicker level_compaction_picker;
std::string cf_name;
CountingLogger logger;
LogBuffer log_buffer;
VersionStorageInfo vstorage;
uint32_t file_num;
CompactionOptionsFIFO fifo_options;
std::vector<uint64_t> size_being_compacted;
CompactionPickerTest()
: ucmp(BytewiseComparator()),
icmp(ucmp),
ioptions(options),
mutable_cf_options(options, ioptions),
level_compaction_picker(ioptions, &icmp),
cf_name("dummy"),
log_buffer(InfoLogLevel::INFO_LEVEL, &logger),
vstorage(&icmp, ucmp, options.num_levels, kCompactionStyleLevel,
nullptr),
file_num(1) {
fifo_options.max_table_files_size = 1;
mutable_cf_options.RefreshDerivedOptions(ioptions);
size_being_compacted.resize(options.num_levels);
}
~CompactionPickerTest() {
auto* files = vstorage.GetFiles();
for (int i = 0; i < vstorage.NumberLevels(); i++) {
for (auto* f : files[i]) {
delete f;
}
}
}
void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 0, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100,
SequenceNumber largest_seq = 100) {
assert(level < vstorage.NumberLevels());
auto& files = vstorage.GetFiles()[level];
FileMetaData* f = new FileMetaData;
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
f->largest = InternalKey(largest, largest_seq, kTypeValue);
f->compensated_file_size = file_size;
files.push_back(f);
}
void UpdateVersionStorageInfo() {
vstorage.ComputeCompactionScore(mutable_cf_options, fifo_options,
size_being_compacted);
vstorage.UpdateFilesBySize();
vstorage.UpdateNumNonEmptyLevels();
vstorage.GenerateFileIndexer();
vstorage.GenerateLevelFilesBrief();
vstorage.SetFinalized();
}
};
TEST(CompactionPickerTest, Empty) {
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
ASSERT_TRUE(compaction.get() == nullptr);
}
TEST(CompactionPickerTest, Single) {
mutable_cf_options.level0_file_num_compaction_trigger = 2;
Add(0, 1U, "p", "q");
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
ASSERT_TRUE(compaction.get() == nullptr);
}
TEST(CompactionPickerTest, Level0Trigger) {
mutable_cf_options.level0_file_num_compaction_trigger = 2;
Add(0, 1U, "150", "200");
Add(0, 2U, "200", "250");
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2, compaction->num_input_files(0));
ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
}
TEST(CompactionPickerTest, Level1Trigger) {
Add(1, 66U, "150", "200", 1000000000U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1, compaction->num_input_files(0));
ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber());
}
TEST(CompactionPickerTest, Level1Trigger2) {
Add(1, 66U, "150", "200", 1000000000U);
Add(1, 88U, "201", "300", 1000000000U);
Add(2, 6U, "150", "180", 1000000000U);
Add(2, 7U, "180", "220", 1000000000U);
Add(2, 8U, "220", "300", 1000000000U);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name, mutable_cf_options, &vstorage, &log_buffer));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1, compaction->num_input_files(0));
ASSERT_EQ(2, compaction->num_input_files(1));
ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(6U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber());
}
} // namespace rocksdb
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }
......@@ -1497,9 +1497,9 @@ Status DBImpl::FlushMemTableToOutputFile(
if (madeProgress) {
*madeProgress = 1;
}
Version::LevelSummaryStorage tmp;
VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->LevelSummary(&tmp));
cfd->current()->GetStorageInfo()->LevelSummary(&tmp));
if (disable_delete_obsolete_files_ == 0) {
// add to deletion state
......@@ -1545,7 +1545,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
MutexLock l(&mutex_);
Version* base = cfd->current();
for (int level = 1; level < cfd->NumberLevels(); level++) {
if (base->OverlapInLevel(level, begin, end)) {
if (base->GetStorageInfo()->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
}
......@@ -1623,14 +1623,14 @@ bool DBImpl::SetOptions(ColumnFamilyHandle* column_family,
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level) {
mutex_.AssertHeld();
Version* current = cfd->current();
auto* vstorage = cfd->current()->GetStorageInfo();
int minimum_level = level;
for (int i = level - 1; i > 0; --i) {
// stop if level i is not empty
if (current->NumLevelFiles(i) > 0) break;
if (vstorage->NumLevelFiles(i) > 0) break;
// stop if level i is too small (cannot fit the level files)
if (mutable_cf_options.MaxBytesForLevel(i) <
current->NumLevelBytes(level)) {
vstorage->NumLevelBytes(level)) {
break;
}
......@@ -1682,7 +1682,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
for (const auto& f : cfd->current()->files_[level]) {
for (const auto& f : cfd->current()->GetStorageInfo()->files_[level]) {
edit.DeleteFile(level, f->fd.GetNumber());
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
......@@ -1898,7 +1898,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bool is_compaction_needed = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->current()->NeedsCompaction()) {
if (cfd->current()->GetStorageInfo()->NeedsCompaction()) {
is_compaction_needed = true;
break;
}
......@@ -2269,14 +2269,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options());
Version::LevelSummaryStorage tmp;
LogToBuffer(
log_buffer,
"[%s] Moved #%" PRIu64 " to level-%d %" PRIu64 " bytes %s: %s\n",
c->column_family_data()->GetName().c_str(),
f->fd.GetNumber(), c->level() + 1,
f->fd.GetFileSize(),
status.ToString().c_str(), c->input_version()->LevelSummary(&tmp));
VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64
" bytes %s: %s\n",
c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(),
c->input_version()->GetStorageInfo()->LevelSummary(&tmp));
c->ReleaseCompactionFiles(status);
*madeProgress = true;
} else {
......@@ -3008,7 +3006,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
LogToBuffer(log_buffer, "[%s] Compaction start summary: %s\n",
cfd->GetName().c_str(), scratch);
assert(cfd->current()->NumLevelFiles(compact->compaction->level()) > 0);
assert(cfd->current()->GetStorageInfo()->NumLevelFiles(
compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(!compact->outfile);
......@@ -3246,26 +3245,26 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
status = InstallCompactionResults(compact, mutable_cf_options, log_buffer);
InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
}
Version::LevelSummaryStorage tmp;
LogToBuffer(
log_buffer,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
cfd->GetName().c_str(), cfd->current()->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1) /
static_cast<double>(stats.micros),
stats.bytes_written / static_cast<double>(stats.micros),
compact->compaction->output_level(), stats.files_in_leveln,
stats.files_in_levelnp1, stats.files_out_levelnp1,
stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
stats.bytes_written / 1048576.0,
(stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
(double)stats.bytes_readn,
stats.bytes_written / (double)stats.bytes_readn,
status.ToString().c_str(), stats.num_input_records,
stats.num_dropped_records);
VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
cfd->GetName().c_str(),
cfd->current()->GetStorageInfo()->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1) /
static_cast<double>(stats.micros),
stats.bytes_written / static_cast<double>(stats.micros),
compact->compaction->output_level(), stats.files_in_leveln,
stats.files_in_levelnp1, stats.files_out_levelnp1,
stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
stats.bytes_written / 1048576.0,
(stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
(double)stats.bytes_readn,
stats.bytes_written / (double)stats.bytes_readn,
status.ToString().c_str(), stats.num_input_records,
stats.num_dropped_records);
return status;
}
......@@ -4375,16 +4374,16 @@ Status DBImpl::DeleteFile(std::string name) {
// Only the files in the last level can be deleted externally.
// This is to make sure that any deletion tombstones are not
// lost. Check that the level passed is the last level.
auto* vstoreage = cfd->current()->GetStorageInfo();
for (int i = level + 1; i < cfd->NumberLevels(); i++) {
if (cfd->current()->NumLevelFiles(i) != 0) {
if (vstoreage->NumLevelFiles(i) != 0) {
Log(db_options_.info_log,
"DeleteFile %s FAILED. File not in last level\n", name.c_str());
return Status::InvalidArgument("File not in last level");
}
}
// if level == 0, it has to be the oldest file
if (level == 0 &&
cfd->current()->files_[0].back()->fd.GetNumber() != number) {
if (level == 0 && vstoreage->files_[0].back()->fd.GetNumber() != number) {
return Status::InvalidArgument("File in level 0, but not oldest");
}
edit.SetColumnFamily(cfd->GetID());
......@@ -4637,9 +4636,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
Version* current = cfd->current();
for (int i = 1; i < current->NumberLevels(); ++i) {
int num_files = current->NumLevelFiles(i);
auto* vstorage = cfd->current()->GetStorageInfo();
for (int i = 1; i < vstorage->NumberLevels(); ++i) {
int num_files = vstorage->NumLevelFiles(i);
if (num_files > 0) {
s = Status::InvalidArgument(
"Not all files are at level 0. Cannot "
......
......@@ -17,7 +17,8 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { PurgeObsoleteWALFiles(); }
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
MutexLock l(&mutex_);
return default_cf_handle_->cfd()->current()->NumLevelBytes(0);
return default_cf_handle_->cfd()->current()->GetStorageInfo()->NumLevelBytes(
0);
}
Iterator* DBImpl::TEST_NewInternalIterator(Arena* arena,
......@@ -47,7 +48,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
cfd = cfh->cfd();
}
MutexLock l(&mutex_);
return cfd->current()->MaxNextLevelOverlappingBytes();
return cfd->current()->GetStorageInfo()->MaxNextLevelOverlappingBytes();
}
void DBImpl::TEST_GetFilesMetaData(
......@@ -58,7 +59,8 @@ void DBImpl::TEST_GetFilesMetaData(
MutexLock l(&mutex_);
metadata->resize(NumberLevels());
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = cfd->current()->files_[level];
const std::vector<FileMetaData*>& files =
cfd->current()->GetStorageInfo()->LevelFiles(level);
(*metadata)[level].clear();
for (const auto& f : files) {
......
......@@ -202,8 +202,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
if (base != nullptr && db_options_.max_background_compactions <= 1 &&
db_options_.max_background_flushes == 0 &&
cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->PickLevelForMemTableOutput(mutable_cf_options_,
min_user_key, max_user_key);
level = base->GetStorageInfo()->PickLevelForMemTableOutput(
mutable_cf_options_, min_user_key, max_user_key);
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
......
......@@ -220,7 +220,8 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
if (!seek_to_first) {
user_key = ExtractUserKey(internal_key);
}
const std::vector<FileMetaData*>& l0 = sv_->current->LevelFiles(0);
VersionStorageInfo* vstorage = sv_->current->GetStorageInfo();
const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
for (uint32_t i = 0; i < l0.size(); ++i) {
if (seek_to_first) {
l0_iters_[i]->SeekToFirst();
......@@ -248,9 +249,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
int32_t search_left_bound = 0;
int32_t search_right_bound = FileIndexer::kLevelMaxIndex;
for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) {
for (int32_t level = 1; level < vstorage->NumberLevels(); ++level) {
const std::vector<FileMetaData*>& level_files =
sv_->current->LevelFiles(level);
vstorage->LevelFiles(level);
if (level_files.empty()) {
search_left_bound = 0;
search_right_bound = FileIndexer::kLevelMaxIndex;
......@@ -258,7 +259,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
}
assert(level_iters_[level - 1] != nullptr);
uint32_t f_idx = 0;
const auto& indexer = sv_->current->GetIndexer();
const auto& indexer = vstorage->GetIndexer();
if (!seek_to_first) {
if (search_left_bound == search_right_bound) {
f_idx = search_left_bound;
......@@ -428,15 +429,18 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
}
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
const auto& l0_files = sv_->current->LevelFiles(0);
auto* vstorage = sv_->current->GetStorageInfo();
const auto& l0_files = vstorage->LevelFiles(0);
l0_iters_.reserve(l0_files.size());
for (const auto* l0 : l0_files) {
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd));
}
level_iters_.reserve(sv_->current->NumberLevels() - 1);
for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) {
const auto& level_files = sv_->current->LevelFiles(level);
level_iters_.reserve(vstorage->NumberLevels() - 1);
for (int32_t level = 1; level < vstorage->NumberLevels(); ++level) {
const auto& level_files = vstorage->LevelFiles(level);
if (level_files.empty()) {
level_iters_.push_back(nullptr);
} else {
......@@ -450,7 +454,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
}
void ForwardIterator::ResetIncompleteIterators() {
const auto& l0_files = sv_->current->LevelFiles(0);
const auto& l0_files = sv_->current->GetStorageInfo()->LevelFiles(0);
for (uint32_t i = 0; i < l0_iters_.size(); ++i) {
assert(i < l0_files.size());
if (!l0_iters_[i]->status().IsIncomplete()) {
......
......@@ -169,7 +169,8 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type,
const Slice& property,
std::string* value) {
assert(value != nullptr);
Version* current = cfd_->current();
auto* current = cfd_->current();
auto* vstorage = current->GetStorageInfo();
Slice in = property;
switch (property_type) {
......@@ -182,7 +183,7 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type,
} else {
char buf[100];
snprintf(buf, sizeof(buf), "%d",
current->NumLevelFiles(static_cast<int>(level)));
vstorage->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
}
......@@ -196,8 +197,8 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type,
for (int level = 0; level < number_levels_; level++) {
snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level,
current->NumLevelFiles(level),
current->NumLevelBytes(level) / kMB);
vstorage->NumLevelFiles(level),
vstorage->NumLevelBytes(level) / kMB);
value->append(buf);
}
return true;
......@@ -229,7 +230,7 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type,
bool InternalStats::GetIntProperty(DBPropertyType property_type,
uint64_t* value, DBImpl* db) const {
Version* current = cfd_->current();
auto* vstorage = cfd_->current()->GetStorageInfo();
switch (property_type) {
case kNumImmutableMemTable:
......@@ -242,7 +243,7 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
case kCompactionPending:
// 1 if the system already determines at least one compacdtion is needed.
// 0 otherwise,
*value = (current->NeedsCompaction() ? 1 : 0);
*value = (vstorage->NeedsCompaction() ? 1 : 0);
return true;
case kBackgroundErrors:
// Accumulated number of errors in background flushes or compactions.
......@@ -270,7 +271,7 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
// Use estimated entries in tables + total entries in memtables.
*value = cfd_->mem()->GetNumEntries() +
cfd_->imm()->current()->GetTotalNumEntries() +
current->GetEstimatedActiveKeys();
vstorage->GetEstimatedActiveKeys();
return true;
#ifndef ROCKSDB_LITE
case kIsFileDeletionEnabled:
......@@ -365,24 +366,25 @@ void InternalStats::DumpDBStats(std::string* value) {
}
void InternalStats::DumpCFStats(std::string* value) {
Version* current = cfd_->current();
VersionStorageInfo* vstorage = cfd_->current()->GetStorageInfo();
int num_levels_to_check =
(cfd_->options()->compaction_style != kCompactionStyleUniversal &&
cfd_->options()->compaction_style != kCompactionStyleFIFO)
? current->NumberLevels() - 1
? vstorage->NumberLevels() - 1
: 1;
// Compaction scores are sorted base on its value. Restore them to the
// level order
std::vector<double> compaction_score(number_levels_, 0);
for (int i = 0; i < num_levels_to_check; ++i) {
compaction_score[current->compaction_level_[i]] =
current->compaction_score_[i];
compaction_score[vstorage->compaction_level_[i]] =
vstorage->compaction_score_[i];
}
// Count # of files being compacted for each level
std::vector<int> files_being_compacted(number_levels_, 0);
for (int level = 0; level < num_levels_to_check; ++level) {
for (auto* f : current->files_[level]) {
for (auto* f : vstorage->files_[level]) {
if (f->being_compacted) {
++files_being_compacted[level];
}
......@@ -405,7 +407,7 @@ void InternalStats::DumpCFStats(std::string* value) {
uint64_t total_stall_count = 0;
double total_stall_us = 0;
for (int level = 0; level < number_levels_; level++) {
int files = current->NumLevelFiles(level);
int files = vstorage->NumLevelFiles(level);
total_files += files;
total_files_being_compacted += files_being_compacted[level];
if (comp_stats_[level].micros > 0 || files > 0) {
......@@ -424,7 +426,7 @@ void InternalStats::DumpCFStats(std::string* value) {
stall_leveln_slowdown_hard_[level]);
stats_sum.Add(comp_stats_[level]);
total_file_size += current->NumLevelBytes(level);
total_file_size += vstorage->NumLevelBytes(level);
total_stall_us += stall_us;
total_stall_count += stalls;
total_slowdown_soft += stall_leveln_slowdown_soft_[level];
......@@ -439,10 +441,10 @@ void InternalStats::DumpCFStats(std::string* value) {
double w_amp = (comp_stats_[level].bytes_readn == 0) ? 0.0
: comp_stats_[level].bytes_written /
static_cast<double>(comp_stats_[level].bytes_readn);
PrintLevelStats(buf, sizeof(buf), "L" + std::to_string(level),
files, files_being_compacted[level], current->NumLevelBytes(level),
compaction_score[level], rw_amp, w_amp, stall_us, stalls,
comp_stats_[level]);
PrintLevelStats(buf, sizeof(buf), "L" + std::to_string(level), files,
files_being_compacted[level],
vstorage->NumLevelBytes(level), compaction_score[level],
rw_amp, w_amp, stall_us, stalls, comp_stats_[level]);
value->append(buf);
}
}
......
此差异已折叠。
此差异已折叠。
......@@ -1125,7 +1125,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
int max = -1;
auto default_cfd = versions.GetColumnFamilySet()->GetDefault();
for (int i = 0; i < default_cfd->NumberLevels(); i++) {
if (default_cfd->current()->NumLevelFiles(i)) {
if (default_cfd->current()->GetStorageInfo()->NumLevelFiles(i)) {
max = i;
}
}
......
......@@ -104,28 +104,29 @@ Status CompactedDBImpl::Init(const Options& options) {
}
version_ = cfd_->GetSuperVersion()->current;
user_comparator_ = cfd_->user_comparator();
const LevelFilesBrief& l0 = version_->GetLevelFilesBrief(0);
auto* vstorage = version_->GetStorageInfo();
const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
// L0 should not have files
if (l0.num_files > 1) {
return Status::NotSupported("L0 contain more than 1 file");
}
if (l0.num_files == 1) {
if (version_->NumNonEmptyLevels() > 1) {
if (vstorage->NumNonEmptyLevels() > 1) {
return Status::NotSupported("Both L0 and other level contain files");
}
files_ = l0;
return Status::OK();
}
for (int i = 1; i < version_->NumNonEmptyLevels() - 1; ++i) {
if (version_->GetLevelFilesBrief(i).num_files > 0) {
for (int i = 1; i < vstorage->NumNonEmptyLevels() - 1; ++i) {
if (vstorage->LevelFilesBrief(i).num_files > 0) {
return Status::NotSupported("Other levels also contain files");
}
}
int level = version_->NumNonEmptyLevels() - 1;
if (version_->GetLevelFilesBrief(level).num_files > 0) {
files_ = version_->GetLevelFilesBrief(level);
int level = vstorage->NumNonEmptyLevels() - 1;
if (vstorage->LevelFilesBrief(level).num_files > 0) {
files_ = vstorage->LevelFilesBrief(level);
return Status::OK();
}
return Status::NotSupported("no file exists");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册