diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index aed69e04790e4a1c2249583d2f2faea3176339c0..c1bf84016073b2e24d8ef95ae448889d4de2aaf4 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -32,6 +32,7 @@ Please mark all change in change log and use the ticket from JIRA. - MS-611 - Add resources validity check in ResourceMgr - MS-619 - Add optimizer class in scheduler - MS-614 - Preload table at startup +- MS-626 - Refactor DataObj to support cache any type data ## New Feature diff --git a/cpp/src/cache/Cache.inl b/cpp/src/cache/Cache.inl index 2ab6b3ffa66480612e92056d62eefbe03eb809c7..3a60dd288f4a83dfaababdc49448ec5d90817a0a 100644 --- a/cpp/src/cache/Cache.inl +++ b/cpp/src/cache/Cache.inl @@ -86,11 +86,11 @@ Cache::insert(const std::string &key, const ItemObj &item) { //if key already exist, subtract old item size if (lru_.exists(key)) { const ItemObj &old_item = lru_.get(key); - usage_ -= old_item->size(); + usage_ -= old_item->Size(); } //plus new item size - usage_ += item->size(); + usage_ += item->Size(); } //if usage exceed capacity, free some items @@ -106,7 +106,7 @@ Cache::insert(const std::string &key, const ItemObj &item) { std::lock_guard lock(mutex_); lru_.put(key, item); - SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->size() + SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->Size() << " bytes into cache, usage: " << usage_ << " bytes"; } } @@ -120,9 +120,9 @@ Cache::erase(const std::string &key) { } const ItemObj &old_item = lru_.get(key); - usage_ -= old_item->size(); + usage_ -= old_item->Size(); - SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->size(); + SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size(); lru_.erase(key); } @@ -160,7 +160,7 @@ Cache::free_memory() { auto &obj_ptr = it->second; key_array.emplace(key); - released_size += obj_ptr->size(); + released_size += obj_ptr->Size(); ++it; } } diff --git a/cpp/src/cache/CpuCacheMgr.cpp b/cpp/src/cache/CpuCacheMgr.cpp index 7cfe59e72e58b3bcd91b75a00df87e8f4e030878..7dcecba39bd31a817b504a9be3b54f7476d7046c 100644 --- a/cpp/src/cache/CpuCacheMgr.cpp +++ b/cpp/src/cache/CpuCacheMgr.cpp @@ -59,14 +59,10 @@ CpuCacheMgr::GetInstance() { return &s_mgr; } -engine::VecIndexPtr +DataObjPtr CpuCacheMgr::GetIndex(const std::string& key) { DataObjPtr obj = GetItem(key); - if (obj != nullptr) { - return obj->data(); - } - - return nullptr; + return obj; } } // namespace cache diff --git a/cpp/src/cache/CpuCacheMgr.h b/cpp/src/cache/CpuCacheMgr.h index 2f8c50991a9e50eadf60935bbcc40d3d78de3d4d..81d529f5565a98a98d75fa064450daa8a0084581 100644 --- a/cpp/src/cache/CpuCacheMgr.h +++ b/cpp/src/cache/CpuCacheMgr.h @@ -35,7 +35,7 @@ class CpuCacheMgr : public CacheMgr { static CpuCacheMgr* GetInstance(); - engine::VecIndexPtr + DataObjPtr GetIndex(const std::string& key); }; diff --git a/cpp/src/cache/DataObj.h b/cpp/src/cache/DataObj.h index eb58b708250cbc221361f2b782ac8042a9a8d8a4..abd504f59017a789fb53bd9b8c90053eeae75a5e 100644 --- a/cpp/src/cache/DataObj.h +++ b/cpp/src/cache/DataObj.h @@ -17,7 +17,6 @@ #pragma once -#include "src/wrapper/VecIndex.h" #include @@ -26,38 +25,9 @@ namespace cache { class DataObj { public: - explicit DataObj(const engine::VecIndexPtr& index) : index_(index) { - } + virtual int64_t + Size() = 0; - DataObj(const engine::VecIndexPtr& index, int64_t size) : index_(index), size_(size) { - } - - engine::VecIndexPtr - data() { - return index_; - } - - const engine::VecIndexPtr& - data() const { - return index_; - } - - int64_t - size() const { - if (index_ == nullptr) { - return 0; - } - - if (size_ > 0) { - return size_; - } - - return index_->Count() * index_->Dimension() * sizeof(float); - } - - private: - engine::VecIndexPtr index_ = nullptr; - int64_t size_ = 0; }; using DataObjPtr = std::shared_ptr; diff --git a/cpp/src/cache/GpuCacheMgr.cpp b/cpp/src/cache/GpuCacheMgr.cpp index a9eff6d3c3ceb295675ff3ce35d6be000dceb308..aa19cd7fae58eb7b83ad1be5e26f80bb1b1812e5 100644 --- a/cpp/src/cache/GpuCacheMgr.cpp +++ b/cpp/src/cache/GpuCacheMgr.cpp @@ -71,14 +71,10 @@ GpuCacheMgr::GetInstance(uint64_t gpu_id) { } } -engine::VecIndexPtr +DataObjPtr GpuCacheMgr::GetIndex(const std::string& key) { DataObjPtr obj = GetItem(key); - if (obj != nullptr) { - return obj->data(); - } - - return nullptr; + return obj; } } // namespace cache diff --git a/cpp/src/cache/GpuCacheMgr.h b/cpp/src/cache/GpuCacheMgr.h index 8c0d4b95026ab4aea4684b55d593b2d8b3c20b4c..4d434b2cfbef4a46060691b45a387634d0ae0e57 100644 --- a/cpp/src/cache/GpuCacheMgr.h +++ b/cpp/src/cache/GpuCacheMgr.h @@ -35,7 +35,7 @@ class GpuCacheMgr : public CacheMgr { static GpuCacheMgr* GetInstance(uint64_t gpu_id); - engine::VecIndexPtr + DataObjPtr GetIndex(const std::string& key); private: diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index 51a31d1143de91f5e17816d9c3622e924f12018b..5f43b41dd2fa90afd5cefc0f0cc7b011f84b95de 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -91,6 +91,60 @@ ExecutionEngineImpl::CreatetVecIndex(EngineType type) { return index; } +void +ExecutionEngineImpl::HybridLoad() { +// if (index_type_ != EngineType::FAISS_IVFSQ8Hybrid) { +// return; +// } +// +// const std::string key = location_ + ".quantizer"; +// std::vector gpus; +// +// // cache hit +// { +// int64_t selected = -1; +// void* quantizer = nullptr; +// for (auto& gpu : gpus) { +// auto cache = cache::GpuCacheMgr::GetInstance(gpu); +// if (auto quan = cache->GetIndex(key)) { +// selected = gpu; +// quantizer = quan; +// } +// } +// +// if (selected != -1) { +// // set quantizer into index; +// return; +// } +// } +// +// // cache miss +// { +// std::vector all_free_mem; +// for (auto& gpu : gpus) { +// auto cache = cache::GpuCacheMgr::GetInstance(gpu); +// auto free_mem = cache->CacheCapacity() - cache->CacheUsage(); +// all_free_mem.push_back(free_mem); +// } +// +// auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end()); +// auto best = std::distance(all_free_mem.begin(), max_e); +// +// // load to best device; +// // cache quantizer +// } +// +// // if index_type == Hybrid +// +// // 1. quantizer in which gpu +// +// // 2.1 which gpu cache best +// +// // 2.2 load to that gpu cache +// +// // set quantizer into index +} + Status ExecutionEngineImpl::AddWithIds(int64_t n, const float* xdata, const int64_t* xids) { auto status = index_->Add(n, xdata, xids); @@ -133,7 +187,7 @@ ExecutionEngineImpl::Serialize() { Status ExecutionEngineImpl::Load(bool to_cache) { - index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); + index_ = std::static_pointer_cast(cache::CpuCacheMgr::GetInstance()->GetIndex(location_)); bool already_in_cache = (index_ != nullptr); if (!already_in_cache) { try { @@ -161,7 +215,7 @@ ExecutionEngineImpl::Load(bool to_cache) { Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { - auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); + auto index = std::static_pointer_cast(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_)); bool already_in_cache = (index != nullptr); if (already_in_cache) { index_ = index; @@ -200,7 +254,7 @@ ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) { Status ExecutionEngineImpl::CopyToCpu() { - auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); + auto index = std::static_pointer_cast(cache::CpuCacheMgr::GetInstance()->GetIndex(location_)); bool already_in_cache = (index != nullptr); if (already_in_cache) { index_ = index; @@ -333,7 +387,7 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr Status ExecutionEngineImpl::Cache() { - cache::DataObjPtr obj = std::make_shared(index_, PhysicalSize()); + cache::DataObjPtr obj = std::static_pointer_cast(index_); milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj); return Status::OK(); @@ -341,7 +395,7 @@ ExecutionEngineImpl::Cache() { Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) { - cache::DataObjPtr obj = std::make_shared(index_, PhysicalSize()); + cache::DataObjPtr obj = std::static_pointer_cast(index_); milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj); return Status::OK(); diff --git a/cpp/src/db/engine/ExecutionEngineImpl.h b/cpp/src/db/engine/ExecutionEngineImpl.h index 56a584999439b87a962a1f762ac497f62621454b..f41072e89d7d91a578a8df9213e95f0ccae48777 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.h +++ b/cpp/src/db/engine/ExecutionEngineImpl.h @@ -107,6 +107,9 @@ class ExecutionEngineImpl : public ExecutionEngine { VecIndexPtr Load(const std::string& location); + void + HybridLoad(); + protected: VecIndexPtr index_ = nullptr; EngineType index_type_; diff --git a/cpp/src/wrapper/VecIndex.cpp b/cpp/src/wrapper/VecIndex.cpp index 627f3515e4c8ed456cf33e3d1616849b6a76e68d..90a58f588c6041149a5dbc062990005011f34eb4 100644 --- a/cpp/src/wrapper/VecIndex.cpp +++ b/cpp/src/wrapper/VecIndex.cpp @@ -34,6 +34,11 @@ namespace milvus { namespace engine { +int64_t +VecIndex::Size() { + return Count() * Dimension() * sizeof(float); +} + struct FileIOReader { std::fstream fs; std::string name; diff --git a/cpp/src/wrapper/VecIndex.h b/cpp/src/wrapper/VecIndex.h index d2088b64ef574dbb690145baf8a79d466dfc1e7e..63d4ef690322e2790e1334a4ef9670fbf380476f 100644 --- a/cpp/src/wrapper/VecIndex.h +++ b/cpp/src/wrapper/VecIndex.h @@ -20,6 +20,7 @@ #include #include +#include "cache/DataObj.h" #include "knowhere/common/BinarySet.h" #include "knowhere/common/Config.h" #include "utils/Status.h" @@ -48,7 +49,7 @@ class VecIndex; using VecIndexPtr = std::shared_ptr; -class VecIndex { +class VecIndex : public cache::DataObj { public: virtual Status BuildAll(const int64_t& nb, const float* xb, const int64_t* ids, const Config& cfg, const int64_t& nt = 0, @@ -81,6 +82,9 @@ class VecIndex { virtual int64_t Count() = 0; + int64_t + Size() override; + virtual knowhere::BinarySet Serialize() = 0; diff --git a/cpp/unittest/scheduler/test_scheduler.cpp b/cpp/unittest/scheduler/test_scheduler.cpp index 1238f906d1e2e3246d47ec7eed750032bef494bb..a107040a0b9b72be863766e4e2e47bdbc762d97b 100644 --- a/cpp/unittest/scheduler/test_scheduler.cpp +++ b/cpp/unittest/scheduler/test_scheduler.cpp @@ -141,7 +141,7 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) { mock_index->ntotal_ = 1000; engine::VecIndexPtr index(mock_index); - cache::DataObjPtr obj = std::make_shared(index); + cache::DataObjPtr obj = std::static_pointer_cast(index); cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj); } diff --git a/cpp/unittest/server/test_cache.cpp b/cpp/unittest/server/test_cache.cpp index 754f23f51be8548e178322df3e2ce06f0587e292..8774851c4362932f3fa7dc0ebf3e49ef253ece2c 100644 --- a/cpp/unittest/server/test_cache.cpp +++ b/cpp/unittest/server/test_cache.cpp @@ -145,7 +145,7 @@ TEST(CacheTest, CPU_CACHE_TEST) { for (uint64_t i = 0; i < item_count; i++) { //each vector is 1k byte, total size less than 1G ms::engine::VecIndexPtr mock_index = std::make_shared(256, 1000000); - ms::cache::DataObjPtr data_obj = std::make_shared(mock_index); + ms::cache::DataObjPtr data_obj = std::static_pointer_cast(mock_index); cpu_mgr->InsertItem("index_" + std::to_string(i), data_obj); } ASSERT_LT(cpu_mgr->ItemCount(), g_num); @@ -169,7 +169,7 @@ TEST(CacheTest, CPU_CACHE_TEST) { //each vector is 1k byte, total size less than 6G ms::engine::VecIndexPtr mock_index = std::make_shared(256, 6000000); - ms::cache::DataObjPtr data_obj = std::make_shared(mock_index); + ms::cache::DataObjPtr data_obj = std::static_pointer_cast(mock_index); cpu_mgr->InsertItem("index_6g", data_obj); ASSERT_TRUE(cpu_mgr->ItemExists("index_6g")); } @@ -183,7 +183,7 @@ TEST(CacheTest, GPU_CACHE_TEST) { for (int i = 0; i < 20; i++) { //each vector is 1k byte ms::engine::VecIndexPtr mock_index = std::make_shared(256, 1000); - ms::cache::DataObjPtr data_obj = std::make_shared(mock_index); + ms::cache::DataObjPtr data_obj = std::static_pointer_cast(mock_index); gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj); } @@ -196,8 +196,8 @@ TEST(CacheTest, GPU_CACHE_TEST) { // TODO(myh): use gpu index to mock //each vector is 1k byte, total size less than 2G ms::engine::VecIndexPtr mock_index = std::make_shared(256, 2000000); - ms::cache::DataObjPtr data_obj = std::make_shared(mock_index); - std::cout << data_obj->size() << std::endl; + ms::cache::DataObjPtr data_obj = std::static_pointer_cast(mock_index); + std::cout << data_obj->Size() << std::endl; gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj); } @@ -227,7 +227,7 @@ TEST(CacheTest, INVALID_TEST) { for (int i = 0; i < 20; i++) { //each vector is 1k byte ms::engine::VecIndexPtr mock_index = std::make_shared(256, 2); - ms::cache::DataObjPtr data_obj = std::make_shared(mock_index); + ms::cache::DataObjPtr data_obj = std::static_pointer_cast(mock_index); mgr.InsertItem("index_" + std::to_string(i), data_obj); } ASSERT_EQ(mgr.GetItem("index_0"), nullptr);