From b0e1f1b3f1ecf858a93f38f58b99a2a43f82fa68 Mon Sep 17 00:00:00 2001
From: zhiru
Date: Fri, 5 Jul 2019 15:03:40 +0800
Subject: [PATCH] add mem impl

Former-commit-id: 8aeddae834ddd1289a9ab574bdb050e3e5377e99
---
 cpp/src/db/Constants.h       |  26 +++++
 cpp/src/db/MemTable.cpp      |  62 ++++++++++++
 cpp/src/db/MemTable.h        |  46 +++++++++
 cpp/src/db/MemTableFile.cpp  |  90 +++++++++++++++++
 cpp/src/db/MemTableFile.h    |  50 ++++++++++
 cpp/src/db/VectorSource.cpp  |  71 ++++++++++++++
 cpp/src/db/VectorSource.h    |  47 +++++++++
 cpp/unittest/db/mem_test.cpp | 194 +++++++++++++++++++++++++++++++++++++
 8 files changed, 586 insertions(+)
 create mode 100644 cpp/src/db/Constants.h
 create mode 100644 cpp/src/db/MemTable.cpp
 create mode 100644 cpp/src/db/MemTable.h
 create mode 100644 cpp/src/db/MemTableFile.cpp
 create mode 100644 cpp/src/db/MemTableFile.h
 create mode 100644 cpp/src/db/VectorSource.cpp
 create mode 100644 cpp/src/db/VectorSource.h
 create mode 100644 cpp/unittest/db/mem_test.cpp

