未验证 提交 d7fff508 编写于 作者: G groot 提交者: GitHub

merge #2659 from 0.10.1 (#2666)

* merge #2659 from 0.10.1
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* merge #2657 from 0.10.1
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* format
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>

* typo
Signed-off-by: Nyhmo <yihua.mo@zilliz.com>
上级 18998a43
......@@ -8,13 +8,13 @@ Please mark all changes in change log and use the issue from GitHub
- \#2487 Remove timeout when creating collection in dev test
- \#2532 Fix Milvus docker image report illegal instruction
- \#2551 Fix test_hybrid_db and test_rpc error
- \#2557 fix random crash of INSERT_DUPLICATE_ID case
- \#2557 Fix random crash of INSERT_DUPLICATE_ID case
- \#2578 Result count doesn't match target vectors count
- \#2582 CreateHybridIndex.cpp compile error
- \#2598 fix Milvus docker image report illegal instruction
- \#2598 Fix Milvus docker image report illegal instruction
- \#2617 Fix HNSW and RNSG index files size
- \#2637 Suit the range of HNSW parameters
- \#2649 search parameter of annoy has conflict with document
- \#2649 Search parameter of annoy has conflict with document
## Feature
- \#2319 Redo metadata to support MVCC
......
......@@ -1024,6 +1024,10 @@ DBImpl::Flush(const std::string& collection_id) {
if (lsn != 0) {
swn_wal_.Notify();
flush_req_swn_.Wait();
} else {
// no collection flushed, call merge task to cleanup files
std::set<std::string> merge_collection_ids;
StartMergeTask(merge_collection_ids);
}
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
......@@ -1051,6 +1055,10 @@ DBImpl::Flush() {
if (lsn != 0) {
swn_wal_.Notify();
flush_req_swn_.Wait();
} else {
// no collection flushed, call merge task to cleanup files
std::set<std::string> merge_collection_ids;
StartMergeTask(merge_collection_ids);
}
} else {
LOG_ENGINE_DEBUG_ << "MemTable flush";
......@@ -3015,7 +3023,7 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
auto collections_flushed = [&](const std::string collection_id,
const std::set<std::string>& target_collection_names) -> uint64_t {
uint64_t max_lsn = 0;
if (options_.wal_enable_) {
if (options_.wal_enable_ && !target_collection_names.empty()) {
uint64_t lsn = 0;
for (auto& collection : target_collection_names) {
meta_ptr_->GetCollectionFlushLSN(collection, lsn);
......@@ -3034,16 +3042,11 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return max_lsn;
};
auto partition_flushed = [&](const std::string& collection_id, const std::string& partition,
const std::string& target_collection_name) {
if (options_.wal_enable_) {
uint64_t lsn = 0;
meta_ptr_->GetCollectionFlushLSN(target_collection_name, lsn);
wal_mgr_->PartitionFlushed(collection_id, partition, lsn);
auto force_flush_if_mem_full = [&]() -> uint64_t {
if (mem_mgr_->GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Insert buffer size exceeds limit. Force flush";
InternalFlush();
}
std::set<std::string> merge_collection_ids = {target_collection_name};
StartMergeTask(merge_collection_ids);
};
Status status;
......@@ -3057,15 +3060,12 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return status;
}
std::set<std::string> flushed_collections;
status = mem_mgr_->InsertEntities(target_collection_name, record.length, record.ids,
(record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.attr_nbytes, record.attr_data_size,
record.attr_data, record.lsn, flushed_collections);
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
status = mem_mgr_->InsertEntities(
target_collection_name, record.length, record.ids, (record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.attr_nbytes, record.attr_data_size, record.attr_data, record.lsn);
force_flush_if_mem_full();
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
break;
}
......@@ -3077,14 +3077,10 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return status;
}
std::set<std::string> flushed_collections;
status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
(record.data_size / record.length / sizeof(uint8_t)),
(const u_int8_t*)record.data, record.lsn, flushed_collections);
// even though !status.ok, run
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
(const u_int8_t*)record.data, record.lsn);
force_flush_if_mem_full();
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
......@@ -3099,14 +3095,10 @@ DBImpl::ExecWalRecord(const wal::MXLogRecord& record) {
return status;
}
std::set<std::string> flushed_collections;
status = mem_mgr_->InsertVectors(target_collection_name, record.length, record.ids,
(record.data_size / record.length / sizeof(float)),
(const float*)record.data, record.lsn, flushed_collections);
// even though !status.ok, run
if (!flushed_collections.empty()) {
partition_flushed(record.collection_id, record.partition_tag, target_collection_name);
}
(const float*)record.data, record.lsn);
force_flush_if_mem_full();
// metrics
milvus::server::CollectInsertMetrics metrics(record.length, status);
......@@ -3220,6 +3212,7 @@ DBImpl::BackgroundWalThread() {
next_auto_flush_time = get_next_auto_flush_time();
}
InternalFlush();
while (true) {
if (options_.auto_flush_interval_ > 0) {
if (std::chrono::system_clock::now() >= next_auto_flush_time) {
......
......@@ -27,18 +27,17 @@ class MemManager {
public:
virtual Status
InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) = 0;
const float* vectors, uint64_t lsn) = 0;
virtual Status
InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const uint8_t* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) = 0;
const uint8_t* vectors, uint64_t lsn) = 0;
virtual Status
InsertEntities(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, uint64_t>& attr_size,
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn,
std::set<std::string>& flushed_tables) = 0;
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn) = 0;
virtual Status
DeleteVector(const std::string& collection_id, IDNumber vector_id, uint64_t lsn) = 0;
......@@ -47,10 +46,10 @@ class MemManager {
DeleteVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) = 0;
virtual Status
Flush(const std::string& collection_id, bool apply_delete = true) = 0;
Flush(const std::string& collection_id) = 0;
virtual Status
Flush(std::set<std::string>& collection_ids, bool apply_delete = true) = 0;
Flush(std::set<std::string>& collection_ids) = 0;
// virtual Status
// Serialize(std::set<std::string>& table_ids) = 0;
......
......@@ -34,17 +34,7 @@ MemManagerImpl::GetMemByTable(const std::string& collection_id) {
Status
MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
// TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge
auto status = Flush(flushed_tables, false);
fiu_do_on("MemManagerImpl::InsertVectors_flush_fail", status = Status(milvus::SERVER_UNEXPECTED_ERROR, ""));
if (!status.ok()) {
return status;
}
}
const float* vectors, uint64_t lsn) {
VectorsData vectors_data;
vectors_data.vector_count_ = length;
vectors_data.float_data_.resize(length * dim);
......@@ -60,19 +50,7 @@ MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length,
Status
MemManagerImpl::InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const uint8_t* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) {
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0)
<< "Insert buffer size exceeds limit. Performing force flush";
// TODO(zhiru): Don't apply delete here in order to avoid possible concurrency issues with Merge
auto status = Flush(flushed_tables, false);
if (!status.ok()) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Flush fail: " << status.message();
return status;
}
}
const uint8_t* vectors, uint64_t lsn) {
VectorsData vectors_data;
vectors_data.vector_count_ = length;
vectors_data.binary_data_.resize(length * dim);
......@@ -91,19 +69,7 @@ MemManagerImpl::InsertEntities(const std::string& collection_id, int64_t length,
int64_t dim, const float* vectors,
const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, uint64_t>& attr_size,
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn,
std::set<std::string>& flushed_tables) {
flushed_tables.clear();
if (GetCurrentMem() > options_.insert_buffer_size_) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0)
<< "Insert buffer size exceeds limit. Performing force flush";
auto status = Flush(flushed_tables, false);
if (!status.ok()) {
LOG_ENGINE_DEBUG_ << LogOut("[%s][%ld] ", "insert", 0) << "Flush fail: " << status.message();
return status;
}
}
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn) {
VectorsData vectors_data;
vectors_data.vector_count_ = length;
vectors_data.float_data_.resize(length * dim);
......@@ -174,7 +140,7 @@ MemManagerImpl::DeleteVectors(const std::string& collection_id, int64_t length,
}
Status
MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) {
MemManagerImpl::Flush(const std::string& collection_id) {
ToImmutable(collection_id);
// TODO: There is actually only one memTable in the immutable list
MemList temp_immutable_list;
......@@ -187,7 +153,7 @@ MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) {
auto max_lsn = GetMaxLSN(temp_immutable_list);
for (auto& mem : temp_immutable_list) {
LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetTableId();
auto status = mem->Serialize(max_lsn, apply_delete);
auto status = mem->Serialize(max_lsn, true);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetTableId() << " failed";
return status;
......@@ -199,7 +165,7 @@ MemManagerImpl::Flush(const std::string& collection_id, bool apply_delete) {
}
Status
MemManagerImpl::Flush(std::set<std::string>& collection_ids, bool apply_delete) {
MemManagerImpl::Flush(std::set<std::string>& collection_ids) {
ToImmutable();
MemList temp_immutable_list;
......@@ -213,7 +179,7 @@ MemManagerImpl::Flush(std::set<std::string>& collection_ids, bool apply_delete)
auto max_lsn = GetMaxLSN(temp_immutable_list);
for (auto& mem : temp_immutable_list) {
LOG_ENGINE_DEBUG_ << "Flushing collection: " << mem->GetTableId();
auto status = mem->Serialize(max_lsn, apply_delete);
auto status = mem->Serialize(max_lsn, true);
if (!status.ok()) {
LOG_ENGINE_ERROR_ << "Flush collection " << mem->GetTableId() << " failed";
return status;
......
......@@ -43,18 +43,17 @@ class MemManagerImpl : public MemManager, public server::CacheConfigHandler {
Status
InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) override;
const float* vectors, uint64_t lsn) override;
Status
InsertVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const uint8_t* vectors, uint64_t lsn, std::set<std::string>& flushed_tables) override;
const uint8_t* vectors, uint64_t lsn) override;
Status
InsertEntities(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, int64_t dim,
const float* vectors, const std::unordered_map<std::string, uint64_t>& attr_nbytes,
const std::unordered_map<std::string, uint64_t>& attr_size,
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn,
std::set<std::string>& flushed_tables) override;
const std::unordered_map<std::string, std::vector<uint8_t>>& attr_data, uint64_t lsn) override;
Status
DeleteVector(const std::string& collection_id, IDNumber vector_id, uint64_t lsn) override;
......@@ -63,10 +62,10 @@ class MemManagerImpl : public MemManager, public server::CacheConfigHandler {
DeleteVectors(const std::string& collection_id, int64_t length, const IDNumber* vector_ids, uint64_t lsn) override;
Status
Flush(const std::string& collection_id, bool apply_delete = true) override;
Flush(const std::string& collection_id) override;
Status
Flush(std::set<std::string>& collection_ids, bool apply_delete = true) override;
Flush(std::set<std::string>& collection_ids) override;
// Status
// Serialize(std::set<std::string>& table_ids) override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册