提交 a1e65208 编写于 作者: G groot

refacture db code


Former-commit-id: 4aa8a2bb10b44d9315431a79ec6c140e4735aff5
上级 e95f5110
......@@ -16,15 +16,7 @@ namespace engine {
DB::~DB() {}
void DB::Open(const Options& options, DB** dbptr) {
*dbptr = nullptr;
#ifdef GPU_VERSION
std::string default_index_type{"Faiss,IVF"};
#else
std::string default_index_type{"Faiss,IDMap"};
#endif
*dbptr = DBFactory::Build(options, default_index_type);
*dbptr = DBFactory::Build(options);
return;
}
......
......@@ -3,11 +3,10 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "DBImpl.h"
#include "DBMetaImpl.h"
#include "Env.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
#include "scheduler/SearchScheduler.h"
......@@ -23,80 +22,107 @@ namespace zilliz {
namespace vecwise {
namespace engine {
namespace {
void CollectInsertMetrics(double total_time, size_t n, bool succeed) {
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (succeed) {
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
}
else {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
}
}
void CollectQueryMetrics(double total_time, size_t nq) {
for (int i = 0; i < nq; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
auto average_time = total_time / nq;
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
}
void CollectFileMetrics(meta::TableFileSchema::FILE_TYPE file_type, size_t file_size, double total_time) {
switch(file_type) {
case meta::TableFileSchema::RAW:
case meta::TableFileSchema::TO_INDEX: {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size);
break;
}
default: {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size);
break;
}
}
}
}
template<typename EngineT>
DBImpl<EngineT>::DBImpl(const Options& options)
DBImpl::DBImpl(const Options& options)
: env_(options.env),
options_(options),
bg_compaction_scheduled_(false),
shutting_down_(false),
bg_build_index_started_(false),
pMeta_(new meta::DBMetaImpl(options_.meta)),
pMemMgr_(new MemManager<EngineT>(pMeta_, options_)) {
pMemMgr_(new MemManager(pMeta_, options_)) {
StartTimerTasks(options_.memory_sync_interval);
}
template<typename EngineT>
Status DBImpl<EngineT>::CreateTable(meta::TableSchema& table_schema) {
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
return pMeta_->CreateTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::DescribeTable(meta::TableSchema& table_schema) {
Status DBImpl::DescribeTable(meta::TableSchema& table_schema) {
return pMeta_->DescribeTable(table_schema);
}
template<typename EngineT>
Status DBImpl<EngineT>::HasTable(const std::string& table_id, bool& has_or_not) {
Status DBImpl::HasTable(const std::string& table_id, bool& has_or_not) {
return pMeta_->HasTable(table_id, has_or_not);
}
template<typename EngineT>
Status DBImpl<EngineT>::InsertVectors(const std::string& table_id_,
Status DBImpl::InsertVectors(const std::string& table_id_,
size_t n, const float* vectors, IDNumbers& vector_ids_) {
auto start_time = METRICS_NOW_TIME;
Status status = pMemMgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
auto end_time = METRICS_NOW_TIME;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
double total_time = METRICS_MICROSECONDS(start_time,end_time);
double avg_time = total_time / n;
for (int i = 0; i < n; ++i) {
server::Metrics::GetInstance().AddVectorsDurationHistogramOberve(avg_time);
}
// server::Metrics::GetInstance().add_vector_duration_seconds_quantiles().Observe((average_time));
if (!status.ok()) {
server::Metrics::GetInstance().AddVectorsFailTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsFailGaugeSet(n);
return status;
}
server::Metrics::GetInstance().AddVectorsSuccessTotalIncrement(n);
server::Metrics::GetInstance().AddVectorsSuccessGaugeSet(n);
CollectInsertMetrics(total_time, n, status.ok());
return status;
}
template<typename EngineT>
Status DBImpl<EngineT>::Query(const std::string &table_id, size_t k, size_t nq,
Status DBImpl::Query(const std::string &table_id, size_t k, size_t nq,
const float *vectors, QueryResults &results) {
auto start_time = METRICS_NOW_TIME;
meta::DatesT dates = {meta::Meta::GetDate()};
Status result = Query(table_id, k, nq, vectors, dates, results);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
auto average_time = total_time / nq;
for (int i = 0; i < nq; ++i) {
server::Metrics::GetInstance().QueryResponseSummaryObserve(total_time);
}
server::Metrics::GetInstance().QueryVectorResponseSummaryObserve(average_time, nq);
server::Metrics::GetInstance().QueryVectorResponsePerSecondGaugeSet(double (nq) / total_time);
CollectQueryMetrics(total_time, nq);
return result;
}
template<typename EngineT>
Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
Status DBImpl::Query(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
#if 0
return QuerySync(table_id, k, nq, vectors, dates, results);
......@@ -105,8 +131,7 @@ Status DBImpl<EngineT>::Query(const std::string& table_id, size_t k, size_t nq,
#endif
}
template<typename EngineT>
Status DBImpl<EngineT>::QuerySync(const std::string& table_id, size_t k, size_t nq,
Status DBImpl::QuerySync(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToSearch(table_id, dates, files);
......@@ -160,38 +185,20 @@ Status DBImpl<EngineT>::QuerySync(const std::string& table_id, size_t k, size_t
auto search_in_index = [&](meta::TableFilesSchema& file_vec) -> void {
for (auto &file : file_vec) {
EngineT index(file.dimension, file.location);
index.Load();
auto file_size = index.PhysicalSize()/(1024*1024);
ExecutionEnginePtr index = EngineFactory::Build(file.dimension, file.location, (EngineType)file.engine_type_);
index->Load();
auto file_size = index->PhysicalSize();
search_set_size += file_size;
LOG(DEBUG) << "Search file_type " << file.file_type << " Of Size: "
<< file_size << " M";
<< file_size/(1024*1024) << " M";
int inner_k = index.Count() < k ? index.Count() : k;
int inner_k = index->Count() < k ? index->Count() : k;
auto start_time = METRICS_NOW_TIME;
index.Search(nq, vectors, inner_k, output_distence, output_ids);
index->Search(nq, vectors, inner_k, output_distence, output_ids);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
if(file.file_type == meta::TableFileSchema::RAW) {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024);
} else if(file.file_type == meta::TableFileSchema::TO_INDEX) {
server::Metrics::GetInstance().SearchRawDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().RawFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().RawFileSizeGaugeSet(file_size*1024*1024);
} else {
server::Metrics::GetInstance().SearchIndexDataDurationSecondsHistogramObserve(total_time);
server::Metrics::GetInstance().IndexFileSizeHistogramObserve(file_size*1024*1024);
server::Metrics::GetInstance().IndexFileSizeTotalIncrement(file_size*1024*1024);
server::Metrics::GetInstance().IndexFileSizeGaugeSet(file_size*1024*1024);
}
CollectFileMetrics((meta::TableFileSchema::FILE_TYPE)file.file_type, file_size, total_time);
cluster(output_ids, output_distence, inner_k); // cluster to each query
memset(output_distence, 0, k * nq * sizeof(float));
memset(output_ids, 0, k * nq * sizeof(long));
......@@ -258,8 +265,7 @@ Status DBImpl<EngineT>::QuerySync(const std::string& table_id, size_t k, size_t
return Status::OK();
}
template<typename EngineT>
Status DBImpl<EngineT>::QueryAsync(const std::string& table_id, size_t k, size_t nq,
Status DBImpl::QueryAsync(const std::string& table_id, size_t k, size_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
meta::DatePartionedTableFilesSchema files;
auto status = pMeta_->FilesToSearch(table_id, dates, files);
......@@ -292,13 +298,12 @@ Status DBImpl<EngineT>::QueryAsync(const std::string& table_id, size_t k, size_t
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::StartTimerTasks(int interval) {
bg_timer_thread_ = std::thread(&DBImpl<EngineT>::BackgroundTimerTask, this, interval);
void DBImpl::StartTimerTasks(int interval) {
bg_timer_thread_ = std::thread(&DBImpl::BackgroundTimerTask, this, interval);
}
template<typename EngineT>
void DBImpl<EngineT>::BackgroundTimerTask(int interval) {
void DBImpl::BackgroundTimerTask(int interval) {
Status status;
while (true) {
if (!bg_error_.ok()) break;
......@@ -315,22 +320,19 @@ void DBImpl<EngineT>::BackgroundTimerTask(int interval) {
}
}
template<typename EngineT>
void DBImpl<EngineT>::TrySchedule() {
void DBImpl::TrySchedule() {
if (bg_compaction_scheduled_) return;
if (!bg_error_.ok()) return;
bg_compaction_scheduled_ = true;
env_->Schedule(&DBImpl<EngineT>::BGWork, this);
env_->Schedule(&DBImpl::BGWork, this);
}
template<typename EngineT>
void DBImpl<EngineT>::BGWork(void* db_) {
void DBImpl::BGWork(void* db_) {
reinterpret_cast<DBImpl*>(db_)->BackgroundCall();
}
template<typename EngineT>
void DBImpl<EngineT>::BackgroundCall() {
void DBImpl::BackgroundCall() {
std::lock_guard<std::mutex> lock(mutex_);
assert(bg_compaction_scheduled_);
......@@ -343,9 +345,7 @@ void DBImpl<EngineT>::BackgroundCall() {
bg_work_finish_signal_.notify_all();
}
template<typename EngineT>
Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::DateT& date,
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
const meta::TableFilesSchema& files) {
meta::TableFileSchema table_file;
table_file.table_id = table_id;
......@@ -357,7 +357,7 @@ Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::Date
return status;
}
EngineT index(table_file.dimension, table_file.location);
ExecutionEnginePtr index = EngineFactory::Build(table_file.dimension, table_file.location, (EngineType)table_file.engine_type_);
meta::TableFilesSchema updated;
long index_size = 0;
......@@ -365,7 +365,7 @@ Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::Date
for (auto& file : files) {
auto start_time = METRICS_NOW_TIME;
index.Merge(file.location);
index->Merge(file.location);
auto file_schema = file;
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
......@@ -374,13 +374,13 @@ Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::Date
file_schema.file_type = meta::TableFileSchema::TO_DELETE;
updated.push_back(file_schema);
LOG(DEBUG) << "Merging file " << file_schema.file_id;
index_size = index.Size();
index_size = index->Size();
if (index_size >= options_.index_trigger_size) break;
}
index.Serialize();
index->Serialize();
if (index_size >= options_.index_trigger_size) {
table_file.file_type = meta::TableFileSchema::TO_INDEX;
......@@ -391,15 +391,14 @@ Status DBImpl<EngineT>::MergeFiles(const std::string& table_id, const meta::Date
updated.push_back(table_file);
status = pMeta_->UpdateTableFiles(updated);
LOG(DEBUG) << "New merged file " << table_file.file_id <<
" of size=" << index.PhysicalSize()/(1024*1024) << " M";
" of size=" << index->PhysicalSize()/(1024*1024) << " M";
index.Cache();
index->Cache();
return status;
}
template<typename EngineT>
Status DBImpl<EngineT>::BackgroundMergeFiles(const std::string& table_id) {
Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
meta::DatePartionedTableFilesSchema raw_files;
auto status = pMeta_->FilesToMerge(table_id, raw_files);
if (!status.ok()) {
......@@ -426,8 +425,7 @@ Status DBImpl<EngineT>::BackgroundMergeFiles(const std::string& table_id) {
return Status::OK();
}
template<typename EngineT>
Status DBImpl<EngineT>::BuildIndex(const meta::TableFileSchema& file) {
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
meta::TableFileSchema table_file;
table_file.table_id = file.table_id;
table_file.date = file.date;
......@@ -436,11 +434,11 @@ Status DBImpl<EngineT>::BuildIndex(const meta::TableFileSchema& file) {
return status;
}
EngineT to_index(file.dimension, file.location);
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension, file.location, (EngineType)file.engine_type_);
to_index.Load();
to_index->Load();
auto start_time = METRICS_NOW_TIME;
auto index = to_index.BuildIndex(table_file.location);
auto index = to_index->BuildIndex(table_file.location);
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().BuildIndexDurationSecondsHistogramObserve(total_time);
......@@ -464,8 +462,7 @@ Status DBImpl<EngineT>::BuildIndex(const meta::TableFileSchema& file) {
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::BackgroundBuildIndex() {
void DBImpl::BackgroundBuildIndex() {
std::lock_guard<std::mutex> lock(build_index_mutex_);
assert(bg_build_index_started_);
meta::TableFilesSchema to_index_files;
......@@ -485,18 +482,16 @@ void DBImpl<EngineT>::BackgroundBuildIndex() {
bg_build_index_finish_signal_.notify_all();
}
template<typename EngineT>
Status DBImpl<EngineT>::TryBuildIndex() {
Status DBImpl::TryBuildIndex() {
if (bg_build_index_started_) return Status::OK();
if (shutting_down_.load(std::memory_order_acquire)) return Status::OK();
bg_build_index_started_ = true;
std::thread build_index_task(&DBImpl<EngineT>::BackgroundBuildIndex, this);
std::thread build_index_task(&DBImpl::BackgroundBuildIndex, this);
build_index_task.detach();
return Status::OK();
}
template<typename EngineT>
void DBImpl<EngineT>::BackgroundCompaction() {
void DBImpl::BackgroundCompaction() {
std::vector<std::string> table_ids;
pMemMgr_->Serialize(table_ids);
......@@ -510,18 +505,15 @@ void DBImpl<EngineT>::BackgroundCompaction() {
}
}
template<typename EngineT>
Status DBImpl<EngineT>::DropAll() {
Status DBImpl::DropAll() {
return pMeta_->DropAll();
}
template<typename EngineT>
Status DBImpl<EngineT>::Size(long& result) {
Status DBImpl::Size(long& result) {
return pMeta_->Size(result);
}
template<typename EngineT>
DBImpl<EngineT>::~DBImpl() {
DBImpl::~DBImpl() {
{
std::unique_lock<std::mutex> lock(mutex_);
shutting_down_.store(true, std::memory_order_release);
......
......@@ -8,12 +8,12 @@
#include "DB.h"
#include "MemManager.h"
#include "Types.h"
#include "Traits.h"
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
#include <thread>
namespace zilliz {
namespace vecwise {
......@@ -25,11 +25,10 @@ namespace meta {
class Meta;
}
template <typename EngineT>
class DBImpl : public DB {
public:
using MetaPtr = meta::Meta::Ptr;
using MemManagerPtr = typename MemManager<EngineT>::Ptr;
using MemManagerPtr = typename MemManager::Ptr;
DBImpl(const Options& options);
......@@ -100,5 +99,3 @@ private:
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#include "DBImpl.inl"
......@@ -7,6 +7,7 @@
#include "IDGenerator.h"
#include "Utils.h"
#include "MetaConsts.h"
#include "Factories.h"
#include "metrics/Metrics.h"
#include <unistd.h>
......@@ -33,10 +34,12 @@ inline auto StoragePrototype(const std::string &path) {
make_column("table_id", &TableSchema::table_id, unique()),
make_column("dimension", &TableSchema::dimension),
make_column("created_on", &TableSchema::created_on),
make_column("files_cnt", &TableSchema::files_cnt, default_value(0))),
make_column("files_cnt", &TableSchema::files_cnt, default_value(0)),
make_column("engine_type", &TableSchema::engine_type_)),
make_table("TableFile",
make_column("id", &TableFileSchema::id, primary_key()),
make_column("table_id", &TableFileSchema::table_id),
make_column("engine_type", &TableFileSchema::engine_type_),
make_column("file_id", &TableFileSchema::file_id),
make_column("file_type", &TableFileSchema::file_type),
make_column("size", &TableFileSchema::size, default_value(0)),
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "EngineFactory.h"
#include "FaissExecutionEngine.h"
#include "Log.h"
namespace zilliz {
namespace vecwise {
namespace engine {
ExecutionEnginePtr
EngineFactory::Build(uint16_t dimension,
const std::string& location,
EngineType type) {
switch(type) {
case EngineType::FAISS_IDMAP:
return ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, "IDMap", "IDMap,Flat"));
case EngineType::FAISS_IVFFLAT:
return ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, "IVF", "IDMap,Flat"));
default:
ENGINE_LOG_ERROR << "Unsupportted engine type";
return nullptr;
}
}
}
}
}
\ No newline at end of file
......@@ -5,22 +5,20 @@
******************************************************************************/
#pragma once
#include "Status.h"
#include "ExecutionEngine.h"
namespace zilliz {
namespace vecwise {
namespace engine {
struct IVFIndexTrait {
static const char* BuildIndexType;
static const char* RawIndexType;
class EngineFactory {
public:
static ExecutionEnginePtr Build(uint16_t dimension,
const std::string& location,
EngineType type);
};
struct IDMapIndexTrait {
static const char* BuildIndexType;
static const char* RawIndexType;
};
} // namespace engine
} // namespace vecwise
} // namespace zilliz
}
}
}
......@@ -11,8 +11,7 @@ namespace zilliz {
namespace vecwise {
namespace engine {
template<typename Derived>
Status ExecutionEngine<Derived>::AddWithIds(const std::vector<float>& vectors, const std::vector<long>& vector_ids) {
Status ExecutionEngine::AddWithIds(const std::vector<float>& vectors, const std::vector<long>& vector_ids) {
long n1 = (long)vectors.size();
long n2 = (long)vector_ids.size();
if (n1 != n2) {
......@@ -22,60 +21,6 @@ Status ExecutionEngine<Derived>::AddWithIds(const std::vector<float>& vectors, c
return AddWithIds(n1, vectors.data(), vector_ids.data());
}
template<typename Derived>
Status ExecutionEngine<Derived>::AddWithIds(long n, const float *xdata, const long *xids) {
return static_cast<Derived*>(this)->AddWithIds(n, xdata, xids);
}
template<typename Derived>
size_t ExecutionEngine<Derived>::Count() const {
return static_cast<Derived*>(this)->Count();
}
template<typename Derived>
size_t ExecutionEngine<Derived>::Size() const {
return static_cast<Derived*>(this)->Size();
}
template<typename Derived>
size_t ExecutionEngine<Derived>::PhysicalSize() const {
return static_cast<Derived*>(this)->PhysicalSize();
}
template<typename Derived>
Status ExecutionEngine<Derived>::Serialize() {
return static_cast<Derived*>(this)->Serialize();
}
template<typename Derived>
Status ExecutionEngine<Derived>::Load() {
return static_cast<Derived*>(this)->Load();
}
template<typename Derived>
Status ExecutionEngine<Derived>::Merge(const std::string& location) {
return static_cast<Derived*>(this)->Merge(location);
}
template<typename Derived>
Status ExecutionEngine<Derived>::Search(long n,
const float *data,
long k,
float *distances,
long *labels) const {
return static_cast<Derived*>(this)->Search(n, data, k, distances, labels);
}
template<typename Derived>
Status ExecutionEngine<Derived>::Cache() {
return static_cast<Derived*>(this)->Cache();
}
template<typename Derived>
std::shared_ptr<Derived> ExecutionEngine<Derived>::BuildIndex(const std::string& location) {
return static_cast<Derived*>(this)->BuildIndex(location);
}
} // namespace engine
} // namespace vecwise
......
......@@ -14,38 +14,44 @@ namespace zilliz {
namespace vecwise {
namespace engine {
template <typename Derived>
enum class EngineType {
FAISS_IDMAP = 1,
FAISS_IVFFLAT,
};
class ExecutionEngine {
public:
Status AddWithIds(const std::vector<float>& vectors,
const std::vector<long>& vector_ids);
virtual Status AddWithIds(const std::vector<float>& vectors,
const std::vector<long>& vector_ids);
Status AddWithIds(long n, const float *xdata, const long *xids);
virtual Status AddWithIds(long n, const float *xdata, const long *xids) = 0;
size_t Count() const;
virtual size_t Count() const = 0;
size_t Size() const;
virtual size_t Size() const = 0;
size_t PhysicalSize() const;
virtual size_t PhysicalSize() const = 0;
Status Serialize();
virtual Status Serialize() = 0;
Status Load();
virtual Status Load() = 0;
Status Merge(const std::string& location);
virtual Status Merge(const std::string& location) = 0;
Status Search(long n,
virtual Status Search(long n,
const float *data,
long k,
float *distances,
long *labels) const;
long *labels) const = 0;
std::shared_ptr<Derived> BuildIndex(const std::string&);
virtual std::shared_ptr<ExecutionEngine> BuildIndex(const std::string&) = 0;
Status Cache();
virtual Status Cache() = 0;
};
using ExecutionEnginePtr = std::shared_ptr<ExecutionEngine>;
} // namespace engine
} // namespace vecwise
......
......@@ -5,8 +5,6 @@
////////////////////////////////////////////////////////////////////////////////
#include "Factories.h"
#include "DBImpl.h"
#include "FaissExecutionEngine.h"
#include "Traits.h"
#include <stdlib.h>
#include <time.h>
......@@ -45,28 +43,14 @@ std::shared_ptr<meta::DBMetaImpl> DBMetaImplFactory::Build() {
return std::shared_ptr<meta::DBMetaImpl>(new meta::DBMetaImpl(options));
}
std::shared_ptr<DB> DBFactory::Build(const std::string& db_type) {
std::shared_ptr<DB> DBFactory::Build() {
auto options = OptionsFactory::Build();
auto db = DBFactory::Build(options, db_type);
auto db = DBFactory::Build(options);
return std::shared_ptr<DB>(db);
}
DB* DBFactory::Build(const Options& options, const std::string& db_type) {
std::stringstream ss(db_type);
std::string token;
std::vector<std::string> tokens;
while (std::getline(ss, token, ',')) {
tokens.push_back(token);
}
assert(tokens.size()==2);
assert(tokens[0]=="Faiss");
if (tokens[1] == "IVF") {
return new DBImpl<FaissExecutionEngine<IVFIndexTrait>>(options);
} else if (tokens[1] == "IDMap") {
return new DBImpl<FaissExecutionEngine<IDMapIndexTrait>>(options);
}
return nullptr;
DB* DBFactory::Build(const Options& options) {
return new DBImpl(options);
}
} // namespace engine
......
......@@ -8,6 +8,7 @@
#include "DB.h"
#include "DBMetaImpl.h"
#include "Options.h"
#include "ExecutionEngine.h"
#include <string>
#include <memory>
......@@ -29,8 +30,8 @@ struct DBMetaImplFactory {
};
struct DBFactory {
static std::shared_ptr<DB> Build(const std::string& db_type = "Faiss,IVF");
static DB* Build(const Options&, const std::string& db_type = "Faiss,IVF");
static std::shared_ptr<DB> Build();
static DB* Build(const Options&);
};
} // namespace engine
......
......@@ -3,8 +3,6 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "FaissExecutionEngine.h"
#include <easylogging++.h>
......@@ -23,47 +21,49 @@ namespace vecwise {
namespace engine {
template<class IndexTrait>
FaissExecutionEngine<IndexTrait>::FaissExecutionEngine(uint16_t dimension, const std::string& location)
: pIndex_(faiss::index_factory(dimension, IndexTrait::RawIndexType)),
location_(location) {
FaissExecutionEngine::FaissExecutionEngine(uint16_t dimension,
const std::string& location,
const std::string& build_index_type,
const std::string& raw_index_type)
: pIndex_(faiss::index_factory(dimension, raw_index_type.c_str())),
location_(location),
build_index_type_(build_index_type),
raw_index_type_(raw_index_type) {
}
template<class IndexTrait>
FaissExecutionEngine<IndexTrait>::FaissExecutionEngine(std::shared_ptr<faiss::Index> index, const std::string& location)
FaissExecutionEngine::FaissExecutionEngine(std::shared_ptr<faiss::Index> index,
const std::string& location,
const std::string& build_index_type,
const std::string& raw_index_type)
: pIndex_(index),
location_(location) {
location_(location),
build_index_type_(build_index_type),
raw_index_type_(raw_index_type) {
}
template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::AddWithIds(long n, const float *xdata, const long *xids) {
Status FaissExecutionEngine::AddWithIds(long n, const float *xdata, const long *xids) {
pIndex_->add_with_ids(n, xdata, xids);
return Status::OK();
}
template<class IndexTrait>
size_t FaissExecutionEngine<IndexTrait>::Count() const {
size_t FaissExecutionEngine::Count() const {
return (size_t)(pIndex_->ntotal);
}
template<class IndexTrait>
size_t FaissExecutionEngine<IndexTrait>::Size() const {
size_t FaissExecutionEngine::Size() const {
return (size_t)(Count() * pIndex_->d)*sizeof(float);
}
template<class IndexTrait>
size_t FaissExecutionEngine<IndexTrait>::PhysicalSize() const {
size_t FaissExecutionEngine::PhysicalSize() const {
return (size_t)(Count() * pIndex_->d)*sizeof(float);
}
template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::Serialize() {
Status FaissExecutionEngine::Serialize() {
write_index(pIndex_.get(), location_.c_str());
return Status::OK();
}
template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::Load() {
Status FaissExecutionEngine::Load() {
auto index = zilliz::vecwise::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool to_cache = false;
auto start_time = METRICS_NOW_TIME;
......@@ -90,8 +90,7 @@ Status FaissExecutionEngine<IndexTrait>::Load() {
return Status::OK();
}
template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::Merge(const std::string& location) {
Status FaissExecutionEngine::Merge(const std::string& location) {
if (location == location_) {
return Status::Error("Cannot Merge Self");
}
......@@ -105,12 +104,11 @@ Status FaissExecutionEngine<IndexTrait>::Merge(const std::string& location) {
return Status::OK();
}
template<class IndexTrait>
typename FaissExecutionEngine<IndexTrait>::Ptr
FaissExecutionEngine<IndexTrait>::BuildIndex(const std::string& location) {
ExecutionEnginePtr
FaissExecutionEngine::BuildIndex(const std::string& location) {
auto opd = std::make_shared<Operand>();
opd->d = pIndex_->d;
opd->index_type = IndexTrait::BuildIndexType;
opd->index_type = build_index_type_;
IndexBuilderPtr pBuilder = GetIndexBuilder(opd);
auto from_index = dynamic_cast<faiss::IndexIDMap*>(pIndex_.get());
......@@ -119,13 +117,12 @@ FaissExecutionEngine<IndexTrait>::BuildIndex(const std::string& location) {
dynamic_cast<faiss::IndexFlat*>(from_index->index)->xb.data(),
from_index->id_map.data());
Ptr new_ee(new FaissExecutionEngine<IndexTrait>(index->data(), location));
ExecutionEnginePtr new_ee(new FaissExecutionEngine(index->data(), location, build_index_type_, raw_index_type_));
new_ee->Serialize();
return new_ee;
}
template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::Search(long n,
Status FaissExecutionEngine::Search(long n,
const float *data,
long k,
float *distances,
......@@ -135,8 +132,7 @@ Status FaissExecutionEngine<IndexTrait>::Search(long n,
return Status::OK();
}
template<class IndexTrait>
Status FaissExecutionEngine<IndexTrait>::Cache() {
Status FaissExecutionEngine::Cache() {
zilliz::vecwise::cache::CpuCacheMgr::GetInstance(
)->InsertItem(location_, std::make_shared<Index>(pIndex_));
......
......@@ -19,50 +19,52 @@ namespace vecwise {
namespace engine {
template<class IndexTrait>
class FaissExecutionEngine : public ExecutionEngine<FaissExecutionEngine<IndexTrait>> {
class FaissExecutionEngine : public ExecutionEngine {
public:
using Ptr = std::shared_ptr<FaissExecutionEngine<IndexTrait>>;
FaissExecutionEngine(uint16_t dimension, const std::string& location);
FaissExecutionEngine(std::shared_ptr<faiss::Index> index, const std::string& location);
FaissExecutionEngine(uint16_t dimension,
const std::string& location,
const std::string& build_index_type,
const std::string& raw_index_type);
Status AddWithIds(const std::vector<float>& vectors,
const std::vector<long>& vector_ids);
FaissExecutionEngine(std::shared_ptr<faiss::Index> index,
const std::string& location,
const std::string& build_index_type,
const std::string& raw_index_type);
Status AddWithIds(long n, const float *xdata, const long *xids);
Status AddWithIds(long n, const float *xdata, const long *xids) override;
size_t Count() const;
size_t Count() const override;
size_t Size() const;
size_t Size() const override;
size_t PhysicalSize() const;
size_t PhysicalSize() const override;
Status Serialize();
Status Serialize() override;
Status Load();
Status Load() override;
Status Merge(const std::string& location);
Status Merge(const std::string& location) override;
Status Search(long n,
const float *data,
long k,
float *distances,
long *labels) const;
long *labels) const override;
Ptr BuildIndex(const std::string&);
ExecutionEnginePtr BuildIndex(const std::string&) override;
Status Cache();
Status Cache() override;
protected:
std::shared_ptr<faiss::Index> pIndex_;
std::string location_;
std::string build_index_type_;
std::string raw_index_type_;
};
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#include "FaissExecutionEngine.inl"
......@@ -3,18 +3,24 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Traits.h"
#pragma once
#include <easylogging++.h>
namespace zilliz {
namespace vecwise {
namespace engine {
const char* IVFIndexTrait::BuildIndexType = "IVF";
const char* IVFIndexTrait::RawIndexType = "IDMap,Flat";
#define ENGINE_DOMAIN_NAME "[ENGINE] "
#define ENGINE_ERROR_TEXT "ENGINE Error:"
const char* IDMapIndexTrait::BuildIndexType = "IDMap";
const char* IDMapIndexTrait::RawIndexType = "IDMap,Flat";
#define ENGINE_LOG_TRACE LOG(TRACE) << ENGINE_DOMAIN_NAME
#define ENGINE_LOG_DEBUG LOG(DEBUG) << ENGINE_DOMAIN_NAME
#define ENGINE_LOG_INFO LOG(INFO) << ENGINE_DOMAIN_NAME
#define ENGINE_LOG_WARNING LOG(WARNING) << ENGINE_DOMAIN_NAME
#define ENGINE_LOG_ERROR LOG(ERROR) << ENGINE_DOMAIN_NAME
#define ENGINE_LOG_FATAL LOG(FATAL) << ENGINE_DOMAIN_NAME
} // namespace engine
} // namespace vecwise
} // namespace sql
} // namespace zilliz
} // namespace server
......@@ -3,11 +3,10 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "MemManager.h"
#include "Meta.h"
#include "MetaConsts.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
#include <iostream>
......@@ -19,34 +18,29 @@ namespace zilliz {
namespace vecwise {
namespace engine {
template<typename EngineT>
MemVectors<EngineT>::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
MemVectors::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
const meta::TableFileSchema& schema, const Options& options)
: pMeta_(meta_ptr),
options_(options),
schema_(schema),
pIdGenerator_(new SimpleIDGenerator()),
pEE_(new EngineT(schema_.dimension, schema_.location)) {
pEE_(EngineFactory::Build(schema_.dimension, schema_.location, (EngineType)schema_.engine_type_)) {
}
template<typename EngineT>
void MemVectors<EngineT>::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
void MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
pIdGenerator_->GetNextIDNumbers(n_, vector_ids_);
pEE_->AddWithIds(n_, vectors_, vector_ids_.data());
}
template<typename EngineT>
size_t MemVectors<EngineT>::Total() const {
size_t MemVectors::Total() const {
return pEE_->Count();
}
template<typename EngineT>
size_t MemVectors<EngineT>::ApproximateSize() const {
size_t MemVectors::ApproximateSize() const {
return pEE_->Size();
}
template<typename EngineT>
Status MemVectors<EngineT>::Serialize(std::string& table_id) {
Status MemVectors::Serialize(std::string& table_id) {
table_id = schema_.table_id;
auto size = ApproximateSize();
auto start_time = METRICS_NOW_TIME;
......@@ -70,8 +64,7 @@ Status MemVectors<EngineT>::Serialize(std::string& table_id) {
return status;
}
template<typename EngineT>
MemVectors<EngineT>::~MemVectors() {
MemVectors::~MemVectors() {
if (pIdGenerator_ != nullptr) {
delete pIdGenerator_;
pIdGenerator_ = nullptr;
......@@ -81,9 +74,7 @@ MemVectors<EngineT>::~MemVectors() {
/*
* MemManager
*/
template<typename EngineT>
typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::GetMemByTable(
MemManager::MemVectorsPtr MemManager::GetMemByTable(
const std::string& table_id) {
auto memIt = memMap_.find(table_id);
if (memIt != memMap_.end()) {
......@@ -97,12 +88,11 @@ typename MemManager<EngineT>::MemVectorsPtr MemManager<EngineT>::GetMemByTable(
return nullptr;
}
memMap_[table_id] = MemVectorsPtr(new MemVectors<EngineT>(pMeta_, table_file, options_));
memMap_[table_id] = MemVectorsPtr(new MemVectors(pMeta_, table_file, options_));
return memMap_[table_id];
}
template<typename EngineT>
Status MemManager<EngineT>::InsertVectors(const std::string& table_id_,
Status MemManager::InsertVectors(const std::string& table_id_,
size_t n_,
const float* vectors_,
IDNumbers& vector_ids_) {
......@@ -110,8 +100,7 @@ Status MemManager<EngineT>::InsertVectors(const std::string& table_id_,
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}
template<typename EngineT>
Status MemManager<EngineT>::InsertVectorsNoLock(const std::string& table_id,
Status MemManager::InsertVectorsNoLock(const std::string& table_id,
size_t n,
const float* vectors,
IDNumbers& vector_ids) {
......@@ -124,8 +113,7 @@ Status MemManager<EngineT>::InsertVectorsNoLock(const std::string& table_id,
return Status::OK();
}
template<typename EngineT>
Status MemManager<EngineT>::ToImmutable() {
Status MemManager::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
for (auto& kv: memMap_) {
immMems_.push_back(kv.second);
......@@ -135,8 +123,7 @@ Status MemManager<EngineT>::ToImmutable() {
return Status::OK();
}
template<typename EngineT>
Status MemManager<EngineT>::Serialize(std::vector<std::string>& table_ids) {
Status MemManager::Serialize(std::vector<std::string>& table_ids) {
ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
std::string table_id;
......
......@@ -5,6 +5,7 @@
******************************************************************************/
#pragma once
#include "ExecutionEngine.h"
#include "IDGenerator.h"
#include "Status.h"
#include "Meta.h"
......@@ -23,12 +24,10 @@ namespace meta {
class Meta;
}
template <typename EngineT>
class MemVectors {
public:
using EnginePtr = typename EngineT::Ptr;
using MetaPtr = meta::Meta::Ptr;
using Ptr = std::shared_ptr<MemVectors<EngineT>>;
using Ptr = std::shared_ptr<MemVectors>;
explicit MemVectors(const std::shared_ptr<meta::Meta>&,
const meta::TableFileSchema&, const Options&);
......@@ -54,18 +53,17 @@ private:
Options options_;
meta::TableFileSchema schema_;
IDGenerator* pIdGenerator_;
EnginePtr pEE_;
ExecutionEnginePtr pEE_;
}; // MemVectors
template<typename EngineT>
class MemManager {
public:
using MetaPtr = meta::Meta::Ptr;
using MemVectorsPtr = typename MemVectors<EngineT>::Ptr;
using Ptr = std::shared_ptr<MemManager<EngineT>>;
using MemVectorsPtr = typename MemVectors::Ptr;
using Ptr = std::shared_ptr<MemManager>;
MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options)
: pMeta_(meta), options_(options) {}
......@@ -96,4 +94,3 @@ private:
} // namespace engine
} // namespace vecwise
} // namespace zilliz
#include "MemManager.inl"
......@@ -5,6 +5,8 @@
******************************************************************************/
#pragma once
#include "ExecutionEngine.h"
#include <vector>
#include <map>
#include <string>
......@@ -25,6 +27,7 @@ struct TableSchema {
uint16_t dimension;
std::string location;
long created_on;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
}; // TableSchema
struct TableFileSchema {
......@@ -38,6 +41,7 @@ struct TableFileSchema {
size_t id;
std::string table_id;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
std::string file_id;
int file_type = NEW;
size_t size;
......
......@@ -10,6 +10,7 @@
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "metrics/Metrics.h"
#include "db/EngineFactory.h"
namespace zilliz {
namespace vecwise {
......@@ -79,7 +80,9 @@ SearchScheduler::IndexLoadWorker() {
server::TimeRecorder rc("Load index");
//load index
IndexEnginePtr index_ptr = std::make_shared<IndexClass>(context->file_->dimension, context->file_->location);
ExecutionEnginePtr index_ptr = EngineFactory::Build(context->file_->dimension,
context->file_->location,
(EngineType)context->file_->engine_type_);
index_ptr->Load();
rc.Record("load index file to memory");
......@@ -111,7 +114,7 @@ SearchScheduler::IndexLoadWorker() {
}
//put search task to another queue
SearchTaskPtr task_ptr = std::make_shared<SearchTaskClass>();
SearchTaskPtr task_ptr = std::make_shared<SearchTask>();
task_ptr->index_id_ = context->file_->id;
task_ptr->index_type_ = context->file_->file_type;
task_ptr->index_engine_ = index_ptr;
......
......@@ -3,8 +3,6 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "SearchTaskQueue.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
......@@ -70,8 +68,7 @@ SearchTaskQueue::GetInstance() {
return instance;
}
template<typename trait>
bool SearchTask<trait>::DoSearch() {
bool SearchTask::DoSearch() {
if(index_engine_ == nullptr) {
return false;
}
......
......@@ -7,8 +7,7 @@
#include "SearchContext.h"
#include "utils/BlockingQueue.h"
#include "../FaissExecutionEngine.h"
#include "../Traits.h"
#include "db/ExecutionEngine.h"
#include <memory>
......@@ -16,16 +15,6 @@ namespace zilliz {
namespace vecwise {
namespace engine {
#ifdef GPU_VERSION
using IndexTraitClass = IVFIndexTrait;
#else
using IndexTraitClass = IDMapIndexTrait;
#endif
using IndexClass = FaissExecutionEngine<IndexTraitClass>;
using IndexEnginePtr = std::shared_ptr<IndexClass>;
template <typename trait>
class SearchTask {
public:
bool DoSearch();
......@@ -33,12 +22,11 @@ public:
public:
size_t index_id_ = 0;
int index_type_ = 0; //for metrics
IndexEnginePtr index_engine_;
ExecutionEnginePtr index_engine_;
std::vector<SearchContextPtr> search_contexts_;
};
using SearchTaskClass = SearchTask<IndexTraitClass>;
using SearchTaskPtr = std::shared_ptr<SearchTaskClass>;
using SearchTaskPtr = std::shared_ptr<SearchTask>;
class SearchTaskQueue : public server::BlockingQueue<SearchTaskPtr> {
private:
......@@ -58,6 +46,4 @@ private:
}
}
}
#include "SearchTaskQueue.inl"
\ No newline at end of file
}
\ No newline at end of file
......@@ -38,7 +38,7 @@ engine::Options DBTest::GetOptions() {
void DBTest::SetUp() {
InitLog();
auto options = GetOptions();
db_ = engine::DBFactory::Build(options, "Faiss,IDMap");
db_ = engine::DBFactory::Build(options);
}
void DBTest::TearDown() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册