diff --git a/Makefile b/Makefile index d9a8feffa072e7161cfe811d4a079307468db901..12dbba15314c908eac16ffb2f84815f648e0745c 100644 --- a/Makefile +++ b/Makefile @@ -143,7 +143,8 @@ TESTS = \ cuckoo_table_builder_test \ cuckoo_table_reader_test \ cuckoo_table_db_test \ - write_batch_with_index_test + write_batch_with_index_test \ + flush_job_test TOOLS = \ sst_dump \ @@ -412,6 +413,9 @@ ttl_test: utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS) write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/db/column_family.cc b/db/column_family.cc index c47bbb12f9159f7e8c8db4828a45ebdb8969e61b..0127d10ad4a64485e51765a5e4670d4b9752cd16 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -20,6 +20,7 @@ #include #include "db/db_impl.h" +#include "db/job_context.h" #include "db/version_set.h" #include "db/internal_stats.h" #include "db/compaction_picker.h" @@ -71,15 +72,15 @@ ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd, ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { if (cfd_ != nullptr) { - DBImpl::DeletionState deletion_state; + JobContext job_context; mutex_->Lock(); if (cfd_->Unref()) { delete cfd_; } - db_->FindObsoleteFiles(deletion_state, false, true); + db_->FindObsoleteFiles(&job_context, false, true); mutex_->Unlock(); - if (deletion_state.HaveSomethingToDelete()) { - db_->PurgeObsoleteFiles(deletion_state); + if (job_context.HaveSomethingToDelete()) { + db_->PurgeObsoleteFiles(job_context); } } } diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 9f05b8d30794929cb5101b858eedc76ec8bd3bc9..89fe9c983e7474e0894b74285f58f55fbef8f3c1 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -42,7 +42,7 @@ Status DBImpl::DisableFileDeletions() { } Status DBImpl::EnableFileDeletions(bool force) { - DeletionState deletion_state; + JobContext job_context; bool should_purge_files = false; { MutexLock l(&mutex_); @@ -55,7 +55,7 @@ Status DBImpl::EnableFileDeletions(bool force) { if (disable_delete_obsolete_files_ == 0) { Log(db_options_.info_log, "File Deletions Enabled"); should_purge_files = true; - FindObsoleteFiles(deletion_state, true); + FindObsoleteFiles(&job_context, true); } else { Log(db_options_.info_log, "File Deletions Enable, but not really enabled. Counter: %d", @@ -63,7 +63,7 @@ Status DBImpl::EnableFileDeletions(bool force) { } } if (should_purge_files) { - PurgeObsoleteFiles(deletion_state); + PurgeObsoleteFiles(job_context); } LogFlush(db_options_.info_log); return Status::OK(); diff --git a/db/db_impl.cc b/db/db_impl.cc index dc5fc2394ee4320655ac803ff87464bc5f76e93c..c53a4bd92dd74e0c96ff3f1f79e9bdc42edb01bf 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -27,6 +27,7 @@ #include #include "db/builder.h" +#include "db/flush_job.h" #include "db/db_iter.h" #include "db/dbformat.h" #include "db/filename.h" @@ -412,12 +413,12 @@ DBImpl::~DBImpl() { // result, all "live" files can get deleted by accident. However, corrupted // manifest is recoverable by RepairDB(). 
if (opened_successfully_) { - DeletionState deletion_state; - FindObsoleteFiles(deletion_state, true); + JobContext job_context; + FindObsoleteFiles(&job_context, true); // manifest number starting from 2 - deletion_state.manifest_file_number = 1; - if (deletion_state.HaveSomethingToDelete()) { - PurgeObsoleteFiles(deletion_state); + job_context.manifest_file_number = 1; + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); } } @@ -531,8 +532,7 @@ void DBImpl::MaybeDumpStats() { // force = false -- don't force the full scan, except every // db_options_.delete_obsolete_files_period_micros // force = true -- force the full scan -void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, - bool force, +void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, bool no_full_scan) { mutex_.AssertHeld(); @@ -558,16 +558,16 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, } // get obsolete files - versions_->GetObsoleteFiles(&deletion_state.sst_delete_files); + versions_->GetObsoleteFiles(&job_context->sst_delete_files); // store the current filenum, lognum, etc - deletion_state.manifest_file_number = versions_->ManifestFileNumber(); - deletion_state.pending_manifest_file_number = + job_context->manifest_file_number = versions_->ManifestFileNumber(); + job_context->pending_manifest_file_number = versions_->PendingManifestFileNumber(); - deletion_state.log_number = versions_->MinLogNumber(); - deletion_state.prev_log_number = versions_->PrevLogNumber(); + job_context->log_number = versions_->MinLogNumber(); + job_context->prev_log_number = versions_->PrevLogNumber(); - if (!doing_the_full_scan && !deletion_state.HaveSomethingToDelete()) { + if (!doing_the_full_scan && !job_context->HaveSomethingToDelete()) { // avoid filling up sst_live if we're sure that we // are not going to do the full scan and that we don't have // anything to delete at the moment @@ -576,11 +576,9 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, // don't delete live files for (auto pair : pending_outputs_) { - deletion_state.sst_live.emplace_back(pair.first, pair.second, 0); + job_context->sst_live.emplace_back(pair.first, pair.second, 0); } - /* deletion_state.sst_live.insert(pending_outputs_.begin(), - pending_outputs_.end());*/ - versions_->AddLiveFiles(&deletion_state.sst_live); + versions_->AddLiveFiles(&job_context->sst_live); if (doing_the_full_scan) { for (uint32_t path_id = 0; @@ -592,7 +590,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, &files); // Ignore errors for (std::string file : files) { // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes - deletion_state.candidate_files.emplace_back("/" + file, path_id); + job_context->candidate_files.emplace_back("/" + file, path_id); } } @@ -601,7 +599,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, std::vector log_files; env_->GetChildren(db_options_.wal_dir, &log_files); // Ignore errors for (std::string log_file : log_files) { - deletion_state.candidate_files.emplace_back(log_file, 0); + job_context->candidate_files.emplace_back(log_file, 0); } } // Add info log files in db_log_dir @@ -610,15 +608,15 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, // Ignore errors env_->GetChildren(db_options_.db_log_dir, &info_log_files); for (std::string log_file : info_log_files) { - deletion_state.candidate_files.emplace_back(log_file, 0); + job_context->candidate_files.emplace_back(log_file, 0); } } } } namespace { -bool 
CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first, - const rocksdb::DBImpl::CandidateFileInfo& second) { +bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, + const JobContext::CandidateFileInfo& second) { if (first.file_name > second.file_name) { return true; } else if (first.file_name < second.file_name) { @@ -633,7 +631,7 @@ bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first, // belong to live files are posibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. // It is not necessary to hold the mutex when invoking this method. -void DBImpl::PurgeObsoleteFiles(DeletionState& state) { +void DBImpl::PurgeObsoleteFiles(const JobContext& state) { // we'd better have sth to delete assert(state.HaveSomethingToDelete()); @@ -647,15 +645,14 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { // Now, convert live list to an unordered map, WITHOUT mutex held; // set is slow. std::unordered_map sst_live_map; - for (FileDescriptor& fd : state.sst_live) { + for (const FileDescriptor& fd : state.sst_live) { sst_live_map[fd.GetNumber()] = &fd; } - auto& candidate_files = state.candidate_files; - candidate_files.reserve( - candidate_files.size() + - state.sst_delete_files.size() + - state.log_delete_files.size()); + auto candidate_files = state.candidate_files; + candidate_files.reserve(candidate_files.size() + + state.sst_delete_files.size() + + state.log_delete_files.size()); // We may ignore the dbname when generating the file names. const char* kDumbDbName = ""; for (auto file : state.sst_delete_files) { @@ -784,10 +781,10 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { void DBImpl::DeleteObsoleteFiles() { mutex_.AssertHeld(); - DeletionState deletion_state; - FindObsoleteFiles(deletion_state, true); - if (deletion_state.HaveSomethingToDelete()) { - PurgeObsoleteFiles(deletion_state); + JobContext job_context; + FindObsoleteFiles(&job_context, true); + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); } } @@ -1480,159 +1477,23 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, return s; } -Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, - const MutableCFOptions& mutable_cf_options, - const autovector& mems, - VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer) { - mutex_.AssertHeld(); - const uint64_t start_micros = env_->NowMicros(); - FileMetaData meta; - - meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); - *filenumber = meta.fd.GetNumber(); - pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. 
- - const SequenceNumber newest_snapshot = snapshots_.GetNewest(); - const SequenceNumber earliest_seqno_in_memtable = - mems[0]->GetFirstSequenceNumber(); - Version* base = cfd->current(); - base->Ref(); // it is likely that we do not need this reference - Status s; - { - mutex_.Unlock(); - log_buffer->FlushBufferToLog(); - std::vector memtables; - ReadOptions ro; - ro.total_order_seek = true; - Arena arena; - for (MemTable* m : mems) { - Log(db_options_.info_log, - "[%s] Flushing memtable with next log file: %" PRIu64 "\n", - cfd->GetName().c_str(), m->GetNextLogNumber()); - memtables.push_back(m->NewIterator(ro, &arena)); - } - { - ScopedArenaIterator iter(NewMergingIterator(&cfd->internal_comparator(), - &memtables[0], - memtables.size(), &arena)); - Log(db_options_.info_log, - "[%s] Level-0 flush table #%" PRIu64 ": started", - cfd->GetName().c_str(), meta.fd.GetNumber()); - - s = BuildTable( - dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), - iter.get(), &meta, cfd->internal_comparator(), newest_snapshot, - earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()), - cfd->ioptions()->compression_opts, Env::IO_HIGH); - LogFlush(db_options_.info_log); - } - Log(db_options_.info_log, - "[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", - cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), - s.ToString().c_str()); - - if (!db_options_.disableDataSync) { - db_directory_->Fsync(); - } - mutex_.Lock(); - } - base->Unref(); - - // re-acquire the most current version - base = cfd->current(); - - // There could be multiple threads writing to its own level-0 file. - // The pending_outputs cannot be cleared here, otherwise this newly - // created file might not be considered as a live-file by another - // compaction thread that is concurrently deleting obselete files. - // The pending_outputs can be cleared only after the new version is - // committed so that other threads can recognize this file as a - // valid one. - // pending_outputs_.erase(meta.number); - - // Note that if file_size is zero, the file has been deleted and - // should not be added to the manifest. - int level = 0; - if (s.ok() && meta.fd.GetFileSize() > 0) { - const Slice min_user_key = meta.smallest.user_key(); - const Slice max_user_key = meta.largest.user_key(); - // if we have more than 1 background thread, then we cannot - // insert files directly into higher levels because some other - // threads could be concurrently producing compacted files for - // that key range. 
- if (base != nullptr && db_options_.max_background_compactions <= 1 && - db_options_.max_background_flushes == 0 && - cfd->ioptions()->compaction_style == kCompactionStyleLevel) { - level = base->PickLevelForMemTableOutput( - mutable_cf_options, min_user_key, max_user_key); - } - edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), - meta.fd.GetFileSize(), meta.smallest, meta.largest, - meta.smallest_seqno, meta.largest_seqno); - } - - InternalStats::CompactionStats stats(1); - stats.micros = env_->NowMicros() - start_micros; - stats.bytes_written = meta.fd.GetFileSize(); - cfd->internal_stats()->AddCompactionStats(level, stats); - cfd->internal_stats()->AddCFStats( - InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize()); - RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); - return s; -} - Status DBImpl::FlushMemTableToOutputFile( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer) { + bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(cfd->imm()->size() != 0); assert(cfd->imm()->IsFlushPending()); - // Save the contents of the earliest memtable as a new Table - uint64_t file_number; - autovector mems; - cfd->imm()->PickMemtablesToFlush(&mems); - if (mems.empty()) { - LogToBuffer(log_buffer, "[%s] Nothing in memtable to flush", - cfd->GetName().c_str()); - return Status::OK(); - } + FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options, + env_options_, versions_.get(), &mutex_, &shutting_down_, + &pending_outputs_, snapshots_.GetNewest(), job_context, + log_buffer, db_directory_.get(), + GetCompressionFlush(*cfd->ioptions()), stats_); - // record the logfile_number_ before we release the mutex - // entries mems are (implicitly) sorted in ascending order by their created - // time. We will use the first memtable's `edit` to keep the meta info for - // this flush. - MemTable* m = mems[0]; - VersionEdit* edit = m->GetEdits(); - edit->SetPrevLogNumber(0); - // SetLogNumber(log_num) indicates logs with number smaller than log_num - // will no longer be picked up for recovery. - edit->SetLogNumber(mems.back()->GetNextLogNumber()); - edit->SetColumnFamily(cfd->GetID()); - - - // This will release and re-acquire the mutex. 
- Status s = WriteLevel0Table(cfd, mutable_cf_options, mems, edit, - &file_number, log_buffer); - - if (s.ok() && - (shutting_down_.load(std::memory_order_acquire) || cfd->IsDropped())) { - s = Status::ShutdownInProgress( - "Database shutdown or Column family drop during flush"); - } - - if (!s.ok()) { - cfd->imm()->RollbackMemtableFlush(mems, file_number, &pending_outputs_); - } else { - // Replace immutable memtable with the generated Table - s = cfd->imm()->InstallMemtableFlushResults( - cfd, mutable_cf_options, mems, versions_.get(), &mutex_, - db_options_.info_log.get(), file_number, &pending_outputs_, - &deletion_state.memtables_to_free, db_directory_.get(), log_buffer); - } + Status s = flush_job.Run(); if (s.ok()) { - InstallSuperVersion(cfd, deletion_state, mutable_cf_options); + InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); if (madeProgress) { *madeProgress = 1; } @@ -1645,7 +1506,7 @@ Status DBImpl::FlushMemTableToOutputFile( while (alive_log_files_.size() && alive_log_files_.begin()->number < versions_->MinLogNumber()) { const auto& earliest = *alive_log_files_.begin(); - deletion_state.log_delete_files.push_back(earliest.number); + job_context->log_delete_files.push_back(earliest.number); total_log_size_ -= earliest.size; alive_log_files_.pop_front(); } @@ -2082,8 +1943,7 @@ void DBImpl::BGWorkCompaction(void* db) { reinterpret_cast(db)->BackgroundCallCompaction(); } -Status DBImpl::BackgroundFlush(bool* madeProgress, - DeletionState& deletion_state, +Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); @@ -2109,7 +1969,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, cfd->GetName().c_str(), db_options_.max_background_flushes - bg_flush_scheduled_); flush_status = FlushMemTableToOutputFile( - cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer); + cfd, mutable_cf_options, madeProgress, job_context, log_buffer); } if (call_status.ok() && !flush_status.ok()) { call_status = flush_status; @@ -2122,7 +1982,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, void DBImpl::BackgroundCallFlush() { bool madeProgress = false; - DeletionState deletion_state(true); + JobContext job_context(true); assert(bg_flush_scheduled_); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); @@ -2131,7 +1991,7 @@ void DBImpl::BackgroundCallFlush() { Status s; if (!shutting_down_.load(std::memory_order_acquire)) { - s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer); + s = BackgroundFlush(&madeProgress, &job_context, &log_buffer); if (!s.ok()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -2154,9 +2014,9 @@ void DBImpl::BackgroundCallFlush() { // If !s.ok(), this means that Flush failed. In that case, we want // to delete all obsolete files and we force FindObsoleteFiles() - FindObsoleteFiles(deletion_state, !s.ok()); + FindObsoleteFiles(&job_context, !s.ok()); // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { + if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); // Have to flush the info logs before bg_flush_scheduled_-- // because if bg_flush_scheduled_ becomes 0 and the lock is @@ -2164,8 +2024,8 @@ void DBImpl::BackgroundCallFlush() { // states of DB so info_log might not be available after that point. 
// It also applies to access other states that DB owns. log_buffer.FlushBufferToLog(); - if (deletion_state.HaveSomethingToDelete()) { - PurgeObsoleteFiles(deletion_state); + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); } mutex_.Lock(); } @@ -2189,7 +2049,7 @@ void DBImpl::BackgroundCallFlush() { void DBImpl::BackgroundCallCompaction() { bool madeProgress = false; - DeletionState deletion_state(true); + JobContext job_context(true); MaybeDumpStats(); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); @@ -2198,7 +2058,7 @@ void DBImpl::BackgroundCallCompaction() { assert(bg_compaction_scheduled_); Status s; if (!shutting_down_.load(std::memory_order_acquire)) { - s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer); + s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer); if (!s.ok()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -2221,12 +2081,12 @@ void DBImpl::BackgroundCallCompaction() { // If !s.ok(), this means that Compaction failed. In that case, we want // to delete all obsolete files we might have created and we force - // FindObsoleteFiles(). This is because deletion_state does not catch - // all created files if compaction failed. - FindObsoleteFiles(deletion_state, !s.ok()); + // FindObsoleteFiles(). This is because job_context does not + // catch all created files if compaction failed. + FindObsoleteFiles(&job_context, !s.ok()); // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { + if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); // Have to flush the info logs before bg_compaction_scheduled_-- // because if bg_flush_scheduled_ becomes 0 and the lock is @@ -2234,8 +2094,8 @@ void DBImpl::BackgroundCallCompaction() { // states of DB so info_log might not be available after that point. // It also applies to access other states that DB owns. 
log_buffer.FlushBufferToLog(); - if (deletion_state.HaveSomethingToDelete()) { - PurgeObsoleteFiles(deletion_state); + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); } mutex_.Lock(); } @@ -2271,8 +2131,7 @@ void DBImpl::BackgroundCallCompaction() { } } -Status DBImpl::BackgroundCompaction(bool* madeProgress, - DeletionState& deletion_state, +Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) { *madeProgress = false; mutex_.AssertHeld(); @@ -2312,7 +2171,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, db_options_.max_background_compactions - bg_compaction_scheduled_); cfd->Ref(); flush_stat = FlushMemTableToOutputFile( - cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer); + cfd, mutable_cf_options, madeProgress, job_context, log_buffer); cfd->Unref(); if (!flush_stat.ok()) { if (is_manual) { @@ -2388,8 +2247,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, status = versions_->LogAndApply( c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, db_directory_.get()); - InstallSuperVersion(c->column_family_data(), deletion_state, - *c->mutable_cf_options()); + InstallSuperVersionBackground(c->column_family_data(), job_context, + *c->mutable_cf_options()); LogToBuffer(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), c->num_input_files(0)); @@ -2407,8 +2266,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, *c->mutable_cf_options(), c->edit(), &mutex_, db_directory_.get()); // Use latest MutableCFOptions - InstallSuperVersion(c->column_family_data(), deletion_state, - *c->mutable_cf_options()); + InstallSuperVersionBackground(c->column_family_data(), job_context, + *c->mutable_cf_options()); Version::LevelSummaryStorage tmp; LogToBuffer( @@ -2423,8 +2282,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. 
CompactionState* compact = new CompactionState(c.get()); - status = DoCompactionWork(compact, *c->mutable_cf_options(), - deletion_state, log_buffer); + status = DoCompactionWork(compact, *c->mutable_cf_options(), job_context, + log_buffer); CleanupCompaction(compact, status); c->ReleaseCompactionFiles(status); c->ReleaseInputs(); @@ -2694,9 +2553,9 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( return 0; } -uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, - const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, - LogBuffer* log_buffer) { +uint64_t DBImpl::CallFlushDuringCompaction( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + JobContext* job_context, LogBuffer* log_buffer) { if (db_options_.max_background_flushes > 0) { // flush thread will take care of this return 0; @@ -2706,8 +2565,8 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, mutex_.Lock(); if (cfd->imm()->IsFlushPending()) { cfd->Ref(); - FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr, - deletion_state, log_buffer); + FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr, job_context, + log_buffer); cfd->Unref(); bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary } @@ -2719,18 +2578,11 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, } Status DBImpl::ProcessKeyValueCompaction( - const MutableCFOptions& mutable_cf_options, - bool is_snapshot_supported, - SequenceNumber visible_at_tip, - SequenceNumber earliest_snapshot, - SequenceNumber latest_snapshot, - DeletionState& deletion_state, - bool bottommost_level, - int64_t& imm_micros, - Iterator* input, - CompactionState* compact, - bool is_compaction_v2, - int* num_output_records, + const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported, + SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot, + SequenceNumber latest_snapshot, JobContext* job_context, + bool bottommost_level, int64_t* imm_micros, Iterator* input, + CompactionState* compact, bool is_compaction_v2, int* num_output_records, LogBuffer* log_buffer) { assert(num_output_records != nullptr); @@ -2786,8 +2638,8 @@ Status DBImpl::ProcessKeyValueCompaction( // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. 
we should also check if flush is necessary on // other column families, too - imm_micros += CallFlushDuringCompaction( - cfd, mutable_cf_options, deletion_state, log_buffer); + (*imm_micros) += CallFlushDuringCompaction(cfd, mutable_cf_options, + job_context, log_buffer); Slice key; Slice value; @@ -3127,7 +2979,7 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, Status DBImpl::DoCompactionWork(CompactionState* compact, const MutableCFOptions& mutable_cf_options, - DeletionState& deletion_state, + JobContext* job_context, LogBuffer* log_buffer) { assert(compact); compact->CleanupBatchBuffer(); @@ -3198,19 +3050,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (!compaction_filter_v2) { status = ProcessKeyValueCompaction( - mutable_cf_options, - is_snapshot_supported, - visible_at_tip, - earliest_snapshot, - latest_snapshot, - deletion_state, - bottommost_level, - imm_micros, - input.get(), - compact, - false, - &num_output_records, - log_buffer); + mutable_cf_options, is_snapshot_supported, visible_at_tip, + earliest_snapshot, latest_snapshot, job_context, bottommost_level, + &imm_micros, input.get(), compact, false, &num_output_records, + log_buffer); } else { // temp_backup_input always point to the start of the current buffer // temp_backup_input = backup_input; @@ -3231,7 +3074,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // compacting column family. we should also check if flush is necessary on // other column families, too imm_micros += CallFlushDuringCompaction(cfd, mutable_cf_options, - deletion_state, log_buffer); + job_context, log_buffer); Slice key = backup_input->key(); Slice value = backup_input->value(); @@ -3281,18 +3124,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // Done buffering for the current prefix. 
Spit it out to disk // Now just iterate through all the kv-pairs status = ProcessKeyValueCompaction( - mutable_cf_options, - is_snapshot_supported, - visible_at_tip, - earliest_snapshot, - latest_snapshot, - deletion_state, - bottommost_level, - imm_micros, - input.get(), - compact, - true, - &num_output_records, + mutable_cf_options, is_snapshot_supported, visible_at_tip, + earliest_snapshot, latest_snapshot, job_context, bottommost_level, + &imm_micros, input.get(), compact, true, &num_output_records, log_buffer); if (!status.ok()) { @@ -3319,18 +3153,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); status = ProcessKeyValueCompaction( - mutable_cf_options, - is_snapshot_supported, - visible_at_tip, - earliest_snapshot, - latest_snapshot, - deletion_state, - bottommost_level, - imm_micros, - input.get(), - compact, - true, - &num_output_records, + mutable_cf_options, is_snapshot_supported, visible_at_tip, + earliest_snapshot, latest_snapshot, job_context, bottommost_level, + &imm_micros, input.get(), compact, true, &num_output_records, log_buffer); compact->CleanupBatchBuffer(); @@ -3343,18 +3168,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); status = ProcessKeyValueCompaction( - mutable_cf_options, - is_snapshot_supported, - visible_at_tip, - earliest_snapshot, - latest_snapshot, - deletion_state, - bottommost_level, - imm_micros, - input.get(), - compact, - true, - &num_output_records, + mutable_cf_options, is_snapshot_supported, visible_at_tip, + earliest_snapshot, latest_snapshot, job_context, bottommost_level, + &imm_micros, input.get(), compact, true, &num_output_records, log_buffer); } // checking for compaction filter v2 @@ -3421,7 +3237,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (status.ok()) { status = InstallCompactionResults(compact, mutable_cf_options, log_buffer); - InstallSuperVersion(cfd, deletion_state, mutable_cf_options); + InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); } Version::LevelSummaryStorage tmp; LogToBuffer( @@ -3461,16 +3277,16 @@ static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); if (state->super_version->Unref()) { - DBImpl::DeletionState deletion_state; + JobContext job_context; state->mu->Lock(); state->super_version->Cleanup(); - state->db->FindObsoleteFiles(deletion_state, false, true); + state->db->FindObsoleteFiles(&job_context, false, true); state->mu->Unlock(); delete state->super_version; - if (deletion_state.HaveSomethingToDelete()) { - state->db->PurgeObsoleteFiles(deletion_state); + if (job_context.HaveSomethingToDelete()) { + state->db->PurgeObsoleteFiles(job_context); } } @@ -3511,25 +3327,27 @@ Status DBImpl::Get(const ReadOptions& read_options, return GetImpl(read_options, column_family, key, value); } -// DeletionState gets created and destructed outside of the lock -- we +// JobContext gets created and destructed outside of the lock -- +// we // use this convinently to: // * malloc one SuperVersion() outside of the lock -- new_superversion // * delete SuperVersion()s outside of the lock -- superversions_to_free // -// However, if InstallSuperVersion() gets called twice with the same, -// deletion_state, we can't reuse the SuperVersion() that got malloced because +// However, if InstallSuperVersion() gets called twice with the same +// job_context, we can't reuse the SuperVersion() that 
got +// malloced +// because // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersion( - ColumnFamilyData* cfd, DeletionState& deletion_state, +void DBImpl::InstallSuperVersionBackground( + ColumnFamilyData* cfd, JobContext* job_context, const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); - SuperVersion* old_superversion = - InstallSuperVersion(cfd, deletion_state.new_superversion, - mutable_cf_options); - deletion_state.new_superversion = nullptr; - deletion_state.superversions_to_free.push_back(old_superversion); + SuperVersion* old_superversion = InstallSuperVersion( + cfd, job_context->new_superversion, mutable_cf_options); + job_context->new_superversion = nullptr; + job_context->superversions_to_free.push_back(old_superversion); } SuperVersion* DBImpl::InstallSuperVersion( @@ -4529,7 +4347,7 @@ Status DBImpl::DeleteFile(std::string name) { FileMetaData* metadata; ColumnFamilyData* cfd; VersionEdit edit; - DeletionState deletion_state(true); + JobContext job_context(true); { MutexLock l(&mutex_); status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd); @@ -4567,15 +4385,15 @@ Status DBImpl::DeleteFile(std::string name) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, db_directory_.get()); if (status.ok()) { - InstallSuperVersion(cfd, deletion_state, - *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionBackground(cfd, &job_context, + *cfd->GetLatestMutableCFOptions()); } - FindObsoleteFiles(deletion_state, false); - } // lock released here + FindObsoleteFiles(&job_context, false); + } // lock released here LogFlush(db_options_.info_log); // remove files outside the db-lock - if (deletion_state.HaveSomethingToDelete()) { - PurgeObsoleteFiles(deletion_state); + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); } { MutexLock l(&mutex_); diff --git a/db/db_impl.h b/db/db_impl.h index f730d6ba482866452ae9f459aa012fa4309439a4..15205d90b369211e61b8c6ede55ead025cc24658 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -35,6 +35,7 @@ #include "db/write_controller.h" #include "db/flush_scheduler.h" #include "db/write_thread.h" +#include "db/job_context.h" namespace rocksdb { @@ -223,88 +224,19 @@ class DBImpl : public DB { void TEST_EndWrite(void* w); #endif // NDEBUG - // Structure to store information for candidate files to delete. 
- struct CandidateFileInfo { - std::string file_name; - uint32_t path_id; - CandidateFileInfo(std::string name, uint32_t path) - : file_name(name), path_id(path) {} - bool operator==(const CandidateFileInfo& other) const { - return file_name == other.file_name && path_id == other.path_id; - } - }; - - // needed for CleanupIteratorState - struct DeletionState { - inline bool HaveSomethingToDelete() const { - return candidate_files.size() || - sst_delete_files.size() || - log_delete_files.size(); - } - - // a list of all files that we'll consider deleting - // (every once in a while this is filled up with all files - // in the DB directory) - std::vector candidate_files; - - // the list of all live sst files that cannot be deleted - std::vector sst_live; - - // a list of sst files that we need to delete - std::vector sst_delete_files; - - // a list of log files that we need to delete - std::vector log_delete_files; - - // a list of memtables to be free - autovector memtables_to_free; - - autovector superversions_to_free; - - SuperVersion* new_superversion; // if nullptr no new superversion - - // the current manifest_file_number, log_number and prev_log_number - // that corresponds to the set of files in 'live'. - uint64_t manifest_file_number, pending_manifest_file_number, log_number, - prev_log_number; - - explicit DeletionState(bool create_superversion = false) { - manifest_file_number = 0; - pending_manifest_file_number = 0; - log_number = 0; - prev_log_number = 0; - new_superversion = create_superversion ? new SuperVersion() : nullptr; - } - - ~DeletionState() { - // free pending memtables - for (auto m : memtables_to_free) { - delete m; - } - // free superversions - for (auto s : superversions_to_free) { - delete s; - } - // if new_superversion was not used, it will be non-nullptr and needs - // to be freed here - delete new_superversion; - } - }; - // Returns the list of live files in 'live' and the list // of all files in the filesystem in 'candidate_files'. // If force == false and the last call was less than // db_options_.delete_obsolete_files_period_micros microseconds ago, - // it will not fill up the deletion_state - void FindObsoleteFiles(DeletionState& deletion_state, - bool force, + // it will not fill up the job_context + void FindObsoleteFiles(JobContext* job_context, bool force, bool no_full_scan = false); // Diffs the files listed in filenames and those that do not // belong to live files are posibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. // It is not necessary to hold the mutex when invoking this method. - void PurgeObsoleteFiles(DeletionState& deletion_state); + void PurgeObsoleteFiles(const JobContext& background_contet); ColumnFamilyHandle* DefaultColumnFamily() const; @@ -347,9 +279,10 @@ class DBImpl : public DB { // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status FlushMemTableToOutputFile( - ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer); + Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, + const MutableCFOptions& mutable_cf_options, + bool* madeProgress, JobContext* job_context, + LogBuffer* log_buffer); // REQUIRES: log_numbers are sorted in ascending order Status RecoverLogFiles(const std::vector& log_numbers, @@ -362,11 +295,6 @@ class DBImpl : public DB { // concurrent flush memtables to storage. 
Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit); - Status WriteLevel0Table(ColumnFamilyData* cfd, - const MutableCFOptions& mutable_cf_options, - const autovector& mems, - VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer); - Status DelayWrite(uint64_t expiration_time); Status ScheduleFlushes(WriteContext* context); @@ -388,39 +316,32 @@ class DBImpl : public DB { static void BGWorkFlush(void* db); void BackgroundCallCompaction(); void BackgroundCallFlush(); - Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state, + Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer); - Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state, + Status BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer); void CleanupCompaction(CompactionState* compact, Status status); Status DoCompactionWork(CompactionState* compact, const MutableCFOptions& mutable_cf_options, - DeletionState& deletion_state, - LogBuffer* log_buffer); + JobContext* job_context, LogBuffer* log_buffer); // This function is called as part of compaction. It enables Flush process to // preempt compaction, since it's higher prioirty // Returns: micros spent executing uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd, - const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, - LogBuffer* log_buffer); + const MutableCFOptions& mutable_cf_options, + JobContext* job_context, + LogBuffer* log_buffer); // Call compaction filter if is_compaction_v2 is not true. Then iterate // through input and compact the kv-pairs Status ProcessKeyValueCompaction( - const MutableCFOptions& mutable_cf_options, - bool is_snapshot_supported, - SequenceNumber visible_at_tip, - SequenceNumber earliest_snapshot, - SequenceNumber latest_snapshot, - DeletionState& deletion_state, - bool bottommost_level, - int64_t& imm_micros, - Iterator* input, - CompactionState* compact, - bool is_compaction_v2, - int* num_output_records, - LogBuffer* log_buffer); + const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported, + SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot, + SequenceNumber latest_snapshot, JobContext* job_context, + bool bottommost_level, int64_t* imm_micros, Iterator* input, + CompactionState* compact, bool is_compaction_v2, int* num_output_records, + LogBuffer* log_buffer); // Call compaction_filter_v2->Filter() on kv-pairs in compact void CallCompactionFilterV2(CompactionState* compact, @@ -624,11 +545,12 @@ class DBImpl : public DB { SequenceNumber* prev_snapshot); // Background threads call this function, which is just a wrapper around - // the cfd->InstallSuperVersion() function. Background threads carry - // deletion_state which can have new_superversion already allocated. - void InstallSuperVersion(ColumnFamilyData* cfd, - DeletionState& deletion_state, - const MutableCFOptions& mutable_cf_options); + // the InstallSuperVersion() function. Background threads carry + // job_context which can have new_superversion already + // allocated. 
+ void InstallSuperVersionBackground( + ColumnFamilyData* cfd, JobContext* job_context, + const MutableCFOptions& mutable_cf_options); SuperVersion* InstallSuperVersion( ColumnFamilyData* cfd, SuperVersion* new_sv, diff --git a/db/flush_job.cc b/db/flush_job.cc new file mode 100644 index 0000000000000000000000000000000000000000..ff35e9a9a7c71f00967836b37b92200f1fcbce72 --- /dev/null +++ b/db/flush_job.cc @@ -0,0 +1,223 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/flush_job.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include +#include +#include + +#include "db/builder.h" +#include "db/db_iter.h" +#include "db/dbformat.h" +#include "db/filename.h" +#include "db/log_reader.h" +#include "db/log_writer.h" +#include "db/memtable.h" +#include "db/memtable_list.h" +#include "db/merge_context.h" +#include "db/version_set.h" +#include "port/port.h" +#include "port/likely.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/statistics.h" +#include "rocksdb/status.h" +#include "rocksdb/table.h" +#include "table/block.h" +#include "table/block_based_table_factory.h" +#include "table/merger.h" +#include "table/table_builder.h" +#include "table/two_level_iterator.h" +#include "util/coding.h" +#include "util/logging.h" +#include "util/log_buffer.h" +#include "util/mutexlock.h" +#include "util/perf_context_imp.h" +#include "util/iostats_context_imp.h" +#include "util/stop_watch.h" +#include "util/sync_point.h" + +namespace rocksdb { + +FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, + const DBOptions& db_options, + const MutableCFOptions& mutable_cf_options, + const EnvOptions& env_options, VersionSet* versions, + port::Mutex* db_mutex, std::atomic* shutting_down, + FileNumToPathIdMap* pending_outputs, + SequenceNumber newest_snapshot, JobContext* job_context, + LogBuffer* log_buffer, Directory* db_directory, + CompressionType output_compression, Statistics* stats) + : dbname_(dbname), + cfd_(cfd), + db_options_(db_options), + mutable_cf_options_(mutable_cf_options), + env_options_(env_options), + versions_(versions), + db_mutex_(db_mutex), + shutting_down_(shutting_down), + pending_outputs_(pending_outputs), + newest_snapshot_(newest_snapshot), + job_context_(job_context), + log_buffer_(log_buffer), + db_directory_(db_directory), + output_compression_(output_compression), + stats_(stats) {} + +Status FlushJob::Run() { + // Save the contents of the earliest memtable as a new Table + uint64_t file_number; + autovector mems; + cfd_->imm()->PickMemtablesToFlush(&mems); + if (mems.empty()) { + LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush", + cfd_->GetName().c_str()); + return Status::OK(); + } + + // entries mems are (implicitly) sorted in ascending order by their created + // time. We will use the first memtable's `edit` to keep the meta info for + // this flush. 
+ MemTable* m = mems[0]; + VersionEdit* edit = m->GetEdits(); + edit->SetPrevLogNumber(0); + // SetLogNumber(log_num) indicates logs with number smaller than log_num + // will no longer be picked up for recovery. + edit->SetLogNumber(mems.back()->GetNextLogNumber()); + edit->SetColumnFamily(cfd_->GetID()); + + // This will release and re-acquire the mutex. + Status s = WriteLevel0Table(mems, edit, &file_number); + + if (s.ok() && + (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) { + s = Status::ShutdownInProgress( + "Database shutdown or Column family drop during flush"); + } + + if (!s.ok()) { + cfd_->imm()->RollbackMemtableFlush(mems, file_number, pending_outputs_); + } else { + // Replace immutable memtable with the generated Table + s = cfd_->imm()->InstallMemtableFlushResults( + cfd_, mutable_cf_options_, mems, versions_, db_mutex_, file_number, + pending_outputs_, &job_context_->memtables_to_free, db_directory_, + log_buffer_); + } + + return s; +} + +Status FlushJob::WriteLevel0Table(const autovector& mems, + VersionEdit* edit, uint64_t* filenumber) { + db_mutex_->AssertHeld(); + const uint64_t start_micros = db_options_.env->NowMicros(); + FileMetaData meta; + + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); + *filenumber = meta.fd.GetNumber(); + // path 0 for level 0 file. + pending_outputs_->insert({meta.fd.GetNumber(), 0}); + + const SequenceNumber earliest_seqno_in_memtable = + mems[0]->GetFirstSequenceNumber(); + Version* base = cfd_->current(); + base->Ref(); // it is likely that we do not need this reference + Status s; + { + db_mutex_->Unlock(); + if (log_buffer_) { + log_buffer_->FlushBufferToLog(); + } + std::vector memtables; + ReadOptions ro; + ro.total_order_seek = true; + Arena arena; + for (MemTable* m : mems) { + Log(db_options_.info_log, + "[%s] Flushing memtable with next log file: %" PRIu64 "\n", + cfd_->GetName().c_str(), m->GetNextLogNumber()); + memtables.push_back(m->NewIterator(ro, &arena)); + } + { + ScopedArenaIterator iter(NewMergingIterator(&cfd_->internal_comparator(), + &memtables[0], + memtables.size(), &arena)); + Log(db_options_.info_log, + "[%s] Level-0 flush table #%" PRIu64 ": started", + cfd_->GetName().c_str(), meta.fd.GetNumber()); + + s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, + cfd_->table_cache(), iter.get(), &meta, + cfd_->internal_comparator(), newest_snapshot_, + earliest_seqno_in_memtable, output_compression_, + cfd_->ioptions()->compression_opts, Env::IO_HIGH); + LogFlush(db_options_.info_log); + } + Log(db_options_.info_log, + "[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", + cfd_->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), + s.ToString().c_str()); + + if (!db_options_.disableDataSync && db_directory_ != nullptr) { + db_directory_->Fsync(); + } + db_mutex_->Lock(); + } + base->Unref(); + + // re-acquire the most current version + base = cfd_->current(); + + // There could be multiple threads writing to its own level-0 file. + // The pending_outputs cannot be cleared here, otherwise this newly + // created file might not be considered as a live-file by another + // compaction thread that is concurrently deleting obselete files. + // The pending_outputs can be cleared only after the new version is + // committed so that other threads can recognize this file as a + // valid one. + // pending_outputs_.erase(meta.number); + + // Note that if file_size is zero, the file has been deleted and + // should not be added to the manifest. 
+ int level = 0; + if (s.ok() && meta.fd.GetFileSize() > 0) { + const Slice min_user_key = meta.smallest.user_key(); + const Slice max_user_key = meta.largest.user_key(); + // if we have more than 1 background thread, then we cannot + // insert files directly into higher levels because some other + // threads could be concurrently producing compacted files for + // that key range. + if (base != nullptr && db_options_.max_background_compactions <= 1 && + db_options_.max_background_flushes == 0 && + cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { + level = base->PickLevelForMemTableOutput(mutable_cf_options_, + min_user_key, max_user_key); + } + edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); + } + + InternalStats::CompactionStats stats(1); + stats.micros = db_options_.env->NowMicros() - start_micros; + stats.bytes_written = meta.fd.GetFileSize(); + cfd_->internal_stats()->AddCompactionStats(level, stats); + cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, + meta.fd.GetFileSize()); + RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); + return s; +} + +} // namespace rocksdb diff --git a/db/flush_job.h b/db/flush_job.h new file mode 100644 index 0000000000000000000000000000000000000000..a5a40ce417b60b0b7d53822c597f879d5b4cecd5 --- /dev/null +++ b/db/flush_job.h @@ -0,0 +1,86 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "db/dbformat.h" +#include "db/log_writer.h" +#include "db/snapshot.h" +#include "db/column_family.h" +#include "db/version_edit.h" +#include "db/memtable_list.h" +#include "port/port.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/memtablerep.h" +#include "rocksdb/transaction_log.h" +#include "util/autovector.h" +#include "util/stop_watch.h" +#include "util/thread_local.h" +#include "util/scoped_arena_iterator.h" +#include "db/internal_stats.h" +#include "db/write_controller.h" +#include "db/flush_scheduler.h" +#include "db/write_thread.h" +#include "db/job_context.h" + +namespace rocksdb { + +class MemTable; +class TableCache; +class Version; +class VersionEdit; +class VersionSet; +class Arena; + +class FlushJob { + public: + // TODO(icanadi) make effort to reduce number of parameters here + // IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive + FlushJob(const std::string& dbname, ColumnFamilyData* cfd, + const DBOptions& db_options, + const MutableCFOptions& mutable_cf_options, + const EnvOptions& env_options, VersionSet* versions, + port::Mutex* db_mutex, std::atomic* shutting_down, + FileNumToPathIdMap* pending_outputs, SequenceNumber newest_snapshot, + JobContext* job_context, LogBuffer* log_buffer, + Directory* db_directory, CompressionType output_compression, + Statistics* stats); + ~FlushJob() {} + + Status Run(); + + private: + Status WriteLevel0Table(const autovector& mems, VersionEdit* edit, + uint64_t* filenumber); + const std::string& dbname_; + ColumnFamilyData* cfd_; + const DBOptions& db_options_; + const MutableCFOptions& mutable_cf_options_; + const EnvOptions& env_options_; + VersionSet* versions_; + port::Mutex* db_mutex_; + std::atomic* shutting_down_; + FileNumToPathIdMap* pending_outputs_; + SequenceNumber newest_snapshot_; + JobContext* job_context_; + LogBuffer* log_buffer_; + Directory* db_directory_; + CompressionType output_compression_; + Statistics* stats_; +}; + +} // namespace rocksdb diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..06852eedfc4002a98f19c52835a361a432d2b53a --- /dev/null +++ b/db/flush_job_test.cc @@ -0,0 +1,113 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/flush_job.h" +#include "db/column_family.h" +#include "db/version_set.h" +#include "rocksdb/cache.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { + +// TODO(icanadi) Mock out everything else: +// 1. VersionSet +// 2. TableBuilder +// 3. 
Memtable +class FlushJobTest { + public: + FlushJobTest() + : env_(Env::Default()), + dbname_(test::TmpDir() + "/flush_job_test"), + table_cache_(NewLRUCache(50000, 16, 8)), + versions_(new VersionSet(dbname_, &db_options_, env_options_, + table_cache_.get(), &write_controller_)), + shutting_down_(false) { + ASSERT_OK(env_->CreateDirIfMissing(dbname_)); + db_options_.db_paths.emplace_back(dbname_, + std::numeric_limits::max()); + // TODO(icanadi) Remove this once we mock out VersionSet + NewDB(); + std::vector column_families; + column_families.emplace_back(); + + ASSERT_OK(versions_->Recover(column_families, false)); + } + + void NewDB() { + VersionEdit new_db; + new_db.SetLogNumber(0); + new_db.SetNextFile(2); + new_db.SetLastSequence(0); + + const std::string manifest = DescriptorFileName(dbname_, 1); + unique_ptr file; + Status s = env_->NewWritableFile( + manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + { + log::Writer log(std::move(file)); + std::string record; + new_db.EncodeTo(&record); + s = log.AddRecord(record); + } + ASSERT_OK(s); + // Make "CURRENT" file that points to the new manifest file. + s = SetCurrentFile(env_, dbname_, 1, nullptr); + } + + Env* env_; + std::string dbname_; + EnvOptions env_options_; + std::shared_ptr table_cache_; + WriteController write_controller_; + DBOptions db_options_; + ColumnFamilyOptions cf_options_; + std::unique_ptr versions_; + port::Mutex mutex_; + std::atomic shutting_down_; + FileNumToPathIdMap pending_outputs_; +}; + +TEST(FlushJobTest, Empty) { + JobContext job_context; + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + env_options_, versions_.get(), &mutex_, &shutting_down_, + &pending_outputs_, SequenceNumber(), &job_context, nullptr, + nullptr, kNoCompression, nullptr); + ASSERT_OK(flush_job.Run()); +} + +TEST(FlushJobTest, NonEmpty) { + JobContext job_context; + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + + auto new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(), + *cfd->GetLatestMutableCFOptions()); + new_mem->Ref(); + for (int i = 1; i < 10000; ++i) { + std::string key(std::to_string(i)); + std::string value("value" + std::to_string(i)); + new_mem->Add(SequenceNumber(i), kTypeValue, key, value); + } + cfd->imm()->Add(new_mem); + + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + env_options_, versions_.get(), &mutex_, &shutting_down_, + &pending_outputs_, SequenceNumber(), &job_context, nullptr, + nullptr, kNoCompression, nullptr); + mutex_.Lock(); + ASSERT_OK(flush_job.Run()); + mutex_.Unlock(); + // TODO(icanadi) once you have TableMock, verify that key-values are as + // expected +} + +} // namespace rocksdb + +int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); } diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index cd9299aa4b6899ec8680a68cd2f0ae8610deb39c..04b5b3b3462c384e460cff9323d2a01ffc9e9cd0 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -10,6 +10,7 @@ #include #include +#include "db/job_context.h" #include "db/db_impl.h" #include "db/db_iter.h" #include "db/column_family.h" @@ -155,14 +156,14 @@ void ForwardIterator::Cleanup(bool release_sv) { if (release_sv) { if (sv_ != nullptr && sv_->Unref()) { - DBImpl::DeletionState deletion_state; + JobContext job_context; db_->mutex_.Lock(); 
sv_->Cleanup(); - db_->FindObsoleteFiles(deletion_state, false, true); + db_->FindObsoleteFiles(&job_context, false, true); db_->mutex_.Unlock(); delete sv_; - if (deletion_state.HaveSomethingToDelete()) { - db_->PurgeObsoleteFiles(deletion_state); + if (job_context.HaveSomethingToDelete()) { + db_->PurgeObsoleteFiles(job_context); } } } diff --git a/db/job_context.h b/db/job_context.h new file mode 100644 index 0000000000000000000000000000000000000000..caf28f7d95b3456f1e856bde8ff07fa2cd10d3d6 --- /dev/null +++ b/db/job_context.h @@ -0,0 +1,87 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include + +#include "db/column_family.h" + +namespace rocksdb { + +class MemTable; + +struct JobContext { + inline bool HaveSomethingToDelete() const { + return candidate_files.size() || sst_delete_files.size() || + log_delete_files.size(); + } + + // Structure to store information for candidate files to delete. + struct CandidateFileInfo { + std::string file_name; + uint32_t path_id; + CandidateFileInfo(std::string name, uint32_t path) + : file_name(std::move(name)), path_id(path) {} + bool operator==(const CandidateFileInfo& other) const { + return file_name == other.file_name && path_id == other.path_id; + } + }; + + // a list of all files that we'll consider deleting + // (every once in a while this is filled up with all files + // in the DB directory) + std::vector candidate_files; + + // the list of all live sst files that cannot be deleted + std::vector sst_live; + + // a list of sst files that we need to delete + std::vector sst_delete_files; + + // a list of log files that we need to delete + std::vector log_delete_files; + + // a list of memtables to be free + autovector memtables_to_free; + + autovector superversions_to_free; + + SuperVersion* new_superversion; // if nullptr no new superversion + + // the current manifest_file_number, log_number and prev_log_number + // that corresponds to the set of files in 'live'. + uint64_t manifest_file_number, pending_manifest_file_number, log_number, + prev_log_number; + + explicit JobContext(bool create_superversion = false) { + manifest_file_number = 0; + pending_manifest_file_number = 0; + log_number = 0; + prev_log_number = 0; + new_superversion = create_superversion ? 
new SuperVersion() : nullptr; + } + + ~JobContext() { + // free pending memtables + for (auto m : memtables_to_free) { + delete m; + } + // free superversions + for (auto s : superversions_to_free) { + delete s; + } + // if new_superversion was not used, it will be non-nullptr and needs + // to be freed here + delete new_superversion; + } +}; + +} // namespace rocksdb diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 69325c748836a132230da7fcb766af4529a0e904..3c74e073c766fd76fcac881e0dcab1bbd8a55779 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -5,6 +5,11 @@ // #include "db/memtable_list.h" +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include #include #include "rocksdb/db.h" #include "db/memtable.h" @@ -161,10 +166,10 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - const autovector& mems, VersionSet* vset, - port::Mutex* mu, Logger* info_log, uint64_t file_number, - FileNumToPathIdMap* pending_outputs, autovector* to_delete, - Directory* db_directory, LogBuffer* log_buffer) { + const autovector& mems, VersionSet* vset, port::Mutex* mu, + uint64_t file_number, FileNumToPathIdMap* pending_outputs, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer) { mu->AssertHeld(); // flush was sucessful @@ -194,8 +199,8 @@ Status MemTableList::InstallMemtableFlushResults( break; } - LogToBuffer(log_buffer, "[%s] Level-0 commit table #%lu started", - cfd->GetName().c_str(), (unsigned long)m->file_number_); + LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started", + cfd->GetName().c_str(), m->file_number_); // this can release and reacquire the mutex. s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory); @@ -209,10 +214,9 @@ Status MemTableList::InstallMemtableFlushResults( uint64_t mem_id = 1; // how many memtables has been flushed. do { if (s.ok()) { // commit new state - LogToBuffer(log_buffer, - "[%s] Level-0 commit table #%lu: memtable #%lu done", - cfd->GetName().c_str(), (unsigned long)m->file_number_, - (unsigned long)mem_id); + LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfd->GetName().c_str(), m->file_number_, mem_id); current_->Remove(m); assert(m->file_number_ > 0); @@ -226,10 +230,9 @@ Status MemTableList::InstallMemtableFlushResults( } } else { //commit failed. setup state so that we can flush again. 
- Log(info_log, - "Level-0 commit table #%lu: memtable #%lu failed", - (unsigned long)m->file_number_, - (unsigned long)mem_id); + LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + m->file_number_, mem_id); m->flush_completed_ = false; m->flush_in_progress_ = false; m->edit_.Clear(); diff --git a/db/memtable_list.h b/db/memtable_list.h index 5e16be5cb80c2009e6ed207dacd4967445fb2e09..9f499b83438ed221e58ce8e666f0581edcfb26eb 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -114,10 +114,10 @@ class MemTableList { // Commit a successful flush in the manifest file Status InstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - const autovector& m, VersionSet* vset, - port::Mutex* mu, Logger* info_log, uint64_t file_number, - FileNumToPathIdMap* pending_outputs, autovector* to_delete, - Directory* db_directory, LogBuffer* log_buffer); + const autovector& m, VersionSet* vset, port::Mutex* mu, + uint64_t file_number, FileNumToPathIdMap* pending_outputs, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/port/stack_trace.cc b/port/stack_trace.cc index 296b1f6209c9040f1b8153634408a7645f889a50..1aeb5f7b5b4c47558321b61f68f9ca3f9d7c62e1 100644 --- a/port/stack_trace.cc +++ b/port/stack_trace.cc @@ -74,7 +74,7 @@ void PrintStackTraceLine(const char* symbol, void* frame) { // out source to atos, for the address translation const int kLineMax = 256; char cmd[kLineMax]; - snprintf(cmd, kLineMax, "xcrun atos %p -p %d 2>&1", frame, pid); + snprintf(cmd, kLineMax, "xcrun atos -d %p -p %d 2>&1", frame, pid); auto f = popen(cmd, "r"); if (f) { char line[kLineMax];