提交 2b1fde60 编写于 作者: L Layamon 提交者: ZhaoMing

[diagnosable lrucache] add diagnosable lru cache

上级 7e76aaba
...@@ -1170,7 +1170,7 @@ if(WITH_TOOLS) ...@@ -1170,7 +1170,7 @@ if(WITH_TOOLS)
endforeach(sourcefile ${BENCHMARKS}) endforeach(sourcefile ${BENCHMARKS})
add_subdirectory(tools) add_subdirectory(tools)
#add_subdirectory(terark-tools/terark-test) add_subdirectory(terark-tools/terark-test)
add_subdirectory(terark-tools/trx-test) add_subdirectory(terark-tools/trx-test)
add_subdirectory(terark-tools/batch-write-bench) add_subdirectory(terark-tools/batch-write-bench)
endif() endif()
...@@ -103,13 +103,13 @@ void LRUHandleTable::Resize() { ...@@ -103,13 +103,13 @@ void LRUHandleTable::Resize() {
LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit, LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit,
double high_pri_pool_ratio) double high_pri_pool_ratio)
: capacity_(0), : high_pri_pool_usage_(0),
high_pri_pool_usage_(0), usage_(0),
lru_usage_(0),
capacity_(0),
strict_capacity_limit_(strict_capacity_limit), strict_capacity_limit_(strict_capacity_limit),
high_pri_pool_ratio_(high_pri_pool_ratio), high_pri_pool_ratio_(high_pri_pool_ratio),
high_pri_pool_capacity_(0), high_pri_pool_capacity_(0) {
usage_(0),
lru_usage_(0) {
// Make empty circular linked list // Make empty circular linked list
lru_.next = &lru_; lru_.next = &lru_;
lru_.prev = &lru_; lru_.prev = &lru_;
...@@ -140,7 +140,7 @@ void LRUCacheShard::EraseUnRefEntries() { ...@@ -140,7 +140,7 @@ void LRUCacheShard::EraseUnRefEntries() {
table_.Remove(old->key(), old->hash); table_.Remove(old->key(), old->hash);
old->SetInCache(false); old->SetInCache(false);
Unref(old); Unref(old);
usage_ -= old->charge; UsageSub(old);
last_reference_list.push_back(old); last_reference_list.push_back(old);
} }
} }
...@@ -191,10 +191,10 @@ void LRUCacheShard::LRU_Remove(LRUHandle* e) { ...@@ -191,10 +191,10 @@ void LRUCacheShard::LRU_Remove(LRUHandle* e) {
e->next->prev = e->prev; e->next->prev = e->prev;
e->prev->next = e->next; e->prev->next = e->next;
e->prev = e->next = nullptr; e->prev = e->next = nullptr;
lru_usage_ -= e->charge; LRUUsageSub(e);
if (e->InHighPriPool()) { if (e->InHighPriPool()) {
assert(high_pri_pool_usage_ >= e->charge); assert(high_pri_pool_usage_ >= e->charge);
high_pri_pool_usage_ -= e->charge; HighPriPoolUsageSub(e);
} }
} }
...@@ -208,7 +208,7 @@ void LRUCacheShard::LRU_Insert(LRUHandle* e) { ...@@ -208,7 +208,7 @@ void LRUCacheShard::LRU_Insert(LRUHandle* e) {
e->prev->next = e; e->prev->next = e;
e->next->prev = e; e->next->prev = e;
e->SetInHighPriPool(true); e->SetInHighPriPool(true);
high_pri_pool_usage_ += e->charge; HighPriPoolUsageAdd(e);
MaintainPoolSize(); MaintainPoolSize();
} else { } else {
// Insert "e" to the head of low-pri pool. Note that when // Insert "e" to the head of low-pri pool. Note that when
...@@ -220,7 +220,7 @@ void LRUCacheShard::LRU_Insert(LRUHandle* e) { ...@@ -220,7 +220,7 @@ void LRUCacheShard::LRU_Insert(LRUHandle* e) {
e->SetInHighPriPool(false); e->SetInHighPriPool(false);
lru_low_pri_ = e; lru_low_pri_ = e;
} }
lru_usage_ += e->charge; LRUUsageAdd(e);
} }
void LRUCacheShard::MaintainPoolSize() { void LRUCacheShard::MaintainPoolSize() {
...@@ -229,7 +229,7 @@ void LRUCacheShard::MaintainPoolSize() { ...@@ -229,7 +229,7 @@ void LRUCacheShard::MaintainPoolSize() {
lru_low_pri_ = lru_low_pri_->next; lru_low_pri_ = lru_low_pri_->next;
assert(lru_low_pri_ != &lru_); assert(lru_low_pri_ != &lru_);
lru_low_pri_->SetInHighPriPool(false); lru_low_pri_->SetInHighPriPool(false);
high_pri_pool_usage_ -= lru_low_pri_->charge; HighPriPoolUsageSub(lru_low_pri_);
} }
} }
...@@ -243,7 +243,7 @@ void LRUCacheShard::EvictFromLRU(size_t charge, ...@@ -243,7 +243,7 @@ void LRUCacheShard::EvictFromLRU(size_t charge,
table_.Remove(old->key(), old->hash); table_.Remove(old->key(), old->hash);
old->SetInCache(false); old->SetInCache(false);
Unref(old); Unref(old);
usage_ -= old->charge; UsageSub(old);
deleted->push_back(old); deleted->push_back(old);
} }
} }
...@@ -309,7 +309,7 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) { ...@@ -309,7 +309,7 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
last_reference = Unref(e); last_reference = Unref(e);
if (last_reference) { if (last_reference) {
usage_ -= e->charge; UsageSub(e);
} }
if (e->refs == 1 && e->InCache()) { if (e->refs == 1 && e->InCache()) {
// The item is still in cache, and nobody else holds a reference to it // The item is still in cache, and nobody else holds a reference to it
...@@ -321,7 +321,7 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) { ...@@ -321,7 +321,7 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) {
table_.Remove(e->key(), e->hash); table_.Remove(e->key(), e->hash);
e->SetInCache(false); e->SetInCache(false);
Unref(e); Unref(e);
usage_ -= e->charge; UsageSub(e);
last_reference = true; last_reference = true;
} else { } else {
// put the item on the list to be potentially freed // put the item on the list to be potentially freed
...@@ -386,11 +386,11 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value, ...@@ -386,11 +386,11 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
// note that the cache might get larger than its capacity if not enough // note that the cache might get larger than its capacity if not enough
// space was freed // space was freed
LRUHandle* old = table_.Insert(e); LRUHandle* old = table_.Insert(e);
usage_ += e->charge; UsageAdd(e);
if (old != nullptr) { if (old != nullptr) {
old->SetInCache(false); old->SetInCache(false);
if (Unref(old)) { if (Unref(old)) {
usage_ -= old->charge; UsageSub(old);
// old is on LRU because it's in cache and its reference count // old is on LRU because it's in cache and its reference count
// was just 1 (Unref returned 0) // was just 1 (Unref returned 0)
LRU_Remove(old); LRU_Remove(old);
...@@ -424,7 +424,7 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) { ...@@ -424,7 +424,7 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
if (e != nullptr) { if (e != nullptr) {
last_reference = Unref(e); last_reference = Unref(e);
if (last_reference) { if (last_reference) {
usage_ -= e->charge; UsageSub(e);
} }
if (last_reference && e->InCache()) { if (last_reference && e->InCache()) {
LRU_Remove(e); LRU_Remove(e);
...@@ -462,22 +462,71 @@ std::string LRUCacheShard::GetPrintableOptions() const { ...@@ -462,22 +462,71 @@ std::string LRUCacheShard::GetPrintableOptions() const {
return std::string(buffer); return std::string(buffer);
} }
LRUCache::LRUCache(size_t capacity, int num_shard_bits, template <>
bool strict_capacity_limit, double high_pri_pool_ratio, LRUCacheBase<DiagnosableLRUCacheShard>::LRUCacheBase(
std::shared_ptr<MemoryAllocator> allocator) size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio, std::shared_ptr<MemoryAllocator> allocator,
size_t topk)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit, : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
std::move(allocator)) { std::move(allocator)) {
num_shards_ = 1 << num_shard_bits; num_shards_ = 1 << num_shard_bits;
shards_ = reinterpret_cast<LRUCacheShard*>( shards_ =
port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_)); reinterpret_cast<DiagnosableLRUCacheShard*>(port::cacheline_aligned_alloc(
sizeof(DiagnosableLRUCacheShard) * num_shards_));
size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_; size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_;
for (int i = 0; i < num_shards_; i++) { for (int i = 0; i < num_shards_; i++) {
new (&shards_[i]) new (&shards_[i]) DiagnosableLRUCacheShard(per_shard, strict_capacity_limit,
LRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio); high_pri_pool_ratio, topk);
}
}
// Generic (non-diagnosable) constructor: splits `capacity` evenly across
// 1 << num_shard_bits shards and placement-constructs each shard inside a
// single cacheline-aligned allocation.  The trailing `topk` parameter is
// only meaningful for the DiagnosableLRUCacheShard specialization and is
// deliberately ignored here.
template <class LRUCacheShardType>
LRUCacheBase<LRUCacheShardType>::LRUCacheBase(
    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
    double high_pri_pool_ratio, std::shared_ptr<MemoryAllocator> allocator,
    size_t /*topk*/)
    : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
                   std::move(allocator)) {
  num_shards_ = 1 << num_shard_bits;
  // Raw aligned allocation + placement new; the shards are torn down and
  // freed manually by ~LRUCacheBase, not by delete[].
  shards_ = reinterpret_cast<LRUCacheShardType*>(
      port::cacheline_aligned_alloc(sizeof(LRUCacheShardType) * num_shards_));
  // Round up so the per-shard capacities sum to at least `capacity`.
  size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_;
  for (int i = 0; i < num_shards_; i++) {
    new (&shards_[i]) LRUCacheShardType(per_shard, strict_capacity_limit,
                                        high_pri_pool_ratio);
  }
}
template <>
std::string LRUCacheBase<DiagnosableLRUCacheShard>::DumpLRUCacheStatistics() {
std::string res;
res.append("Cache Summary: \n");
res.append("usage: " + std::to_string(GetUsage()) +
", pinned_usage: " + std::to_string(GetPinnedUsage()) + "\n");
for (int i = 0; i < num_shards_; i++) {
res.append("shard_" + std::to_string(i) + " : \n");
res.append(shards_[i].DumpDiagnoseInfo());
} }
return res;
}
// Generic fallback: a plain LRUCacheShard collects no diagnose information,
// so there is nothing to dump.  The DiagnosableLRUCacheShard specialization
// of this method produces the real per-shard report.
template <class LRUCacheShardType>
std::string LRUCacheBase<LRUCacheShardType>::DumpLRUCacheStatistics() {
  return "";
}
template <>
const char* LRUCacheBase<DiagnosableLRUCacheShard>::Name() const {
return "DiagnosableLRUCache";
} }
LRUCache::~LRUCache() { template <class LRUCacheShardType>
const char* LRUCacheBase<LRUCacheShardType>::Name() const {
return "LRUCache";
}
template <class LRUCacheShardType>
LRUCacheBase<LRUCacheShardType>::~LRUCacheBase() {
if (shards_ != nullptr) { if (shards_ != nullptr) {
assert(num_shards_ > 0); assert(num_shards_ > 0);
for (int i = 0; i < num_shards_; i++) { for (int i = 0; i < num_shards_; i++) {
...@@ -487,27 +536,33 @@ LRUCache::~LRUCache() { ...@@ -487,27 +536,33 @@ LRUCache::~LRUCache() {
} }
} }
CacheShard* LRUCache::GetShard(int shard) { template <class LRUCacheShardType>
CacheShard* LRUCacheBase<LRUCacheShardType>::GetShard(int shard) {
return reinterpret_cast<CacheShard*>(&shards_[shard]); return reinterpret_cast<CacheShard*>(&shards_[shard]);
} }
const CacheShard* LRUCache::GetShard(int shard) const { template <class LRUCacheShardType>
const CacheShard* LRUCacheBase<LRUCacheShardType>::GetShard(int shard) const {
return reinterpret_cast<CacheShard*>(&shards_[shard]); return reinterpret_cast<CacheShard*>(&shards_[shard]);
} }
void* LRUCache::Value(Handle* handle) { template <class LRUCacheShardType>
void* LRUCacheBase<LRUCacheShardType>::Value(Handle* handle) {
return reinterpret_cast<const LRUHandle*>(handle)->value; return reinterpret_cast<const LRUHandle*>(handle)->value;
} }
size_t LRUCache::GetCharge(Handle* handle) const { template <class LRUCacheShardType>
size_t LRUCacheBase<LRUCacheShardType>::GetCharge(Handle* handle) const {
return reinterpret_cast<const LRUHandle*>(handle)->charge; return reinterpret_cast<const LRUHandle*>(handle)->charge;
} }
uint32_t LRUCache::GetHash(Handle* handle) const { template <class LRUCacheShardType>
uint32_t LRUCacheBase<LRUCacheShardType>::GetHash(Handle* handle) const {
return reinterpret_cast<const LRUHandle*>(handle)->hash; return reinterpret_cast<const LRUHandle*>(handle)->hash;
} }
void LRUCache::DisownData() { template <class LRUCacheShardType>
void LRUCacheBase<LRUCacheShardType>::DisownData() {
// Do not drop data if compile with ASAN to suppress leak warning. // Do not drop data if compile with ASAN to suppress leak warning.
#if defined(__clang__) #if defined(__clang__)
#if !defined(__has_feature) || !__has_feature(address_sanitizer) #if !defined(__has_feature) || !__has_feature(address_sanitizer)
...@@ -522,7 +577,8 @@ void LRUCache::DisownData() { ...@@ -522,7 +577,8 @@ void LRUCache::DisownData() {
#endif // __clang__ #endif // __clang__
} }
size_t LRUCache::TEST_GetLRUSize() { template <class LRUCacheShardType>
size_t LRUCacheBase<LRUCacheShardType>::TEST_GetLRUSize() {
size_t lru_size_of_all_shards = 0; size_t lru_size_of_all_shards = 0;
for (int i = 0; i < num_shards_; i++) { for (int i = 0; i < num_shards_; i++) {
lru_size_of_all_shards += shards_[i].TEST_GetLRUSize(); lru_size_of_all_shards += shards_[i].TEST_GetLRUSize();
...@@ -530,25 +586,21 @@ size_t LRUCache::TEST_GetLRUSize() { ...@@ -530,25 +586,21 @@ size_t LRUCache::TEST_GetLRUSize() {
return lru_size_of_all_shards; return lru_size_of_all_shards;
} }
double LRUCache::GetHighPriPoolRatio() { //template <class LRUCacheShardType>
double result = 0.0; //double LRUCacheBase<LRUCacheShardType>::GetHighPriPoolRatio()
if (num_shards_ > 0) {
result = shards_[0].GetHighPriPoolRatio();
}
return result;
}
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) { std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits, return NewLRUCache(
cache_opts.strict_capacity_limit, cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.high_pri_pool_ratio, cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
cache_opts.memory_allocator); cache_opts.memory_allocator, cache_opts.is_diagnose, cache_opts.topk);
} }
std::shared_ptr<Cache> NewLRUCache( std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit, size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio, double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator) { std::shared_ptr<MemoryAllocator> memory_allocator, bool is_diagnose,
size_t topk) {
if (num_shard_bits >= 20) { if (num_shard_bits >= 20) {
return nullptr; // the cache cannot be sharded into too many fine pieces return nullptr; // the cache cannot be sharded into too many fine pieces
} }
...@@ -559,6 +611,11 @@ std::shared_ptr<Cache> NewLRUCache( ...@@ -559,6 +611,11 @@ std::shared_ptr<Cache> NewLRUCache(
if (num_shard_bits < 0) { if (num_shard_bits < 0) {
num_shard_bits = GetDefaultCacheShardBits(capacity); num_shard_bits = GetDefaultCacheShardBits(capacity);
} }
if (is_diagnose) {
return std::make_shared<DiagnosableLRUCache>(
capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
std::move(memory_allocator), topk);
}
return std::make_shared<LRUCache>(capacity, num_shard_bits, return std::make_shared<LRUCache>(capacity, num_shard_bits,
strict_capacity_limit, high_pri_pool_ratio, strict_capacity_limit, high_pri_pool_ratio,
std::move(memory_allocator)); std::move(memory_allocator));
......
...@@ -8,6 +8,9 @@ ...@@ -8,6 +8,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once #pragma once
#include <map>
#include <mutex>
#include <sstream>
#include <string> #include <string>
#include "cache/sharded_cache.h" #include "cache/sharded_cache.h"
...@@ -114,6 +117,137 @@ struct LRUHandle { ...@@ -114,6 +117,137 @@ struct LRUHandle {
} }
}; };
// One DiagnosableLRUCacheShard owns one DiagnoseContext.
//
// Collects per-shard diagnostic statistics for the LRU cache:
//   * monotonic add/remove counters (plus "last_*" snapshots so Dump() can
//     report the delta of activity since the previous Dump), and
//   * top-k charge histograms (charge -> entry count) for the whole cache,
//     the LRU list, and the high-pri pool.
// Counters are lock-free atomics; the histograms are guarded by mu_.
struct DiagnoseContext {
  explicit DiagnoseContext(size_t topk_count) : k_(topk_count) {}

  // Entry inserted into the cache (shard usage_ grew by h->charge).
  inline void UsageAdd(const LRUHandle* h) {
    assert(h->refs > 0);
    mu_.Lock();
    TopKAdd(topk_in_all_, h);
    mu_.Unlock();
    element_new_count.fetch_add(1);
  }
  // Last reference to an entry dropped (shard usage_ shrank by h->charge).
  inline void UsageSub(const LRUHandle* h) {
    assert(h->refs == 0);
    mu_.Lock();
    // BUGFIX: was TopKSub(topk_in_hpp_, h).  An entry leaving the cache
    // must be removed from the whole-cache histogram (the one UsageAdd
    // feeds), not from the high-pri-pool histogram.
    TopKSub(topk_in_all_, h);
    mu_.Unlock();
    element_del_count.fetch_add(1);
  }
  // Entry linked into the LRU list.
  inline void LRUUsageAdd(const LRUHandle* h) {
    mu_.Lock();
    TopKAdd(topk_in_lru_, h);
    mu_.Unlock();
    insert_lru_count.fetch_add(1);
  }
  // Entry unlinked from the LRU list.
  inline void LRUUsageSub(const LRUHandle* h) {
    mu_.Lock();
    TopKSub(topk_in_lru_, h);
    mu_.Unlock();
    remove_lru_count.fetch_add(1);
  }
  // Entry moved into the high-pri pool.
  inline void HighPriUsageAdd(const LRUHandle* h) {
    mu_.Lock();
    TopKAdd(topk_in_hpp_, h);
    mu_.Unlock();
    high_pri_add_count.fetch_add(1);
  }
  // Entry moved out of the high-pri pool.
  inline void HighPriUsageSub(const LRUHandle* h) {
    mu_.Lock();
    TopKSub(topk_in_hpp_, h);
    mu_.Unlock();
    high_pri_del_count.fetch_add(1);
  }

  // Renders the counter deltas (activity since the previous Dump call) and
  // the three top-k charge histograms as a human-readable report.
  std::string Dump() {
    std::stringstream stat;
    stat << "total insert delta: "
         << GetDelta(element_new_count, last_element_new_count) << "\n";
    stat << "total delete delta: "
         << GetDelta(element_del_count, last_element_del_count) << "\n";
    stat << "lru insert delta: "
         << GetDelta(insert_lru_count, last_insert_lru_count) << "\n";
    // BUGFIX: label previously read "lr delete delta".
    stat << "lru delete delta: "
         << GetDelta(remove_lru_count, last_remove_lru_count) << "\n";
    stat << "highp insert delta: "
         << GetDelta(high_pri_add_count, last_high_pri_add_count) << "\n";
    stat << "highp delete delta: "
         << GetDelta(high_pri_del_count, last_high_pri_del_count) << "\n";
    stat << "topk in total: " << TopKInfo(topk_in_all_) << "\n";
    stat << "topk in lru  : " << TopKInfo(topk_in_lru_) << "\n";
    stat << "topk in highp: " << TopKInfo(topk_in_hpp_) << "\n";
    return stat.str();
  }

 private:
  // Bumps the count for h->charge; if the histogram now tracks more than
  // k_ distinct charge sizes, the smallest one is evicted (std::map keeps
  // keys sorted ascending, so begin() is the smallest charge).
  inline void TopKAdd(std::map<size_t, size_t>& s, const LRUHandle* h) {
    auto findit = s.find(h->charge);
    if (findit != s.end()) {
      findit->second++;
    } else {
      s[h->charge] = 1;
    }
    if (s.size() > k_) {
      s.erase(s.begin());
    }
  }
  // Since there is no unique id for every charge_size in the top-k set,
  // just decrement one entry with the same charge_size if present.
  // TODO: maybe use LRUHandle* as a unique id, but the LRUHandle* may be
  // freed before it is removed from the set, which could cause a
  // heap-use-after-free bug.
  inline void TopKSub(std::map<size_t, size_t>& s, const LRUHandle* h) {
    auto findit = s.find(h->charge);
    if (findit == s.end()) return;
    findit->second--;
    if (findit->second == 0) {
      s.erase(findit);
    }
  }
  // Returns cur - last and advances last to cur, so each Dump() reports the
  // activity since the previous Dump().
  // BUGFIX: previously returned the cumulative value (tmp) instead of the
  // computed delta, making every "delta" line in Dump() a running total.
  uint64_t GetDelta(std::atomic<uint64_t>& cur, std::atomic<uint64_t>& last) {
    auto tmp = cur.load();
    auto delta = tmp - last.load();
    last.store(tmp);
    return delta;
  }
  // Formats a histogram as "[(charge,count),(charge,count),...]".
  std::string TopKInfo(std::map<size_t, size_t>& topk) {
    std::string res{"["};
    for (auto e : topk) {
      res.append("(" + std::to_string(e.first) + "," +
                 std::to_string(e.second) + ")" + ",");
    }
    if (res.back() == ',') res.pop_back();
    res.append("]");
    return res;
  }

  std::atomic<uint64_t> insert_lru_count{0};
  std::atomic<uint64_t> remove_lru_count{0};
  std::atomic<uint64_t> element_new_count{0};
  std::atomic<uint64_t> element_del_count{0};
  std::atomic<uint64_t> high_pri_add_count{0};
  std::atomic<uint64_t> high_pri_del_count{0};
  // Snapshots taken by GetDelta at each Dump().
  std::atomic<uint64_t> last_insert_lru_count{0};
  std::atomic<uint64_t> last_remove_lru_count{0};
  std::atomic<uint64_t> last_element_new_count{0};
  std::atomic<uint64_t> last_element_del_count{0};
  std::atomic<uint64_t> last_high_pri_add_count{0};
  std::atomic<uint64_t> last_high_pri_del_count{0};
  std::map<size_t, size_t> topk_in_hpp_;
  std::map<size_t, size_t> topk_in_all_;
  std::map<size_t, size_t> topk_in_lru_;
  mutable port::Mutex mu_;  // guards the three top-k histograms
  const size_t k_;          // number of distinct charge sizes to keep
};
// We provide our own simple hash table since it removes a whole bunch // We provide our own simple hash table since it removes a whole bunch
// of porting hacks and is also faster than some of the built-in hash // of porting hacks and is also faster than some of the built-in hash
// table implementations in some of the compiler/runtime combinations // table implementations in some of the compiler/runtime combinations
...@@ -209,6 +343,29 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard { ...@@ -209,6 +343,29 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
// Retrives high pri pool ratio // Retrives high pri pool ratio
double GetHighPriPoolRatio(); double GetHighPriPoolRatio();
 protected:
  // Accounting hooks for usage bookkeeping.  The base implementations just
  // adjust the counters by h->charge; DiagnosableLRUCacheShard overrides
  // them to additionally feed its DiagnoseContext.
  // NOTE(review): call sites appear to run under mutex_ — confirm before
  // relying on these being thread-safe on their own.
  virtual inline void HighPriPoolUsageAdd(const LRUHandle* h) {
    high_pri_pool_usage_ += h->charge;
  }
  virtual inline void HighPriPoolUsageSub(const LRUHandle* h) {
    high_pri_pool_usage_ -= h->charge;
  }
  virtual inline void UsageAdd(const LRUHandle* h) { usage_ += h->charge; }
  virtual inline void UsageSub(const LRUHandle* h) { usage_ -= h->charge; }
  virtual inline void LRUUsageAdd(const LRUHandle* h) {
    lru_usage_ += h->charge;
  }
  virtual inline void LRUUsageSub(const LRUHandle* h) {
    lru_usage_ -= h->charge;
  }

  // Memory size for entries in high-pri pool.
  size_t high_pri_pool_usage_;
  // Memory size for entries residing in the cache.
  size_t usage_;
  // Memory size for entries residing only in the LRU list.
  size_t lru_usage_;
private: private:
void LRU_Remove(LRUHandle* e); void LRU_Remove(LRUHandle* e);
void LRU_Insert(LRUHandle* e); void LRU_Insert(LRUHandle* e);
...@@ -230,9 +387,6 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard { ...@@ -230,9 +387,6 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
// Initialized before use. // Initialized before use.
size_t capacity_; size_t capacity_;
// Memory size for entries in high-pri pool.
size_t high_pri_pool_usage_;
// Whether to reject insertion if cache reaches its full capacity. // Whether to reject insertion if cache reaches its full capacity.
bool strict_capacity_limit_; bool strict_capacity_limit_;
...@@ -264,40 +418,85 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard { ...@@ -264,40 +418,85 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard {
// ------------vvvvvvvvvvvvv----------- // ------------vvvvvvvvvvvvv-----------
LRUHandleTable table_; LRUHandleTable table_;
// Memory size for entries residing in the cache
size_t usage_;
// Memory size for entries residing only in the LRU list
size_t lru_usage_;
// mutex_ protects the following state. // mutex_ protects the following state.
// We don't count mutex_ as the cache's internal state so semantically we // We don't count mutex_ as the cache's internal state so semantically we
// don't mind mutex_ invoking the non-const actions. // don't mind mutex_ invoking the non-const actions.
mutable port::Mutex mutex_; mutable port::Mutex mutex_;
}; };
class LRUCache : public ShardedCache { class ALIGN_AS(CACHE_LINE_SIZE) DiagnosableLRUCacheShard
: public LRUCacheShard {
public: public:
LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, DiagnosableLRUCacheShard(size_t capacity, bool strict_capacity_limit,
double high_pri_pool_ratio, double high_pri_pool_ratio, size_t topk)
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr); : LRUCacheShard(capacity, strict_capacity_limit, high_pri_pool_ratio),
virtual ~LRUCache(); dc_(topk) {}
virtual const char* Name() const override { return "LRUCache"; }
  // Each override performs the same counter update as the LRUCacheShard
  // base implementation and then records the event in the DiagnoseContext.
  inline void HighPriPoolUsageAdd(const LRUHandle* h) override {
    high_pri_pool_usage_ += h->charge;
    dc_.HighPriUsageAdd(h);
  }
  inline void HighPriPoolUsageSub(const LRUHandle* h) override {
    high_pri_pool_usage_ -= h->charge;
    dc_.HighPriUsageSub(h);
  }
  inline void UsageAdd(const LRUHandle* h) override {
    usage_ += h->charge;
    dc_.UsageAdd(h);
  }
  inline void UsageSub(const LRUHandle* h) override {
    usage_ -= h->charge;
    dc_.UsageSub(h);
  }
  inline void LRUUsageAdd(const LRUHandle* h) override {
    lru_usage_ += h->charge;
    dc_.LRUUsageAdd(h);
  }
  inline void LRUUsageSub(const LRUHandle* h) override {
    lru_usage_ -= h->charge;
    dc_.LRUUsageSub(h);
  }

  // Human-readable diagnose report for this shard (counter deltas and
  // top-k charge histograms).
  std::string DumpDiagnoseInfo() { return dc_.Dump(); }

 private:
  DiagnoseContext dc_;  // per-shard diagnostic statistics
};
template <class LRUCacheShardType>
class LRUCacheBase : public ShardedCache {
public:
LRUCacheBase(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
size_t topk = 10);
virtual ~LRUCacheBase();
virtual const char* Name() const override;
virtual CacheShard* GetShard(int shard) override; virtual CacheShard* GetShard(int shard) override;
virtual const CacheShard* GetShard(int shard) const override; virtual const CacheShard* GetShard(int shard) const override;
virtual void* Value(Handle* handle) override; virtual void* Value(Handle* handle) override;
virtual size_t GetCharge(Handle* handle) const override; virtual size_t GetCharge(Handle* handle) const override;
virtual uint32_t GetHash(Handle* handle) const override; virtual uint32_t GetHash(Handle* handle) const override;
virtual void DisownData() override; virtual void DisownData() override;
virtual std::string DumpLRUCacheStatistics();
// Retrieves number of elements in LRU, for unit test purpose only // Retrieves number of elements in LRU, for unit test purpose only
size_t TEST_GetLRUSize(); size_t TEST_GetLRUSize();
// Retrives high pri pool ratio // Retrives high pri pool ratio
double GetHighPriPoolRatio(); double GetHighPriPoolRatio() {
double result = 0.0;
if (num_shards_ > 0) {
result = shards_[0].GetHighPriPoolRatio();
}
return result;
}
private: private:
LRUCacheShard* shards_ = nullptr; LRUCacheShardType* shards_ = nullptr;
int num_shards_ = 0; int num_shards_ = 0;
}; };
using LRUCache = LRUCacheBase<LRUCacheShard>;
using DiagnosableLRUCache = LRUCacheBase<DiagnosableLRUCacheShard>;
} // namespace TERARKDB_NAMESPACE } // namespace TERARKDB_NAMESPACE
...@@ -62,6 +62,10 @@ struct LRUCacheOptions { ...@@ -62,6 +62,10 @@ struct LRUCacheOptions {
// BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority. // BlockBasedTableOptions::cache_index_and_filter_blocks_with_high_priority.
double high_pri_pool_ratio = 0.0; double high_pri_pool_ratio = 0.0;
bool is_diagnose = false;
size_t topk = 10;
// If non-nullptr will use this allocator instead of system allocator when // If non-nullptr will use this allocator instead of system allocator when
// allocating memory for cache blocks. Call this method before you start using // allocating memory for cache blocks. Call this method before you start using
// the cache! // the cache!
...@@ -110,7 +114,8 @@ struct LIRSCacheOptions { ...@@ -110,7 +114,8 @@ struct LIRSCacheOptions {
extern std::shared_ptr<Cache> NewLRUCache( extern std::shared_ptr<Cache> NewLRUCache(
size_t capacity, int num_shard_bits = -1, size_t capacity, int num_shard_bits = -1,
bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0, bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.0,
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr); std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
bool is_diagnose = false, size_t topk = 10);
extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts); extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
......
...@@ -4,9 +4,10 @@ project(rocksdb_test) ...@@ -4,9 +4,10 @@ project(rocksdb_test)
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -DROCKSDB_PLATFORM_POSIX -DOS_MACOSX -DDebugBreak=__builtin_debugtrap -O3 -g3") #set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -DROCKSDB_PLATFORM_POSIX -DOS_MACOSX -DDebugBreak=__builtin_debugtrap -O3 -g3")
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y -DROCKSDB_PLATFORM_POSIX -D_DEBUG=1 -DDebugBreak=__builtin_trap -O2 -g3 -march=haswell -fsanitize=address") #set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y -DROCKSDB_PLATFORM_POSIX -D_DEBUG=1 -DDebugBreak=__builtin_trap -O2 -g3 -march=haswell -fsanitize=address")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -DROCKSDB_PLATFORM_POSIX -D_DEBUG=1 -DDebugBreak=__builtin_trap -O0 -g3 -march=haswell -fsanitize=address") #set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -DROCKSDB_PLATFORM_POSIX -D_DEBUG=1 -DDebugBreak=__builtin_trap -O0 -g3 -march=haswell")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DROCKSDB_PLATFORM_POSIX -DDebugBreak=__builtin_trap")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address") #set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
add_executable(rocksdb_test chaos_test.cc) add_executable(rocksdb_test chaos_test.cc)
......
...@@ -43,6 +43,19 @@ class ChaosTest { ...@@ -43,6 +43,19 @@ class ChaosTest {
~ChaosTest() {} ~ChaosTest() {}
void DumpCacheStatistics() {
auto dcache = dynamic_cast<DiagnosableLRUCache *>(bbto.block_cache.get());
if (dcache == nullptr) {
return;
}
while (true) {
auto info = dcache->DumpLRUCacheStatistics();
std::cout << info << std::endl;
using namespace std::chrono;
std::this_thread::sleep_for(60s);
}
}
void set_options() { void set_options() {
options.atomic_flush = false; options.atomic_flush = false;
options.allow_mmap_reads = false; options.allow_mmap_reads = false;
...@@ -117,7 +130,8 @@ class ChaosTest { ...@@ -117,7 +130,8 @@ class ChaosTest {
bbto.pin_top_level_index_and_filter = true; bbto.pin_top_level_index_and_filter = true;
bbto.pin_l0_filter_and_index_blocks_in_cache = true; bbto.pin_l0_filter_and_index_blocks_in_cache = true;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, true)); bbto.filter_policy.reset(NewBloomFilterPolicy(10, true));
bbto.block_cache = NewLRUCache(4ULL << 30, 6, false); //bbto.block_cache = NewLRUCache(4ULL << 30, 6, false);
bbto.block_cache = NewLRUCache(4ULL << 30, 6, false, 0.0, nullptr, true);
options.compaction_pri = kMinOverlappingRatio; options.compaction_pri = kMinOverlappingRatio;
options.compression = kZSTD; options.compression = kZSTD;
...@@ -798,6 +812,8 @@ class ChaosTest { ...@@ -798,6 +812,8 @@ class ChaosTest {
options.compaction_style = TERARKDB_NAMESPACE::kCompactionStyleLevel; options.compaction_style = TERARKDB_NAMESPACE::kCompactionStyleLevel;
options.write_buffer_size = size_t(file_size_base / 1.1); options.write_buffer_size = size_t(file_size_base / 1.1);
options.enable_lazy_compaction = true; options.enable_lazy_compaction = true;
//bbto.block_cache = NewLRUCache(4ULL << 30, 6, false);
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));
cfDescriptors.emplace_back("level" + std::to_string(i), options); cfDescriptors.emplace_back("level" + std::to_string(i), options);
} }
if (flags_ & TestWorker) { if (flags_ & TestWorker) {
...@@ -973,6 +989,7 @@ int main(int argc, char **argv) { ...@@ -973,6 +989,7 @@ int main(int argc, char **argv) {
for (int j = 0; j < write_thread; ++j) { for (int j = 0; j < write_thread; ++j) {
thread_vec.emplace_back([&test, j] { test.WriteFunc(j); }); thread_vec.emplace_back([&test, j] { test.WriteFunc(j); });
} }
thread_vec.emplace_back([&test]() { test.DumpCacheStatistics(); });
for (auto &t : thread_vec) { for (auto &t : thread_vec) {
t.join(); t.join();
} }
......
...@@ -18,6 +18,7 @@ namespace gujia { ...@@ -18,6 +18,7 @@ namespace gujia {
typedef struct epoll_event Event; typedef struct epoll_event Event;
} }
#include <cache/lru_cache.h>
#include <rocksdb/compaction_filter.h> #include <rocksdb/compaction_filter.h>
#include <rocksdb/convenience.h> #include <rocksdb/convenience.h>
#include <rocksdb/db.h> #include <rocksdb/db.h>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册