From 0e22badc089b3170b1ab1eba82f850eb44dc9314 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 3 Feb 2014 15:28:03 -0800 Subject: [PATCH] [column families] Iterator and MultiGet Summary: Support for different column families in Iterator and MultiGet code path. Test Plan: make check Reviewers: dhruba, haobo, kailiu, sdong CC: leveldb Differential Revision: https://reviews.facebook.net/D15849 --- db/column_family.cc | 5 +- db/db_impl.cc | 164 ++++++++++++++++++++++++----------------- db/db_impl.h | 15 ++-- db/db_impl_readonly.cc | 20 +++-- db/db_impl_readonly.h | 131 ++++++++++++++++---------------- db/tailing_iter.cc | 26 ++++--- db/tailing_iter.h | 5 +- 7 files changed, 202 insertions(+), 164 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 234d5e50d..7a3dfca87 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -32,9 +32,10 @@ SuperVersion* SuperVersion::Ref() { } bool SuperVersion::Unref() { - assert(refs > 0); // fetch_sub returns the previous value of ref - return refs.fetch_sub(1, std::memory_order_relaxed) == 1; + uint32_t previous_refs = refs.fetch_sub(1, std::memory_order_relaxed); + assert(previous_refs > 0); + return previous_refs == 1; } void SuperVersion::Cleanup() { diff --git a/db/db_impl.cc b/db/db_impl.cc index 9d9056907..763b89276 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1423,10 +1423,6 @@ int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) { return cfd->options()->level0_stop_writes_trigger; } -uint64_t DBImpl::CurrentVersionNumber() const { - return default_cfd_->GetSuperVersionNumber(); -} - Status DBImpl::Flush(const FlushOptions& options, const ColumnFamilyHandle& column_family) { mutex_.Lock(); @@ -2724,12 +2720,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) { } // namespace Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, - SequenceNumber* latest_snapshot) { - mutex_.Lock(); - *latest_snapshot = versions_->LastSequence(); - SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref(); - mutex_.Unlock(); - + ColumnFamilyData* cfd, + SuperVersion* super_version) { std::vector iterator_list; // Collect iterator for mutable mem iterator_list.push_back(super_version->mem->NewIterator(options)); @@ -2738,9 +2730,8 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, // Collect iterators for files in L0 - Ln super_version->current->AddIterators(options, storage_options_, &iterator_list); - Iterator* internal_iter = - NewMergingIterator(&default_cfd_->internal_comparator(), - &iterator_list[0], iterator_list.size()); + Iterator* internal_iter = NewMergingIterator( + &cfd->internal_comparator(), &iterator_list[0], iterator_list.size()); IterState* cleanup = new IterState(this, &mutex_, super_version); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); @@ -2749,18 +2740,20 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, } Iterator* DBImpl::TEST_NewInternalIterator() { - SequenceNumber ignored; - return NewInternalIterator(ReadOptions(), &ignored); + mutex_.Lock(); + SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref(); + mutex_.Unlock(); + return NewInternalIterator(ReadOptions(), default_cfd_, super_version); } std::pair DBImpl::GetTailingIteratorPair( - const ReadOptions& options, + const ReadOptions& options, ColumnFamilyData* cfd, uint64_t* superversion_number) { mutex_.Lock(); - SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref(); + SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); if (superversion_number != nullptr) { - *superversion_number = CurrentVersionNumber(); + *superversion_number = cfd->GetSuperVersionNumber(); } mutex_.Unlock(); @@ -2772,8 +2765,8 @@ std::pair DBImpl::GetTailingIteratorPair( std::vector list; super_version->imm->AddIterators(options, &list); super_version->current->AddIterators(options, storage_options_, &list); - Iterator* immutable_iter = NewMergingIterator( - &default_cfd_->internal_comparator(), &list[0], list.size()); + Iterator* immutable_iter = + NewMergingIterator(&cfd->internal_comparator(), &list[0], list.size()); // create a DBIter that only uses memtable content; see NewIterator() immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), @@ -2910,84 +2903,106 @@ std::vector DBImpl::MultiGet( StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false); SequenceNumber snapshot; + struct MultiGetColumnFamilyData { + SuperVersion* super_version; + Version::GetStats stats; + bool have_stat_update = false; + }; + std::unordered_map multiget_cf_data; + // fill up and allocate outside of mutex + for (auto cf : column_family) { + if (multiget_cf_data.find(cf.id) == multiget_cf_data.end()) { + multiget_cf_data.insert({cf.id, new MultiGetColumnFamilyData()}); + } + } + mutex_.Lock(); if (options.snapshot != nullptr) { snapshot = reinterpret_cast(options.snapshot)->number_; } else { snapshot = versions_->LastSequence(); } - - SuperVersion* get_version = default_cfd_->GetSuperVersion()->Ref(); + for (auto mgd_iter : multiget_cf_data) { + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(mgd_iter.first); + assert(cfd != nullptr); + mgd_iter.second->super_version = cfd->GetSuperVersion()->Ref(); + } mutex_.Unlock(); - bool have_stat_update = false; - Version::GetStats stats; - // Contain a list of merge operations if merge occurs. MergeContext merge_context; // Note: this always resizes the values array - int numKeys = keys.size(); - std::vector statList(numKeys); - values->resize(numKeys); + size_t num_keys = keys.size(); + std::vector stat_list(num_keys); + values->resize(num_keys); // Keep track of bytes that we read for statistics-recording later - uint64_t bytesRead = 0; + uint64_t bytes_read = 0; // For each of the given keys, apply the entire "get" process as follows: // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. // merge_operands will contain the sequence of merges in the latter case. - for (int i=0; imem->Get(lkey, value, &s, merge_context, options_)) { + auto mgd_iter = multiget_cf_data.find(column_family[i].id); + assert(mgd_iter != multiget_cf_data.end()); + auto mgd = mgd_iter->second; + auto super_version = mgd->super_version; + if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) { // Done - } else if (get_version->imm->Get(lkey, value, &s, merge_context, - options_)) { + } else if (super_version->imm->Get(lkey, value, &s, merge_context, + options_)) { // Done } else { - get_version->current->Get(options, lkey, value, &s, &merge_context, - &stats, options_); - have_stat_update = true; + super_version->current->Get(options, lkey, value, &s, &merge_context, + &mgd->stats, options_); + mgd->have_stat_update = true; } if (s.ok()) { - bytesRead += value->size(); + bytes_read += value->size(); } } - bool delete_get_version = false; - if (!options_.disable_seek_compaction && have_stat_update) { - mutex_.Lock(); - if (get_version->current->UpdateStats(stats)) { - MaybeScheduleFlushOrCompaction(); - } - if (get_version->Unref()) { - get_version->Cleanup(); - delete_get_version = true; + autovector superversions_to_delete; + + bool schedule_flush_or_compaction = false; + mutex_.Lock(); + for (auto mgd_iter : multiget_cf_data) { + auto mgd = mgd_iter.second; + if (!options_.disable_seek_compaction && mgd->have_stat_update) { + if (mgd->super_version->current->UpdateStats(mgd->stats)) { + schedule_flush_or_compaction = true; + } } - mutex_.Unlock(); - } else { - if (get_version->Unref()) { - mutex_.Lock(); - get_version->Cleanup(); - mutex_.Unlock(); - delete_get_version = true; + if (mgd->super_version->Unref()) { + mgd->super_version->Cleanup(); + superversions_to_delete.push_back(mgd->super_version); } } - if (delete_get_version) { - delete get_version; + if (schedule_flush_or_compaction) { + MaybeScheduleFlushOrCompaction(); + } + mutex_.Unlock(); + + for (auto td : superversions_to_delete) { + delete td; + } + for (auto mgd : multiget_cf_data) { + delete mgd.second; } RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS); - RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys); - RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytesRead); + RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, num_keys); + RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytes_read); - return statList; + return stat_list; } Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, @@ -3056,19 +3071,28 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, Iterator* DBImpl::NewIterator(const ReadOptions& options, const ColumnFamilyHandle& column_family) { - Iterator* iter; + SequenceNumber latest_snapshot = 0; + SuperVersion* super_version = nullptr; + mutex_.Lock(); + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + assert(cfd != nullptr); + if (!options.tailing) { + super_version = cfd->GetSuperVersion()->Ref(); + latest_snapshot = versions_->LastSequence(); + } + mutex_.Unlock(); + Iterator* iter; if (options.tailing) { - iter = new TailingIterator(this, options, user_comparator()); + iter = new TailingIterator(this, options, cfd); } else { - SequenceNumber latest_snapshot; - iter = NewInternalIterator(options, &latest_snapshot); + iter = NewInternalIterator(options, cfd, super_version); iter = NewDBIterator( - &dbname_, env_, options_, user_comparator(), iter, - (options.snapshot != nullptr - ? reinterpret_cast(options.snapshot)->number_ - : latest_snapshot)); + &dbname_, env_, options_, cfd->user_comparator(), iter, + (options.snapshot != nullptr + ? reinterpret_cast(options.snapshot)->number_ + : latest_snapshot)); } if (options.prefix) { @@ -3529,6 +3553,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, value->clear(); MutexLock l(&mutex_); auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + assert(cfd != nullptr); return internal_stats_.GetProperty(property, value, cfd); } @@ -3538,7 +3563,10 @@ void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family, Version* v; { MutexLock l(&mutex_); - v = default_cfd_->current(); + auto cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + assert(cfd != nullptr); + v = cfd->current(); v->Ref(); } diff --git a/db/db_impl.h b/db/db_impl.h index 4f8cca1b8..7fb18880b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -257,10 +257,8 @@ class DBImpl : public DB { return internal_comparator_.user_comparator(); } - ColumnFamilyData* GetDefaultColumnFamily() { return default_cfd_; } - - Iterator* NewInternalIterator(const ReadOptions&, - SequenceNumber* latest_snapshot); + Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, + SuperVersion* super_version); private: friend class DB; @@ -367,16 +365,13 @@ class DBImpl : public DB { // hold the data set. Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1); - // Returns the current SuperVersion number. - uint64_t CurrentVersionNumber() const; - // Returns a pair of iterators (mutable-only and immutable-only) used - // internally by TailingIterator and stores CurrentVersionNumber() in + // internally by TailingIterator and stores cfd->GetSuperVersionNumber() in // *superversion_number. These iterators are always up-to-date, i.e. can // be used to read new data. std::pair GetTailingIteratorPair( - const ReadOptions& options, - uint64_t* superversion_number); + const ReadOptions& options, ColumnFamilyData* cfd, + uint64_t* superversion_number); // Constant after construction const InternalFilterPolicy internal_filter_policy_; diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index ab3a63d95..82d4b8cb0 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -29,6 +29,7 @@ #include "db/write_batch_internal.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/column_family.h" #include "rocksdb/status.h" #include "rocksdb/table.h" #include "rocksdb/merge_operator.h" @@ -57,7 +58,8 @@ Status DBImplReadOnly::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; SequenceNumber snapshot = versions_->LastSequence(); - SuperVersion* super_version = GetDefaultColumnFamily()->GetSuperVersion(); + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + SuperVersion* super_version = cfd->GetSuperVersion(); MergeContext merge_context; LookupKey lkey(key, snapshot); if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) { @@ -69,14 +71,18 @@ Status DBImplReadOnly::Get(const ReadOptions& options, return s; } -Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) { - SequenceNumber latest_snapshot; - Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); +Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options, + const ColumnFamilyHandle& column_family) { + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + assert(cfd != nullptr); + SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); + SequenceNumber latest_snapshot = versions_->LastSequence(); + Iterator* internal_iter = NewInternalIterator(options, cfd, super_version); return NewDBIterator( - &dbname_, env_, options_, user_comparator(),internal_iter, + &dbname_, env_, options_, user_comparator(), internal_iter, (options.snapshot != nullptr - ? reinterpret_cast(options.snapshot)->number_ - : latest_snapshot)); + ? reinterpret_cast(options.snapshot)->number_ + : latest_snapshot)); } Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 1a70180eb..218ae6f54 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -12,6 +12,8 @@ #include #include +#include +#include #include "db/dbformat.h" #include "db/log_writer.h" #include "db/snapshot.h" @@ -23,79 +25,80 @@ namespace rocksdb { class DBImplReadOnly : public DBImpl { -public: + public: DBImplReadOnly(const Options& options, const std::string& dbname); - virtual ~DBImplReadOnly(); + virtual ~DBImplReadOnly(); - // Implementations of the DB interface - using DB::Get; - virtual Status Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, - std::string* value); + // Implementations of the DB interface + using DB::Get; + virtual Status Get(const ReadOptions& options, + const ColumnFamilyHandle& column_family, const Slice& key, + std::string* value); - // TODO: Implement ReadOnly MultiGet? + // TODO: Implement ReadOnly MultiGet? - using DBImpl::NewIterator; - virtual Iterator* NewIterator(const ReadOptions&); + using DBImpl::NewIterator; + virtual Iterator* NewIterator(const ReadOptions&, + const ColumnFamilyHandle& column_family); - virtual Status NewIterators( - const ReadOptions& options, - const std::vector& column_family, - std::vector* iterators) { + virtual Status NewIterators( + const ReadOptions& options, + const std::vector& column_family, + std::vector* iterators) { // TODO - return Status::NotSupported("Not supported yet."); - } + return Status::NotSupported("Not supported yet."); + } - using DBImpl::Put; - virtual Status Put(const WriteOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& value) { - return Status::NotSupported("Not supported operation in read only mode."); - } - using DBImpl::Merge; - virtual Status Merge(const WriteOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& value) { - return Status::NotSupported("Not supported operation in read only mode."); - } - using DBImpl::Delete; - virtual Status Delete(const WriteOptions& options, + using DBImpl::Put; + virtual Status Put(const WriteOptions& options, + const ColumnFamilyHandle& column_family, const Slice& key, + const Slice& value) { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::Merge; + virtual Status Merge(const WriteOptions& options, const ColumnFamilyHandle& column_family, - const Slice& key) { - return Status::NotSupported("Not supported operation in read only mode."); - } - virtual Status Write(const WriteOptions& options, WriteBatch* updates) { - return Status::NotSupported("Not supported operation in read only mode."); - } - using DBImpl::CompactRange; - virtual Status CompactRange(const ColumnFamilyHandle& column_family, - const Slice* begin, const Slice* end, - bool reduce_level = false, int target_level = -1) { - return Status::NotSupported("Not supported operation in read only mode."); - } - virtual Status DisableFileDeletions() { - return Status::NotSupported("Not supported operation in read only mode."); - } - virtual Status EnableFileDeletions(bool force) { - return Status::NotSupported("Not supported operation in read only mode."); - } - virtual Status GetLiveFiles(std::vector&, - uint64_t* manifest_file_size, - bool flush_memtable = true) { - return Status::NotSupported("Not supported operation in read only mode."); - } - using DBImpl::Flush; - virtual Status Flush(const FlushOptions& options, - const ColumnFamilyHandle& column_family) { - return Status::NotSupported("Not supported operation in read only mode."); - } + const Slice& key, const Slice& value) { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::Delete; + virtual Status Delete(const WriteOptions& options, + const ColumnFamilyHandle& column_family, + const Slice& key) { + return Status::NotSupported("Not supported operation in read only mode."); + } + virtual Status Write(const WriteOptions& options, WriteBatch* updates) { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::CompactRange; + virtual Status CompactRange(const ColumnFamilyHandle& column_family, + const Slice* begin, const Slice* end, + bool reduce_level = false, + int target_level = -1) { + return Status::NotSupported("Not supported operation in read only mode."); + } + virtual Status DisableFileDeletions() { + return Status::NotSupported("Not supported operation in read only mode."); + } + virtual Status EnableFileDeletions(bool force) { + return Status::NotSupported("Not supported operation in read only mode."); + } + virtual Status GetLiveFiles(std::vector&, + uint64_t* manifest_file_size, + bool flush_memtable = true) { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::Flush; + virtual Status Flush(const FlushOptions& options, + const ColumnFamilyHandle& column_family) { + return Status::NotSupported("Not supported operation in read only mode."); + } -private: - friend class DB; + private: + friend class DB; - // No copying allowed - DBImplReadOnly(const DBImplReadOnly&); - void operator=(const DBImplReadOnly&); + // No copying allowed + DBImplReadOnly(const DBImplReadOnly&); + void operator=(const DBImplReadOnly&); }; - } diff --git a/db/tailing_iter.cc b/db/tailing_iter.cc index 5644b1211..dda8c3a99 100644 --- a/db/tailing_iter.cc +++ b/db/tailing_iter.cc @@ -8,15 +8,19 @@ #include #include #include "db/db_impl.h" +#include "db/column_family.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" namespace rocksdb { TailingIterator::TailingIterator(DBImpl* db, const ReadOptions& options, - const Comparator* comparator) - : db_(db), options_(options), comparator_(comparator), - version_number_(0), current_(nullptr), + ColumnFamilyData* cfd) + : db_(db), + options_(options), + cfd_(cfd), + version_number_(0), + current_(nullptr), status_(Status::InvalidArgument("Seek() not called on this iterator")) {} bool TailingIterator::Valid() const { @@ -53,10 +57,9 @@ void TailingIterator::Seek(const Slice& target) { // 'target' -- in this case, prev_key_ is included in the interval, so // prev_inclusive_ has to be set. - if (!is_prev_set_ || - comparator_->Compare(prev_key_, target) >= !is_prev_inclusive_ || - (immutable_->Valid() && - comparator_->Compare(target, immutable_->key()) > 0) || + const Comparator* cmp = cfd_->user_comparator(); + if (!is_prev_set_ || cmp->Compare(prev_key_, target) >= !is_prev_inclusive_ || + (immutable_->Valid() && cmp->Compare(target, immutable_->key()) > 0) || (options_.prefix_seek && !IsSamePrefix(target))) { SeekImmutable(target); } @@ -121,7 +124,7 @@ void TailingIterator::SeekToLast() { void TailingIterator::CreateIterators() { std::pair iters = - db_->GetTailingIteratorPair(options_, &version_number_); + db_->GetTailingIteratorPair(options_, cfd_, &version_number_); assert(iters.first && iters.second); @@ -137,9 +140,10 @@ void TailingIterator::UpdateCurrent() { if (mutable_->Valid()) { current_ = mutable_.get(); } + const Comparator* cmp = cfd_->user_comparator(); if (immutable_->Valid() && (current_ == nullptr || - comparator_->Compare(immutable_->key(), current_->key()) < 0)) { + cmp->Compare(immutable_->key(), current_->key()) < 0)) { current_ = immutable_.get(); } @@ -151,11 +155,11 @@ void TailingIterator::UpdateCurrent() { bool TailingIterator::IsCurrentVersion() const { return mutable_ != nullptr && immutable_ != nullptr && - version_number_ == db_->CurrentVersionNumber(); + version_number_ == cfd_->GetSuperVersionNumber(); } bool TailingIterator::IsSamePrefix(const Slice& target) const { - const SliceTransform* extractor = db_->options_.prefix_extractor; + const SliceTransform* extractor = cfd_->options()->prefix_extractor; assert(extractor); assert(is_prev_set_); diff --git a/db/tailing_iter.h b/db/tailing_iter.h index 3b8343a28..2a5a02e24 100644 --- a/db/tailing_iter.h +++ b/db/tailing_iter.h @@ -13,6 +13,7 @@ namespace rocksdb { class DBImpl; +class ColumnFamilyData; /** * TailingIterator is a special type of iterator that doesn't use an (implicit) @@ -25,7 +26,7 @@ class DBImpl; class TailingIterator : public Iterator { public: TailingIterator(DBImpl* db, const ReadOptions& options, - const Comparator* comparator); + ColumnFamilyData* cfd); virtual ~TailingIterator() {} virtual bool Valid() const override; @@ -41,7 +42,7 @@ class TailingIterator : public Iterator { private: DBImpl* const db_; const ReadOptions options_; - const Comparator* const comparator_; + ColumnFamilyData* const cfd_; uint64_t version_number_; // TailingIterator merges the contents of the two iterators below (one using -- GitLab