diff --git a/cpp/src/db/Constants.h b/cpp/src/db/Constants.h index 1ba02b1d5580ba10448f95eac7b145999b24be74..055b10ca9a9156d1809dca519738c72aadc32b8c 100644 --- a/cpp/src/db/Constants.h +++ b/cpp/src/db/Constants.h @@ -10,9 +10,9 @@ namespace milvus { namespace engine { const size_t K = 1024UL; -const size_t M = K*K; -const size_t G = K*M; -const size_t T = K*G; +const size_t M = K * K; +const size_t G = K * M; +const size_t T = K * G; const size_t MAX_TABLE_FILE_MEM = 128 * M; diff --git a/cpp/src/db/Factories.cpp b/cpp/src/db/Factories.cpp index abcc0821ab74aa227b884970acbdbce37587d1de..65c7484a502ae5e88ed77f9eba1ccc80890858f4 100644 --- a/cpp/src/db/Factories.cpp +++ b/cpp/src/db/Factories.cpp @@ -22,6 +22,8 @@ namespace zilliz { namespace milvus { namespace engine { +#define USE_NEW_MEM_MANAGER 1 + DBMetaOptions DBMetaOptionsFactory::Build(const std::string& path) { auto p = path; if(p == "") { @@ -74,17 +76,14 @@ std::shared_ptr DBMetaImplFactory::Build(const DBMetaOptions& metaOp if (dialect.find("mysql") != std::string::npos) { ENGINE_LOG_INFO << "Using MySQL"; return std::make_shared(meta::MySQLMetaImpl(metaOptions, mode)); - } - else if (dialect.find("sqlite") != std::string::npos) { - ENGINE_LOG_DEBUG << "Using SQLite"; + } else if (dialect.find("sqlite") != std::string::npos) { + ENGINE_LOG_INFO << "Using SQLite"; return std::make_shared(meta::DBMetaImpl(metaOptions)); - } - else { + } else { ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect; throw InvalidArgumentException("URI dialect is not mysql / sqlite"); } - } - else { + } else { ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri; throw InvalidArgumentException("Wrong URI format "); } @@ -102,11 +101,11 @@ DB* DBFactory::Build(const Options& options) { MemManagerAbstractPtr MemManagerFactory::Build(const std::shared_ptr& meta, const Options& options) { - bool useNew = true; - if (useNew) { - return std::make_shared(meta, options); - } +#ifdef USE_NEW_MEM_MANAGER + return std::make_shared(meta, options); +#else return std::make_shared(meta, options); +#endif } } // namespace engine diff --git a/cpp/src/db/Factories.h b/cpp/src/db/Factories.h index 567bc0a8bcd01e394e345feca7e6eeb22e85cb58..8b6e7b100f6fa86863508091c1dc07b3969199c8 100644 --- a/cpp/src/db/Factories.h +++ b/cpp/src/db/Factories.h @@ -15,12 +15,13 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { struct DBMetaOptionsFactory { - static DBMetaOptions Build(const std::string& path = ""); + static DBMetaOptions Build(const std::string &path = ""); }; struct OptionsFactory { @@ -29,16 +30,16 @@ struct OptionsFactory { struct DBMetaImplFactory { static std::shared_ptr Build(); - static std::shared_ptr Build(const DBMetaOptions& metaOptions, const int& mode); + static std::shared_ptr Build(const DBMetaOptions &metaOptions, const int &mode); }; struct DBFactory { static std::shared_ptr Build(); - static DB* Build(const Options&); + static DB *Build(const Options &); }; struct MemManagerFactory { - static MemManagerAbstractPtr Build(const std::shared_ptr& meta, const Options& options); + static MemManagerAbstractPtr Build(const std::shared_ptr &meta, const Options &options); }; } // namespace engine diff --git a/cpp/src/db/MemManager.cpp b/cpp/src/db/MemManager.cpp index ba8517cdbd3b9dc8ed77ab3bebbd14084231f8a6..dbf0703173a12c223664a8f3607efe6e74e6608b 100644 --- a/cpp/src/db/MemManager.cpp +++ b/cpp/src/db/MemManager.cpp @@ -15,22 +15,23 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { -MemVectors::MemVectors(const std::shared_ptr& meta_ptr, - const meta::TableFileSchema& schema, const Options& options) - : meta_(meta_ptr), - options_(options), - schema_(schema), - id_generator_(new SimpleIDGenerator()), - active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) { +MemVectors::MemVectors(const std::shared_ptr &meta_ptr, + const meta::TableFileSchema &schema, const Options &options) + : meta_(meta_ptr), + options_(options), + schema_(schema), + id_generator_(new SimpleIDGenerator()), + active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType) schema_.engine_type_)) { } -Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) { - if(active_engine_ == nullptr) { +Status MemVectors::Add(size_t n_, const float *vectors_, IDNumbers &vector_ids_) { + if (active_engine_ == nullptr) { return Status::Error("index engine is null"); } @@ -39,13 +40,15 @@ Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) Status status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data()); auto end_time = METRICS_NOW_TIME; auto total_time = METRICS_MICROSECONDS(start_time, end_time); - server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), static_cast(schema_.dimension_), total_time); + server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), + static_cast(schema_.dimension_), + total_time); return status; } size_t MemVectors::RowCount() const { - if(active_engine_ == nullptr) { + if (active_engine_ == nullptr) { return 0; } @@ -53,15 +56,15 @@ size_t MemVectors::RowCount() const { } size_t MemVectors::Size() const { - if(active_engine_ == nullptr) { + if (active_engine_ == nullptr) { return 0; } return active_engine_->Size(); } -Status MemVectors::Serialize(std::string& table_id) { - if(active_engine_ == nullptr) { +Status MemVectors::Serialize(std::string &table_id) { + if (active_engine_ == nullptr) { return Status::Error("index engine is null"); } @@ -73,15 +76,16 @@ Status MemVectors::Serialize(std::string& table_id) { auto total_time = METRICS_MICROSECONDS(start_time, end_time); schema_.size_ = size; - server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size/total_time); + server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size / total_time); schema_.file_type_ = (size >= options_.index_trigger_size) ? - meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; + meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; auto status = meta_->UpdateTableFile(schema_); LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index") - << " file " << schema_.file_id_ << " of size " << (double)(active_engine_->Size()) / (double)meta::M << " M"; + << " file " << schema_.file_id_ << " of size " << (double) (active_engine_->Size()) / (double) meta::M + << " M"; active_engine_->Cache(); @@ -99,7 +103,7 @@ MemVectors::~MemVectors() { * MemManager */ MemManager::MemVectorsPtr MemManager::GetMemByTable( - const std::string& table_id) { + const std::string &table_id) { auto memIt = mem_id_map_.find(table_id); if (memIt != mem_id_map_.end()) { return memIt->second; @@ -116,22 +120,23 @@ MemManager::MemVectorsPtr MemManager::GetMemByTable( return mem_id_map_[table_id]; } -Status MemManager::InsertVectors(const std::string& table_id_, - size_t n_, - const float* vectors_, - IDNumbers& vector_ids_) { +Status MemManager::InsertVectors(const std::string &table_id_, + size_t n_, + const float *vectors_, + IDNumbers &vector_ids_) { + + LOG(DEBUG) << "MemManager::InsertVectors: mutable mem = " << GetCurrentMutableMem() << + ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + std::unique_lock lock(mutex_); return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_); } -Status MemManager::InsertVectorsNoLock(const std::string& table_id, - size_t n, - const float* vectors, - IDNumbers& vector_ids) { - - LOG(DEBUG) << "MemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() << - ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); +Status MemManager::InsertVectorsNoLock(const std::string &table_id, + size_t n, + const float *vectors, + IDNumbers &vector_ids) { MemVectorsPtr mem = GetMemByTable(table_id); if (mem == nullptr) { @@ -139,7 +144,7 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id, } //makesure each file size less than index_trigger_size - if(mem->Size() > options_.index_trigger_size) { + if (mem->Size() > options_.index_trigger_size) { std::unique_lock lock(serialization_mtx_); immu_mem_list_.push_back(mem); mem_id_map_.erase(table_id); @@ -152,8 +157,8 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id, Status MemManager::ToImmutable() { std::unique_lock lock(mutex_); MemIdMap temp_map; - for (auto& kv: mem_id_map_) { - if(kv.second->RowCount() == 0) { + for (auto &kv: mem_id_map_) { + if (kv.second->RowCount() == 0) { temp_map.insert(kv); continue;//empty vector, no need to serialize } @@ -164,12 +169,12 @@ Status MemManager::ToImmutable() { return Status::OK(); } -Status MemManager::Serialize(std::set& table_ids) { +Status MemManager::Serialize(std::set &table_ids) { ToImmutable(); std::unique_lock lock(serialization_mtx_); std::string table_id; table_ids.clear(); - for (auto& mem : immu_mem_list_) { + for (auto &mem : immu_mem_list_) { mem->Serialize(table_id); table_ids.insert(table_id); } @@ -177,7 +182,7 @@ Status MemManager::Serialize(std::set& table_ids) { return Status::OK(); } -Status MemManager::EraseMemVector(const std::string& table_id) { +Status MemManager::EraseMemVector(const std::string &table_id) { {//erase MemVector from rapid-insert cache std::unique_lock lock(mutex_); mem_id_map_.erase(table_id); @@ -186,8 +191,8 @@ Status MemManager::EraseMemVector(const std::string& table_id) { {//erase MemVector from serialize cache std::unique_lock lock(serialization_mtx_); MemList temp_list; - for (auto& mem : immu_mem_list_) { - if(mem->TableId() != table_id) { + for (auto &mem : immu_mem_list_) { + if (mem->TableId() != table_id) { temp_list.push_back(mem); } } @@ -199,7 +204,7 @@ Status MemManager::EraseMemVector(const std::string& table_id) { size_t MemManager::GetCurrentMutableMem() { size_t totalMem = 0; - for (auto& kv : mem_id_map_) { + for (auto &kv : mem_id_map_) { auto memVector = kv.second; totalMem += memVector->Size(); } @@ -208,7 +213,7 @@ size_t MemManager::GetCurrentMutableMem() { size_t MemManager::GetCurrentImmutableMem() { size_t totalMem = 0; - for (auto& memVector : immu_mem_list_) { + for (auto &memVector : immu_mem_list_) { totalMem += memVector->Size(); } return totalMem; diff --git a/cpp/src/db/MemManager.h b/cpp/src/db/MemManager.h index e8460c7a6dccf6dc2a1cda6386e23b7c6401c55e..5ad3d08b630cbf77ab7c7185b656e4851363f38e 100644 --- a/cpp/src/db/MemManager.h +++ b/cpp/src/db/MemManager.h @@ -17,45 +17,46 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { namespace meta { - class Meta; +class Meta; } class MemVectors { -public: + public: using MetaPtr = meta::Meta::Ptr; using Ptr = std::shared_ptr; - explicit MemVectors(const std::shared_ptr&, - const meta::TableFileSchema&, const Options&); + explicit MemVectors(const std::shared_ptr &, + const meta::TableFileSchema &, const Options &); - Status Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_); + Status Add(size_t n_, const float *vectors_, IDNumbers &vector_ids_); size_t RowCount() const; size_t Size() const; - Status Serialize(std::string& table_id); + Status Serialize(std::string &table_id); ~MemVectors(); - const std::string& Location() const { return schema_.location_; } + const std::string &Location() const { return schema_.location_; } std::string TableId() const { return schema_.table_id_; } -private: + private: MemVectors() = delete; - MemVectors(const MemVectors&) = delete; - MemVectors& operator=(const MemVectors&) = delete; + MemVectors(const MemVectors &) = delete; + MemVectors &operator=(const MemVectors &) = delete; MetaPtr meta_; Options options_; meta::TableFileSchema schema_; - IDGenerator* id_generator_; + IDGenerator *id_generator_; ExecutionEnginePtr active_engine_; }; // MemVectors @@ -63,20 +64,20 @@ private: class MemManager : public MemManagerAbstract { -public: + public: using MetaPtr = meta::Meta::Ptr; using MemVectorsPtr = typename MemVectors::Ptr; using Ptr = std::shared_ptr; - MemManager(const std::shared_ptr& meta, const Options& options) + MemManager(const std::shared_ptr &meta, const Options &options) : meta_(meta), options_(options) {} - Status InsertVectors(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids) override; + Status InsertVectors(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids) override; - Status Serialize(std::set& table_ids) override; + Status Serialize(std::set &table_ids) override; - Status EraseMemVector(const std::string& table_id) override; + Status EraseMemVector(const std::string &table_id) override; size_t GetCurrentMutableMem() override; @@ -84,11 +85,11 @@ public: size_t GetCurrentMem() override; -private: - MemVectorsPtr GetMemByTable(const std::string& table_id); + private: + MemVectorsPtr GetMemByTable(const std::string &table_id); - Status InsertVectorsNoLock(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids); + Status InsertVectorsNoLock(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids); Status ToImmutable(); using MemIdMap = std::map; diff --git a/cpp/src/db/MemManagerAbstract.h b/cpp/src/db/MemManagerAbstract.h index 58c73ba6f85163e19faf42bc1873ccf29ea38743..943c454e465bda005a2e28d44c6b95708dffacc4 100644 --- a/cpp/src/db/MemManagerAbstract.h +++ b/cpp/src/db/MemManagerAbstract.h @@ -2,19 +2,20 @@ #include + namespace zilliz { namespace milvus { namespace engine { class MemManagerAbstract { -public: + public: - virtual Status InsertVectors(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids) = 0; + virtual Status InsertVectors(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids) = 0; - virtual Status Serialize(std::set& table_ids) = 0; + virtual Status Serialize(std::set &table_ids) = 0; - virtual Status EraseMemVector(const std::string& table_id) = 0; + virtual Status EraseMemVector(const std::string &table_id) = 0; virtual size_t GetCurrentMutableMem() = 0; diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp index ba3875fbb5e3fe5bc9ec6cc0efda7451c2ff5dba..e05aa058ac7f23cb1691736b1ecdf8118a54639d 100644 --- a/cpp/src/db/MemTable.cpp +++ b/cpp/src/db/MemTable.cpp @@ -1,46 +1,50 @@ #include "MemTable.h" #include "Log.h" + namespace zilliz { namespace milvus { namespace engine { -MemTable::MemTable(const std::string& table_id, - const std::shared_ptr& meta, - const Options& options) : - table_id_(table_id), - meta_(meta), - options_(options) { +MemTable::MemTable(const std::string &table_id, + const std::shared_ptr &meta, + const Options &options) : + table_id_(table_id), + meta_(meta), + options_(options) { } -Status MemTable::Add(VectorSource::Ptr& source) { +Status MemTable::Add(VectorSource::Ptr &source) { + while (!source->AllAdded()) { - MemTableFile::Ptr currentMemTableFile; + + MemTableFile::Ptr current_mem_table_file; if (!mem_table_file_list_.empty()) { - currentMemTableFile = mem_table_file_list_.back(); + current_mem_table_file = mem_table_file_list_.back(); } + Status status; - if (mem_table_file_list_.empty() || currentMemTableFile->IsFull()) { - MemTableFile::Ptr newMemTableFile = std::make_shared(table_id_, meta_, options_); - status = newMemTableFile->Add(source); + if (mem_table_file_list_.empty() || current_mem_table_file->IsFull()) { + MemTableFile::Ptr new_mem_table_file = std::make_shared(table_id_, meta_, options_); + status = new_mem_table_file->Add(source); if (status.ok()) { - mem_table_file_list_.emplace_back(newMemTableFile); + mem_table_file_list_.emplace_back(new_mem_table_file); } + } else { + status = current_mem_table_file->Add(source); } - else { - status = currentMemTableFile->Add(source); - } + if (!status.ok()) { - std::string errMsg = "MemTable::Add failed: " + status.ToString(); - ENGINE_LOG_ERROR << errMsg; - return Status::Error(errMsg); + std::string err_msg = "MemTable::Add failed: " + status.ToString(); + ENGINE_LOG_ERROR << err_msg; + return Status::Error(err_msg); } } return Status::OK(); } -void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) { +void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr &mem_table_file) { mem_table_file = mem_table_file_list_.back(); } @@ -49,15 +53,15 @@ size_t MemTable::GetTableFileCount() { } Status MemTable::Serialize() { - for (auto memTableFile = mem_table_file_list_.begin(); memTableFile != mem_table_file_list_.end(); ) { - auto status = (*memTableFile)->Serialize(); + for (auto mem_table_file = mem_table_file_list_.begin(); mem_table_file != mem_table_file_list_.end();) { + auto status = (*mem_table_file)->Serialize(); if (!status.ok()) { - std::string errMsg = "MemTable::Serialize failed: " + status.ToString(); - ENGINE_LOG_ERROR << errMsg; - return Status::Error(errMsg); + std::string err_msg = "MemTable::Serialize failed: " + status.ToString(); + ENGINE_LOG_ERROR << err_msg; + return Status::Error(err_msg); } std::lock_guard lock(mutex_); - memTableFile = mem_table_file_list_.erase(memTableFile); + mem_table_file = mem_table_file_list_.erase(mem_table_file); } return Status::OK(); } @@ -66,17 +70,17 @@ bool MemTable::Empty() { return mem_table_file_list_.empty(); } -const std::string& MemTable::GetTableId() const { +const std::string &MemTable::GetTableId() const { return table_id_; } size_t MemTable::GetCurrentMem() { std::lock_guard lock(mutex_); - size_t totalMem = 0; - for (auto& memTableFile : mem_table_file_list_) { - totalMem += memTableFile->GetCurrentMem(); + size_t total_mem = 0; + for (auto &mem_table_file : mem_table_file_list_) { + total_mem += mem_table_file->GetCurrentMem(); } - return totalMem; + return total_mem; } } // namespace engine diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h index 9bae932e62e13fe2b203f6e1c17b4493d84bbfe9..198fcc228ab9b99b7c7a31781a30ba442f8d595a 100644 --- a/cpp/src/db/MemTable.h +++ b/cpp/src/db/MemTable.h @@ -6,23 +6,24 @@ #include + namespace zilliz { namespace milvus { namespace engine { class MemTable { -public: + public: using Ptr = std::shared_ptr; using MemTableFileList = std::vector; using MetaPtr = meta::Meta::Ptr; - MemTable(const std::string& table_id, const std::shared_ptr& meta, const Options& options); + MemTable(const std::string &table_id, const std::shared_ptr &meta, const Options &options); - Status Add(VectorSource::Ptr& source); + Status Add(VectorSource::Ptr &source); - void GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file); + void GetCurrentMemTableFile(MemTableFile::Ptr &mem_table_file); size_t GetTableFileCount(); @@ -30,11 +31,11 @@ public: bool Empty(); - const std::string& GetTableId() const; + const std::string &GetTableId() const; size_t GetCurrentMem(); -private: + private: const std::string table_id_; MemTableFileList mem_table_file_list_; diff --git a/cpp/src/db/MemTableFile.cpp b/cpp/src/db/MemTableFile.cpp index 0ff91de00beb73707386bd2422ea2f000318caea..649a680cf3efba076fffdab47896ac22b986dd6e 100644 --- a/cpp/src/db/MemTableFile.cpp +++ b/cpp/src/db/MemTableFile.cpp @@ -6,23 +6,24 @@ #include + namespace zilliz { namespace milvus { namespace engine { -MemTableFile::MemTableFile(const std::string& table_id, - const std::shared_ptr& meta, - const Options& options) : - table_id_(table_id), - meta_(meta), - options_(options) { +MemTableFile::MemTableFile(const std::string &table_id, + const std::shared_ptr &meta, + const Options &options) : + table_id_(table_id), + meta_(meta), + options_(options) { current_mem_ = 0; auto status = CreateTableFile(); if (status.ok()) { execution_engine_ = EngineFactory::Build(table_file_schema_.dimension_, table_file_schema_.location_, - (EngineType)table_file_schema_.engine_type_); + (EngineType) table_file_schema_.engine_type_); } } @@ -33,31 +34,30 @@ Status MemTableFile::CreateTableFile() { auto status = meta_->CreateTableFile(table_file_schema); if (status.ok()) { table_file_schema_ = table_file_schema; - } - else { - std::string errMsg = "MemTableFile::CreateTableFile failed: " + status.ToString(); - ENGINE_LOG_ERROR << errMsg; + } else { + std::string err_msg = "MemTableFile::CreateTableFile failed: " + status.ToString(); + ENGINE_LOG_ERROR << err_msg; } return status; } -Status MemTableFile::Add(const VectorSource::Ptr& source) { +Status MemTableFile::Add(const VectorSource::Ptr &source) { if (table_file_schema_.dimension_ <= 0) { - std::string errMsg = "MemTableFile::Add: table_file_schema dimension = " + - std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_; - ENGINE_LOG_ERROR << errMsg; - return Status::Error(errMsg); + std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " + + std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_; + ENGINE_LOG_ERROR << err_msg; + return Status::Error(err_msg); } - size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; - size_t memLeft = GetMemLeft(); - if (memLeft >= singleVectorMemSize) { - size_t numVectorsToAdd = std::ceil(memLeft / singleVectorMemSize); - size_t numVectorsAdded; - auto status = source->Add(execution_engine_, table_file_schema_, numVectorsToAdd, numVectorsAdded); + size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; + size_t mem_left = GetMemLeft(); + if (mem_left >= single_vector_mem_size) { + size_t num_vectors_to_add = std::ceil(mem_left / single_vector_mem_size); + size_t num_vectors_added; + auto status = source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added); if (status.ok()) { - current_mem_ += (numVectorsAdded * singleVectorMemSize); + current_mem_ += (num_vectors_added * single_vector_mem_size); } return status; } @@ -73,8 +73,8 @@ size_t MemTableFile::GetMemLeft() { } bool MemTableFile::IsFull() { - size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; - return (GetMemLeft() < singleVectorMemSize); + size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; + return (GetMemLeft() < single_vector_mem_size); } Status MemTableFile::Serialize() { @@ -88,15 +88,15 @@ Status MemTableFile::Serialize() { auto total_time = METRICS_MICROSECONDS(start_time, end_time); table_file_schema_.size_ = size; - server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double)size/total_time); + server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time); table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ? - meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; + meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; auto status = meta_->UpdateTableFile(table_file_schema_); LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index") - << " file " << table_file_schema_.file_id_ << " of size " << (double)size / (double)M << " M"; + << " file " << table_file_schema_.file_id_ << " of size " << (double) size / (double) M << " M"; execution_engine_->Cache(); diff --git a/cpp/src/db/MemTableFile.h b/cpp/src/db/MemTableFile.h index 1be0ae78ba7bbaff2078e8c2fef14fcd0afc6349..4d0011b36233f769e7aaf945a3fa0b5bb54c86a2 100644 --- a/cpp/src/db/MemTableFile.h +++ b/cpp/src/db/MemTableFile.h @@ -5,20 +5,21 @@ #include "VectorSource.h" #include "ExecutionEngine.h" + namespace zilliz { namespace milvus { namespace engine { class MemTableFile { -public: + public: using Ptr = std::shared_ptr; using MetaPtr = meta::Meta::Ptr; - MemTableFile(const std::string& table_id, const std::shared_ptr& meta, const Options& options); + MemTableFile(const std::string &table_id, const std::shared_ptr &meta, const Options &options); - Status Add(const VectorSource::Ptr& source); + Status Add(const VectorSource::Ptr &source); size_t GetCurrentMem(); @@ -28,7 +29,7 @@ public: Status Serialize(); -private: + private: Status CreateTableFile(); diff --git a/cpp/src/db/NewMemManager.cpp b/cpp/src/db/NewMemManager.cpp index 3c78f3710185ab822a0ae18e8536b6f106897bb6..b0fcc9d4aed3b937c95be845a90bbd516bae88a0 100644 --- a/cpp/src/db/NewMemManager.cpp +++ b/cpp/src/db/NewMemManager.cpp @@ -5,11 +5,12 @@ #include + namespace zilliz { namespace milvus { namespace engine { -NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string& table_id) { +NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string &table_id) { auto memIt = mem_id_map_.find(table_id); if (memIt != mem_id_map_.end()) { return memIt->second; @@ -19,27 +20,27 @@ NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string& table return mem_id_map_[table_id]; } -Status NewMemManager::InsertVectors(const std::string& table_id_, +Status NewMemManager::InsertVectors(const std::string &table_id_, size_t n_, - const float* vectors_, - IDNumbers& vector_ids_) { + const float *vectors_, + IDNumbers &vector_ids_) { while (GetCurrentMem() > options_.maximum_memory) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } + LOG(DEBUG) << "NewMemManager::InsertVectors: mutable mem = " << GetCurrentMutableMem() << + ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + std::unique_lock lock(mutex_); return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_); } -Status NewMemManager::InsertVectorsNoLock(const std::string& table_id, +Status NewMemManager::InsertVectorsNoLock(const std::string &table_id, size_t n, - const float* vectors, - IDNumbers& vector_ids) { - - LOG(DEBUG) << "NewMemManager::InsertVectorsNoLock: mutable mem = " << GetCurrentMutableMem() << - ", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem(); + const float *vectors, + IDNumbers &vector_ids) { MemTablePtr mem = GetMemByTable(table_id); VectorSource::Ptr source = std::make_shared(n, vectors); @@ -54,37 +55,33 @@ Status NewMemManager::InsertVectorsNoLock(const std::string& table_id, Status NewMemManager::ToImmutable() { std::unique_lock lock(mutex_); MemIdMap temp_map; - for (auto& kv: mem_id_map_) { - if(kv.second->Empty()) { + for (auto &kv: mem_id_map_) { + if (kv.second->Empty()) { + //empty table, no need to serialize temp_map.insert(kv); - continue;//empty table, no need to serialize + } else { + immu_mem_list_.push_back(kv.second); } - immu_mem_list_.push_back(kv.second); } mem_id_map_.swap(temp_map); return Status::OK(); } -Status NewMemManager::Serialize(std::set& table_ids) { +Status NewMemManager::Serialize(std::set &table_ids) { ToImmutable(); std::unique_lock lock(serialization_mtx_); table_ids.clear(); - for (auto& mem : immu_mem_list_) { + for (auto &mem : immu_mem_list_) { mem->Serialize(); table_ids.insert(mem->GetTableId()); } immu_mem_list_.clear(); -// for (auto mem = immu_mem_list_.begin(); mem != immu_mem_list_.end(); ) { -// (*mem)->Serialize(); -// table_ids.insert((*mem)->GetTableId()); -// mem = immu_mem_list_.erase(mem); -// LOG(DEBUG) << "immu_mem_list_ size = " << immu_mem_list_.size(); -// } + return Status::OK(); } -Status NewMemManager::EraseMemVector(const std::string& table_id) { +Status NewMemManager::EraseMemVector(const std::string &table_id) { {//erase MemVector from rapid-insert cache std::unique_lock lock(mutex_); mem_id_map_.erase(table_id); @@ -93,8 +90,8 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) { {//erase MemVector from serialize cache std::unique_lock lock(serialization_mtx_); MemList temp_list; - for (auto& mem : immu_mem_list_) { - if(mem->GetTableId() != table_id) { + for (auto &mem : immu_mem_list_) { + if (mem->GetTableId() != table_id) { temp_list.push_back(mem); } } @@ -105,20 +102,20 @@ Status NewMemManager::EraseMemVector(const std::string& table_id) { } size_t NewMemManager::GetCurrentMutableMem() { - size_t totalMem = 0; - for (auto& kv : mem_id_map_) { + size_t total_mem = 0; + for (auto &kv : mem_id_map_) { auto memTable = kv.second; - totalMem += memTable->GetCurrentMem(); + total_mem += memTable->GetCurrentMem(); } - return totalMem; + return total_mem; } size_t NewMemManager::GetCurrentImmutableMem() { - size_t totalMem = 0; - for (auto& memTable : immu_mem_list_) { - totalMem += memTable->GetCurrentMem(); + size_t total_mem = 0; + for (auto &mem_table : immu_mem_list_) { + total_mem += mem_table->GetCurrentMem(); } - return totalMem; + return total_mem; } size_t NewMemManager::GetCurrentMem() { diff --git a/cpp/src/db/NewMemManager.h b/cpp/src/db/NewMemManager.h index 9883480404f357080dedf5bee72331281de623ba..5b933c94ca39a2bb6018858a0c90806eeaaa53ad 100644 --- a/cpp/src/db/NewMemManager.h +++ b/cpp/src/db/NewMemManager.h @@ -11,25 +11,26 @@ #include #include + namespace zilliz { namespace milvus { namespace engine { class NewMemManager : public MemManagerAbstract { -public: + public: using MetaPtr = meta::Meta::Ptr; using Ptr = std::shared_ptr; using MemTablePtr = typename MemTable::Ptr; - NewMemManager(const std::shared_ptr& meta, const Options& options) - : meta_(meta), options_(options) {} + NewMemManager(const std::shared_ptr &meta, const Options &options) + : meta_(meta), options_(options) {} - Status InsertVectors(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids) override; + Status InsertVectors(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids) override; - Status Serialize(std::set& table_ids) override; + Status Serialize(std::set &table_ids) override; - Status EraseMemVector(const std::string& table_id) override; + Status EraseMemVector(const std::string &table_id) override; size_t GetCurrentMutableMem() override; @@ -37,11 +38,11 @@ public: size_t GetCurrentMem() override; -private: - MemTablePtr GetMemByTable(const std::string& table_id); + private: + MemTablePtr GetMemByTable(const std::string &table_id); - Status InsertVectorsNoLock(const std::string& table_id, - size_t n, const float* vectors, IDNumbers& vector_ids); + Status InsertVectorsNoLock(const std::string &table_id, + size_t n, const float *vectors, IDNumbers &vector_ids); Status ToImmutable(); using MemIdMap = std::map; diff --git a/cpp/src/db/VectorSource.cpp b/cpp/src/db/VectorSource.cpp index d032be51f68f7b5950f01b25e1f400a6173d06d3..74c07ae1f67a54e4301474a2d350091ce196de3b 100644 --- a/cpp/src/db/VectorSource.cpp +++ b/cpp/src/db/VectorSource.cpp @@ -4,6 +4,7 @@ #include "Log.h" #include "metrics/Metrics.h" + namespace zilliz { namespace milvus { namespace engine { @@ -11,16 +12,16 @@ namespace engine { VectorSource::VectorSource(const size_t &n, const float *vectors) : - n_(n), - vectors_(vectors), - id_generator_(new SimpleIDGenerator()) { + n_(n), + vectors_(vectors), + id_generator_(new SimpleIDGenerator()) { current_num_vectors_added = 0; } -Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, - const meta::TableFileSchema& table_file_schema, - const size_t& num_vectors_to_add, - size_t& num_vectors_added) { +Status VectorSource::Add(const ExecutionEnginePtr &execution_engine, + const meta::TableFileSchema &table_file_schema, + const size_t &num_vectors_to_add, + size_t &num_vectors_added) { auto start_time = METRICS_NOW_TIME; @@ -36,8 +37,7 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, vector_ids_.insert(vector_ids_.end(), std::make_move_iterator(vector_ids_to_add.begin()), std::make_move_iterator(vector_ids_to_add.end())); - } - else { + } else { ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString(); } diff --git a/cpp/src/db/VectorSource.h b/cpp/src/db/VectorSource.h index dec31f39e17db7efbd086f6646602f26531aaa09..7092805a6d79de741de24c078ad5e02cdfa4d15d 100644 --- a/cpp/src/db/VectorSource.h +++ b/cpp/src/db/VectorSource.h @@ -5,22 +5,23 @@ #include "IDGenerator.h" #include "ExecutionEngine.h" + namespace zilliz { namespace milvus { namespace engine { class VectorSource { -public: + public: using Ptr = std::shared_ptr; - VectorSource(const size_t& n, const float* vectors); + VectorSource(const size_t &n, const float *vectors); - Status Add(const ExecutionEnginePtr& execution_engine, - const meta::TableFileSchema& table_file_schema, - const size_t& num_vectors_to_add, - size_t& num_vectors_added); + Status Add(const ExecutionEnginePtr &execution_engine, + const meta::TableFileSchema &table_file_schema, + const size_t &num_vectors_to_add, + size_t &num_vectors_added); size_t GetNumVectorsAdded(); @@ -28,15 +29,15 @@ public: IDNumbers GetVectorIds(); -private: + private: const size_t n_; - const float* vectors_; + const float *vectors_; IDNumbers vector_ids_; size_t current_num_vectors_added; - IDGenerator* id_generator_; + IDGenerator *id_generator_; }; //VectorSource diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp index 818c3a6388e7611eaecee0c69c73359482049bf0..5b7972ec35f92375558bf26a2d5bcdae4e2ac11e 100644 --- a/cpp/unittest/db/mem_test.cpp +++ b/cpp/unittest/db/mem_test.cpp @@ -15,33 +15,34 @@ #include #include + using namespace zilliz::milvus; namespace { - static const std::string TABLE_NAME = "test_group"; - static constexpr int64_t TABLE_DIM = 256; - static constexpr int64_t VECTOR_COUNT = 250000; - static constexpr int64_t INSERT_LOOP = 10000; - - engine::meta::TableSchema BuildTableSchema() { - engine::meta::TableSchema table_info; - table_info.dimension_ = TABLE_DIM; - table_info.table_id_ = TABLE_NAME; - table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP; - return table_info; - } +static const std::string TABLE_NAME = "test_group"; +static constexpr int64_t TABLE_DIM = 256; +static constexpr int64_t VECTOR_COUNT = 250000; +static constexpr int64_t INSERT_LOOP = 10000; + +engine::meta::TableSchema BuildTableSchema() { + engine::meta::TableSchema table_info; + table_info.dimension_ = TABLE_DIM; + table_info.table_id_ = TABLE_NAME; + table_info.engine_type_ = (int) engine::EngineType::FAISS_IDMAP; + return table_info; +} - void BuildVectors(int64_t n, std::vector& vectors) { - vectors.clear(); - vectors.resize(n*TABLE_DIM); - float* data = vectors.data(); - for(int i = 0; i < n; i++) { - for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); - data[TABLE_DIM * i] += i / 2000.; - } +void BuildVectors(int64_t n, std::vector &vectors) { + vectors.clear(); + vectors.resize(n * TABLE_DIM); + float *data = vectors.data(); + for (int i = 0; i < n; i++) { + for (int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48(); + data[TABLE_DIM * i] += i / 2000.; } } +} TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) { @@ -65,7 +66,7 @@ TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) { size_t num_vectors_added; engine::ExecutionEnginePtr execution_engine_ = engine::EngineFactory::Build(table_file_schema.dimension_, table_file_schema.location_, - (engine::EngineType)table_file_schema.engine_type_); + (engine::EngineType) table_file_schema.engine_type_); status = source.Add(execution_engine_, table_file_schema, 50, num_vectors_added); ASSERT_TRUE(status.ok()); @@ -82,10 +83,6 @@ TEST_F(NewMemManagerTest, VECTOR_SOURCE_TEST) { vector_ids = source.GetVectorIds(); ASSERT_EQ(vector_ids.size(), 100); -// for (auto& id : vector_ids) { -// std::cout << id << std::endl; -// } - status = impl_->DropAll(); ASSERT_TRUE(status.ok()); } @@ -99,7 +96,7 @@ TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) { auto status = impl_->CreateTable(table_schema); ASSERT_TRUE(status.ok()); - engine::MemTableFile memTableFile(TABLE_NAME, impl_, options); + engine::MemTableFile mem_table_file(TABLE_NAME, impl_, options); int64_t n_100 = 100; std::vector vectors_100; @@ -107,28 +104,28 @@ TEST_F(NewMemManagerTest, MEM_TABLE_FILE_TEST) { engine::VectorSource::Ptr source = std::make_shared(n_100, vectors_100.data()); - status = memTableFile.Add(source); + status = mem_table_file.Add(source); ASSERT_TRUE(status.ok()); -// std::cout << memTableFile.GetCurrentMem() << " " << memTableFile.GetMemLeft() << std::endl; +// std::cout << mem_table_file.GetCurrentMem() << " " << mem_table_file.GetMemLeft() << std::endl; engine::IDNumbers vector_ids = source->GetVectorIds(); ASSERT_EQ(vector_ids.size(), 100); size_t singleVectorMem = sizeof(float) * TABLE_DIM; - ASSERT_EQ(memTableFile.GetCurrentMem(), n_100 * singleVectorMem); + ASSERT_EQ(mem_table_file.GetCurrentMem(), n_100 * singleVectorMem); int64_t n_max = engine::MAX_TABLE_FILE_MEM / singleVectorMem; std::vector vectors_128M; BuildVectors(n_max, vectors_128M); engine::VectorSource::Ptr source_128M = std::make_shared(n_max, vectors_128M.data()); - status = memTableFile.Add(source_128M); + status = mem_table_file.Add(source_128M); vector_ids = source_128M->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_max - n_100); - ASSERT_TRUE(memTableFile.IsFull()); + ASSERT_TRUE(mem_table_file.IsFull()); status = impl_->DropAll(); ASSERT_TRUE(status.ok()); @@ -149,34 +146,34 @@ TEST_F(NewMemManagerTest, MEM_TABLE_TEST) { engine::VectorSource::Ptr source_100 = std::make_shared(n_100, vectors_100.data()); - engine::MemTable memTable(TABLE_NAME, impl_, options); + engine::MemTable mem_table(TABLE_NAME, impl_, options); - status = memTable.Add(source_100); + status = mem_table.Add(source_100); ASSERT_TRUE(status.ok()); engine::IDNumbers vector_ids = source_100->GetVectorIds(); ASSERT_EQ(vector_ids.size(), 100); - engine::MemTableFile::Ptr memTableFile; - memTable.GetCurrentMemTableFile(memTableFile); + engine::MemTableFile::Ptr mem_table_file; + mem_table.GetCurrentMemTableFile(mem_table_file); size_t singleVectorMem = sizeof(float) * TABLE_DIM; - ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem); + ASSERT_EQ(mem_table_file->GetCurrentMem(), n_100 * singleVectorMem); int64_t n_max = engine::MAX_TABLE_FILE_MEM / singleVectorMem; std::vector vectors_128M; BuildVectors(n_max, vectors_128M); engine::VectorSource::Ptr source_128M = std::make_shared(n_max, vectors_128M.data()); - status = memTable.Add(source_128M); + status = mem_table.Add(source_128M); ASSERT_TRUE(status.ok()); vector_ids = source_128M->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_max); - memTable.GetCurrentMemTableFile(memTableFile); - ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem); + mem_table.GetCurrentMemTableFile(mem_table_file); + ASSERT_EQ(mem_table_file->GetCurrentMem(), n_100 * singleVectorMem); - ASSERT_EQ(memTable.GetTableFileCount(), 2); + ASSERT_EQ(mem_table.GetTableFileCount(), 2); int64_t n_1G = 1024000; std::vector vectors_1G; @@ -184,16 +181,16 @@ TEST_F(NewMemManagerTest, MEM_TABLE_TEST) { engine::VectorSource::Ptr source_1G = std::make_shared(n_1G, vectors_1G.data()); - status = memTable.Add(source_1G); + status = mem_table.Add(source_1G); ASSERT_TRUE(status.ok()); vector_ids = source_1G->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_1G); int expectedTableFileCount = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM); - ASSERT_EQ(memTable.GetTableFileCount(), expectedTableFileCount); + ASSERT_EQ(mem_table.GetTableFileCount(), expectedTableFileCount); - status = memTable.Serialize(); + status = mem_table.Serialize(); ASSERT_TRUE(status.ok()); status = impl_->DropAll(); @@ -216,7 +213,7 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) { ASSERT_STATS(stat); ASSERT_EQ(table_info_get.dimension_, TABLE_DIM); - std::map> search_vectors; + std::map> search_vectors; { engine::IDNumbers vector_ids; int64_t nb = 1024000; @@ -231,8 +228,8 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) { std::mt19937 gen(rd()); std::uniform_int_distribution dis(0, nb - 1); - int64_t numQuery = 20; - for (int64_t i = 0; i < numQuery; ++i) { + int64_t num_query = 20; + for (int64_t i = 0; i < num_query; ++i) { int64_t index = dis(gen); std::vector search; for (int64_t j = 0; j < TABLE_DIM; j++) { @@ -243,8 +240,8 @@ TEST_F(NewMemManagerTest, SERIAL_INSERT_SEARCH_TEST) { } int k = 10; - for(auto& pair : search_vectors) { - auto& search = pair.second; + for (auto &pair : search_vectors) { + auto &search = pair.second; engine::QueryResults results; stat = db_->Query(TABLE_NAME, k, 1, search.data(), results); ASSERT_EQ(results[0][0].first, pair.first); @@ -329,18 +326,18 @@ TEST_F(NewMemManagerTest, CONCURRENT_INSERT_SEARCH_TEST) { uint64_t count = 0; uint64_t prev_count = 0; - for (auto j=0; j<10; ++j) { + for (auto j = 0; j < 10; ++j) { ss.str(""); db_->Size(count); prev_count = count; START_TIMER; stat = db_->Query(TABLE_NAME, k, qb, qxb.data(), results); - ss << "Search " << j << " With Size " << count/engine::meta::M << " M"; + ss << "Search " << j << " With Size " << count / engine::meta::M << " M"; STOP_TIMER(ss.str()); ASSERT_STATS(stat); - for (auto k=0; kInsertVectors(TABLE_NAME, qb, qxb.data(), target_ids); ASSERT_EQ(target_ids.size(), qb); } else { diff --git a/cpp/unittest/db/utils.h b/cpp/unittest/db/utils.h index d06500de5ce80e0a78d5651988d4e4e31c484d0b..9c126030c261dbc460289c4ef7f8e4e4fb97f155 100644 --- a/cpp/unittest/db/utils.h +++ b/cpp/unittest/db/utils.h @@ -30,7 +30,7 @@ #define STOP_TIMER(name) #endif -void ASSERT_STATS(zilliz::milvus::engine::Status& stat); +void ASSERT_STATS(zilliz::milvus::engine::Status &stat); //class TestEnv : public ::testing::Environment { //public: @@ -54,8 +54,8 @@ void ASSERT_STATS(zilliz::milvus::engine::Status& stat); // ::testing::AddGlobalTestEnvironment(new TestEnv); class DBTest : public ::testing::Test { -protected: - zilliz::milvus::engine::DB* db_; + protected: + zilliz::milvus::engine::DB *db_; void InitLog(); virtual void SetUp() override; @@ -64,13 +64,13 @@ protected: }; class DBTest2 : public DBTest { -protected: + protected: virtual zilliz::milvus::engine::Options GetOptions() override; }; class MetaTest : public DBTest { -protected: + protected: std::shared_ptr impl_; virtual void SetUp() override; @@ -78,17 +78,17 @@ protected: }; class MySQLTest : public ::testing::Test { -protected: + protected: // std::shared_ptr impl_; zilliz::milvus::engine::DBMetaOptions getDBMetaOptions(); }; -class MySQLDBTest : public ::testing::Test { -protected: +class MySQLDBTest : public ::testing::Test { + protected: zilliz::milvus::engine::Options GetOptions(); }; -class NewMemManagerTest : public ::testing::Test { +class NewMemManagerTest : public ::testing::Test { void InitLog(); - virtual void SetUp() override; + void SetUp() override; };