From 74939a9e1365068012dcf03921952ce45e76e99b Mon Sep 17 00:00:00 2001 From: kailiu Date: Fri, 28 Feb 2014 18:19:07 -0800 Subject: [PATCH] Make the block-based table's index pluggable Summary: This patch introduces a new table option that allows us to make block-based table's index pluggable. To support this new feature: * Code has been refactored to be more flexible and supports this option well. * More documentation is added for the existing obscure functionalities. * Big surgery on DataBlockReader(), where the logic was really convoluted. * Other small code cleanups. The pluggability will mostly affect development of internal modules and won't change frequently, as a result I intentionally avoid heavy-weight patterns (like factory) and try to make it simple. Test Plan: make all check Reviewers: haobo, sdong Reviewed By: sdong CC: leveldb Differential Revision: https://reviews.facebook.net/D16395 --- include/rocksdb/table.h | 15 + table/block_based_table_builder.cc | 336 ++++++++---- table/block_based_table_builder.h | 16 +- table/block_based_table_factory.cc | 14 +- table/block_based_table_reader.cc | 832 +++++++++++++++-------------- table/block_based_table_reader.h | 100 ++-- table/table_test.cc | 14 +- 7 files changed, 778 insertions(+), 549 deletions(-) diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index 5c04257ff..e350c7780 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -54,6 +54,21 @@ struct BlockBasedTableOptions { // If not specified, each "table reader" object will pre-load index/filter // block during table initialization. bool cache_index_and_filter_blocks = false; + + // The index type that will be used for this table. + enum IndexType : char { + // A space efficient index block that is optimized for + // binary-search-based index. + kBinarySearch, + }; + + IndexType index_type = kBinarySearch; +}; + +// Table Properties that are specific to block-based table. 
+struct BlockBasedTablePropertyNames { + // value of this property is a fixed int32 number. + static const std::string kIndexType; }; // Create default block based table factory. diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 9890ef33b..de2466605 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -11,23 +11,29 @@ #include #include -#include #include -#include "rocksdb/flush_block_policy.h" +#include +#include + +#include "db/dbformat.h" + #include "rocksdb/cache.h" #include "rocksdb/comparator.h" -#include "table/table_builder.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" +#include "rocksdb/flush_block_policy.h" #include "rocksdb/options.h" -#include "db/dbformat.h" -#include "table/block_based_table_reader.h" +#include "rocksdb/table.h" + #include "table/block.h" +#include "table/block_based_table_reader.h" #include "table/block_builder.h" #include "table/filter_block.h" #include "table/format.h" #include "table/meta_blocks.h" +#include "table/table_builder.h" + #include "util/coding.h" #include "util/crc32c.h" #include "util/stop_watch.h" @@ -36,11 +42,167 @@ namespace rocksdb { namespace { -static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { +typedef BlockBasedTableOptions::IndexType IndexType; + +// The interface for building index. +// Instruction for adding a new concrete IndexBuilder: +// 1. Create a subclass derived from IndexBuilder. +// 2. Add a new entry associated with that subclass in TableOptions::IndexType. +// 3. Add a create function for the new subclass in CreateIndexBuilder. +// Note: we can devise more advanced design to simplify the process for adding +// new subclass, which will, on the other hand, increase the code complexity and +// catch unwanted attention from readers. Given that we won't add/change +// indexes frequently, it makes sense to just embrace a more straightforward +// design that just works. 
+class IndexBuilder { + public: + explicit IndexBuilder(const Comparator* comparator) + : comparator_(comparator) {} + + virtual ~IndexBuilder() {} + + // Add a new index entry to index block. + // To allow further optimization, we provide `last_key_in_current_block` and + // `first_key_in_next_block`, based on which the specific implementation can + // determine the best index key to be used for the index block. + // @last_key_in_current_block: this parameter maybe overridden with the value + // "substitute key". + // @first_key_in_next_block: it will be nullptr if the entry being added is + // the last one in the table + // + // REQUIRES: Finish() has not yet been called. + virtual void AddEntry(std::string* last_key_in_current_block, + const Slice* first_key_in_next_block, + const BlockHandle& block_handle) = 0; + + // Inform the index builder that all entries has been written. Block builder + // may therefore perform any operation required for block finalization. + // + // REQUIRES: Finish() has not yet been called. + virtual Slice Finish() = 0; + + // Get the estimated size for index block. + virtual size_t EstimatedSize() const = 0; + + protected: + const Comparator* comparator_; +}; + +// This index builder builds space-efficient index block for binary-search-based +// index. +// +// Optimizations: +// 1. Made block's `block_restart_interval` to be 1, which will avoid linear +// search when doing index lookup. +// 2. Shorten the key length for index block. Other than honestly using the +// last key in the data block as the index key, we instead find a shortest +// substitute key that serves the same function. 
+class BinarySearchIndexBuilder : public IndexBuilder { + public: + explicit BinarySearchIndexBuilder(const Comparator* comparator) + : IndexBuilder(comparator), + index_block_builder_(1 /* block_restart_interval == 1 */, comparator) {} + + virtual void AddEntry(std::string* last_key_in_current_block, + const Slice* first_key_in_next_block, + const BlockHandle& block_handle) override { + if (first_key_in_next_block != nullptr) { + comparator_->FindShortestSeparator(last_key_in_current_block, + *first_key_in_next_block); + } else { + comparator_->FindShortSuccessor(last_key_in_current_block); + } + + std::string handle_encoding; + block_handle.EncodeTo(&handle_encoding); + index_block_builder_.Add(*last_key_in_current_block, handle_encoding); + } + + virtual Slice Finish() override { return index_block_builder_.Finish(); } + + virtual size_t EstimatedSize() const { + return index_block_builder_.CurrentSizeEstimate(); + } + + private: + BlockBuilder index_block_builder_; +}; + +// Create a index builder based on its type. +IndexBuilder* CreateIndexBuilder(IndexType type, const Comparator* comparator) { + switch (type) { + case BlockBasedTableOptions::kBinarySearch: { + return new BinarySearchIndexBuilder(comparator); + } + default: { + assert(!"Do not recognize the index type "); + return nullptr; + } + } + // impossible. + assert(false); + return nullptr; +} + +bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { // Check to see if compressed less than 12.5% return compressed_size < raw_size - (raw_size / 8u); } +Slice CompressBlock(const Slice& raw, + const CompressionOptions& compression_options, + CompressionType* type, std::string* compressed_output) { + if (*type == kNoCompression) { + return raw; + } + + // Will return compressed block contents if (1) the compression method is + // supported in this platform and (2) the compression rate is "good enough". 
+ switch (*type) { + case kSnappyCompression: + if (port::Snappy_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && + GoodCompressionRatio(compressed_output->size(), raw.size())) { + return *compressed_output; + } + break; // fall back to no compression. + case kZlibCompression: + if (port::Zlib_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && + GoodCompressionRatio(compressed_output->size(), raw.size())) { + return *compressed_output; + } + break; // fall back to no compression. + case kBZip2Compression: + if (port::BZip2_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && + GoodCompressionRatio(compressed_output->size(), raw.size())) { + return *compressed_output; + } + break; // fall back to no compression. + case kLZ4Compression: + if (port::LZ4_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && + GoodCompressionRatio(compressed_output->size(), raw.size())) { + return *compressed_output; + } + break; // fall back to no compression. + case kLZ4HCCompression: + if (port::LZ4HC_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && + GoodCompressionRatio(compressed_output->size(), raw.size())) { + return *compressed_output; + } + break; // fall back to no compression. + default: {} // Do not recognize this compression type + } + + // Compression method is not supported, or not good compression ratio, so just + // fall back to uncompressed form. + *type = kNoCompression; + return raw; +} + } // anonymous namespace // kBlockBasedTableMagicNumber was picked by running @@ -51,6 +213,46 @@ static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { extern const uint64_t kBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull; +// A collector that collects properties of interest to block-based table. +// For now this class looks heavy-weight since we only write one additional +// property. 
+// But in the foreseeable future, we will add more and more properties that are +// specific to block-based table. +class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector + : public TablePropertiesCollector { + public: + BlockBasedTablePropertiesCollector( + BlockBasedTableOptions::IndexType index_type) + : index_type_(index_type) {} + + virtual Status Add(const Slice& key, const Slice& value) { + // Intentionally left blank. Have no interest in collecting stats for + // individual key/value pairs. + return Status::OK(); + } + + virtual Status Finish(UserCollectedProperties* properties) { + std::string val; + PutFixed32(&val, static_cast(index_type_)); + properties->insert({BlockBasedTablePropertyNames::kIndexType, val}); + + return Status::OK(); + } + + // The name of the properties collector can be used for debugging purposes. + virtual const char* Name() const { + return "BlockBasedTablePropertiesCollector"; + } + + virtual UserCollectedProperties GetReadableProperties() const { + // Intentionally left blank. 
+ return UserCollectedProperties(); + } + + private: + BlockBasedTableOptions::IndexType index_type_; +}; + struct BlockBasedTableBuilder::Rep { Options options; const InternalKeyComparator& internal_comparator; @@ -58,7 +260,8 @@ struct BlockBasedTableBuilder::Rep { uint64_t offset = 0; Status status; BlockBuilder data_block; - BlockBuilder index_block; + std::unique_ptr index_builder; + std::string last_key; CompressionType compression_type; TableProperties props; @@ -75,28 +278,31 @@ struct BlockBasedTableBuilder::Rep { Rep(const Options& opt, const InternalKeyComparator& icomparator, WritableFile* f, FlushBlockPolicyFactory* flush_block_policy_factory, - CompressionType compression_type) + CompressionType compression_type, IndexType index_block_type) : options(opt), internal_comparator(icomparator), file(f), data_block(options, &internal_comparator), - // To avoid linear scan, we make the block_restart_interval to be `1` - // in index block builder - index_block(1 /* block_restart_interval */, &internal_comparator), + index_builder( + CreateIndexBuilder(index_block_type, &internal_comparator)), compression_type(compression_type), filter_block(opt.filter_policy == nullptr ? 
nullptr : new FilterBlockBuilder(opt, &internal_comparator)), flush_block_policy(flush_block_policy_factory->NewFlushBlockPolicy( - options, data_block)) {} + options, data_block)) { + options.table_properties_collectors.push_back( + std::make_shared(index_block_type)); + } }; BlockBasedTableBuilder::BlockBasedTableBuilder( - const Options& options, const InternalKeyComparator& internal_comparator, - WritableFile* file, FlushBlockPolicyFactory* flush_block_policy_factory, + const Options& options, const BlockBasedTableOptions& table_options, + const InternalKeyComparator& internal_comparator, WritableFile* file, CompressionType compression_type) : rep_(new Rep(options, internal_comparator, file, - flush_block_policy_factory, compression_type)) { + table_options.flush_block_policy_factory.get(), + compression_type, table_options.index_type)) { if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } @@ -136,10 +342,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { // entries in the first block and < all entries in subsequent // blocks. 
if (ok()) { - r->internal_comparator.FindShortestSeparator(&r->last_key, key); - std::string handle_encoding; - r->pending_handle.EncodeTo(&handle_encoding); - r->index_block.Add(r->last_key, Slice(handle_encoding)); + r->index_builder->AddEntry(&r->last_key, &key, r->pending_handle); } } @@ -179,88 +382,25 @@ void BlockBasedTableBuilder::Flush() { void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { + WriteBlock(block->Finish(), handle); + block->Reset(); +} + +void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, + BlockHandle* handle) { // File format contains a sequence of blocks where each block has: // block_data: uint8[n] // type: uint8 // crc: uint32 assert(ok()); Rep* r = rep_; - Slice raw = block->Finish(); - Slice block_contents; - std::string* compressed = &r->compressed_output; - CompressionType type = r->compression_type; - switch (type) { - case kNoCompression: - block_contents = raw; - break; - - case kSnappyCompression: { - std::string* compressed = &r->compressed_output; - if (port::Snappy_Compress(r->options.compression_opts, raw.data(), - raw.size(), compressed) && - GoodCompressionRatio(compressed->size(), raw.size())) { - block_contents = *compressed; - } else { - // Snappy not supported, or not good compression ratio, so just - // store uncompressed form - block_contents = raw; - type = kNoCompression; - } - break; - } - case kZlibCompression: - if (port::Zlib_Compress(r->options.compression_opts, raw.data(), - raw.size(), compressed) && - GoodCompressionRatio(compressed->size(), raw.size())) { - block_contents = *compressed; - } else { - // Zlib not supported, or not good compression ratio, so just - // store uncompressed form - block_contents = raw; - type = kNoCompression; - } - break; - case kBZip2Compression: - if (port::BZip2_Compress(r->options.compression_opts, raw.data(), - raw.size(), compressed) && - GoodCompressionRatio(compressed->size(), raw.size())) { - block_contents = 
*compressed; - } else { - // BZip not supported, or not good compression ratio, so just - // store uncompressed form - block_contents = raw; - type = kNoCompression; - } - break; - case kLZ4Compression: - if (port::LZ4_Compress(r->options.compression_opts, raw.data(), - raw.size(), compressed) && - GoodCompressionRatio(compressed->size(), raw.size())) { - block_contents = *compressed; - } else { - // LZ4 not supported, or not good compression ratio, so just - // store uncompressed form - block_contents = raw; - type = kNoCompression; - } - break; - case kLZ4HCCompression: - if (port::LZ4HC_Compress(r->options.compression_opts, raw.data(), - raw.size(), compressed) && - GoodCompressionRatio(compressed->size(), raw.size())) { - block_contents = *compressed; - } else { - // LZ4 not supported, or not good compression ratio, so just - // store uncompressed form - block_contents = raw; - type = kNoCompression; - } - break; - } + auto type = r->compression_type; + auto block_contents = + CompressBlock(raw_block_contents, r->options.compression_opts, &type, + &r->compressed_output); WriteRawBlock(block_contents, type, handle); r->compressed_output.clear(); - block->Reset(); } void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, @@ -364,11 +504,8 @@ Status BlockBasedTableBuilder::Finish() { // block, we will finish writing all index entries here and flush them // to storage after metaindex block is written. if (ok() && !empty_data_block) { - r->internal_comparator.FindShortSuccessor(&r->last_key); - - std::string handle_encoding; - r->pending_handle.EncodeTo(&handle_encoding); - r->index_block.Add(r->last_key, handle_encoding); + r->index_builder->AddEntry(&r->last_key, nullptr /* no next data block */, + r->pending_handle); } // Write meta blocks and metaindex block with the following order. @@ -394,11 +531,12 @@ Status BlockBasedTableBuilder::Finish() { r->props.filter_policy_name = r->options.filter_policy != nullptr ? 
r->options.filter_policy->Name() : ""; r->props.index_size = - r->index_block.CurrentSizeEstimate() + kBlockTrailerSize; + r->index_builder->EstimatedSize() + kBlockTrailerSize; // Add basic properties property_block_builder.AddTableProperty(r->props); + // Add use collected properties NotifyCollectTableCollectorsOnFinish( r->options.table_properties_collectors, r->options.info_log.get(), @@ -425,7 +563,7 @@ Status BlockBasedTableBuilder::Finish() { // Write index block if (ok()) { - WriteBlock(&r->index_block, &index_block_handle); + WriteBlock(r->index_builder->Finish(), &index_block_handle); } // Write footer diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 1c4be1f83..5871427c6 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -9,6 +9,7 @@ #pragma once #include + #include "rocksdb/flush_block_policy.h" #include "rocksdb/options.h" #include "rocksdb/status.h" @@ -19,6 +20,7 @@ namespace rocksdb { class BlockBuilder; class BlockHandle; class WritableFile; +struct BlockBasedTableOptions; class BlockBasedTableBuilder : public TableBuilder { public: @@ -26,10 +28,9 @@ class BlockBasedTableBuilder : public TableBuilder { // building in *file. Does not close the file. It is up to the // caller to close the file after calling Finish(). BlockBasedTableBuilder(const Options& options, + const BlockBasedTableOptions& table_options, const InternalKeyComparator& internal_comparator, - WritableFile* file, - FlushBlockPolicyFactory* flush_block_policy_factory, - CompressionType compression_type); + WritableFile* file, CompressionType compression_type); // REQUIRES: Either Finish() or Abandon() has been called. ~BlockBasedTableBuilder(); @@ -63,11 +64,17 @@ class BlockBasedTableBuilder : public TableBuilder { private: bool ok() const { return status().ok(); } + // Call block's Finish() method and then write the finalize block contents to + // file. 
void WriteBlock(BlockBuilder* block, BlockHandle* handle); + // Directly write block content to the file. + void WriteBlock(const Slice& block_contents, BlockHandle* handle); void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle); Status InsertBlockInCache(const Slice& block_contents, - const CompressionType type, const BlockHandle* handle); + const CompressionType type, + const BlockHandle* handle); struct Rep; + class BlockBasedTablePropertiesCollector; Rep* rep_; // Advanced operation: flush any buffered key/value pairs to file. @@ -82,4 +89,3 @@ class BlockBasedTableBuilder : public TableBuilder { }; } // namespace rocksdb - diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 6e06c4dbb..822adee22 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -11,7 +11,10 @@ #include "table/block_based_table_factory.h" #include +#include #include + +#include "rocksdb/flush_block_policy.h" #include "table/block_based_table_builder.h" #include "table/block_based_table_reader.h" #include "port/port.h" @@ -40,12 +43,8 @@ Status BlockBasedTableFactory::NewTableReader( TableBuilder* BlockBasedTableFactory::NewTableBuilder( const Options& options, const InternalKeyComparator& internal_comparator, WritableFile* file, CompressionType compression_type) const { - auto flush_block_policy_factory = - table_options_.flush_block_policy_factory.get(); - - auto table_builder = - new BlockBasedTableBuilder(options, internal_comparator, file, - flush_block_policy_factory, compression_type); + auto table_builder = new BlockBasedTableBuilder( + options, table_options_, internal_comparator, file, compression_type); return table_builder; } @@ -55,4 +54,7 @@ TableFactory* NewBlockBasedTableFactory( return new BlockBasedTableFactory(table_options); } +const std::string BlockBasedTablePropertyNames::kIndexType = + "rocksdb.block.based.table.index.type"; + } // namespace rocksdb diff --git 
a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 52254c8c0..dd8d739f7 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -9,11 +9,16 @@ #include "table/block_based_table_reader.h" +#include +#include + #include "db/dbformat.h" +#include "rocksdb/cache.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" +#include "rocksdb/iterator.h" #include "rocksdb/options.h" #include "rocksdb/statistics.h" #include "rocksdb/table.h" @@ -31,21 +36,178 @@ namespace rocksdb { extern uint64_t kBlockBasedTableMagicNumber; +using std::unique_ptr; + +typedef BlockBasedTable::IndexReader IndexReader; + +namespace { // The longest the prefix of the cache key used to identify blocks can be. // We are using the fact that we know for Posix files the unique ID is three // varints. const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1; -using std::unique_ptr; + +// Read the block identified by "handle" from "file". +// The only relevant option is options.verify_checksums for now. +// Set *didIO to true if didIO is not null. +// On failure return non-OK. +// On success fill *result and return OK - caller owns *result +Status ReadBlockFromFile(RandomAccessFile* file, const ReadOptions& options, + const BlockHandle& handle, Block** result, Env* env, + bool* didIO = nullptr, bool do_uncompress = true) { + BlockContents contents; + Status s = + ReadBlockContents(file, options, handle, &contents, env, do_uncompress); + if (s.ok()) { + *result = new Block(contents); + } + + if (didIO != nullptr) { + *didIO = true; + } + return s; +} + +// Delete the resource that is held by the iterator. +template +void DeleteHeldResource(void* arg, void* ignored) { + delete reinterpret_cast(arg); +} + +// Delete the entry resided in the cache. 
+template +void DeleteCachedEntry(const Slice& key, void* value) { + auto entry = reinterpret_cast(value); + delete entry; +} + +// Release the cached entry and decrement its ref count. +void ReleaseCachedEntry(void* arg, void* h) { + Cache* cache = reinterpret_cast(arg); + Cache::Handle* handle = reinterpret_cast(h); + cache->Release(handle); +} + +Slice GetCacheKey(const char* cache_key_prefix, size_t cache_key_prefix_size, + const BlockHandle& handle, char* cache_key) { + assert(cache_key != nullptr); + assert(cache_key_prefix_size != 0); + assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); + char* end = + EncodeVarint64(cache_key + cache_key_prefix_size, handle.offset()); + return Slice(cache_key, static_cast(end - cache_key)); +} + +Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, + Tickers block_cache_miss_ticker, + Tickers block_cache_hit_ticker, + Statistics* statistics) { + auto cache_handle = block_cache->Lookup(key); + if (cache_handle != nullptr) { + BumpPerfCount(&perf_context.block_cache_hit_count); + // overall cache hit + RecordTick(statistics, BLOCK_CACHE_HIT); + // block-type specific cache hit + RecordTick(statistics, block_cache_hit_ticker); + } else { + // overall cache miss + RecordTick(statistics, BLOCK_CACHE_MISS); + // block-type specific cache miss + RecordTick(statistics, block_cache_miss_ticker); + } + + return cache_handle; +} + +} // namespace + +// -- IndexReader and its subclasses +// IndexReader is the interface that provide the functionality for index access. +class BlockBasedTable::IndexReader { + public: + explicit IndexReader(const Comparator* comparator) + : comparator_(comparator) {} + + virtual ~IndexReader() {} + + // Create an iterator for index access. + virtual Iterator* NewIterator() = 0; + + // The size of the index. 
+ virtual size_t size() const = 0; + + protected: + const Comparator* comparator_; +}; + +// Index that allows binary search lookup for the first key of each block. +// This class can be viewed as a thin wrapper for `Block` class which already +// supports binary search. +class BinarySearchIndexReader : public IndexReader { + public: + // Read index from the file and create an intance for + // `BinarySearchIndexReader`. + // The return value is a pair, where + // * first element is the status indicating if the operation succeeded. + // * second element is the index reader to be created. On failure, this + // element will be nullptr + static std::pair Create(RandomAccessFile* file, + const BlockHandle& index_handle, + Env* env, + const Comparator* comparator) { + Block* index_block = nullptr; + auto s = + ReadBlockFromFile(file, ReadOptions(), index_handle, &index_block, env); + + if (!s.ok()) { + // Logically, index_block shouldn't have been populated if any error + // occurred. + assert(index_block == nullptr); + return {s, nullptr}; + } + + return {s, new BinarySearchIndexReader(comparator, index_block)}; + } + + virtual Iterator* NewIterator() override { + return index_block_->NewIterator(comparator_); + } + + virtual size_t size() const override { return index_block_->size(); } + + private: + BinarySearchIndexReader(const Comparator* comparator, Block* index_block) + : IndexReader(comparator), index_block_(index_block) { + assert(index_block_ != nullptr); + } + Block* index_block_; +}; + +// TODO(kailiu) This class is only a stub for now. And the comment below is also +// not completed. +// Index that leverages an internal hash table to quicken the lookup for a given +// key. 
+class HashIndexReader : public IndexReader { + public: + static std::pair Create( + RandomAccessFile* file, const BlockHandle& index_handle, Env* env, + const Comparator* comparator, BlockBasedTable* table, + const SliceTransform* prefix_extractor) { + return {Status::NotSupported("not implemented yet!"), + nullptr}; // not finished + } +}; + struct BlockBasedTable::Rep { Rep(const EnvOptions& storage_options, const InternalKeyComparator& internal_comparator) - : soptions(storage_options), internal_comparator_(internal_comparator) {} + : soptions(storage_options), internal_comparator(internal_comparator) {} Options options; const EnvOptions& soptions; - const InternalKeyComparator& internal_comparator_; + const InternalKeyComparator& internal_comparator; Status status; unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; @@ -57,12 +219,14 @@ struct BlockBasedTable::Rep { BlockHandle metaindex_handle; // Handle to index: saved from footer BlockHandle index_handle; - // index_block will be populated and used only when options.block_cache is - // NULL; otherwise we will get the index block via the block cache. - unique_ptr index_block; + // index_reader and filter will be populated and used only when + // options.block_cache is nullptr; otherwise we will get the index block via + // the block cache. + unique_ptr index_reader; unique_ptr filter; std::shared_ptr table_properties; + BlockBasedTableOptions::IndexType index_type; }; BlockBasedTable::~BlockBasedTable() { @@ -138,92 +302,6 @@ void BlockBasedTable::GenerateCachePrefix(Cache* cc, } } -namespace { // anonymous namespace, not visible externally - -// Read the block identified by "handle" from "file". -// The only relevant option is options.verify_checksums for now. -// Set *didIO to true if didIO is not null. -// On failure return non-OK. 
-// On success fill *result and return OK - caller owns *result -Status ReadBlockFromFile( - RandomAccessFile* file, - const ReadOptions& options, - const BlockHandle& handle, - Block** result, - Env* env, - bool* didIO = nullptr, - bool do_uncompress = true) { - BlockContents contents; - Status s = ReadBlockContents(file, options, handle, &contents, - env, do_uncompress); - if (s.ok()) { - *result = new Block(contents); - } - - if (didIO) { - *didIO = true; - } - return s; -} - -void DeleteBlock(void* arg, void* ignored) { - delete reinterpret_cast(arg); -} - -void DeleteCachedBlock(const Slice& key, void* value) { - Block* block = reinterpret_cast(value); - delete block; -} - -void DeleteCachedFilter(const Slice& key, void* value) { - auto filter = reinterpret_cast(value); - delete filter; -} - -void ReleaseBlock(void* arg, void* h) { - Cache* cache = reinterpret_cast(arg); - Cache::Handle* handle = reinterpret_cast(h); - cache->Release(handle); -} - -Slice GetCacheKey(const char* cache_key_prefix, - size_t cache_key_prefix_size, - const BlockHandle& handle, - char* cache_key) { - assert(cache_key != nullptr); - assert(cache_key_prefix_size != 0); - assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); - memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + cache_key_prefix_size, - handle.offset()); - return Slice(cache_key, static_cast(end - cache_key)); -} - -Cache::Handle* GetFromBlockCache( - Cache* block_cache, - const Slice& key, - Tickers block_cache_miss_ticker, - Tickers block_cache_hit_ticker, - Statistics* statistics) { - auto cache_handle = block_cache->Lookup(key); - if (cache_handle != nullptr) { - BumpPerfCount(&perf_context.block_cache_hit_count); - // overall cache hit - RecordTick(statistics, BLOCK_CACHE_HIT); - // block-type specific cache hit - RecordTick(statistics, block_cache_hit_ticker); - } else { - // overall cache miss - RecordTick(statistics, BLOCK_CACHE_MISS); - // block-type specific 
cache miss - RecordTick(statistics, block_cache_miss_ticker); - } - - return cache_handle; -} - -} // end of anonymous namespace - Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, const BlockBasedTableOptions& table_options, const InternalKeyComparator& internal_comparator, @@ -243,6 +321,7 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, rep->file = std::move(file); rep->metaindex_handle = footer.metaindex_handle(); rep->index_handle = footer.index_handle(); + rep->index_type = table_options.index_type; SetupCacheKeyPrefix(rep); unique_ptr new_table(new BlockBasedTable(rep)); @@ -273,12 +352,12 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, // Will use block cache for index/filter blocks access? if (options.block_cache && table_options.cache_index_and_filter_blocks) { - // Call IndexBlockReader() to implicitly add index to the block_cache - unique_ptr iter(new_table->IndexBlockReader(ReadOptions())); + // Hack: Call NewIndexIterator() to implicitly add index to the block_cache + unique_ptr iter(new_table->NewIndexIterator(ReadOptions())); s = iter->status(); if (s.ok()) { - // Call GetFilter() to implicitly add filter to the block_cache + // Hack: Call GetFilter() to implicitly add filter to the block_cache auto filter_entry = new_table->GetFilter(); filter_entry.Release(options.block_cache.get()); } @@ -286,19 +365,12 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, // If we don't use block cache for index/filter blocks access, we'll // pre-load these blocks, which will kept in member variables in Rep // and with a same life-time as this table object. 
- Block* index_block = nullptr; + IndexReader* index_reader = nullptr; // TODO: we never really verify check sum for index block - s = ReadBlockFromFile( - rep->file.get(), - ReadOptions(), - footer.index_handle(), - &index_block, - options.env - ); + std::tie(s, index_reader) = new_table->CreateIndexReader(); if (s.ok()) { - assert(index_block->compression_type() == kNoCompression); - rep->index_block.reset(index_block); + rep->index_reader.reset(index_reader); // Set filter block if (rep->options.filter_policy) { @@ -311,9 +383,8 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, } } } else { - delete index_block; + delete index_reader; } - } if (s.ok()) { @@ -381,6 +452,129 @@ Status BlockBasedTable::ReadMetaBlock( return Status::OK(); } +Status BlockBasedTable::GetDataBlockFromCache( + const Slice& block_cache_key, const Slice& compressed_block_cache_key, + Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics, + const ReadOptions& read_options, + BlockBasedTable::CachableEntry* block) { + Status s; + Block* compressed_block = nullptr; + Cache::Handle* block_cache_compressed_handle = nullptr; + + // Lookup uncompressed cache first + if (block_cache != nullptr) { + block->cache_handle = + GetEntryFromCache(block_cache, block_cache_key, BLOCK_CACHE_DATA_MISS, + BLOCK_CACHE_DATA_HIT, statistics); + if (block->cache_handle != nullptr) { + block->value = + reinterpret_cast(block_cache->Value(block->cache_handle)); + return s; + } + } + + // If not found, search from the compressed block cache. 
+ assert(block->cache_handle == nullptr && block->value == nullptr); + + if (block_cache_compressed == nullptr) { + return s; + } + + assert(!compressed_block_cache_key.empty()); + block_cache_compressed_handle = + block_cache_compressed->Lookup(compressed_block_cache_key); + // if we found in the compressed cache, then uncompress and insert into + // uncompressed cache + if (block_cache_compressed_handle == nullptr) { + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); + return s; + } + + // found compressed block + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT); + compressed_block = reinterpret_cast( + block_cache_compressed->Value(block_cache_compressed_handle)); + assert(compressed_block->compression_type() != kNoCompression); + + // Retrieve the uncompressed contents into a new buffer + BlockContents contents; + s = UncompressBlockContents(compressed_block->data(), + compressed_block->size(), &contents); + + // Insert uncompressed block into block cache + if (s.ok()) { + block->value = new Block(contents); // uncompressed block + assert(block->value->compression_type() == kNoCompression); + if (block_cache != nullptr && block->value->cachable() && + read_options.fill_cache) { + block->cache_handle = + block_cache->Insert(block_cache_key, block->value, + block->value->size(), &DeleteCachedEntry); + assert(reinterpret_cast( + block_cache->Value(block->cache_handle)) == block->value); + } + } + + // Release hold on compressed cache entry + block_cache_compressed->Release(block_cache_compressed_handle); + return s; +} + +Status BlockBasedTable::PutDataBlockToCache( + const Slice& block_cache_key, const Slice& compressed_block_cache_key, + Cache* block_cache, Cache* block_cache_compressed, + const ReadOptions& read_options, Statistics* statistics, + CachableEntry* block, Block* raw_block) { + assert(raw_block->compression_type() == kNoCompression || + block_cache_compressed != nullptr); + + Status s; + // Retrieve the uncompressed contents into a new buffer 
+ BlockContents contents; + if (raw_block->compression_type() != kNoCompression) { + s = UncompressBlockContents(raw_block->data(), raw_block->size(), + &contents); + } + if (!s.ok()) { + delete raw_block; + return s; + } + + if (raw_block->compression_type() != kNoCompression) { + block->value = new Block(contents); // uncompressed block + } else { + block->value = raw_block; + raw_block = nullptr; + } + + // Insert compressed block into compressed block cache. + // Release the hold on the compressed cache entry immediately. + if (block_cache_compressed != nullptr && raw_block != nullptr && + raw_block->cachable()) { + auto cache_handle = block_cache_compressed->Insert( + compressed_block_cache_key, raw_block, raw_block->size(), + &DeleteCachedEntry); + block_cache_compressed->Release(cache_handle); + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); + // Avoid the following code to delete this cached block. + raw_block = nullptr; + } + delete raw_block; + + // insert into uncompressed block cache + assert((block->value->compression_type() == kNoCompression)); + if (block_cache != nullptr && block->value->cachable()) { + block->cache_handle = + block_cache->Insert(block_cache_key, block->value, block->value->size(), + &DeleteCachedEntry); + RecordTick(statistics, BLOCK_CACHE_ADD); + assert(reinterpret_cast(block_cache->Value(block->cache_handle)) == + block->value); + } + + return s; +} + FilterBlockReader* BlockBasedTable::ReadFilter ( const Slice& filter_handle_value, BlockBasedTable::Rep* rep, @@ -408,270 +602,88 @@ FilterBlockReader* BlockBasedTable::ReadFilter ( rep->options, block.data, block.heap_allocated); } -Status BlockBasedTable::GetBlock( - const BlockBasedTable* table, - const BlockHandle& handle, - const ReadOptions& options, - const bool for_compaction, - const Tickers block_cache_miss_ticker, - const Tickers block_cache_hit_ticker, - bool* didIO, - CachableEntry* entry) { - bool no_io = options.read_tier == kBlockCacheTier; - Cache* block_cache 
= table->rep_->options.block_cache.get(); - Statistics* statistics = table->rep_->options.statistics.get(); - Status s; - - if (block_cache != nullptr) { - char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - auto key = GetCacheKey( - table->rep_->cache_key_prefix, - table->rep_->cache_key_prefix_size, - handle, - cache_key - ); - - entry->cache_handle = GetFromBlockCache( - block_cache, - key, - block_cache_miss_ticker, - block_cache_hit_ticker, - statistics - ); - - if (entry->cache_handle != nullptr) { - entry->value = - reinterpret_cast(block_cache->Value(entry->cache_handle)); - } else if (no_io) { - // Did not find in block_cache and can't do IO - return Status::Incomplete("no blocking io"); - } else { - Histograms histogram = for_compaction ? - READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; - { - // block for stop watch - StopWatch sw(table->rep_->options.env, statistics, histogram); - s = ReadBlockFromFile( - table->rep_->file.get(), - options, - handle, - &entry->value, - table->rep_->options.env, - didIO - ); - } - if (s.ok()) { - if (options.fill_cache && entry->value->cachable()) { - entry->cache_handle = block_cache->Insert( - key, entry->value, entry->value->size(), &DeleteCachedBlock); - RecordTick(statistics, BLOCK_CACHE_ADD); - } - } - } - } else if (no_io) { - // Could not read from block_cache and can't do IO - return Status::Incomplete("no blocking io"); - } else { - s = ReadBlockFromFile( - table->rep_->file.get(), - options, - handle, - &entry->value, - table->rep_->options.env, - didIO - ); - } - - return s; -} - // Convert an index iterator value (i.e., an encoded BlockHandle) // into an iterator over the contents of the corresponding block. 
-Iterator* BlockBasedTable::BlockReader(void* arg, - const ReadOptions& options, - const Slice& index_value, - bool* didIO, - bool for_compaction) { +Iterator* BlockBasedTable::DataBlockReader(void* arg, + const ReadOptions& options, + const Slice& index_value, + bool* didIO, bool for_compaction) { const bool no_io = (options.read_tier == kBlockCacheTier); BlockBasedTable* table = reinterpret_cast(arg); Cache* block_cache = table->rep_->options.block_cache.get(); Cache* block_cache_compressed = table->rep_->options. block_cache_compressed.get(); - Statistics* statistics = table->rep_->options.statistics.get(); - Block* block = nullptr; - Block* cblock = nullptr; - Cache::Handle* cache_handle = nullptr; - Cache::Handle* compressed_cache_handle = nullptr; + CachableEntry block; BlockHandle handle; Slice input = index_value; - Status s = handle.DecodeFrom(&input); // We intentionally allow extra stuff in index_value so that we // can add more features in the future. + Status s = handle.DecodeFrom(&input); if (!s.ok()) { return NewErrorIterator(s); } + // If either block cache is enabled, we'll try to read from it. 
if (block_cache != nullptr || block_cache_compressed != nullptr) { + Statistics* statistics = table->rep_->options.statistics.get(); char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - Slice key, /* key to the block cache */ - ckey /* key to the compressed block cache */ ; + Slice key, /* key to the block cache */ + ckey /* key to the compressed block cache */; // create key for block cache if (block_cache != nullptr) { - key = GetCacheKey( - table->rep_->cache_key_prefix, - table->rep_->cache_key_prefix_size, - handle, - cache_key - ); + key = GetCacheKey(table->rep_->cache_key_prefix, + table->rep_->cache_key_prefix_size, handle, cache_key); } if (block_cache_compressed != nullptr) { - ckey = GetCacheKey( - table->rep_->compressed_cache_key_prefix, - table->rep_->compressed_cache_key_prefix_size, - handle, - compressed_cache_key - ); + ckey = GetCacheKey(table->rep_->compressed_cache_key_prefix, + table->rep_->compressed_cache_key_prefix_size, handle, + compressed_cache_key); } - // Lookup uncompressed cache first - if (block_cache != nullptr) { - assert(!key.empty()); - cache_handle = block_cache->Lookup(key); - if (cache_handle != nullptr) { - block = reinterpret_cast(block_cache->Value(cache_handle)); - RecordTick(statistics, BLOCK_CACHE_HIT); - RecordTick(statistics, BLOCK_CACHE_DATA_HIT); - } else { - RecordTick(statistics, BLOCK_CACHE_MISS); - RecordTick(statistics, BLOCK_CACHE_DATA_MISS); - } - } + s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, + statistics, options, &block); - // If not found in uncompressed cache, lookup compressed cache - if (block == nullptr && block_cache_compressed != nullptr) { - assert(!ckey.empty()); - compressed_cache_handle = block_cache_compressed->Lookup(ckey); - - // if we found in the compressed cache, then uncompress and - // insert into uncompressed cache - if (compressed_cache_handle != nullptr) { - // found 
compressed block - cblock = reinterpret_cast(block_cache_compressed-> - Value(compressed_cache_handle)); - assert(cblock->compression_type() != kNoCompression); - - // Retrieve the uncompressed contents into a new buffer - BlockContents contents; - s = UncompressBlockContents(cblock->data(), cblock->size(), - &contents); - - // Insert uncompressed block into block cache - if (s.ok()) { - block = new Block(contents); // uncompressed block - assert(block->compression_type() == kNoCompression); - if (block_cache != nullptr && block->cachable() && - options.fill_cache) { - cache_handle = block_cache->Insert(key, block, block->size(), - &DeleteCachedBlock); - assert(reinterpret_cast(block_cache->Value(cache_handle)) - == block); - } - } - // Release hold on compressed cache entry - block_cache_compressed->Release(compressed_cache_handle); - RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT); - } - } - - if (block != nullptr) { - BumpPerfCount(&perf_context.block_cache_hit_count); - } else if (no_io) { - // Did not find in block_cache and can't do IO - return NewErrorIterator(Status::Incomplete("no blocking io")); - } else { + if (block.value == nullptr && !no_io && options.fill_cache) { Histograms histogram = for_compaction ? 
READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; - { // block for stop watch + Block* raw_block = nullptr; + { StopWatch sw(table->rep_->options.env, statistics, histogram); - s = ReadBlockFromFile( - table->rep_->file.get(), - options, - handle, - &cblock, - table->rep_->options.env, - didIO, - block_cache_compressed == nullptr - ); + s = ReadBlockFromFile(table->rep_->file.get(), options, handle, + &raw_block, table->rep_->options.env, didIO, + block_cache_compressed == nullptr); } + if (s.ok()) { - assert(cblock->compression_type() == kNoCompression || - block_cache_compressed != nullptr); - - // Retrieve the uncompressed contents into a new buffer - BlockContents contents; - if (cblock->compression_type() != kNoCompression) { - s = UncompressBlockContents(cblock->data(), cblock->size(), - &contents); - } - if (s.ok()) { - if (cblock->compression_type() != kNoCompression) { - block = new Block(contents); // uncompressed block - } else { - block = cblock; - cblock = nullptr; - } - if (block->cachable() && options.fill_cache) { - // Insert compressed block into compressed block cache. - // Release the hold on the compressed cache entry immediately. 
- if (block_cache_compressed != nullptr && cblock != nullptr) { - compressed_cache_handle = block_cache_compressed->Insert( - ckey, cblock, cblock->size(), &DeleteCachedBlock); - block_cache_compressed->Release(compressed_cache_handle); - RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); - cblock = nullptr; - } - // insert into uncompressed block cache - assert((block->compression_type() == kNoCompression)); - if (block_cache != nullptr) { - cache_handle = block_cache->Insert( - key, block, block->size(), &DeleteCachedBlock); - RecordTick(statistics, BLOCK_CACHE_ADD); - assert(reinterpret_cast(block_cache->Value( - cache_handle))== block); - } - } - } - } - if (cblock != nullptr) { - delete cblock; + s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed, + options, statistics, &block, raw_block); } } - } else if (no_io) { - // Could not read from block_cache and can't do IO - return NewErrorIterator(Status::Incomplete("no blocking io")); - } else { - s = ReadBlockFromFile( - table->rep_->file.get(), - options, - handle, - &block, - table->rep_->options.env, - didIO - ); + } + + // Didn't get any data from block caches. 
+ if (block.value == nullptr) { + if (no_io) { + // Could not read from block_cache and can't do IO + return NewErrorIterator(Status::Incomplete("no blocking io")); + } + s = ReadBlockFromFile(table->rep_->file.get(), options, handle, + &block.value, table->rep_->options.env, didIO); } Iterator* iter; - if (block != nullptr) { - iter = block->NewIterator(&(table->rep_->internal_comparator_)); - if (cache_handle != nullptr) { - iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle); + if (block.value != nullptr) { + iter = block.value->NewIterator(&table->rep_->internal_comparator); + if (block.cache_handle != nullptr) { + iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, + block.cache_handle); } else { - iter->RegisterCleanup(&DeleteBlock, block, nullptr); + iter->RegisterCleanup(&DeleteHeldResource, block.value, nullptr); } } else { iter = NewErrorIterator(s); @@ -679,8 +691,8 @@ Iterator* BlockBasedTable::BlockReader(void* arg, return iter; } -BlockBasedTable::CachableEntry -BlockBasedTable::GetFilter(bool no_io) const { +BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( + bool no_io) const { // filter pre-populated if (rep_->filter != nullptr) { return {rep_->filter.get(), nullptr /* cache handle */}; @@ -702,13 +714,9 @@ BlockBasedTable::GetFilter(bool no_io) const { ); Statistics* statistics = rep_->options.statistics.get(); - auto cache_handle = GetFromBlockCache( - block_cache, - key, - BLOCK_CACHE_FILTER_MISS, - BLOCK_CACHE_FILTER_HIT, - statistics - ); + auto cache_handle = + GetEntryFromCache(block_cache, key, BLOCK_CACHE_FILTER_MISS, + BLOCK_CACHE_FILTER_HIT, statistics); FilterBlockReader* filter = nullptr; if (cache_handle != nullptr) { @@ -734,7 +742,7 @@ BlockBasedTable::GetFilter(bool no_io) const { assert(filter_size > 0); cache_handle = block_cache->Insert( - key, filter, filter_size, &DeleteCachedFilter); + key, filter, filter_size, &DeleteCachedEntry); RecordTick(statistics, BLOCK_CACHE_ADD); } } @@ -743,50 +751,59 @@ 
BlockBasedTable::GetFilter(bool no_io) const { return { filter, cache_handle }; } -// Get the iterator from the index block. -Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const { - if (rep_->index_block) { - return rep_->index_block->NewIterator(&(rep_->internal_comparator_)); +Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) + const { + // index reader has already been pre-populated. + if (rep_->index_reader) { + return rep_->index_reader->NewIterator(); } - // get index block from cache - assert (rep_->options.block_cache); - bool didIO = false; - CachableEntry entry; - - auto s = GetBlock( - this, - rep_->index_handle, - options, - false, /* for compaction */ - BLOCK_CACHE_INDEX_MISS, - BLOCK_CACHE_INDEX_HIT, - &didIO, - &entry - ); + bool no_io = read_options.read_tier == kBlockCacheTier; + Cache* block_cache = rep_->options.block_cache.get(); + char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, + rep_->index_handle, cache_key); + Statistics* statistics = rep_->options.statistics.get(); + auto cache_handle = + GetEntryFromCache(block_cache, key, BLOCK_CACHE_INDEX_MISS, + BLOCK_CACHE_INDEX_HIT, statistics); - Iterator* iter; - if (entry.value != nullptr) { - iter = entry.value->NewIterator(&(rep_->internal_comparator_)); - if (entry.cache_handle) { - iter->RegisterCleanup( - &ReleaseBlock, rep_->options.block_cache.get(), entry.cache_handle - ); - } else { - iter->RegisterCleanup(&DeleteBlock, entry.value, nullptr); - } + if (cache_handle == nullptr && no_io) { + return NewErrorIterator(Status::Incomplete("no blocking io")); + } + + IndexReader* index_reader = nullptr; + if (cache_handle != nullptr) { + index_reader = + reinterpret_cast(block_cache->Value(cache_handle)); } else { - iter = NewErrorIterator(s); + // Create index reader and put it in the cache. 
+ Status s; + std::tie(s, index_reader) = CreateIndexReader(); + + if (!s.ok()) { + // make sure if something goes wrong, index_reader shall remain intact. + assert(index_reader == nullptr); + return NewErrorIterator(s); + } + + cache_handle = block_cache->Insert(key, index_reader, index_reader->size(), + &DeleteCachedEntry); + RecordTick(statistics, BLOCK_CACHE_ADD); } + + assert(cache_handle); + auto iter = index_reader->NewIterator(); + iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); + return iter; } -Iterator* BlockBasedTable::BlockReader(void* arg, const ReadOptions& options, - const EnvOptions& soptions, - const InternalKeyComparator& icomparator, - const Slice& index_value, - bool for_compaction) { - return BlockReader(arg, options, index_value, nullptr, for_compaction); +Iterator* BlockBasedTable::DataBlockReader( + void* arg, const ReadOptions& options, const EnvOptions& soptions, + const InternalKeyComparator& icomparator, const Slice& index_value, + bool for_compaction) { + return DataBlockReader(arg, options, index_value, nullptr, for_compaction); } // This will be broken if the user specifies an unusual implementation @@ -814,9 +831,7 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) { // loaded to memory. 
ReadOptions no_io_read_options; no_io_read_options.read_tier = kBlockCacheTier; - unique_ptr iiter( - IndexBlockReader(no_io_read_options) - ); + unique_ptr iiter(NewIndexIterator(no_io_read_options)); iiter->Seek(internal_prefix); if (!iiter->Valid()) { @@ -874,20 +889,20 @@ Iterator* BlockBasedTable::NewIterator(const ReadOptions& options) { } } - return NewTwoLevelIterator(IndexBlockReader(options), - &BlockBasedTable::BlockReader, + return NewTwoLevelIterator(NewIndexIterator(options), + &BlockBasedTable::DataBlockReader, const_cast(this), options, - rep_->soptions, rep_->internal_comparator_); + rep_->soptions, rep_->internal_comparator); } Status BlockBasedTable::Get( - const ReadOptions& readOptions, const Slice& key, void* handle_context, + const ReadOptions& read_options, const Slice& key, void* handle_context, bool (*result_handler)(void* handle_context, const ParsedInternalKey& k, const Slice& v, bool didIO), void (*mark_key_may_exist_handler)(void* handle_context)) { Status s; - Iterator* iiter = IndexBlockReader(readOptions); - auto filter_entry = GetFilter(readOptions.read_tier == kBlockCacheTier); + Iterator* iiter = NewIndexIterator(read_options); + auto filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier); FilterBlockReader* filter = filter_entry.value; bool done = false; for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { @@ -908,9 +923,9 @@ Status BlockBasedTable::Get( } else { bool didIO = false; unique_ptr block_iter( - BlockReader(this, readOptions, iiter->value(), &didIO)); + DataBlockReader(this, read_options, iiter->value(), &didIO)); - if (readOptions.read_tier && block_iter->status().IsIncomplete()) { + if (read_options.read_tier && block_iter->status().IsIncomplete()) { // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for whether // we can guarantee the key is not there when "no_io" is set @@ -958,8 +973,42 @@ bool BlockBasedTable::TEST_KeyInCache(const 
ReadOptions& options, return !didIO; } +// REQUIRES: The following fields of rep_ should have already been populated: +// 1. file +// 2. index_handle, +// 3. options +// 4. internal_comparator +// 5. index_type +std::pair BlockBasedTable::CreateIndexReader() const { + // Some old version of block-based tables don't have index type present in + // table properties. If that's the case we can safely use the kBinarySearch. + auto index_type = BlockBasedTableOptions::kBinarySearch; + auto& props = rep_->table_properties->user_collected_properties; + auto pos = props.find(BlockBasedTablePropertyNames::kIndexType); + if (pos != props.end()) { + index_type = static_cast( + DecodeFixed32(pos->second.c_str())); + } + + switch (index_type) { + case BlockBasedTableOptions::kBinarySearch: { + return BinarySearchIndexReader::Create( + rep_->file.get(), rep_->index_handle, rep_->options.env, + &rep_->internal_comparator); + } + default: { + std::string error_message = + "Unrecognized index type: " + std::to_string(rep_->index_type); + // equivalent to assert(false), but more informative. + assert(!error_message.c_str()); + return {Status::InvalidArgument(error_message.c_str()), + nullptr}; // cannot reach here + } + } +} + uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) { - Iterator* index_iter = IndexBlockReader(ReadOptions()); + unique_ptr index_iter(NewIndexIterator(ReadOptions())); index_iter->Seek(key); uint64_t result; @@ -981,7 +1030,6 @@ uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) { // right near the end of the file). 
result = rep_->metaindex_handle.offset(); } - delete index_iter; return result; } @@ -989,8 +1037,8 @@ bool BlockBasedTable::TEST_filter_block_preloaded() const { return rep_->filter != nullptr; } -bool BlockBasedTable::TEST_index_block_preloaded() const { - return rep_->index_block != nullptr; +bool BlockBasedTable::TEST_index_reader_preloaded() const { + return rep_->index_reader != nullptr; } } // namespace rocksdb diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index fc584d9ec..932963335 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -8,12 +8,14 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once -#include + #include -#include "rocksdb/cache.h" -#include "rocksdb/env.h" -#include "rocksdb/iterator.h" +#include +#include + #include "rocksdb/statistics.h" +#include "rocksdb/status.h" +#include "rocksdb/table.h" #include "table/table_reader.h" #include "util/coding.h" @@ -21,14 +23,19 @@ namespace rocksdb { class Block; class BlockHandle; +class Cache; +class FilterBlockReader; class Footer; -struct Options; +class InternalKeyComparator; +class Iterator; class RandomAccessFile; -struct ReadOptions; class TableCache; class TableReader; -class FilterBlockReader; +class WritableFile; struct BlockBasedTableOptions; +struct EnvOptions; +struct Options; +struct ReadOptions; using std::unique_ptr; @@ -91,7 +98,9 @@ class BlockBasedTable : public TableReader { ~BlockBasedTable(); bool TEST_filter_block_preloaded() const; - bool TEST_index_block_preloaded() const; + bool TEST_index_reader_preloaded() const; + // Implementation of IndexReader will be exposed to internal cc file only. 
+ class IndexReader; private: template @@ -101,40 +110,51 @@ class BlockBasedTable : public TableReader { Rep* rep_; bool compaction_optimized_; - static Iterator* BlockReader(void*, const ReadOptions&, - const EnvOptions& soptions, - const InternalKeyComparator& icomparator, - const Slice&, bool for_compaction); + static Iterator* DataBlockReader(void*, const ReadOptions&, + const EnvOptions& soptions, + const InternalKeyComparator& icomparator, + const Slice&, bool for_compaction); - static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, - bool* didIO, bool for_compaction = false); + static Iterator* DataBlockReader(void*, const ReadOptions&, const Slice&, + bool* didIO, bool for_compaction = false); - // if `no_io == true`, we will not try to read filter from sst file - // if it is not cached yet. + // For the following two functions: + // if `no_io == true`, we will not try to read filter/index from sst file + // were they not present in cache yet. CachableEntry GetFilter(bool no_io = false) const; - Iterator* IndexBlockReader(const ReadOptions& options) const; - - // Read the block, either from sst file or from cache. This method will try - // to read from cache only when block_cache is set or ReadOption doesn't - // explicitly prohibit storage IO. + // Get the iterator from the index reader. // - // If the block is read from cache, the statistics for cache miss/hit of the - // the given type of block will be updated. User can specify - // `block_cache_miss_ticker` and `block_cache_hit_ticker` for the statistics - // update. + // Note: ErrorIterator with Status::Incomplete shall be returned if all the + // following conditions are met: + // 1. We enabled table_options.cache_index_and_filter_blocks. + // 2. index is not present in block cache. + // 3. 
We disallowed any io to be performed, that is, read_options == + // kBlockCacheTier + Iterator* NewIndexIterator(const ReadOptions& read_options) const; + + // Read block cache from block caches (if set): block_cache and + // block_cache_compressed. + // On success, Status::OK with be returned and @block will be populated with + // pointer to the block as well as its block handle. + static Status GetDataBlockFromCache( + const Slice& block_cache_key, const Slice& compressed_block_cache_key, + Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics, + const ReadOptions& read_options, + BlockBasedTable::CachableEntry* block); + // Put a raw block (maybe compressed) to the corresponding block caches. + // This method will perform decompression against raw_block if needed and then + // populate the block caches. + // On success, Status::OK will be returned; also @block will be populated with + // uncompressed block and its cache handle. // - // On success, the `result` parameter will be populated, which contains a - // pointer to the block and its cache handle, which will be nullptr if it's - // not read from the cache. - static Status GetBlock(const BlockBasedTable* table, - const BlockHandle& handle, - const ReadOptions& options, - bool for_compaction, - Tickers block_cache_miss_ticker, - Tickers block_cache_hit_ticker, - bool* didIO, - CachableEntry* result); + // REQUIRES: raw_block is heap-allocated. PutDataBlockToCache() will be + // responsible for releasing its memory if error occurs. + static Status PutDataBlockToCache( + const Slice& block_cache_key, const Slice& compressed_block_cache_key, + Cache* block_cache, Cache* block_cache_compressed, + const ReadOptions& read_options, Statistics* statistics, + CachableEntry* block, Block* raw_block); // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. 
@@ -144,6 +164,7 @@ class BlockBasedTable : public TableReader { void ReadMeta(const Footer& footer); void ReadFilter(const Slice& filter_handle_value); + std::pair CreateIndexReader() const; // Read the meta block from sst. static Status ReadMetaBlock( @@ -159,10 +180,9 @@ class BlockBasedTable : public TableReader { static void SetupCacheKeyPrefix(Rep* rep); - explicit BlockBasedTable(Rep* rep) : - compaction_optimized_(false) { - rep_ = rep; - } + explicit BlockBasedTable(Rep* rep) + : rep_(rep), compaction_optimized_(false) {} + // Generate a cache key prefix from the file static void GenerateCachePrefix(Cache* cc, RandomAccessFile* file, char* buffer, size_t* size); diff --git a/table/table_test.cc b/table/table_test.cc index b9b87e525..9c78ffd86 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -9,6 +9,7 @@ #include #include + #include #include #include @@ -16,8 +17,6 @@ #include #include "db/dbformat.h" -#include "rocksdb/statistics.h" -#include "util/statistics.h" #include "db/memtable.h" #include "db/write_batch_internal.h" @@ -25,11 +24,11 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" -#include "rocksdb/slice_transform.h" #include "rocksdb/memtablerep.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/statistics.h" + #include "table/block.h" -#include "table/meta_blocks.h" -#include "table/block_based_table_reader.h" #include "table/block_based_table_builder.h" #include "table/block_based_table_factory.h" #include "table/block_based_table_reader.h" @@ -39,6 +38,7 @@ #include "table/plain_table_factory.h" #include "util/random.h" +#include "util/statistics.h" #include "util/testharness.h" #include "util/testutil.h" @@ -1201,7 +1201,7 @@ TEST(BlockBasedTableTest, BlockCacheDisabledTest) { // preloading filter/index blocks is enabled. 
auto reader = dynamic_cast(c.table_reader()); ASSERT_TRUE(reader->TEST_filter_block_preloaded()); - ASSERT_TRUE(reader->TEST_index_block_preloaded()); + ASSERT_TRUE(reader->TEST_index_reader_preloaded()); { // nothing happens in the beginning @@ -1242,7 +1242,7 @@ TEST(BlockBasedTableTest, FilterBlockInBlockCache) { // preloading filter/index blocks is prohibited. auto reader = dynamic_cast(c.table_reader()); ASSERT_TRUE(!reader->TEST_filter_block_preloaded()); - ASSERT_TRUE(!reader->TEST_index_block_preloaded()); + ASSERT_TRUE(!reader->TEST_index_reader_preloaded()); // -- PART 1: Open with regular block cache. // Since block_cache is disabled, no cache activities will be involved. -- GitLab