提交 d1e2bce4 编写于 作者: I Igor Canadi

CallFlushDuringCompaction

上级 b42ceb95
......@@ -2206,9 +2206,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
InstallSuperVersion(c->column_family_data(), deletion_state);
if (options_.allow_thread_local) {
c->column_family_data()->ResetThreadLocalSuperVersions();
}
Version::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "Moved #%lld to level-%d %lld bytes %s: %s\n",
......@@ -2465,7 +2462,6 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact,
db_directory_.get());
}
//
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
......@@ -2493,6 +2489,25 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
return 0;
}
uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
cfd->Ref();
FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer);
cfd->Unref();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
log_buffer->FlushBufferToLog();
return env_->NowMicros() - imm_start;
}
return 0;
}
Status DBImpl::ProcessKeyValueCompaction(
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
......@@ -2535,16 +2550,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer);
Slice key;
Slice value;
......@@ -2944,6 +2950,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if (compaction_filter_v2) {
while (backup_input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) {
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer);
Slice key = backup_input->key();
Slice value = backup_input->value();
......
......@@ -343,6 +343,13 @@ class DBImpl : public DB {
DeletionState& deletion_state,
LogBuffer* log_buffer);
// This function is called as part of compaction. It enables Flush process to
// preempt compaction, since it's higher prioirty
// Returns: micros spent executing
uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd,
DeletionState& deletion_state,
LogBuffer* log_buffer);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册