提交 10912def 编写于 作者: C chenchanglong

alpha version

上级 53e86e82
......@@ -953,21 +953,37 @@ void ColumnFamilyData::CreateNewMemtable(
}
bool ColumnFamilyData::NeedsCompaction() const {
return !current_->storage_info()->IsPickFail() &&
return !current_->storage_info()->IsPickCompactionFail() &&
compaction_picker_->NeedsCompaction(current_->storage_info());
}
bool ColumnFamilyData::NeedsGarbageCollection() const {
return !current_->storage_info()->IsPickGarbageCollectionFail() &&
compaction_picker_->NeedsGarbageCollection(current_->storage_info());
}
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, current_->storage_info(), log_buffer);
if (result != nullptr) {
result->SetInputVersion(current_);
if (result->compaction_type() != kGarbageCollection) {
result->set_compaction_load(current_->GetCompactionLoad());
}
result->set_compaction_load(current_->GetCompactionLoad());
} else {
current_->storage_info()->SetPickCompactionFail();
}
return result;
}
Compaction* ColumnFamilyData::PickGarbageCollection(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickGarbageCollection(
GetName(), mutable_options, current_->storage_info(), log_buffer);
if (result != nullptr) {
result->SetInputVersion(current_);
result->set_gc_load(current_->GetGarbageCollectionLoad());
} else {
current_->storage_info()->SetPickFail();
current_->storage_info()->SetPickGarbageCollectionFail();
}
return result;
}
......
......@@ -271,10 +271,13 @@ class ColumnFamilyData {
// See documentation in compaction_picker.h
// REQUIRES: DB mutex held
bool NeedsCompaction() const;
bool NeedsGarbageCollection() const;
// REQUIRES: DB mutex held
Compaction* PickCompaction(const MutableCFOptions& mutable_options,
LogBuffer* log_buffer);
Compaction* PickGarbageCollection(const MutableCFOptions& mutable_options,
LogBuffer* log_buffer);
// Check if the passed range overlap with any running compactions.
// REQUIRES: DB mutex held
bool RangeOverlapWithCompaction(const Slice& smallest_user_key,
......
......@@ -1267,6 +1267,18 @@ bool LevelCompactionPicker::NeedsCompaction(
return false;
}
bool CompactionPicker::NeedsGarbageCollection(
const VersionStorageInfo* vstorage) const {
if (!vstorage->ExpiredTtlFiles().empty()) {
return true;
}
if (!vstorage->LevelFiles(-1).empty()) {
return true;
}
return false;
}
namespace {
// A class to build a leveled compaction step-by-step.
class LevelCompactionBuilder {
......
......@@ -49,9 +49,9 @@ class CompactionPicker {
// Pick compaction which level has map or link sst
Compaction* PickGarbageCollection(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer);
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer);
virtual void InitFilesBeingCompact(
const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage,
......@@ -84,6 +84,8 @@ class CompactionPicker {
virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const = 0;
bool NeedsGarbageCollection(const VersionStorageInfo* vstorage) const;
// Sanitize the input set of compaction input files.
// When the input parameters do not describe a valid compaction, the
// function will try to fix the input_files by adding necessary
......
......@@ -190,6 +190,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
last_batch_group_size_(0),
unscheduled_flushes_(0),
unscheduled_compactions_(0),
unscheduled_garbage_collections_(0),
bg_bottom_compaction_scheduled_(0),
bg_compaction_scheduled_(0),
num_running_compactions_(0),
......@@ -381,6 +382,7 @@ Status DBImpl::ResumeImpl() {
if (s.ok()) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
SchedulePendingCompaction(cfd);
SchedulePendingGarbageCollection(cfd);
}
MaybeScheduleFlushOrCompaction();
}
......@@ -2113,6 +2115,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
->BottommostFilesMarkedForCompaction()
.empty()) {
SchedulePendingCompaction(cfd);
SchedulePendingGarbageCollection(cfd);
MaybeScheduleFlushOrCompaction();
}
}
......
......@@ -1080,9 +1080,11 @@ class DBImpl : public DB {
void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingGarbageCollection(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id);
static void BGWorkCompaction(void* arg);
static void BGWorkGarbageCollection(void* arg);
// Runs a pre-chosen universal compaction involving bottom level in a
// separate, bottom-pri thread pool.
static void BGWorkBottomCompaction(void* arg);
......@@ -1380,6 +1382,7 @@ class DBImpl : public DB {
std::deque<log::Writer*> logs_to_free_queue_;
int unscheduled_flushes_;
int unscheduled_compactions_;
int unscheduled_garbage_collections_;
// count how many background compactions are running or have been scheduled in
// the BOTTOM pool
......
......@@ -1845,6 +1845,17 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
return;
}
while (bg_garbage_collection_scheduled_ < bg_job_limits.max_garbage_collections
&& unscheduled_garbage_collections_ > 0){
CompactionArg* ca = new CompactionArg;
ca->db = this;
ca->prepicked_compaction = nullptr;
bg_garbage_collection_scheduled_++;
unscheduled_garbage_collections_--;
env_->Schedule(&DBImpl::BGWorkGarbageCollection, ca, Env::Priority::LOW,
this, &DBImpl::UnscheduleCallback);
}
while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg;
......@@ -1978,6 +1989,13 @@ void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
}
}
void DBImpl::SchedulePendingGarbageCollection(ColumnFamilyData* cfd) {
if (!cfd->queued_for_garbage_collection() && cfd->NeedsGarbageCollection()) {
AddToGarbageCollectionQueue(cfd);
++unscheduled_garbage_collections_;
}
}
void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id) {
mutex_.AssertHeld();
......@@ -2855,6 +2873,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions.
SchedulePendingCompaction(cfd);
SchedulePendingGarbageCollection(cfd);
MaybeScheduleFlushOrCompaction();
// Update max_total_in_memory_state_
......
......@@ -52,6 +52,7 @@ Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
vstorage->ComputeCompactionScore(*cfd->ioptions(),
*cfd->GetLatestMutableCFOptions());
SchedulePendingCompaction(cfd);
SchedulePendingGarbageCollection(cfd);
MaybeScheduleFlushOrCompaction();
}
return Status::OK();
......
......@@ -1175,7 +1175,8 @@ VersionStorageInfo::VersionStorageInfo(
current_num_samples_(0),
estimated_compaction_needed_bytes_(0),
finalized_(false),
is_pick_fail_(false),
is_pick_compaction_fail(false),
is_pick_garbage_collection_fail(false),
force_consistency_checks_(_force_consistency_checks) {
++files_; // level -1 used for dependence files
if (ref_vstorage != nullptr) {
......@@ -1852,7 +1853,7 @@ void VersionStorageInfo::ComputeCompactionScore(
}
}
}
is_pick_fail_ = false;
is_pick_compaction_fail = false;
ComputeFilesMarkedForCompaction();
ComputeBottommostFilesMarkedForCompaction();
if (mutable_cf_options.ttl > 0) {
......
......@@ -248,10 +248,16 @@ class VersionStorageInfo {
int level);
// Set picker compaction fail
void SetPickFail() { is_pick_fail_ = true; }
void SetPickCompactionFail() { is_pick_compaction_fail = true; }
// Is picker compaction fail
bool IsPickFail() const { return is_pick_fail_; }
bool IsPickCompactionFail() const { return is_pick_compaction_fail; }
// Set picker garbage collection fail
void SetPickGarbageCollectionFail() { is_pick_garbage_collection_fail = true; }
// Is picker garbage collection fail
bool IsPickGarbageCollectionFail() const { return is_pick_garbage_collection_fail; }
int num_levels() const { return num_levels_; }
......@@ -559,7 +565,8 @@ class VersionStorageInfo {
bool finalized_;
bool is_pick_fail_;
bool is_pick_compaction_fail;
bool is_pick_garbage_collection_fail;
// If set to true, we will run consistency checks even if RocksDB
// is compiled in release mode
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册