提交 0a3db28d 编写于 作者: A amytai 提交者: Facebook Github Bot

Disallow compactions if there isn't enough free space

Summary:
This diff handles cases where compaction causes an ENOSPC error.
This does not handle corner cases where another background job is started while compaction is running, and the other background job triggers ENOSPC, although we do allow the user to provision for these background jobs with SstFileManager::SetCompactionBufferSize.
It also does not handle the case where compaction has finished and some other background job independently triggers ENOSPC.

Usage: Functionality is inside SstFileManager. In particular, users should set SstFileManager::SetMaxAllowedSpaceUsage, which is the reference highwatermark for determining whether to cancel compactions.
Closes https://github.com/facebook/rocksdb/pull/3449

Differential Revision: D7016941

Pulled By: amytai

fbshipit-source-id: 8965ab8dd8b00972e771637a41b4e6c645450445
上级 20c508c1
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
* Remove CompactionEventListener. * Remove CompactionEventListener.
### New Features ### New Features
* SstFileManager now can cancel compactions if they will result in max space errors. SstFileManager users can also use SetCompactionBufferSize to specify how much space must be leftover during a compaction for auxiliary file functions such as logging and flushing.
* Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables. * Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables.
* If `ColumnFamilyOptions::max_subcompactions` is set greater than one, we now parallelize large manual level-based compactions. * If `ColumnFamilyOptions::max_subcompactions` is set greater than one, we now parallelize large manual level-based compactions.
* Add "rocksdb.live-sst-files-size" DB property to return total bytes of all SST files belong to the latest LSM tree. * Add "rocksdb.live-sst-files-size" DB property to return total bytes of all SST files belong to the latest LSM tree.
......
...@@ -886,7 +886,7 @@ class TestEnv : public EnvWrapper { ...@@ -886,7 +886,7 @@ class TestEnv : public EnvWrapper {
return Status::OK(); return Status::OK();
} }
private: private:
int close_count; int close_count;
}; };
...@@ -896,7 +896,7 @@ TEST_F(DBBasicTest, DBClose) { ...@@ -896,7 +896,7 @@ TEST_F(DBBasicTest, DBClose) {
ASSERT_OK(DestroyDB(dbname, options)); ASSERT_OK(DestroyDB(dbname, options));
DB* db = nullptr; DB* db = nullptr;
TestEnv *env = new TestEnv(); TestEnv* env = new TestEnv();
options.create_if_missing = true; options.create_if_missing = true;
options.env = env; options.env = env;
Status s = DB::Open(options, dbname, &db); Status s = DB::Open(options, dbname, &db);
......
...@@ -404,9 +404,7 @@ Status DBImpl::CloseHelper() { ...@@ -404,9 +404,7 @@ Status DBImpl::CloseHelper() {
return ret; return ret;
} }
Status DBImpl::CloseImpl() { Status DBImpl::CloseImpl() { return CloseHelper(); }
return CloseHelper();
}
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
if (!closed_) { if (!closed_) {
......
...@@ -378,7 +378,9 @@ class DBImpl : public DB { ...@@ -378,7 +378,9 @@ class DBImpl : public DB {
Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr); Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
// Wait for any compaction // Wait for any compaction
Status TEST_WaitForCompact(); // We add a bool parameter to wait for unscheduledCompactions_ == 0, but this
// is only for the special test of CancelledCompactions
Status TEST_WaitForCompact(bool waitUnscheduled = false);
// Return the maximum overlapping data (in bytes) at next level for any // Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1. // file at a level >= 1.
......
...@@ -1503,7 +1503,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, ...@@ -1503,7 +1503,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
if (made_progress || if (made_progress ||
(bg_compaction_scheduled_ == 0 && (bg_compaction_scheduled_ == 0 &&
bg_bottom_compaction_scheduled_ == 0) || bg_bottom_compaction_scheduled_ == 0) ||
HasPendingManualCompaction()) { HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
// signal if // signal if
// * made_progress -- need to wakeup DelayWrite // * made_progress -- need to wakeup DelayWrite
// * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
...@@ -1566,6 +1566,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ...@@ -1566,6 +1566,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// InternalKey manual_end_storage; // InternalKey manual_end_storage;
// InternalKey* manual_end = &manual_end_storage; // InternalKey* manual_end = &manual_end_storage;
#ifndef ROCKSDB_LITE
bool sfm_bookkeeping = false;
#endif // ROCKSDB_LITE
if (is_manual) { if (is_manual) {
ManualCompactionState* m = manual_compaction; ManualCompactionState* m = manual_compaction;
assert(m->in_progress); assert(m->in_progress);
...@@ -1628,27 +1631,65 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ...@@ -1628,27 +1631,65 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction"); TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction"); TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
bool enough_room = true;
if (c != nullptr) { if (c != nullptr) {
// update statistics #ifndef ROCKSDB_LITE
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, auto sfm = static_cast<SstFileManagerImpl*>(
c->inputs(0)->size()); immutable_db_options_.sst_file_manager.get());
// There are three things that can change compaction score: if (sfm) {
// 1) When flush or compaction finish. This case is covered by enough_room = sfm->EnoughRoomForCompaction(c.get());
// InstallSuperVersionAndScheduleWork if (enough_room) {
// 2) When MutableCFOptions changes. This case is also covered by sfm_bookkeeping = true;
// InstallSuperVersionAndScheduleWork, because this is when the new }
// options take effect. }
// 3) When we Pick a new compaction, we "remove" those files being #endif // ROCKSDB_LITE
// compacted from the calculation, which then influences compaction if (!enough_room) {
// score. Here we check if we need the new compaction even without the // Just in case tests want to change the value of enough_room
// files that are currently being compacted. If we need another TEST_SYNC_POINT_CALLBACK(
// compaction, we might be able to execute it in parallel, so we add it "DBImpl::BackgroundCompaction():CancelledCompaction",
// to the queue and schedule a new thread. &enough_room);
if (cfd->NeedsCompaction()) { }
// Yes, we need more compactions! if (!enough_room) {
// Then don't do the compaction
c->ReleaseCompactionFiles(status);
c->column_family_data()
->current()
->storage_info()
->ComputeCompactionScore(*(c->immutable_cf_options()),
*(c->mutable_cf_options()));
ROCKS_LOG_BUFFER(log_buffer,
"Cancelled compaction because not enough room");
AddToCompactionQueue(cfd); AddToCompactionQueue(cfd);
++unscheduled_compactions_; ++unscheduled_compactions_;
MaybeScheduleFlushOrCompaction();
c.reset();
// Don't need to sleep here, because BackgroundCallCompaction
// will sleep if !s.ok()
status = Status::CompactionTooLarge();
} else {
// update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
c->inputs(0)->size());
// There are three things that can change compaction score:
// 1) When flush or compaction finish. This case is covered by
// InstallSuperVersionAndScheduleWork
// 2) When MutableCFOptions changes. This case is also covered by
// InstallSuperVersionAndScheduleWork, because this is when the new
// options take effect.
// 3) When we Pick a new compaction, we "remove" those files being
// compacted from the calculation, which then influences compaction
// score. Here we check if we need the new compaction even without the
// files that are currently being compacted. If we need another
// compaction, we might be able to execute it in parallel, so we add
// it to the queue and schedule a new thread.
if (cfd->NeedsCompaction()) {
// Yes, we need more compactions!
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
MaybeScheduleFlushOrCompaction();
}
} }
} }
} }
...@@ -1808,6 +1849,16 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ...@@ -1808,6 +1849,16 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (c != nullptr) { if (c != nullptr) {
c->ReleaseCompactionFiles(status); c->ReleaseCompactionFiles(status);
*made_progress = true; *made_progress = true;
#ifndef ROCKSDB_LITE
// Need to make sure SstFileManager does its bookkeeping
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm && sfm_bookkeeping) {
sfm->OnCompactionCompletion(c.get());
}
#endif // ROCKSDB_LITE
NotifyOnCompactionCompleted( NotifyOnCompactionCompleted(
c->column_family_data(), c.get(), status, c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id); compaction_job_stats, job_context->job_id);
...@@ -1815,7 +1866,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ...@@ -1815,7 +1866,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// this will unref its input_version and column_family_data // this will unref its input_version and column_family_data
c.reset(); c.reset();
if (status.ok()) { if (status.ok() || status.IsCompactionTooLarge()) {
// Done // Done
} else if (status.IsShutdownInProgress()) { } else if (status.IsShutdownInProgress()) {
// Ignore compaction errors found during shutting down // Ignore compaction errors found during shutting down
......
...@@ -117,7 +117,7 @@ Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) { ...@@ -117,7 +117,7 @@ Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
return WaitForFlushMemTable(cfd); return WaitForFlushMemTable(cfd);
} }
Status DBImpl::TEST_WaitForCompact() { Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
// Wait until the compaction completes // Wait until the compaction completes
// TODO: a bug here. This function actually does not necessarily // TODO: a bug here. This function actually does not necessarily
...@@ -126,7 +126,8 @@ Status DBImpl::TEST_WaitForCompact() { ...@@ -126,7 +126,8 @@ Status DBImpl::TEST_WaitForCompact() {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) && bg_flush_scheduled_ ||
(wait_unscheduled && unscheduled_compactions_)) &&
bg_error_.ok()) { bg_error_.ok()) {
bg_cv_.Wait(); bg_cv_.Wait();
} }
......
...@@ -540,6 +540,52 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) { ...@@ -540,6 +540,52 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
ASSERT_NOK(Flush()); ASSERT_NOK(Flush());
} }
TEST_F(DBSSTTest, CancellingCompactionsWorks) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.level0_file_num_compaction_trigger = 2;
DestroyAndReopen(options);
int cancelled_compaction = 0;
int completed_compactions = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) {
cancelled_compaction++;
sfm->SetMaxAllowedSpaceUsage(0);
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
[&](void* arg) { completed_compactions++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
// Generate a file containing 10 keys.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
}
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
auto files_in_db = GetAllSSTFiles(&total_file_size);
// Set the maximum allowed space usage to the current total size
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
// Generate another file to trigger compaction.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
}
ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact(true);
ASSERT_GT(cancelled_compaction, 0);
ASSERT_GT(completed_compactions, 0);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) { TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
// This test will set a maximum allowed space for the DB, then it will // This test will set a maximum allowed space for the DB, then it will
// keep filling the DB until the limit is reached and bg_error_ is set. // keep filling the DB until the limit is reached and bg_error_ is set.
...@@ -568,6 +614,12 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) { ...@@ -568,6 +614,12 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
estimate_multiplier++; // used in the main loop assert estimate_multiplier++; // used in the main loop assert
}); });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) {
bool* enough_room = static_cast<bool*>(arg);
*enough_room = true;
});
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached", "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached",
[&](void* arg) { [&](void* arg) {
......
...@@ -54,6 +54,5 @@ void DumpMallocStats(std::string* stats) { ...@@ -54,6 +54,5 @@ void DumpMallocStats(std::string* stats) {
#else #else
void DumpMallocStats(std::string*) {} void DumpMallocStats(std::string*) {}
#endif // ROCKSDB_JEMALLOC #endif // ROCKSDB_JEMALLOC
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
...@@ -73,7 +73,7 @@ RandomAccessFile::~RandomAccessFile() { ...@@ -73,7 +73,7 @@ RandomAccessFile::~RandomAccessFile() {
WritableFile::~WritableFile() { WritableFile::~WritableFile() {
} }
Logger::~Logger() { } Logger::~Logger() {}
Status Logger::Close() { Status Logger::Close() {
if (!closed_) { if (!closed_) {
......
...@@ -288,9 +288,7 @@ class HdfsLogger : public Logger { ...@@ -288,9 +288,7 @@ class HdfsLogger : public Logger {
} }
protected: protected:
virtual Status CloseImpl() override { virtual Status CloseImpl() override { return HdfsCloseHelper(); }
return HdfsCloseHelper();
}
public: public:
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
......
...@@ -1514,48 +1514,46 @@ class TestEnv : public EnvWrapper { ...@@ -1514,48 +1514,46 @@ class TestEnv : public EnvWrapper {
explicit TestEnv() : EnvWrapper(Env::Default()), explicit TestEnv() : EnvWrapper(Env::Default()),
close_count(0) { } close_count(0) { }
class TestLogger : public Logger { class TestLogger : public Logger {
public: public:
using Logger::Logv; using Logger::Logv;
TestLogger(TestEnv *env_ptr) : Logger() { env = env_ptr; } TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
~TestLogger() { ~TestLogger() {
if (!closed_) { if (!closed_) {
CloseHelper(); CloseHelper();
} }
} }
virtual void Logv(const char* /*format*/, va_list /*ap*/) override{}; virtual void Logv(const char* format, va_list ap) override{};
protected:
virtual Status CloseImpl() override {
return CloseHelper();
}
private:
Status CloseHelper() {
env->CloseCountInc();;
return Status::OK();
}
TestEnv *env;
};
void CloseCountInc() { close_count++; }
int GetCloseCount() { return close_count; } protected:
virtual Status CloseImpl() override { return CloseHelper(); }
virtual Status NewLogger(const std::string& /*fname*/, private:
shared_ptr<Logger>* result) { Status CloseHelper() {
result->reset(new TestLogger(this)); env->CloseCountInc();;
return Status::OK(); return Status::OK();
} }
TestEnv* env;
};
private: void CloseCountInc() { close_count++; }
int close_count;
};
class EnvTest : public testing::Test { int GetCloseCount() { return close_count; }
virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) {
result->reset(new TestLogger(this));
return Status::OK();
}
private:
int close_count;
}; };
class EnvTest : public testing::Test {};
TEST_F(EnvTest, Close) { TEST_F(EnvTest, Close) {
TestEnv *env = new TestEnv(); TestEnv* env = new TestEnv();
std::shared_ptr<Logger> logger; std::shared_ptr<Logger> logger;
Status s; Status s;
...@@ -1577,7 +1575,6 @@ TEST_F(EnvTest, Close) { ...@@ -1577,7 +1575,6 @@ TEST_F(EnvTest, Close) {
delete env; delete env;
} }
INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam, INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam,
::testing::Values(std::pair<Env*, bool>(Env::Default(), ::testing::Values(std::pair<Env*, bool>(Env::Default(),
false))); false)));
......
...@@ -52,9 +52,7 @@ class PosixLogger : public Logger { ...@@ -52,9 +52,7 @@ class PosixLogger : public Logger {
std::atomic<bool> flush_pending_; std::atomic<bool> flush_pending_;
protected: protected:
virtual Status CloseImpl() override { virtual Status CloseImpl() override { return PosixCloseHelper(); }
return PosixCloseHelper();
}
public: public:
PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env, PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env,
......
...@@ -35,12 +35,22 @@ class SstFileManager { ...@@ -35,12 +35,22 @@ class SstFileManager {
// thread-safe. // thread-safe.
virtual void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) = 0; virtual void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) = 0;
// Set the amount of buffer room each compaction should be able to leave.
// In other words, at its maximum disk space consumption, the compaction
// should still leave compaction_buffer_size available on the disk so that
// other background functions may continue, such as logging and flushing.
virtual void SetCompactionBufferSize(uint64_t compaction_buffer_size) = 0;
// Return true if the total size of SST files exceeded the maximum allowed // Return true if the total size of SST files exceeded the maximum allowed
// space usage. // space usage.
// //
// thread-safe. // thread-safe.
virtual bool IsMaxAllowedSpaceReached() = 0; virtual bool IsMaxAllowedSpaceReached() = 0;
// Returns true if the total size of SST files as well as estimated size
// of ongoing compactions exceeds the maximums allowed space usage.
virtual bool IsMaxAllowedSpaceReachedIncludingCompactions() = 0;
// Return the total size of all tracked files. // Return the total size of all tracked files.
// thread-safe // thread-safe
virtual uint64_t GetTotalSize() = 0; virtual uint64_t GetTotalSize() = 0;
......
...@@ -58,7 +58,8 @@ class Status { ...@@ -58,7 +58,8 @@ class Status {
kAborted = 10, kAborted = 10,
kBusy = 11, kBusy = 11,
kExpired = 12, kExpired = 12,
kTryAgain = 13 kTryAgain = 13,
kCompactionTooLarge = 14
}; };
Code code() const { return code_; } Code code() const { return code_; }
...@@ -162,6 +163,14 @@ class Status { ...@@ -162,6 +163,14 @@ class Status {
return Status(kTryAgain, msg, msg2); return Status(kTryAgain, msg, msg2);
} }
static Status CompactionTooLarge(SubCode msg = kNone) {
return Status(kCompactionTooLarge, msg);
}
static Status CompactionTooLarge(const Slice& msg,
const Slice& msg2 = Slice()) {
return Status(kCompactionTooLarge, msg, msg2);
}
static Status NoSpace() { return Status(kIOError, kNoSpace); } static Status NoSpace() { return Status(kIOError, kNoSpace); }
static Status NoSpace(const Slice& msg, const Slice& msg2 = Slice()) { static Status NoSpace(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, kNoSpace, msg, msg2); return Status(kIOError, kNoSpace, msg, msg2);
...@@ -221,6 +230,9 @@ class Status { ...@@ -221,6 +230,9 @@ class Status {
// re-attempted. // re-attempted.
bool IsTryAgain() const { return code() == kTryAgain; } bool IsTryAgain() const { return code() == kTryAgain; }
// Returns true iff the status indicates the proposed compaction is too large
bool IsCompactionTooLarge() const { return code() == kCompactionTooLarge; }
// Returns true iff the status indicates a NoSpace error // Returns true iff the status indicates a NoSpace error
// This is caused by an I/O error returning the specific "out of space" // This is caused by an I/O error returning the specific "out of space"
// error condition. Stricto sensu, an NoSpace error is an I/O error // error condition. Stricto sensu, an NoSpace error is an I/O error
......
...@@ -22,6 +22,8 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<Logger> logger, ...@@ -22,6 +22,8 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<Logger> logger,
: env_(env), : env_(env),
logger_(logger), logger_(logger),
total_files_size_(0), total_files_size_(0),
compaction_buffer_size_(0),
cur_compactions_reserved_size_(0),
max_allowed_space_(0), max_allowed_space_(0),
delete_scheduler_(env, rate_bytes_per_sec, logger.get(), this, delete_scheduler_(env, rate_bytes_per_sec, logger.get(), this,
max_trash_db_ratio) {} max_trash_db_ratio) {}
...@@ -48,6 +50,18 @@ Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) { ...@@ -48,6 +50,18 @@ Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
return Status::OK(); return Status::OK();
} }
void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
MutexLock l(&mu_);
uint64_t size_added_by_compaction = 0;
for (size_t i = 0; i < c->num_input_levels(); i++) {
for (size_t j = 0; j < c->num_input_files(i); j++) {
FileMetaData* filemeta = c->input(i, j);
size_added_by_compaction += filemeta->fd.GetFileSize();
}
}
cur_compactions_reserved_size_ -= size_added_by_compaction;
}
Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
const std::string& new_path, const std::string& new_path,
uint64_t* file_size) { uint64_t* file_size) {
...@@ -68,6 +82,12 @@ void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) { ...@@ -68,6 +82,12 @@ void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
max_allowed_space_ = max_allowed_space; max_allowed_space_ = max_allowed_space;
} }
void SstFileManagerImpl::SetCompactionBufferSize(
uint64_t compaction_buffer_size) {
MutexLock l(&mu_);
compaction_buffer_size_ = compaction_buffer_size;
}
bool SstFileManagerImpl::IsMaxAllowedSpaceReached() { bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
MutexLock l(&mu_); MutexLock l(&mu_);
if (max_allowed_space_ <= 0) { if (max_allowed_space_ <= 0) {
...@@ -76,6 +96,43 @@ bool SstFileManagerImpl::IsMaxAllowedSpaceReached() { ...@@ -76,6 +96,43 @@ bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
return total_files_size_ >= max_allowed_space_; return total_files_size_ >= max_allowed_space_;
} }
bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
MutexLock l(&mu_);
if (max_allowed_space_ <= 0) {
return false;
}
return total_files_size_ + cur_compactions_reserved_size_ >=
max_allowed_space_;
}
bool SstFileManagerImpl::EnoughRoomForCompaction(Compaction* c) {
MutexLock l(&mu_);
uint64_t size_added_by_compaction = 0;
// First check if we even have the space to do the compaction
for (size_t i = 0; i < c->num_input_levels(); i++) {
for (size_t j = 0; j < c->num_input_files(i); j++) {
FileMetaData* filemeta = c->input(i, j);
size_added_by_compaction += filemeta->fd.GetFileSize();
}
}
if (max_allowed_space_ != 0 &&
(size_added_by_compaction + cur_compactions_reserved_size_ +
total_files_size_ + compaction_buffer_size_ >
max_allowed_space_)) {
return false;
}
// Update cur_compactions_reserved_size_ so concurrent compaction
// don't max out space
cur_compactions_reserved_size_ += size_added_by_compaction;
return true;
}
uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
MutexLock l(&mu_);
return cur_compactions_reserved_size_;
}
uint64_t SstFileManagerImpl::GetTotalSize() { uint64_t SstFileManagerImpl::GetTotalSize() {
MutexLock l(&mu_); MutexLock l(&mu_);
return total_files_size_; return total_files_size_;
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include "port/port.h" #include "port/port.h"
#include "db/compaction.h"
#include "rocksdb/sst_file_manager.h" #include "rocksdb/sst_file_manager.h"
#include "util/delete_scheduler.h" #include "util/delete_scheduler.h"
...@@ -50,12 +51,29 @@ class SstFileManagerImpl : public SstFileManager { ...@@ -50,12 +51,29 @@ class SstFileManagerImpl : public SstFileManager {
// thread-safe. // thread-safe.
void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) override; void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) override;
void SetCompactionBufferSize(uint64_t compaction_buffer_size) override;
// Return true if the total size of SST files exceeded the maximum allowed // Return true if the total size of SST files exceeded the maximum allowed
// space usage. // space usage.
// //
// thread-safe. // thread-safe.
bool IsMaxAllowedSpaceReached() override; bool IsMaxAllowedSpaceReached() override;
bool IsMaxAllowedSpaceReachedIncludingCompactions() override;
// Returns true is there is enough (approximate) space for the specified
// compaction. Space is approximate because this function conservatively
// estimates how much space is currently being used by compactions (i.e.
// if a compaction has started, this function bumps the used space by
// the full compaction size).
bool EnoughRoomForCompaction(Compaction* c);
// Bookkeeping so total_file_sizes_ goes back to normal after compaction
// finishes
void OnCompactionCompletion(Compaction* c);
uint64_t GetCompactionsReservedSize();
// Return the total size of all tracked files. // Return the total size of all tracked files.
uint64_t GetTotalSize() override; uint64_t GetTotalSize() override;
...@@ -95,6 +113,11 @@ class SstFileManagerImpl : public SstFileManager { ...@@ -95,6 +113,11 @@ class SstFileManagerImpl : public SstFileManager {
port::Mutex mu_; port::Mutex mu_;
// The summation of the sizes of all files in tracked_files_ map // The summation of the sizes of all files in tracked_files_ map
uint64_t total_files_size_; uint64_t total_files_size_;
// Compactions should only execute if they can leave at least
// this amount of buffer space for logs and flushes
uint64_t compaction_buffer_size_;
// Estimated size of the current ongoing compactions
uint64_t cur_compactions_reserved_size_;
// A map containing all tracked files and there sizes // A map containing all tracked files and there sizes
// file_path => file_size // file_path => file_size
std::unordered_map<std::string, uint64_t> tracked_files_; std::unordered_map<std::string, uint64_t> tracked_files_;
......
...@@ -811,7 +811,7 @@ TEST_F(BackupableDBTest, NoDoubleCopy) { ...@@ -811,7 +811,7 @@ TEST_F(BackupableDBTest, NoDoubleCopy) {
test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_); test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
std::vector<std::string> should_have_written = { std::vector<std::string> should_have_written = {
"/shared/.00010.sst.tmp", "/shared/.00011.sst.tmp", "/shared/.00010.sst.tmp", "/shared/.00011.sst.tmp",
"/private/1.tmp/CURRENT", "/private/1.tmp/MANIFEST-01", "/private/1.tmp/CURRENT", "/private/1.tmp/MANIFEST-01",
"/private/1.tmp/00011.log", "/meta/.1.tmp"}; "/private/1.tmp/00011.log", "/meta/.1.tmp"};
AppendPath(backupdir_, should_have_written); AppendPath(backupdir_, should_have_written);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册