提交 1dfcdb15 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

Extend pin_l0 to filter partitions

Summary:
This is the continuation of https://github.com/facebook/rocksdb/pull/2661 for filter partitions. When pin_l0 is set (along with cache_xxx), then open table open the filter partitions are loaded into the cache and pinned there.
Closes https://github.com/facebook/rocksdb/pull/2766

Differential Revision: D5671098

Pulled By: maysamyabandeh

fbshipit-source-id: 174f24018f1d7f1129621e7380287b65b67d2115
上级 39ef9005
......@@ -291,7 +291,8 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
}
BlockBasedTable* table_;
std::unique_ptr<Block> index_block_;
std::map<uint64_t, BlockBasedTable::CachableEntry<Block>> partition_map_;
std::unordered_map<uint64_t, BlockBasedTable::CachableEntry<Block>>
partition_map_;
};
// Index that allows binary search lookup for the first key of each block.
......@@ -797,14 +798,13 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
rep->ioptions.info_log);
}
// pre-fetching of blocks is turned on
const bool pin =
rep->table_options.pin_l0_filter_and_index_blocks_in_cache && level == 0;
// pre-fetching of blocks is turned on
// Will use block cache for index/filter blocks access
// Always prefetch index and filter for level 0
if (table_options.cache_index_and_filter_blocks) {
if (prefetch_index_and_filter_in_cache || level == 0) {
const bool pin =
rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0;
assert(table_options.block_cache != nullptr);
// Hack: Call NewIndexIterator() to implicitly add index to the
// block_cache
......@@ -823,15 +823,15 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
if (s.ok()) {
// Hack: Call GetFilter() to implicitly add filter to the block_cache
auto filter_entry = new_table->GetFilter();
if (filter_entry.value != nullptr) {
filter_entry.value->CacheDependencies(pin);
}
// if pin_l0_filter_and_index_blocks_in_cache is true, and this is
// a level0 file, then save it in rep_->filter_entry; it will be
// released in the destructor only, hence it will be pinned in the
// cache while this reader is alive
if (pin) {
rep->filter_entry = filter_entry;
if (rep->filter_entry.value != nullptr) {
rep->filter_entry.value->SetLevel(level);
}
} else {
filter_entry.Release(table_options.block_cache.get());
}
......@@ -844,17 +844,25 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
IndexReader* index_reader = nullptr;
s = new_table->CreateIndexReader(prefetch_buffer.get(), &index_reader,
meta_iter.get(), level);
if (s.ok()) {
rep->index_reader.reset(index_reader);
// The partitions of partitioned index are always stored in cache. They
// are hence follow the configuration for pin and prefetch regardless of
// the value of cache_index_and_filter_blocks
if (prefetch_index_and_filter_in_cache || level == 0) {
rep->index_reader->CacheDependencies(pin);
}
// Set filter block
if (rep->filter_policy) {
const bool is_a_filter_partition = true;
rep->filter.reset(new_table->ReadFilter(
prefetch_buffer.get(), rep->filter_handle, !is_a_filter_partition));
if (rep->filter.get()) {
rep->filter->SetLevel(level);
auto filter = new_table->ReadFilter(
prefetch_buffer.get(), rep->filter_handle, !is_a_filter_partition);
rep->filter.reset(filter);
// Refer to the comment above about paritioned indexes always being
// cached
if (filter && (prefetch_index_and_filter_in_cache || level == 0)) {
filter->CacheDependencies(pin);
}
}
} else {
......@@ -1171,15 +1179,16 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
}
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
bool no_io) const {
FilePrefetchBuffer* prefetch_buffer, bool no_io) const {
const BlockHandle& filter_blk_handle = rep_->filter_handle;
const bool is_a_filter_partition = true;
return GetFilter(filter_blk_handle, !is_a_filter_partition, no_io);
return GetFilter(prefetch_buffer, filter_blk_handle, !is_a_filter_partition,
no_io);
}
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
const BlockHandle& filter_blk_handle, const bool is_a_filter_partition,
bool no_io) const {
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
const bool is_a_filter_partition, bool no_io) const {
// If cache_index_and_filter_blocks is false, filter should be pre-populated.
// We will return rep_->filter anyway. rep_->filter can be nullptr if filter
// read fails at Open() time. We don't want to reload again since it will
......@@ -1219,8 +1228,8 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
// Do not invoke any io.
return CachableEntry<FilterBlockReader>();
} else {
filter = ReadFilter(nullptr /* prefetch_buffer */, filter_blk_handle,
is_a_filter_partition);
filter =
ReadFilter(prefetch_buffer, filter_blk_handle, is_a_filter_partition);
if (filter != nullptr) {
assert(filter->size() > 0);
Status s = block_cache->Insert(
......@@ -1482,7 +1491,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState(
BlockBasedTable* table, const ReadOptions& read_options,
const InternalKeyComparator* icomparator, bool skip_filters, bool is_index,
std::map<uint64_t, CachableEntry<Block>>* block_map)
std::unordered_map<uint64_t, CachableEntry<Block>>* block_map)
: TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr),
table_(table),
read_options_(read_options),
......@@ -1501,9 +1510,19 @@ BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator(
auto rep = table_->rep_;
if (block_map_) {
auto block = block_map_->find(handle.offset());
assert(block != block_map_->end());
return block->second.value->NewIterator(&rep->internal_comparator, nullptr,
true, rep->ioptions.statistics);
// This is a possible scenario since block cache might not have had space
// for the partition
if (block != block_map_->end()) {
PERF_COUNTER_ADD(block_cache_hit_count, 1);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_INDEX_HIT);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_HIT);
Cache* block_cache = rep->table_options.block_cache.get();
assert(block_cache);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(block->second.cache_handle));
return block->second.value->NewIterator(
&rep->internal_comparator, nullptr, true, rep->ioptions.statistics);
}
}
return NewDataBlockIterator(rep, read_options_, handle, nullptr, is_index_,
s);
......@@ -1700,7 +1719,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
const bool no_io = read_options.read_tier == kBlockCacheTier;
CachableEntry<FilterBlockReader> filter_entry;
if (!skip_filters) {
filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier);
filter_entry = GetFilter(/*prefetch_buffer*/ nullptr,
read_options.read_tier == kBlockCacheTier);
}
FilterBlockReader* filter = filter_entry.value;
......
......@@ -183,6 +183,9 @@ class BlockBasedTable : public TableReader {
virtual void CacheDependencies(bool /* unused */) {}
// Prefetch all the blocks referenced by this index to the buffer
void PrefetchBlocks(FilePrefetchBuffer* buf);
protected:
const InternalKeyComparator* icomparator_;
......@@ -210,6 +213,7 @@ class BlockBasedTable : public TableReader {
explicit BlockBasedTable(Rep* rep) : rep_(rep) {}
private:
friend class MockedBlockBasedTable;
// input_iter: if it is not null, update this one and return it as Iterator
static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
const Slice& index_value,
......@@ -239,10 +243,11 @@ class BlockBasedTable : public TableReader {
// For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file
// were they not present in cache yet.
CachableEntry<FilterBlockReader> GetFilter(bool no_io = false) const;
CachableEntry<FilterBlockReader> GetFilter(
FilePrefetchBuffer* prefetch_buffer = nullptr, bool no_io = false) const;
virtual CachableEntry<FilterBlockReader> GetFilter(
const BlockHandle& filter_blk_handle, const bool is_a_filter_partition,
bool no_io) const;
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
const bool is_a_filter_partition, bool no_io) const;
// Get the iterator from the index reader.
// If input_iter is not set, return new Iterator
......@@ -352,7 +357,7 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
BlockBasedTable* table, const ReadOptions& read_options,
const InternalKeyComparator* icomparator, bool skip_filters,
bool is_index = false,
std::map<uint64_t, CachableEntry<Block>>* block_map = nullptr);
std::unordered_map<uint64_t, CachableEntry<Block>>* block_map = nullptr);
InternalIterator* NewSecondaryIterator(const Slice& index_value) override;
bool PrefixMayMatch(const Slice& internal_key) override;
bool KeyReachedUpperBound(const Slice& internal_key) override;
......@@ -365,7 +370,7 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
bool skip_filters_;
// true if the 2nd level iterator is on indexes instead of on user data.
bool is_index_;
std::map<uint64_t, CachableEntry<Block>>* block_map_;
std::unordered_map<uint64_t, CachableEntry<Block>>* block_map_;
port::RWMutex cleaner_mu;
};
......
......@@ -108,15 +108,14 @@ class FilterBlockReader {
bool whole_key_filtering() const { return whole_key_filtering_; }
int GetLevel() const { return level_; }
void SetLevel(int level) { level_ = level; }
// convert this object to a human readable form
virtual std::string ToString() const {
std::string error_msg("Unsupported filter \n");
return error_msg;
}
virtual void CacheDependencies(bool pin) {}
protected:
bool whole_key_filtering_;
......
......@@ -7,6 +7,7 @@
#include <utility>
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "rocksdb/filter_policy.h"
#include "table/block.h"
......@@ -100,19 +101,29 @@ PartitionedFilterBlockReader::PartitionedFilterBlockReader(
}
PartitionedFilterBlockReader::~PartitionedFilterBlockReader() {
{
ReadLock rl(&mu_);
for (auto it = handle_list_.begin(); it != handle_list_.end(); ++it) {
table_->rep_->table_options.block_cache.get()->Release(*it);
}
// TODO(myabandeh): if instead of filter object we store only the blocks in
// block cache, then we don't have to manually earse them from block cache
// here.
auto block_cache = table_->rep_->table_options.block_cache.get();
if (UNLIKELY(block_cache == nullptr)) {
return;
}
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
for (auto it = filter_block_set_.begin(); it != filter_block_set_.end();
++it) {
BlockIter biter;
BlockHandle handle;
idx_on_fltr_blk_->NewIterator(&comparator_, &biter, true);
biter.SeekToFirst();
for (; biter.Valid(); biter.Next()) {
auto input = biter.value();
auto s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
continue;
}
auto key = BlockBasedTable::GetCacheKey(table_->rep_->cache_key_prefix,
table_->rep_->cache_key_prefix_size,
*it, cache_key);
table_->rep_->table_options.block_cache.get()->Erase(key);
handle, cache_key);
block_cache->Erase(key);
}
}
......@@ -205,34 +216,22 @@ PartitionedFilterBlockReader::GetFilterPartition(
const bool is_a_filter_partition = true;
auto block_cache = table_->rep_->table_options.block_cache.get();
if (LIKELY(block_cache != nullptr)) {
bool pin_cached_filters =
GetLevel() == 0 &&
table_->rep_->table_options.pin_l0_filter_and_index_blocks_in_cache;
if (pin_cached_filters) {
ReadLock rl(&mu_);
auto iter = filter_cache_.find(fltr_blk_handle.offset());
if (iter != filter_cache_.end()) {
if (filter_map_.size() != 0) {
auto iter = filter_map_.find(fltr_blk_handle.offset());
// This is a possible scenario since block cache might not have had space
// for the partition
if (iter != filter_map_.end()) {
PERF_COUNTER_ADD(block_cache_hit_count, 1);
RecordTick(statistics(), BLOCK_CACHE_FILTER_HIT);
RecordTick(statistics(), BLOCK_CACHE_HIT);
RecordTick(statistics(), BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(iter->second.cache_handle));
*cached = true;
return {iter->second, nullptr};
}
}
auto filter =
table_->GetFilter(fltr_blk_handle, is_a_filter_partition, no_io);
if (filter.IsSet()) {
WriteLock wl(&mu_);
filter_block_set_.insert(fltr_blk_handle);
if (pin_cached_filters) {
std::pair<uint64_t, FilterBlockReader*> pair(fltr_blk_handle.offset(),
filter.value);
auto succ = filter_cache_.insert(pair).second;
if (succ) {
handle_list_.push_back(filter.cache_handle);
} // Otherwise it is already inserted by a concurrent thread
*cached = true;
return iter->second;
}
}
return filter;
return table_->GetFilter(/*prefetch_buffer*/ nullptr, fltr_blk_handle,
is_a_filter_partition, no_io);
} else {
auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle,
is_a_filter_partition);
......@@ -244,4 +243,69 @@ size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
return idx_on_fltr_blk_->size();
}
// TODO(myabandeh): merge this with the same function in IndexReader
void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
// Before read partitions, prefetch them to avoid lots of IOs
auto rep = table_->rep_;
BlockIter biter;
BlockHandle handle;
idx_on_fltr_blk_->NewIterator(&comparator_, &biter, true);
// Index partitions are assumed to be consecuitive. Prefetch them all.
// Read the first block offset
biter.SeekToFirst();
Slice input = biter.value();
Status s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
"Could not read first index partition");
return;
}
uint64_t prefetch_off = handle.offset();
// Read the last block's offset
biter.SeekToLast();
input = biter.value();
s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
"Could not read last index partition");
return;
}
uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
auto& file = table_->rep_->file;
prefetch_buffer.reset(new FilePrefetchBuffer());
s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len);
// After prefetch, read the partitions one by one
biter.SeekToFirst();
Cache* block_cache = rep->table_options.block_cache.get();
for (; biter.Valid(); biter.Next()) {
input = biter.value();
s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log, "Could not read index partition");
continue;
}
const bool no_io = true;
const bool is_a_filter_partition = true;
auto filter = table_->GetFilter(prefetch_buffer.get(), handle,
is_a_filter_partition, !no_io);
if (LIKELY(filter.IsSet())) {
if (pin) {
filter_map_[handle.offset()] = std::move(filter);
} else {
block_cache->Release(filter.cache_handle);
}
} else {
delete filter.value;
}
}
}
} // namespace rocksdb
......@@ -88,20 +88,15 @@ class PartitionedFilterBlockReader : public FilterBlockReader {
BlockBasedTable::CachableEntry<FilterBlockReader> GetFilterPartition(
FilePrefetchBuffer* prefetch_buffer, Slice* handle, const bool no_io,
bool* cached);
virtual void CacheDependencies(bool pin) override;
const SliceTransform* prefix_extractor_;
std::unique_ptr<Block> idx_on_fltr_blk_;
const Comparator& comparator_;
const BlockBasedTable* table_;
std::unordered_map<uint64_t, FilterBlockReader*> filter_cache_;
autovector<Cache::Handle*> handle_list_;
struct BlockHandleCmp {
bool operator()(const BlockHandle& lhs, const BlockHandle& rhs) const {
return lhs.offset() < rhs.offset();
}
};
std::set<BlockHandle, BlockHandleCmp> filter_block_set_;
port::RWMutex mu_;
std::unordered_map<uint64_t,
BlockBasedTable::CachableEntry<FilterBlockReader>>
filter_map_;
};
} // namespace rocksdb
......@@ -22,11 +22,14 @@ std::map<uint64_t, Slice> slices;
class MockedBlockBasedTable : public BlockBasedTable {
public:
explicit MockedBlockBasedTable(Rep* rep) : BlockBasedTable(rep) {}
explicit MockedBlockBasedTable(Rep* rep) : BlockBasedTable(rep) {
// Initialize what Open normally does as much as necessary for the test
rep->cache_key_prefix_size = 10;
}
virtual CachableEntry<FilterBlockReader> GetFilter(
const BlockHandle& filter_blk_handle, const bool is_a_filter_partition,
bool no_io) const override {
FilePrefetchBuffer*, const BlockHandle& filter_blk_handle,
const bool /* unused */, bool /* unused */) const override {
Slice slice = slices[filter_blk_handle.offset()];
auto obj = new FullFilterBlockReader(
nullptr, true, BlockContents(slice, false, kNoCompression),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册