提交 3b107731 编写于 作者: W wxyu

MS-626 Refactor DataObj to support cache any type data


Former-commit-id: 7795e47deb81657c484d4d90acb49adf9ed4501c
上级 6b025329
...@@ -28,6 +28,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -28,6 +28,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-609 - Update task construct function - MS-609 - Update task construct function
- MS-611 - Add resources validity check in ResourceMgr - MS-611 - Add resources validity check in ResourceMgr
- MS-619 - Add optimizer class in scheduler - MS-619 - Add optimizer class in scheduler
- MS-626 - Refactor DataObj to support cache any type data
## New Feature ## New Feature
......
...@@ -86,11 +86,11 @@ Cache<ItemObj>::insert(const std::string &key, const ItemObj &item) { ...@@ -86,11 +86,11 @@ Cache<ItemObj>::insert(const std::string &key, const ItemObj &item) {
//if key already exist, subtract old item size //if key already exist, subtract old item size
if (lru_.exists(key)) { if (lru_.exists(key)) {
const ItemObj &old_item = lru_.get(key); const ItemObj &old_item = lru_.get(key);
usage_ -= old_item->size(); usage_ -= old_item->Size();
} }
//plus new item size //plus new item size
usage_ += item->size(); usage_ += item->Size();
} }
//if usage exceed capacity, free some items //if usage exceed capacity, free some items
...@@ -106,7 +106,7 @@ Cache<ItemObj>::insert(const std::string &key, const ItemObj &item) { ...@@ -106,7 +106,7 @@ Cache<ItemObj>::insert(const std::string &key, const ItemObj &item) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
lru_.put(key, item); lru_.put(key, item);
SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->size() SERVER_LOG_DEBUG << "Insert " << key << " size:" << item->Size()
<< " bytes into cache, usage: " << usage_ << " bytes"; << " bytes into cache, usage: " << usage_ << " bytes";
} }
} }
...@@ -120,9 +120,9 @@ Cache<ItemObj>::erase(const std::string &key) { ...@@ -120,9 +120,9 @@ Cache<ItemObj>::erase(const std::string &key) {
} }
const ItemObj &old_item = lru_.get(key); const ItemObj &old_item = lru_.get(key);
usage_ -= old_item->size(); usage_ -= old_item->Size();
SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->size(); SERVER_LOG_DEBUG << "Erase " << key << " size: " << old_item->Size();
lru_.erase(key); lru_.erase(key);
} }
...@@ -160,7 +160,7 @@ Cache<ItemObj>::free_memory() { ...@@ -160,7 +160,7 @@ Cache<ItemObj>::free_memory() {
auto &obj_ptr = it->second; auto &obj_ptr = it->second;
key_array.emplace(key); key_array.emplace(key);
released_size += obj_ptr->size(); released_size += obj_ptr->Size();
++it; ++it;
} }
} }
......
...@@ -59,14 +59,10 @@ CpuCacheMgr::GetInstance() { ...@@ -59,14 +59,10 @@ CpuCacheMgr::GetInstance() {
return &s_mgr; return &s_mgr;
} }
engine::VecIndexPtr DataObjPtr
CpuCacheMgr::GetIndex(const std::string& key) { CpuCacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key); DataObjPtr obj = GetItem(key);
if (obj != nullptr) { return obj;
return obj->data();
}
return nullptr;
} }
} // namespace cache } // namespace cache
......
...@@ -35,7 +35,7 @@ class CpuCacheMgr : public CacheMgr<DataObjPtr> { ...@@ -35,7 +35,7 @@ class CpuCacheMgr : public CacheMgr<DataObjPtr> {
static CpuCacheMgr* static CpuCacheMgr*
GetInstance(); GetInstance();
engine::VecIndexPtr DataObjPtr
GetIndex(const std::string& key); GetIndex(const std::string& key);
}; };
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#pragma once #pragma once
#include "src/wrapper/VecIndex.h"
#include <memory> #include <memory>
...@@ -26,38 +25,9 @@ namespace cache { ...@@ -26,38 +25,9 @@ namespace cache {
class DataObj { class DataObj {
public: public:
explicit DataObj(const engine::VecIndexPtr& index) : index_(index) { virtual int64_t
} Size() = 0;
DataObj(const engine::VecIndexPtr& index, int64_t size) : index_(index), size_(size) {
}
engine::VecIndexPtr
data() {
return index_;
}
const engine::VecIndexPtr&
data() const {
return index_;
}
int64_t
size() const {
if (index_ == nullptr) {
return 0;
}
if (size_ > 0) {
return size_;
}
return index_->Count() * index_->Dimension() * sizeof(float);
}
private:
engine::VecIndexPtr index_ = nullptr;
int64_t size_ = 0;
}; };
using DataObjPtr = std::shared_ptr<DataObj>; using DataObjPtr = std::shared_ptr<DataObj>;
......
...@@ -71,14 +71,10 @@ GpuCacheMgr::GetInstance(uint64_t gpu_id) { ...@@ -71,14 +71,10 @@ GpuCacheMgr::GetInstance(uint64_t gpu_id) {
} }
} }
engine::VecIndexPtr DataObjPtr
GpuCacheMgr::GetIndex(const std::string& key) { GpuCacheMgr::GetIndex(const std::string& key) {
DataObjPtr obj = GetItem(key); DataObjPtr obj = GetItem(key);
if (obj != nullptr) { return obj;
return obj->data();
}
return nullptr;
} }
} // namespace cache } // namespace cache
......
...@@ -35,7 +35,7 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr> { ...@@ -35,7 +35,7 @@ class GpuCacheMgr : public CacheMgr<DataObjPtr> {
static GpuCacheMgr* static GpuCacheMgr*
GetInstance(uint64_t gpu_id); GetInstance(uint64_t gpu_id);
engine::VecIndexPtr DataObjPtr
GetIndex(const std::string& key); GetIndex(const std::string& key);
private: private:
......
...@@ -91,6 +91,60 @@ ExecutionEngineImpl::CreatetVecIndex(EngineType type) { ...@@ -91,6 +91,60 @@ ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
return index; return index;
} }
void
ExecutionEngineImpl::HybridLoad() {
// if (index_type_ != EngineType::FAISS_IVFSQ8Hybrid) {
// return;
// }
//
// const std::string key = location_ + ".quantizer";
// std::vector<uint64_t> gpus;
//
// // cache hit
// {
// int64_t selected = -1;
// void* quantizer = nullptr;
// for (auto& gpu : gpus) {
// auto cache = cache::GpuCacheMgr::GetInstance(gpu);
// if (auto quan = cache->GetIndex(key)) {
// selected = gpu;
// quantizer = quan;
// }
// }
//
// if (selected != -1) {
// // set quantizer into index;
// return;
// }
// }
//
// // cache miss
// {
// std::vector<int64_t> all_free_mem;
// for (auto& gpu : gpus) {
// auto cache = cache::GpuCacheMgr::GetInstance(gpu);
// auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
// all_free_mem.push_back(free_mem);
// }
//
// auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
// auto best = std::distance(all_free_mem.begin(), max_e);
//
// // load to best device;
// // cache quantizer
// }
//
// // if index_type == Hybrid
//
// // 1. quantizer in which gpu
//
// // 2.1 which gpu cache best
//
// // 2.2 load to that gpu cache
//
// // set quantizer into index
}
Status Status
ExecutionEngineImpl::AddWithIds(int64_t n, const float* xdata, const int64_t* xids) { ExecutionEngineImpl::AddWithIds(int64_t n, const float* xdata, const int64_t* xids) {
auto status = index_->Add(n, xdata, xids); auto status = index_->Add(n, xdata, xids);
...@@ -133,7 +187,7 @@ ExecutionEngineImpl::Serialize() { ...@@ -133,7 +187,7 @@ ExecutionEngineImpl::Serialize() {
Status Status
ExecutionEngineImpl::Load(bool to_cache) { ExecutionEngineImpl::Load(bool to_cache) {
index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); index_ = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
bool already_in_cache = (index_ != nullptr); bool already_in_cache = (index_ != nullptr);
if (!already_in_cache) { if (!already_in_cache) {
try { try {
...@@ -161,7 +215,7 @@ ExecutionEngineImpl::Load(bool to_cache) { ...@@ -161,7 +215,7 @@ ExecutionEngineImpl::Load(bool to_cache) {
Status Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
auto index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
bool already_in_cache = (index != nullptr); bool already_in_cache = (index != nullptr);
if (already_in_cache) { if (already_in_cache) {
index_ = index; index_ = index;
...@@ -189,7 +243,7 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { ...@@ -189,7 +243,7 @@ ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
Status Status
ExecutionEngineImpl::CopyToCpu() { ExecutionEngineImpl::CopyToCpu() {
auto index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_); auto index = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
bool already_in_cache = (index != nullptr); bool already_in_cache = (index != nullptr);
if (already_in_cache) { if (already_in_cache) {
index_ = index; index_ = index;
...@@ -322,7 +376,7 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr ...@@ -322,7 +376,7 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr
Status Status
ExecutionEngineImpl::Cache() { ExecutionEngineImpl::Cache() {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize()); cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj); milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
return Status::OK(); return Status::OK();
...@@ -330,7 +384,7 @@ ExecutionEngineImpl::Cache() { ...@@ -330,7 +384,7 @@ ExecutionEngineImpl::Cache() {
Status Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) { ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index_, PhysicalSize()); cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj); milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
return Status::OK(); return Status::OK();
......
...@@ -104,6 +104,9 @@ class ExecutionEngineImpl : public ExecutionEngine { ...@@ -104,6 +104,9 @@ class ExecutionEngineImpl : public ExecutionEngine {
VecIndexPtr VecIndexPtr
Load(const std::string& location); Load(const std::string& location);
void
HybridLoad();
protected: protected:
VecIndexPtr index_ = nullptr; VecIndexPtr index_ = nullptr;
EngineType index_type_; EngineType index_type_;
......
...@@ -34,6 +34,11 @@ ...@@ -34,6 +34,11 @@
namespace milvus { namespace milvus {
namespace engine { namespace engine {
int64_t
VecIndex::Size() {
return Count() * Dimension() * sizeof(float);
}
struct FileIOReader { struct FileIOReader {
std::fstream fs; std::fstream fs;
std::string name; std::string name;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "cache/DataObj.h"
#include "knowhere/common/BinarySet.h" #include "knowhere/common/BinarySet.h"
#include "knowhere/common/Config.h" #include "knowhere/common/Config.h"
#include "utils/Status.h" #include "utils/Status.h"
...@@ -48,7 +49,7 @@ class VecIndex; ...@@ -48,7 +49,7 @@ class VecIndex;
using VecIndexPtr = std::shared_ptr<VecIndex>; using VecIndexPtr = std::shared_ptr<VecIndex>;
class VecIndex { class VecIndex : public cache::DataObj {
public: public:
virtual Status virtual Status
BuildAll(const int64_t& nb, const float* xb, const int64_t* ids, const Config& cfg, const int64_t& nt = 0, BuildAll(const int64_t& nb, const float* xb, const int64_t* ids, const Config& cfg, const int64_t& nt = 0,
...@@ -81,6 +82,9 @@ class VecIndex { ...@@ -81,6 +82,9 @@ class VecIndex {
virtual int64_t virtual int64_t
Count() = 0; Count() = 0;
int64_t
Size() override;
virtual knowhere::BinarySet virtual knowhere::BinarySet
Serialize() = 0; Serialize() = 0;
......
...@@ -141,7 +141,7 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) { ...@@ -141,7 +141,7 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) {
mock_index->ntotal_ = 1000; mock_index->ntotal_ = 1000;
engine::VecIndexPtr index(mock_index); engine::VecIndexPtr index(mock_index);
cache::DataObjPtr obj = std::make_shared<cache::DataObj>(index); cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index);
cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj); cache::GpuCacheMgr::GetInstance(device_id)->InsertItem("location", obj);
} }
......
...@@ -145,7 +145,7 @@ TEST(CacheTest, CPU_CACHE_TEST) { ...@@ -145,7 +145,7 @@ TEST(CacheTest, CPU_CACHE_TEST) {
for (uint64_t i = 0; i < item_count; i++) { for (uint64_t i = 0; i < item_count; i++) {
//each vector is 1k byte, total size less than 1G //each vector is 1k byte, total size less than 1G
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000000); ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000000);
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index); ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
cpu_mgr->InsertItem("index_" + std::to_string(i), data_obj); cpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
} }
ASSERT_LT(cpu_mgr->ItemCount(), g_num); ASSERT_LT(cpu_mgr->ItemCount(), g_num);
...@@ -169,7 +169,7 @@ TEST(CacheTest, CPU_CACHE_TEST) { ...@@ -169,7 +169,7 @@ TEST(CacheTest, CPU_CACHE_TEST) {
//each vector is 1k byte, total size less than 6G //each vector is 1k byte, total size less than 6G
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 6000000); ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 6000000);
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index); ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
cpu_mgr->InsertItem("index_6g", data_obj); cpu_mgr->InsertItem("index_6g", data_obj);
ASSERT_TRUE(cpu_mgr->ItemExists("index_6g")); ASSERT_TRUE(cpu_mgr->ItemExists("index_6g"));
} }
...@@ -183,7 +183,7 @@ TEST(CacheTest, GPU_CACHE_TEST) { ...@@ -183,7 +183,7 @@ TEST(CacheTest, GPU_CACHE_TEST) {
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
//each vector is 1k byte //each vector is 1k byte
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000); ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 1000);
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index); ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj); gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
} }
...@@ -196,8 +196,8 @@ TEST(CacheTest, GPU_CACHE_TEST) { ...@@ -196,8 +196,8 @@ TEST(CacheTest, GPU_CACHE_TEST) {
// TODO(myh): use gpu index to mock // TODO(myh): use gpu index to mock
//each vector is 1k byte, total size less than 2G //each vector is 1k byte, total size less than 2G
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2000000); ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2000000);
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index); ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
std::cout << data_obj->size() << std::endl; std::cout << data_obj->Size() << std::endl;
gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj); gpu_mgr->InsertItem("index_" + std::to_string(i), data_obj);
} }
...@@ -227,7 +227,7 @@ TEST(CacheTest, INVALID_TEST) { ...@@ -227,7 +227,7 @@ TEST(CacheTest, INVALID_TEST) {
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
//each vector is 1k byte //each vector is 1k byte
ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2); ms::engine::VecIndexPtr mock_index = std::make_shared<MockVecIndex>(256, 2);
ms::cache::DataObjPtr data_obj = std::make_shared<ms::cache::DataObj>(mock_index); ms::cache::DataObjPtr data_obj = std::static_pointer_cast<ms::cache::DataObj>(mock_index);
mgr.InsertItem("index_" + std::to_string(i), data_obj); mgr.InsertItem("index_" + std::to_string(i), data_obj);
} }
ASSERT_EQ(mgr.GetItem("index_0"), nullptr); ASSERT_EQ(mgr.GetItem("index_0"), nullptr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册