From 45a5e3ede01b64d8f3a5c2fe411b728f1d7b9242 Mon Sep 17 00:00:00 2001 From: Stanislau Hlebik Date: Thu, 4 Sep 2014 17:40:41 -0700 Subject: [PATCH] Remove path with arena==nullptr from NewInternalIterator Summary: Simply code by removing code path which does not use Arena from NewInternalIterator Test Plan: make all check make valgrind_check Reviewers: sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D22395 --- db/db_impl.cc | 119 +++++++++++++++----------------- db/db_impl.h | 8 +-- db/db_impl_debug.cc | 5 +- db/db_test.cc | 128 +++++++++++++++++++---------------- db/forward_iterator.cc | 10 +-- db/forward_iterator.h | 2 + db/memtable.cc | 10 +-- db/memtable.h | 3 +- db/memtable_list.cc | 5 +- db/memtable_list.h | 2 +- db/repair.cc | 17 +++-- db/version_set.cc | 25 ------- db/version_set.h | 2 - db/write_batch_test.cc | 5 +- java/rocksjni/write_batch.cc | 6 +- table/table_test.cc | 53 ++++++++++++--- util/ldb_cmd.cc | 4 +- util/scoped_arena_iterator.h | 28 ++++++++ 18 files changed, 240 insertions(+), 192 deletions(-) create mode 100644 util/scoped_arena_iterator.h diff --git a/db/db_impl.cc b/db/db_impl.cc index 049d40c7b..4e3816d64 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1415,31 +1415,32 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. ReadOptions ro; ro.total_order_seek = true; - Iterator* iter = mem->NewIterator(ro); - const SequenceNumber newest_snapshot = snapshots_.GetNewest(); - const SequenceNumber earliest_seqno_in_memtable = - mem->GetFirstSequenceNumber(); - Log(options_.info_log, "[%s] Level-0 table #%" PRIu64 ": started", - cfd->GetName().c_str(), meta.fd.GetNumber()); - + Arena arena; Status s; { - mutex_.Unlock(); - s = BuildTable(dbname_, env_, *cfd->ioptions(), env_options_, - cfd->table_cache(), iter, &meta, cfd->internal_comparator(), - newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->options()), - cfd->options()->compression_opts, Env::IO_HIGH); - LogFlush(options_.info_log); - mutex_.Lock(); - } + ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); + const SequenceNumber newest_snapshot = snapshots_.GetNewest(); + const SequenceNumber earliest_seqno_in_memtable = + mem->GetFirstSequenceNumber(); + Log(options_.info_log, "[%s] Level-0 table #%" PRIu64 ": started", + cfd->GetName().c_str(), meta.fd.GetNumber()); - Log(options_.info_log, - "[%s] Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s", - cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), - s.ToString().c_str()); - delete iter; + { + mutex_.Unlock(); + s = BuildTable( + dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), + iter.get(), &meta, cfd->internal_comparator(), newest_snapshot, + earliest_seqno_in_memtable, GetCompressionFlush(*cfd->options()), + cfd->options()->compression_opts, Env::IO_HIGH); + LogFlush(options_.info_log); + mutex_.Lock(); + } + Log(options_.info_log, + "[%s] Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s", + cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), + s.ToString().c_str()); + } pending_outputs_.erase(meta.fd.GetNumber()); // Note that if file_size is zero, the file has been deleted and @@ -1485,24 +1486,27 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, std::vector memtables; ReadOptions ro; ro.total_order_seek = true; + Arena arena; for (MemTable* m : mems) { Log(options_.info_log, "[%s] Flushing memtable with next log file: %" PRIu64 "\n", cfd->GetName().c_str(), m->GetNextLogNumber()); - memtables.push_back(m->NewIterator(ro)); + memtables.push_back(m->NewIterator(ro, &arena)); + } + { + ScopedArenaIterator iter(NewMergingIterator(&cfd->internal_comparator(), + &memtables[0], + memtables.size(), &arena)); + Log(options_.info_log, "[%s] Level-0 flush table #%" PRIu64 ": started", + cfd->GetName().c_str(), meta.fd.GetNumber()); + + s = BuildTable( + dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), + iter.get(), &meta, cfd->internal_comparator(), newest_snapshot, + earliest_seqno_in_memtable, GetCompressionFlush(*cfd->options()), + cfd->options()->compression_opts, Env::IO_HIGH); + LogFlush(options_.info_log); } - Iterator* iter = NewMergingIterator(&cfd->internal_comparator(), - &memtables[0], memtables.size()); - Log(options_.info_log, "[%s] Level-0 flush table #%" PRIu64 ": started", - cfd->GetName().c_str(), meta.fd.GetNumber()); - - s = BuildTable(dbname_, env_, *cfd->ioptions(), env_options_, - cfd->table_cache(), iter, &meta, cfd->internal_comparator(), - newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->options()), - cfd->options()->compression_opts, Env::IO_HIGH); - LogFlush(options_.info_log); - delete iter; Log(options_.info_log, "[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), @@ -3349,31 +3353,18 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SuperVersion* super_version, Arena* arena) { Iterator* internal_iter; - if (arena != nullptr) { - // Need to create internal iterator from the arena. - MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena); - // Collect iterator for mutable mem - merge_iter_builder.AddIterator( - super_version->mem->NewIterator(options, arena)); - // Collect all needed child iterators for immutable memtables - super_version->imm->AddIterators(options, &merge_iter_builder); - // Collect iterators for files in L0 - Ln - super_version->current->AddIterators(options, env_options_, - &merge_iter_builder); - internal_iter = merge_iter_builder.Finish(); - } else { - // Need to create internal iterator using malloc. - std::vector iterator_list; - // Collect iterator for mutable mem - iterator_list.push_back(super_version->mem->NewIterator(options)); - // Collect all needed child iterators for immutable memtables - super_version->imm->AddIterators(options, &iterator_list); - // Collect iterators for files in L0 - Ln - super_version->current->AddIterators(options, env_options_, - &iterator_list); - internal_iter = NewMergingIterator(&cfd->internal_comparator(), - &iterator_list[0], iterator_list.size()); - } + assert(arena != nullptr); + // Need to create internal iterator from the arena. + MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena); + // Collect iterator for mutable mem + merge_iter_builder.AddIterator( + super_version->mem->NewIterator(options, arena)); + // Collect all needed child iterators for immutable memtables + super_version->imm->AddIterators(options, &merge_iter_builder); + // Collect iterators for files in L0 - Ln + super_version->current->AddIterators(options, env_options_, + &merge_iter_builder); + internal_iter = merge_iter_builder.Finish(); IterState* cleanup = new IterState(this, &mutex_, super_version); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); @@ -3790,10 +3781,12 @@ Status DBImpl::NewIterators( ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot; - auto iter = NewInternalIterator(options, cfd, super_versions[i]); - iter = NewDBIterator(env_, *cfd->options(), - cfd->user_comparator(), iter, snapshot); - iterators->push_back(iter); + ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, *cfd->options(), cfd->user_comparator(), snapshot); + Iterator* internal_iter = NewInternalIterator( + options, cfd, super_versions[i], db_iter->GetArena()); + db_iter->SetIterUnderDBIter(internal_iter); + iterators->push_back(db_iter); } } diff --git a/db/db_impl.h b/db/db_impl.h index caacd012a..1ccaabb6c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -30,6 +30,7 @@ #include "util/autovector.h" #include "util/stop_watch.h" #include "util/thread_local.h" +#include "util/scoped_arena_iterator.h" #include "db/internal_stats.h" namespace rocksdb { @@ -173,8 +174,8 @@ class DBImpl : public DB { // Return an internal iterator over the current state of the database. // The keys of this iterator are internal keys (see format.h). // The returned iterator should be deleted when no longer needed. - Iterator* TEST_NewInternalIterator(ColumnFamilyHandle* column_family = - nullptr); + Iterator* TEST_NewInternalIterator( + Arena* arena, ColumnFamilyHandle* column_family = nullptr); // Return the maximum overlapping data (in bytes) at next level for any // file at a level >= 1. @@ -297,8 +298,7 @@ class DBImpl : public DB { Statistics* stats_; Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, - SuperVersion* super_version, - Arena* arena = nullptr); + SuperVersion* super_version, Arena* arena); private: friend class DB; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 8df66f6c6..77d4e0551 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -20,7 +20,8 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() { return default_cf_handle_->cfd()->current()->NumLevelBytes(0); } -Iterator* DBImpl::TEST_NewInternalIterator(ColumnFamilyHandle* column_family) { +Iterator* DBImpl::TEST_NewInternalIterator(Arena* arena, + ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; if (column_family == nullptr) { cfd = default_cf_handle_->cfd(); @@ -33,7 +34,7 @@ Iterator* DBImpl::TEST_NewInternalIterator(ColumnFamilyHandle* column_family) { SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); mutex_.Unlock(); ReadOptions roptions; - return NewInternalIterator(roptions, cfd, super_version); + return NewInternalIterator(roptions, cfd, super_version, arena); } int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( diff --git a/db/db_test.cc b/db/db_test.cc index 0b0365211..5bd781696 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -41,6 +41,7 @@ #include "util/rate_limiter.h" #include "util/statistics.h" #include "util/testharness.h" +#include "util/scoped_arena_iterator.h" #include "util/sync_point.h" #include "util/testutil.h" @@ -755,11 +756,12 @@ class DBTest { } std::string AllEntriesFor(const Slice& user_key, int cf = 0) { - Iterator* iter; + ScopedArenaIterator iter; + Arena arena; if (cf == 0) { - iter = dbfull()->TEST_NewInternalIterator(); + iter.set(dbfull()->TEST_NewInternalIterator(&arena)); } else { - iter = dbfull()->TEST_NewInternalIterator(handles_[cf]); + iter.set(dbfull()->TEST_NewInternalIterator(&arena, handles_[cf])); } InternalKey target(user_key, kMaxSequenceNumber, kTypeValue); iter->Seek(target.Encode()); @@ -804,7 +806,6 @@ class DBTest { } result += "]"; } - delete iter; return result; } @@ -1042,11 +1043,12 @@ class DBTest { // Utility method to test InplaceUpdate void validateNumberOfEntries(int numValues, int cf = 0) { - Iterator* iter; + ScopedArenaIterator iter; + Arena arena; if (cf != 0) { - iter = dbfull()->TEST_NewInternalIterator(handles_[cf]); + iter.set(dbfull()->TEST_NewInternalIterator(&arena, handles_[cf])); } else { - iter = dbfull()->TEST_NewInternalIterator(); + iter.set(dbfull()->TEST_NewInternalIterator(&arena)); } iter->SeekToFirst(); ASSERT_EQ(iter->status().ok(), true); @@ -1060,7 +1062,6 @@ class DBTest { ASSERT_EQ(ikey.sequence, (unsigned)seq--); iter->Next(); } - delete iter; ASSERT_EQ(0, seq); } @@ -4210,22 +4211,25 @@ TEST(DBTest, CompactionFilter) { // TODO: figure out sequence number squashtoo int count = 0; int total = 0; - Iterator* iter = dbfull()->TEST_NewInternalIterator(handles_[1]); - iter->SeekToFirst(); - ASSERT_OK(iter->status()); - while (iter->Valid()) { - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - ikey.sequence = -1; - ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); - total++; - if (ikey.sequence != 0) { - count++; + Arena arena; + { + ScopedArenaIterator iter( + dbfull()->TEST_NewInternalIterator(&arena, handles_[1])); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ikey.sequence = -1; + ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); + total++; + if (ikey.sequence != 0) { + count++; + } + iter->Next(); } - iter->Next(); } ASSERT_EQ(total, 100000); ASSERT_EQ(count, 1); - delete iter; // overwrite all the 100K keys once again. for (int i = 0; i < 100000; i++) { @@ -4280,7 +4284,7 @@ TEST(DBTest, CompactionFilter) { ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); // Scan the entire database to ensure that nothing is left - iter = db_->NewIterator(ReadOptions(), handles_[1]); + Iterator* iter = db_->NewIterator(ReadOptions(), handles_[1]); iter->SeekToFirst(); count = 0; while (iter->Valid()) { @@ -4296,18 +4300,20 @@ TEST(DBTest, CompactionFilter) { // TODO: remove the following or design a different // test count = 0; - iter = dbfull()->TEST_NewInternalIterator(handles_[1]); - iter->SeekToFirst(); - ASSERT_OK(iter->status()); - while (iter->Valid()) { - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); - ASSERT_NE(ikey.sequence, (unsigned)0); - count++; - iter->Next(); + { + ScopedArenaIterator iter( + dbfull()->TEST_NewInternalIterator(&arena, handles_[1])); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); + ASSERT_NE(ikey.sequence, (unsigned)0); + count++; + iter->Next(); + } + ASSERT_EQ(count, 0); } - ASSERT_EQ(count, 0); - delete iter; } // Tests the edge case where compaction does not produce any output -- all @@ -4429,22 +4435,24 @@ TEST(DBTest, CompactionFilterContextManual) { // Verify total number of keys is correct after manual compaction. int count = 0; int total = 0; - Iterator* iter = dbfull()->TEST_NewInternalIterator(); - iter->SeekToFirst(); - ASSERT_OK(iter->status()); - while (iter->Valid()) { - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - ikey.sequence = -1; - ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); - total++; - if (ikey.sequence != 0) { - count++; + { + Arena arena; + ScopedArenaIterator iter(dbfull()->TEST_NewInternalIterator(&arena)); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ikey.sequence = -1; + ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); + total++; + if (ikey.sequence != 0) { + count++; + } + iter->Next(); } - iter->Next(); + ASSERT_EQ(total, 700); + ASSERT_EQ(count, 1); } - ASSERT_EQ(total, 700); - ASSERT_EQ(count, 1); - delete iter; } class KeepFilterV2 : public CompactionFilterV2 { @@ -4601,25 +4609,27 @@ TEST(DBTest, CompactionFilterV2) { // All the files are in the lowest level. int count = 0; int total = 0; - Iterator* iter = dbfull()->TEST_NewInternalIterator(); - iter->SeekToFirst(); - ASSERT_OK(iter->status()); - while (iter->Valid()) { - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - ikey.sequence = -1; - ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); - total++; - if (ikey.sequence != 0) { - count++; + { + Arena arena; + ScopedArenaIterator iter(dbfull()->TEST_NewInternalIterator(&arena)); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ikey.sequence = -1; + ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); + total++; + if (ikey.sequence != 0) { + count++; + } + iter->Next(); } - iter->Next(); } ASSERT_EQ(total, 100000); // 1 snapshot only. Since we are using universal compacton, // the sequence no is cleared for better compression ASSERT_EQ(count, 1); - delete iter; // create a new database with the compaction // filter in such a way that it deletes all keys @@ -4643,7 +4653,7 @@ TEST(DBTest, CompactionFilterV2) { ASSERT_EQ(NumTableFilesAtLevel(1), 0); // Scan the entire database to ensure that nothing is left - iter = db_->NewIterator(ReadOptions()); + Iterator* iter = db_->NewIterator(ReadOptions()); iter->SeekToFirst(); count = 0; while (iter->Valid()) { diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 74e6dd249..6b78c4037 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -132,9 +132,11 @@ ForwardIterator::~ForwardIterator() { } void ForwardIterator::Cleanup() { - delete mutable_iter_; + if (mutable_iter_ != nullptr) { + mutable_iter_->~Iterator(); + } for (auto* m : imm_iters_) { - delete m; + m->~Iterator(); } imm_iters_.clear(); for (auto* f : l0_iters_) { @@ -401,8 +403,8 @@ void ForwardIterator::RebuildIterators() { Cleanup(); // New sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); - mutable_iter_ = sv_->mem->NewIterator(read_options_); - sv_->imm->AddIterators(read_options_, &imm_iters_); + mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_); + sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_); const auto& l0_files = sv_->current->files_[0]; l0_iters_.reserve(l0_files.size()); for (const auto* l0 : l0_files) { diff --git a/db/forward_iterator.h b/db/forward_iterator.h index bbf423a50..653a0ac0c 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -14,6 +14,7 @@ #include "rocksdb/iterator.h" #include "rocksdb/options.h" #include "db/dbformat.h" +#include "util/arena.h" namespace rocksdb { @@ -100,6 +101,7 @@ class ForwardIterator : public Iterator { IterKey prev_key_; bool is_prev_set_; + Arena arena_; }; } // namespace rocksdb diff --git a/db/memtable.cc b/db/memtable.cc index e9e7051c7..e102575a4 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -249,13 +249,9 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator(const ReadOptions& options, Arena* arena) { - if (arena == nullptr) { - return new MemTableIterator(*this, options, nullptr); - } else { - auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); - return new (mem) - MemTableIterator(*this, options, arena); - } + assert(arena != nullptr); + auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); + return new (mem) MemTableIterator(*this, options, arena); } port::RWMutex* MemTable::GetLock(const Slice& key) { diff --git a/db/memtable.h b/db/memtable.h index 8bc281c6c..2723f30d8 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -81,8 +81,7 @@ class MemTable { // arena: If not null, the arena needs to be used to allocate the Iterator. // Calling ~Iterator of the iterator will destroy all the states but // those allocated in arena. - Iterator* NewIterator(const ReadOptions& options, - Arena* arena = nullptr); + Iterator* NewIterator(const ReadOptions& options, Arena* arena); // Add an entry into memtable that maps key to value at the // specified sequence number and with the specified type. diff --git a/db/memtable_list.cc b/db/memtable_list.cc index d3fc1356b..418aae230 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -73,9 +73,10 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, } void MemTableListVersion::AddIterators(const ReadOptions& options, - std::vector* iterator_list) { + std::vector* iterator_list, + Arena* arena) { for (auto& m : memlist_) { - iterator_list->push_back(m->NewIterator(options)); + iterator_list->push_back(m->NewIterator(options, arena)); } } diff --git a/db/memtable_list.h b/db/memtable_list.h index f4923e831..997834e78 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -49,7 +49,7 @@ class MemTableListVersion { MergeContext& merge_context, const Options& options); void AddIterators(const ReadOptions& options, - std::vector* iterator_list); + std::vector* iterator_list, Arena* arena); void AddIterators(const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder); diff --git a/db/repair.cc b/db/repair.cc index 3c64449d1..dfe79fb23 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -48,6 +48,7 @@ #include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/immutable_options.h" +#include "util/scoped_arena_iterator.h" namespace rocksdb { @@ -240,13 +241,15 @@ class Repairer { // since ExtractMetaData() will also generate edits. FileMetaData meta; meta.fd = FileDescriptor(next_file_number_++, 0, 0); - ReadOptions ro; - ro.total_order_seek = true; - Iterator* iter = mem->NewIterator(ro); - status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_, - iter, &meta, icmp_, 0, 0, kNoCompression, - CompressionOptions()); - delete iter; + { + ReadOptions ro; + ro.total_order_seek = true; + Arena arena; + ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); + status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_, + iter.get(), &meta, icmp_, 0, 0, kNoCompression, + CompressionOptions()); + } delete mem->Unref(); delete cf_mems_default; mem = nullptr; diff --git a/db/version_set.cc b/db/version_set.cc index 3a1545853..eca56ba2a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -596,31 +596,6 @@ uint64_t Version::GetEstimatedActiveKeys() { return num_non_deletions_ - num_deletions_; } -void Version::AddIterators(const ReadOptions& read_options, - const EnvOptions& soptions, - std::vector* iters) { - // Merge all level zero files together since they may overlap - for (size_t i = 0; i < file_levels_[0].num_files; i++) { - const auto& file = file_levels_[0].files[i]; - iters->push_back(cfd_->table_cache()->NewIterator( - read_options, soptions, cfd_->internal_comparator(), file.fd)); - } - - // For levels > 0, we can use a concatenating iterator that sequentially - // walks through the non-overlapping files in the level, opening them - // lazily. - for (int level = 1; level < num_levels_; level++) { - if (file_levels_[level].num_files != 0) { - iters->push_back(NewTwoLevelIterator(new LevelFileIteratorState( - cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), false /* for_compaction */, - cfd_->options()->prefix_extractor != nullptr), - new LevelFileNumIterator(cfd_->internal_comparator(), - &file_levels_[level]))); - } - } -} - void Version::AddIterators(const ReadOptions& read_options, const EnvOptions& soptions, MergeIteratorBuilder* merge_iter_builder) { diff --git a/db/version_set.h b/db/version_set.h index 2f6d477a1..e9747f839 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -86,8 +86,6 @@ class Version { // Append to *iters a sequence of iterators that will // yield the contents of this Version when merged together. // REQUIRES: This version has been saved (see VersionSet::SaveTo) - void AddIterators(const ReadOptions&, const EnvOptions& soptions, - std::vector* iters); void AddIterators(const ReadOptions&, const EnvOptions& soptions, MergeIteratorBuilder* merger_iter_builder); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 1d30552b3..aefb01e79 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -18,6 +18,7 @@ #include "rocksdb/utilities/write_batch_with_index.h" #include "util/logging.h" #include "util/testharness.h" +#include "util/scoped_arena_iterator.h" namespace rocksdb { @@ -32,7 +33,8 @@ static std::string PrintContents(WriteBatch* b) { ColumnFamilyMemTablesDefault cf_mems_default(mem, &options); Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default); int count = 0; - Iterator* iter = mem->NewIterator(ReadOptions()); + Arena arena; + ScopedArenaIterator iter(mem->NewIterator(ReadOptions(), &arena)); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ParsedInternalKey ikey; memset((void *)&ikey, 0, sizeof(ikey)); @@ -67,7 +69,6 @@ static std::string PrintContents(WriteBatch* b) { state.append("@"); state.append(NumberToString(ikey.sequence)); } - delete iter; if (!s.ok()) { state.append(s.ToString()); } else if (count != WriteBatchInternal::Count(b)) { diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc index e8b2456ee..9a4eb70fd 100644 --- a/java/rocksjni/write_batch.cc +++ b/java/rocksjni/write_batch.cc @@ -18,6 +18,7 @@ #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "util/logging.h" +#include "util/scoped_arena_iterator.h" #include "util/testharness.h" /* @@ -209,7 +210,9 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents( rocksdb::Status s = rocksdb::WriteBatchInternal::InsertInto(b, &cf_mems_default); int count = 0; - rocksdb::Iterator* iter = mem->NewIterator(rocksdb::ReadOptions()); + Arena arena; + ScopedArenaIterator iter(mem->NewIterator( + rocksdb::ReadOptions(), false /*don't enforce total order*/, &arena)); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { rocksdb::ParsedInternalKey ikey; memset(reinterpret_cast(&ikey), 0, sizeof(ikey)); @@ -244,7 +247,6 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents( state.append("@"); state.append(rocksdb::NumberToString(ikey.sequence)); } - delete iter; if (!s.ok()) { state.append(s.ToString()); } else if (count != rocksdb::WriteBatchInternal::Count(b)) { diff --git a/table/table_test.cc b/table/table_test.cc index df4997588..a0f844014 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -42,6 +42,7 @@ #include "util/statistics.h" #include "util/testharness.h" #include "util/testutil.h" +#include "util/scoped_arena_iterator.h" namespace rocksdb { @@ -223,8 +224,12 @@ class Constructor { virtual const KVMap& data() { return data_; } + virtual bool IsArenaMode() const { return false; } + virtual DB* db() const { return nullptr; } // Overridden in DBConstructor + virtual bool AnywayDeleteIterator() const { return false; } + protected: const InternalKeyComparator* last_internal_key_; @@ -279,8 +284,15 @@ class BlockConstructor: public Constructor { // A helper class that converts internal format keys into user keys class KeyConvertingIterator: public Iterator { public: - explicit KeyConvertingIterator(Iterator* iter) : iter_(iter) { } - virtual ~KeyConvertingIterator() { delete iter_; } + KeyConvertingIterator(Iterator* iter, bool arena_mode = false) + : iter_(iter), arena_mode_(arena_mode) {} + virtual ~KeyConvertingIterator() { + if (arena_mode_) { + iter_->~Iterator(); + } else { + delete iter_; + } + } virtual bool Valid() const { return iter_->Valid(); } virtual void Seek(const Slice& target) { ParsedInternalKey ikey(target, kMaxSequenceNumber, kTypeValue); @@ -311,6 +323,7 @@ class KeyConvertingIterator: public Iterator { private: mutable Status status_; Iterator* iter_; + bool arena_mode_; // No copying allowed KeyConvertingIterator(const KeyConvertingIterator&); @@ -391,6 +404,10 @@ class TableConstructor: public Constructor { return table_reader_.get(); } + virtual bool AnywayDeleteIterator() const override { + return convert_to_internal_key_; + } + private: void Reset() { uniq_id_ = 0; @@ -398,12 +415,12 @@ class TableConstructor: public Constructor { sink_.reset(); source_.reset(); } - bool convert_to_internal_key_; uint64_t uniq_id_; unique_ptr sink_; unique_ptr source_; unique_ptr table_reader_; + bool convert_to_internal_key_; TableConstructor(); @@ -446,10 +463,16 @@ class MemTableConstructor: public Constructor { return Status::OK(); } virtual Iterator* NewIterator() const { - return new KeyConvertingIterator(memtable_->NewIterator(ReadOptions())); + return new KeyConvertingIterator( + memtable_->NewIterator(ReadOptions(), &arena_), true); } + virtual bool AnywayDeleteIterator() const override { return true; } + + virtual bool IsArenaMode() const override { return true; } + private: + mutable Arena arena_; InternalKeyComparator internal_comparator_; MemTable* memtable_; std::shared_ptr table_factory_; @@ -800,7 +823,11 @@ class Harness { iter->Next(); } ASSERT_TRUE(!iter->Valid()); - delete iter; + if (constructor_->IsArenaMode() && !constructor_->AnywayDeleteIterator()) { + iter->~Iterator(); + } else { + delete iter; + } } void TestBackwardScan(const std::vector& keys, @@ -815,7 +842,11 @@ class Harness { iter->Prev(); } ASSERT_TRUE(!iter->Valid()); - delete iter; + if (constructor_->IsArenaMode() && !constructor_->AnywayDeleteIterator()) { + iter->~Iterator(); + } else { + delete iter; + } } void TestRandomAccess(Random* rnd, @@ -885,7 +916,11 @@ class Harness { } } } - delete iter; + if (constructor_->IsArenaMode() && !constructor_->AnywayDeleteIterator()) { + iter->~Iterator(); + } else { + delete iter; + } } std::string ToString(const KVMap& data, const KVMap::const_iterator& it) { @@ -1835,7 +1870,8 @@ TEST(MemTableTest, Simple) { ColumnFamilyMemTablesDefault cf_mems_default(memtable, &options); ASSERT_TRUE(WriteBatchInternal::InsertInto(&batch, &cf_mems_default).ok()); - Iterator* iter = memtable->NewIterator(ReadOptions()); + Arena arena; + ScopedArenaIterator iter(memtable->NewIterator(ReadOptions(), &arena)); iter->SeekToFirst(); while (iter->Valid()) { fprintf(stderr, "key: '%s' -> '%s'\n", @@ -1844,7 +1880,6 @@ TEST(MemTableTest, Simple) { iter->Next(); } - delete iter; delete memtable->Unref(); } diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 1aa3856a3..aef84fa35 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -14,6 +14,7 @@ #include "rocksdb/write_batch.h" #include "rocksdb/cache.h" #include "util/coding.h" +#include "util/scoped_arena_iterator.h" #include "utilities/ttl/db_ttl_impl.h" #include @@ -739,7 +740,8 @@ void InternalDumpCommand::DoCommand() { uint64_t c=0; uint64_t s1=0,s2=0; // Setup internal key iterator - auto iter = unique_ptr(idb->TEST_NewInternalIterator()); + Arena arena; + ScopedArenaIterator iter(idb->TEST_NewInternalIterator(&arena)); Status st = iter->status(); if (!st.ok()) { exec_state_ = LDBCommandExecuteResult::FAILED("Iterator error:" diff --git a/util/scoped_arena_iterator.h b/util/scoped_arena_iterator.h new file mode 100644 index 000000000..2021d2dc2 --- /dev/null +++ b/util/scoped_arena_iterator.h @@ -0,0 +1,28 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include "rocksdb/iterator.h" + +namespace rocksdb { +class ScopedArenaIterator { + public: + explicit ScopedArenaIterator(Iterator* iter = nullptr) : iter_(iter) {} + + Iterator* operator->() { return iter_; } + + void set(Iterator* iter) { iter_ = iter; } + + Iterator* get() { return iter_; } + + ~ScopedArenaIterator() { iter_->~Iterator(); } + + private: + Iterator* iter_; +}; +} // namespace rocksdb -- GitLab