From d1e2bce42d6c0389d1c17335b0d5acc9556332aa Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 7 Apr 2014 15:03:15 -0700 Subject: [PATCH] CallFlushDuringCompaction --- db/db_impl.cc | 40 ++++++++++++++++++++++++++-------------- db/db_impl.h | 7 +++++++ 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 2427f63ba..6580d54df 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2206,9 +2206,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, db_directory_.get()); InstallSuperVersion(c->column_family_data(), deletion_state); - if (options_.allow_thread_local) { - c->column_family_data()->ResetThreadLocalSuperVersions(); - } Version::LevelSummaryStorage tmp; LogToBuffer(log_buffer, "Moved #%lld to level-%d %lld bytes %s: %s\n", @@ -2465,7 +2462,6 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact, db_directory_.get()); } -// // Given a sequence number, return the sequence number of the // earliest snapshot that this sequence number is visible in. // The snapshots themselves are arranged in ascending order of @@ -2493,6 +2489,25 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( return 0; } +uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, + DeletionState& deletion_state, + LogBuffer* log_buffer) { + if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) { + const uint64_t imm_start = env_->NowMicros(); + mutex_.Lock(); + if (cfd->imm()->IsFlushPending()) { + cfd->Ref(); + FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer); + cfd->Unref(); + bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + } + mutex_.Unlock(); + log_buffer->FlushBufferToLog(); + return env_->NowMicros() - imm_start; + } + return 0; +} + Status DBImpl::ProcessKeyValueCompaction( SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot, @@ -2535,16 +2550,7 @@ Status DBImpl::ProcessKeyValueCompaction( // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. we should also check if flush is necessary on // other column families, too - if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) { - const uint64_t imm_start = env_->NowMicros(); - mutex_.Lock(); - if (cfd->imm()->IsFlushPending()) { - FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer); - bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary - } - mutex_.Unlock(); - imm_micros += (env_->NowMicros() - imm_start); - } + imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer); Slice key; Slice value; @@ -2944,6 +2950,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (compaction_filter_v2) { while (backup_input->Valid() && !shutting_down_.Acquire_Load() && !cfd->IsDropped()) { + // FLUSH preempts compaction + // TODO(icanadi) this currently only checks if flush is necessary on + // compacting column family. we should also check if flush is necessary on + // other column families, too + imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer); + Slice key = backup_input->key(); Slice value = backup_input->value(); diff --git a/db/db_impl.h b/db/db_impl.h index 4130c3454..e16bf3bb4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -343,6 +343,13 @@ class DBImpl : public DB { DeletionState& deletion_state, LogBuffer* log_buffer); + // This function is called as part of compaction. It enables Flush process to + // preempt compaction, since it's higher prioirty + // Returns: micros spent executing + uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd, + DeletionState& deletion_state, + LogBuffer* log_buffer); + // Call compaction filter if is_compaction_v2 is not true. Then iterate // through input and compact the kv-pairs Status ProcessKeyValueCompaction( -- GitLab