提交 674cf417 编写于 作者: S sdong 提交者: Facebook GitHub Bot

Divide block_based_table_reader.cc (#6527)

Summary:
block_based_table_reader.cc is a giant file, which makes it hard for users to navigate the code. Divide the files to multiple files.
Some class templates cannot be moved to .cc file. They are moved to .h files. It is still better than including them all in block_based_table_reader.cc.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6527

Test Plan: "make all check" and "make release". Also build using cmake.

Differential Revision: D20428455

fbshipit-source-id: ca713c698469f07f35bc0c271358c0874ed4eb28
上级 dd7a4a8f
......@@ -620,6 +620,7 @@ set(SOURCES
options/options_sanity_check.cc
port/stack_trace.cc
table/adaptive/adaptive_table_factory.cc
table/block_based/binary_search_index_reader.cc
table/block_based/block.cc
table/block_based/block_based_filter_block.cc
table/block_based/block_based_table_builder.cc
......@@ -633,9 +634,13 @@ set(SOURCES
table/block_based/filter_policy.cc
table/block_based/flush_block_policy.cc
table/block_based/full_filter_block.cc
table/block_based/hash_index_reader.cc
table/block_based/index_builder.cc
table/block_based/index_reader_common.cc
table/block_based/parsed_full_filter_block.cc
table/block_based/partitioned_filter_block.cc
table/block_based/partitioned_index_reader.cc
table/block_based/reader_common.cc
table/block_based/uncompression_dict_reader.cc
table/block_fetcher.cc
table/cuckoo/cuckoo_table_builder.cc
......
......@@ -231,6 +231,7 @@ cpp_library(
"port/port_posix.cc",
"port/stack_trace.cc",
"table/adaptive/adaptive_table_factory.cc",
"table/block_based/binary_search_index_reader.cc",
"table/block_based/block.cc",
"table/block_based/block_based_filter_block.cc",
"table/block_based/block_based_table_builder.cc",
......@@ -244,9 +245,13 @@ cpp_library(
"table/block_based/filter_policy.cc",
"table/block_based/flush_block_policy.cc",
"table/block_based/full_filter_block.cc",
"table/block_based/hash_index_reader.cc",
"table/block_based/index_builder.cc",
"table/block_based/index_reader_common.cc",
"table/block_based/parsed_full_filter_block.cc",
"table/block_based/partitioned_filter_block.cc",
"table/block_based/partitioned_index_reader.cc",
"table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/cuckoo/cuckoo_table_builder.cc",
......
......@@ -119,6 +119,7 @@ LIB_SOURCES = \
port/port_posix.cc \
port/stack_trace.cc \
table/adaptive/adaptive_table_factory.cc \
table/block_based/binary_search_index_reader.cc \
table/block_based/block.cc \
table/block_based/block_based_filter_block.cc \
table/block_based/block_based_table_builder.cc \
......@@ -132,9 +133,13 @@ LIB_SOURCES = \
table/block_based/filter_policy.cc \
table/block_based/flush_block_policy.cc \
table/block_based/full_filter_block.cc \
table/block_based/hash_index_reader.cc \
table/block_based/index_builder.cc \
table/block_based/index_reader_common.cc \
table/block_based/parsed_full_filter_block.cc \
table/block_based/partitioned_filter_block.cc \
table/block_based/partitioned_index_reader.cc \
table/block_based/reader_common.cc \
table/block_based/uncompression_dict_reader.cc \
table/block_fetcher.cc \
table/cuckoo/cuckoo_table_builder.cc \
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/binary_search_index_reader.h"
namespace ROCKSDB_NAMESPACE {
Status BinarySearchIndexReader::Create(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<IndexReader>* index_reader) {
assert(table != nullptr);
assert(table->get_rep());
assert(!pin || prefetch);
assert(index_reader != nullptr);
CachableEntry<Block> index_block;
if (prefetch || !use_cache) {
const Status s =
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
/*get_context=*/nullptr, lookup_context, &index_block);
if (!s.ok()) {
return s;
}
if (use_cache && !pin) {
index_block.Reset();
}
}
index_reader->reset(
new BinarySearchIndexReader(table, std::move(index_block)));
return Status::OK();
}
InternalIteratorBase<IndexValue>* BinarySearchIndexReader::NewIterator(
const ReadOptions& read_options, bool /* disable_prefix_seek */,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) {
const BlockBasedTable::Rep* rep = table()->get_rep();
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);
return iter;
}
return NewErrorInternalIterator<IndexValue>(s);
}
Statistics* kNullStats = nullptr;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
auto it = index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, true,
index_has_first_key(), index_key_includes_seq(), index_value_is_full());
assert(it != nullptr);
index_block.TransferTo(it);
return it;
}
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "table/block_based/index_reader_common.h"
namespace ROCKSDB_NAMESPACE {
// Index that allows binary search lookup for the first key of each block.
// This class can be viewed as a thin wrapper for `Block` class which already
// supports binary search.
class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon {
public:
// Read index from the file and create an intance for
// `BinarySearchIndexReader`.
// On success, index_reader will be populated; otherwise it will remain
// unmodified.
static Status Create(const BlockBasedTable* table,
FilePrefetchBuffer* prefetch_buffer, bool use_cache,
bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<IndexReader>* index_reader);
InternalIteratorBase<IndexValue>* NewIterator(
const ReadOptions& read_options, bool /* disable_prefix_seek */,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override;
size_t ApproximateMemoryUsage() const override {
size_t usage = ApproximateIndexBlockMemoryUsage();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
usage += malloc_usable_size(const_cast<BinarySearchIndexReader*>(this));
#else
usage += sizeof(*this);
#endif // ROCKSDB_MALLOC_USABLE_SIZE
return usage;
}
private:
BinarySearchIndexReader(const BlockBasedTable* t,
CachableEntry<Block>&& index_block)
: IndexReaderCommon(t, std::move(index_block)) {}
};
} // namespace ROCKSDB_NAMESPACE
此差异已折叠。
......@@ -9,38 +9,18 @@
#pragma once
#include <stdint.h>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "db/range_tombstone_fragmenter.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "options/cf_options.h"
#include "rocksdb/options.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/uncompression_dict_reader.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/multiget_context.h"
#include "table/persistent_cache_helper.h"
#include "table/table_properties_internal.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "trace_replay/block_cache_tracer.h"
#include "util/coding.h"
#include "util/user_comparator_wrapper.h"
namespace ROCKSDB_NAMESPACE {
......@@ -617,211 +597,4 @@ struct BlockBasedTable::Rep {
!ioptions.allow_mmap_reads /* enable */));
}
};
// Iterates over the contents of BlockBasedTable.
template <class TBlockIter, typename TValue = Slice>
class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
// compaction_readahead_size: its value will only be used if for_compaction =
// true
public:
BlockBasedTableIterator(const BlockBasedTable* table,
const ReadOptions& read_options,
const InternalKeyComparator& icomp,
InternalIteratorBase<IndexValue>* index_iter,
bool check_filter, bool need_upper_bound_check,
const SliceTransform* prefix_extractor,
BlockType block_type, TableReaderCaller caller,
size_t compaction_readahead_size = 0)
: table_(table),
read_options_(read_options),
icomp_(icomp),
user_comparator_(icomp.user_comparator()),
index_iter_(index_iter),
pinned_iters_mgr_(nullptr),
block_iter_points_to_real_block_(false),
check_filter_(check_filter),
need_upper_bound_check_(need_upper_bound_check),
prefix_extractor_(prefix_extractor),
block_type_(block_type),
lookup_context_(caller),
compaction_readahead_size_(compaction_readahead_size) {}
~BlockBasedTableIterator() { delete index_iter_; }
void Seek(const Slice& target) override;
void SeekForPrev(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
void Next() final override;
bool NextAndGetResult(IterateResult* result) override;
void Prev() override;
bool Valid() const override {
return !is_out_of_bound_ &&
(is_at_first_key_from_index_ ||
(block_iter_points_to_real_block_ && block_iter_.Valid()));
}
Slice key() const override {
assert(Valid());
if (is_at_first_key_from_index_) {
return index_iter_->value().first_internal_key;
} else {
return block_iter_.key();
}
}
Slice user_key() const override {
assert(Valid());
if (is_at_first_key_from_index_) {
return ExtractUserKey(index_iter_->value().first_internal_key);
} else {
return block_iter_.user_key();
}
}
TValue value() const override {
assert(Valid());
// Load current block if not loaded.
if (is_at_first_key_from_index_ &&
!const_cast<BlockBasedTableIterator*>(this)
->MaterializeCurrentBlock()) {
// Oops, index is not consistent with block contents, but we have
// no good way to report error at this point. Let's return empty value.
return TValue();
}
return block_iter_.value();
}
Status status() const override {
// Prefix index set status to NotFound when the prefix does not exist
if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) {
return index_iter_->status();
} else if (block_iter_points_to_real_block_) {
return block_iter_.status();
} else {
return Status::OK();
}
}
// Whether iterator invalidated for being out of bound.
bool IsOutOfBound() override { return is_out_of_bound_; }
inline bool MayBeOutOfUpperBound() override {
assert(Valid());
return !data_block_within_upper_bound_;
}
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
}
bool IsKeyPinned() const override {
// Our key comes either from block_iter_'s current key
// or index_iter_'s current *value*.
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) ||
(block_iter_points_to_real_block_ && block_iter_.IsKeyPinned()));
}
bool IsValuePinned() const override {
// Load current block if not loaded.
if (is_at_first_key_from_index_) {
const_cast<BlockBasedTableIterator*>(this)->MaterializeCurrentBlock();
}
// BlockIter::IsValuePinned() is always true. No need to check
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
block_iter_points_to_real_block_;
}
void ResetDataIter() {
if (block_iter_points_to_real_block_) {
if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
}
block_iter_.Invalidate(Status::OK());
block_iter_points_to_real_block_ = false;
}
}
void SavePrevIndexValue() {
if (block_iter_points_to_real_block_) {
// Reseek. If they end up with the same data block, we shouldn't re-fetch
// the same data block.
prev_block_offset_ = index_iter_->value().handle.offset();
}
}
private:
enum class IterDirection {
kForward,
kBackward,
};
const BlockBasedTable* table_;
const ReadOptions read_options_;
const InternalKeyComparator& icomp_;
UserComparatorWrapper user_comparator_;
InternalIteratorBase<IndexValue>* index_iter_;
PinnedIteratorsManager* pinned_iters_mgr_;
TBlockIter block_iter_;
// True if block_iter_ is initialized and points to the same block
// as index iterator.
bool block_iter_points_to_real_block_;
// See InternalIteratorBase::IsOutOfBound().
bool is_out_of_bound_ = false;
// Whether current data block being fully within iterate upper bound.
bool data_block_within_upper_bound_ = false;
// True if we're standing at the first key of a block, and we haven't loaded
// that block yet. A call to value() will trigger loading the block.
bool is_at_first_key_from_index_ = false;
bool check_filter_;
// TODO(Zhongyi): pick a better name
bool need_upper_bound_check_;
const SliceTransform* prefix_extractor_;
BlockType block_type_;
uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
BlockCacheLookupContext lookup_context_;
// Readahead size used in compaction, its value is used only if
// lookup_context_.caller = kCompaction.
size_t compaction_readahead_size_;
size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize;
size_t readahead_limit_ = 0;
int64_t num_file_reads_ = 0;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
// If `target` is null, seek to first.
void SeekImpl(const Slice* target);
void InitDataBlock();
bool MaterializeCurrentBlock();
void FindKeyForward();
void FindBlockForward();
void FindKeyBackward();
void CheckOutOfBound();
// Check if data block is fully within iterate_upper_bound.
//
// Note MyRocks may update iterate bounds between seek. To workaround it,
// we need to check and update data_block_within_upper_bound_ accordingly.
void CheckDataBlockWithinUpperBound();
bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction) {
if (need_upper_bound_check_ && direction == IterDirection::kBackward) {
// Upper bound check isn't sufficnet for backward direction to
// guarantee the same result as total order, so disable prefix
// check.
return true;
}
if (check_filter_ &&
!table_->PrefixMayMatch(ikey, read_options_, prefix_extractor_,
need_upper_bound_check_, &lookup_context_)) {
// TODO remember the iterator is invalidated because of prefix
// match. This can avoid the upper level file iterator to falsely
// believe the position is the end of the SST file and move to
// the first key of the next file.
ResetDataIter();
return false;
}
return true;
}
};
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/reader_common.h"
// The file contains some member functions of BlockBasedTable that
// cannot be implemented in block_based_table_reader.cc because
// it's called by other files (e.g. block_based_iterator.h) and
// are templates.
namespace ROCKSDB_NAMESPACE {
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, new a iterator
// If input_iter is not null, update this iter and return it
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(
const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
BlockType block_type, GetContext* get_context,
BlockCacheLookupContext* lookup_context, Status s,
FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
if (!s.ok()) {
iter->Invalidate(s);
return iter;
}
CachableEntry<UncompressionDict> uncompression_dict;
if (rep_->uncompression_dict_reader) {
const bool no_io = (ro.read_tier == kBlockCacheTier);
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
prefetch_buffer, no_io, get_context, lookup_context,
&uncompression_dict);
if (!s.ok()) {
iter->Invalidate(s);
return iter;
}
}
const UncompressionDict& dict = uncompression_dict.GetValue()
? *uncompression_dict.GetValue()
: UncompressionDict::GetEmptyDict();
CachableEntry<Block> block;
s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
get_context, lookup_context, for_compaction,
/* use_cache */ true);
if (!s.ok()) {
assert(block.IsEmpty());
iter->Invalidate(s);
return iter;
}
assert(block.GetValue() != nullptr);
// Block contents are pinned and it is still pinned after the iterator
// is destroyed as long as cleanup functions are moved to another object,
// when:
// 1. block cache handle is set to be released in cleanup function, or
// 2. it's pointing to immortal source. If own_bytes is true then we are
// not reading data from the original source, whether immortal or not.
// Otherwise, the block is pinned iff the source is immortal.
const bool block_contents_pinned =
block.IsCached() ||
(!block.GetValue()->own_bytes() && rep_->immortal_table);
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), block_type, iter,
block_contents_pinned);
if (!block.IsCached()) {
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
// insert a dummy record to block cache to track the memory usage
Cache* const block_cache = rep_->table_options.block_cache.get();
Cache::Handle* cache_handle = nullptr;
// There are two other types of cache keys: 1) SST cache key added in
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
// from SST cache key(31 bytes), and use non-zero prefix to
// differentiate from `write_buffer_manager`
const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
// Prefix: use rep_->cache_key_prefix padded by 0s
memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
assert(rep_->cache_key_prefix_size != 0);
assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
next_cache_key_id_++);
assert(end - cache_key <=
static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
s = block_cache->Insert(unique_key, nullptr,
block.GetValue()->ApproximateMemoryUsage(),
nullptr, &cache_handle);
if (s.ok()) {
assert(cache_handle != nullptr);
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
cache_handle);
}
}
} else {
iter->SetCacheHandle(block.GetCacheHandle());
}
block.TransferTo(iter);
return iter;
}
// Convert an uncompressed data block (i.e CachableEntry<Block>)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, new a iterator
// If input_iter is not null, update this iter and return it
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
CachableEntry<Block>& block,
TBlockIter* input_iter,
Status s) const {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
if (!s.ok()) {
iter->Invalidate(s);
return iter;
}
assert(block.GetValue() != nullptr);
// Block contents are pinned and it is still pinned after the iterator
// is destroyed as long as cleanup functions are moved to another object,
// when:
// 1. block cache handle is set to be released in cleanup function, or
// 2. it's pointing to immortal source. If own_bytes is true then we are
// not reading data from the original source, whether immortal or not.
// Otherwise, the block is pinned iff the source is immortal.
const bool block_contents_pinned =
block.IsCached() ||
(!block.GetValue()->own_bytes() && rep_->immortal_table);
iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), BlockType::kData,
iter, block_contents_pinned);
if (!block.IsCached()) {
if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
// insert a dummy record to block cache to track the memory usage
Cache* const block_cache = rep_->table_options.block_cache.get();
Cache::Handle* cache_handle = nullptr;
// There are two other types of cache keys: 1) SST cache key added in
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
// from SST cache key(31 bytes), and use non-zero prefix to
// differentiate from `write_buffer_manager`
const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
// Prefix: use rep_->cache_key_prefix padded by 0s
memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
assert(rep_->cache_key_prefix_size != 0);
assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
next_cache_key_id_++);
assert(end - cache_key <=
static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
s = block_cache->Insert(unique_key, nullptr,
block.GetValue()->ApproximateMemoryUsage(),
nullptr, &cache_handle);
if (s.ok()) {
assert(cache_handle != nullptr);
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
cache_handle);
}
}
} else {
iter->SetCacheHandle(block.GetCacheHandle());
}
block.TransferTo(iter);
return iter;
}
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/hash_index_reader.h"
#include "table/block_fetcher.h"
#include "table/meta_blocks.h"
namespace ROCKSDB_NAMESPACE {
Status HashIndexReader::Create(const BlockBasedTable* table,
FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_index_iter,
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<IndexReader>* index_reader) {
assert(table != nullptr);
assert(index_reader != nullptr);
assert(!pin || prefetch);
const BlockBasedTable::Rep* rep = table->get_rep();
assert(rep != nullptr);
CachableEntry<Block> index_block;
if (prefetch || !use_cache) {
const Status s =
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
/*get_context=*/nullptr, lookup_context, &index_block);
if (!s.ok()) {
return s;
}
if (use_cache && !pin) {
index_block.Reset();
}
}
// Note, failure to create prefix hash index does not need to be a
// hard error. We can still fall back to the original binary search index.
// So, Create will succeed regardless, from this point on.
index_reader->reset(new HashIndexReader(table, std::move(index_block)));
// Get prefixes block
BlockHandle prefixes_handle;
Status s =
FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, &prefixes_handle);
if (!s.ok()) {
// TODO: log error
return Status::OK();
}
// Get index metadata block
BlockHandle prefixes_meta_handle;
s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesMetadataBlock,
&prefixes_meta_handle);
if (!s.ok()) {
// TODO: log error
return Status::OK();
}
RandomAccessFileReader* const file = rep->file.get();
const Footer& footer = rep->footer;
const ImmutableCFOptions& ioptions = rep->ioptions;
const PersistentCacheOptions& cache_options = rep->persistent_cache_options;
MemoryAllocator* const memory_allocator =
GetMemoryAllocator(rep->table_options);
// Read contents for the blocks
BlockContents prefixes_contents;
BlockFetcher prefixes_block_fetcher(
file, prefetch_buffer, footer, ReadOptions(), prefixes_handle,
&prefixes_contents, ioptions, true /*decompress*/,
true /*maybe_compressed*/, BlockType::kHashIndexPrefixes,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
s = prefixes_block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
}
BlockContents prefixes_meta_contents;
BlockFetcher prefixes_meta_block_fetcher(
file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle,
&prefixes_meta_contents, ioptions, true /*decompress*/,
true /*maybe_compressed*/, BlockType::kHashIndexMetadata,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
s = prefixes_meta_block_fetcher.ReadBlockContents();
if (!s.ok()) {
// TODO: log error
return Status::OK();
}
BlockPrefixIndex* prefix_index = nullptr;
assert(rep->internal_prefix_transform.get() != nullptr);
s = BlockPrefixIndex::Create(rep->internal_prefix_transform.get(),
prefixes_contents.data,
prefixes_meta_contents.data, &prefix_index);
// TODO: log error
if (s.ok()) {
HashIndexReader* const hash_index_reader =
static_cast<HashIndexReader*>(index_reader->get());
hash_index_reader->prefix_index_.reset(prefix_index);
}
return Status::OK();
}
InternalIteratorBase<IndexValue>* HashIndexReader::NewIterator(
const ReadOptions& read_options, bool disable_prefix_seek,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) {
const BlockBasedTable::Rep* rep = table()->get_rep();
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);
return iter;
}
return NewErrorInternalIterator<IndexValue>(s);
}
Statistics* kNullStats = nullptr;
const bool total_order_seek =
read_options.total_order_seek || disable_prefix_seek;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
auto it = index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), iter, kNullStats,
total_order_seek, index_has_first_key(), index_key_includes_seq(),
index_value_is_full(), false /* block_contents_pinned */,
prefix_index_.get());
assert(it != nullptr);
index_block.TransferTo(it);
return it;
}
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "table/block_based/index_reader_common.h"
namespace ROCKSDB_NAMESPACE {
// Index that leverages an internal hash table to quicken the lookup for a given
// key.
class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
public:
static Status Create(const BlockBasedTable* table,
FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_index_iter, bool use_cache,
bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<IndexReader>* index_reader);
InternalIteratorBase<IndexValue>* NewIterator(
const ReadOptions& read_options, bool disable_prefix_seek,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override;
size_t ApproximateMemoryUsage() const override {
size_t usage = ApproximateIndexBlockMemoryUsage();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
usage += malloc_usable_size(const_cast<HashIndexReader*>(this));
#else
if (prefix_index_) {
usage += prefix_index_->ApproximateMemoryUsage();
}
usage += sizeof(*this);
#endif // ROCKSDB_MALLOC_USABLE_SIZE
return usage;
}
private:
HashIndexReader(const BlockBasedTable* t, CachableEntry<Block>&& index_block)
: IndexReaderCommon(t, std::move(index_block)) {}
std::unique_ptr<BlockPrefixIndex> prefix_index_;
};
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/index_reader_common.h"
namespace ROCKSDB_NAMESPACE {
Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& read_options, bool use_cache, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) {
PERF_TIMER_GUARD(read_index_block_nanos);
assert(table != nullptr);
assert(index_block != nullptr);
assert(index_block->IsEmpty());
const Rep* const rep = table->get_rep();
assert(rep != nullptr);
const Status s = table->RetrieveBlock(
prefetch_buffer, read_options, rep->footer.index_handle(),
UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex,
get_context, lookup_context, /* for_compaction */ false, use_cache);
return s;
}
Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) const {
assert(index_block != nullptr);
if (!index_block_.IsEmpty()) {
index_block->SetUnownedValue(index_block_.GetValue());
return Status::OK();
}
ReadOptions read_options;
if (no_io) {
read_options.read_tier = kBlockCacheTier;
}
return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_options,
cache_index_blocks(), get_context, lookup_context,
index_block);
}
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/reader_common.h"
namespace ROCKSDB_NAMESPACE {
// Encapsulates common functionality for the various index reader
// implementations. Provides access to the index block regardless of whether
// it is owned by the reader or stored in the cache, or whether it is pinned
// in the cache or not.
class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
public:
IndexReaderCommon(const BlockBasedTable* t,
CachableEntry<Block>&& index_block)
: table_(t), index_block_(std::move(index_block)) {
assert(table_ != nullptr);
}
protected:
static Status ReadIndexBlock(const BlockBasedTable* table,
FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& read_options, bool use_cache,
GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block);
const BlockBasedTable* table() const { return table_; }
const InternalKeyComparator* internal_comparator() const {
assert(table_ != nullptr);
assert(table_->get_rep() != nullptr);
return &table_->get_rep()->internal_comparator;
}
bool index_has_first_key() const {
assert(table_ != nullptr);
assert(table_->get_rep() != nullptr);
return table_->get_rep()->index_has_first_key;
}
bool index_key_includes_seq() const {
assert(table_ != nullptr);
assert(table_->get_rep() != nullptr);
return table_->get_rep()->index_key_includes_seq;
}
bool index_value_is_full() const {
assert(table_ != nullptr);
assert(table_->get_rep() != nullptr);
return table_->get_rep()->index_value_is_full;
}
bool cache_index_blocks() const {
assert(table_ != nullptr);
assert(table_->get_rep() != nullptr);
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
}
Status GetOrReadIndexBlock(bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<Block>* index_block) const;
size_t ApproximateIndexBlockMemoryUsage() const {
assert(!index_block_.GetOwnValue() || index_block_.GetValue() != nullptr);
return index_block_.GetOwnValue()
? index_block_.GetValue()->ApproximateMemoryUsage()
: 0;
}
private:
const BlockBasedTable* table_;
CachableEntry<Block> index_block_;
};
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/partitioned_index_reader.h"
#include "table/block_based/block_based_table_iterator.h"
namespace ROCKSDB_NAMESPACE {
Status PartitionIndexReader::Create(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<IndexReader>* index_reader) {
assert(table != nullptr);
assert(table->get_rep());
assert(!pin || prefetch);
assert(index_reader != nullptr);
CachableEntry<Block> index_block;
if (prefetch || !use_cache) {
const Status s =
ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
/*get_context=*/nullptr, lookup_context, &index_block);
if (!s.ok()) {
return s;
}
if (use_cache && !pin) {
index_block.Reset();
}
}
index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));
return Status::OK();
}
InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
const ReadOptions& read_options, bool /* disable_prefix_seek */,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) {
const bool no_io = (read_options.read_tier == kBlockCacheTier);
CachableEntry<Block> index_block;
const Status s =
GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
if (!s.ok()) {
if (iter != nullptr) {
iter->Invalidate(s);
return iter;
}
return NewErrorInternalIterator<IndexValue>(s);
}
const BlockBasedTable::Rep* rep = table()->rep_;
InternalIteratorBase<IndexValue>* it = nullptr;
Statistics* kNullStats = nullptr;
// Filters are already checked before seeking the index
if (!partition_map_.empty()) {
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
it = NewTwoLevelIterator(
new BlockBasedTable::PartitionedIndexIteratorState(table(),
&partition_map_),
index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
index_has_first_key(), index_key_includes_seq(),
index_value_is_full()));
} else {
ReadOptions ro;
ro.fill_cache = read_options.fill_cache;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
it = new BlockBasedTableIterator<IndexBlockIter, IndexValue>(
table(), ro, *internal_comparator(),
index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
index_has_first_key(), index_key_includes_seq(),
index_value_is_full()),
false, true, /* prefix_extractor */ nullptr, BlockType::kIndex,
lookup_context ? lookup_context->caller
: TableReaderCaller::kUncategorized);
}
assert(it != nullptr);
index_block.TransferTo(it);
return it;
// TODO(myabandeh): Update TwoLevelIterator to be able to make use of
// on-stack BlockIter while the state is on heap. Currentlly it assumes
// the first level iter is always on heap and will attempt to delete it
// in its destructor.
}
void PartitionIndexReader::CacheDependencies(bool pin) {
// Before read partitions, prefetch them to avoid lots of IOs
BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
const BlockBasedTable::Rep* rep = table()->rep_;
IndexBlockIter biter;
BlockHandle handle;
Statistics* kNullStats = nullptr;
CachableEntry<Block> index_block;
Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */,
&lookup_context, &index_block);
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
"Error retrieving top-level index block while trying to "
"cache index partitions: %s",
s.ToString().c_str());
return;
}
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
index_block.GetValue()->NewIndexIterator(
internal_comparator(), internal_comparator()->user_comparator(),
rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
index_has_first_key(), index_key_includes_seq(), index_value_is_full());
// Index partitions are assumed to be consecuitive. Prefetch them all.
// Read the first block offset
biter.SeekToFirst();
if (!biter.Valid()) {
// Empty index.
return;
}
handle = biter.value().handle;
uint64_t prefetch_off = handle.offset();
// Read the last block's offset
biter.SeekToLast();
if (!biter.Valid()) {
// Empty index.
return;
}
handle = biter.value().handle;
uint64_t last_off = handle.offset() + block_size(handle);
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer);
s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off,
static_cast<size_t>(prefetch_len));
// After prefetch, read the partitions one by one
biter.SeekToFirst();
auto ro = ReadOptions();
for (; biter.Valid(); biter.Next()) {
handle = biter.value().handle;
CachableEntry<Block> block;
// TODO: Support counter batch update for partitioned index and
// filter blocks
s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
&block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
/*contents=*/nullptr);
assert(s.ok() || block.GetValue() == nullptr);
if (s.ok() && block.GetValue() != nullptr) {
if (block.IsCached()) {
if (pin) {
partition_map_[handle.offset()] = std::move(block);
}
}
}
}
}
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "table/block_based/index_reader_common.h"
namespace ROCKSDB_NAMESPACE {
// Index that allows binary search lookup in a two-level index structure.
class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
public:
// Read the partition index from the file and create an instance for
// `PartitionIndexReader`.
// On success, index_reader will be populated; otherwise it will remain
// unmodified.
static Status Create(const BlockBasedTable* table,
FilePrefetchBuffer* prefetch_buffer, bool use_cache,
bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<IndexReader>* index_reader);
// return a two-level iterator: first level is on the partition index
InternalIteratorBase<IndexValue>* NewIterator(
const ReadOptions& read_options, bool /* disable_prefix_seek */,
IndexBlockIter* iter, GetContext* get_context,
BlockCacheLookupContext* lookup_context) override;
void CacheDependencies(bool pin) override;
size_t ApproximateMemoryUsage() const override {
size_t usage = ApproximateIndexBlockMemoryUsage();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
usage += malloc_usable_size(const_cast<PartitionIndexReader*>(this));
#else
usage += sizeof(*this);
#endif // ROCKSDB_MALLOC_USABLE_SIZE
// TODO(myabandeh): more accurate estimate of partition_map_ mem usage
return usage;
}
private:
PartitionIndexReader(const BlockBasedTable* t,
CachableEntry<Block>&& index_block)
: IndexReaderCommon(t, std::move(index_block)) {}
std::unordered_map<uint64_t, CachableEntry<Block>> partition_map_;
};
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/reader_common.h"
#include "util/crc32c.h"
#include "util/xxhash.h"
namespace ROCKSDB_NAMESPACE {
void ForceReleaseCachedEntry(void* arg, void* h) {
Cache* cache = reinterpret_cast<Cache*>(arg);
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
cache->Release(handle, true /* force_erase */);
}
Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len,
uint32_t expected) {
Status s;
uint32_t actual = 0;
switch (type) {
case kNoChecksum:
break;
case kCRC32c:
expected = crc32c::Unmask(expected);
actual = crc32c::Value(buf, len);
break;
case kxxHash:
actual = XXH32(buf, static_cast<int>(len), 0);
break;
case kxxHash64:
actual = static_cast<uint32_t>(XXH64(buf, static_cast<int>(len), 0) &
uint64_t{0xffffffff});
break;
default:
s = Status::Corruption("unknown checksum type");
}
if (s.ok() && actual != expected) {
s = Status::Corruption("properties block checksum mismatched");
}
return s;
}
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/table.h"
namespace ROCKSDB_NAMESPACE {
// Release the cached entry and decrement its ref count.
extern void ForceReleaseCachedEntry(void* arg, void* h);
inline MemoryAllocator* GetMemoryAllocator(
const BlockBasedTableOptions& table_options) {
return table_options.block_cache.get()
? table_options.block_cache->memory_allocator()
: nullptr;
}
inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock(
const BlockBasedTableOptions& table_options) {
return table_options.block_cache_compressed.get()
? table_options.block_cache_compressed->memory_allocator()
: nullptr;
}
extern Status VerifyChecksum(const ChecksumType type, const char* buf,
size_t len, uint32_t expected);
} // namespace ROCKSDB_NAMESPACE
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册