提交 77bc22ff 编写于 作者: G groot

add cache classes


Former-commit-id: 7a268e064088b6f4a759a714d6e7dd70d5c6dcfb
上级 c250d003
......@@ -4,8 +4,11 @@
# Proprietary and confidential.
#-------------------------------------------------------------------------------
AUX_SOURCE_DIRECTORY(./cache cache_files)
set(vecwise_engine_src
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
)
${cache_files}
cache/DataObj.h)
add_library(vecwise_engine SHARED ${vecwise_engine_src})
\ No newline at end of file
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "Cache.h"
#include <set>
namespace zilliz {
namespace vecwise {
namespace cache {
Cache::Cache(int64_t mem_capacity, uint64_t cache_max_count)
: usage_(0),
capacity_(mem_capacity*1024*1024*1024),
lru_(cache_max_count) {
// AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity)
}
size_t Cache::size() const {
std::lock_guard<std::mutex> lock(mutex_);
return lru_.size();
}
bool Cache::exists(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
return lru_.exists(key);
}
DataObjPtr Cache::get(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if(!lru_.exists(key)){
return nullptr;
}
const CacheObjPtr& cache_obj = lru_.get(key);
return cache_obj->data_;
}
void Cache::insert(const std::string& key, const DataObjPtr& data_ptr) {
{
std::lock_guard<std::mutex> lock(mutex_);
/* if key already exist, over-write old data */
if (lru_.exists(key)) {
CacheObjPtr obj_ptr = lru_.get(key);
usage_ -= obj_ptr->data_->size();
obj_ptr->data_ = data_ptr;
usage_ += data_ptr->size();
} else {
CacheObjPtr obj_ptr(new CacheObj(data_ptr));
lru_.put(key, obj_ptr);
usage_ += data_ptr->size();
}
// AGENT_LOG_DEBUG << "Insert into LRU(" << (capacity_ > 0 ? std::to_string(usage_ * 100 / capacity_) : "Nan")
// << "%, +" << data_ptr->size() << ", " << usage_ << ", " << lru_.size() << "):"
// << " " << key;
}
if (usage_ > capacity_) {
// AGENT_LOG_TRACE << "Current usage " << usage_
// << " exceeds cache capacity " << capacity_
// << ", start free memory";
free_memory();
}
}
void Cache::insert(const std::string& key, const std::shared_ptr<char>& data, int64_t size) {
DataObjPtr ptr = std::make_shared<DataObj>(data, size);
insert(key, ptr);
}
void Cache::erase(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if(!lru_.exists(key)){
return;
}
const CacheObjPtr& obj_ptr = lru_.get(key);
const DataObjPtr& data_ptr = obj_ptr->data_;
usage_ -= data_ptr->size();
// AGENT_LOG_DEBUG << "Erase from LRU(" << (capacity_ > 0 ? std::to_string(usage_*100/capacity_) : "Nan")
// << "%, -" << data_ptr->size() << ", " << usage_ << ", " << lru_.size() << "): "
// << (data_ptr->flags().get_flag(DataObjAttr::kPinned) ? "Pinned " : "")
// << (data_ptr->flags().get_flag(DataObjAttr::kValid) ? "Valid " : "")
// << "(ref:" << obj_ptr->ref_ << ") "
// << key;
lru_.erase(key);
}
void Cache::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
// AGENT_LOG_DEBUG << "Clear LRU !";
}
#if 0 /* caiyd 20190221, need more testing before enable */
void Cache::flush_to_file(const std::string& key, const CacheObjPtr& obj_ptr) {
if (!this->swap_enabled_) return;
const DataObjPtr data_ptr = obj_ptr->data();
if (data_ptr == nullptr || data_ptr->size() == 0) return;
if (data_ptr->ptr() == nullptr) return;
std::string name = std::to_string(reinterpret_cast<int64_t>(data_ptr.get()));
filesys::CreateDirectory(this->swap_path_);
/* write cache data to file */
obj_ptr->set_file_path(this->swap_path_ + "/" + name);
std::shared_ptr<arrow::io::OutputStream> outfile = nullptr;
filesys::OpenWritableFile(obj_ptr->file_path(), false, &outfile);
filesys::WriteFile(outfile, data_ptr->ptr().get(), data_ptr->size());
(void)outfile->Close();
AGENT_LOG_DEBUG << "Flush cache data: " << key << ", to file: " << obj_ptr->file_path();
/* free cache memory */
data_ptr->ptr().reset();
usage_ -= data_ptr->size();
}
void Cache::restore_from_file(const std::string& key, const CacheObjPtr& obj_ptr) {
if (!this->swap_enabled_) return;
const DataObjPtr data_ptr = obj_ptr->data();
if (data_ptr == nullptr || data_ptr->size() == 0) return;
std::shared_ptr<arrow::io::RandomAccessFile> infile = nullptr;
int64_t file_size, bytes_read;
/* load cache data from file */
if (!filesys::FileExist(obj_ptr->file_path())) {
THROW_AGENT_UNEXPECTED_ERROR("File not exist: " + obj_ptr->file_path());
}
filesys::OpenReadableFile(obj_ptr->file_path(), &infile);
infile->GetSize(&file_size);
if (data_ptr->size() != file_size) {
THROW_AGENT_UNEXPECTED_ERROR("File size not match: " + obj_ptr->file_path());
}
data_ptr->set_ptr(lib::gpu::MakeShared<char>(data_ptr->size(), lib::gpu::MallocHint::kUnifiedGlobal));
infile->Read(file_size, &bytes_read, data_ptr->ptr().get());
infile->Close();
AGENT_LOG_DEBUG << "Restore cache data: " << key << ", from file: " << obj_ptr->file_path();
/* clear file path */
obj_ptr->set_file_path("");
usage_ += data_ptr->size();
}
#endif
/* free memory space when CACHE occupation exceed its capacity */
void Cache::free_memory() {
if (usage_ <= capacity_) return;
int64_t threshhold = capacity_ * THRESHHOLD_PERCENT;
int64_t delta_size = usage_ - threshhold;
std::set<std::string> key_array;
int64_t released_size = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = lru_.rbegin();
while (it != lru_.rend() && released_size < delta_size) {
auto& key = it->first;
auto& obj_ptr = it->second;
const auto& data_ptr = obj_ptr->data_;
key_array.emplace(key);
released_size += data_ptr->size();
++it;
}
}
// AGENT_LOG_DEBUG << "to be released memory size: " << released_size;
for (auto& key : key_array) {
erase(key);
}
print();
}
void Cache::print() {
int64_t still_pinned_count = 0;
int64_t total_pinned_size = 0;
int64_t total_valid_empty_size = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto it = lru_.begin(); it != lru_.end(); ++it) {
auto& obj_ptr = it->second;
const auto& data_ptr = obj_ptr->data_;
if (data_ptr != nullptr) {
total_pinned_size += data_ptr->size();
++still_pinned_count;
} else {
total_valid_empty_size += data_ptr->size();
}
}
}
// AGENT_LOG_DEBUG << "[Still Pinned count]: " << still_pinned_count;
// AGENT_LOG_DEBUG << "[Pinned Memory total size(byte)]: " << total_pinned_size;
// AGENT_LOG_DEBUG << "[valid_empty total size(byte)]: " << total_valid_empty_size;
// AGENT_LOG_DEBUG << "[free memory size(byte)]: " << capacity_ - total_pinned_size - total_valid_empty_size;
}
} // cache
} // vecwise
} // zilliz
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <string>
#include <mutex>
#include <atomic>
#include "LRU.h"
#include "DataObj.h"
namespace zilliz {
namespace vecwise {
namespace cache {
const std::string SWAP_DIR = ".CACHE";
const float THRESHHOLD_PERCENT = 0.75;
class Cache {
private:
class CacheObj {
public:
CacheObj() = delete;
CacheObj(const DataObjPtr& data)
: data_(data) {
}
CacheObj(const std::shared_ptr<char>& data, int64_t size) {
data_ = std::make_shared<DataObj>(data, size);
}
public:
DataObjPtr data_ = nullptr;
};
using CacheObjPtr = std::shared_ptr<CacheObj>;
public:
//mem_capacity, units:GB
Cache(int64_t mem_capacity, uint64_t cache_max_count);
~Cache() = default;
int64_t usage() const { return usage_; }
int64_t capacity() const { return capacity_; }
size_t size() const;
bool exists(const std::string& key);
DataObjPtr get(const std::string& key);
void insert(const std::string& key, const DataObjPtr& data);
void insert(const std::string& key, const std::shared_ptr<char>& data, int64_t size);
void erase(const std::string& key);
void print();
void clear();
void free_memory();
private:
int64_t usage_;
int64_t capacity_;
LRU<std::string, CacheObjPtr> lru_;
mutable std::mutex mutex_;
};
using CachePtr = std::shared_ptr<Cache>;
} // cache
} // vecwise
} // zilliz
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "CacheMgr.h"
namespace zilliz {
namespace vecwise {
namespace cache {
CacheMgr::CacheMgr() {
//TODO: loada config to initialize cache
cache_ = std::make_shared<Cache>(16, 1UL<<32);
}
size_t CacheMgr::ItemCount() const {
if(cache_ == nullptr) {
return 0;
}
return cache_->size();
}
bool CacheMgr::IsExists(const std::string& key) {
if(cache_ == nullptr) {
return false;
}
return cache_->exists(key);
}
DataObjPtr CacheMgr::GetItem(const std::string& key) {
if(cache_ == nullptr) {
return nullptr;
}
return cache_->get(key);
}
void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
if(cache_ == nullptr) {
return;
}
cache_->insert(key, data);
}
void CacheMgr::InsertItem(const std::string& key, const std::shared_ptr<char>& data, int64_t size) {
if(cache_ == nullptr) {
return;
}
cache_->insert(key, data, size);
}
void CacheMgr::EraseItem(const std::string& key) {
if(cache_ == nullptr) {
return;
}
cache_->erase(key);
}
void CacheMgr::PrintInfo() {
if(cache_ == nullptr) {
return;
}
cache_->print();
}
void CacheMgr::ClearCache() {
if(cache_ == nullptr) {
return;
}
cache_->clear();
}
int64_t CacheMgr::CacheUsage() const {
if(cache_ == nullptr) {
return 0;
}
return cache_->usage();
}
int64_t CacheMgr::CacheCapacity() const {
if(cache_ == nullptr) {
return 0;
}
return cache_->capacity();
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include "Cache.h"
namespace zilliz {
namespace vecwise {
namespace cache {
class CacheMgr {
public:
static CacheMgr& GetInstance() {
static CacheMgr mgr;
return mgr;
}
size_t ItemCount() const;
bool IsExists(const std::string& key);
DataObjPtr GetItem(const std::string& key);
void InsertItem(const std::string& key, const DataObjPtr& data);
void InsertItem(const std::string& key, const std::shared_ptr<char>& data, int64_t size);
void EraseItem(const std::string& key);
void PrintInfo();
void ClearCache();
int64_t CacheUsage() const;
int64_t CacheCapacity() const;
private:
CacheMgr();
private:
CachePtr cache_;
};
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <memory>
namespace zilliz {
namespace vecwise {
namespace cache {
class DataObj {
public:
DataObj(const std::shared_ptr<char>& data, int64_t size)
: data_(data), size_(size)
{}
std::shared_ptr<char> data() { return data_; }
const std::shared_ptr<char>& data() const { return data_; }
int64_t size() const { return size_; }
private:
std::shared_ptr<char> data_ = nullptr;
int64_t size_ = 0;
};
using DataObjPtr = std::shared_ptr<DataObj>;
}
}
}
\ No newline at end of file
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <unordered_map>
#include <list>
#include <cstddef>
#include <stdexcept>
namespace zilliz {
namespace vecwise {
namespace cache {
template<typename key_t, typename value_t>
class LRU {
public:
typedef typename std::pair<key_t, value_t> key_value_pair_t;
typedef typename std::list<key_value_pair_t>::iterator list_iterator_t;
typedef typename std::list<key_value_pair_t>::reverse_iterator reverse_list_iterator_t;
LRU(size_t max_size) : _max_size(max_size) {}
void put(const key_t& key, const value_t& value) {
auto it = _cache_items_map.find(key);
_cache_items_list.push_front(key_value_pair_t(key, value));
if (it != _cache_items_map.end()) {
_cache_items_list.erase(it->second);
_cache_items_map.erase(it);
}
_cache_items_map[key] = _cache_items_list.begin();
if (_cache_items_map.size() > _max_size) {
auto last = _cache_items_list.end();
last--;
_cache_items_map.erase(last->first);
_cache_items_list.pop_back();
}
}
const value_t& get(const key_t& key) {
auto it = _cache_items_map.find(key);
if (it == _cache_items_map.end()) {
throw std::range_error("There is no such key in cache");
} else {
_cache_items_list.splice(_cache_items_list.begin(), _cache_items_list, it->second);
return it->second->second;
}
}
void erase(const key_t& key) {
auto it = _cache_items_map.find(key);
if (it != _cache_items_map.end()) {
_cache_items_list.erase(it->second);
_cache_items_map.erase(it);
}
}
bool exists(const key_t& key) const {
return _cache_items_map.find(key) != _cache_items_map.end();
}
size_t size() const {
return _cache_items_map.size();
}
list_iterator_t begin() {
_iter = _cache_items_list.begin();
return _iter;
}
list_iterator_t end() {
return _cache_items_list.end();
}
reverse_list_iterator_t rbegin() {
return _cache_items_list.rbegin();
}
reverse_list_iterator_t rend() {
return _cache_items_list.rend();
}
void clear() {
_cache_items_list.clear();
_cache_items_map.clear();
}
private:
std::list<key_value_pair_t> _cache_items_list;
std::unordered_map<key_t, list_iterator_t> _cache_items_map;
size_t _max_size;
list_iterator_t _iter;
};
} // cache
} // vecwise
} // zilliz
......@@ -16,4 +16,5 @@ set(unittest_libs
gmock_main
pthread)
add_subdirectory(cache)
add_subdirectory(log)
\ No newline at end of file
#-------------------------------------------------------------------------------
# Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
# Unauthorized copying of this file, via any medium is strictly prohibited.
# Proprietary and confidential.
#-------------------------------------------------------------------------------
include_directories(../../src)
set(cache_srcs
../../src/cache/Cache.cpp
../../src/cache/CacheMgr.cpp)
set(cache_test_src
${unittest_srcs}
${cache_srcs}
cache_tests.cpp)
add_executable(cache_tests ${cache_test_src})
target_link_libraries(cache_tests ${unittest_libs})
////////////////////////////////////////////////////////////////////////////////
// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include "cache/CacheMgr.h"
using namespace zilliz::vecwise;
namespace {
cache::DataObjPtr MakeData(int64_t size) {
auto data_ptr = std::shared_ptr<char>(new char[size], std::default_delete<char[]>());
return std::make_shared<cache::DataObj>(data_ptr, size);
}
#define MAKE_1GB_DATA MakeData(1*1024*1024*1024)
#define MAKE_100MB_DATA MakeData(100*1024*1024)
#define MAKE_1MB_DATA MakeData(1*1024*1024)
}
TEST(CacheTest, CACHE_TEST) {
cache::CacheMgr cache_mgr = cache::CacheMgr::GetInstance();
for(int i = 0; i < 10; i++) {
cache_mgr.InsertItem(std::to_string(i), MAKE_100MB_DATA);
}
ASSERT_EQ(cache_mgr.ItemCount(), 10);
std::string key = "test_data";
cache_mgr.InsertItem(key, MAKE_100MB_DATA);
cache::DataObjPtr data = cache_mgr.GetItem(key);
ASSERT_TRUE(data != nullptr);
ASSERT_TRUE(cache_mgr.IsExists(key));
ASSERT_EQ(data->size(), 100*1024*1024);
cache_mgr.EraseItem(key);
data = cache_mgr.GetItem(key);
ASSERT_TRUE(data == nullptr);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册