Commit 2a23bee9 authored by Peter Dillinger, committed by Facebook GitHub Bot

Use CacheWrapper in more places (#11295)

Summary:
... to simplify code and make it less prone to needless updates during refactoring. (A minimal sketch of the CacheWrapper delegation pattern this relies on follows the commit metadata below.)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11295

Test Plan: existing tests (no functional changes intended)

Reviewed By: hx235

Differential Revision: D44040260

Pulled By: pdillinger

fbshipit-source-id: 1b6badb5c8ca673db0903bfaba3cfbc986f386be
Parent 49881921
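For readers unfamiliar with the helper this commit leans on: CacheWrapper (visible in the diff below) keeps the wrapped cache in a target_ member and forwards the Cache interface to it, so a subclass only overrides the calls it needs to intercept. What follows is a minimal, hedged sketch of that delegation pattern using an invented MiniCache interface; it is not the real RocksDB Cache/CacheWrapper API, which is much larger.

// Simplified illustration of the CacheWrapper delegation pattern.
// MiniCache, MiniCacheWrapper, and CountingCache are invented stand-ins,
// not RocksDB types.
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

class MiniCache {
 public:
  virtual ~MiniCache() = default;
  virtual bool Insert(const std::string& key, void* value, size_t charge) = 0;
  virtual void* Lookup(const std::string& key) = 0;
  virtual size_t GetUsage() const = 0;
};

// The wrapper's default behavior is to forward everything to target_.
class MiniCacheWrapper : public MiniCache {
 public:
  explicit MiniCacheWrapper(std::shared_ptr<MiniCache> target)
      : target_(std::move(target)) {}
  bool Insert(const std::string& key, void* value, size_t charge) override {
    return target_->Insert(key, value, charge);
  }
  void* Lookup(const std::string& key) override { return target_->Lookup(key); }
  size_t GetUsage() const override { return target_->GetUsage(); }

 protected:
  std::shared_ptr<MiniCache> target_;
};

// A subclass only overrides what it wants to observe or change; everything
// else (GetUsage here) is inherited from the wrapper unchanged.
class CountingCache : public MiniCacheWrapper {
 public:
  using MiniCacheWrapper::MiniCacheWrapper;
  bool Insert(const std::string& key, void* value, size_t charge) override {
    ++insert_count_;
    return target_->Insert(key, value, charge);
  }
  uint64_t insert_count_ = 0;
};

This is the shape the diff moves ChargedCache and the test-only CacheWithStats toward: derive from the wrapper, keep only the overrides that add behavior, and stop hand-writing forwarding boilerplate that must be touched whenever the Cache interface changes.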
@@ -11,7 +11,7 @@ namespace ROCKSDB_NAMESPACE {
 ChargedCache::ChargedCache(std::shared_ptr<Cache> cache,
                            std::shared_ptr<Cache> block_cache)
-    : cache_(cache),
+    : CacheWrapper(cache),
       cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>(
           std::make_shared<
               CacheReservationManagerImpl<CacheEntryRole::kBlobCache>>(
@@ -20,13 +20,13 @@ ChargedCache::ChargedCache(std::shared_ptr<Cache> cache,
 Status ChargedCache::Insert(const Slice& key, ObjectPtr obj,
                             const CacheItemHelper* helper, size_t charge,
                             Handle** handle, Priority priority) {
-  Status s = cache_->Insert(key, obj, helper, charge, handle, priority);
+  Status s = target_->Insert(key, obj, helper, charge, handle, priority);
   if (s.ok()) {
     // Insert may cause the cache entry eviction if the cache is full. So we
     // directly call the reservation manager to update the total memory used
     // in the cache.
     assert(cache_res_mgr_);
-    cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
+    cache_res_mgr_->UpdateCacheReservation(target_->GetUsage())
         .PermitUncheckedError();
   }
   return s;
@@ -38,13 +38,13 @@ Cache::Handle* ChargedCache::Lookup(const Slice& key,
                                     Priority priority, bool wait,
                                     Statistics* stats) {
   auto handle =
-      cache_->Lookup(key, helper, create_context, priority, wait, stats);
+      target_->Lookup(key, helper, create_context, priority, wait, stats);
   // Lookup may promote the KV pair from the secondary cache to the primary
   // cache. So we directly call the reservation manager to update the total
   // memory used in the cache.
   if (helper && helper->create_cb) {
     assert(cache_res_mgr_);
-    cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
+    cache_res_mgr_->UpdateCacheReservation(target_->GetUsage())
         .PermitUncheckedError();
   }
   return handle;
@@ -52,8 +52,8 @@ Cache::Handle* ChargedCache::Lookup(const Slice& key,
 bool ChargedCache::Release(Cache::Handle* handle, bool useful,
                            bool erase_if_last_ref) {
-  size_t memory_used_delta = cache_->GetUsage(handle);
-  bool erased = cache_->Release(handle, useful, erase_if_last_ref);
+  size_t memory_used_delta = target_->GetUsage(handle);
+  bool erased = target_->Release(handle, useful, erase_if_last_ref);
   if (erased) {
     assert(cache_res_mgr_);
     cache_res_mgr_
@@ -64,8 +64,8 @@ bool ChargedCache::Release(Cache::Handle* handle, bool useful,
 }
 bool ChargedCache::Release(Cache::Handle* handle, bool erase_if_last_ref) {
-  size_t memory_used_delta = cache_->GetUsage(handle);
-  bool erased = cache_->Release(handle, erase_if_last_ref);
+  size_t memory_used_delta = target_->GetUsage(handle);
+  bool erased = target_->Release(handle, erase_if_last_ref);
   if (erased) {
     assert(cache_res_mgr_);
     cache_res_mgr_
@@ -76,25 +76,25 @@ bool ChargedCache::Release(Cache::Handle* handle, bool erase_if_last_ref) {
 }
 void ChargedCache::Erase(const Slice& key) {
-  cache_->Erase(key);
+  target_->Erase(key);
   assert(cache_res_mgr_);
-  cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
+  cache_res_mgr_->UpdateCacheReservation(target_->GetUsage())
       .PermitUncheckedError();
 }
 void ChargedCache::EraseUnRefEntries() {
-  cache_->EraseUnRefEntries();
+  target_->EraseUnRefEntries();
   assert(cache_res_mgr_);
-  cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
+  cache_res_mgr_->UpdateCacheReservation(target_->GetUsage())
      .PermitUncheckedError();
 }
 void ChargedCache::SetCapacity(size_t capacity) {
-  cache_->SetCapacity(capacity);
+  target_->SetCapacity(capacity);
   // SetCapacity can result in evictions when the cache capacity is decreased,
   // so we would want to update the cache reservation here as well.
   assert(cache_res_mgr_);
-  cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
+  cache_res_mgr_->UpdateCacheReservation(target_->GetUsage())
       .PermitUncheckedError();
 }
@@ -17,11 +17,10 @@ class ConcurrentCacheReservationManager;
 // A cache interface which wraps around another cache and takes care of
 // reserving space in block cache towards a single global memory limit, and
 // forwards all the calls to the underlying cache.
-class ChargedCache : public Cache {
+class ChargedCache : public CacheWrapper {
  public:
   ChargedCache(std::shared_ptr<Cache> cache,
                std::shared_ptr<Cache> block_cache);
-  ~ChargedCache() override = default;
   Status Insert(const Slice& key, ObjectPtr obj, const CacheItemHelper* helper,
                 size_t charge, Handle** handle = nullptr,
@@ -42,66 +41,9 @@ class ChargedCache : public Cache {
   static const char* kClassName() { return "ChargedCache"; }
   const char* Name() const override { return kClassName(); }
-  uint64_t NewId() override { return cache_->NewId(); }
   void SetCapacity(size_t capacity) override;
-  void SetStrictCapacityLimit(bool strict_capacity_limit) override {
-    cache_->SetStrictCapacityLimit(strict_capacity_limit);
-  }
-  bool HasStrictCapacityLimit() const override {
-    return cache_->HasStrictCapacityLimit();
-  }
-  ObjectPtr Value(Cache::Handle* handle) override {
-    return cache_->Value(handle);
-  }
-  bool IsReady(Cache::Handle* handle) override {
-    return cache_->IsReady(handle);
-  }
-  void Wait(Cache::Handle* handle) override { cache_->Wait(handle); }
-  void WaitAll(std::vector<Handle*>& handles) override {
-    cache_->WaitAll(handles);
-  }
-  bool Ref(Cache::Handle* handle) override { return cache_->Ref(handle); }
-  size_t GetCapacity() const override { return cache_->GetCapacity(); }
-  size_t GetUsage() const override { return cache_->GetUsage(); }
-  size_t GetUsage(Cache::Handle* handle) const override {
-    return cache_->GetUsage(handle);
-  }
-  size_t GetPinnedUsage() const override { return cache_->GetPinnedUsage(); }
-  size_t GetCharge(Cache::Handle* handle) const override {
-    return cache_->GetCharge(handle);
-  }
-  const CacheItemHelper* GetCacheItemHelper(Handle* handle) const override {
-    return cache_->GetCacheItemHelper(handle);
-  }
-  void ApplyToAllEntries(
-      const std::function<void(const Slice& key, ObjectPtr value, size_t charge,
-                               const CacheItemHelper* helper)>& callback,
-      const Cache::ApplyToAllEntriesOptions& opts) override {
-    cache_->ApplyToAllEntries(callback, opts);
-  }
-  std::string GetPrintableOptions() const override {
-    return cache_->GetPrintableOptions();
-  }
-  void DisownData() override { return cache_->DisownData(); }
-  inline Cache* GetCache() const { return cache_.get(); }
+  inline Cache* GetCache() const { return target_.get(); }
   inline ConcurrentCacheReservationManager* TEST_GetCacheReservationManager()
       const {
@@ -109,7 +51,6 @@ class ChargedCache : public Cache {
   }
  private:
-  std::shared_ptr<Cache> cache_;
   std::shared_ptr<ConcurrentCacheReservationManager> cache_res_mgr_;
 };
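The class comment in the header above describes the design: ChargedCache forwards to the wrapped cache (now via CacheWrapper's target_) and, after every call that can change usage, re-reports the wrapped cache's total usage to a reservation manager charged against the block cache. Below is a compact, hedged sketch of that "delegate, then resynchronize" flow; the names are invented stand-ins, not RocksDB types (the real code uses ConcurrentCacheReservationManager and CacheReservationManagerImpl<CacheEntryRole::kBlobCache>, as the constructor hunk earlier shows).

// Invented stand-ins illustrating the ChargedCache flow; not RocksDB types.
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

// Told the wrapped cache's current total usage, it charges that amount
// against another budget (in RocksDB, the block cache).
class MiniReservationManager {
 public:
  void UpdateCacheReservation(size_t total_usage) { reserved_ = total_usage; }
  size_t reserved() const { return reserved_; }

 private:
  size_t reserved_ = 0;
};

// The wrapped cache; Insert may evict, so usage can move in either
// direction after any call.
class MiniBlobCache {
 public:
  void Insert(const std::string& key, size_t charge) { usage_ += charge; }
  void Erase(const std::string& /*key*/) { /* stub: a real cache would reduce usage_ here */ }
  size_t GetUsage() const { return usage_; }

 private:
  size_t usage_ = 0;
};

// The ChargedCache-style pattern: forward the call, then resynchronize the
// reservation with whatever the wrapped cache's usage turned out to be.
class MiniChargedCache {
 public:
  MiniChargedCache(std::shared_ptr<MiniBlobCache> target,
                   std::shared_ptr<MiniReservationManager> res_mgr)
      : target_(std::move(target)), res_mgr_(std::move(res_mgr)) {}

  void Insert(const std::string& key, size_t charge) {
    target_->Insert(key, charge);
    res_mgr_->UpdateCacheReservation(target_->GetUsage());
  }
  void Erase(const std::string& key) {
    target_->Erase(key);
    res_mgr_->UpdateCacheReservation(target_->GetUsage());
  }

 private:
  std::shared_ptr<MiniBlobCache> target_;
  std::shared_ptr<MiniReservationManager> res_mgr_;
};

With CacheWrapper as the base class, only the methods that need this resynchronization step remain overridden; pure pass-through methods such as GetCapacity or GetUsage no longer have to be spelled out, which is exactly what the block of forwarding overrides removed in the header hunk above shows.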
@@ -1997,37 +1997,25 @@ TEST_F(DBSecondaryCacheTest, TestSecondaryCacheMultiGet) {
   Destroy(options);
 }
-class LRUCacheWithStat : public LRUCache {
+class CacheWithStats : public CacheWrapper {
  public:
-  LRUCacheWithStat(
-      size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit,
-      double _high_pri_pool_ratio, double _low_pri_pool_ratio,
-      std::shared_ptr<MemoryAllocator> _memory_allocator = nullptr,
-      bool _use_adaptive_mutex = kDefaultToAdaptiveMutex,
-      CacheMetadataChargePolicy _metadata_charge_policy =
-          kDontChargeCacheMetadata,
-      const std::shared_ptr<SecondaryCache>& _secondary_cache = nullptr)
-      : LRUCache(_capacity, _num_shard_bits, _strict_capacity_limit,
-                 _high_pri_pool_ratio, _low_pri_pool_ratio, _memory_allocator,
-                 _use_adaptive_mutex, _metadata_charge_policy,
-                 _secondary_cache) {
-    insert_count_ = 0;
-    lookup_count_ = 0;
-  }
-  ~LRUCacheWithStat() {}
+  using CacheWrapper::CacheWrapper;
+  static const char* kClassName() { return "CacheWithStats"; }
+  const char* Name() const override { return kClassName(); }
   Status Insert(const Slice& key, Cache::ObjectPtr value,
                 const CacheItemHelper* helper, size_t charge,
                 Handle** handle = nullptr,
                 Priority priority = Priority::LOW) override {
     insert_count_++;
-    return LRUCache::Insert(key, value, helper, charge, handle, priority);
+    return target_->Insert(key, value, helper, charge, handle, priority);
   }
   Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
                  CreateContext* create_context, Priority priority, bool wait,
                  Statistics* stats = nullptr) override {
     lookup_count_++;
-    return LRUCache::Lookup(key, helper, create_context, priority, wait, stats);
+    return target_->Lookup(key, helper, create_context, priority, wait, stats);
   }
   uint32_t GetInsertCount() { return insert_count_; }
@@ -2038,24 +2026,18 @@ class LRUCacheWithStat : public LRUCache {
   }
  private:
-  uint32_t insert_count_;
-  uint32_t lookup_count_;
+  uint32_t insert_count_ = 0;
+  uint32_t lookup_count_ = 0;
 };
 TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
   LRUCacheOptions cache_opts(1024 * 1024 /* capacity */, 0 /* num_shard_bits */,
                              false /* strict_capacity_limit */,
                              0.5 /* high_pri_pool_ratio */,
                              nullptr /* memory_allocator */,
                              kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
-  LRUCacheWithStat* tmp_cache = new LRUCacheWithStat(
-      cache_opts.capacity, cache_opts.num_shard_bits,
-      cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
-      cache_opts.low_pri_pool_ratio, cache_opts.memory_allocator,
-      cache_opts.use_adaptive_mutex, cache_opts.metadata_charge_policy,
-      cache_opts.secondary_cache);
-  std::shared_ptr<Cache> cache(tmp_cache);
+  std::shared_ptr<CacheWithStats> cache =
+      std::make_shared<CacheWithStats>(NewLRUCache(cache_opts));
   BlockBasedTableOptions table_options;
   table_options.block_cache = cache;
   table_options.block_size = 4 * 1024;
@@ -2083,15 +2065,15 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
   // do th eread for all the key value pairs, so all the blocks should be in
   // cache
-  uint32_t start_insert = tmp_cache->GetInsertCount();
-  uint32_t start_lookup = tmp_cache->GetLookupcount();
+  uint32_t start_insert = cache->GetInsertCount();
+  uint32_t start_lookup = cache->GetLookupcount();
   std::string v;
   for (int i = 0; i < N; i++) {
     v = Get(Key(i));
     ASSERT_EQ(v, value[i]);
   }
-  uint32_t dump_insert = tmp_cache->GetInsertCount() - start_insert;
-  uint32_t dump_lookup = tmp_cache->GetLookupcount() - start_lookup;
+  uint32_t dump_insert = cache->GetInsertCount() - start_insert;
+  uint32_t dump_lookup = cache->GetLookupcount() - start_lookup;
   ASSERT_EQ(63,
             static_cast<int>(dump_insert));  // the insert in the block cache
   ASSERT_EQ(256,
@@ -2122,14 +2104,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
   std::shared_ptr<TestSecondaryCache> secondary_cache =
       std::make_shared<TestSecondaryCache>(2048 * 1024);
   cache_opts.secondary_cache = secondary_cache;
-  tmp_cache = new LRUCacheWithStat(
-      cache_opts.capacity, cache_opts.num_shard_bits,
-      cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
-      cache_opts.low_pri_pool_ratio, cache_opts.memory_allocator,
-      cache_opts.use_adaptive_mutex, cache_opts.metadata_charge_policy,
-      cache_opts.secondary_cache);
-  std::shared_ptr<Cache> cache_new(tmp_cache);
-  table_options.block_cache = cache_new;
+  cache = std::make_shared<CacheWithStats>(NewLRUCache(cache_opts));
+  table_options.block_cache = cache;
   table_options.block_size = 4 * 1024;
   options.create_if_missing = true;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@@ -2160,8 +2136,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
   // After load, we do the Get again
   start_insert = secondary_cache->num_inserts();
   start_lookup = secondary_cache->num_lookups();
-  uint32_t cache_insert = tmp_cache->GetInsertCount();
-  uint32_t cache_lookup = tmp_cache->GetLookupcount();
+  uint32_t cache_insert = cache->GetInsertCount();
+  uint32_t cache_lookup = cache->GetLookupcount();
   for (int i = 0; i < N; i++) {
     v = Get(Key(i));
     ASSERT_EQ(v, value[i]);
@@ -2172,8 +2148,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
   ASSERT_EQ(0, static_cast<int>(final_insert));
   // lookup the secondary to get all blocks
   ASSERT_EQ(64, static_cast<int>(final_lookup));
-  uint32_t block_insert = tmp_cache->GetInsertCount() - cache_insert;
-  uint32_t block_lookup = tmp_cache->GetLookupcount() - cache_lookup;
+  uint32_t block_insert = cache->GetInsertCount() - cache_insert;
+  uint32_t block_lookup = cache->GetLookupcount() - cache_lookup;
   // Check the new block cache insert and lookup, should be no insert since all
   // blocks are from the secondary cache.
   ASSERT_EQ(0, static_cast<int>(block_insert));
@@ -2189,13 +2165,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
                              0.5 /* high_pri_pool_ratio */,
                              nullptr /* memory_allocator */,
                              kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
-  LRUCacheWithStat* tmp_cache = new LRUCacheWithStat(
-      cache_opts.capacity, cache_opts.num_shard_bits,
-      cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
-      cache_opts.low_pri_pool_ratio, cache_opts.memory_allocator,
-      cache_opts.use_adaptive_mutex, cache_opts.metadata_charge_policy,
-      cache_opts.secondary_cache);
-  std::shared_ptr<Cache> cache(tmp_cache);
+  std::shared_ptr<CacheWithStats> cache =
+      std::make_shared<CacheWithStats>(NewLRUCache(cache_opts));
   BlockBasedTableOptions table_options;
   table_options.block_cache = cache;
   table_options.block_size = 4 * 1024;
@@ -2245,8 +2216,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
   // do th eread for all the key value pairs, so all the blocks should be in
   // cache
-  uint32_t start_insert = tmp_cache->GetInsertCount();
-  uint32_t start_lookup = tmp_cache->GetLookupcount();
+  uint32_t start_insert = cache->GetInsertCount();
+  uint32_t start_lookup = cache->GetLookupcount();
   ReadOptions ro;
   std::string v;
   for (int i = 0; i < N; i++) {
@@ -2257,8 +2228,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
     ASSERT_OK(db2->Get(ro, Key(i), &v));
     ASSERT_EQ(v, value2[i]);
   }
-  uint32_t dump_insert = tmp_cache->GetInsertCount() - start_insert;
-  uint32_t dump_lookup = tmp_cache->GetLookupcount() - start_lookup;
+  uint32_t dump_insert = cache->GetInsertCount() - start_insert;
+  uint32_t dump_lookup = cache->GetLookupcount() - start_lookup;
   ASSERT_EQ(128,
             static_cast<int>(dump_insert));  // the insert in the block cache
   ASSERT_EQ(512,
@@ -2289,14 +2260,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
   std::shared_ptr<TestSecondaryCache> secondary_cache =
       std::make_shared<TestSecondaryCache>(2048 * 1024);
   cache_opts.secondary_cache = secondary_cache;
-  tmp_cache = new LRUCacheWithStat(
-      cache_opts.capacity, cache_opts.num_shard_bits,
-      cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
-      cache_opts.low_pri_pool_ratio, cache_opts.memory_allocator,
-      cache_opts.use_adaptive_mutex, cache_opts.metadata_charge_policy,
-      cache_opts.secondary_cache);
-  std::shared_ptr<Cache> cache_new(tmp_cache);
-  table_options.block_cache = cache_new;
+  cache = std::make_shared<CacheWithStats>(NewLRUCache(cache_opts));
+  table_options.block_cache = cache;
   table_options.block_size = 4 * 1024;
   options.create_if_missing = true;
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@@ -2332,8 +2297,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
   fault_fs_->SetFilesystemActive(false, error_msg);
   start_insert = secondary_cache->num_inserts();
   start_lookup = secondary_cache->num_lookups();
-  uint32_t cache_insert = tmp_cache->GetInsertCount();
-  uint32_t cache_lookup = tmp_cache->GetLookupcount();
+  uint32_t cache_insert = cache->GetInsertCount();
+  uint32_t cache_lookup = cache->GetLookupcount();
   for (int i = 0; i < N; i++) {
     ASSERT_OK(db1->Get(ro, Key(i), &v));
     ASSERT_EQ(v, value1[i]);
@@ -2344,8 +2309,8 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
   ASSERT_EQ(0, static_cast<int>(final_insert));
   // lookup the secondary to get all blocks
   ASSERT_EQ(64, static_cast<int>(final_lookup));
-  uint32_t block_insert = tmp_cache->GetInsertCount() - cache_insert;
-  uint32_t block_lookup = tmp_cache->GetLookupcount() - cache_lookup;
+  uint32_t block_insert = cache->GetInsertCount() - cache_insert;
+  uint32_t block_lookup = cache->GetLookupcount() - cache_lookup;
   // Check the new block cache insert and lookup, should be no insert since all
   // blocks are from the secondary cache.
   ASSERT_EQ(0, static_cast<int>(block_insert));