提交 b6d3c664 编写于 作者: Z zhiru 提交者: jinhai

Implemented add and serialize


Former-commit-id: 35a61d2fd0fc9c703ec4efbc9fe8399162e79b8e
上级 d2f205d1
......@@ -6,24 +6,26 @@ namespace milvus {
namespace engine {
MemTable::MemTable(const std::string& table_id,
const std::shared_ptr<meta::Meta>& meta) :
const std::shared_ptr<meta::Meta>& meta,
const Options& options) :
table_id_(table_id),
meta_(meta) {
meta_(meta),
options_(options) {
}
Status MemTable::Add(VectorSource::Ptr& source) {
while (!source->AllAdded()) {
MemTableFile::Ptr currentMemTableFile;
if (!mem_table_file_stack_.empty()) {
currentMemTableFile = mem_table_file_stack_.top();
if (!mem_table_file_list_.empty()) {
currentMemTableFile = mem_table_file_list_.back();
}
Status status;
if (mem_table_file_stack_.empty() || currentMemTableFile->isFull()) {
MemTableFile::Ptr newMemTableFile = std::make_shared<MemTableFile>(table_id_, meta_);
if (mem_table_file_list_.empty() || currentMemTableFile->IsFull()) {
MemTableFile::Ptr newMemTableFile = std::make_shared<MemTableFile>(table_id_, meta_, options_);
status = newMemTableFile->Add(source);
if (status.ok()) {
mem_table_file_stack_.push(newMemTableFile);
mem_table_file_list_.emplace_back(newMemTableFile);
}
}
else {
......@@ -39,11 +41,23 @@ Status MemTable::Add(VectorSource::Ptr& source) {
}
void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr& mem_table_file) {
mem_table_file = mem_table_file_stack_.top();
mem_table_file = mem_table_file_list_.back();
}
size_t MemTable::GetStackSize() {
return mem_table_file_stack_.size();
return mem_table_file_list_.size();
}
Status MemTable::Serialize() {
for (auto& memTableFile : mem_table_file_list_) {
auto status = memTableFile->Serialize();
if (!status.ok()) {
std::string errMsg = "MemTable::Serialize failed: " + status.ToString();
ENGINE_LOG_ERROR << errMsg;
return Status::Error(errMsg);
}
}
return Status::OK();
}
} // namespace engine
......
......@@ -15,10 +15,10 @@ class MemTable {
public:
using Ptr = std::shared_ptr<MemTable>;
using MemTableFileStack = std::stack<MemTableFile::Ptr>;
using MemTableFileList = std::vector<MemTableFile::Ptr>;
using MetaPtr = meta::Meta::Ptr;
MemTable(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta);
MemTable(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta, const Options& options);
Status Add(VectorSource::Ptr& source);
......@@ -26,13 +26,17 @@ public:
size_t GetStackSize();
Status Serialize();
private:
const std::string table_id_;
MemTableFileStack mem_table_file_stack_;
MemTableFileList mem_table_file_list_;
MetaPtr meta_;
Options options_;
}; //MemTable
} // namespace engine
......
......@@ -2,6 +2,7 @@
#include "Constants.h"
#include "Log.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
#include <cmath>
......@@ -10,9 +11,11 @@ namespace milvus {
namespace engine {
MemTableFile::MemTableFile(const std::string& table_id,
const std::shared_ptr<meta::Meta>& meta) :
const std::shared_ptr<meta::Meta>& meta,
const Options& options) :
table_id_(table_id),
meta_(meta) {
meta_(meta),
options_(options) {
current_mem_ = 0;
auto status = CreateTableFile();
......@@ -40,6 +43,13 @@ Status MemTableFile::CreateTableFile() {
Status MemTableFile::Add(const VectorSource::Ptr& source) {
if (table_file_schema_.dimension_ <= 0) {
std::string errMsg = "MemTableFile::Add: table_file_schema dimension = " +
std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_;
ENGINE_LOG_ERROR << errMsg;
return Status::Error(errMsg);
}
size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
size_t memLeft = GetMemLeft();
if (memLeft >= singleVectorMemSize) {
......@@ -62,11 +72,37 @@ size_t MemTableFile::GetMemLeft() {
return (MAX_TABLE_FILE_MEM - current_mem_);
}
bool MemTableFile::isFull() {
bool MemTableFile::IsFull() {
size_t singleVectorMemSize = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
return (GetMemLeft() < singleVectorMemSize);
}
Status MemTableFile::Serialize() {
auto start_time = METRICS_NOW_TIME;
auto size = GetCurrentMem();
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.size_ = size;
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double)size/total_time);
table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = meta_->UpdateTableFile(table_file_schema_);
LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << table_file_schema_.file_id_ << " of size " << (double)size / (double)M << " M";
execution_engine_->Cache();
return status;
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
......@@ -16,7 +16,7 @@ public:
using Ptr = std::shared_ptr<MemTableFile>;
using MetaPtr = meta::Meta::Ptr;
MemTableFile(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta);
MemTableFile(const std::string& table_id, const std::shared_ptr<meta::Meta>& meta, const Options& options);
Status Add(const VectorSource::Ptr& source);
......@@ -24,7 +24,9 @@ public:
size_t GetMemLeft();
bool isFull();
bool IsFull();
Status Serialize();
private:
......@@ -36,6 +38,8 @@ private:
MetaPtr meta_;
Options options_;
size_t current_mem_;
ExecutionEnginePtr execution_engine_;
......
......@@ -2,6 +2,7 @@
#include "ExecutionEngine.h"
#include "EngineFactory.h"
#include "Log.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
......@@ -21,12 +22,7 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine,
const size_t& num_vectors_to_add,
size_t& num_vectors_added) {
if (table_file_schema.dimension_ <= 0) {
std::string errMsg = "VectorSource::Add: table_file_schema dimension = " +
std::to_string(table_file_schema.dimension_) + ", table_id = " + table_file_schema.table_id_;
ENGINE_LOG_ERROR << errMsg;
return Status::Error(errMsg);
}
auto start_time = METRICS_NOW_TIME;
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ? num_vectors_to_add : n_ - current_num_vectors_added;
IDNumbers vector_ids_to_add;
......@@ -40,6 +36,10 @@ Status VectorSource::Add(const ExecutionEnginePtr& execution_engine,
ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_), static_cast<int>(table_file_schema.dimension_), total_time);
return status;
}
......
......@@ -28,8 +28,6 @@ public:
IDNumbers GetVectorIds();
// Status Serialize();
private:
const size_t n_;
......
......@@ -86,12 +86,13 @@ TEST(MEM_TEST, VECTOR_SOURCE_TEST) {
TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
auto options = engine::OptionsFactory::Build();
engine::meta::TableSchema table_schema = BuildTableSchema();
auto status = impl_->CreateTable(table_schema);
ASSERT_TRUE(status.ok());
engine::MemTableFile memTableFile(TABLE_NAME, impl_);
engine::MemTableFile memTableFile(TABLE_NAME, impl_, options);
int64_t n_100 = 100;
std::vector<float> vectors_100;
......@@ -120,7 +121,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
vector_ids = source_128M->GetVectorIds();
ASSERT_EQ(vector_ids.size(), n_max - n_100);
ASSERT_TRUE(memTableFile.isFull());
ASSERT_TRUE(memTableFile.IsFull());
status = impl_->DropAll();
ASSERT_TRUE(status.ok());
......@@ -129,6 +130,7 @@ TEST(MEM_TEST, MEM_TABLE_FILE_TEST) {
TEST(MEM_TEST, MEM_TABLE_TEST) {
std::shared_ptr<engine::meta::DBMetaImpl> impl_ = engine::DBMetaImplFactory::Build();
auto options = engine::OptionsFactory::Build();
engine::meta::TableSchema table_schema = BuildTableSchema();
auto status = impl_->CreateTable(table_schema);
......@@ -140,7 +142,7 @@ TEST(MEM_TEST, MEM_TABLE_TEST) {
engine::VectorSource::Ptr source_100 = std::make_shared<engine::VectorSource>(n_100, vectors_100.data());
engine::MemTable memTable(TABLE_NAME, impl_);
engine::MemTable memTable(TABLE_NAME, impl_, options);
status = memTable.Add(source_100);
ASSERT_TRUE(status.ok());
......@@ -184,6 +186,9 @@ TEST(MEM_TEST, MEM_TABLE_TEST) {
int expectedStackSize = 2 + std::ceil((n_1G - n_100) * singleVectorMem / engine::MAX_TABLE_FILE_MEM);
ASSERT_EQ(memTable.GetStackSize(), expectedStackSize);
status = memTable.Serialize();
ASSERT_TRUE(status.ok());
status = impl_->DropAll();
ASSERT_TRUE(status.ok());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册