提交 4a0c682b 编写于 作者: G groot

MS-110 - Avoid huge file size


Former-commit-id: a36dd97a21b2e0d6e5cf99250a7b884500991708
上级 0086b464
...@@ -11,6 +11,7 @@ Please mark all change in change log and use the ticket from JIRA. ...@@ -11,6 +11,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-89 - Fix compile failed, libgpufaiss.a link missing - MS-89 - Fix compile failed, libgpufaiss.a link missing
- MS-90 - Fix arch match incorrect on ARM - MS-90 - Fix arch match incorrect on ARM
- MS-99 - Fix compilation bug - MS-99 - Fix compilation bug
- MS-110 - Avoid huge file size
## Improvement ## Improvement
- MS-82 - Update server startup welcome message - MS-82 - Update server startup welcome message
......
...@@ -472,7 +472,7 @@ void DBImpl::StartCompactionTask() { ...@@ -472,7 +472,7 @@ void DBImpl::StartCompactionTask() {
} }
//serialize memory data //serialize memory data
std::vector<std::string> temp_table_ids; std::set<std::string> temp_table_ids;
mem_mgr_->Serialize(temp_table_ids); mem_mgr_->Serialize(temp_table_ids);
for(auto& id : temp_table_ids) { for(auto& id : temp_table_ids) {
compact_table_ids_.insert(id); compact_table_ids_.insert(id);
...@@ -543,7 +543,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date, ...@@ -543,7 +543,8 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ << ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
" of size=" << index->PhysicalSize()/(1024*1024) << " M"; " of size=" << index->PhysicalSize()/(1024*1024) << " M";
index->Cache(); //current disable this line to avoid memory
//index->Cache();
return status; return status;
} }
...@@ -659,7 +660,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) { ...@@ -659,7 +660,8 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
<< index->PhysicalSize()/(1024*1024) << " M" << index->PhysicalSize()/(1024*1024) << " M"
<< " from file " << to_remove.file_id_; << " from file " << to_remove.file_id_;
index->Cache(); //current disable this line to avoid memory
//index->Cache();
} catch (std::exception& ex) { } catch (std::exception& ex) {
return Status::Error("Build index encounter exception", ex.what()); return Status::Error("Build index encounter exception", ex.what());
...@@ -698,7 +700,7 @@ Status DBImpl::Size(uint64_t& result) { ...@@ -698,7 +700,7 @@ Status DBImpl::Size(uint64_t& result) {
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
shutting_down_.store(true, std::memory_order_release); shutting_down_.store(true, std::memory_order_release);
bg_timer_thread_.join(); bg_timer_thread_.join();
std::vector<std::string> ids; std::set<std::string> ids;
mem_mgr_->Serialize(ids); mem_mgr_->Serialize(ids);
} }
......
...@@ -20,36 +20,54 @@ namespace engine { ...@@ -20,36 +20,54 @@ namespace engine {
MemVectors::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr, MemVectors::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
const meta::TableFileSchema& schema, const Options& options) const meta::TableFileSchema& schema, const Options& options)
: pMeta_(meta_ptr), : meta_(meta_ptr),
options_(options), options_(options),
schema_(schema), schema_(schema),
pIdGenerator_(new SimpleIDGenerator()), id_generator_(new SimpleIDGenerator()),
pEE_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) { active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) {
} }
void MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) { Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
if(active_engine_ == nullptr) {
return Status::Error("index engine is null");
}
auto start_time = METRICS_NOW_TIME; auto start_time = METRICS_NOW_TIME;
pIdGenerator_->GetNextIDNumbers(n_, vector_ids_); id_generator_->GetNextIDNumbers(n_, vector_ids_);
pEE_->AddWithIds(n_, vectors_, vector_ids_.data()); Status status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data());
auto end_time = METRICS_NOW_TIME; auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time); auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_), static_cast<int>(schema_.dimension_), total_time); server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_), static_cast<int>(schema_.dimension_), total_time);
return status;
} }
size_t MemVectors::Total() const { size_t MemVectors::RowCount() const {
return pEE_->Count(); if(active_engine_ == nullptr) {
return 0;
}
return active_engine_->Count();
} }
size_t MemVectors::ApproximateSize() const { size_t MemVectors::Size() const {
return pEE_->Size(); if(active_engine_ == nullptr) {
return 0;
}
return active_engine_->Size();
} }
Status MemVectors::Serialize(std::string& table_id) { Status MemVectors::Serialize(std::string& table_id) {
if(active_engine_ == nullptr) {
return Status::Error("index engine is null");
}
table_id = schema_.table_id_; table_id = schema_.table_id_;
auto size = ApproximateSize(); auto size = Size();
auto start_time = METRICS_NOW_TIME; auto start_time = METRICS_NOW_TIME;
pEE_->Serialize(); active_engine_->Serialize();
auto end_time = METRICS_NOW_TIME; auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time); auto total_time = METRICS_MICROSECONDS(start_time, end_time);
schema_.size_ = size; schema_.size_ = size;
...@@ -59,20 +77,20 @@ Status MemVectors::Serialize(std::string& table_id) { ...@@ -59,20 +77,20 @@ Status MemVectors::Serialize(std::string& table_id) {
schema_.file_type_ = (size >= options_.index_trigger_size) ? schema_.file_type_ = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW; meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = pMeta_->UpdateTableFile(schema_); auto status = meta_->UpdateTableFile(schema_);
LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index") LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id_ << " of size " << (double)(pEE_->Size()) / (double)meta::M << " M"; << " file " << schema_.file_id_ << " of size " << (double)(active_engine_->Size()) / (double)meta::M << " M";
pEE_->Cache(); active_engine_->Cache();
return status; return status;
} }
MemVectors::~MemVectors() { MemVectors::~MemVectors() {
if (pIdGenerator_ != nullptr) { if (id_generator_ != nullptr) {
delete pIdGenerator_; delete id_generator_;
pIdGenerator_ = nullptr; id_generator_ = nullptr;
} }
} }
...@@ -81,20 +99,20 @@ MemVectors::~MemVectors() { ...@@ -81,20 +99,20 @@ MemVectors::~MemVectors() {
*/ */
MemManager::MemVectorsPtr MemManager::GetMemByTable( MemManager::MemVectorsPtr MemManager::GetMemByTable(
const std::string& table_id) { const std::string& table_id) {
auto memIt = memMap_.find(table_id); auto memIt = mem_id_map_.find(table_id);
if (memIt != memMap_.end()) { if (memIt != mem_id_map_.end()) {
return memIt->second; return memIt->second;
} }
meta::TableFileSchema table_file; meta::TableFileSchema table_file;
table_file.table_id_ = table_id; table_file.table_id_ = table_id;
auto status = pMeta_->CreateTableFile(table_file); auto status = meta_->CreateTableFile(table_file);
if (!status.ok()) { if (!status.ok()) {
return nullptr; return nullptr;
} }
memMap_[table_id] = MemVectorsPtr(new MemVectors(pMeta_, table_file, options_)); mem_id_map_[table_id] = MemVectorsPtr(new MemVectors(meta_, table_file, options_));
return memMap_[table_id]; return mem_id_map_[table_id];
} }
Status MemManager::InsertVectors(const std::string& table_id_, Status MemManager::InsertVectors(const std::string& table_id_,
...@@ -114,37 +132,44 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id, ...@@ -114,37 +132,44 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id,
if (mem == nullptr) { if (mem == nullptr) {
return Status::NotFound("Group " + table_id + " not found!"); return Status::NotFound("Group " + table_id + " not found!");
} }
mem->Add(n, vectors, vector_ids);
return Status::OK(); //makesure each file size less than index_trigger_size
if(mem->Size() > options_.index_trigger_size) {
std::unique_lock<std::mutex> lock(serialization_mtx_);
immu_mem_list_.push_back(mem);
mem_id_map_.erase(table_id);
return InsertVectorsNoLock(table_id, n, vectors, vector_ids);
} else {
return mem->Add(n, vectors, vector_ids);
}
} }
Status MemManager::ToImmutable() { Status MemManager::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
for (auto& kv: memMap_) { for (auto& kv: mem_id_map_) {
immMems_.push_back(kv.second); immu_mem_list_.push_back(kv.second);
} }
memMap_.clear(); mem_id_map_.clear();
return Status::OK(); return Status::OK();
} }
Status MemManager::Serialize(std::vector<std::string>& table_ids) { Status MemManager::Serialize(std::set<std::string>& table_ids) {
ToImmutable(); ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_); std::unique_lock<std::mutex> lock(serialization_mtx_);
std::string table_id; std::string table_id;
table_ids.clear(); table_ids.clear();
for (auto& mem : immMems_) { for (auto& mem : immu_mem_list_) {
mem->Serialize(table_id); mem->Serialize(table_id);
table_ids.push_back(table_id); table_ids.insert(table_id);
} }
immMems_.clear(); immu_mem_list_.clear();
return Status::OK(); return Status::OK();
} }
Status MemManager::EraseMemVector(const std::string& table_id) { Status MemManager::EraseMemVector(const std::string& table_id) {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
memMap_.erase(table_id); mem_id_map_.erase(table_id);
return Status::OK(); return Status::OK();
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <ctime> #include <ctime>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <set>
namespace zilliz { namespace zilliz {
namespace milvus { namespace milvus {
...@@ -32,11 +33,11 @@ public: ...@@ -32,11 +33,11 @@ public:
explicit MemVectors(const std::shared_ptr<meta::Meta>&, explicit MemVectors(const std::shared_ptr<meta::Meta>&,
const meta::TableFileSchema&, const Options&); const meta::TableFileSchema&, const Options&);
void Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_); Status Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
size_t Total() const; size_t RowCount() const;
size_t ApproximateSize() const; size_t Size() const;
Status Serialize(std::string& table_id); Status Serialize(std::string& table_id);
...@@ -49,11 +50,11 @@ private: ...@@ -49,11 +50,11 @@ private:
MemVectors(const MemVectors&) = delete; MemVectors(const MemVectors&) = delete;
MemVectors& operator=(const MemVectors&) = delete; MemVectors& operator=(const MemVectors&) = delete;
MetaPtr pMeta_; MetaPtr meta_;
Options options_; Options options_;
meta::TableFileSchema schema_; meta::TableFileSchema schema_;
IDGenerator* pIdGenerator_; IDGenerator* id_generator_;
ExecutionEnginePtr pEE_; ExecutionEnginePtr active_engine_;
}; // MemVectors }; // MemVectors
...@@ -66,14 +67,14 @@ public: ...@@ -66,14 +67,14 @@ public:
using Ptr = std::shared_ptr<MemManager>; using Ptr = std::shared_ptr<MemManager>;
MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options) MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options)
: pMeta_(meta), options_(options) {} : meta_(meta), options_(options) {}
MemVectorsPtr GetMemByTable(const std::string& table_id); MemVectorsPtr GetMemByTable(const std::string& table_id);
Status InsertVectors(const std::string& table_id, Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids); size_t n, const float* vectors, IDNumbers& vector_ids);
Status Serialize(std::vector<std::string>& table_ids); Status Serialize(std::set<std::string>& table_ids);
Status EraseMemVector(const std::string& table_id); Status EraseMemVector(const std::string& table_id);
...@@ -82,11 +83,11 @@ private: ...@@ -82,11 +83,11 @@ private:
size_t n, const float* vectors, IDNumbers& vector_ids); size_t n, const float* vectors, IDNumbers& vector_ids);
Status ToImmutable(); Status ToImmutable();
using MemMap = std::map<std::string, MemVectorsPtr>; using MemIdMap = std::map<std::string, MemVectorsPtr>;
using ImmMemPool = std::vector<MemVectorsPtr>; using MemList = std::vector<MemVectorsPtr>;
MemMap memMap_; MemIdMap mem_id_map_;
ImmMemPool immMems_; MemList immu_mem_list_;
MetaPtr pMeta_; MetaPtr meta_;
Options options_; Options options_;
std::mutex mutex_; std::mutex mutex_;
std::mutex serialization_mtx_; std::mutex serialization_mtx_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册