/******************************************************************************* * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved * Unauthorized copying of this file, via any medium is strictly prohibited. * Proprietary and confidential. ******************************************************************************/ #include #include "src/cache/GpuCacheMgr.h" #include "src/metrics/Metrics.h" #include "db/Log.h" #include "utils/CommonUtil.h" #include "src/cache/CpuCacheMgr.h" #include "ExecutionEngineImpl.h" #include "wrapper/knowhere/vec_index.h" #include "wrapper/knowhere/vec_impl.h" #include "knowhere/common/exception.h" #include "db/Exception.h" namespace zilliz { namespace milvus { namespace engine { ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension, const std::string &location, EngineType index_type, MetricType metric_type, int32_t nlist) : location_(location), dim_(dimension), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) { index_ = CreatetVecIndex(EngineType::FAISS_IDMAP); if (!index_) throw Exception("Create Empty VecIndex"); Config build_cfg; build_cfg["dim"] = dimension; build_cfg["metric_type"] = (metric_type_ == MetricType::IP) ? "IP" : "L2"; AutoGenParams(index_->GetType(), 0, build_cfg); auto ec = std::static_pointer_cast(index_)->Build(build_cfg); if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); } } ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index, const std::string &location, EngineType index_type, MetricType metric_type, int32_t nlist) : index_(std::move(index)), location_(location), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) { } VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) { std::shared_ptr index; switch (type) { case EngineType::FAISS_IDMAP: { index = GetVecIndexFactory(IndexType::FAISS_IDMAP); break; } case EngineType::FAISS_IVFFLAT: { index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_MIX); break; } case EngineType::FAISS_IVFSQ8: { index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_MIX); break; } case EngineType::NSG_MIX: { index = GetVecIndexFactory(IndexType::NSG_MIX); break; } default: { ENGINE_LOG_ERROR << "Invalid engine type"; return nullptr; } } return index; } Status ExecutionEngineImpl::AddWithIds(long n, const float *xdata, const long *xids) { auto ec = index_->Add(n, xdata, xids); if (ec != server::KNOWHERE_SUCCESS) { return Status::Error("Add error"); } return Status::OK(); } size_t ExecutionEngineImpl::Count() const { if(index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0"; return 0; } return index_->Count(); } size_t ExecutionEngineImpl::Size() const { return (size_t) (Count() * Dimension()) * sizeof(float); } size_t ExecutionEngineImpl::Dimension() const { if(index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_; return dim_; } return index_->Dimension(); } size_t ExecutionEngineImpl::PhysicalSize() const { return server::CommonUtil::GetFileSize(location_); } Status ExecutionEngineImpl::Serialize() { auto ec = write_index(index_, location_); if (ec != server::KNOWHERE_SUCCESS) { return Status::Error("Serialize: write to disk error"); } return Status::OK(); } Status ExecutionEngineImpl::Load(bool to_cache) { index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index_ != nullptr); if (!already_in_cache) { try { double physical_size = PhysicalSize(); server::CollectExecutionEngineMetrics metrics(physical_size); index_ = read_index(location_); if(index_ == nullptr) { ENGINE_LOG_ERROR << "Failed to load index from " << location_; } else { ENGINE_LOG_DEBUG << "Disk io from: " << location_; } } catch (knowhere::KnowhereException &e) { ENGINE_LOG_ERROR << e.what(); return Status::Error(e.what()); } catch (std::exception &e) { return Status::Error(e.what()); } } if (!already_in_cache && to_cache) { Cache(); } return Status::OK(); } Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) { auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { index_ = index; } else { if(index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu"; return Status::Error("index is null"); } try { index_ = index_->CopyToGpu(device_id); ENGINE_LOG_DEBUG << "CPU to GPU" << device_id; } catch (knowhere::KnowhereException &e) { ENGINE_LOG_ERROR << e.what(); return Status::Error(e.what()); } catch (std::exception &e) { return Status::Error(e.what()); } } if (!already_in_cache) { GpuCache(device_id); } return Status::OK(); } Status ExecutionEngineImpl::CopyToCpu() { auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_); bool already_in_cache = (index != nullptr); if (already_in_cache) { index_ = index; } else { if(index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu"; return Status::Error("index is null"); } try { index_ = index_->CopyToCpu(); ENGINE_LOG_DEBUG << "GPU to CPU"; } catch (knowhere::KnowhereException &e) { ENGINE_LOG_ERROR << e.what(); return Status::Error(e.what()); } catch (std::exception &e) { return Status::Error(e.what()); } } if (!already_in_cache) { Cache(); } return Status::OK(); } ExecutionEnginePtr ExecutionEngineImpl::Clone() { if(index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone"; return nullptr; } auto ret = std::make_shared(dim_, location_, index_type_, metric_type_, nlist_); ret->Init(); ret->index_ = index_->Clone(); return ret; } Status ExecutionEngineImpl::Merge(const std::string &location) { if (location == location_) { return Status::Error("Cannot Merge Self"); } ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_; auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location); if (!to_merge) { try { double physical_size = server::CommonUtil::GetFileSize(location); server::CollectExecutionEngineMetrics metrics(physical_size); to_merge = read_index(location); } catch (knowhere::KnowhereException &e) { ENGINE_LOG_ERROR << e.what(); return Status::Error(e.what()); } catch (std::exception &e) { return Status::Error(e.what()); } } if(index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge"; return Status::Error("index is null"); } if (auto file_index = std::dynamic_pointer_cast(to_merge)) { auto ec = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds()); if (ec != server::KNOWHERE_SUCCESS) { ENGINE_LOG_ERROR << "Merge: Add Error"; return Status::Error("Merge: Add Error"); } return Status::OK(); } else { return Status::Error("file index type is not idmap"); } } ExecutionEnginePtr ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_type) { ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_; auto from_index = std::dynamic_pointer_cast(index_); if(from_index == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index"; return nullptr; } auto to_index = CreatetVecIndex(engine_type); if (!to_index) { throw Exception("Create Empty VecIndex"); } Config build_cfg; build_cfg["dim"] = Dimension(); build_cfg["metric_type"] = (metric_type_ == MetricType::IP) ? "IP" : "L2"; build_cfg["gpu_id"] = gpu_num_; build_cfg["nlist"] = nlist_; AutoGenParams(to_index->GetType(), Count(), build_cfg); auto ec = to_index->BuildAll(Count(), from_index->GetRawVectors(), from_index->GetRawIds(), build_cfg); if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); } return std::make_shared(to_index, location, engine_type, metric_type_, nlist_); } Status ExecutionEngineImpl::Search(long n, const float *data, long k, long nprobe, float *distances, long *labels) const { if(index_ == nullptr) { ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search"; return Status::Error("index is null"); } ENGINE_LOG_DEBUG << "Search Params: [k] " << k << " [nprobe] " << nprobe; auto ec = index_->Search(n, data, distances, labels, Config::object{{"k", k}, {"nprobe", nprobe}}); if (ec != server::KNOWHERE_SUCCESS) { ENGINE_LOG_ERROR << "Search error"; return Status::Error("Search: Search Error"); } return Status::OK(); } Status ExecutionEngineImpl::Cache() { zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, index_); return Status::OK(); } Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) { zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, index_); return Status::OK(); } // TODO(linxj): remove. Status ExecutionEngineImpl::Init() { using namespace zilliz::milvus::server; ServerConfig &config = ServerConfig::GetInstance(); ConfigNode server_config = config.GetConfig(CONFIG_SERVER); gpu_num_ = server_config.GetInt32Value("gpu_index", 0); return Status::OK(); } } // namespace engine } // namespace milvus } // namespace zilliz