提交 4cc5b8d5 编写于 作者: X Xu Peng

refactor(db): replace faiss with execution engine


Former-commit-id: 7e7949541e39bc18f30c5aafa7968fdacf709719
上级 a724141e
...@@ -11,9 +11,11 @@ ...@@ -11,9 +11,11 @@
#include <easylogging++.h> #include <easylogging++.h>
#include <wrapper/IndexBuilder.h> #include <wrapper/IndexBuilder.h>
#include <cache/CpuCacheMgr.h> #include <cache/CpuCacheMgr.h>
#include "DBImpl.h" #include "DBImpl.h"
#include "DBMetaImpl.h" #include "DBMetaImpl.h"
#include "Env.h" #include "Env.h"
#include "FaissExecutionEngine.h"
namespace zilliz { namespace zilliz {
namespace vecwise { namespace vecwise {
...@@ -220,30 +222,25 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, ...@@ -220,30 +222,25 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
return status; return status;
} }
std::shared_ptr<faiss::Index> index(faiss::index_factory(group_file.dimension, "IDMap,Flat")); std::shared_ptr<ExecutionEngine> execution_engine(
new FaissExecutionEngine(group_file.dimension, group_file.location));
meta::GroupFilesSchema updated; meta::GroupFilesSchema updated;
long index_size = 0; long index_size = 0;
for (auto& file : files) { for (auto& file : files) {
auto to_merge = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(file.location); execution_engine->Merge(file.location);
if (!to_merge) {
to_merge = read_index(file.location);
}
auto file_index = dynamic_cast<faiss::IndexIDMap*>(to_merge->data().get());
index->add_with_ids(file_index->ntotal, dynamic_cast<faiss::IndexFlat*>(file_index->index)->xb.data(),
file_index->id_map.data());
auto file_schema = file; auto file_schema = file;
file_schema.file_type = meta::GroupFileSchema::TO_DELETE; file_schema.file_type = meta::GroupFileSchema::TO_DELETE;
updated.push_back(file_schema); updated.push_back(file_schema);
/* LOG(DEBUG) << "About to merge file " << file_schema.file_id << */ /* LOG(DEBUG) << "About to merge file " << file_schema.file_id << */
/* " of size=" << file_schema.rows; */ /* " of size=" << file_schema.rows; */
index_size = group_file.dimension * index->ntotal; index_size = execution_engine->Size();
if (index_size >= _options.index_trigger_size) break; if (index_size >= _options.index_trigger_size) break;
} }
faiss::write_index(index.get(), group_file.location.c_str()); execution_engine->Serialize();
if (index_size >= _options.index_trigger_size) { if (index_size >= _options.index_trigger_size) {
group_file.file_type = meta::GroupFileSchema::TO_INDEX; group_file.file_type = meta::GroupFileSchema::TO_INDEX;
...@@ -256,8 +253,7 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date, ...@@ -256,8 +253,7 @@ Status DBImpl::merge_files(const std::string& group_id, const meta::DateT& date,
/* LOG(DEBUG) << "New merged file " << group_file.file_id << */ /* LOG(DEBUG) << "New merged file " << group_file.file_id << */
/* " of size=" << group_file.rows; */ /* " of size=" << group_file.rows; */
zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->InsertItem( execution_engine->Cache();
group_file.location, std::make_shared<Index>(index));
return status; return status;
} }
......
...@@ -22,6 +22,8 @@ public: ...@@ -22,6 +22,8 @@ public:
virtual Status Serialize() = 0; virtual Status Serialize() = 0;
virtual Status Merge(const std::string& location) = 0;
virtual Status Cache() = 0; virtual Status Cache() = 0;
virtual ~ExecutionEngine() {} virtual ~ExecutionEngine() {}
......
#include <easylogging++.h> #include <easylogging++.h>
#include <faiss/AutoTune.h> #include <faiss/AutoTune.h>
#include <faiss/MetaIndexes.h>
#include <faiss/IndexFlat.h>
#include <faiss/index_io.h>
#include <wrapper/Index.h> #include <wrapper/Index.h>
#include <cache/CpuCacheMgr.h> #include <cache/CpuCacheMgr.h>
...@@ -34,6 +37,20 @@ Status FaissExecutionEngine::Serialize() { ...@@ -34,6 +37,20 @@ Status FaissExecutionEngine::Serialize() {
return Status::OK(); return Status::OK();
} }
Status FaissExecutionEngine::Merge(const std::string& location) {
if (location == location_) {
return Status::Error("Cannot Merge Self");
}
auto to_merge = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
to_merge = read_index(location);
}
auto file_index = dynamic_cast<faiss::IndexIDMap*>(to_merge->data().get());
pIndex_->add_with_ids(file_index->ntotal, dynamic_cast<faiss::IndexFlat*>(file_index->index)->xb.data(),
file_index->id_map.data());
return Status::OK();
}
Status FaissExecutionEngine::Cache() { Status FaissExecutionEngine::Cache() {
zilliz::vecwise::cache::CpuCacheMgr::GetInstance( zilliz::vecwise::cache::CpuCacheMgr::GetInstance(
)->InsertItem(location_, std::make_shared<Index>(pIndex_)); )->InsertItem(location_, std::make_shared<Index>(pIndex_));
......
...@@ -22,6 +22,8 @@ public: ...@@ -22,6 +22,8 @@ public:
virtual size_t Size() const override; virtual size_t Size() const override;
virtual Status Merge(const std::string& location) override;
virtual Status Serialize() override; virtual Status Serialize() override;
virtual Status Cache() override; virtual Status Cache() override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册