From b6d3c66476f7ea5e89edc4324f2eaeabe17b5e65 Mon Sep 17 00:00:00 2001 From: zhiru Date: Fri, 5 Jul 2019 16:46:15 +0800 Subject: [PATCH] Implemented add and serialize Former-commit-id: 35a61d2fd0fc9c703ec4efbc9fe8399162e79b8e --- cpp/src/db/MemTable.cpp | 32 +++++++++++++++++++-------- cpp/src/db/MemTable.h | 10 ++++++--- cpp/src/db/MemTableFile.cpp | 42 +++++++++++++++++++++++++++++++++--- cpp/src/db/MemTableFile.h | 8 +++++-- cpp/src/db/VectorSource.cpp | 12 +++++------ cpp/src/db/VectorSource.h | 2 -- cpp/unittest/db/mem_test.cpp | 11 +++++++--- 7 files changed, 89 insertions(+), 28 deletions(-) diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp index 032d47999..86554695c 100644 --- a/cpp/src/db/MemTable.cpp +++ b/cpp/src/db/MemTable.cpp @@ -6,24 +6,26 @@ namespace milvus { namespace engine { MemTable::MemTable(const std::string& table_id, - const std::shared_ptr& meta) : + const std::shared_ptr& meta, + const Options& options) : table_id_(table_id), - meta_(meta) { + meta_(meta), + options_(options) { } Status MemTable::Add(VectorSource::Ptr& source) { while (!source->AllAdded()) { MemTableFile::Ptr currentMemTableFile; - if (!mem_table_file_stack_.empty()) { - currentMemTableFile = mem_table_file_stack_.top(); + if (!mem_table_file_list_.empty()) { + currentMemTableFile = mem_table_file_list_.back(); } Status status; - if (mem_table_file_stack_.empty() || currentMemTableFile->isFull()) { - MemTableFile::Ptr newMemTableFile = std::make_shared(table_id_, meta_); + if (mem_table_file_list_.empty() || currentMemTableFile->IsFull()) { + MemTableFile::Ptr newMemTableFile = std::make_shared(table_id_, meta_, options_); status = newMemTableFile->Add(source); if (status.ok()) { - mem_table_file_stack_.push(newMemTableFile); + mem_table_file_list_.emplace_back(newMemTableFile); } } else { @@ -39,11 +41,23 @@ Status MemTable::Add(VectorSource::Ptr& source) { } void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) { - mem_table_file = mem_table_file_stack_.top(); + mem_table_file = mem_table_file_list_.back(); } size_t MemTable::GetStackSize() { - return mem_table_file_stack_.size(); + return mem_table_file_list_.size(); +} + +Status MemTable::Serialize() { + for (auto& memTableFile : mem_table_file_list_) { + auto status = memTableFile->Serialize(); + if (!status.ok()) { + std::string errMsg = "MemTable::Serialize failed: " + status.ToString(); + ENGINE_LOG_ERROR << errMsg; + return Status::Error(errMsg); + } + } + return Status::OK(); } } // namespace engine diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h index b9fe4147d..d5c7cc9e8 100644 --- a/cpp/src/db/MemTable.h +++ b/cpp/src/db/MemTable.h @@ -15,10 +15,10 @@ class MemTable { public: using Ptr = std::shared_ptr; - using MemTableFileStack = std::stack; + using MemTableFileList = std::vector; using MetaPtr = meta::Meta::Ptr; - MemTable(const std::string& table_id, const std::shared_ptr& meta); + MemTable(const std::string& table_id, const std::shared_ptr& meta, const Options& options); Status Add(VectorSource::Ptr& source); @@ -26,13 +26,17 @@ public: size_t GetStackSize(); + Status Serialize(); + private: const std::string table_id_; - MemTableFileStack mem_table_file_stack_; + MemTableFileList mem_table_file_list_; MetaPtr meta_; + Options options_; + }; //MemTable } // namespace engine diff --git a/cpp/src/db/MemTableFile.cpp b/cpp/src/db/MemTableFile.cpp index 58b76ab83..0ff91de00 100644 --- a/cpp/src/db/MemTableFile.cpp +++ b/cpp/src/db/MemTableFile.cpp @@ -2,6 +2,7 @@ #include "Constants.h" #include "Log.h" #include "EngineFactory.h" +#include "metrics/Metrics.h" #include @@ -10,9 +11,11 @@ namespace milvus { namespace engine { MemTableFile::MemTableFile(const std::string& table_id, - const std::shared_ptr& meta) : + const std::shared_ptr& meta, + const Options& options) : table_id_(table_id), - meta_(meta) { + meta_(meta), + options_(options) { current_mem_ = 0; auto status = CreateTableFile(); @@ -40,6 +43,13 @@ Status MemTableFile::CreateTableFile() { Status MemTableFile::Add(const VectorSource::Ptr& source) { + if (table_file_schema_.dimension_ <= 0) { + std::string errMsg = "MemTableFile::Add: table_file_schema dimension = " + + std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_; + ENGINE_LOG_ERROR << errMsg; + return Status::Error(errMsg); + } + size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; size_t memLeft = GetMemLeft(); if (memLeft >= singleVectorMemSize) { @@ -62,11 +72,37 @@ size_t MemTableFile::GetMemLeft() { return (MAX_TABLE_FILE_MEM - current_mem_); } -bool MemTableFile::isFull() { +bool MemTableFile::IsFull() { size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; return (GetMemLeft() < singleVectorMemSize); } +Status MemTableFile::Serialize() { + + auto start_time = METRICS_NOW_TIME; + + auto size = GetCurrentMem(); + + execution_engine_->Serialize(); + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + table_file_schema_.size_ = size; + + server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double)size/total_time); + + table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ? + meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; + + auto status = meta_->UpdateTableFile(table_file_schema_); + + LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index") + << " file " << table_file_schema_.file_id_ << " of size " << (double)size / (double)M << " M"; + + execution_engine_->Cache(); + + return status; +} + } // namespace engine } // namespace milvus } // namespace zilliz \ No newline at end of file diff --git a/cpp/src/db/MemTableFile.h b/cpp/src/db/MemTableFile.h index 04f30178e..1be0ae78b 100644 --- a/cpp/src/db/MemTableFile.h +++ b/cpp/src/db/MemTableFile.h @@ -16,7 +16,7 @@ public: using Ptr = std::shared_ptr; using MetaPtr = meta::Meta::Ptr; - MemTableFile(const std::string& table_id, const std::shared_ptr& meta); + MemTableFile(const std::string& table_id, const std::shared_ptr& meta, const Options& options); Status Add(const VectorSource::Ptr& source); @@ -24,7 +24,9 @@ public: size_t GetMemLeft(); - bool isFull(); + bool IsFull(); + + Status Serialize(); private: @@ -36,6 +38,8 @@ private: MetaPtr meta_; + Options options_; + size_t current_mem_; ExecutionEnginePtr execution_engine_; diff --git a/cpp/src/db/VectorSource.cpp b/cpp/src/db/VectorSource.cpp index f7cef994f..b113b9ad5 100644 --- a/cpp/src/db/VectorSource.cpp +++ b/cpp/src/db/VectorSource.cpp @@ -2,6 +2,7 @@ #include "ExecutionEngine.h" #include "EngineFactory.h" #include "Log.h" +#include "metrics/Metrics.h" namespace zilliz { namespace milvus { @@ -21,12 +22,7 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, const size_t& num_vectors_to_add, size_t& num_vectors_added) { - if (table_file_schema.dimension_ <= 0) { - std::string errMsg = "VectorSource::Add: table_file_schema dimension = " + - std::to_string(table_file_schema.dimension_) + ", table_id = " + table_file_schema.table_id_; - ENGINE_LOG_ERROR << errMsg; - return Status::Error(errMsg); - } + auto start_time = METRICS_NOW_TIME; num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added; IDNumbers vector_ids_to_add; @@ -40,6 +36,10 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString(); } + auto end_time = METRICS_NOW_TIME; + auto total_time = METRICS_MICROSECONDS(start_time, end_time); + server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast(n_), static_cast(table_file_schema.dimension_), total_time); + return status; } diff --git a/cpp/src/db/VectorSource.h b/cpp/src/db/VectorSource.h index 597eee4ad..dec31f39e 100644 --- a/cpp/src/db/VectorSource.h +++ b/cpp/src/db/VectorSource.h @@ -28,8 +28,6 @@ public: IDNumbers GetVectorIds(); -// Status Serialize(); - private: const size_t n_; diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp index 111914f8a..f68d1eb8e 100644 --- a/cpp/unittest/db/mem_test.cpp +++ b/cpp/unittest/db/mem_test.cpp @@ -86,12 +86,13 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) { TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); + auto options = engine::OptionsFactory::Build(); engine::meta::TableSchema table_schema = BuildTableSchema(); auto status = impl_->CreateTable(table_schema); ASSERT_TRUE(status.ok()); - engine::MemTableFile memTableFile(TABLE_NAME, impl_); + engine::MemTableFile memTableFile(TABLE_NAME, impl_, options); int64_t n_100 = 100; std::vector vectors_100; @@ -120,7 +121,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { vector_ids = source_128M->GetVectorIds(); ASSERT_EQ(vector_ids.size(), n_max - n_100); - ASSERT_TRUE(memTableFile.isFull()); + ASSERT_TRUE(memTableFile.IsFull()); status = impl_->DropAll(); ASSERT_TRUE(status.ok()); @@ -129,6 +130,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { TEST(MEM_TEST, MEM_TABLE_TEST) { std::shared_ptr impl_ = engine::DBMetaImplFactory::Build(); + auto options = engine::OptionsFactory::Build(); engine::meta::TableSchema table_schema = BuildTableSchema(); auto status = impl_->CreateTable(table_schema); @@ -140,7 +142,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { engine::VectorSource::Ptr source_100 = std::make_shared(n_100, vectors_100.data()); - engine::MemTable memTable(TABLE_NAME, impl_); + engine::MemTable memTable(TABLE_NAME, impl_, options); status = memTable.Add(source_100); ASSERT_TRUE(status.ok()); @@ -184,6 +186,9 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { int expectedStackSize = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM); ASSERT_EQ(memTable.GetStackSize(), expectedStackSize); + status = memTable.Serialize(); + ASSERT_TRUE(status.ok()); + status = impl_->DropAll(); ASSERT_TRUE(status.ok()); } -- GitLab