diff --git a/cpp/src/cache/Cache.cpp b/cpp/src/cache/Cache.cpp deleted file mode 100644 index 336bf4e82ce7c67b1ad80bee5de03860f9c102d2..0000000000000000000000000000000000000000 --- a/cpp/src/cache/Cache.cpp +++ /dev/null @@ -1,212 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved -// Unauthorized copying of this file, via any medium is strictly prohibited. -// Proprietary and confidential. -//////////////////////////////////////////////////////////////////////////////// - -#include "Cache.h" -#include "utils/Log.h" - -#include - -namespace zilliz { -namespace milvus { -namespace cache { - -constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.85; - -Cache::Cache(int64_t capacity, uint64_t cache_max_count) - : usage_(0), - capacity_(capacity), - freemem_percent_(DEFAULT_THRESHHOLD_PERCENT), - lru_(cache_max_count) { -// AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity) -} - -void Cache::set_capacity(int64_t capacity) { - if(capacity > 0) { - capacity_ = capacity; - free_memory(); - } -} - -size_t Cache::size() const { - std::lock_guard lock(mutex_); - return lru_.size(); -} - -bool Cache::exists(const std::string& key) { - std::lock_guard lock(mutex_); - return lru_.exists(key); -} - -DataObjPtr Cache::get(const std::string& key) { - std::lock_guard lock(mutex_); - if(!lru_.exists(key)){ - return nullptr; - } - - const CacheObjPtr& cache_obj = lru_.get(key); - return cache_obj->data_; -} - -void Cache::insert(const std::string& key, const DataObjPtr& data_ptr) { - { - std::lock_guard lock(mutex_); - - /* if key already exist, over-write old data */ - if (lru_.exists(key)) { - CacheObjPtr obj_ptr = lru_.get(key); - - usage_ -= obj_ptr->data_->size(); - obj_ptr->data_ = data_ptr; - usage_ += data_ptr->size(); - } else { - CacheObjPtr obj_ptr(new CacheObj(data_ptr)); - lru_.put(key, obj_ptr); - usage_ += data_ptr->size(); - } - - SERVER_LOG_DEBUG << "Insert " << key << " size:" << data_ptr->size() - << " bytes into cache, usage: " << usage_ << " bytes"; - } - - if (usage_ > capacity_) { - SERVER_LOG_DEBUG << "Current usage " << usage_ - << " exceeds cache capacity " << capacity_ - << ", start free memory"; - free_memory(); - } -} - -void Cache::erase(const std::string& key) { - std::lock_guard lock(mutex_); - if(!lru_.exists(key)){ - return; - } - - const CacheObjPtr& obj_ptr = lru_.get(key); - const DataObjPtr& data_ptr = obj_ptr->data_; - usage_ -= data_ptr->size(); - - SERVER_LOG_DEBUG << "Erase " << key << " size: " << data_ptr->size(); - - lru_.erase(key); -} - -void Cache::clear() { - std::lock_guard lock(mutex_); - lru_.clear(); - usage_ = 0; - SERVER_LOG_DEBUG << "Clear cache !"; -} - -#if 0 /* caiyd 20190221, need more testing before enable */ -void Cache::flush_to_file(const std::string& key, const CacheObjPtr& obj_ptr) { - if (!this->swap_enabled_) return; - - const DataObjPtr data_ptr = obj_ptr->data(); - - if (data_ptr == nullptr || data_ptr->size() == 0) return; - if (data_ptr->ptr() == nullptr) return; - - std::string name = std::to_string(reinterpret_cast(data_ptr.get())); - filesys::CreateDirectory(this->swap_path_); - - /* write cache data to file */ - obj_ptr->set_file_path(this->swap_path_ + "/" + name); - std::shared_ptr outfile = nullptr; - filesys::OpenWritableFile(obj_ptr->file_path(), false, &outfile); - filesys::WriteFile(outfile, data_ptr->ptr().get(), data_ptr->size()); - (void)outfile->Close(); - - AGENT_LOG_DEBUG << "Flush cache data: " << key << ", to file: " << obj_ptr->file_path(); - - /* free cache memory */ - data_ptr->ptr().reset(); - usage_ -= data_ptr->size(); -} - -void Cache::restore_from_file(const std::string& key, const CacheObjPtr& obj_ptr) { - if (!this->swap_enabled_) return; - - const DataObjPtr data_ptr = obj_ptr->data(); - if (data_ptr == nullptr || data_ptr->size() == 0) return; - - std::shared_ptr infile = nullptr; - int64_t file_size, bytes_read; - - /* load cache data from file */ - if (!filesys::FileExist(obj_ptr->file_path())) { - THROW_AGENT_UNEXPECTED_ERROR("File not exist: " + obj_ptr->file_path()); - } - filesys::OpenReadableFile(obj_ptr->file_path(), &infile); - infile->GetSize(&file_size); - if (data_ptr->size() != file_size) { - THROW_AGENT_UNEXPECTED_ERROR("File size not match: " + obj_ptr->file_path()); - } - data_ptr->set_ptr(lib::gpu::MakeShared(data_ptr->size(), lib::gpu::MallocHint::kUnifiedGlobal)); - infile->Read(file_size, &bytes_read, data_ptr->ptr().get()); - infile->Close(); - - AGENT_LOG_DEBUG << "Restore cache data: " << key << ", from file: " << obj_ptr->file_path(); - - /* clear file path */ - obj_ptr->set_file_path(""); - usage_ += data_ptr->size(); -} -#endif - -/* free memory space when CACHE occupation exceed its capacity */ -void Cache::free_memory() { - if (usage_ <= capacity_) return; - - int64_t threshhold = capacity_ * freemem_percent_; - int64_t delta_size = usage_ - threshhold; - if(delta_size <= 0) { - delta_size = 1;//ensure at least one item erased - } - - std::set key_array; - int64_t released_size = 0; - - { - std::lock_guard lock(mutex_); - - auto it = lru_.rbegin(); - while (it != lru_.rend() && released_size < delta_size) { - auto& key = it->first; - auto& obj_ptr = it->second; - const auto& data_ptr = obj_ptr->data_; - - key_array.emplace(key); - released_size += data_ptr->size(); - ++it; - } - } - - SERVER_LOG_DEBUG << "to be released memory size: " << released_size; - - for (auto& key : key_array) { - erase(key); - } - - print(); -} - -void Cache::print() { - size_t cache_count = 0; - { - std::lock_guard lock(mutex_); - cache_count = lru_.size(); - } - - SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; - SERVER_LOG_DEBUG << "[Cache usage]: " << usage_ << " bytes"; - SERVER_LOG_DEBUG << "[Cache capacity]: " << capacity_ << " bytes"; -} - -} // cache -} // milvus -} // zilliz - diff --git a/cpp/src/cache/Cache.h b/cpp/src/cache/Cache.h index 4d6f32b9eb0af65ae744cbebab01d08dcf6f3905..d0df4bdcabba253292ecbf205dc54f0f0682cb49 100644 --- a/cpp/src/cache/Cache.h +++ b/cpp/src/cache/Cache.h @@ -6,35 +6,20 @@ #pragma once +#include "LRU.h" +#include "utils/Log.h" + #include #include #include - -#include "LRU.h" -#include "DataObj.h" +#include namespace zilliz { namespace milvus { namespace cache { -const std::string SWAP_DIR = ".CACHE"; - +template class Cache { -private: - class CacheObj { - public: - CacheObj() = delete; - - CacheObj(const DataObjPtr& data) - : data_(data) { - } - - public: - DataObjPtr data_ = nullptr; - }; - - using CacheObjPtr = std::shared_ptr; - public: //mem_capacity, units:GB Cache(int64_t capacity_gb, uint64_t cache_max_count); @@ -49,11 +34,13 @@ public: size_t size() const; bool exists(const std::string& key); - DataObjPtr get(const std::string& key); - void insert(const std::string& key, const DataObjPtr& data); + ItemObj get(const std::string& key); + void insert(const std::string& key, const ItemObj& item); void erase(const std::string& key); void print(); void clear(); + +private: void free_memory(); private: @@ -61,13 +48,12 @@ private: int64_t capacity_; double freemem_percent_; - LRU lru_; + LRU lru_; mutable std::mutex mutex_; }; -using CachePtr = std::shared_ptr; - } // cache } // milvus } // zilliz +#include "cache/Cache.inl" \ No newline at end of file diff --git a/cpp/src/cache/Cache.inl b/cpp/src/cache/Cache.inl new file mode 100644 index 0000000000000000000000000000000000000000..5ca1028318cfbdb16b73bae88f82f878f140d695 --- /dev/null +++ b/cpp/src/cache/Cache.inl @@ -0,0 +1,173 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved +// Unauthorized copying of this file, via any medium is strictly prohibited. +// Proprietary and confidential. +//////////////////////////////////////////////////////////////////////////////// + + +namespace zilliz { +namespace milvus { +namespace cache { + +constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.85; + +template +Cache::Cache(int64_t capacity, uint64_t cache_max_count) + : usage_(0), + capacity_(capacity), + freemem_percent_(DEFAULT_THRESHHOLD_PERCENT), + lru_(cache_max_count) { +// AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity) +} + +template +void Cache::set_capacity(int64_t capacity) { + if(capacity > 0) { + capacity_ = capacity; + free_memory(); + } +} + +template +size_t Cache::size() const { + std::lock_guard lock(mutex_); + return lru_.size(); +} + +template +bool Cache::exists(const std::string& key) { + std::lock_guard lock(mutex_); + return lru_.exists(key); +} + +template +ItemObj Cache::get(const std::string& key) { + std::lock_guard lock(mutex_); + if(!lru_.exists(key)){ + return nullptr; + } + + return lru_.get(key); +} + +template +void Cache::insert(const std::string& key, const ItemObj& item) { + if(item == nullptr) { + return; + } + +// if(item->size() > capacity_) { +// SERVER_LOG_ERROR << "Item size " << item->size() +// << " is too large to insert into cache, capacity " << capacity_; +// return; +// } + + //calculate usage + { + std::lock_guard lock(mutex_); + + //if key already exist, subtract old item size + if (lru_.exists(key)) { + const ItemObj& old_item = lru_.get(key); + usage_ -= old_item->size(); + } + + //plus new item size + usage_ += item->size(); + } + + //if usage exceed capacity, free some items + if (usage_ > capacity_) { + SERVER_LOG_DEBUG << "Current usage " << usage_ + << " exceeds cache capacity " << capacity_ + << ", start free memory"; + free_memory(); + } + + //insert new item + { + std::lock_guard lock(mutex_); + + lru_.put(key, item); + SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->size() + << " bytes into cache, usage: " << usage_ << " bytes"; + } +} + +template +void Cache::erase(const std::string& key) { + std::lock_guard lock(mutex_); + if(!lru_.exists(key)){ + return; + } + + const ItemObj& old_item = lru_.get(key); + usage_ -= old_item->size(); + + SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->size(); + + lru_.erase(key); +} + +template +void Cache::clear() { + std::lock_guard lock(mutex_); + lru_.clear(); + usage_ = 0; + SERVER_LOG_DEBUG << "Clear cache !"; +} + +/* free memory space when CACHE occupation exceed its capacity */ +template +void Cache::free_memory() { + if (usage_ <= capacity_) return; + + int64_t threshhold = capacity_ * freemem_percent_; + int64_t delta_size = usage_ - threshhold; + if(delta_size <= 0) { + delta_size = 1;//ensure at least one item erased + } + + std::set key_array; + int64_t released_size = 0; + + { + std::lock_guard lock(mutex_); + + auto it = lru_.rbegin(); + while (it != lru_.rend() && released_size < delta_size) { + auto& key = it->first; + auto& obj_ptr = it->second; + + key_array.emplace(key); + released_size += obj_ptr->size(); + ++it; + } + } + + SERVER_LOG_DEBUG << "to be released memory size: " << released_size; + + for (auto& key : key_array) { + erase(key); + } + + print(); +} + +template +void Cache::print() { + size_t cache_count = 0; + { + std::lock_guard lock(mutex_); + cache_count = lru_.size(); + } + + SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count; + SERVER_LOG_DEBUG << "[Cache usage]: " << usage_ << " bytes"; + SERVER_LOG_DEBUG << "[Cache capacity]: " << capacity_ << " bytes"; +} + +} // cache +} // milvus +} // zilliz + diff --git a/cpp/src/cache/CacheMgr.h b/cpp/src/cache/CacheMgr.h index b6f1ec8ef1b9e8f4787ff18a61b21475784112f3..8c5647a83a81655506621c808dee0c35896ce471 100644 --- a/cpp/src/cache/CacheMgr.h +++ b/cpp/src/cache/CacheMgr.h @@ -7,22 +7,23 @@ #pragma once #include "Cache.h" +#include "utils/Log.h" +#include "metrics/Metrics.h" namespace zilliz { namespace milvus { namespace cache { +template class CacheMgr { public: virtual uint64_t ItemCount() const; virtual bool ItemExists(const std::string& key); - virtual DataObjPtr GetItem(const std::string& key); - virtual engine::VecIndexPtr GetIndex(const std::string& key); + virtual ItemObj GetItem(const std::string& key); - virtual void InsertItem(const std::string& key, const DataObjPtr& data); - virtual void InsertItem(const std::string& key, const engine::VecIndexPtr& index); + virtual void InsertItem(const std::string& key, const ItemObj& data); virtual void EraseItem(const std::string& key); @@ -39,6 +40,7 @@ protected: virtual ~CacheMgr(); protected: + using CachePtr = std::shared_ptr>; CachePtr cache_; }; @@ -46,3 +48,5 @@ protected: } } } + +#include "cache/CacheMgr.inl" \ No newline at end of file diff --git a/cpp/src/cache/CacheMgr.cpp b/cpp/src/cache/CacheMgr.inl similarity index 64% rename from cpp/src/cache/CacheMgr.cpp rename to cpp/src/cache/CacheMgr.inl index 977c7e1c426e2b36daf0b75e84427637da766e02..05bb1eb08acdf07dac86351ca53e8ba5036ec32f 100644 --- a/cpp/src/cache/CacheMgr.cpp +++ b/cpp/src/cache/CacheMgr.inl @@ -4,22 +4,21 @@ // Proprietary and confidential. //////////////////////////////////////////////////////////////////////////////// -#include "utils/Log.h" -#include "CacheMgr.h" -#include "metrics/Metrics.h" - namespace zilliz { namespace milvus { namespace cache { -CacheMgr::CacheMgr() { +template +CacheMgr::CacheMgr() { } -CacheMgr::~CacheMgr() { +template +CacheMgr::~CacheMgr() { } -uint64_t CacheMgr::ItemCount() const { +template +uint64_t CacheMgr::ItemCount() const { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; @@ -28,7 +27,8 @@ uint64_t CacheMgr::ItemCount() const { return (uint64_t)(cache_->size()); } -bool CacheMgr::ItemExists(const std::string& key) { +template +bool CacheMgr::ItemExists(const std::string& key) { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return false; @@ -37,7 +37,8 @@ bool CacheMgr::ItemExists(const std::string& key) { return cache_->exists(key); } -DataObjPtr CacheMgr::GetItem(const std::string& key) { +template +ItemObj CacheMgr::GetItem(const std::string& key) { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return nullptr; @@ -46,16 +47,8 @@ DataObjPtr CacheMgr::GetItem(const std::string& key) { return cache_->get(key); } -engine::VecIndexPtr CacheMgr::GetIndex(const std::string& key) { - DataObjPtr obj = GetItem(key); - if(obj != nullptr) { - return obj->data(); - } - - return nullptr; -} - -void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) { +template +void CacheMgr::InsertItem(const std::string& key, const ItemObj& data) { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; @@ -65,18 +58,8 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) { server::Metrics::GetInstance().CacheAccessTotalIncrement(); } -void CacheMgr::InsertItem(const std::string& key, const engine::VecIndexPtr& index) { - if(cache_ == nullptr) { - SERVER_LOG_ERROR << "Cache doesn't exist"; - return; - } - - DataObjPtr obj = std::make_shared(index); - cache_->insert(key, obj); - server::Metrics::GetInstance().CacheAccessTotalIncrement(); -} - -void CacheMgr::EraseItem(const std::string& key) { +template +void CacheMgr::EraseItem(const std::string& key) { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; @@ -86,7 +69,8 @@ void CacheMgr::EraseItem(const std::string& key) { server::Metrics::GetInstance().CacheAccessTotalIncrement(); } -void CacheMgr::PrintInfo() { +template +void CacheMgr::PrintInfo() { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; @@ -95,7 +79,8 @@ void CacheMgr::PrintInfo() { cache_->print(); } -void CacheMgr::ClearCache() { +template +void CacheMgr::ClearCache() { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; @@ -104,7 +89,8 @@ void CacheMgr::ClearCache() { cache_->clear(); } -int64_t CacheMgr::CacheUsage() const { +template +int64_t CacheMgr::CacheUsage() const { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; @@ -113,7 +99,8 @@ int64_t CacheMgr::CacheUsage() const { return cache_->usage(); } -int64_t CacheMgr::CacheCapacity() const { +template +int64_t CacheMgr::CacheCapacity() const { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return 0; @@ -122,7 +109,8 @@ int64_t CacheMgr::CacheCapacity() const { return cache_->capacity(); } -void CacheMgr::SetCapacity(int64_t capacity) { +template +void CacheMgr::SetCapacity(int64_t capacity) { if(cache_ == nullptr) { SERVER_LOG_ERROR << "Cache doesn't exist"; return; diff --git a/cpp/src/cache/CpuCacheMgr.cpp b/cpp/src/cache/CpuCacheMgr.cpp index 167f91f5e596220c47457925b8cb6ed7763bbfab..69962919fa254242b19e64e1da15002e9c646201 100644 --- a/cpp/src/cache/CpuCacheMgr.cpp +++ b/cpp/src/cache/CpuCacheMgr.cpp @@ -20,7 +20,7 @@ CpuCacheMgr::CpuCacheMgr() { server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE); int64_t cap = config.GetInt64Value(server::CONFIG_CPU_CACHE_CAPACITY, 16); cap *= unit; - cache_ = std::make_shared(cap, 1UL<<32); + cache_ = std::make_shared>(cap, 1UL<<32); double free_percent = config.GetDoubleValue(server::CACHE_FREE_PERCENT, 0.85); if(free_percent > 0.0 && free_percent <= 1.0) { @@ -31,6 +31,20 @@ CpuCacheMgr::CpuCacheMgr() { } } +CpuCacheMgr* CpuCacheMgr::GetInstance() { + static CpuCacheMgr s_mgr; + return &s_mgr; +} + +engine::VecIndexPtr CpuCacheMgr::GetIndex(const std::string& key) { + DataObjPtr obj = GetItem(key); + if(obj != nullptr) { + return obj->data(); + } + + return nullptr; +} + } } } \ No newline at end of file diff --git a/cpp/src/cache/CpuCacheMgr.h b/cpp/src/cache/CpuCacheMgr.h index 39e33aef8918be49269b08995b554efcaa7780ec..2aee7501d09116e957e2d5f8eaec7e14b5fb7232 100644 --- a/cpp/src/cache/CpuCacheMgr.h +++ b/cpp/src/cache/CpuCacheMgr.h @@ -6,22 +6,21 @@ #pragma once #include "CacheMgr.h" +#include "DataObj.h" namespace zilliz { namespace milvus { namespace cache { -class CpuCacheMgr : public CacheMgr { +class CpuCacheMgr : public CacheMgr { private: CpuCacheMgr(); public: //TODO: use smart pointer instead - static CacheMgr* GetInstance() { - static CpuCacheMgr s_mgr; - return &s_mgr; - } + static CpuCacheMgr* GetInstance(); + engine::VecIndexPtr GetIndex(const std::string& key); }; } diff --git a/cpp/src/cache/GpuCacheMgr.cpp b/cpp/src/cache/GpuCacheMgr.cpp index 5c1afa3f5c962694f90de8e715b080ea71a004ee..6567f997adbbe1b07d4341a6fda1ed8ad2302910 100644 --- a/cpp/src/cache/GpuCacheMgr.cpp +++ b/cpp/src/cache/GpuCacheMgr.cpp @@ -25,7 +25,7 @@ GpuCacheMgr::GpuCacheMgr() { int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 0); cap *= G_BYTE; - cache_ = std::make_shared(cap, 1UL<<32); + cache_ = std::make_shared>(cap, 1UL<<32); double free_percent = config.GetDoubleValue(server::GPU_CACHE_FREE_PERCENT, 0.85); if (free_percent > 0.0 && free_percent <= 1.0) { @@ -36,7 +36,7 @@ GpuCacheMgr::GpuCacheMgr() { } } -CacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) { +GpuCacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) { if (instance_.find(gpu_id) == instance_.end()) { std::lock_guard lock(mutex_); if (instance_.find(gpu_id) == instance_.end()) { @@ -49,14 +49,13 @@ CacheMgr* GpuCacheMgr::GetInstance(uint64_t gpu_id) { } } -void GpuCacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) { - //TODO: copy data to gpu - if (cache_ == nullptr) { - SERVER_LOG_ERROR << "Cache doesn't exist"; - return; +engine::VecIndexPtr GpuCacheMgr::GetIndex(const std::string& key) { + DataObjPtr obj = GetItem(key); + if(obj != nullptr) { + return obj->data(); } - cache_->insert(key, data); + return nullptr; } } diff --git a/cpp/src/cache/GpuCacheMgr.h b/cpp/src/cache/GpuCacheMgr.h index 35756733f0a93e3abee6d57bbc258647228d3fbb..ee0ac68660dedec63ea253339ea5c4c4dd6bfc1e 100644 --- a/cpp/src/cache/GpuCacheMgr.h +++ b/cpp/src/cache/GpuCacheMgr.h @@ -5,6 +5,8 @@ //////////////////////////////////////////////////////////////////////////////// #include "CacheMgr.h" +#include "DataObj.h" + #include #include @@ -15,13 +17,13 @@ namespace cache { class GpuCacheMgr; using GpuCacheMgrPtr = std::shared_ptr; -class GpuCacheMgr : public CacheMgr { +class GpuCacheMgr : public CacheMgr { public: GpuCacheMgr(); - static CacheMgr* GetInstance(uint64_t gpu_id); + static GpuCacheMgr* GetInstance(uint64_t gpu_id); - void InsertItem(const std::string& key, const DataObjPtr& data) override; + engine::VecIndexPtr GetIndex(const std::string& key); private: static std::mutex mutex_; diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index 9ddb864e0d0d76a97ea354ef5b5ec8048d648c08..8a738347d20559e95c0c09a7b3da53c855abda57 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -124,7 +124,7 @@ Status ExecutionEngineImpl::Serialize() { } Status ExecutionEngineImpl::Load(bool to_cache) { - index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_); + index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index_ != nullptr); if (!already_in_cache) { try { @@ -151,7 +151,7 @@ Status ExecutionEngineImpl::Load(bool to_cache) { } Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { - auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); + auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { index_ = index; @@ -178,7 +178,7 @@ Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { } Status ExecutionEngineImpl::CopyToCpu() { - auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_); + auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { index_ = index; @@ -221,7 +221,7 @@ Status ExecutionEngineImpl::Merge(const std::string &location) { } ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_; - auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location); + auto to_merge = cache::CpuCacheMgr::GetInstance()->GetIndex(location); if (!to_merge) { try { double physical_size = server::CommonUtil::GetFileSize(location); diff --git a/cpp/unittest/server/appendix/server_config.yaml b/cpp/unittest/server/appendix/server_config.yaml index 1a3dca9d97776567c8289ae675336a6ac77e8613..e4b3de31b016189746146d69be9d343fc14bad51 100644 --- a/cpp/unittest/server/appendix/server_config.yaml +++ b/cpp/unittest/server/appendix/server_config.yaml @@ -38,44 +38,8 @@ engine_config: use_blas_threshold: 20 resource_config: - # resource list, length: 0~N - # please set a DISK resource and a CPU resource least, or system will not return query result. - # - # example: - # resource_name: # resource name, just using in connections below - # type: DISK # resource type, optional: DISK/CPU/GPU - # device_id: 0 - # enable_executor: false # if is enable executor, optional: true, false - + mode: simple resources: - ssda: - type: DISK - device_id: 0 - enable_executor: false - - cpu: - type: CPU - device_id: 0 - enable_executor: false - - gpu0: - type: GPU - device_id: 0 - enable_executor: true - gpu_resource_num: 2 - pinned_memory: 300 - temp_memory: 300 - - # connection list, length: 0~N - # example: - # connection_name: - # speed: 100 # unit: MS/s - # endpoint: === - connections: - io: - speed: 500 - endpoint: ssda===cpu - pcie0: - speed: 11000 - endpoint: cpu===gpu0 + # - cpu + - gpu0 diff --git a/cpp/unittest/server/cache_test.cpp b/cpp/unittest/server/cache_test.cpp index 501d3f486f3f7c556cdc9e37ea3dc3c1511bcdaa..8293805209199c783f65c65f9c525b253fd1edca 100644 --- a/cpp/unittest/server/cache_test.cpp +++ b/cpp/unittest/server/cache_test.cpp @@ -15,21 +15,27 @@ using namespace zilliz::milvus; namespace { -class InvalidCacheMgr : public cache::CacheMgr { +class InvalidCacheMgr : public cache::CacheMgr { public: InvalidCacheMgr() { } }; -class LessItemCacheMgr : public cache::CacheMgr { +class LessItemCacheMgr : public cache::CacheMgr { public: LessItemCacheMgr() { - cache_ = std::make_shared(1UL << 12, 10); + cache_ = std::make_shared>(1UL << 12, 10); } }; class MockVecIndex : public engine::VecIndex { public: + MockVecIndex(int64_t dim, int64_t total) + : dimension_(dim), + ntotal_(total){ + + } + virtual ErrorCode BuildAll(const long &nb, const float *xb, const long *ids, @@ -93,7 +99,7 @@ public: } public: - int64_t dimension_ = 512; + int64_t dimension_ = 256; int64_t ntotal_ = 0; }; @@ -101,7 +107,7 @@ public: TEST(CacheTest, DUMMY_TEST) { engine::Config cfg; - MockVecIndex mock_index; + MockVecIndex mock_index(256, 1000); mock_index.Dimension(); mock_index.Count(); mock_index.Add(1, nullptr, nullptr); @@ -118,29 +124,27 @@ TEST(CacheTest, DUMMY_TEST) { } TEST(CacheTest, CPU_CACHE_TEST) { - cache::CacheMgr *cpu_mgr = cache::CpuCacheMgr::GetInstance(); + auto cpu_mgr = cache::CpuCacheMgr::GetInstance(); - const int64_t gbyte = 1 << 30; + const int64_t gbyte = 1024*1024*1024; int64_t g_num = 16; int64_t cap = g_num * gbyte; cpu_mgr->SetCapacity(cap); ASSERT_EQ(cpu_mgr->CacheCapacity(), cap); - const int dim = 256; - - for (int i = 0; i < 20; i++) { - MockVecIndex* mock_index = new MockVecIndex(); - mock_index->ntotal_ = 1000000;//less 1G per index - engine::VecIndexPtr index(mock_index); - - cpu_mgr->InsertItem("index_" + std::to_string(i), index); + uint64_t item_count = 20; + for (uint64_t i = 0; i < item_count; i++) { + //each vector is 1k byte, total size less than 1G + engine::VecIndexPtr mock_index = std::make_shared(256, 1000000); + cache::DataObjPtr data_obj = std::make_shared(mock_index); + cpu_mgr->InsertItem("index_" + std::to_string(i), data_obj); } ASSERT_LT(cpu_mgr->ItemCount(), g_num); auto obj = cpu_mgr->GetIndex("index_0"); ASSERT_TRUE(obj == nullptr); - obj = cpu_mgr->GetIndex("index_19"); + obj = cpu_mgr->GetIndex("index_" + std::to_string(item_count - 1)); ASSERT_TRUE(obj != nullptr); { @@ -154,30 +158,24 @@ TEST(CacheTest, CPU_CACHE_TEST) { g_num = 5; cpu_mgr->SetCapacity(g_num * gbyte); - MockVecIndex* mock_index = new MockVecIndex(); - mock_index->ntotal_ = 6000000;//6G less - engine::VecIndexPtr index(mock_index); - - cpu_mgr->InsertItem("index_6g", index); - ASSERT_EQ(cpu_mgr->ItemCount(), 0);//data greater than capacity can not be inserted sucessfully + //each vector is 1k byte, total size less than 6G + engine::VecIndexPtr mock_index = std::make_shared(256, 6000000); + cache::DataObjPtr data_obj = std::make_shared(mock_index); + cpu_mgr->InsertItem("index_6g", data_obj); + ASSERT_TRUE(cpu_mgr->ItemExists("index_6g")); } cpu_mgr->PrintInfo(); } TEST(CacheTest, GPU_CACHE_TEST) { - cache::CacheMgr* gpu_mgr = cache::GpuCacheMgr::GetInstance(0); - - const int dim = 256; + auto gpu_mgr = cache::GpuCacheMgr::GetInstance(0); for(int i = 0; i < 20; i++) { - MockVecIndex* mock_index = new MockVecIndex(); - mock_index->ntotal_ = 1000; - engine::VecIndexPtr index(mock_index); - - cache::DataObjPtr obj = std::make_shared(index); - - gpu_mgr->InsertItem("index_" + std::to_string(i), obj); + //each vector is 1k byte + engine::VecIndexPtr mock_index = std::make_shared(256, 1000); + cache::DataObjPtr data_obj = std::make_shared(mock_index); + gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj); } auto obj = gpu_mgr->GetItem("index_0"); @@ -187,19 +185,13 @@ TEST(CacheTest, GPU_CACHE_TEST) { for (auto i = 0; i < 3; i++) { // TODO: use gpu index to mock - MockVecIndex *mock_index = new MockVecIndex(); - mock_index->ntotal_ = 1000000; //2G - engine::VecIndexPtr index(mock_index); - cache::DataObjPtr data_obj = std::make_shared(index); + //each vector is 1k byte, total size less than 2G + engine::VecIndexPtr mock_index = std::make_shared(256, 2000000); + cache::DataObjPtr data_obj = std::make_shared(mock_index); std::cout << data_obj->size() <InsertItem("index_" + std::to_string(i), data_obj); } -// ASSERT_EQ(gpu_mgr->ItemCount(), 2); -// auto obj0 = gpu_mgr->GetItem("index_0"); -// ASSERT_EQ(obj0, nullptr); -// auto obj1 = gpu_mgr->GetItem("index_1"); -// auto obj2 = gpu_mgr->GetItem("index_2"); gpu_mgr->ClearCache(); ASSERT_EQ(gpu_mgr->ItemCount(), 0); @@ -213,7 +205,7 @@ TEST(CacheTest, INVALID_TEST) { ASSERT_EQ(mgr.GetItem("test"), nullptr); mgr.InsertItem("test", cache::DataObjPtr()); - mgr.InsertItem("test", engine::VecIndexPtr(nullptr)); + mgr.InsertItem("test", nullptr); mgr.EraseItem("test"); mgr.PrintInfo(); mgr.ClearCache(); @@ -225,12 +217,10 @@ TEST(CacheTest, INVALID_TEST) { { LessItemCacheMgr mgr; for(int i = 0; i < 20; i++) { - MockVecIndex* mock_index = new MockVecIndex(); - mock_index->ntotal_ = 2; - engine::VecIndexPtr index(mock_index); - - cache::DataObjPtr obj = std::make_shared(index); - mgr.InsertItem("index_" + std::to_string(i), obj); + //each vector is 1k byte + engine::VecIndexPtr mock_index = std::make_shared(256, 2); + cache::DataObjPtr data_obj = std::make_shared(mock_index); + mgr.InsertItem("index_" + std::to_string(i), data_obj); } ASSERT_EQ(mgr.GetItem("index_0"), nullptr); }