From 4cc5b8d52394a48169f9891d3eda51858568a376 Mon Sep 17 00:00:00 2001 From: Xu Peng Date: Tue, 30 Apr 2019 16:43:42 +0800 Subject: [PATCH] refactor(db): replace faiss with execution engine Former-commit-id: 7e7949541e39bc18f30c5aafa7968fdacf709719 --- cpp/src/db/DBImpl.cpp | 20 ++++++++------------ cpp/src/db/ExecutionEngine.h | 2 ++ cpp/src/db/FaissExecutionEngine.cpp | 17 +++++++++++++++++ cpp/src/db/FaissExecutionEngine.h | 2 ++ 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/cpp/src/db/DBImpl.cpp b/cpp/src/db/DBImpl.cpp index 859a1d75..8e69eb0e 100644 --- a/cpp/src/db/DBImpl.cpp +++ b/cpp/src/db/DBImpl.cpp @@ -11,9 +11,11 @@ #include #include #include + #include "DBImpl.h" #include "DBMetaImpl.h" #include "Env.h" +#include "FaissExecutionEngine.h" namespace zilliz { namespace vecwise { @@ -220,30 +222,25 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, return status; } - std::shared_ptr index(faiss::index_factory(group_file.dimension, "IDMap,Flat")); + std::shared_ptr execution_engine( + new FaissExecutionEngine(group_file.dimension, group_file.location)); meta::GroupFilesSchema updated; long index_size = 0; for (auto& file : files) { - auto to_merge = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(file.location); - if (!to_merge) { - to_merge = read_index(file.location); - } - auto file_index = dynamic_cast(to_merge->data().get()); - index->add_with_ids(file_index->ntotal, dynamic_cast(file_index->index)->xb.data(), - file_index->id_map.data()); + execution_engine->Merge(file.location); auto file_schema = file; file_schema.file_type = meta::GroupFileSchema::TO_DELETE; updated.push_back(file_schema); /* LOG(DEBUG) << "About to merge file " << file_schema.file_id << */ /* " of size=" << file_schema.rows; */ - index_size = group_file.dimension * index->ntotal; + index_size = execution_engine->Size(); if (index_size >= _options.index_trigger_size) break; } - faiss::write_index(index.get(), group_file.location.c_str()); + execution_engine->Serialize(); if (index_size >= _options.index_trigger_size) { group_file.file_type = meta::GroupFileSchema::TO_INDEX; @@ -256,8 +253,7 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, /* LOG(DEBUG) << "New merged file " << group_file.file_id << */ /* " of size=" << group_file.rows; */ - zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->InsertItem( - group_file.location, std::make_shared(index)); + execution_engine->Cache(); return status; } diff --git a/cpp/src/db/ExecutionEngine.h b/cpp/src/db/ExecutionEngine.h index 4b08149f..87502045 100644 --- a/cpp/src/db/ExecutionEngine.h +++ b/cpp/src/db/ExecutionEngine.h @@ -22,6 +22,8 @@ public: virtual Status Serialize() = 0; + virtual Status Merge(const std::string& location) = 0; + virtual Status Cache() = 0; virtual ~ExecutionEngine() {} diff --git a/cpp/src/db/FaissExecutionEngine.cpp b/cpp/src/db/FaissExecutionEngine.cpp index ffa3ef3c..2bb0f99a 100644 --- a/cpp/src/db/FaissExecutionEngine.cpp +++ b/cpp/src/db/FaissExecutionEngine.cpp @@ -1,5 +1,8 @@ #include #include +#include +#include +#include #include #include @@ -34,6 +37,20 @@ Status FaissExecutionEngine::Serialize() { return Status::OK(); } +Status FaissExecutionEngine::Merge(const std::string& location) { + if (location == location_) { + return Status::Error("Cannot Merge Self"); + } + auto to_merge = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(location); + if (!to_merge) { + to_merge = read_index(location); + } + auto file_index = dynamic_cast(to_merge->data().get()); + pIndex_->add_with_ids(file_index->ntotal, dynamic_cast(file_index->index)->xb.data(), + file_index->id_map.data()); + return Status::OK(); +} + Status FaissExecutionEngine::Cache() { zilliz::vecwise::cache::CpuCacheMgr::GetInstance( )->InsertItem(location_, std::make_shared(pIndex_)); diff --git a/cpp/src/db/FaissExecutionEngine.h b/cpp/src/db/FaissExecutionEngine.h index bb1f5a47..a76e100e 100644 --- a/cpp/src/db/FaissExecutionEngine.h +++ b/cpp/src/db/FaissExecutionEngine.h @@ -22,6 +22,8 @@ public: virtual size_t Size() const override; + virtual Status Merge(const std::string& location) override; + virtual Status Serialize() override; virtual Status Cache() override; -- GitLab