diff --git a/cpp/src/db/Constants.h b/cpp/src/db/Constants.h
new file mode 100644
index 00000000..2bb2e0a0
--- /dev/null
+++ b/cpp/src/db/Constants.h
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
+ * Unauthorized copying of this file, via any medium is strictly prohibited.
+ * Proprietary and confidential.
+ ******************************************************************************/
+#pragma once
+
+#include <cstddef>
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+// Byte-size multipliers.
+const size_t K = 1024UL;
+const size_t M = K*K;
+
+// Memory budget of one in-memory table file, in bytes.
+const size_t MAX_TABLE_FILE_MEM = 128 * M;
+
+// Size in bytes of one vector component (vectors are float arrays).
+const int VECTOR_TYPE_SIZE = sizeof(float);
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
diff --git a/cpp/src/db/MemTable.cpp b/cpp/src/db/MemTable.cpp
new file mode 100644
index 00000000..032d4799
--- /dev/null
+++ b/cpp/src/db/MemTable.cpp
@@ -0,0 +1,62 @@
+#include "MemTable.h"
+#include "Log.h"
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+MemTable::MemTable(const std::string& table_id,
+                   const std::shared_ptr<meta::Meta>& meta) :
+                   table_id_(table_id),
+                   meta_(meta) {
+
+}
+
+// Drains `source` into the table's in-memory files. Vectors go into the file on
+// top of the stack until it is full; a new file is then created for the rest.
+// A newly created file is pushed only if its first Add succeeds. Returns an
+// error status as soon as any file-level Add fails.
+Status MemTable::Add(VectorSource::Ptr& source) {
+    while (!source->AllAdded()) {
+        MemTableFile::Ptr currentMemTableFile;
+        if (!mem_table_file_stack_.empty()) {
+            currentMemTableFile = mem_table_file_stack_.top();
+        }
+        Status status;
+        if (mem_table_file_stack_.empty() || currentMemTableFile->isFull()) {
+            MemTableFile::Ptr newMemTableFile = std::make_shared<MemTableFile>(table_id_, meta_);
+            status = newMemTableFile->Add(source);
+            if (status.ok()) {
+                mem_table_file_stack_.push(newMemTableFile);
+            }
+        }
+        else {
+            status = currentMemTableFile->Add(source);
+        }
+        if (!status.ok()) {
+            std::string errMsg = "MemTable::Add failed: " + status.ToString();
+            ENGINE_LOG_ERROR << errMsg;
+            return Status::Error(errMsg);
+        }
+    }
+    return Status::OK();
+}
+
+// Outputs the file on top of the stack, or nullptr when the stack is empty
+// (calling top() on an empty std::stack is undefined behavior).
+void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) {
+    if (mem_table_file_stack_.empty()) {
+        mem_table_file = nullptr;
+        return;
+    }
+    mem_table_file = mem_table_file_stack_.top();
+}
+
+// Number of in-memory table files currently on the stack.
+size_t MemTable::GetStackSize() {
+    return mem_table_file_stack_.size();
+}
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
\ No newline at end of file
diff --git a/cpp/src/db/MemTable.h b/cpp/src/db/MemTable.h
new file mode 100644
index 00000000..b9fe4147
--- /dev/null
+++ b/cpp/src/db/MemTable.h
@@ -0,0 +1,46 @@
+#pragma once
+
+#include "Status.h"
+#include "MemTableFile.h"
+#include "VectorSource.h"
+
+#include <memory>
+#include <stack>
+#include <string>
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+// In-memory buffer for one table: a stack of MemTableFile objects, the top of
+// which receives newly added vectors.
+class MemTable {
+
+public:
+
+    using Ptr = std::shared_ptr<MemTable>;
+    using MemTableFileStack = std::stack<MemTableFile::Ptr>;
+    using MetaPtr = meta::Meta::Ptr;
+
+    MemTable(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta);
+
+    // Adds every vector still pending in `source`, creating files as needed.
+    Status Add(VectorSource::Ptr& source);
+
+    // Outputs the file on top of the stack, or nullptr if the stack is empty.
+    void GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file);
+
+    size_t GetStackSize();
+
+private:
+    const std::string table_id_;
+
+    MemTableFileStack mem_table_file_stack_;
+
+    MetaPtr meta_;
+
+}; //MemTable
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
\ No newline at end of file
diff --git a/cpp/src/db/MemTableFile.cpp b/cpp/src/db/MemTableFile.cpp
new file mode 100644
index 00000000..26bc0d38
--- /dev/null
+++ b/cpp/src/db/MemTableFile.cpp
@@ -0,0 +1,90 @@
+#include "MemTableFile.h"
+#include "Constants.h"
+#include "Log.h"
+
+#include <cmath>
+#include <string>
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+MemTableFile::MemTableFile(const std::string& table_id,
+                           const std::shared_ptr<meta::Meta>& meta) :
+                           table_id_(table_id),
+                           meta_(meta) {
+
+    current_mem_ = 0;
+    // A constructor cannot return the Status; a failure is only logged inside
+    // CreateTableFile() and table_file_schema_ then stays default-constructed.
+    CreateTableFile();
+}
+
+// Asks the meta store to create a table file for table_id_. On success the
+// schema object passed to the store is kept as table_file_schema_; on failure
+// the error is logged. The store's status is returned either way.
+Status MemTableFile::CreateTableFile() {
+
+    meta::TableFileSchema table_file_schema;
+    table_file_schema.table_id_ = table_id_;
+    auto status = meta_->CreateTableFile(table_file_schema);
+    if (status.ok()) {
+        table_file_schema_ = table_file_schema;
+    }
+    else {
+        std::string errMsg = "MemTableFile::CreateTableFile failed: " + status.ToString();
+        ENGINE_LOG_ERROR << errMsg;
+    }
+    return status;
+}
+
+// Moves as many pending vectors from `source` into this file as fit into the
+// remaining memory budget and accounts for the memory they occupy. Returns OK
+// without adding anything when not even one more vector fits, and an error
+// when the file schema has no valid dimension.
+Status MemTableFile::Add(const VectorSource::Ptr& source) {
+
+    if (table_file_schema_.dimension_ <= 0) {
+        // singleVectorMemSize would be 0 and the division below would divide
+        // by zero, so fail early instead.
+        std::string errMsg = "MemTableFile::Add: table_file_schema dimension = " +
+            std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_;
+        ENGINE_LOG_ERROR << errMsg;
+        return Status::Error(errMsg);
+    }
+
+    size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
+    size_t memLeft = GetMemLeft();
+    if (memLeft >= singleVectorMemSize) {
+        // Integer division already yields the number of whole vectors that
+        // fit; wrapping it in std::ceil had no effect.
+        size_t numVectorsToAdd = memLeft / singleVectorMemSize;
+        size_t numVectorsAdded = 0;
+        auto status = source->Add(table_file_schema_, numVectorsToAdd, numVectorsAdded);
+        if (status.ok()) {
+            current_mem_ += (numVectorsAdded * singleVectorMemSize);
+        }
+        return status;
+    }
+    return Status::OK();
+}
+
+// Bytes currently accounted to vectors stored in this file.
+size_t MemTableFile::GetCurrentMem() {
+    return current_mem_;
+}
+
+// Bytes still available before MAX_TABLE_FILE_MEM is reached.
+size_t MemTableFile::GetMemLeft() {
+    return (MAX_TABLE_FILE_MEM - current_mem_);
+}
+
+// True when the remaining memory cannot hold one more vector.
+bool MemTableFile::isFull() {
+    size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
+    return (GetMemLeft() < singleVectorMemSize);
+}
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
\ No newline at end of file
diff --git a/cpp/src/db/MemTableFile.h b/cpp/src/db/MemTableFile.h
new file mode 100644
index 00000000..1efe4c0b
--- /dev/null
+++ b/cpp/src/db/MemTableFile.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include "Status.h"
+#include "Meta.h"
+#include "VectorSource.h"
+
+#include <memory>
+#include <string>
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+// One in-memory table file: accepts vectors until its memory budget
+// (MAX_TABLE_FILE_MEM) cannot hold another vector.
+class MemTableFile {
+
+public:
+
+    using Ptr = std::shared_ptr<MemTableFile>;
+    using MetaPtr = meta::Meta::Ptr;
+
+    MemTableFile(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta);
+
+    // Adds pending vectors from `source` until the source is drained or the file is full.
+    Status Add(const VectorSource::Ptr& source);
+
+    size_t GetCurrentMem();
+
+    size_t GetMemLeft();
+
+    bool isFull();
+
+private:
+
+    Status CreateTableFile();
+
+    const std::string table_id_;
+
+    meta::TableFileSchema table_file_schema_;
+
+    MetaPtr meta_;
+
+    size_t current_mem_;
+
+}; //MemTableFile
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
\ No newline at end of file
diff --git a/cpp/src/db/VectorSource.cpp b/cpp/src/db/VectorSource.cpp
new file mode 100644
index 00000000..dff5423c
--- /dev/null
+++ b/cpp/src/db/VectorSource.cpp
@@ -0,0 +1,71 @@
+#include "VectorSource.h"
+#include "ExecutionEngine.h"
+#include "EngineFactory.h"
+#include "Log.h"
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+
+VectorSource::VectorSource(const size_t &n,
+                           const float *vectors) :
+                           n_(n),
+                           vectors_(vectors),
+                           current_num_vectors_added(0),
+                           id_generator_(std::make_shared<SimpleIDGenerator>()) {
+}
+
+// Hands up to `num_vectors_to_add` of the not-yet-added vectors, together with
+// freshly generated ids, to an engine built from `table_file_schema`. The count
+// actually handed over is reported through `num_vectors_added`; it is 0 when
+// the call fails.
+Status VectorSource::Add(const meta::TableFileSchema& table_file_schema, const size_t& num_vectors_to_add, size_t& num_vectors_added) {
+
+    if (table_file_schema.dimension_ <= 0) {
+        std::string errMsg = "VectorSource::Add: table_file_schema dimension = " +
+            std::to_string(table_file_schema.dimension_) + ", table_id = " + table_file_schema.table_id_;
+        ENGINE_LOG_ERROR << errMsg;
+        num_vectors_added = 0;
+        return Status::Error(errMsg);
+    }
+    ExecutionEnginePtr engine = EngineFactory::Build(table_file_schema.dimension_,
+                                                     table_file_schema.location_,
+                                                     static_cast<EngineType>(table_file_schema.engine_type_));
+
+    num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added;
+    IDNumbers vector_ids_to_add;
+    id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add);
+    // vectors_ is a flat float array holding dimension_ floats per vector (as
+    // the unit test builds it), so the offset must be scaled by the dimension.
+    const float* next_vectors = vectors_ + current_num_vectors_added * table_file_schema.dimension_;
+    Status status = engine->AddWithIds(num_vectors_added, next_vectors, vector_ids_to_add.data());
+    if (status.ok()) {
+        current_num_vectors_added += num_vectors_added;
+        vector_ids_.insert(vector_ids_.end(), vector_ids_to_add.begin(), vector_ids_to_add.end());
+    }
+    else {
+        ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
+        num_vectors_added = 0;
+    }
+
+    return status;
+}
+
+size_t VectorSource::GetNumVectorsAdded() {
+    return current_num_vectors_added;
+}
+
+// True once all n_ vectors have been added.
+bool VectorSource::AllAdded() {
+    return (current_num_vectors_added == n_);
+}
+
+// Ids assigned so far, in the order the vectors were added.
+IDNumbers VectorSource::GetVectorIds() {
+    return vector_ids_;
+}
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
\ No newline at end of file
diff --git a/cpp/src/db/VectorSource.h b/cpp/src/db/VectorSource.h
new file mode 100644
index 00000000..170f3634
--- /dev/null
+++ b/cpp/src/db/VectorSource.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include "Status.h"
+#include "Meta.h"
+#include "IDGenerator.h"
+
+#include <memory>
+
+namespace zilliz {
+namespace milvus {
+namespace engine {
+
+// Non-owning view over `n` vectors stored back to back in a float array; keeps
+// track of how many of them have been added and of the ids they were given.
+class VectorSource {
+
+public:
+
+    using Ptr = std::shared_ptr<VectorSource>;
+
+    // `vectors` is not copied and must outlive this object.
+    VectorSource(const size_t& n, const float* vectors);
+
+    Status Add(const meta::TableFileSchema& table_file_schema, const size_t& num_vectors_to_add, size_t& num_vectors_added);
+
+    size_t GetNumVectorsAdded();
+
+    bool AllAdded();
+
+    IDNumbers GetVectorIds();
+
+private:
+
+    const size_t n_;
+    const float* vectors_;
+    IDNumbers vector_ids_;
+
+    size_t current_num_vectors_added;
+
+    // Held by shared_ptr so the generator is released with the last copy of this source.
+    std::shared_ptr<IDGenerator> id_generator_;
+
+}; //VectorSource
+
+} // namespace engine
+} // namespace milvus
+} // namespace zilliz
\ No newline at end of file
diff --git a/cpp/unittest/db/mem_test.cpp b/cpp/unittest/db/mem_test.cpp
new file mode 100644
index 00000000..8418b9cd
--- /dev/null
+++ b/cpp/unittest/db/mem_test.cpp
@@ -0,0 +1,194 @@
+#include "gtest/gtest.h"
+
+#include "db/VectorSource.h"
+#include "db/MemTableFile.h"
+#include "db/MemTable.h"
+#include "utils.h"
+#include "db/Factories.h"
+#include "db/Constants.h"
+
+using namespace zilliz::milvus;
+
+namespace {
+
+    static const std::string TABLE_NAME = "test_group";
+    static constexpr int64_t TABLE_DIM = 256;
+    static constexpr int64_t VECTOR_COUNT = 250000;
+    static constexpr int64_t INSERT_LOOP = 10000;
+
+    engine::meta::TableSchema BuildTableSchema() {
+        engine::meta::TableSchema table_info;
+        table_info.dimension_ = TABLE_DIM;
+        table_info.table_id_ = TABLE_NAME;
+        table_info.engine_type_ = (int)engine::EngineType::FAISS_IDMAP;
+        return table_info;
+    }
+
+    // Fills `vectors` with n random vectors of TABLE_DIM floats each, stored back to back.
+    void BuildVectors(int64_t n, std::vector<float>& vectors) {
+        vectors.clear();
+        vectors.resize(n*TABLE_DIM);
+        float* data = vectors.data();
+        for(int i = 0; i < n; i++) {
+            for(int j = 0; j < TABLE_DIM; j++) data[TABLE_DIM * i + j] = drand48();
+            data[TABLE_DIM * i] += i / 2000.;
+        }
+    }
+}
+
+// A VectorSource hands out its vectors in batches and never more than it holds.
+TEST(MEM_TEST, VECTOR_SOURCE_TEST) {
+
+    std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
+
+    engine::meta::TableSchema table_schema = BuildTableSchema();
+    auto status = impl_->CreateTable(table_schema);
+    ASSERT_TRUE(status.ok());
+
+    engine::meta::TableFileSchema table_file_schema;
+    table_file_schema.table_id_ = TABLE_NAME;
+    status = impl_->CreateTableFile(table_file_schema);
+    ASSERT_TRUE(status.ok());
+
+    int64_t n = 100;
+    std::vector<float> vectors;
+    BuildVectors(n, vectors);
+
+    engine::VectorSource source(n, vectors.data());
+
+    size_t num_vectors_added;
+    status = source.Add(table_file_schema, 50, num_vectors_added);
+    ASSERT_TRUE(status.ok());
+
+    ASSERT_EQ(num_vectors_added, 50);
+
+    engine::IDNumbers vector_ids = source.GetVectorIds();
+    ASSERT_EQ(vector_ids.size(), 50);
+
+    status = source.Add(table_file_schema, 60, num_vectors_added);
+    ASSERT_TRUE(status.ok());
+
+    ASSERT_EQ(num_vectors_added, 50);
+
+    vector_ids = source.GetVectorIds();
+    ASSERT_EQ(vector_ids.size(), 100);
+
+//    for (auto& id : vector_ids) {
+//        std::cout << id << std::endl;
+//    }
+
+    status = impl_->DropAll();
+    ASSERT_TRUE(status.ok());
+}
+
+// A MemTableFile accepts vectors only up to MAX_TABLE_FILE_MEM.
+TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
+
+    std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
+
+    engine::meta::TableSchema table_schema = BuildTableSchema();
+    auto status = impl_->CreateTable(table_schema);
+    ASSERT_TRUE(status.ok());
+
+    engine::MemTableFile memTableFile(TABLE_NAME, impl_);
+
+    int64_t n_100 = 100;
+    std::vector<float> vectors_100;
+    BuildVectors(n_100, vectors_100);
+
+    engine::VectorSource::Ptr source = std::make_shared<engine::VectorSource>(n_100, vectors_100.data());
+
+    status = memTableFile.Add(source);
+    ASSERT_TRUE(status.ok());
+
+//    std::cout << memTableFile.GetCurrentMem() << " " << memTableFile.GetMemLeft() << std::endl;
+
+    engine::IDNumbers vector_ids = source->GetVectorIds();
+    ASSERT_EQ(vector_ids.size(), 100);
+
+    size_t singleVectorMem = sizeof(float) * TABLE_DIM;
+    ASSERT_EQ(memTableFile.GetCurrentMem(), n_100 * singleVectorMem);
+
+    int64_t n_max = engine::MAX_TABLE_FILE_MEM / singleVectorMem;
+    std::vector<float> vectors_128M;
+    BuildVectors(n_max, vectors_128M);
+
+    engine::VectorSource::Ptr source_128M = std::make_shared<engine::VectorSource>(n_max, vectors_128M.data());
+    status = memTableFile.Add(source_128M);
+
+    vector_ids = source_128M->GetVectorIds();
+    ASSERT_EQ(vector_ids.size(), n_max - n_100);
+
+    ASSERT_TRUE(memTableFile.isFull());
+
+    status = impl_->DropAll();
+    ASSERT_TRUE(status.ok());
+}
+
+// A MemTable spills into additional files once the current one is full.
+TEST(MEM_TEST, MEM_TABLE_TEST) {
+
+    std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
+
+    engine::meta::TableSchema table_schema = BuildTableSchema();
+    auto status = impl_->CreateTable(table_schema);
+    ASSERT_TRUE(status.ok());
+
+    int64_t n_100 = 100;
+    std::vector<float> vectors_100;
+    BuildVectors(n_100, vectors_100);
+
+    engine::VectorSource::Ptr source_100 = std::make_shared<engine::VectorSource>(n_100, vectors_100.data());
+
+    engine::MemTable memTable(TABLE_NAME, impl_);
+
+    status = memTable.Add(source_100);
+    ASSERT_TRUE(status.ok());
+
+    engine::IDNumbers vector_ids = source_100->GetVectorIds();
+    ASSERT_EQ(vector_ids.size(), 100);
+
+    engine::MemTableFile::Ptr memTableFile;
+    memTable.GetCurrentMemTableFile(memTableFile);
+    size_t singleVectorMem = sizeof(float) * TABLE_DIM;
+    ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem);
+
+    int64_t n_max = engine::MAX_TABLE_FILE_MEM / singleVectorMem;
+    std::vector<float> vectors_128M;
+    BuildVectors(n_max, vectors_128M);
+
+    engine::VectorSource::Ptr source_128M = std::make_shared<engine::VectorSource>(n_max, vectors_128M.data());
+    status = memTable.Add(source_128M);
+    ASSERT_TRUE(status.ok());
+
+    vector_ids = source_128M->GetVectorIds();
+    ASSERT_EQ(vector_ids.size(), n_max);
+
+    memTable.GetCurrentMemTableFile(memTableFile);
+    ASSERT_EQ(memTableFile->GetCurrentMem(), n_100 * singleVectorMem);
+
+    ASSERT_EQ(memTable.GetStackSize(), 2);
+
+    int64_t n_1G = 1024000;
+    std::vector<float> vectors_1G;
+    BuildVectors(n_1G, vectors_1G);
+
+    engine::VectorSource::Ptr source_1G = std::make_shared<engine::VectorSource>(n_1G, vectors_1G.data());
+
+    status = memTable.Add(source_1G);
+    ASSERT_TRUE(status.ok());
+
+    vector_ids = source_1G->GetVectorIds();
+    ASSERT_EQ(vector_ids.size(), n_1G);
+
+    // The current file already holds n_100 vectors, so it takes n_max - n_100
+    // more; each further n_max vectors (rounded up) needs one more file.
+    int64_t n_overflow = n_1G - (n_max - n_100);
+    int expectedStackSize = 2 + static_cast<int>((n_overflow + n_max - 1) / n_max);
+    ASSERT_EQ(memTable.GetStackSize(), expectedStackSize);
+
+    status = impl_->DropAll();
+    ASSERT_TRUE(status.ok());
+}
+
+
-- 
GitLab