提交 97d129fc 编写于 作者: Z zhiru 提交者: jinhai

Implemented add and serialize


Former-commit-id: 02d0ec1da6c441ff0c05d9933a487886dfbd0f96
上级 8b05eec6
...@@ -6,24 +6,26 @@ namespace milvus { ...@@ -6,24 +6,26 @@ namespace milvus {
namespace engine { namespace engine {
MemTable::MemTable(const std::string& table_id, MemTable::MemTable(const std::string& table_id,
const std::shared_ptr<meta::Meta>& meta) : const std::shared_ptr<meta::Meta>& meta,
const Options& options) :
table_id_(table_id), table_id_(table_id),
meta_(meta) { meta_(meta),
options_(options) {
} }
Status MemTable::Add(VectorSource::Ptr& source) { Status MemTable::Add(VectorSource::Ptr& source) {
while (!source->AllAdded()) { while (!source->AllAdded()) {
MemTableFile::Ptr currentMemTableFile; MemTableFile::Ptr currentMemTableFile;
if (!mem_table_file_stack_.empty()) { if (!mem_table_file_list_.empty()) {
currentMemTableFile = mem_table_file_stack_.top(); currentMemTableFile = mem_table_file_list_.back();
} }
Status status; Status status;
if (mem_table_file_stack_.empty() || currentMemTableFile->isFull()) { if (mem_table_file_list_.empty() || currentMemTableFile->IsFull()) {
MemTableFile::Ptr newMemTableFile = std::make_shared<MemTableFile>(table_id_, meta_); MemTableFile::Ptr newMemTableFile = std::make_shared<MemTableFile>(table_id_, meta_, options_);
status = newMemTableFile->Add(source); status = newMemTableFile->Add(source);
if (status.ok()) { if (status.ok()) {
mem_table_file_stack_.push(newMemTableFile); mem_table_file_list_.emplace_back(newMemTableFile);
} }
} }
else { else {
...@@ -39,11 +41,23 @@ Status MemTable::Add(VectorSource::Ptr& source) { ...@@ -39,11 +41,23 @@ Status MemTable::Add(VectorSource::Ptr& source) {
} }
void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) { void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) {
mem_table_file = mem_table_file_stack_.top(); mem_table_file = mem_table_file_list_.back();
} }
size_t MemTable::GetStackSize() { size_t MemTable::GetStackSize() {
return mem_table_file_stack_.size(); return mem_table_file_list_.size();
}
Status MemTable::Serialize() {
for (auto& memTableFile : mem_table_file_list_) {
auto status = memTableFile->Serialize();
if (!status.ok()) {
std::string errMsg = "MemTable::Serialize failed: " + status.ToString();
ENGINE_LOG_ERROR << errMsg;
return Status::Error(errMsg);
}
}
return Status::OK();
} }
} // namespace engine } // namespace engine
......
...@@ -15,10 +15,10 @@ class MemTable { ...@@ -15,10 +15,10 @@ class MemTable {
public: public:
using Ptr = std::shared_ptr<MemTable>; using Ptr = std::shared_ptr<MemTable>;
using MemTableFileStack = std::stack<MemTableFile::Ptr>; using MemTableFileList = std::vector<MemTableFile::Ptr>;
using MetaPtr = meta::Meta::Ptr; using MetaPtr = meta::Meta::Ptr;
MemTable(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta); MemTable(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta, const Options& options);
Status Add(VectorSource::Ptr& source); Status Add(VectorSource::Ptr& source);
...@@ -26,13 +26,17 @@ public: ...@@ -26,13 +26,17 @@ public:
size_t GetStackSize(); size_t GetStackSize();
Status Serialize();
private: private:
const std::string table_id_; const std::string table_id_;
MemTableFileStack mem_table_file_stack_; MemTableFileList mem_table_file_list_;
MetaPtr meta_; MetaPtr meta_;
Options options_;
}; //MemTable }; //MemTable
} // namespace engine } // namespace engine
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "Constants.h" #include "Constants.h"
#include "Log.h" #include "Log.h"
#include "EngineFactory.h" #include "EngineFactory.h"
#include "metrics/Metrics.h"
#include <cmath> #include <cmath>
...@@ -10,9 +11,11 @@ namespace milvus { ...@@ -10,9 +11,11 @@ namespace milvus {
namespace engine { namespace engine {
MemTableFile::MemTableFile(const std::string& table_id, MemTableFile::MemTableFile(const std::string& table_id,
const std::shared_ptr<meta::Meta>& meta) : const std::shared_ptr<meta::Meta>& meta,
const Options& options) :
table_id_(table_id), table_id_(table_id),
meta_(meta) { meta_(meta),
options_(options) {
current_mem_ = 0; current_mem_ = 0;
auto status = CreateTableFile(); auto status = CreateTableFile();
...@@ -40,6 +43,13 @@ Status MemTableFile::CreateTableFile() { ...@@ -40,6 +43,13 @@ Status MemTableFile::CreateTableFile() {
Status MemTableFile::Add(const VectorSource::Ptr& source) { Status MemTableFile::Add(const VectorSource::Ptr& source) {
if (table_file_schema_.dimension_ <= 0) {
std::string errMsg = "MemTableFile::Add: table_file_schema dimension = " +
std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_;
ENGINE_LOG_ERROR << errMsg;
return Status::Error(errMsg);
}
size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
size_t memLeft = GetMemLeft(); size_t memLeft = GetMemLeft();
if (memLeft >= singleVectorMemSize) { if (memLeft >= singleVectorMemSize) {
...@@ -62,11 +72,37 @@ size_t MemTableFile::GetMemLeft() { ...@@ -62,11 +72,37 @@ size_t MemTableFile::GetMemLeft() {
return (MAX_TABLE_FILE_MEM - current_mem_); return (MAX_TABLE_FILE_MEM - current_mem_);
} }
bool MemTableFile::isFull() { bool MemTableFile::IsFull() {
size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE; size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
return (GetMemLeft() < singleVectorMemSize); return (GetMemLeft() < singleVectorMemSize);
} }
Status MemTableFile::Serialize() {
auto start_time = METRICS_NOW_TIME;
auto size = GetCurrentMem();
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.size_ = size;
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double)size/total_time);
table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = meta_->UpdateTableFile(table_file_schema_);
LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << table_file_schema_.file_id_ << " of size " << (double)size / (double)M << " M";
execution_engine_->Cache();
return status;
}
} // namespace engine } // namespace engine
} // namespace milvus } // namespace milvus
} // namespace zilliz } // namespace zilliz
\ No newline at end of file
...@@ -16,7 +16,7 @@ public: ...@@ -16,7 +16,7 @@ public:
using Ptr = std::shared_ptr<MemTableFile>; using Ptr = std::shared_ptr<MemTableFile>;
using MetaPtr = meta::Meta::Ptr; using MetaPtr = meta::Meta::Ptr;
MemTableFile(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta); MemTableFile(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta, const Options& options);
Status Add(const VectorSource::Ptr& source); Status Add(const VectorSource::Ptr& source);
...@@ -24,7 +24,9 @@ public: ...@@ -24,7 +24,9 @@ public:
size_t GetMemLeft(); size_t GetMemLeft();
bool isFull(); bool IsFull();
Status Serialize();
private: private:
...@@ -36,6 +38,8 @@ private: ...@@ -36,6 +38,8 @@ private:
MetaPtr meta_; MetaPtr meta_;
Options options_;
size_t current_mem_; size_t current_mem_;
ExecutionEnginePtr execution_engine_; ExecutionEnginePtr execution_engine_;
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "ExecutionEngine.h" #include "ExecutionEngine.h"
#include "EngineFactory.h" #include "EngineFactory.h"
#include "Log.h" #include "Log.h"
#include "metrics/Metrics.h"
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -21,12 +22,7 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, ...@@ -21,12 +22,7 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine,
const size_t& num_vectors_to_add, const size_t& num_vectors_to_add,
size_t& num_vectors_added) { size_t& num_vectors_added) {
if (table_file_schema.dimension_ <= 0) { auto start_time = METRICS_NOW_TIME;
std::string errMsg = "VectorSource::Add: table_file_schema dimension = " +
std::to_string(table_file_schema.dimension_) + ", table_id = " + table_file_schema.table_id_;
ENGINE_LOG_ERROR << errMsg;
return Status::Error(errMsg);
}
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added; num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added;
IDNumbers vector_ids_to_add; IDNumbers vector_ids_to_add;
...@@ -40,6 +36,10 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine, ...@@ -40,6 +36,10 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine,
ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString(); ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
} }
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_), static_cast<int>(table_file_schema.dimension_), total_time);
return status; return status;
} }
......
...@@ -28,8 +28,6 @@ public: ...@@ -28,8 +28,6 @@ public:
IDNumbers GetVectorIds(); IDNumbers GetVectorIds();
// Status Serialize();
private: private:
const size_t n_; const size_t n_;
......
...@@ -86,12 +86,13 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) { ...@@ -86,12 +86,13 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) {
TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build(); std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
auto options = engine::OptionsFactory::Build();
engine::meta::TableSchema table_schema = BuildTableSchema(); engine::meta::TableSchema table_schema = BuildTableSchema();
auto status = impl_->CreateTable(table_schema); auto status = impl_->CreateTable(table_schema);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
engine::MemTableFile memTableFile(TABLE_NAME, impl_); engine::MemTableFile memTableFile(TABLE_NAME, impl_, options);
int64_t n_100 = 100; int64_t n_100 = 100;
std::vector<float> vectors_100; std::vector<float> vectors_100;
...@@ -120,7 +121,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { ...@@ -120,7 +121,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
vector_ids = source_128M->GetVectorIds(); vector_ids = source_128M->GetVectorIds();
ASSERT_EQ(vector_ids.size(), n_max - n_100); ASSERT_EQ(vector_ids.size(), n_max - n_100);
ASSERT_TRUE(memTableFile.isFull()); ASSERT_TRUE(memTableFile.IsFull());
status = impl_->DropAll(); status = impl_->DropAll();
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
...@@ -129,6 +130,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) { ...@@ -129,6 +130,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
TEST(MEM_TEST, MEM_TABLE_TEST) { TEST(MEM_TEST, MEM_TABLE_TEST) {
std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build(); std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
auto options = engine::OptionsFactory::Build();
engine::meta::TableSchema table_schema = BuildTableSchema(); engine::meta::TableSchema table_schema = BuildTableSchema();
auto status = impl_->CreateTable(table_schema); auto status = impl_->CreateTable(table_schema);
...@@ -140,7 +142,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { ...@@ -140,7 +142,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) {
engine::VectorSource::Ptr source_100 = std::make_shared<engine::VectorSource>(n_100, vectors_100.data()); engine::VectorSource::Ptr source_100 = std::make_shared<engine::VectorSource>(n_100, vectors_100.data());
engine::MemTable memTable(TABLE_NAME, impl_); engine::MemTable memTable(TABLE_NAME, impl_, options);
status = memTable.Add(source_100); status = memTable.Add(source_100);
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
...@@ -184,6 +186,9 @@ TEST(MEM_TEST, MEM_TABLE_TEST) { ...@@ -184,6 +186,9 @@ TEST(MEM_TEST, MEM_TABLE_TEST) {
int expectedStackSize = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM); int expectedStackSize = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM);
ASSERT_EQ(memTable.GetStackSize(), expectedStackSize); ASSERT_EQ(memTable.GetStackSize(), expectedStackSize);
status = memTable.Serialize();
ASSERT_TRUE(status.ok());
status = impl_->DropAll(); status = impl_->DropAll();
ASSERT_TRUE(status.ok()); ASSERT_TRUE(status.ok());
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册