提交 522de4f5 编写于 作者: M Marton Trencseni

Adding pin_l0_filter_and_index_blocks_in_cache feature.

Summary:
When a block based table file is opened, if prefetch_index_and_filter is true, it will prefetch the index and filter blocks, putting them into the block cache.
What this feature adds: when a L0 block based table file is opened, if pin_l0_filter_and_index_blocks_in_cache is true in the options (and prefetch_index_and_filter is true), then the filter and index blocks aren't released back to the block cache at the end of BlockBasedTableReader::Open(). Instead the table reader takes ownership of them, hence pinning them, ie. the LRU cache will never push them out. Meanwhile in the table reader, further accesses will not hit the block cache, thus avoiding lock contention.
When the table reader is destroyed, it releases the pinned blocks (if there were any). This has to happen before the cache is destroyed, so I had to introduce a TableReader::Close(), to guarantee the order of destruction.

Test Plan:
Added two unit tests for this. Existing unit tests run fine (default is pin_l0_filter_and_index_blocks_in_cache=false).

DISABLE_JEMALLOC=1 OPT=-g make all valgrind_check -j32
  Mac: OK.
  Linux: with D55287 patched in it's OK.

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D54801
上级 be222712
......@@ -63,7 +63,7 @@ Status BuildTable(
const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, const Env::IOPriority io_priority,
TableProperties* table_properties) {
TableProperties* table_properties, int level) {
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
Status s;
......@@ -149,7 +149,8 @@ Status BuildTable(
ReadOptions(), env_options, internal_comparator, meta->fd, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
false));
false /* for_compaction */, nullptr /* arena */,
false /* skip_filter */, level));
s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {
......
......@@ -61,6 +61,6 @@ extern Status BuildTable(
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr);
TableProperties* table_properties = nullptr, int level = -1);
} // namespace rocksdb
......@@ -1288,6 +1288,11 @@ void rocksdb_block_based_options_set_cache_index_and_filter_blocks(
options->rep.cache_index_and_filter_blocks = v;
}
void rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(
rocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.pin_l0_filter_and_index_blocks_in_cache = v;
}
void rocksdb_block_based_options_set_skip_table_builder_flush(
rocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.skip_table_builder_flush = v;
......
......@@ -465,6 +465,8 @@ class ColumnFamilySet {
// Don't call while iterating over ColumnFamilySet
void FreeDeadColumnFamilies();
Cache* get_table_cache() { return table_cache_; }
private:
friend class ColumnFamilyData;
// helper function that gets called from cfd destructor
......
......@@ -424,6 +424,92 @@ TEST_F(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
}
TEST_F(DBTest, IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.filter_policy.reset(NewBloomFilterPolicy(20));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "key", "val"));
// Create a new table.
ASSERT_OK(Flush(1));
// index/filter blocks added to block cache right after table creation.
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// only index/filter were added
ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));
std::string value;
// Miss and hit count should remain the same, they're all pinned.
db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// Miss and hit count should remain the same, they're all pinned.
value = Get(1, "key");
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
TEST_F(DBTest, MultiLevelIndexAndFilterBlocksCachedWithPinning) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.filter_policy.reset(NewBloomFilterPolicy(20));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
Put(1, "a", "begin");
Put(1, "z", "end");
ASSERT_OK(Flush(1));
// move this table to L1
dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
TryReopenWithColumnFamilies({"default", "pikachu"}, options);
// create new table at L0
Put(1, "a2", "begin2");
Put(1, "z2", "end2");
ASSERT_OK(Flush(1));
// get base cache values
uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
std::string value;
// this should be read from L0
// so cache values don't change
value = Get(1, "a2");
ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// should be read from L1; the block cache survives the reopen, and during
// the BlockBasedTableReader::Open() of the table we try to fetch it, we
// will see one hit from there, and then the Get() results in another hit
value = Get(1, "a");
ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
}
TEST_F(DBTest, ParanoidFileChecks) {
Options options = CurrentOptions();
options.create_if_missing = true;
......
......@@ -234,14 +234,14 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), meta,
cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
existing_snapshots_, earliest_write_conflict_snapshot_,
output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), Env::IO_HIGH, &table_properties_);
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
existing_snapshots_, earliest_write_conflict_snapshot_,
output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
Env::IO_HIGH, &table_properties_, 0 /* level */);
info.table_properties = table_properties_;
LogFlush(db_options_.info_log);
}
......
......@@ -88,7 +88,7 @@ Status TableCache::GetTableReader(
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
unique_ptr<TableReader>* table_reader, bool skip_filters) {
unique_ptr<TableReader>* table_reader, bool skip_filters, int level) {
std::string fname =
TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
unique_ptr<RandomAccessFile> file;
......@@ -109,7 +109,7 @@ Status TableCache::GetTableReader(
file_read_hist));
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, env_options, internal_comparator,
skip_filters),
skip_filters, level),
std::move(file_reader), fd.GetFileSize(), table_reader);
TEST_SYNC_POINT("TableCache::GetTableReader:0");
}
......@@ -120,7 +120,8 @@ Status TableCache::FindTable(const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, Cache::Handle** handle,
const bool no_io, bool record_read_stats,
HistogramImpl* file_read_hist, bool skip_filters) {
HistogramImpl* file_read_hist, bool skip_filters,
int level) {
PERF_TIMER_GUARD(find_table_nanos);
Status s;
uint64_t number = fd.GetNumber();
......@@ -136,7 +137,7 @@ Status TableCache::FindTable(const EnvOptions& env_options,
unique_ptr<TableReader> table_reader;
s = GetTableReader(env_options, internal_comparator, fd,
false /* sequential mode */, record_read_stats,
file_read_hist, &table_reader, skip_filters);
file_read_hist, &table_reader, skip_filters, level);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
......@@ -158,7 +159,7 @@ InternalIterator* TableCache::NewIterator(
const ReadOptions& options, const EnvOptions& env_options,
const InternalKeyComparator& icomparator, const FileDescriptor& fd,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
bool for_compaction, Arena* arena, bool skip_filters) {
bool for_compaction, Arena* arena, bool skip_filters, int level) {
PERF_TIMER_GUARD(new_table_iterator_nanos);
if (table_reader_ptr != nullptr) {
......@@ -173,7 +174,8 @@ InternalIterator* TableCache::NewIterator(
unique_ptr<TableReader> table_reader_unique_ptr;
Status s = GetTableReader(
env_options, icomparator, fd, /* sequential mode */ true,
/* record stats */ false, nullptr, &table_reader_unique_ptr);
/* record stats */ false, nullptr, &table_reader_unique_ptr,
false /* skip_filters */, level);
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
}
......@@ -184,7 +186,7 @@ InternalIterator* TableCache::NewIterator(
Status s = FindTable(env_options, icomparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
!for_compaction /* record read_stats */,
file_read_hist, skip_filters);
file_read_hist, skip_filters, level);
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
}
......@@ -216,7 +218,7 @@ Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, const Slice& k,
GetContext* get_context, HistogramImpl* file_read_hist,
bool skip_filters) {
bool skip_filters, int level) {
TableReader* t = fd.table_reader;
Status s;
Cache::Handle* handle = nullptr;
......@@ -265,7 +267,8 @@ Status TableCache::Get(const ReadOptions& options,
if (!t) {
s = FindTable(env_options_, internal_comparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters);
true /* record_read_stats */, file_read_hist, skip_filters,
level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
......
......@@ -45,34 +45,37 @@ class TableCache {
// the cache and should not be deleted, and is valid for as long as the
// returned iterator is live.
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
InternalIterator* NewIterator(
const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr,
HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
Arena* arena = nullptr, bool skip_filters = false);
Arena* arena = nullptr, bool skip_filters = false, int level = -1);
// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value) repeatedly until
// it returns false.
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
Status Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, const Slice& k,
GetContext* get_context, HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false);
bool skip_filters = false, int level = -1);
// Evict any entry for the specified file number
static void Evict(Cache* cache, uint64_t file_number);
// Find table reader
// @param skip_filters Disables loading/accessing the filter block
// @param level == -1 means not specified
Status FindTable(const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, Cache::Handle**,
const bool no_io = false, bool record_read_stats = true,
HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false);
bool skip_filters = false, int level = -1);
// Get TableReader from a cache handle.
TableReader* GetTableReaderFromHandle(Cache::Handle* handle);
......@@ -106,7 +109,7 @@ class TableCache {
const FileDescriptor& fd, bool sequential_mode,
bool record_read_stats, HistogramImpl* file_read_hist,
unique_ptr<TableReader>* table_reader,
bool skip_filters = false);
bool skip_filters = false, int level = -1);
const ImmutableCFOptions& ioptions_;
const EnvOptions& env_options_;
......
......@@ -91,6 +91,7 @@ class FilePicker {
const InternalKeyComparator* internal_comparator)
: num_levels_(num_levels),
curr_level_(static_cast<unsigned int>(-1)),
returned_file_level_(static_cast<unsigned int>(-1)),
hit_file_level_(static_cast<unsigned int>(-1)),
search_left_bound_(0),
search_right_bound_(FileIndexer::kLevelMaxIndex),
......@@ -117,6 +118,8 @@ class FilePicker {
}
}
int GetCurrentLevel() { return returned_file_level_; }
FdWithKeyRange* GetNextFile() {
while (!search_ended_) { // Loops over different levels.
while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
......@@ -189,6 +192,7 @@ class FilePicker {
}
prev_file_ = f;
#endif
returned_file_level_ = curr_level_;
if (curr_level_ > 0 && cmp_largest < 0) {
// No more files to search in this level.
search_ended_ = !PrepareNextLevel();
......@@ -215,6 +219,7 @@ class FilePicker {
private:
unsigned int num_levels_;
unsigned int curr_level_;
unsigned int returned_file_level_;
unsigned int hit_file_level_;
int32_t search_left_bound_;
int32_t search_right_bound_;
......@@ -485,7 +490,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
const EnvOptions& env_options,
const InternalKeyComparator& icomparator,
HistogramImpl* file_read_hist, bool for_compaction,
bool prefix_enabled, bool skip_filters)
bool prefix_enabled, bool skip_filters, int level)
: TwoLevelIteratorState(prefix_enabled),
table_cache_(table_cache),
read_options_(read_options),
......@@ -493,7 +498,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
icomparator_(icomparator),
file_read_hist_(file_read_hist),
for_compaction_(for_compaction),
skip_filters_(skip_filters) {}
skip_filters_(skip_filters),
level_(level) {}
InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
if (meta_handle.size() != sizeof(FileDescriptor)) {
......@@ -505,7 +511,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
return table_cache_->NewIterator(
read_options_, env_options_, icomparator_, *fd,
nullptr /* don't need reference to table*/, file_read_hist_,
for_compaction_, nullptr /* arena */, skip_filters_);
for_compaction_, nullptr /* arena */, skip_filters_, level_);
}
}
......@@ -521,6 +527,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
HistogramImpl* file_read_hist_;
bool for_compaction_;
bool skip_filters_;
int level_;
};
// A wrapper of version builder which references the current version in
......@@ -788,7 +795,8 @@ void Version::AddIterators(const ReadOptions& read_options,
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr,
cfd_->internal_stats()->GetFileReadHist(0), false, arena));
cfd_->internal_stats()->GetFileReadHist(0), false, arena,
false /* skip_filters */, 0 /* level */));
}
// For levels > 0, we can use a concatenating iterator that sequentially
......@@ -803,7 +811,7 @@ void Version::AddIterators(const ReadOptions& read_options,
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */,
cfd_->ioptions()->prefix_extractor != nullptr,
IsFilterSkipped(level));
IsFilterSkipped(level), level);
mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
auto* first_level_iter = new (mem) LevelFileNumIterator(
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
......@@ -908,7 +916,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
read_options, *internal_comparator(), f->fd, ikey, &get_context,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()));
fp.IsHitFileLastInLevel()),
fp.GetCurrentLevel());
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
......@@ -2054,9 +2063,16 @@ VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options,
env_options_(storage_options),
env_options_compactions_(env_options_) {}
void CloseTables(void* ptr, size_t) {
TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
table_reader->Close();
}
VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
// VersionSet
column_family_set_->get_table_cache()->ApplyToAllCacheEntries(&CloseTables,
false);
column_family_set_.reset();
for (auto file : obsolete_files_) {
delete file;
......@@ -3267,7 +3283,8 @@ InternalIterator* VersionSet::MakeInputIterator(Compaction* c) {
read_options, env_options_compactions_,
cfd->internal_comparator(), flevel->files[i].fd, nullptr,
nullptr, /* no per level latency histogram*/
true /* for compaction */);
true /* for_compaction */, nullptr /* arena */,
false /* skip_filters */, (int)which /* level */);
}
} else {
// Create concatenating iterator for the files from this level
......@@ -3277,7 +3294,7 @@ InternalIterator* VersionSet::MakeInputIterator(Compaction* c) {
cfd->internal_comparator(),
nullptr /* no per level latency histogram */,
true /* for_compaction */, false /* prefix enabled */,
false /* skip_filters */),
false /* skip_filters */, (int)which /* level */),
new LevelFileNumIterator(cfd->internal_comparator(),
c->input_levels(which)));
}
......
......@@ -138,6 +138,7 @@
block_size=8192
block_restart_interval=16
cache_index_and_filter_blocks=false
pin_l0_filter_and_index_blocks_in_cache=false
index_type=kBinarySearch
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
......@@ -451,6 +451,9 @@ extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_cache_index_and_filter_blocks(
rocksdb_block_based_table_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(
rocksdb_block_based_table_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_skip_table_builder_flush(
rocksdb_block_based_table_options_t* options, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory(
......
......@@ -64,6 +64,12 @@ struct BlockBasedTableOptions {
// block during table initialization.
bool cache_index_and_filter_blocks = false;
// if cache_index_and_filter_blocks is true and the below is true, then
// filter and index blocks are stored in the cache, but a reference is
// held in the "table reader" object so the blocks are pinned and only
// evicted from cache when the table reader is freed.
bool pin_l0_filter_and_index_blocks_in_cache = false;
// The index type that will be used for this table.
enum IndexType : char {
// A space efficient index block that is optimized for
......
......@@ -38,13 +38,14 @@ jlong Java_org_rocksdb_PlainTableConfig_newTableFactoryHandle(
/*
* Class: org_rocksdb_BlockBasedTableConfig
* Method: newTableFactoryHandle
* Signature: (ZJIJIIZIZZJIBBI)J
* Signature: (ZJIJIIZIZZZJIBBI)J
*/
jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
JNIEnv* env, jobject jobj, jboolean no_block_cache, jlong block_cache_size,
jint block_cache_num_shardbits, jlong block_size, jint block_size_deviation,
jint block_restart_interval, jboolean whole_key_filtering,
jlong jfilterPolicy, jboolean cache_index_and_filter_blocks,
jboolean pin_l0_filter_and_index_blocks_in_cache,
jboolean hash_index_allow_collision, jlong block_cache_compressed_size,
jint block_cache_compressd_num_shard_bits, jbyte jchecksum_type,
jbyte jindex_type, jint jformat_version) {
......@@ -70,6 +71,8 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
options.filter_policy = *pFilterPolicy;
}
options.cache_index_and_filter_blocks = cache_index_and_filter_blocks;
options.pin_l0_filter_and_index_blocks_in_cache =
pin_l0_filter_and_index_blocks_in_cache;
options.hash_index_allow_collision = hash_index_allow_collision;
if (block_cache_compressed_size > 0) {
if (block_cache_compressd_num_shard_bits > 0) {
......
......@@ -64,7 +64,7 @@ Status BlockBasedTableFactory::NewTableReader(
table_reader_options.ioptions, table_reader_options.env_options,
table_options_, table_reader_options.internal_comparator, std::move(file),
file_size, table_reader, prefetch_enabled,
table_reader_options.skip_filters);
table_reader_options.skip_filters, table_reader_options.level);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(
......@@ -94,6 +94,12 @@ Status BlockBasedTableFactory::SanitizeOptions(
return Status::InvalidArgument("Enable cache_index_and_filter_blocks, "
", but block cache is disabled");
}
if (table_options_.pin_l0_filter_and_index_blocks_in_cache &&
table_options_.no_block_cache) {
return Status::InvalidArgument(
"Enable pin_l0_filter_and_index_blocks_in_cache, "
", but block cache is disabled");
}
if (!BlockBasedTableSupportedVersion(table_options_.format_version)) {
return Status::InvalidArgument(
"Unsupported BlockBasedTable format_version. Please check "
......@@ -115,6 +121,10 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
snprintf(buffer, kBufferSize, " cache_index_and_filter_blocks: %d\n",
table_options_.cache_index_and_filter_blocks);
ret.append(buffer);
snprintf(buffer, kBufferSize,
" pin_l0_filter_and_index_blocks_in_cache: %d\n",
table_options_.pin_l0_filter_and_index_blocks_in_cache);
ret.append(buffer);
snprintf(buffer, kBufferSize, " index_type: %d\n",
table_options_.index_type);
ret.append(buffer);
......
......@@ -340,6 +340,28 @@ class HashIndexReader : public IndexReader {
BlockContents prefixes_contents_;
};
// CachableEntry represents the entries that *may* be fetched from block cache.
// field `value` is the item we want to get.
// field `cache_handle` is the cache handle to the block cache. If the value
// was not read from cache, `cache_handle` will be nullptr.
template <class TValue>
struct BlockBasedTable::CachableEntry {
CachableEntry(TValue* _value, Cache::Handle* _cache_handle)
: value(_value), cache_handle(_cache_handle) {}
CachableEntry() : CachableEntry(nullptr, nullptr) {}
void Release(Cache* cache) {
if (cache_handle) {
cache->Release(cache_handle);
value = nullptr;
cache_handle = nullptr;
}
}
bool IsSet() const { return cache_handle != nullptr; }
TValue* value = nullptr;
// if the entry is from the cache, cache_handle will be populated.
Cache::Handle* cache_handle = nullptr;
};
struct BlockBasedTable::Rep {
Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options,
......@@ -394,34 +416,21 @@ struct BlockBasedTable::Rep {
// and compatible with existing code, we introduce a wrapper that allows
// block to extract prefix without knowing if a key is internal or not.
unique_ptr<SliceTransform> internal_prefix_transform;
// only used in level 0 files:
// when pin_l0_filter_and_index_blocks_in_cache is true, we do use the
// LRU cache, but we always keep the filter & idndex block's handle checked
// out here (=we don't call Release()), plus the parsed out objects
// the LRU cache will never push flush them out, hence they're pinned
CachableEntry<FilterBlockReader> filter_entry;
CachableEntry<IndexReader> index_entry;
};
BlockBasedTable::~BlockBasedTable() {
Close();
delete rep_;
}
// CachableEntry represents the entries that *may* be fetched from block cache.
// field `value` is the item we want to get.
// field `cache_handle` is the cache handle to the block cache. If the value
// was not read from cache, `cache_handle` will be nullptr.
template <class TValue>
struct BlockBasedTable::CachableEntry {
CachableEntry(TValue* _value, Cache::Handle* _cache_handle)
: value(_value), cache_handle(_cache_handle) {}
CachableEntry() : CachableEntry(nullptr, nullptr) {}
void Release(Cache* cache) {
if (cache_handle) {
cache->Release(cache_handle);
value = nullptr;
cache_handle = nullptr;
}
}
TValue* value = nullptr;
// if the entry is from the cache, cache_handle will be populated.
Cache::Handle* cache_handle = nullptr;
};
// Helper function to setup the cache key's prefix for the Table.
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) {
assert(kMaxCacheKeyPrefixSize >= 10);
......@@ -498,7 +507,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
uint64_t file_size,
unique_ptr<TableReader>* table_reader,
const bool prefetch_index_and_filter,
const bool skip_filters) {
const bool skip_filters, const int level) {
table_reader->reset();
Footer footer;
......@@ -594,14 +603,33 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
assert(table_options.block_cache != nullptr);
// Hack: Call NewIndexIterator() to implicitly add index to the
// block_cache
// if pin_l0_filter_and_index_blocks_in_cache is true and this is
// a level0 file, then we will pass in this pointer to rep->index
// to NewIndexIterator(), which will save the index block in there
// else it's a nullptr and nothing special happens
CachableEntry<IndexReader>* index_entry = nullptr;
if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0) {
index_entry = &rep->index_entry;
}
unique_ptr<InternalIterator> iter(
new_table->NewIndexIterator(ReadOptions()));
new_table->NewIndexIterator(ReadOptions(), nullptr, index_entry));
s = iter->status();
if (s.ok()) {
// Hack: Call GetFilter() to implicitly add filter to the block_cache
auto filter_entry = new_table->GetFilter();
filter_entry.Release(table_options.block_cache.get());
// if pin_l0_filter_and_index_blocks_in_cache is true, and this is
// a level0 file, then save it in rep_->filter_entry; it will be
// released in the destructor only, hence it will be pinned in the
// cache until this reader is alive
if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0) {
rep->filter_entry = filter_entry;
} else {
filter_entry.Release(table_options.block_cache.get());
}
}
} else {
// If we don't use block cache for index/filter blocks access, we'll
......@@ -886,6 +914,11 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
return {rep_->filter.get(), nullptr /* cache handle */};
}
// we have a pinned filter block
if (rep_->filter_entry.IsSet()) {
return rep_->filter_entry;
}
PERF_TIMER_GUARD(read_filter_block_nanos);
Cache* block_cache = rep_->table_options.block_cache.get();
......@@ -935,12 +968,19 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
}
InternalIterator* BlockBasedTable::NewIndexIterator(
const ReadOptions& read_options, BlockIter* input_iter) {
const ReadOptions& read_options, BlockIter* input_iter,
CachableEntry<IndexReader>* index_entry) {
// index reader has already been pre-populated.
if (rep_->index_reader) {
return rep_->index_reader->NewIterator(
input_iter, read_options.total_order_seek);
}
// we have a pinned index block
if (rep_->index_entry.IsSet()) {
return rep_->index_entry.value->NewIterator(input_iter,
read_options.total_order_seek);
}
PERF_TIMER_GUARD(read_index_block_nanos);
bool no_io = read_options.read_tier == kBlockCacheTier;
......@@ -996,7 +1036,15 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
assert(cache_handle);
auto* iter = index_reader->NewIterator(
input_iter, read_options.total_order_seek);
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle);
// the caller would like to take ownership of the index block
// don't call RegisterCleanup() in this case, the caller will take care of it
if (index_entry != nullptr) {
*index_entry = {index_reader, cache_handle};
} else {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle);
}
return iter;
}
......@@ -1224,7 +1272,13 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL);
}
filter_entry.Release(rep_->table_options.block_cache.get());
// if rep_->filter_entry is not set, we should call Release(); otherwise
// don't call, in this case we have a local copy in rep_->filter_entry,
// it's pinned to the cache and will be released in the destructor
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
return may_match;
}
......@@ -1324,7 +1378,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
}
}
filter_entry.Release(rep_->table_options.block_cache.get());
// if rep_->filter_entry is not set, we should call Release(); otherwise
// don't call, in this case we have a local copy in rep_->filter_entry,
// it's pinned to the cache and will be released in the destructor
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
return s;
}
......@@ -1612,6 +1671,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
return s;
}
void BlockBasedTable::Close() {
rep_->filter_entry.Release(rep_->table_options.block_cache.get());
rep_->index_entry.Release(rep_->table_options.block_cache.get());
}
Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
out_file->Append(
"Index Details:\n"
......
......@@ -76,7 +76,7 @@ class BlockBasedTable : public TableReader {
unique_ptr<RandomAccessFileReader>&& file,
uint64_t file_size, unique_ptr<TableReader>* table_reader,
bool prefetch_index_and_filter = true,
bool skip_filters = false);
bool skip_filters = false, int level = -1);
bool PrefixMayMatch(const Slice& internal_key);
......@@ -119,6 +119,8 @@ class BlockBasedTable : public TableReader {
// convert SST file to a human readable form
Status DumpTable(WritableFile* out_file) override;
void Close() override;
~BlockBasedTable();
bool TEST_filter_block_preloaded() const;
......@@ -155,8 +157,9 @@ class BlockBasedTable : public TableReader {
// 2. index is not present in block cache.
// 3. We disallowed any io to be performed, that is, read_options ==
// kBlockCacheTier
InternalIterator* NewIndexIterator(const ReadOptions& read_options,
BlockIter* input_iter = nullptr);
InternalIterator* NewIndexIterator(
const ReadOptions& read_options, BlockIter* input_iter = nullptr,
CachableEntry<IndexReader>* index_entry = nullptr);
// Read block cache from block caches (if set): block_cache and
// block_cache_compressed.
......
......@@ -29,17 +29,20 @@ struct TableReaderOptions {
TableReaderOptions(const ImmutableCFOptions& _ioptions,
const EnvOptions& _env_options,
const InternalKeyComparator& _internal_comparator,
bool _skip_filters = false)
bool _skip_filters = false, int _level = -1)
: ioptions(_ioptions),
env_options(_env_options),
internal_comparator(_internal_comparator),
skip_filters(_skip_filters) {}
skip_filters(_skip_filters),
level(_level) {}
const ImmutableCFOptions& ioptions;
const EnvOptions& env_options;
const InternalKeyComparator& internal_comparator;
// This is only used for BlockBasedTable (reader)
bool skip_filters;
// what level this table/file is on, -1 for "not set, don't know"
int level;
};
struct TableBuilderOptions {
......
......@@ -91,6 +91,8 @@ class TableReader {
virtual Status DumpTable(WritableFile* out_file) {
return Status::NotSupported("DumpTable() not supported");
}
virtual void Close() {}
};
} // namespace rocksdb
......@@ -1715,7 +1715,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
ImmutableCFOptions ioptions3(options);
// Generate table without filter policy
c3.Finish(options, ioptions3, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
// Open table with filter policy
table_options.filter_policy.reset(NewBloomFilterPolicy(1));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
......
......@@ -74,6 +74,7 @@ const_params="
--level_compaction_dynamic_level_bytes=true \
--bytes_per_sync=$((8 * M)) \
--cache_index_and_filter_blocks=0 \
--pin_l0_filter_and_index_blocks_in_cache=1 \
--benchmark_write_rate_limit=$(( 1024 * 1024 * $mb_written_per_sec )) \
\
--hard_rate_limit=3 \
......
......@@ -340,6 +340,9 @@ DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed"
DEFINE_bool(cache_index_and_filter_blocks, false,
"Cache index/filter blocks in block cache.");
DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false,
"Pin index/filter blocks of L0 files in block cache.");
DEFINE_int32(block_size,
static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
"Number of bytes in a block.");
......@@ -2511,6 +2514,8 @@ class Benchmark {
}
block_based_options.cache_index_and_filter_blocks =
FLAGS_cache_index_and_filter_blocks;
block_based_options.pin_l0_filter_and_index_blocks_in_cache =
FLAGS_pin_l0_filter_and_index_blocks_in_cache;
block_based_options.block_cache = cache_;
block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.block_size = FLAGS_block_size;
......
......@@ -491,6 +491,10 @@ static std::unordered_map<std::string,
{"cache_index_and_filter_blocks",
{offsetof(struct BlockBasedTableOptions, cache_index_and_filter_blocks),
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"pin_l0_filter_and_index_blocks_in_cache",
{offsetof(struct BlockBasedTableOptions,
pin_l0_filter_and_index_blocks_in_cache),
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"index_type",
{offsetof(struct BlockBasedTableOptions, index_type),
OptionType::kBlockBasedTableIndexType, OptionVerificationType::kNormal}},
......
......@@ -1582,7 +1582,9 @@ TEST_F(OptionsParserTest, BlockBasedTableOptionsAllFieldsSettable) {
// Need to update the option string if a new option is added.
ASSERT_OK(GetBlockBasedTableOptionsFromString(
*bbto,
"cache_index_and_filter_blocks=1;index_type=kHashSearch;"
"cache_index_and_filter_blocks=1;"
"pin_l0_filter_and_index_blocks_in_cache=1;"
"index_type=kHashSearch;"
"checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4; "
......
......@@ -193,6 +193,7 @@ const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) {
BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) {
BlockBasedTableOptions opt;
opt.cache_index_and_filter_blocks = rnd->Uniform(2);
opt.pin_l0_filter_and_index_blocks_in_cache = rnd->Uniform(2);
opt.index_type = rnd->Uniform(2) ? BlockBasedTableOptions::kBinarySearch
: BlockBasedTableOptions::kHashSearch;
opt.hash_index_allow_collision = rnd->Uniform(2);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册