diff --git a/db/db_impl.cc b/db/db_impl.cc index cc2a9f2ee208209eab7f3408e23fe6a942151921..cd972bf6ff5690b1ef6ecd52b6e8e9810c59e968 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1406,7 +1406,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, FileMetaData meta; meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. - Iterator* iter = mem->NewIterator(ReadOptions(), true); + ReadOptions ro; + ro.total_order_seek = true; + Iterator* iter = mem->NewIterator(ro); const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = mem->GetFirstSequenceNumber(); @@ -1473,11 +1475,13 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, mutex_.Unlock(); log_buffer->FlushBufferToLog(); std::vector memtables; + ReadOptions ro; + ro.total_order_seek = true; for (MemTable* m : mems) { Log(options_.info_log, "[%s] Flushing memtable with next log file: %" PRIu64 "\n", cfd->GetName().c_str(), m->GetNextLogNumber()); - memtables.push_back(m->NewIterator(ReadOptions(), true)); + memtables.push_back(m->NewIterator(ro)); } Iterator* iter = NewMergingIterator(&cfd->internal_comparator(), &memtables[0], memtables.size()); @@ -3300,7 +3304,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena); // Collect iterator for mutable mem merge_iter_builder.AddIterator( - super_version->mem->NewIterator(options, false, arena)); + super_version->mem->NewIterator(options, arena)); // Collect all needed child iterators for immutable memtables super_version->imm->AddIterators(options, &merge_iter_builder); // Collect iterators for files in L0 - Ln diff --git a/db/memtable.cc b/db/memtable.cc index 4ddcb37471d8caca38d6732942efc686eb2b3168..f9a17e19e8e30540a9984f6da9120b735844fa64 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -174,13 +174,13 @@ const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: - MemTableIterator(const MemTable& mem, const ReadOptions& options, - bool enforce_total_order, Arena* arena) + MemTableIterator( + const MemTable& mem, const ReadOptions& options, Arena* arena) : bloom_(nullptr), prefix_extractor_(mem.prefix_extractor_), valid_(false), arena_mode_(arena != nullptr) { - if (prefix_extractor_ != nullptr && !enforce_total_order) { + if (prefix_extractor_ != nullptr && !options.total_order_seek) { bloom_ = mem.prefix_bloom_.get(); iter_ = mem.table_->GetDynamicPrefixIterator(arena); } else { @@ -248,14 +248,13 @@ class MemTableIterator: public Iterator { void operator=(const MemTableIterator&); }; -Iterator* MemTable::NewIterator(const ReadOptions& options, - bool enforce_total_order, Arena* arena) { +Iterator* MemTable::NewIterator(const ReadOptions& options, Arena* arena) { if (arena == nullptr) { - return new MemTableIterator(*this, options, enforce_total_order, nullptr); + return new MemTableIterator(*this, options, nullptr); } else { auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); return new (mem) - MemTableIterator(*this, options, enforce_total_order, arena); + MemTableIterator(*this, options, arena); } } diff --git a/db/memtable.h b/db/memtable.h index 8bad2773a333a3e7ee3bee3f42f781573fae7f9b..8bc281c6cf92f9bf82653a3e47ce52e5abec16b8 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -82,7 +82,6 @@ class MemTable { // Calling ~Iterator of the iterator will destroy all the states but // those allocated in arena. Iterator* NewIterator(const ReadOptions& options, - bool enforce_total_order = false, Arena* arena = nullptr); // Add an entry into memtable that maps key to value at the diff --git a/db/repair.cc b/db/repair.cc index b0859d7305df77ff71f04c5810d9c8d935e005e7..820cc19243f4d88f5bab24134fc392172a7c0614 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -237,7 +237,8 @@ class Repairer { FileMetaData meta; meta.fd = FileDescriptor(next_file_number_++, 0, 0); ReadOptions ro; - Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */); + ro.total_order_seek = true; + Iterator* iter = mem->NewIterator(ro); status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_, iter, &meta, icmp_, 0, 0, kNoCompression); delete iter; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 6146e0081a4e3e4ea7c3144802aa1a1833f1c732..13a0a08a1223cf2199c5e1dcd2830cb9f925c7f1 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -902,18 +902,25 @@ struct ReadOptions { // Not supported in ROCKSDB_LITE mode! bool tailing; + // Enable a total order seek regardless of index format (e.g. hash index) + // used in the table. Some table format (e.g. plain table) may not support + // this option. + bool total_order_seek; + ReadOptions() : verify_checksums(true), fill_cache(true), snapshot(nullptr), read_tier(kReadAllTier), - tailing(false) {} + tailing(false), + total_order_seek(false) {} ReadOptions(bool cksum, bool cache) : verify_checksums(cksum), fill_cache(cache), snapshot(nullptr), read_tier(kReadAllTier), - tailing(false) {} + tailing(false), + total_order_seek(false) {} }; // Options that control write operations diff --git a/table/block.cc b/table/block.cc index 24e7b72fa110b5f317c4024f7998814d09ea5748..0db23a1bd8022a6a74936944dfdab5c33ec11991 100644 --- a/table/block.cc +++ b/table/block.cc @@ -321,7 +321,8 @@ Block::~Block() { } } -Iterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter) { +Iterator* Block::NewIterator( + const Comparator* cmp, BlockIter* iter, bool total_order_seek) { if (size_ < 2*sizeof(uint32_t)) { if (iter != nullptr) { iter->SetStatus(Status::Corruption("bad block contents")); @@ -339,12 +340,17 @@ Iterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter) { return NewEmptyIterator(); } } else { + BlockHashIndex* hash_index_ptr = + total_order_seek ? nullptr : hash_index_.get(); + BlockPrefixIndex* prefix_index_ptr = + total_order_seek ? nullptr : prefix_index_.get(); + if (iter != nullptr) { iter->Initialize(cmp, data_, restart_offset_, num_restarts, - hash_index_.get(), prefix_index_.get()); + hash_index_ptr, prefix_index_ptr); } else { iter = new BlockIter(cmp, data_, restart_offset_, num_restarts, - hash_index_.get(), prefix_index_.get()); + hash_index_ptr, prefix_index_ptr); } } diff --git a/table/block.h b/table/block.h index 494ed24bd73aaf0786f8f951a7d0e2012df4208e..49bcf12cf3010a74d47e9dfc243ef412fe4bb6ff 100644 --- a/table/block.h +++ b/table/block.h @@ -45,8 +45,12 @@ class Block { // // If iter is null, return new Iterator // If iter is not null, update this one and return it as Iterator* + // + // If total_order_seek is true, hash_index_ and prefix_index_ are ignored. + // This option only applies for index block. For data block, hash_index_ + // and prefix_index_ are null, so this option does not matter. Iterator* NewIterator(const Comparator* comparator, - BlockIter* iter = nullptr); + BlockIter* iter = nullptr, bool total_order_seek = true); void SetBlockHashIndex(BlockHashIndex* hash_index); void SetBlockPrefixIndex(BlockPrefixIndex* prefix_index); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 2e5c21018edc63ed0ed1c81ee82646b287c7614c..3c0ef527e2859cc6ab61d2ceb06c7676702acc4c 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -137,7 +137,8 @@ class BlockBasedTable::IndexReader { // Create an iterator for index access. // An iter is passed in, if it is not null, update this one and return it // If it is null, create a new Iterator - virtual Iterator* NewIterator(BlockIter* iter = nullptr) = 0; + virtual Iterator* NewIterator( + BlockIter* iter = nullptr, bool total_order_seek = true) = 0; // The size of the index. virtual size_t size() const = 0; @@ -174,8 +175,9 @@ class BinarySearchIndexReader : public IndexReader { return s; } - virtual Iterator* NewIterator(BlockIter* iter = nullptr) override { - return index_block_->NewIterator(comparator_, iter); + virtual Iterator* NewIterator( + BlockIter* iter = nullptr, bool dont_care = true) override { + return index_block_->NewIterator(comparator_, iter, true); } virtual size_t size() const override { return index_block_->size(); } @@ -295,8 +297,9 @@ class HashIndexReader : public IndexReader { return Status::OK(); } - virtual Iterator* NewIterator(BlockIter* iter = nullptr) override { - return index_block_->NewIterator(comparator_, iter); + virtual Iterator* NewIterator( + BlockIter* iter = nullptr, bool total_order_seek = true) override { + return index_block_->NewIterator(comparator_, iter, total_order_seek); } virtual size_t size() const override { return index_block_->size(); } @@ -818,7 +821,8 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options, BlockIter* input_iter) { // index reader has already been pre-populated. if (rep_->index_reader) { - return rep_->index_reader->NewIterator(input_iter); + return rep_->index_reader->NewIterator( + input_iter, read_options.total_order_seek); } bool no_io = read_options.read_tier == kBlockCacheTier; @@ -866,10 +870,9 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options, } assert(cache_handle); - Iterator* iter; - iter = index_reader->NewIterator(input_iter); + auto* iter = index_reader->NewIterator( + input_iter, read_options.total_order_seek); iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); - return iter; } @@ -988,6 +991,9 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState { } bool PrefixMayMatch(const Slice& internal_key) override { + if (read_options_.total_order_seek) { + return true; + } return table_->PrefixMayMatch(internal_key); } diff --git a/table/block_test.cc b/table/block_test.cc index 93652c29f1b70e7ed0bfa339cc00530402f24a0d..da01d6def1ae56563e053f99c0d0022e3f28721a 100644 --- a/table/block_test.cc +++ b/table/block_test.cc @@ -172,7 +172,7 @@ void CheckBlockContents(BlockContents contents, const int max_key, } std::unique_ptr hash_iter( - reader1.NewIterator(BytewiseComparator())); + reader1.NewIterator(BytewiseComparator(), nullptr, false)); std::unique_ptr regular_iter( reader2.NewIterator(BytewiseComparator())); diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc index b4b86af39cbb8c69a7e2669e8521c809434fefc2..0da3011926dc3ccf69a737947fbd207673696d09 100644 --- a/table/cuckoo_table_reader.cc +++ b/table/cuckoo_table_reader.cc @@ -271,10 +271,17 @@ Slice CuckooTableIterator::value() const { return curr_value_; } -Iterator* CuckooTableReader::NewIterator(const ReadOptions&, Arena* arena) { +extern Iterator* NewErrorIterator(const Status& status, Arena* arena); + +Iterator* CuckooTableReader::NewIterator( + const ReadOptions& read_options, Arena* arena) { if (!status().ok()) { return NewErrorIterator( - Status::Corruption("CuckooTableReader status is not okay.")); + Status::Corruption("CuckooTableReader status is not okay."), arena); + } + if (read_options.total_order_seek) { + return NewErrorIterator( + Status::InvalidArgument("total_order_seek is not supported."), arena); } CuckooTableIterator* iter; if (arena == nullptr) { diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 8728eb1d39fa2bdb3d388c87b762f587a581c496..b5eccd310c5d36a1bf3830366ecaeb8c98c5241e 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -187,6 +187,10 @@ void PlainTableReader::SetupForCompaction() { Iterator* PlainTableReader::NewIterator(const ReadOptions& options, Arena* arena) { + if (options.total_order_seek && !IsTotalOrderMode()) { + return NewErrorIterator( + Status::InvalidArgument("total_order_seek not supported"), arena); + } if (arena == nullptr) { return new PlainTableIterator(this, prefix_extractor_ != nullptr); } else { diff --git a/table/table_test.cc b/table/table_test.cc index c8b3ffa2a2c3d8f9e7f16b9f0c5fff43645faed6..929cdf832a8ea5cc80b283fa73780109cb8ddbf6 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -382,7 +382,7 @@ class TableConstructor: public Constructor { sink_->contents().size(), &table_reader_); } - virtual TableReader* table_reader() { + virtual TableReader* GetTableReader() { return table_reader_.get(); } @@ -1042,7 +1042,7 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) { c.Finish(options, table_options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); - auto& props = *c.table_reader()->GetTableProperties(); + auto& props = *c.GetTableReader()->GetTableProperties(); ASSERT_EQ(kvmap.size(), props.num_entries); auto raw_key_size = kvmap.size() * 2ul; @@ -1074,10 +1074,93 @@ TEST(BlockBasedTableTest, FilterPolicyNameProperties) { c.Finish(options, table_options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); - auto& props = *c.table_reader()->GetTableProperties(); + auto& props = *c.GetTableReader()->GetTableProperties(); ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name); } +TEST(BlockBasedTableTest, TotalOrderSeekOnHashIndex) { + BlockBasedTableOptions table_options; + for (int i = 0; i < 4; ++i) { + Options options; + // Make each key/value an individual block + table_options.block_size = 64; + switch (i) { + case 0: + // Binary search index + table_options.index_type = BlockBasedTableOptions::kBinarySearch; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + break; + case 1: + // Hash search index + table_options.index_type = BlockBasedTableOptions::kHashSearch; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + options.prefix_extractor.reset(NewFixedPrefixTransform(4)); + break; + case 2: + // Hash search index with hash_index_allow_collision + table_options.index_type = BlockBasedTableOptions::kHashSearch; + table_options.hash_index_allow_collision = true; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + options.prefix_extractor.reset(NewFixedPrefixTransform(4)); + break; + case 3: + default: + // Hash search index with filter policy + table_options.index_type = BlockBasedTableOptions::kHashSearch; + table_options.filter_policy.reset(NewBloomFilterPolicy(10)); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + options.prefix_extractor.reset(NewFixedPrefixTransform(4)); + break; + } + + TableConstructor c(BytewiseComparator(), true); + c.Add("aaaa1", std::string('a', 56)); + c.Add("bbaa1", std::string('a', 56)); + c.Add("cccc1", std::string('a', 56)); + c.Add("bbbb1", std::string('a', 56)); + c.Add("baaa1", std::string('a', 56)); + c.Add("abbb1", std::string('a', 56)); + c.Add("cccc2", std::string('a', 56)); + std::vector keys; + KVMap kvmap; + c.Finish(options, table_options, + GetPlainInternalComparator(options.comparator), &keys, &kvmap); + auto props = c.GetTableReader()->GetTableProperties(); + ASSERT_EQ(7u, props->num_data_blocks); + auto* reader = c.GetTableReader(); + ReadOptions ro; + ro.total_order_seek = true; + std::unique_ptr iter(reader->NewIterator(ro)); + + iter->Seek(InternalKey("b", 0, kTypeValue).Encode()); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("baaa1", ExtractUserKey(iter->key()).ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bbaa1", ExtractUserKey(iter->key()).ToString()); + + iter->Seek(InternalKey("bb", 0, kTypeValue).Encode()); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bbaa1", ExtractUserKey(iter->key()).ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bbbb1", ExtractUserKey(iter->key()).ToString()); + + iter->Seek(InternalKey("bbb", 0, kTypeValue).Encode()); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bbbb1", ExtractUserKey(iter->key()).ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("cccc1", ExtractUserKey(iter->key()).ToString()); + } +} + static std::string RandomString(Random* rnd, int len) { std::string r; test::RandomString(rnd, len, &r); @@ -1125,9 +1208,9 @@ TEST(TableTest, HashIndexTest) { std::unique_ptr comparator( new InternalKeyComparator(BytewiseComparator())); c.Finish(options, table_options, *comparator, &keys, &kvmap); - auto reader = c.table_reader(); + auto reader = c.GetTableReader(); - auto props = c.table_reader()->GetTableProperties(); + auto props = reader->GetTableProperties(); ASSERT_EQ(5u, props->num_data_blocks); std::unique_ptr hash_iter(reader->NewIterator(ReadOptions())); @@ -1234,7 +1317,7 @@ TEST(BlockBasedTableTest, IndexSizeStat) { c.Finish(options, table_options, GetPlainInternalComparator(options.comparator), &ks, &kvmap); - auto index_size = c.table_reader()->GetTableProperties()->index_size; + auto index_size = c.GetTableReader()->GetTableProperties()->index_size; ASSERT_GT(index_size, last_index_size); last_index_size = index_size; } @@ -1261,7 +1344,7 @@ TEST(BlockBasedTableTest, NumBlockStat) { c.Finish(options, table_options, GetPlainInternalComparator(options.comparator), &ks, &kvmap); ASSERT_EQ(kvmap.size(), - c.table_reader()->GetTableProperties()->num_data_blocks); + c.GetTableReader()->GetTableProperties()->num_data_blocks); } // A simple tool that takes the snapshot of block cache statistics. @@ -1338,7 +1421,7 @@ TEST(BlockBasedTableTest, BlockCacheDisabledTest) { GetPlainInternalComparator(options.comparator), &keys, &kvmap); // preloading filter/index blocks is enabled. - auto reader = dynamic_cast(c.table_reader()); + auto reader = dynamic_cast(c.GetTableReader()); ASSERT_TRUE(reader->TEST_filter_block_preloaded()); ASSERT_TRUE(reader->TEST_index_reader_preloaded()); @@ -1379,7 +1462,7 @@ TEST(BlockBasedTableTest, FilterBlockInBlockCache) { c.Finish(options, table_options, GetPlainInternalComparator(options.comparator), &keys, &kvmap); // preloading filter/index blocks is prohibited. - auto reader = dynamic_cast(c.table_reader()); + auto reader = dynamic_cast(c.GetTableReader()); ASSERT_TRUE(!reader->TEST_filter_block_preloaded()); ASSERT_TRUE(!reader->TEST_index_reader_preloaded()); @@ -1513,7 +1596,7 @@ TEST(BlockBasedTableTest, BlockCacheLeak) { ASSERT_OK(iter->status()); ASSERT_OK(c.Reopen(opt)); - auto table_reader = dynamic_cast(c.table_reader()); + auto table_reader = dynamic_cast(c.GetTableReader()); for (const std::string& key : keys) { ASSERT_TRUE(table_reader->TEST_KeyInCache(ReadOptions(), key)); } @@ -1522,7 +1605,7 @@ TEST(BlockBasedTableTest, BlockCacheLeak) { table_options.block_cache = NewLRUCache(16 * 1024 * 1024); opt.table_factory.reset(NewBlockBasedTableFactory(table_options)); ASSERT_OK(c.Reopen(opt)); - table_reader = dynamic_cast(c.table_reader()); + table_reader = dynamic_cast(c.GetTableReader()); for (const std::string& key : keys) { ASSERT_TRUE(!table_reader->TEST_KeyInCache(ReadOptions(), key)); }