提交 249eff0f 编写于 作者: P Peter Dillinger 提交者: Facebook GitHub Bot

Stats for redundant insertions into block cache (#6681)

Summary:
Since read threads do not coordinate on loading data into block
cache, two threads between Lookup and Insert can end up loading and
inserting the same data. This is particularly concerning with
cache_index_and_filter_blocks since those are hot and more likely to
be race targets if ejected from (or not pre-populated in) the cache.

Particularly with moves toward disaggregated / network storage, the cost
of redundant retrieval might be high, and we should at least have some
hard statistics from which we can estimate impact.

Example with full filter thrashing "cliff":

    $ ./db_bench --benchmarks=fillrandom --num=15000000 --cache_index_and_filter_blocks -bloom_bits=10
    ...
    $ ./db_bench --db=/tmp/rocksdbtest-172704/dbbench --use_existing_db --benchmarks=readrandom,stats --num=200000 --cache_index_and_filter_blocks --cache_size=$((130 * 1024 * 1024)) --bloom_bits=10 --threads=16 -statistics 2>&1 | egrep '^rocksdb.block.cache.(.*add|.*redundant)' | grep -v compress | sort
    rocksdb.block.cache.add COUNT : 14181
    rocksdb.block.cache.add.failures COUNT : 0
    rocksdb.block.cache.add.redundant COUNT : 476
    rocksdb.block.cache.data.add COUNT : 12749
    rocksdb.block.cache.data.add.redundant COUNT : 18
    rocksdb.block.cache.filter.add COUNT : 1003
    rocksdb.block.cache.filter.add.redundant COUNT : 217
    rocksdb.block.cache.index.add COUNT : 429
    rocksdb.block.cache.index.add.redundant COUNT : 241
    $ ./db_bench --db=/tmp/rocksdbtest-172704/dbbench --use_existing_db --benchmarks=readrandom,stats --num=200000 --cache_index_and_filter_blocks --cache_size=$((120 * 1024 * 1024)) --bloom_bits=10 --threads=16 -statistics 2>&1 | egrep '^rocksdb.block.cache.(.*add|.*redundant)' | grep -v compress | sort
    rocksdb.block.cache.add COUNT : 1182223
    rocksdb.block.cache.add.failures COUNT : 0
    rocksdb.block.cache.add.redundant COUNT : 302728
    rocksdb.block.cache.data.add COUNT : 31425
    rocksdb.block.cache.data.add.redundant COUNT : 12
    rocksdb.block.cache.filter.add COUNT : 795455
    rocksdb.block.cache.filter.add.redundant COUNT : 130238
    rocksdb.block.cache.index.add COUNT : 355343
    rocksdb.block.cache.index.add.redundant COUNT : 172478
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6681

Test Plan: Some manual testing (above) and unit test covering key metrics is included

Reviewed By: ltamasi

Differential Revision: D21134113

Pulled By: pdillinger

fbshipit-source-id: c11497b5f00f4ffdfe919823904e52d0a1a91d87
上级 75b13ea9
......@@ -341,7 +341,8 @@ class ClockCacheShard final : public CacheShard {
CacheHandle* Insert(const Slice& key, uint32_t hash, void* value,
size_t change,
void (*deleter)(const Slice& key, void* value),
bool hold_reference, CleanupContext* context);
bool hold_reference, CleanupContext* context,
bool* overwritten);
// Guards list_, head_, and recycle_. In addition, updating table_ also has
// to hold the mutex, to avoid the cache being in inconsistent state.
......@@ -564,7 +565,8 @@ void ClockCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
CacheHandle* ClockCacheShard::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), bool hold_reference,
CleanupContext* context) {
CleanupContext* context, bool* overwritten) {
assert(overwritten != nullptr && *overwritten == false);
size_t total_charge =
CacheHandle::CalcTotalCharge(key, charge, metadata_charge_policy_);
MutexLock l(&mutex_);
......@@ -597,6 +599,7 @@ CacheHandle* ClockCacheShard::Insert(
handle->flags.store(flags, std::memory_order_relaxed);
HashTable::accessor accessor;
if (table_.find(accessor, CacheKey(key, hash))) {
*overwritten = true;
CacheHandle* existing_handle = accessor->second;
table_.erase(accessor);
UnsetInCache(existing_handle, context);
......@@ -619,8 +622,9 @@ Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
char* key_data = new char[key.size()];
memcpy(key_data, key.data(), key.size());
Slice key_copy(key_data, key.size());
bool overwritten = false;
CacheHandle* handle = Insert(key_copy, hash, value, charge, deleter,
out_handle != nullptr, &context);
out_handle != nullptr, &context, &overwritten);
Status s;
if (out_handle != nullptr) {
if (handle == nullptr) {
......@@ -629,6 +633,10 @@ Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
*out_handle = reinterpret_cast<Cache::Handle*>(handle);
}
}
if (overwritten) {
assert(s.ok());
s = Status::OkOverwritten();
}
Cleanup(context);
return s;
}
......
......@@ -386,6 +386,7 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
LRUHandle* old = table_.Insert(e);
usage_ += total_charge;
if (old != nullptr) {
s = Status::OkOverwritten();
assert(old->InCache());
old->SetInCache(false);
if (!old->HasRefs()) {
......
......@@ -2092,23 +2092,25 @@ class DBBasicTestMultiGet : public DBTestBase {
const BlockBuilder& data_block_builder_;
};
class MyBlockCache : public Cache {
class MyBlockCache : public CacheWrapper {
public:
explicit MyBlockCache(std::shared_ptr<Cache>& target)
: target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}
virtual const char* Name() const override { return "MyBlockCache"; }
virtual Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
explicit MyBlockCache(std::shared_ptr<Cache> target)
: CacheWrapper(target),
num_lookups_(0),
num_found_(0),
num_inserts_(0) {}
const char* Name() const override { return "MyBlockCache"; }
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
num_inserts_++;
return target_->Insert(key, value, charge, deleter, handle, priority);
}
virtual Handle* Lookup(const Slice& key,
Statistics* stats = nullptr) override {
Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
num_lookups_++;
Handle* handle = target_->Lookup(key, stats);
if (handle != nullptr) {
......@@ -2116,57 +2118,6 @@ class DBBasicTestMultiGet : public DBTestBase {
}
return handle;
}
virtual bool Ref(Handle* handle) override { return target_->Ref(handle); }
virtual bool Release(Handle* handle, bool force_erase = false) override {
return target_->Release(handle, force_erase);
}
virtual void* Value(Handle* handle) override {
return target_->Value(handle);
}
virtual void Erase(const Slice& key) override { target_->Erase(key); }
virtual uint64_t NewId() override { return target_->NewId(); }
virtual void SetCapacity(size_t capacity) override {
target_->SetCapacity(capacity);
}
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
target_->SetStrictCapacityLimit(strict_capacity_limit);
}
virtual bool HasStrictCapacityLimit() const override {
return target_->HasStrictCapacityLimit();
}
virtual size_t GetCapacity() const override {
return target_->GetCapacity();
}
virtual size_t GetUsage() const override { return target_->GetUsage(); }
virtual size_t GetUsage(Handle* handle) const override {
return target_->GetUsage(handle);
}
virtual size_t GetPinnedUsage() const override {
return target_->GetPinnedUsage();
}
virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }
virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
bool thread_safe) override {
return target_->ApplyToAllCacheEntries(callback, thread_safe);
}
virtual void EraseUnRefEntries() override {
return target_->EraseUnRefEntries();
}
int num_lookups() { return num_lookups_; }
int num_found() { return num_found_; }
......@@ -2174,7 +2125,6 @@ class DBBasicTestMultiGet : public DBTestBase {
int num_inserts() { return num_inserts_; }
private:
std::shared_ptr<Cache> target_;
int num_lookups_;
int num_found_;
int num_inserts_;
......
......@@ -517,6 +517,139 @@ TEST_F(DBBlockCacheTest, IndexAndFilterBlocksCachePriority) {
}
}
namespace {
// An LRUCache wrapper that can falsely report "not found" on Lookup.
// This allows us to manipulate BlockBasedTableReader into thinking
// another thread inserted the data in between Lookup and Insert,
// while mostly preserving the LRUCache interface/behavior.
class LookupLiarCache : public CacheWrapper {
int nth_lookup_not_found_ = 0;
public:
explicit LookupLiarCache(std::shared_ptr<Cache> target)
: CacheWrapper(std::move(target)) {}
Handle* Lookup(const Slice& key, Statistics* stats) override {
if (nth_lookup_not_found_ == 1) {
nth_lookup_not_found_ = 0;
return nullptr;
}
if (nth_lookup_not_found_ > 1) {
--nth_lookup_not_found_;
}
return CacheWrapper::Lookup(key, stats);
}
// 1 == next lookup, 2 == after next, etc.
void SetNthLookupNotFound(int n) { nth_lookup_not_found_ = n; }
};
} // anonymous namespace
TEST_F(DBBlockCacheTest, AddRedundantStats) {
const size_t capacity = size_t{1} << 25;
const int num_shard_bits = 0; // 1 shard
int iterations_tested = 0;
for (std::shared_ptr<Cache> base_cache :
{NewLRUCache(capacity, num_shard_bits),
NewClockCache(capacity, num_shard_bits)}) {
if (!base_cache) {
// Skip clock cache when not supported
continue;
}
++iterations_tested;
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
std::shared_ptr<LookupLiarCache> cache =
std::make_shared<LookupLiarCache>(base_cache);
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
table_options.block_cache = cache;
table_options.filter_policy.reset(NewBloomFilterPolicy(50));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
DestroyAndReopen(options);
// Create a new table.
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Put("bar", "value"));
ASSERT_OK(Flush());
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// Normal access filter+index+data.
ASSERT_EQ("value", Get("foo"));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD));
// --------
ASSERT_EQ(3, TestGetTickerCount(options, BLOCK_CACHE_ADD));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD_REDUNDANT));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD_REDUNDANT));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD_REDUNDANT));
// --------
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_ADD_REDUNDANT));
// Againt access filter+index+data, but force redundant load+insert on index
cache->SetNthLookupNotFound(2);
ASSERT_EQ("value", Get("bar"));
ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD));
// --------
ASSERT_EQ(4, TestGetTickerCount(options, BLOCK_CACHE_ADD));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD_REDUNDANT));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD_REDUNDANT));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD_REDUNDANT));
// --------
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_ADD_REDUNDANT));
// Access just filter (with high probability), and force redundant
// load+insert
cache->SetNthLookupNotFound(1);
ASSERT_EQ("NOT_FOUND", Get("this key was not added"));
EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD));
EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD));
EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD));
// --------
EXPECT_EQ(5, TestGetTickerCount(options, BLOCK_CACHE_ADD));
EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD_REDUNDANT));
EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD_REDUNDANT));
EXPECT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD_REDUNDANT));
// --------
EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD_REDUNDANT));
// Access just data, forcing redundant load+insert
ReadOptions read_options;
std::unique_ptr<Iterator> iter{db_->NewIterator(read_options)};
cache->SetNthLookupNotFound(1);
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), "bar");
EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD));
EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD));
EXPECT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD));
// --------
EXPECT_EQ(6, TestGetTickerCount(options, BLOCK_CACHE_ADD));
EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_ADD_REDUNDANT));
EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_ADD_REDUNDANT));
EXPECT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_DATA_ADD_REDUNDANT));
// --------
EXPECT_EQ(3, TestGetTickerCount(options, BLOCK_CACHE_ADD_REDUNDANT));
}
EXPECT_GE(iterations_tested, 1);
}
TEST_F(DBBlockCacheTest, ParanoidFileChecks) {
Options options = CurrentOptions();
options.create_if_missing = true;
......
......@@ -647,6 +647,72 @@ class TestPutOperator : public MergeOperator {
virtual const char* Name() const override { return "TestPutOperator"; }
};
// A wrapper around Cache that can easily be extended with instrumentation,
// etc.
class CacheWrapper : public Cache {
public:
explicit CacheWrapper(std::shared_ptr<Cache> target)
: target_(std::move(target)) {}
const char* Name() const override { return target_->Name(); }
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
return target_->Insert(key, value, charge, deleter, handle, priority);
}
Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
return target_->Lookup(key, stats);
}
bool Ref(Handle* handle) override { return target_->Ref(handle); }
bool Release(Handle* handle, bool force_erase = false) override {
return target_->Release(handle, force_erase);
}
void* Value(Handle* handle) override { return target_->Value(handle); }
void Erase(const Slice& key) override { target_->Erase(key); }
uint64_t NewId() override { return target_->NewId(); }
void SetCapacity(size_t capacity) override { target_->SetCapacity(capacity); }
void SetStrictCapacityLimit(bool strict_capacity_limit) override {
target_->SetStrictCapacityLimit(strict_capacity_limit);
}
bool HasStrictCapacityLimit() const override {
return target_->HasStrictCapacityLimit();
}
size_t GetCapacity() const override { return target_->GetCapacity(); }
size_t GetUsage() const override { return target_->GetUsage(); }
size_t GetUsage(Handle* handle) const override {
return target_->GetUsage(handle);
}
size_t GetPinnedUsage() const override { return target_->GetPinnedUsage(); }
size_t GetCharge(Handle* handle) const override {
return target_->GetCharge(handle);
}
void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
bool thread_safe) override {
target_->ApplyToAllCacheEntries(callback, thread_safe);
}
void EraseUnRefEntries() override { target_->EraseUnRefEntries(); }
protected:
std::shared_ptr<Cache> target_;
};
class DBTestBase : public testing::Test {
public:
// Sequence of option configurations to try
......
......@@ -342,6 +342,24 @@ enum Tickers : uint32_t {
BLOCK_CACHE_COMPRESSION_DICT_ADD,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT,
// # of blocks redundantly inserted into block cache.
// REQUIRES: BLOCK_CACHE_ADD_REDUNDANT <= BLOCK_CACHE_ADD
BLOCK_CACHE_ADD_REDUNDANT,
// # of index blocks redundantly inserted into block cache.
// REQUIRES: BLOCK_CACHE_INDEX_ADD_REDUNDANT <= BLOCK_CACHE_INDEX_ADD
BLOCK_CACHE_INDEX_ADD_REDUNDANT,
// # of filter blocks redundantly inserted into block cache.
// REQUIRES: BLOCK_CACHE_FILTER_ADD_REDUNDANT <= BLOCK_CACHE_FILTER_ADD
BLOCK_CACHE_FILTER_ADD_REDUNDANT,
// # of data blocks redundantly inserted into block cache.
// REQUIRES: BLOCK_CACHE_DATA_ADD_REDUNDANT <= BLOCK_CACHE_DATA_ADD
BLOCK_CACHE_DATA_ADD_REDUNDANT,
// # of dict blocks redundantly inserted into block cache.
// REQUIRES: BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT
// <= BLOCK_CACHE_COMPRESSION_DICT_ADD
BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
TICKER_ENUM_MAX
};
......
......@@ -78,6 +78,7 @@ class Status {
kPathNotFound = 9,
KMergeOperandsInsufficientCapacity = 10,
kManualCompactionPaused = 11,
kOverwritten = 12,
kMaxSubCode
};
......@@ -101,6 +102,12 @@ class Status {
// Return a success status.
static Status OK() { return Status(); }
// Successful, though an existing something was overwritten
// Note: using variants of OK status for program logic is discouraged,
// but it can be useful for communicating statistical information without
// changing public APIs.
static Status OkOverwritten() { return Status(kOk, kOverwritten); }
// Return error status of an appropriate type.
static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kNotFound, msg, msg2);
......@@ -220,6 +227,12 @@ class Status {
// Returns true iff the status indicates success.
bool ok() const { return code() == kOk; }
// Returns true iff the status indicates success *with* something
// overwritten
bool IsOkOverwritten() const {
return code() == kOk && subcode() == kOverwritten;
}
// Returns true iff the status indicates a NotFound error.
bool IsNotFound() const { return code() == kNotFound; }
......
......@@ -176,6 +176,14 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
"rocksdb.block.cache.compression.dict.bytes.insert"},
{BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT,
"rocksdb.block.cache.compression.dict.bytes.evict"},
{BLOCK_CACHE_ADD_REDUNDANT, "rocksdb.block.cache.add.redundant"},
{BLOCK_CACHE_INDEX_ADD_REDUNDANT,
"rocksdb.block.cache.index.add.redundant"},
{BLOCK_CACHE_FILTER_ADD_REDUNDANT,
"rocksdb.block.cache.filter.add.redundant"},
{BLOCK_CACHE_DATA_ADD_REDUNDANT, "rocksdb.block.cache.data.add.redundant"},
{BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
"rocksdb.block.cache.compression.dict.add.redundant"},
};
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
......
......@@ -15,6 +15,8 @@
#include <utility>
#include <vector>
#include "cache/sharded_cache.h"
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "file/file_prefetch_buffer.h"
......@@ -338,15 +340,22 @@ void BlockBasedTable::UpdateCacheMissMetrics(BlockType block_type,
void BlockBasedTable::UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context,
size_t usage) const {
size_t usage,
bool redundant) const {
Statistics* const statistics = rep_->ioptions.statistics;
// TODO: introduce perf counters for block cache insertions
if (get_context) {
++get_context->get_context_stats_.num_cache_add;
if (redundant) {
++get_context->get_context_stats_.num_cache_add_redundant;
}
get_context->get_context_stats_.num_cache_bytes_write += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_ADD);
if (redundant) {
RecordTick(statistics, BLOCK_CACHE_ADD_REDUNDANT);
}
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usage);
}
......@@ -354,9 +363,15 @@ void BlockBasedTable::UpdateCacheInsertionMetrics(BlockType block_type,
case BlockType::kFilter:
if (get_context) {
++get_context->get_context_stats_.num_cache_filter_add;
if (redundant) {
++get_context->get_context_stats_.num_cache_filter_add_redundant;
}
get_context->get_context_stats_.num_cache_filter_bytes_insert += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
if (redundant) {
RecordTick(statistics, BLOCK_CACHE_FILTER_ADD_REDUNDANT);
}
RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, usage);
}
break;
......@@ -364,10 +379,17 @@ void BlockBasedTable::UpdateCacheInsertionMetrics(BlockType block_type,
case BlockType::kCompressionDictionary:
if (get_context) {
++get_context->get_context_stats_.num_cache_compression_dict_add;
if (redundant) {
++get_context->get_context_stats_
.num_cache_compression_dict_add_redundant;
}
get_context->get_context_stats_
.num_cache_compression_dict_bytes_insert += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD);
if (redundant) {
RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT);
}
RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
usage);
}
......@@ -376,9 +398,15 @@ void BlockBasedTable::UpdateCacheInsertionMetrics(BlockType block_type,
case BlockType::kIndex:
if (get_context) {
++get_context->get_context_stats_.num_cache_index_add;
if (redundant) {
++get_context->get_context_stats_.num_cache_index_add_redundant;
}
get_context->get_context_stats_.num_cache_index_bytes_insert += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
if (redundant) {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD_REDUNDANT);
}
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, usage);
}
break;
......@@ -388,9 +416,15 @@ void BlockBasedTable::UpdateCacheInsertionMetrics(BlockType block_type,
// for range tombstones
if (get_context) {
++get_context->get_context_stats_.num_cache_data_add;
if (redundant) {
++get_context->get_context_stats_.num_cache_data_add_redundant;
}
get_context->get_context_stats_.num_cache_data_bytes_insert += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
if (redundant) {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD_REDUNDANT);
}
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, usage);
}
break;
......@@ -1182,7 +1216,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
block->SetCachedValue(block_holder.release(), block_cache,
cache_handle);
UpdateCacheInsertionMetrics(block_type, get_context, charge);
UpdateCacheInsertionMetrics(block_type, get_context, charge,
s.IsOkOverwritten());
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
}
......@@ -1287,7 +1322,8 @@ Status BlockBasedTable::PutDataBlockToCache(
cached_block->SetCachedValue(block_holder.release(), block_cache,
cache_handle);
UpdateCacheInsertionMetrics(block_type, get_context, charge);
UpdateCacheInsertionMetrics(block_type, get_context, charge,
s.IsOkOverwritten());
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
}
......
......@@ -260,7 +260,8 @@ class BlockBasedTable : public TableReader {
void UpdateCacheMissMetrics(BlockType block_type,
GetContext* get_context) const;
void UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context, size_t usage) const;
GetContext* get_context, size_t usage,
bool redundant) const;
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
BlockType block_type,
GetContext* get_context) const;
......
......@@ -152,6 +152,10 @@ void GetContext::ReportCounters() {
if (get_context_stats_.num_cache_add > 0) {
RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
}
if (get_context_stats_.num_cache_add_redundant > 0) {
RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
get_context_stats_.num_cache_add_redundant);
}
if (get_context_stats_.num_cache_bytes_write > 0) {
RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
get_context_stats_.num_cache_bytes_write);
......@@ -160,6 +164,10 @@ void GetContext::ReportCounters() {
RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
get_context_stats_.num_cache_index_add);
}
if (get_context_stats_.num_cache_index_add_redundant > 0) {
RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
get_context_stats_.num_cache_index_add_redundant);
}
if (get_context_stats_.num_cache_index_bytes_insert > 0) {
RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
get_context_stats_.num_cache_index_bytes_insert);
......@@ -168,6 +176,10 @@ void GetContext::ReportCounters() {
RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
get_context_stats_.num_cache_data_add);
}
if (get_context_stats_.num_cache_data_add_redundant > 0) {
RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
get_context_stats_.num_cache_data_add_redundant);
}
if (get_context_stats_.num_cache_data_bytes_insert > 0) {
RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
get_context_stats_.num_cache_data_bytes_insert);
......@@ -176,6 +188,10 @@ void GetContext::ReportCounters() {
RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
get_context_stats_.num_cache_filter_add);
}
if (get_context_stats_.num_cache_filter_add_redundant > 0) {
RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
get_context_stats_.num_cache_filter_add_redundant);
}
if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
get_context_stats_.num_cache_filter_bytes_insert);
......@@ -184,6 +200,10 @@ void GetContext::ReportCounters() {
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
get_context_stats_.num_cache_compression_dict_add);
}
if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
get_context_stats_.num_cache_compression_dict_add_redundant);
}
if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
get_context_stats_.num_cache_compression_dict_bytes_insert);
......
......@@ -33,14 +33,19 @@ struct GetContextStats {
uint64_t num_cache_bytes_read = 0;
uint64_t num_cache_miss = 0;
uint64_t num_cache_add = 0;
uint64_t num_cache_add_redundant = 0;
uint64_t num_cache_bytes_write = 0;
uint64_t num_cache_index_add = 0;
uint64_t num_cache_index_add_redundant = 0;
uint64_t num_cache_index_bytes_insert = 0;
uint64_t num_cache_data_add = 0;
uint64_t num_cache_data_add_redundant = 0;
uint64_t num_cache_data_bytes_insert = 0;
uint64_t num_cache_filter_add = 0;
uint64_t num_cache_filter_add_redundant = 0;
uint64_t num_cache_filter_bytes_insert = 0;
uint64_t num_cache_compression_dict_add = 0;
uint64_t num_cache_compression_dict_add_redundant = 0;
uint64_t num_cache_compression_dict_bytes_insert = 0;
};
......
......@@ -52,6 +52,7 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
"Insufficient capacity for merge operands",
// kManualCompactionPaused
"Manual compaction paused",
" (overwritten)", // kOverwritten, subcode of OK
};
Status::Status(Code _code, SubCode _subcode, const Slice& msg,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册