From 0712798938cc15610984ebd992e0541487bed04e Mon Sep 17 00:00:00 2001 From: wxyu Date: Thu, 10 Oct 2019 20:57:14 +0800 Subject: [PATCH] MS-631 IVFSQ8H Index support Former-commit-id: 21e17a20794e4fde31e79c4bbd4e26d46c79d886 --- cpp/CHANGELOG.md | 1 + cpp/src/cache/CpuCacheMgr.cpp | 2 +- cpp/src/cache/GpuCacheMgr.cpp | 4 +- .../index/vector_index/IndexIVFSQHybrid.cpp | 26 +++- .../index/vector_index/IndexIVFSQHybrid.h | 10 +- .../knowhere/index/vector_index/Quantizer.h | 4 +- cpp/src/db/engine/ExecutionEngine.h | 3 +- cpp/src/db/engine/ExecutionEngineImpl.cpp | 138 +++++++++++------- cpp/src/db/engine/ExecutionEngineImpl.h | 5 +- cpp/src/scheduler/JobMgr.cpp | 4 + cpp/src/scheduler/Utils.cpp | 39 +++++ cpp/src/scheduler/Utils.h | 4 + cpp/src/scheduler/optimizer/HybridPass.cpp | 7 +- cpp/src/sdk/include/MilvusApi.h | 1 + cpp/src/server/Config.cpp | 8 +- cpp/src/server/Config.h | 4 +- cpp/src/wrapper/ConfAdapterMgr.cpp | 1 + cpp/src/wrapper/VecImpl.h | 4 + cpp/src/wrapper/VecIndex.cpp | 2 +- 19 files changed, 192 insertions(+), 75 deletions(-) diff --git a/cpp/CHANGELOG.md b/cpp/CHANGELOG.md index 0a16fed4..1a62a791 100644 --- a/cpp/CHANGELOG.md +++ b/cpp/CHANGELOG.md @@ -36,6 +36,7 @@ Please mark all change in change log and use the ticket from JIRA. ## New Feature - MS-627 - Integrate new index: IVFSQHybrid +- MS-631 - IVFSQ8H Index support ## Task - MS-554 - Change license to Apache 2.0 diff --git a/cpp/src/cache/CpuCacheMgr.cpp b/cpp/src/cache/CpuCacheMgr.cpp index 7dcecba3..1f560cbf 100644 --- a/cpp/src/cache/CpuCacheMgr.cpp +++ b/cpp/src/cache/CpuCacheMgr.cpp @@ -32,7 +32,7 @@ CpuCacheMgr::CpuCacheMgr() { server::Config& config = server::Config::GetInstance(); Status s; - int32_t cpu_cache_cap; + int64_t cpu_cache_cap; s = config.GetCacheConfigCpuCacheCapacity(cpu_cache_cap); if (!s.ok()) { SERVER_LOG_ERROR << s.message(); diff --git a/cpp/src/cache/GpuCacheMgr.cpp b/cpp/src/cache/GpuCacheMgr.cpp index aa19cd7f..d862bc03 100644 --- a/cpp/src/cache/GpuCacheMgr.cpp +++ b/cpp/src/cache/GpuCacheMgr.cpp @@ -36,12 +36,12 @@ GpuCacheMgr::GpuCacheMgr() { server::Config& config = server::Config::GetInstance(); Status s; - int32_t gpu_cache_cap; + int64_t gpu_cache_cap; s = config.GetCacheConfigGpuCacheCapacity(gpu_cache_cap); if (!s.ok()) { SERVER_LOG_ERROR << s.message(); } - int32_t cap = gpu_cache_cap * G_BYTE; + int64_t cap = gpu_cache_cap * G_BYTE; cache_ = std::make_shared>(cap, 1UL << 32); float gpu_mem_threshold; diff --git a/cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp b/cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp index 60b1770f..78c48abe 100644 --- a/cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp +++ b/cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.cpp @@ -100,16 +100,20 @@ IVFSQHybrid::CopyCpuToGpu(const int64_t& device_id, const Config& config) { void IVFSQHybrid::LoadImpl(const BinarySet& index_binary) { FaissBaseIndex::LoadImpl(index_binary); // load on cpu + auto* ivf_index = dynamic_cast(index_.get()); + ivf_index->backup_quantizer(); } void IVFSQHybrid::search_impl(int64_t n, const float* data, int64_t k, float* distances, int64_t* labels, const Config& cfg) { - if (gpu_mode) { + if (gpu_mode == 2) { GPUIVF::search_impl(n, data, k, distances, labels, cfg); - } else { + } else if (gpu_mode == 1) { ResScope rs(res_, gpu_id_); IVF::search_impl(n, data, k, distances, labels, cfg); + } else if (gpu_mode == 0) { + IVF::search_impl(n, data, k, distances, labels, cfg); } } @@ -137,8 +141,12 @@ IVFSQHybrid::LoadQuantizer(const Config& conf) { delete gpu_index; auto q = std::make_shared(); - q->quantizer = index_composition->quantizer; + + auto& q_ptr = index_composition->quantizer; + q->size = q_ptr->d * q_ptr->getNumVecs() * sizeof(float); + q->quantizer = q_ptr; res_ = res; + gpu_mode = 1; return q; } else { KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource"); @@ -156,7 +164,7 @@ IVFSQHybrid::SetQuantizer(const QuantizerPtr& q) { faiss::gpu::GpuIndexFlat* is_gpu_flat_index = dynamic_cast(ivf_index->quantizer); if (is_gpu_flat_index == nullptr) { - delete ivf_index->quantizer; +// delete ivf_index->quantizer; ivf_index->quantizer = ivf_quantizer->quantizer; } } @@ -199,10 +207,18 @@ IVFSQHybrid::LoadData(const knowhere::QuantizerPtr& q, const Config& conf) { auto gpu_index = faiss::gpu::index_cpu_to_gpu(res->faiss_res.get(), gpu_id_, index_composition, &option); index_.reset(gpu_index); - gpu_mode = true; // all in gpu + gpu_mode = 2; // all in gpu } else { KNOWHERE_THROW_MSG("CopyCpuToGpu Error, can't get gpu_resource"); } } +FaissIVFQuantizer::~FaissIVFQuantizer() { + if (quantizer != nullptr) { + delete quantizer; + quantizer = nullptr; + } + // else do nothing +} + } // namespace knowhere diff --git a/cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h b/cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h index 1ec67760..d34b28b1 100644 --- a/cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h +++ b/cpp/src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h @@ -27,23 +27,25 @@ namespace knowhere { struct FaissIVFQuantizer : public Quantizer { faiss::gpu::GpuIndexFlat* quantizer = nullptr; + + ~FaissIVFQuantizer() override; }; using FaissIVFQuantizerPtr = std::shared_ptr; class IVFSQHybrid : public GPUIVFSQ { public: explicit IVFSQHybrid(const int& device_id) : GPUIVFSQ(device_id) { - gpu_mode = false; + gpu_mode = 0; } explicit IVFSQHybrid(std::shared_ptr index) : GPUIVFSQ(-1) { index_ = index; - gpu_mode = false; + gpu_mode = 0; } explicit IVFSQHybrid(std::shared_ptr index, const int64_t& device_id, ResPtr& resource) : GPUIVFSQ(index, device_id, resource) { - gpu_mode = true; + gpu_mode = 2; } public: @@ -76,7 +78,7 @@ class IVFSQHybrid : public GPUIVFSQ { LoadImpl(const BinarySet& index_binary) override; protected: - bool gpu_mode = false; + int64_t gpu_mode = 0; // 0,1,2 }; } // namespace knowhere diff --git a/cpp/src/core/knowhere/knowhere/index/vector_index/Quantizer.h b/cpp/src/core/knowhere/knowhere/index/vector_index/Quantizer.h index ea74e97c..f7f9fda8 100644 --- a/cpp/src/core/knowhere/knowhere/index/vector_index/Quantizer.h +++ b/cpp/src/core/knowhere/knowhere/index/vector_index/Quantizer.h @@ -24,11 +24,13 @@ namespace knowhere { struct Quantizer { virtual ~Quantizer() = default; + + int64_t size = -1; }; using QuantizerPtr = std::shared_ptr; struct QuantizerCfg : Cfg { - uint64_t mode = -1; // 0: all data, 1: copy quantizer, 2: copy data + int64_t mode = -1; // 0: all data, 1: copy quantizer, 2: copy data }; using QuantizerConfig = std::shared_ptr; diff --git a/cpp/src/db/engine/ExecutionEngine.h b/cpp/src/db/engine/ExecutionEngine.h index 848704bd..d7de6f37 100644 --- a/cpp/src/db/engine/ExecutionEngine.h +++ b/cpp/src/db/engine/ExecutionEngine.h @@ -32,7 +32,8 @@ enum class EngineType { FAISS_IVFFLAT, FAISS_IVFSQ8, NSG_MIX, - MAX_VALUE = NSG_MIX, + FAISS_IVFSQ8H, + MAX_VALUE = FAISS_IVFSQ8H, }; enum class MetricType { diff --git a/cpp/src/db/engine/ExecutionEngineImpl.cpp b/cpp/src/db/engine/ExecutionEngineImpl.cpp index 77c6942d..d2169b85 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.cpp +++ b/cpp/src/db/engine/ExecutionEngineImpl.cpp @@ -33,10 +33,31 @@ #include #include +#include namespace milvus { namespace engine { +class CachedQuantizer : public cache::DataObj { + public: + explicit + CachedQuantizer(knowhere::QuantizerPtr data) + : data_(std::move(data)) {} + + knowhere::QuantizerPtr + Data() { + return data_; + } + + int64_t + Size() override { + return data_->size; + } + + private: + knowhere::QuantizerPtr data_; +}; + ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension, const std::string& location, EngineType index_type, MetricType metric_type, int32_t nlist) : location_(location), dim_(dimension), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) { @@ -83,6 +104,10 @@ ExecutionEngineImpl::CreatetVecIndex(EngineType type) { index = GetVecIndexFactory(IndexType::NSG_MIX); break; } + case EngineType::FAISS_IVFSQ8H: { + index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_HYBRID); + break; + } default: { ENGINE_LOG_ERROR << "Invalid engine type"; return nullptr; @@ -92,57 +117,63 @@ ExecutionEngineImpl::CreatetVecIndex(EngineType type) { } void -ExecutionEngineImpl::HybridLoad() { - // if (index_type_ != EngineType::FAISS_IVFSQ8Hybrid) { - // return; - // } - // - // const std::string key = location_ + ".quantizer"; - // std::vector gpus; - // - // // cache hit - // { - // int64_t selected = -1; - // void* quantizer = nullptr; - // for (auto& gpu : gpus) { - // auto cache = cache::GpuCacheMgr::GetInstance(gpu); - // if (auto quan = cache->GetIndex(key)) { - // selected = gpu; - // quantizer = quan; - // } - // } - // - // if (selected != -1) { - // // set quantizer into index; - // return; - // } - // } - // - // // cache miss - // { - // std::vector all_free_mem; - // for (auto& gpu : gpus) { - // auto cache = cache::GpuCacheMgr::GetInstance(gpu); - // auto free_mem = cache->CacheCapacity() - cache->CacheUsage(); - // all_free_mem.push_back(free_mem); - // } - // - // auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end()); - // auto best = std::distance(all_free_mem.begin(), max_e); - // - // // load to best device; - // // cache quantizer - // } - // - // // if index_type == Hybrid - // - // // 1. quantizer in which gpu - // - // // 2.1 which gpu cache best - // - // // 2.2 load to that gpu cache - // - // // set quantizer into index +ExecutionEngineImpl::HybridLoad() const { + if (index_type_ != EngineType::FAISS_IVFSQ8H) { + return; + } + + const std::string key = location_ + ".quantizer"; + std::vector gpus = scheduler::get_gpu_pool(); + + // cache hit + { + const int64_t NOT_FOUND = -1; + int64_t device_id = NOT_FOUND; + knowhere::QuantizerPtr quantizer = nullptr; + + for (auto& gpu : gpus) { + auto cache = cache::GpuCacheMgr::GetInstance(gpu); + if (auto cached_quantizer = cache->GetIndex(key)) { + device_id = gpu; + quantizer = std::static_pointer_cast(cached_quantizer)->Data(); + } + } + + if (device_id != NOT_FOUND) { + index_->SetQuantizer(quantizer); + return; + } + } + + // cache miss + { + std::vector all_free_mem; + for (auto& gpu : gpus) { + auto cache = cache::GpuCacheMgr::GetInstance(gpu); + auto free_mem = cache->CacheCapacity() - cache->CacheUsage(); + all_free_mem.push_back(free_mem); + } + + auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end()); + auto best_index = std::distance(all_free_mem.begin(), max_e); + auto best_device_id = gpus[best_index]; + + auto quantizer_conf = std::make_shared(); + quantizer_conf->mode = 1; + quantizer_conf->gpu_id = best_device_id; + auto quantizer = index_->LoadQuantizer(quantizer_conf); + index_->SetQuantizer(quantizer); + auto cache_quantizer = std::make_shared(quantizer); + cache::GpuCacheMgr::GetInstance(best_device_id)->InsertItem(key, cache_quantizer); + } +} + +void +ExecutionEngineImpl::HybridUnset() const { + if (index_type_ != EngineType::FAISS_IVFSQ8H) { + return; + } + index_->UnsetQuantizer(); } Status @@ -375,7 +406,12 @@ ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t npr auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType()); auto conf = adapter->MatchSearch(temp_conf, index_->GetType()); + HybridLoad(); + auto status = index_->Search(n, data, distances, labels, conf); + + HybridUnset(); + if (!status.ok()) { ENGINE_LOG_ERROR << "Search error"; } diff --git a/cpp/src/db/engine/ExecutionEngineImpl.h b/cpp/src/db/engine/ExecutionEngineImpl.h index f41072e8..7d2f6e91 100644 --- a/cpp/src/db/engine/ExecutionEngineImpl.h +++ b/cpp/src/db/engine/ExecutionEngineImpl.h @@ -108,7 +108,10 @@ class ExecutionEngineImpl : public ExecutionEngine { Load(const std::string& location); void - HybridLoad(); + HybridLoad() const; + + void + HybridUnset() const; protected: VecIndexPtr index_ = nullptr; diff --git a/cpp/src/scheduler/JobMgr.cpp b/cpp/src/scheduler/JobMgr.cpp index bf22f7be..9b6d7dfe 100644 --- a/cpp/src/scheduler/JobMgr.cpp +++ b/cpp/src/scheduler/JobMgr.cpp @@ -20,6 +20,7 @@ #include "task/Task.h" #include +#include namespace milvus { namespace scheduler { @@ -66,6 +67,9 @@ JobMgr::worker_function() { } auto tasks = build_task(job); + + // TODO: optimizer all task + // disk resources NEVER be empty. if (auto disk = res_mgr_->GetDiskResources()[0].lock()) { for (auto& task : tasks) { diff --git a/cpp/src/scheduler/Utils.cpp b/cpp/src/scheduler/Utils.cpp index 071f1522..61494b9f 100644 --- a/cpp/src/scheduler/Utils.cpp +++ b/cpp/src/scheduler/Utils.cpp @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +#include "server/Config.h" #include "scheduler/Utils.h" +#include "utils/Log.h" #include #include @@ -38,5 +40,42 @@ get_num_gpu() { return n_devices; } +std::vector +get_gpu_pool() { + std::vector gpu_pool; + + server::Config& config = server::Config::GetInstance(); + std::vector pool; + Status s = config.GetResourceConfigPool(pool); + if (!s.ok()) { + SERVER_LOG_ERROR << s.message(); + } + + std::set gpu_ids; + + for (auto& resource : pool) { + if (resource == "cpu") { + continue; + } else { + if (resource.length() < 4 || resource.substr(0, 3) != "gpu") { + // error + exit(-1); + } + auto gpu_id = std::stoi(resource.substr(3)); + if (gpu_id >= scheduler::get_num_gpu()) { + // error + exit(-1); + } + gpu_ids.insert(gpu_id); + } + } + + for (auto& gpu_id : gpu_ids) { + gpu_pool.push_back(gpu_id); + } + + return gpu_pool; +}; + } // namespace scheduler } // namespace milvus diff --git a/cpp/src/scheduler/Utils.h b/cpp/src/scheduler/Utils.h index e999e0fd..24876eeb 100644 --- a/cpp/src/scheduler/Utils.h +++ b/cpp/src/scheduler/Utils.h @@ -16,6 +16,7 @@ // under the License. #include +#include namespace milvus { namespace scheduler { @@ -26,5 +27,8 @@ get_current_timestamp(); uint64_t get_num_gpu(); +std::vector +get_gpu_pool(); + } // namespace scheduler } // namespace milvus diff --git a/cpp/src/scheduler/optimizer/HybridPass.cpp b/cpp/src/scheduler/optimizer/HybridPass.cpp index f172a7be..343d6ec8 100644 --- a/cpp/src/scheduler/optimizer/HybridPass.cpp +++ b/cpp/src/scheduler/optimizer/HybridPass.cpp @@ -23,11 +23,14 @@ namespace scheduler { bool HybridPass::Run(const TaskPtr& task) { - // TODO: Index::IVFSQ8Hybrid, if nq < threshold set cpu, else set gpu + // TODO: future, Index::IVFSQ8H, if nq < threshold set cpu, else set gpu if (task->Type() != TaskType::SearchTask) return false; auto search_task = std::static_pointer_cast(task); - // if (search_task->file_->engine_type_ == engine::EngineType::FAISS_IVFSQ8Hybrid) + if (search_task->file_->engine_type_ == (int)engine::EngineType::FAISS_IVFSQ8H) { + // TODO: make specified label + return true; + } return false; } diff --git a/cpp/src/sdk/include/MilvusApi.h b/cpp/src/sdk/include/MilvusApi.h index 9425ef3b..68fe0e9d 100644 --- a/cpp/src/sdk/include/MilvusApi.h +++ b/cpp/src/sdk/include/MilvusApi.h @@ -36,6 +36,7 @@ enum class IndexType { gpu_ivfflat, gpu_ivfsq8, mix_nsg, + ivfsq8h, }; enum class MetricType { diff --git a/cpp/src/server/Config.cpp b/cpp/src/server/Config.cpp index d383eb5e..78a9aaad 100644 --- a/cpp/src/server/Config.cpp +++ b/cpp/src/server/Config.cpp @@ -161,7 +161,7 @@ Config::ValidateConfig() { } /* cache config */ - int32_t cache_cpu_cache_capacity; + int64_t cache_cpu_cache_capacity; s = GetCacheConfigCpuCacheCapacity(cache_cpu_cache_capacity); if (!s.ok()) { return s; @@ -173,7 +173,7 @@ Config::ValidateConfig() { return s; } - int32_t cache_gpu_cache_capacity; + int64_t cache_gpu_cache_capacity; s = GetCacheConfigGpuCacheCapacity(cache_gpu_cache_capacity); if (!s.ok()) { return s; @@ -789,7 +789,7 @@ Config::GetMetricConfigPrometheusPort(std::string& value) { } Status -Config::GetCacheConfigCpuCacheCapacity(int32_t& value) { +Config::GetCacheConfigCpuCacheCapacity(int64_t& value) { std::string str = GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_CPU_CACHE_CAPACITY, CONFIG_CACHE_CPU_CACHE_CAPACITY_DEFAULT); Status s = CheckCacheConfigCpuCacheCapacity(str); @@ -815,7 +815,7 @@ Config::GetCacheConfigCpuCacheThreshold(float& value) { } Status -Config::GetCacheConfigGpuCacheCapacity(int32_t& value) { +Config::GetCacheConfigGpuCacheCapacity(int64_t& value) { std::string str = GetConfigStr(CONFIG_CACHE, CONFIG_CACHE_GPU_CACHE_CAPACITY, CONFIG_CACHE_GPU_CACHE_CAPACITY_DEFAULT); Status s = CheckCacheConfigGpuCacheCapacity(str); diff --git a/cpp/src/server/Config.h b/cpp/src/server/Config.h index fb00498b..127bd9c0 100644 --- a/cpp/src/server/Config.h +++ b/cpp/src/server/Config.h @@ -221,11 +221,11 @@ class Config { /* cache config */ Status - GetCacheConfigCpuCacheCapacity(int32_t& value); + GetCacheConfigCpuCacheCapacity(int64_t& value); Status GetCacheConfigCpuCacheThreshold(float& value); Status - GetCacheConfigGpuCacheCapacity(int32_t& value); + GetCacheConfigGpuCacheCapacity(int64_t& value); Status GetCacheConfigGpuCacheThreshold(float& value); Status diff --git a/cpp/src/wrapper/ConfAdapterMgr.cpp b/cpp/src/wrapper/ConfAdapterMgr.cpp index 05c23c42..b329588c 100644 --- a/cpp/src/wrapper/ConfAdapterMgr.cpp +++ b/cpp/src/wrapper/ConfAdapterMgr.cpp @@ -49,6 +49,7 @@ AdapterMgr::RegisterAdapter() { REGISTER_CONF_ADAPTER(IVFSQConfAdapter, IndexType::FAISS_IVFSQ8_CPU, ivfsq8_cpu); REGISTER_CONF_ADAPTER(IVFSQConfAdapter, IndexType::FAISS_IVFSQ8_GPU, ivfsq8_gpu); REGISTER_CONF_ADAPTER(IVFSQConfAdapter, IndexType::FAISS_IVFSQ8_MIX, ivfsq8_mix); + REGISTER_CONF_ADAPTER(IVFSQConfAdapter, IndexType::FAISS_IVFSQ8_HYBRID, ivfsq8_h); REGISTER_CONF_ADAPTER(IVFPQConfAdapter, IndexType::FAISS_IVFPQ_CPU, ivfpq_cpu); REGISTER_CONF_ADAPTER(IVFPQConfAdapter, IndexType::FAISS_IVFPQ_GPU, ivfpq_gpu); diff --git a/cpp/src/wrapper/VecImpl.h b/cpp/src/wrapper/VecImpl.h index 501c35a8..44c6ffe2 100644 --- a/cpp/src/wrapper/VecImpl.h +++ b/cpp/src/wrapper/VecImpl.h @@ -93,6 +93,10 @@ class IVFMixIndex : public VecIndexImpl { class IVFHybridIndex : public IVFMixIndex { public: + explicit IVFHybridIndex(std::shared_ptr index, const IndexType& type) + : IVFMixIndex(std::move(index), type) { + } + knowhere::QuantizerPtr LoadQuantizer(const Config& conf) override; diff --git a/cpp/src/wrapper/VecIndex.cpp b/cpp/src/wrapper/VecIndex.cpp index 041b3a8d..fe75cbb8 100644 --- a/cpp/src/wrapper/VecIndex.cpp +++ b/cpp/src/wrapper/VecIndex.cpp @@ -145,7 +145,7 @@ GetVecIndexFactory(const IndexType& type, const Config& cfg) { } case IndexType::FAISS_IVFSQ8_HYBRID: { index = std::make_shared(gpu_device); - break; + return std::make_shared(index, IndexType::FAISS_IVFSQ8_HYBRID); } case IndexType::NSG_MIX: { // TODO(linxj): bug. index = std::make_shared(gpu_device); -- GitLab