未验证 提交 ed012f29 编写于 作者: G groot 提交者: GitHub

Flush data when insert buffer is full (#2659)

* fix potential problem
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* insert buffer full trigger flush
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* cleanup files if no collection flushed
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* typo
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* format
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>
Co-authored-by: NJin Hai <hai.jin@zilliz.com>
上级 3b38f7f1
......@@ -1023,6 +1023,10 @@ DBImpl::Flush(const std::string& collection_id) {
if (lsn != 0) {
swn_wal_.Notify();
flush_req_swn_.Wait();
} else {
// no collection flushed, call merge task to cleanup files
std::set<std::string> merge_collection_ids;
StartMergeTask(merge_collection_ids);
}
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
......@@ -1050,6 +1054,10 @@ DBImpl::Flush() {
if (lsn != 0) {
swn_wal_.Notify();
flush_req_swn_.Wait();
} else {
// no collection flushed, call merge task to cleanup files
std::set<std::string> merge_collection_ids;
StartMergeTask(merge_collection_ids);
}
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
......@@ -2557,16 +2565,11 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return max_lsn;
};
auto partition_flushed = [&](const std::string& collection_id, const std::string& partition,
const std::string& target_collection_name) {
if (options_.wal_enable_) {
uint64_t lsn = 0;
meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn);
wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
auto force_flush_if_mem_full = [&]() -> uint64_t {
if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
InternalFlush();
}
std::set<std::string> merge_collection_ids = {target_collection_name};
StartMergeTask(merge_collection_ids);
};
Status status;
......@@ -2580,15 +2583,12 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return status;
}
std::set<std::string> flushed_collections;
status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids,
(record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.attr_nbytes, record.attr_data_size,
record.attr_data, record.lsn, flushed_collections);
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
status = mem_mgr_->InsertEntities(
target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.attr_nbytes, record.attr_data_size, record.attr_data, record.lsn);
force_flush_if_mem_full();
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
break;
}
......@@ -2600,14 +2600,10 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return status;
}
std::set<std::string> flushed_collections;
status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
(record.data_size / record.length / sizeof(uint8_t)),
(const u_int8_t*)record.data, record.lsn, flushed_collections);
// even though !status.ok, run
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
(const u_int8_t*)record.data, record.lsn);
force_flush_if_mem_full();
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
......@@ -2622,14 +2618,10 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return status;
}
std::set<std::string> flushed_collections;
status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
(record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.lsn, flushed_collections);
// even though !status.ok, run
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
(const float*)record.data, record.lsn);
force_flush_if_mem_full();
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
......
......@@ -27,18 +27,17 @@ class MemManager {
public:
virtual Status
InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) = 0;
const float* vectors, uint64_t lsn) = 0;
virtual Status
InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const uint8_t* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) = 0;
const uint8_t* vectors, uint64_t lsn) = 0;
virtual Status
InsertEntities(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, uint64_t>& attr_size,
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn,
std::set<std::string>& flushed_tables) = 0;
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn) = 0;
virtual Status
DeleteVector(const std::string& collection_id, IDNumber vector_id, uint64_t lsn) = 0;
......@@ -47,10 +46,11 @@ class MemManager {
DeleteVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) = 0;
virtual Status
Flush(const std::string& collection_id, bool apply_delete = true) = 0;
Flush(const std::string& collection_id) = 0;
virtual Status
Flush(std::set<std::string>& collection_ids, bool apply_delete = true) = 0;
Flush(std::set<std::string>& collection_ids) = 0;
// virtual Status
// Serialize(std::set<std::string>& table_ids) = 0;
......
......@@ -34,17 +34,7 @@ MemManagerImpl::GetMemByTable(const std::string& collection_id) {
Status
MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
// TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge
auto status = Flush(flushed_tables, false);
fiu_do_on("MemManagerImpl::InsertVectors_flush_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
}
const float* vectors, uint64_t lsn) {
VectorsData vectors_data;
vectors_data.vector_count_ = length;
vectors_data.float_data_.resize(length * dim);
......@@ -60,19 +50,7 @@ MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length,
Status
MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const uint8_t* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0)
<< "Insert buffer size exceeds limit. Performing force flush";
// TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge
auto status = Flush(flushed_tables, false);
if (!status.ok()) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Flush fail: " << status.message();
return status;
}
}
const uint8_t* vectors, uint64_t lsn) {
VectorsData vectors_data;
vectors_data.vector_count_ = length;
vectors_data.binary_data_.resize(length * dim);
......@@ -91,19 +69,7 @@ MemManagerImpl::InsertEntities(const std::string& collection_id, int64_t length,
int64_t dim, const float* vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, uint64_t>& attr_size,
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn,
std::set<std::string>& flushed_tables) {
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0)
<< "Insert buffer size exceeds limit. Performing force flush";
auto status = Flush(flushed_tables, false);
if (!status.ok()) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Flush fail: " << status.message();
return status;
}
}
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn) {
VectorsData vectors_data;
vectors_data.vector_count_ = length;
vectors_data.float_data_.resize(length * dim);
......@@ -174,7 +140,7 @@ MemManagerImpl::DeleteVectors(const std::string& collection_id, int64_t length,
}
Status
MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) {
MemManagerImpl::Flush(const std::string& collection_id) {
ToImmutable(collection_id);
// TODO: There is actually only one memTable in the immutable list
MemList temp_immutable_list;
......@@ -187,7 +153,7 @@ MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) {
auto max_lsn = GetMaxLSN(temp_immutable_list);
for (auto& mem : temp_immutable_list) {
LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetTableId();
auto status = mem->Serialize(max_lsn, apply_delete);
auto status = mem->Serialize(max_lsn, true);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetTableId() << " failed";
return status;
......@@ -199,7 +165,7 @@ MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) {
}
Status
MemManagerImpl::Flush(std::set<std::string>& collection_ids, bool apply_delete) {
MemManagerImpl::Flush(std::set<std::string>& collection_ids) {
ToImmutable();
MemList temp_immutable_list;
......@@ -213,7 +179,7 @@ MemManagerImpl::Flush(std::set<std::string>& collection_ids, bool apply_delete)
auto max_lsn = GetMaxLSN(temp_immutable_list);
for (auto& mem : temp_immutable_list) {
LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetTableId();
auto status = mem->Serialize(max_lsn, apply_delete);
auto status = mem->Serialize(max_lsn, true);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetTableId() << " failed";
return status;
......
......@@ -43,18 +43,17 @@ class MemManagerImpl : public MemManager, public server::CacheConfigHandler {
Status
InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) override;
const float* vectors, uint64_t lsn) override;
Status
InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const uint8_t* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) override;
const uint8_t* vectors, uint64_t lsn) override;
Status
InsertEntities(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, uint64_t>& attr_size,
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn,
std::set<std::string>& flushed_tables) override;
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn) override;
Status
DeleteVector(const std::string& collection_id, IDNumber vector_id, uint64_t lsn) override;
......@@ -63,10 +62,10 @@ class MemManagerImpl : public MemManager, public server::CacheConfigHandler {
DeleteVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) override;
Status
Flush(const std::string& collection_id, bool apply_delete = true) override;
Flush(const std::string& collection_id) override;
Status
Flush(std::set<std::string>& collection_ids, bool apply_delete = true) override;
Flush(std::set<std::string>& collection_ids) override;
// Status
// Serialize(std::set<std::string>& table_ids) override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册