提交 ef37fc97 编写于 作者: S starlord

Merge branch 'branch-0.4.0' into 'branch-0.5.0'

refine cache code

See merge request groot/vecwise_engine!4

Former-commit-id: 1baa0f3ce6bb90667f090e2ee76f5c168a4ca76d
......@@ -6,35 +6,20 @@
#pragma once
#include "LRU.h"
#include "utils/Log.h"
#include <string>
#include <mutex>
#include <atomic>
#include "LRU.h"
#include "DataObj.h"
#include <set>
namespace zilliz {
namespace milvus {
namespace cache {
const std::string SWAP_DIR = ".CACHE";
template<typename ItemObj>
class Cache {
private:
class CacheObj {
public:
CacheObj() = delete;
CacheObj(const DataObjPtr& data)
: data_(data) {
}
public:
DataObjPtr data_ = nullptr;
};
using CacheObjPtr = std::shared_ptr<CacheObj>;
public:
//mem_capacity, units:GB
Cache(int64_t capacity_gb, uint64_t cache_max_count);
......@@ -49,11 +34,13 @@ public:
size_t size() const;
bool exists(const std::string& key);
DataObjPtr get(const std::string& key);
void insert(const std::string& key, const DataObjPtr& data);
ItemObj get(const std::string& key);
void insert(const std::string& key, const ItemObj& item);
void erase(const std::string& key);
void print();
void clear();
private:
void free_memory();
private:
......@@ -61,13 +48,12 @@ private:
int64_t capacity_;
double freemem_percent_;
LRU<std::string, CacheObjPtr> lru_;
LRU<std::string, ItemObj> lru_;
mutable std::mutex mutex_;
};
using CachePtr = std::shared_ptr<Cache>;
} // cache
} // milvus
} // zilliz
#include "cache/Cache.inl"
\ No newline at end of file
......@@ -4,10 +4,6 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "Cache.h"
#include "utils/Log.h"
#include <set>
namespace zilliz {
namespace milvus {
......@@ -15,7 +11,8 @@ namespace cache {
constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.85;
Cache::Cache(int64_t capacity, uint64_t cache_max_count)
template<typename ItemObj>
Cache<ItemObj>::Cache(int64_t capacity, uint64_t cache_max_count)
: usage_(0),
capacity_(capacity),
freemem_percent_(DEFAULT_THRESHHOLD_PERCENT),
......@@ -23,142 +20,106 @@ Cache::Cache(int64_t capacity, uint64_t cache_max_count)
// AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity)
}
void Cache::set_capacity(int64_t capacity) {
template<typename ItemObj>
void Cache<ItemObj>::set_capacity(int64_t capacity) {
if(capacity > 0) {
capacity_ = capacity;
free_memory();
}
}
size_t Cache::size() const {
template<typename ItemObj>
size_t Cache<ItemObj>::size() const {
std::lock_guard<std::mutex> lock(mutex_);
return lru_.size();
}
bool Cache::exists(const std::string& key) {
template<typename ItemObj>
bool Cache<ItemObj>::exists(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
return lru_.exists(key);
}
DataObjPtr Cache::get(const std::string& key) {
template<typename ItemObj>
ItemObj Cache<ItemObj>::get(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if(!lru_.exists(key)){
return nullptr;
}
const CacheObjPtr& cache_obj = lru_.get(key);
return cache_obj->data_;
return lru_.get(key);
}
void Cache::insert(const std::string& key, const DataObjPtr& data_ptr) {
template<typename ItemObj>
void Cache<ItemObj>::insert(const std::string& key, const ItemObj& item) {
if(item == nullptr) {
return;
}
// if(item->size() > capacity_) {
// SERVER_LOG_ERROR << "Item size " << item->size()
// << " is too large to insert into cache, capacity " << capacity_;
// return;
// }
//calculate usage
{
std::lock_guard<std::mutex> lock(mutex_);
/* if key already exist, over-write old data */
//if key already exist, subtract old item size
if (lru_.exists(key)) {
CacheObjPtr obj_ptr = lru_.get(key);
usage_ -= obj_ptr->data_->size();
obj_ptr->data_ = data_ptr;
usage_ += data_ptr->size();
} else {
CacheObjPtr obj_ptr(new CacheObj(data_ptr));
lru_.put(key, obj_ptr);
usage_ += data_ptr->size();
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->size();
}
SERVER_LOG_DEBUG << "Insert " << key << " size:" << data_ptr->size()
<< " bytes into cache, usage: " << usage_ << " bytes";
//plus new item size
usage_ += item->size();
}
//if usage exceed capacity, free some items
if (usage_ > capacity_) {
SERVER_LOG_DEBUG << "Current usage " << usage_
<< " exceeds cache capacity " << capacity_
<< ", start free memory";
<< " exceeds cache capacity " << capacity_
<< ", start free memory";
free_memory();
}
//insert new item
{
std::lock_guard<std::mutex> lock(mutex_);
lru_.put(key, item);
SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->size()
<< " bytes into cache, usage: " << usage_ << " bytes";
}
}
void Cache::erase(const std::string& key) {
template<typename ItemObj>
void Cache<ItemObj>::erase(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if(!lru_.exists(key)){
return;
}
const CacheObjPtr& obj_ptr = lru_.get(key);
const DataObjPtr& data_ptr = obj_ptr->data_;
usage_ -= data_ptr->size();
const ItemObj& old_item = lru_.get(key);
usage_ -= old_item->size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << data_ptr->size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->size();
lru_.erase(key);
}
void Cache::clear() {
template<typename ItemObj>
void Cache<ItemObj>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
SERVER_LOG_DEBUG << "Clear cache !";
}
#if 0 /* caiyd 20190221, need more testing before enable */
void Cache::flush_to_file(const std::string& key, const CacheObjPtr& obj_ptr) {
if (!this->swap_enabled_) return;
const DataObjPtr data_ptr = obj_ptr->data();
if (data_ptr == nullptr || data_ptr->size() == 0) return;
if (data_ptr->ptr() == nullptr) return;
std::string name = std::to_string(reinterpret_cast<int64_t>(data_ptr.get()));
filesys::CreateDirectory(this->swap_path_);
/* write cache data to file */
obj_ptr->set_file_path(this->swap_path_ + "/" + name);
std::shared_ptr<arrow::io::OutputStream> outfile = nullptr;
filesys::OpenWritableFile(obj_ptr->file_path(), false, &outfile);
filesys::WriteFile(outfile, data_ptr->ptr().get(), data_ptr->size());
(void)outfile->Close();
AGENT_LOG_DEBUG << "Flush cache data: " << key << ", to file: " << obj_ptr->file_path();
/* free cache memory */
data_ptr->ptr().reset();
usage_ -= data_ptr->size();
}
void Cache::restore_from_file(const std::string& key, const CacheObjPtr& obj_ptr) {
if (!this->swap_enabled_) return;
const DataObjPtr data_ptr = obj_ptr->data();
if (data_ptr == nullptr || data_ptr->size() == 0) return;
std::shared_ptr<arrow::io::RandomAccessFile> infile = nullptr;
int64_t file_size, bytes_read;
/* load cache data from file */
if (!filesys::FileExist(obj_ptr->file_path())) {
THROW_AGENT_UNEXPECTED_ERROR("File not exist: " + obj_ptr->file_path());
}
filesys::OpenReadableFile(obj_ptr->file_path(), &infile);
infile->GetSize(&file_size);
if (data_ptr->size() != file_size) {
THROW_AGENT_UNEXPECTED_ERROR("File size not match: " + obj_ptr->file_path());
}
data_ptr->set_ptr(lib::gpu::MakeShared<char>(data_ptr->size(), lib::gpu::MallocHint::kUnifiedGlobal));
infile->Read(file_size, &bytes_read, data_ptr->ptr().get());
infile->Close();
AGENT_LOG_DEBUG << "Restore cache data: " << key << ", from file: " << obj_ptr->file_path();
/* clear file path */
obj_ptr->set_file_path("");
usage_ += data_ptr->size();
}
#endif
/* free memory space when CACHE occupation exceed its capacity */
void Cache::free_memory() {
template<typename ItemObj>
void Cache<ItemObj>::free_memory() {
if (usage_ <= capacity_) return;
int64_t threshhold = capacity_ * freemem_percent_;
......@@ -177,10 +138,9 @@ void Cache::free_memory() {
while (it != lru_.rend() && released_size < delta_size) {
auto& key = it->first;
auto& obj_ptr = it->second;
const auto& data_ptr = obj_ptr->data_;
key_array.emplace(key);
released_size += data_ptr->size();
released_size += obj_ptr->size();
++it;
}
}
......@@ -194,7 +154,8 @@ void Cache::free_memory() {
print();
}
void Cache::print() {
template<typename ItemObj>
void Cache<ItemObj>::print() {
size_t cache_count = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
......
......@@ -7,22 +7,23 @@
#pragma once
#include "Cache.h"
#include "utils/Log.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
namespace cache {
template<typename ItemObj>
class CacheMgr {
public:
virtual uint64_t ItemCount() const;
virtual bool ItemExists(const std::string& key);
virtual DataObjPtr GetItem(const std::string& key);
virtual engine::VecIndexPtr GetIndex(const std::string& key);
virtual ItemObj GetItem(const std::string& key);
virtual void InsertItem(const std::string& key, const DataObjPtr& data);
virtual void InsertItem(const std::string& key, const engine::VecIndexPtr& index);
virtual void InsertItem(const std::string& key, const ItemObj& data);
virtual void EraseItem(const std::string& key);
......@@ -39,6 +40,7 @@ protected:
virtual ~CacheMgr();
protected:
using CachePtr = std::shared_ptr<Cache<ItemObj>>;
CachePtr cache_;
};
......@@ -46,3 +48,5 @@ protected:
}
}
}
#include "cache/CacheMgr.inl"
\ No newline at end of file
......@@ -4,22 +4,21 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "utils/Log.h"
#include "CacheMgr.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
namespace cache {
CacheMgr::CacheMgr() {
template<typename ItemObj>
CacheMgr<ItemObj>::CacheMgr() {
}
CacheMgr::~CacheMgr() {
template<typename ItemObj>
CacheMgr<ItemObj>::~CacheMgr() {
}
uint64_t CacheMgr::ItemCount() const {
template<typename ItemObj>
uint64_t CacheMgr<ItemObj>::ItemCount() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
......@@ -28,7 +27,8 @@ uint64_t CacheMgr::ItemCount() const {
return (uint64_t)(cache_->size());
}
bool CacheMgr::ItemExists(const std::string& key) {
template<typename ItemObj>
bool CacheMgr<ItemObj>::ItemExists(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return false;
......@@ -37,7 +37,8 @@ bool CacheMgr::ItemExists(const std::string& key) {
return cache_->exists(key);
}
DataObjPtr CacheMgr::GetItem(const std::string& key) {
template<typename ItemObj>
ItemObj CacheMgr<ItemObj>::GetItem(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return nullptr;
......@@ -46,16 +47,8 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) {
return cache_->get(key);
}
engine::VecIndexPtr CacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
if(obj != nullptr) {
return obj->data();
}
return nullptr;
}
void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
template<typename ItemObj>
void CacheMgr<ItemObj>::InsertItem(const std::string& key, const ItemObj& data) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -65,18 +58,8 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::InsertItem(const std::string& key, const engine::VecIndexPtr& index) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
DataObjPtr obj = std::make_shared<DataObj>(index);
cache_->insert(key, obj);
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::EraseItem(const std::string& key) {
template<typename ItemObj>
void CacheMgr<ItemObj>::EraseItem(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -86,7 +69,8 @@ void CacheMgr::EraseItem(const std::string& key) {
server::Metrics::GetInstance().CacheAccessTotalIncrement();
}
void CacheMgr::PrintInfo() {
template<typename ItemObj>
void CacheMgr<ItemObj>::PrintInfo() {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -95,7 +79,8 @@ void CacheMgr::PrintInfo() {
cache_->print();
}
void CacheMgr::ClearCache() {
template<typename ItemObj>
void CacheMgr<ItemObj>::ClearCache() {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......@@ -104,7 +89,8 @@ void CacheMgr::ClearCache() {
cache_->clear();
}
int64_t CacheMgr::CacheUsage() const {
template<typename ItemObj>
int64_t CacheMgr<ItemObj>::CacheUsage() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
......@@ -113,7 +99,8 @@ int64_t CacheMgr::CacheUsage() const {
return cache_->usage();
}
int64_t CacheMgr::CacheCapacity() const {
template<typename ItemObj>
int64_t CacheMgr<ItemObj>::CacheCapacity() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
......@@ -122,7 +109,8 @@ int64_t CacheMgr::CacheCapacity() const {
return cache_->capacity();
}
void CacheMgr::SetCapacity(int64_t capacity) {
template<typename ItemObj>
void CacheMgr<ItemObj>::SetCapacity(int64_t capacity) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
......
......@@ -20,7 +20,7 @@ CpuCacheMgr::CpuCacheMgr() {
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
int64_t cap = config.GetInt64Value(server::CONFIG_CPU_CACHE_CAPACITY, 16);
cap *= unit;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL<<32);
double free_percent = config.GetDoubleValue(server::CACHE_FREE_PERCENT, 0.85);
if(free_percent > 0.0 && free_percent <= 1.0) {
......@@ -31,6 +31,20 @@ CpuCacheMgr::CpuCacheMgr() {
}
}
CpuCacheMgr* CpuCacheMgr::GetInstance() {
static CpuCacheMgr s_mgr;
return &s_mgr;
}
engine::VecIndexPtr CpuCacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
if(obj != nullptr) {
return obj->data();
}
return nullptr;
}
}
}
}
\ No newline at end of file
......@@ -6,22 +6,21 @@
#pragma once
#include "CacheMgr.h"
#include "DataObj.h"
namespace zilliz {
namespace milvus {
namespace cache {
class CpuCacheMgr : public CacheMgr {
class CpuCacheMgr : public CacheMgr<DataObjPtr> {
private:
CpuCacheMgr();
public:
//TODO: use smart pointer instead
static CacheMgr* GetInstance() {
static CpuCacheMgr s_mgr;
return &s_mgr;
}
static CpuCacheMgr* GetInstance();
engine::VecIndexPtr GetIndex(const std::string& key);
};
}
......
......@@ -25,7 +25,7 @@ GpuCacheMgr::GpuCacheMgr() {
int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 0);
cap *= G_BYTE;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
cache_ = std::make_shared<Cache<DataObjPtr>>(cap, 1UL<<32);
double free_percent = config.GetDoubleValue(server::GPU_CACHE_FREE_PERCENT, 0.85);
if (free_percent > 0.0 && free_percent <= 1.0) {
......@@ -36,7 +36,7 @@ GpuCacheMgr::GpuCacheMgr() {
}
}
CacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) {
GpuCacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) {
if (instance_.find(gpu_id) == instance_.end()) {
std::lock_guard<std::mutex> lock(mutex_);
if (instance_.find(gpu_id) == instance_.end()) {
......@@ -49,14 +49,13 @@ CacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) {
}
}
void GpuCacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
//TODO: copy data to gpu
if (cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
engine::VecIndexPtr GpuCacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key);
if(obj != nullptr) {
return obj->data();
}
cache_->insert(key, data);
return nullptr;
}
}
......
......@@ -5,6 +5,8 @@
////////////////////////////////////////////////////////////////////////////////
#include "CacheMgr.h"
#include "DataObj.h"
#include <unordered_map>
#include <memory>
......@@ -15,13 +17,13 @@ namespace cache {
class GpuCacheMgr;
using GpuCacheMgrPtr = std::shared_ptr<GpuCacheMgr>;
class GpuCacheMgr : public CacheMgr {
class GpuCacheMgr : public CacheMgr<DataObjPtr> {
public:
GpuCacheMgr();
static CacheMgr* GetInstance(uint64_t gpu_id);
static GpuCacheMgr* GetInstance(uint64_t gpu_id);
void InsertItem(const std::string& key, const DataObjPtr& data) override;
engine::VecIndexPtr GetIndex(const std::string& key);
private:
static std::mutex mutex_;
......
......@@ -124,7 +124,7 @@ Status ExecutionEngineImpl::Serialize() {
}
Status ExecutionEngineImpl::Load(bool to_cache) {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) {
try {
......@@ -151,7 +151,7 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
}
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
index_ = index;
......@@ -178,7 +178,7 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
}
Status ExecutionEngineImpl::CopyToCpu() {
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
index_ = index;
......@@ -221,7 +221,7 @@ Status ExecutionEngineImpl::Merge(const std::string &location) {
}
ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
auto to_merge = cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
try {
double physical_size = server::CommonUtil::GetFileSize(location);
......
......@@ -38,44 +38,8 @@ engine_config:
use_blas_threshold: 20
resource_config:
# resource list, length: 0~N
# please set a DISK resource and a CPU resource least, or system will not return query result.
#
# example:
# resource_name: # resource name, just using in connections below
# type: DISK # resource type, optional: DISK/CPU/GPU
# device_id: 0
# enable_executor: false # if is enable executor, optional: true, false
mode: simple
resources:
ssda:
type: DISK
device_id: 0
enable_executor: false
cpu:
type: CPU
device_id: 0
enable_executor: false
gpu0:
type: GPU
device_id: 0
enable_executor: true
gpu_resource_num: 2
pinned_memory: 300
temp_memory: 300
# connection list, length: 0~N
# example:
# connection_name:
# speed: 100 # unit: MS/s
# endpoint: ===
connections:
io:
speed: 500
endpoint: ssda===cpu
pcie0:
speed: 11000
endpoint: cpu===gpu0
# - cpu
- gpu0
......@@ -15,21 +15,27 @@ using namespace zilliz::milvus;
namespace {
class InvalidCacheMgr : public cache::CacheMgr {
class InvalidCacheMgr : public cache::CacheMgr<cache::DataObjPtr> {
public:
InvalidCacheMgr() {
}
};
class LessItemCacheMgr : public cache::CacheMgr {
class LessItemCacheMgr : public cache::CacheMgr<cache::DataObjPtr> {
public:
LessItemCacheMgr() {
cache_ = std::make_shared<cache::Cache>(1UL << 12, 10);
cache_ = std::make_shared<cache::Cache<cache::DataObjPtr>>(1UL << 12, 10);
}
};
class MockVecIndex : public engine::VecIndex {
public:
MockVecIndex(int64_t dim, int64_t total)
: dimension_(dim),
ntotal_(total){
}
virtual ErrorCode BuildAll(const long &nb,
const float *xb,
const long *ids,
......@@ -93,7 +99,7 @@ public:
}
public:
int64_t dimension_ = 512;
int64_t dimension_ = 256;
int64_t ntotal_ = 0;
};
......@@ -101,7 +107,7 @@ public:
TEST(CacheTest, DUMMY_TEST) {
engine::Config cfg;
MockVecIndex mock_index;
MockVecIndex mock_index(256, 1000);
mock_index.Dimension();
mock_index.Count();
mock_index.Add(1, nullptr, nullptr);
......@@ -118,29 +124,27 @@ TEST(CacheTest, DUMMY_TEST) {
}
TEST(CacheTest, CPU_CACHE_TEST) {
cache::CacheMgr *cpu_mgr = cache::CpuCacheMgr::GetInstance();
auto cpu_mgr = cache::CpuCacheMgr::GetInstance();
const int64_t gbyte = 1 << 30;
const int64_t gbyte = 1024*1024*1024;
int64_t g_num = 16;
int64_t cap = g_num * gbyte;
cpu_mgr->SetCapacity(cap);
ASSERT_EQ(cpu_mgr->CacheCapacity(), cap);
const int dim = 256;
for (int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000000;//less 1G per index
engine::VecIndexPtr index(mock_index);
cpu_mgr->InsertItem("index_" + std::to_string(i), index);
uint64_t item_count = 20;
for (uint64_t i = 0; i < item_count; i++) {
//each vector is 1k byte, total size less than 1G
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000000);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
cpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
}
ASSERT_LT(cpu_mgr->ItemCount(), g_num);
auto obj = cpu_mgr->GetIndex("index_0");
ASSERT_TRUE(obj == nullptr);
obj = cpu_mgr->GetIndex("index_19");
obj = cpu_mgr->GetIndex("index_" + std::to_string(item_count - 1));
ASSERT_TRUE(obj != nullptr);
{
......@@ -154,30 +158,24 @@ TEST(CacheTest, CPU_CACHE_TEST) {
g_num = 5;
cpu_mgr->SetCapacity(g_num * gbyte);
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 6000000;//6G less
engine::VecIndexPtr index(mock_index);
cpu_mgr->InsertItem("index_6g", index);
ASSERT_EQ(cpu_mgr->ItemCount(), 0);//data greater than capacity can not be inserted sucessfully
//each vector is 1k byte, total size less than 6G
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 6000000);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
cpu_mgr->InsertItem("index_6g", data_obj);
ASSERT_TRUE(cpu_mgr->ItemExists("index_6g"));
}
cpu_mgr->PrintInfo();
}
TEST(CacheTest, GPU_CACHE_TEST) {
cache::CacheMgr* gpu_mgr = cache::GpuCacheMgr::GetInstance(0);
const int dim = 256;
auto gpu_mgr = cache::GpuCacheMgr::GetInstance(0);
for(int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000;
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
gpu_mgr->InsertItem("index_" + std::to_string(i), obj);
//each vector is 1k byte
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
}
auto obj = gpu_mgr->GetItem("index_0");
......@@ -187,19 +185,13 @@ TEST(CacheTest, GPU_CACHE_TEST) {
for (auto i = 0; i < 3; i++) {
// TODO: use gpu index to mock
MockVecIndex *mock_index = new MockVecIndex();
mock_index->ntotal_ = 1000000; //2G
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(index);
//each vector is 1k byte, total size less than 2G
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2000000);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
std::cout << data_obj->size() <<std::endl;
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
}
// ASSERT_EQ(gpu_mgr->ItemCount(), 2);
// auto obj0 = gpu_mgr->GetItem("index_0");
// ASSERT_EQ(obj0, nullptr);
// auto obj1 = gpu_mgr->GetItem("index_1");
// auto obj2 = gpu_mgr->GetItem("index_2");
gpu_mgr->ClearCache();
ASSERT_EQ(gpu_mgr->ItemCount(), 0);
......@@ -213,7 +205,7 @@ TEST(CacheTest, INVALID_TEST) {
ASSERT_EQ(mgr.GetItem("test"), nullptr);
mgr.InsertItem("test", cache::DataObjPtr());
mgr.InsertItem("test", engine::VecIndexPtr(nullptr));
mgr.InsertItem("test", nullptr);
mgr.EraseItem("test");
mgr.PrintInfo();
mgr.ClearCache();
......@@ -225,12 +217,10 @@ TEST(CacheTest, INVALID_TEST) {
{
LessItemCacheMgr mgr;
for(int i = 0; i < 20; i++) {
MockVecIndex* mock_index = new MockVecIndex();
mock_index->ntotal_ = 2;
engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index);
mgr.InsertItem("index_" + std::to_string(i), obj);
//each vector is 1k byte
engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(mock_index);
mgr.InsertItem("index_" + std::to_string(i), data_obj);
}
ASSERT_EQ(mgr.GetItem("index_0"), nullptr);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册